Compare commits

...

48 Commits

Author SHA1 Message Date
xingyue b783027406 fix: remove stray lockfiles + refactor gateway auth with Hono group
- Remove root and workflow-gateway pnpm-lock.yaml (workspace mode)
- Replace startsWith auth skip with Hono route group
- Gateway management routes use GATEWAY_SECRET (per-route)
- /api/gateway/endpoints + /api/agents/* use dashboard auth
- No more global /api/* middleware with path-based exceptions
2026-05-11 15:59:42 +08:00
xingyue 93145cf08c refactor: reorganize gateway routes under /api/ prefix (closes #178, closes #179)
- Gateway management: /api/gateway/register, /api/gateway/endpoints
- Agent proxy: /api/agents/:agent/*
- /healthz stays at root (CF/k8s convention)
- Skip dashboard auth for gateway register routes
- Update CLI serve tunnel registration paths
- Update dashboard API client paths

Ref: #177
2026-05-11 15:48:13 +08:00
xiaoju da6bcb10d6 feat(workflow): add declarative ModeratorTable type and migrate templates
Migrate workflow-template-develop and workflow-template-solve-issue
moderators to use the declarative ModeratorTable/tableToModerator
pattern. Update workflow-runtime re-exports and workflow-execute
engine to use renamed types.

Fixes #172
2026-05-11 06:25:39 +00:00
xiaoju 6fc97fc8c8 feat(workflow-protocol): add declarative moderator table types and tableToModerator
Add ModeratorCondition, FALLBACK, ModeratorTransition, ModeratorTable types
and tableToModerator converter function. Export from workflow-protocol and
re-export from workflow-runtime for backward compat.

Refs #172
2026-05-11 06:22:24 +00:00
xiaoju 93d9821f64 docs: update CLI skill with serve command, thread status, defaults, env vars
小橘 <xiaoju@shazhou.work>
2026-05-10 01:57:42 +00:00
xiaoju 29367cbe31 chore: remove stray bundle artifacts from repo
小橘 <xiaoju@shazhou.work>
2026-05-10 01:44:40 +00:00
xiaoju ec397aecd3 chore: remove stray bundle artifacts from repo
小橘 <xiaoju@shazhou.work>
2026-05-10 01:42:18 +00:00
xiaoju 2e9d939f8e fix: thread detail API returns correct status instead of source
小橘 <xiaoju@shazhou.work>
2026-05-10 01:39:09 +00:00
xiaoju 064a24f093 fix: no-ctl threads should be failed, not active
小橘 <xiaoju@shazhou.work>
2026-05-10 01:36:14 +00:00
xiaoju fede623a82 dashboard: remove 'All agents' dropdown option, auto-select first agent
小橘 <xiaoju@shazhou.work>
2026-05-09 13:26:11 +00:00
xiaoju 2a52b930b9 chore: raise default maxRounds from 5 to 10 (CLI, matches API default)
小橘 <xiaoju@shazhou.work>
2026-05-09 13:17:57 +00:00
xiaoju bf2f790e6e fix: detect crashed threads even when .running marker is already gone
Check worker PID liveness as final fallback — if worker is dead
and thread has no __end__ node, it crashed.

小橘 <xiaoju@shazhou.work>
2026-05-09 12:52:39 +00:00
xiaoju 08a79b77db fix: SSE sends 'done' event for non-running threads, frontend stops reconnecting
- routes-live: emit 'done' event before closing SSE for non-running threads
- use-sse: handle 'done' event — set completed, disconnect, stop reconnect
- Prevents 'Live' badge flash on failed/completed threads

小橘 <xiaoju@shazhou.work>
2026-05-09 12:49:20 +00:00
xiaoju 22a6200b69 fix: close SSE stream for non-running threads, fix Live badge
- routes-live: check .running marker before keeping SSE open;
  if thread is not running, emit existing records and close
- thread-detail: only show Live badge when connected AND not completed

小橘 <xiaoju@shazhou.work>
2026-05-09 12:45:58 +00:00
xiaoju 7e7f6aa6d6 fix: detect crashed threads by checking worker PID liveness
When .running marker exists but no __end__ in CAS chain,
check if the worker process is actually alive. Dead PID
means the worker crashed without cleanup → status 'failed'.

Fixes #170

小橘 <xiaoju@shazhou.work>
2026-05-09 12:38:18 +00:00
xiaoju d6fe3f844c fix: detect crashed threads as failed instead of stuck running
- resolveThreadListStatus() checks CAS chain for __end__ node
- Stale .running markers no longer cause false 'running' status
- Distinguish 'failed' (returnCode != 0) from 'completed'
- Worker signal handlers (SIGINT/SIGTERM) clean up .running files
- listRunningThreads filters out terminated threads with stale markers

Fixes #170

小橘 <xiaoju@shazhou.work>
2026-05-09 12:28:33 +00:00
xiaoju d0803019b5 feat: ephemeral agent token for serve ↔ gateway auth
- serve generates random UUID on startup
- registration sends agentToken to gateway, stored in KV
- gateway injects X-Agent-Token header when proxying to agent
- serve rejects /api/* requests without valid token
- healthz remains unauthenticated
- tunnel URL is now protected — direct access returns 401

小橘 <xiaoju@shazhou.work>
2026-05-09 12:05:10 +00:00
xiaoju f16e7641fd chore: add .env.production for dashboard gateway URL
小橘 <xiaoju@shazhou.work>
2026-05-09 11:58:51 +00:00
xiaoju 3b41625001 feat: dashboard API key authentication
- Gateway: DASHBOARD_API_KEY middleware on /endpoints and /api/* routes
- Dashboard: login page with key validation, stored in localStorage
- SSE: key passed as ?key= query param (EventSource can't set headers)
- Sidebar: logout button to clear key

Refs #169
小橘 <xiaoju@shazhou.work>
2026-05-09 11:56:25 +00:00
xiaoju c602d2284b fix(dashboard): pass content as children to ReactMarkdown
Self-closing <ReactMarkdown /> renders nothing — need children.

小橘 <xiaoju@shazhou.work>
2026-05-09 10:58:33 +00:00
xiaoju d96e10b0fc feat(dashboard): structured record rendering with markdown support (#169)
- API returns structured fields for thread-start (workflow, prompt, status)
  and workflow-result (returnCode, content, timestamp)
- New RecordCard component renders by type:
  - StartCard: workflow name badge + prompt blockquote
  - RoleMessage: role-colored badges (preparer/agent/extractor) + markdown
  - ResultCard: success/fail status badge + summary
- Added react-markdown + shiki for markdown rendering with syntax highlighting
- Replaces generic <pre> blocks with proper structured rendering

Refs #169
小橘 <xiaoju@shazhou.work>
2026-05-09 10:41:13 +00:00
xiaoju 8e36d3e1f5 fix: use getContentMerklePayload to extract prompt text
Was showing raw YAML of the CAS node instead of the payload string.

小橘 <xiaoju@shazhou.work>
2026-05-09 10:34:43 +00:00
xiaoju bbe4fe0ed1 fix: include prompt text in thread-start record
Read prompt from StartNode refs[0] CAS blob and display it.

小橘 <xiaoju@shazhou.work>
2026-05-09 10:32:59 +00:00
xiaoju e105c5cac1 fix: show workflow name instead of bundle hash in thread-start record
小橘 <xiaoju@shazhou.work>
2026-05-09 10:31:08 +00:00
xiaoju 578776fccf fix: add standard fields to thread-start record
小橘 <xiaoju@shazhou.work>
2026-05-09 10:27:03 +00:00
xiaoju cb756a999a fix: normalize workflow-result records to match ThreadRecord shape
Both REST and SSE endpoints now return workflow-result with standard
fields (role, content, timestamp) instead of non-standard (summary).
Fixes 'Invalid Date' and empty content in dashboard.

小橘 <xiaoju@shazhou.work>
2026-05-09 10:24:48 +00:00
xiaoju e0577ceefe fix: add /api/healthz alias for gateway proxy health check
Gateway proxies /api/neko/healthz → /api/healthz on the agent,
but healthz was only on /healthz. Dashboard status bar showed
permanent Offline.

小橘 🍊(NEKO Team)
2026-05-09 10:05:46 +00:00
xiaoju 024dd8c1e8 Merge pull request 'feat: auto-tunnel + CF Worker gateway + dashboard multi-agent' (#168) from feat/164-cf-worker-gateway into main 2026-05-09 10:02:36 +00:00
xiaoju 9e98119145 feat: dashboard multi-agent support + CF Pages deploy
Phase C of #164:
- Dashboard fetches agents from gateway /endpoints
- Sidebar shows agent selector with online/offline status
- All API calls routed through gateway /api/:agent/*
- Hash routing: #agent/threads/id format
- SSE live streaming via gateway proxy
- VITE_GATEWAY_URL env var for gateway configuration
- Deployed to CF Pages: workflow-dashboard-54r.pages.dev
- Custom domain: workflow.shazhou.work (pending SSL)

Ref: #164, closes #167

小橘 🍊(NEKO Team)
2026-05-09 10:01:27 +00:00
xiaoju fd8943f131 feat: serve auto-tunnel + gateway registration
Phase B of #164:
- serve --name <agent> starts cloudflared quick tunnel automatically
- Registers with CF Worker gateway, heartbeat every 60s
- Graceful unregister on SIGINT/SIGTERM
- --no-tunnel flag for local dev
- Default name from hostname

Ref: #164, closes #166

小橘 🍊(NEKO Team)
2026-05-09 09:53:08 +00:00
xiaoju f7253d5948 feat: CF Worker API gateway with KV endpoint registry
Phase A of #164:
- Hono-based CF Worker at workflow-gateway.shazhou.workers.dev
- POST /register — agent registration with shared secret
- DELETE /register/:name — unregister
- GET /endpoints — list online agents
- GET /api/:agent/* — proxy to agent tunnel URL
- KV-backed with TTL auto-expiry

Ref: #164, closes #165

小橘 🍊(NEKO Team)
2026-05-09 09:48:49 +00:00
xiaoju 1c5636c270 Merge pull request 'fix: content node refs field + backward compat' (#163) from fix/161-162-cas-content-refs into main 2026-05-09 09:10:09 +00:00
xiaoju ca0403c8ab fix: content node refs field + thread head update
Fixes #161

Fixes #162

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-09 08:53:02 +00:00
xiaoju aa25f55f63 fix: add workflow-protocol and workflow-util to bundle validator allowlist
小橘 <xiaoju@shazhou.work>
2026-05-09 08:36:39 +00:00
xiaoju e29d1bf345 feat: Phase 5 — CLI + Dashboard CAS adaptation, cleanup .data.jsonl
- Align REST API contracts for Dashboard (threads list, detail, SSE)
- Add content resolution from CAS in thread show + API responses
- Rename dataWatcher → threadsJsonWatcher in SSE routes
- Update docs (CLAUDE.md, architecture.md, skill.ts) to reflect CAS storage
- Zero .data.jsonl code paths in production code
- All 166 tests pass, bun run check clean

Refs #155, closes #160

小橘 <xiaoju@shazhou.work>
2026-05-09 08:16:04 +00:00
xiaoju f3aedf8d6c feat: Phase 4 — CAS-based fork + mark-and-sweep GC
- Rewrite fork to create StateNode pointing to fork point (zero duplication)
- Rewrite GC as mark-and-sweep: roots from threads.json + history, findReachableHashes via refs[]
- Remove .data.jsonl code paths
- Fix all 7 previously failing CLI tests
- New: gc-mark-sweep.test.ts verifying shared nodes survive GC
- All 166 tests pass

Refs #155, closes #159

小橘 <xiaoju@shazhou.work>
2026-05-09 08:12:49 +00:00
xiaoju 26cf51366f feat: Phase 3 — engine read path + runtime context builder
- Add buildThreadContext(headHash, cas) to workflow-runtime
- Expand extract phase to return { meta, contentPayload, refs[] }
- Add parseCasThreadNode() to workflow-cas for node type parsing
- Update createWorkflow to write ContentMerkleNode with artifact refs
- Tests: 4 pass (build-context + extract-refs)
- Biome format pass on all files

Refs #155, closes #158

小橘 <xiaoju@shazhou.work>
2026-05-09 08:00:24 +00:00
xiaoju 81c582ae0e feat: Phase 2 — engine write path (CAS nodes + threads.json)
- Engine writes StartNode, StateNode, ContentMerkleNode as CAS blobs
- threads.json tracks active threads, completed → history/{date}.jsonl
- No more .data.jsonl writes
- ancestors skip-list: [parent, ...parentAncestors] capped at 11
- Tests: 4 pass (engine write path)

Refs #155, closes #157

小橘 <xiaoju@shazhou.work>
2026-05-09 07:53:44 +00:00
xiaoju 6f000512d2 feat: Phase 1 — CAS thread storage types + helpers
- Add StartNode, StateNode, ContentMerkleNode types to workflow-protocol
- Add collectRefs() to workflow-cas — extracts CAS hashes from StateNode payload
- Add findReachableHashes() to workflow-cas — recursive mark traversal via refs[]
- Tests: 7 pass (collect-refs + reachable)

Refs #155, closes #156

小橘 <xiaoju@shazhou.work>
2026-05-09 07:30:47 +00:00
xiaoju 8f78a00063 docs: RFC v3 — named payload fields, refs as GC index, merge parent+ancestors
- payload is source of truth with named fields (start, content, ancestors, compact)
- refs[] auto-derived by collectRefs(), pure GC index
- parent merged into ancestors[0]

小橘 <xiaoju@shazhou.work>
2026-05-09 07:12:29 +00:00
xiaoju 6c2a137aef docs: update CAS thread storage RFC
- StartNode prompt via refs[0] instead of inline
- threads.json active-only, completed → history/{date}.jsonl
- Content Merkle node carries role artifact refs
- Extract phase expanded to produce refs[]

小橘 <xiaoju@shazhou.work>
2026-05-09 07:08:10 +00:00
xiaoju 6cd856ca99 docs: add RFC for CAS-based thread storage
小橘 <xiaoju@shazhou.work>
2026-05-09 06:34:24 +00:00
xiaoju 064696c558 docs: update architecture docs and package READMEs for post-split structure
- Rewrite docs/architecture.md with 15-package map, dependency graph, updated engine paths
- Update CLAUDE.md monorepo structure section
- Add READMEs for: workflow-protocol, workflow-runtime, workflow-util, workflow-cas, workflow-register, workflow-execute, workflow-reactor
- Fix agent READMEs: update deps from @uncaged/workflow to actual packages
- Mark workflow-as-agent plan as outdated

Fixes #153

小橘 <xiaoju@shazhou.work>
2026-05-09 04:39:57 +00:00
xiaoju 0f28e9b61a refactor: remove non-index re-exports
Remove re-exports from non-index files to enforce folder module discipline:
- workflow-agent-hermes: remove buildAgentPrompt re-export
- workflow-agent-cursor: remove buildAgentPrompt re-export
- cli-workflow/cli-dispatch: remove CommandEntry/CommandGroup/DispatchFn/getCommandRegistry re-exports

小橘 <xiaoju@shazhou.work>
2026-05-09 04:19:49 +00:00
xiaoju 1ea56009a2 Merge pull request 'chore: rename dashboard folder' (#152) from chore/rename-dashboard-folder into main 2026-05-09 03:59:32 +00:00
xiaoju 6cc2481a16 chore: remove accidental pnpm-lock.yaml 2026-05-09 03:58:30 +00:00
xiaoju 44018bd17d chore: rename packages/dashboard → packages/workflow-dashboard
Align folder name with package name @uncaged/workflow-dashboard,
consistent with all other packages in the monorepo.
2026-05-09 03:57:49 +00:00
xingyue 28c35bb3e0 Merge pull request 'refactor: 七包拆分 — protocol / runtime / util / cas / reactor / register / execute' (#151) from refactor/143-split-packages into main 2026-05-09 03:53:54 +00:00
135 changed files with 6128 additions and 2016 deletions
+1
View File
@@ -4,3 +4,4 @@ bun.lock
*.tgz
tsconfig.tsbuildinfo
.npmrc
+20 -7
View File
@@ -2,7 +2,7 @@
## Project Overview
**@uncaged/workflow** is a workflow engine that executes single-file ESM bundles. Each workflow is a self-contained `.esm.js` file with an XXH64 hash as its version identifier.
This monorepo implements a workflow engine that executes single-file ESM bundles. Each workflow is a self-contained `.esm.js` file with an XXH64 hash as its version identifier. Shared types live in `@uncaged/workflow-protocol`; bundle authors typically depend on `@uncaged/workflow-runtime`.
### Key Terms
@@ -10,7 +10,7 @@
|---------|-----------|
| **Workflow** | A single-file ESM module that exports `run` (workflow function) and `descriptor` (metadata). Identified by its XXH64 hash (Crockford Base32). |
| **Bundle** | The physical `.esm.js` file stored in `~/.uncaged/workflow/bundles/`. |
| **Thread** | A single execution of a workflow, identified by a ULID. Persisted as `.data.jsonl` + `.info.jsonl`. |
| **Thread** | A single execution of a workflow, identified by a ULID. State lives in CAS (linked nodes); active threads indexed in `threads.json`; completed rows in `history/*.jsonl`. Debug logs use `.info.jsonl`. |
| **Role** | A named actor within a workflow. Each role produces output with typed `meta`. |
| **Registry** | `workflow.yaml` — maps workflow names to current/historical bundle hashes. |
@@ -19,14 +19,27 @@
```
workflow/
packages/
workflow/ # @uncaged/workflow — core lib (types, hash, ULID, JSONL, registry)
cli-workflow/ # @uncaged/cli-workflow — CLI (uncaged-workflow command)
workflow-protocol/ # @uncaged/workflow-protocol — shared types + Result
workflow-runtime/ # @uncaged/workflow-runtime — createWorkflow, type re-exports
workflow-util/ # @uncaged/workflow-util — Base32, ULID, logger, storage paths, refs helpers
workflow-reactor/ # @uncaged/workflow-reactor — LLM fn + thread reactor (tool calls)
workflow-cas/ # @uncaged/workflow-cas — CAS store, hash, Merkle
workflow-register/ # @uncaged/workflow-register — bundle validation, registry YAML, model resolution
workflow-execute/ # @uncaged/workflow-execute — engine, extract, fork, GC, workflowAsAgent
cli-workflow/ # @uncaged/cli-workflow — uncaged-workflow CLI
workflow-agent-cursor/ # @uncaged/workflow-agent-cursor
workflow-agent-hermes/ # @uncaged/workflow-agent-hermes
workflow-agent-llm/ # @uncaged/workflow-agent-llm
workflow-util-agent/ # @uncaged/workflow-util-agent — buildAgentPrompt, spawnCli
workflow-template-develop/ # @uncaged/workflow-template-develop
workflow-template-solve-issue/ # @uncaged/workflow-template-solve-issue
workflow-dashboard/ # @uncaged/workflow-dashboard — React dashboard (private app)
docs/ # RFCs, conventions
biome.json # root Biome config
tsconfig.json # root TypeScript config
```
- `workflow` is the core; `cli-workflow` depends on it
- Execution stack layers: `workflow-protocol` → (`workflow-runtime`, `workflow-util`, `workflow-reactor`) → (`workflow-cas`, `workflow-register`) → `workflow-execute``cli-workflow`
- Packages use `workspace:*` protocol
## Language & Paradigm
@@ -167,10 +180,10 @@ type Result<T, E = Error> = { ok: true; value: T } | { ok: false; error: E };
Never use `console.log/warn/error` directly — Biome's `noConsole` rule enforces this.
All logging goes through the structured logger from `@uncaged/workflow`:
All logging goes through the structured logger from `@uncaged/workflow-util`:
```typescript
import { createLogger } from "@uncaged/workflow";
import { createLogger } from "@uncaged/workflow-util";
const log = createLogger();
+1 -1
View File
@@ -8,7 +8,7 @@ A workflow engine that executes single-file ESM bundles. Each workflow is a self
|---------|-------------|
| **Workflow** | A single-file ESM module exporting `run` (workflow function) and `descriptor` (metadata). Identified by its XXH64 hash. |
| **Bundle** | The physical `.esm.js` file stored in `~/.uncaged/workflow/bundles/`. |
| **Thread** | A single execution of a workflow, identified by a ULID. Persisted as `.data.jsonl` + `.info.jsonl`. |
| **Thread** | A single execution of a workflow, identified by a ULID. CAS-backed chain plus `threads.json` / `history/*.jsonl`; `.info.jsonl` for debug logs. |
| **Role** | A named actor within a workflow. Each role produces output with typed `meta`. Roles live inside template packages (`src/roles/`). |
| **Registry** | `workflow.yaml` — maps workflow names to current/historical bundle hashes. |
| **CAS** | Content-Addressed Storage — bundles are immutable and addressed by hash. |
+151 -139
View File
@@ -1,6 +1,6 @@
# @uncaged/workflow — Architecture
# Uncaged workflow — Architecture
**Last updated:** 2026-05-06 by 小橘 🍊(NEKO Team)
**Last updated:** 2026-05-09
---
@@ -8,72 +8,106 @@
A workflow engine that executes single-file ESM bundles. Each workflow is a self-contained `.esm.js` file identified by its XXH64 hash (Crockford Base32). No daemon — processes start on demand and exit when done.
## Package Structure
The implementation lives in **15** Bun workspace packages under `packages/`, using the `workspace:*` protocol.
| Package | npm Name | Purpose |
|---------|----------|---------|
| `workflow` | `@uncaged/workflow` | Core: types, engine, ExtractFn, hash/ULID/registry |
| `cli-workflow` | `@uncaged/cli-workflow` | CLI: `uncaged-workflow` command |
| `workflow-agent-cursor` | `@uncaged/workflow-agent-cursor` | Cursor CLI agent (extracts workspace from ctx) |
| `workflow-agent-hermes` | `@uncaged/workflow-agent-hermes` | Hermes CLI agent |
| `workflow-agent-llm` | `@uncaged/workflow-agent-llm` | OpenAI-compatible LLM agent |
| `workflow-template-develop` | `@uncaged/workflow-template-develop` | Develop workflow template (roles in `src/roles/`) |
| `workflow-template-solve-issue` | `@uncaged/workflow-template-solve-issue` | Solve-issue workflow template (roles in `src/roles/`) |
| `workflow-util-agent` | `@uncaged/workflow-util-agent` | `buildAgentPrompt` + `spawnCli` utilities |
## Package map
Monorepo with **bun workspace**, `workspace:*` protocol.
Grouped by responsibility (npm name → folder).
## Core Types
| Layer | Package | One-line role |
|-------|---------|----------------|
| Contract | `@uncaged/workflow-protocol``workflow-protocol` | Shared TypeScript types and `Result` helpers; peer `zod` only — no other workspace deps. |
| Author API | `@uncaged/workflow-runtime``workflow-runtime` | `createWorkflow` and re-exports of protocol workflow types for bundle authors. |
| Shared infra | `@uncaged/workflow-util``workflow-util` | Base32/ULID, logger, storage root paths, global CAS dir, ref-field helpers. |
| LLM plumbing | `@uncaged/workflow-reactor``workflow-reactor` | `createLlmFn`, `createThreadReactor`, and related tool-call types for threaded LLM invocation. |
| CAS | `@uncaged/workflow-cas``workflow-cas` | `CasStore` implementation, XXH64 hashing, Merkle helpers over CAS payloads. |
| Registry / bundles | `@uncaged/workflow-register``workflow-register` | Bundle validation & dynamic export extraction, `workflow.yaml` registry I/O, provider/model resolution. |
| Engine | `@uncaged/workflow-execute``workflow-execute` | Thread execution, worker entry path, fork/GC, extract pipeline, `workflowAsAgent`. |
| CLI | `@uncaged/cli-workflow``cli-workflow` | `uncaged-workflow` binary (depends on engine, registry, CAS, protocol, util, runtime). |
| Agent adapters | `@uncaged/workflow-agent-cursor``workflow-agent-cursor` | `AgentFn` via `cursor-agent` CLI + workspace extraction. |
| | `@uncaged/workflow-agent-hermes``workflow-agent-hermes` | `AgentFn` via `hermes chat` CLI. |
| | `@uncaged/workflow-agent-llm``workflow-agent-llm` | `AgentFn` via OpenAI-compatible HTTP (`LlmProvider` from runtime). |
| Agent shared | `@uncaged/workflow-util-agent``workflow-util-agent` | `buildAgentPrompt`, `spawnCli` for CLI-backed agents. |
| Templates | `@uncaged/workflow-template-develop``workflow-template-develop` | Develop workflow definition, roles, descriptor builder. |
| | `@uncaged/workflow-template-solve-issue``workflow-template-solve-issue` | Solve-issue workflow definition, roles, descriptor builder. |
| Dashboard | `@uncaged/workflow-dashboard``workflow-dashboard` | Private Vite + React app (`src/main.tsx`); only `react` / `react-dom` dependencies — no workspace packages. |
```typescript
// --- Sentinel values ---
const START = "__start__";
const END = "__end__";
## Dependency graph (workspace packages)
// --- RoleMeta: maps role names → their meta types ---
type RoleMeta = Record<string, Record<string, unknown>>;
Bottom-up layering for the execution stack:
// --- Role Definition: pure data, no execution logic ---
type RoleDefinition<Meta> = {
description: string; // human-readable
systemPrompt: string; // given to agent
extractPrompt: string; // given to extractor
schema: z.ZodType<Meta>; // meta shape (Zod v4)
};
// --- Workflow Definition: pure data, no agent binding ---
type WorkflowDefinition<M extends RoleMeta> = {
description: string;
roles: { [K in keyof M & string]: RoleDefinition<M[K]> };
moderator: Moderator<M>;
};
// --- Agent: raw string output, reads role info from context ---
type AgentFn = (ctx: AgentContext) => Promise<string>;
// --- Agent Binding: runtime assignment ---
type AgentBinding = {
agent: AgentFn;
overrides?: Partial<Record<string, AgentFn>>;
};
// --- Extract: structured data from context ---
type ExtractFn = <T>(schema: z.ZodType<T>, prompt: string, ctx: ExtractContext) => Promise<T>;
// --- Moderator: pure routing function ---
type Moderator<M extends RoleMeta> = (ctx: ModeratorContext<M>) => (keyof M & string) | typeof END;
// --- Composition ---
// createWorkflow(def, binding, extract) => WorkflowFn
```mermaid
flowchart BT
subgraph L0["Layer 0 — contract"]
protocol["@uncaged/workflow-protocol"]
end
subgraph L1["Layer 1 — on protocol"]
runtime["@uncaged/workflow-runtime"]
util["@uncaged/workflow-util"]
reactor["@uncaged/workflow-reactor"]
end
subgraph L2["Layer 2 — protocol + util"]
cas["@uncaged/workflow-cas"]
register["@uncaged/workflow-register"]
end
subgraph L3["Layer 3 — engine"]
execute["@uncaged/workflow-execute"]
end
subgraph L4["Layer 4 — CLI"]
cli["@uncaged/cli-workflow"]
end
runtime --> protocol
util --> protocol
reactor --> protocol
cas --> protocol
cas --> util
register --> protocol
register --> util
execute --> protocol
execute --> runtime
execute --> util
execute --> cas
execute --> reactor
execute --> register
cli --> protocol
cli --> util
cli --> cas
cli --> execute
cli --> register
cli --> runtime
```
## Three-Phase Engine Loop
**Adjacent consumers** (not in the main CLI stack):
Each role execution has three distinct phases with progressive context:
- `@uncaged/workflow-util-agent``@uncaged/workflow-runtime`
- `@uncaged/workflow-agent-llm``@uncaged/workflow-runtime`
- `@uncaged/workflow-agent-cursor``@uncaged/workflow-runtime`, `@uncaged/workflow-util-agent`, `zod`
- `@uncaged/workflow-agent-hermes``@uncaged/workflow-runtime`, `@uncaged/workflow-util-agent`
- `@uncaged/workflow-template-develop``@uncaged/workflow-register`, `@uncaged/workflow-runtime`, `zod`
- `@uncaged/workflow-template-solve-issue``@uncaged/workflow-register`, `@uncaged/workflow-runtime`, `zod` (dev-only workspace deps: `@uncaged/workflow-cas`, `@uncaged/workflow-execute` for tests/tooling per `package.json`)
## Package roles (detail)
- **`workflow-protocol`** — Pure types (`WorkflowFn`, contexts, `CasStore` interface, descriptor shapes), `START` / `END`, `ok` / `err`. Depends only on peer `zod` for schema-related types in signatures.
- **`workflow-runtime`** — Workflow author surface: `createWorkflow` from `src/create-workflow.js`, re-exports protocol types/constants used when authoring bundles.
- **`workflow-util`** — Cross-cutting utilities: Crockford Base32, ULID, `createLogger`, `getDefaultWorkflowStorageRoot`, `getGlobalCasDir`, ref normalization; re-exports `ok`/`err` from protocol.
- **`workflow-cas`** — Filesystem CAS (`createCasStore`), `hashString` / `hashWorkflowBundleBytes`, Merkle node serialization and helpers (`merkle.js`).
- **`workflow-register`** — Bundle pipeline (`validateWorkflowBundle`, `extractBundleExports`, descriptor builders), registry YAML read/write, `resolveModel` / `splitProviderModelRef`.
- **`workflow-execute`** — `executeThread`, supervisor/worker wiring (`engine/`), fork/GC/pause gate, `createExtract` + LLM extract helpers (`extract/`), `workflowAsAgent`. Imports `@uncaged/workflow-reactor` for LLM-backed extract/supervisor paths (`extract-fn.ts`, `supervisor.ts`).
- **`workflow-reactor`** — `createLlmFn`, `createThreadReactor`, and thread tool-invocation types — consumed by `workflow-execute`.
- **`cli-workflow`** — CLI commands and HTTP/dashboard-related wiring (`hono`, `yaml`); composes register + execute + CAS + util.
- **`workflow-agent-*`** — Replaceable `AgentFn` implementations (Cursor / Hermes CLIs, or HTTP LLM).
- **`workflow-util-agent`** — Shared prompt assembly and subprocess spawning for CLI agents.
- **`workflow-template-*`** — Concrete `WorkflowDefinition` graphs + Zod role schemas + descriptor builders for publishing bundles.
- **`workflow-dashboard`** — Standalone React UI; no published library entry matching `src/index.ts`.
## Three-phase engine loop
Each role round is implemented in `packages/workflow-runtime/src/create-workflow.ts` (`advanceOneRound`): moderator → agent → extractor, with progressive context types from `@uncaged/workflow-protocol`.
```
┌─→ Phase 1: MODERATOR
│ Context: ModeratorContext { threadId, start, steps }
│ Context: ModeratorContext { threadId, depth, start, steps }
│ Action: moderator(ctx) → role name | END
│ Phase 2: AGENT
@@ -82,98 +116,92 @@ Each role execution has three distinct phases with progressive context:
│ Phase 3: EXTRACTOR
│ Context: ExtractContext = AgentCtx + { agentContent }
│ Action: extract(schema, extractPrompt, ctx) → typed meta
│ Action: runtime.extract(schema, extractPrompt, ctx) → typed meta
│ Merge: RoleStep { role, content, meta, timestamp }
│ Merge: RoleStep { role, contentHash, meta, refs, timestamp }
│ Append to steps
└─────────────────────────────────────────────────────┘
```
### Context Types (progressive)
### Context types (progressive)
Defined in `packages/workflow-protocol/src/types.ts`:
```typescript
// Phase 1: Moderator sees accumulated state only
type ModeratorContext<M> = {
threadId: string;
start: StartStep;
steps: RoleStep<M>[];
};
// Phase 2: Agent knows its identity
type ModeratorContext<M> = ThreadContext<M>;
type AgentContext<M> = ModeratorContext<M> & {
currentRole: { name: string; systemPrompt: string };
};
// Phase 3: Extractor has agent output
type ExtractContext<M> = AgentContext<M> & {
agentContent: string;
};
// ThreadContext is an alias for AgentContext (backward compat)
type ThreadContext<M> = AgentContext<M>;
type ExtractContext<M> = AgentContext<M> & { agentContent: string };
```
### Key Properties
### Key properties
- **Moderator is synchronous and pure** — no I/O, no state mutation
- **Agent gets context, not instructions** — reads `ctx.currentRole.systemPrompt`
- **Extractor is a general tool** — not limited to post-agent extraction; agents can use it too (e.g. Cursor agent extracts workspace path before execution)
- **extractPrompt is a call parameter**, not context state — different callers use different prompts
- **Moderator is synchronous and pure** — no I/O, no state mutation inside `createWorkflow`’s moderator call path.
- **Agent receives `AgentContext`** — reads `ctx.currentRole.systemPrompt`; raw output becomes `agentContent` for extract.
- **Extractor is `WorkflowRuntime.extract`** — supplied by the engine from registry-resolved LLM config (`workflow-execute`); stores agent body in CAS and yields `contentHash` + `refs` on each step (`create-workflow.ts`).
- **`extractPrompt` is a call parameter** on `RoleDefinition`, not implicit context state.
## Agent Information Sources
## Agent information sources
An agent has exactly three information sources:
1. **Prior knowledge** — LLM training, agent memory, agent skills
2. **Thread context**`AgentContext` (start, steps, currentRole)
2. **Thread context**`AgentContext` (`start`, `steps`, `currentRole`)
3. **Derived information** — from 1 & 2 (e.g. tool calls, shell commands)
No hidden environment parameters. If an agent needs something (like a workspace path), it extracts it from context using `ExtractFn`.
No hidden environment parameters. If an agent needs something (like a workspace path), it obtains it via `ExtractFn` (e.g. Cursor agent).
## Bundle Contract
## Bundle contract
A workflow bundle is a single `.esm.js` file with two named exports:
A workflow bundle is a single `.esm.js` file with two named exports (see `WorkflowFn` / `WorkflowDescriptor` in `packages/workflow-protocol/src/types.ts`):
```typescript
// Named exports (no default export)
export const descriptor: WorkflowDescriptor;
export const run: WorkflowFn;
type WorkflowFn = (
input: { prompt: string; steps: RoleOutput[] },
options: { threadId: string; maxRounds: number },
) => AsyncGenerator<RoleOutput, WorkflowResult>;
thread: ThreadContext,
runtime: WorkflowRuntime,
) => AsyncGenerator<RoleOutput, WorkflowCompletion>;
```
`RoleOutput` carries `contentHash`, `meta`, and `refs` (agent text lives in CAS, addressed by hash).
### Constraints
- Single `.esm.js` file
- No dynamic `import()`
- All static imports must be Node built-in modules only
- XXH64 hash (Crockford Base32) = globally unique version ID
- No dynamic `import()` in bundles (loader exempt in engine)
- Portable bundle static imports are constrained by validation in `@uncaged/workflow-register` (`validateWorkflowBundle`)
- XXH64 hash (Crockford Base32) = version ID
### Why AsyncGenerator?
- Each `yield` → engine writes to `.data.jsonl`, checks abort/pause
- `return` → engine marks thread complete
- Fork = pass historical steps as `input.steps` to a new generator
- Zero injection — bundle doesn't import from the engine
- Each `yield` lets `workflow-execute` persist state, CAS rows, and enforce pause/abort
- `return` supplies `WorkflowCompletion`
- Fork replays historical steps into a new thread context
- Bundle does not import the engine — only protocol/runtime types at build time
## Storage Layout
## Storage layout
```
~/.uncaged/workflow/
├── cas/ # Global content-addressed blobs (see getGlobalCasDir)
├── bundles/
│ ├── C9NMV6V2TQT81.esm.js # Crockford Base32 of XXH64
── C9NMV6V2TQT81.yaml # Role descriptor
│ ├── C9NMV6V2TQT81.esm.js # Crockford Base32 of XXH64
── C9NMV6V2TQT81.yaml # Role descriptor sidecar (when present)
│ └── C9NMV6V2TQT81/ # Per-hash bundle dir (alongside or instead of loose files)
│ ├── threads.json # Active threads: threadId → { head, start, updatedAt }
│ └── history/
│ └── 2026-05-09.jsonl # Completed threads (one JSON object per line)
├── logs/ # One folder per bundle hash
│ └── C9NMV6V2TQT81/
│ ├── 01KQXKW…YG.data.jsonl # Thread state
│ └── 01KQXKW…YG.info.jsonl # Debug log
│ ├── 01KQXKW…YG.running # Present while worker executes this thread (optional)
│ └── 01KQXKW…YG.info.jsonl # Debug log
└── workflow.yaml # Registry
```
### ID Encoding: Crockford Base32
### ID encoding: Crockford Base32
- Case-insensitive, filesystem-safe, no ambiguous chars (0/O, 1/I/L)
- Bundle hash: XXH64 → 13-char
@@ -181,45 +209,31 @@ type WorkflowFn = (
### Registry (`workflow.yaml`)
```yaml
workflows:
solve-issue:
hash: "C9NMV6V2TQT81"
timestamp: 1714963200000
history:
- hash: "A7BKR3M1NPQ40"
timestamp: 1714876800000
```
Managed by `@uncaged/workflow-register` (`readWorkflowRegistry`, `writeWorkflowRegistry`, …). Shape includes workflow entries and a top-level `config` section used for extract/supervisor model resolution.
### Thread JSONL
### Thread storage (CAS + index)
**`.data.jsonl`** — Line 1: start record, Line 2+: role outputs
Thread execution state is a chain of immutable CAS nodes (`StartNode`, `StateNode`, content Merkle blobs). Per bundle:
```jsonc
// Start record
{ "name": "solve-issue", "hash": "C9NMV6V2TQT81", "threadId": "01KQXKW…",
"parameters": { "prompt": "Fix bug #3", "options": { "maxRounds": 5 } },
"timestamp": 1714963200000 }
// Role output
{ "role": "planner", "content": "...", "meta": { "phases": [...] }, "timestamp": ... }
```
- **`threads.json`** — only in-flight threads (`head`, `start`, `updatedAt`).
- **`history/{YYYY-MM-DD}.jsonl`** — completed threads (`threadId`, `head`, `start`, `completedAt`).
- **CAS (`cas/`)** — payloads and refs for replay, GC, and fork sharing.
**`.info.jsonl`** — Structured debug log
**`.info.jsonl`** — Structured debug log via `@uncaged/workflow-util` `createLogger`:
```jsonc
{ "tag": "4KNMR2PX", "content": "Loading bundle...", "timestamp": ... }
```
Tags are 8-char Crockford Base32 (40-bit random), one per call site. `grep "4KNMR2PX"` instant code location.
Tags are 8-char Crockford Base32 (40-bit random), one per call site. `grep "4KNMR2PX"` → code location.
## Execution Model
## Execution model
- **No daemon.** `uncaged-workflow run <name>` starts a worker process
- Same bundle's threads share one process (memory efficiency)
- Process exits when all threads complete
- Thread termination via IPC within the process
- **No daemon.** `uncaged-workflow run <name>` starts a worker process (`workflow-execute` worker entry via `getWorkerHostScriptPath`)
- Threads share bundle-scoped workers as implemented in CLI/engine
- Pause/resume/abort via engine IPC and pause gate (`createThreadPauseGate`)
## CLI Commands
## CLI commands
| Priority | Command | Description |
|----------|---------|-------------|
@@ -239,18 +253,16 @@ Tags are 8-char Crockford Base32 (40-bit random), one per call site. `grep "4KNM
| P2 | `resume <thread-id>` | Resume a paused thread |
| P3 | `fork <thread-id> [--from-role <role>]` | Fork from historical state |
All commands implemented and tested. ✅
## Design Decisions
## Design decisions
| Decision | Rationale |
|----------|-----------|
| **Role = pure data** | Decouples definition from execution; same role with different agents |
| **Agent bound at runtime** | WorkflowDefinition is reusable; agent choice is deployment concern |
| **Three-phase context** | Each phase sees only what it needs; clean separation |
| **ExtractFn as general tool** | Agents use it for pre-execution extraction; engine uses it for meta |
| **Single-file ESM** | Hash = version, no dependency hell, self-contained |
| **No daemon** | OS handles process lifecycle; unnecessary complexity |
| **Agent bound at runtime** | `WorkflowDefinition` is reusable; agent choice is deployment concern |
| **Three-phase context** | Each phase sees only what it needs; types live in `workflow-protocol` |
| **`WorkflowRuntime.extract` + CAS `contentHash`** | Large agent bodies deduplicated globally; Merkle roots summarize threads |
| **`workflow-reactor` split** | LLM tool-calling loop isolated from filesystem/registry concerns |
| **Single-file ESM** | Hash = version, self-contained bundle |
| **No daemon** | OS handles process lifecycle |
| **Crockford Base32** | Filesystem-safe, readable, compact |
| **No concurrency in registry** | Different workflows have different constraints; belongs at workflow/role level |
| **No dryRun** | Tests use mock agents + mock fetch; simpler architecture |
| **15-package split** | Clear boundaries: protocol ↔ runtime author API ↔ util/CAS/register ↔ execute ↔ CLI ↔ agents/templates/UI |
@@ -1,5 +1,7 @@
# Workflow-as-Agent Implementation Plan
> ⚠️ This plan references the pre-split package structure. File paths have changed.
> **For Hermes:** Use subagent-driven-development skill to implement this plan task-by-task.
**Goal:** Enable workflows to invoke other workflows as agents, backed by global CAS and refs tracking.
+262
View File
@@ -0,0 +1,262 @@
# RFC: CAS-Based Thread Storage
> Status: Draft
> Author: 小橘 🍊(NEKO Team)
> Date: 2026-05-09
## Summary
Replace `.data.jsonl` with a fully CAS-based thread state chain. Threads become linked lists of immutable CAS nodes, indexed by a per-bundle `threads.json`.
## Motivation
`.data.jsonl` is a flat append-only file with three different row formats (start, role step, end). This makes forking expensive (copy file), deduplication impossible (forked threads repeat shared history), and GC complex (must parse every row to find CAS refs).
Threads are inherently immutable append-only sequences — a natural fit for CAS hash chains, similar to git's commit DAG.
## Design
### Node Types
Two CAS node types, using the existing `{ type, payload, refs }` CAS blob structure:
#### StartNode
Contains workflow-level parameters. **No threadId** (because the same StartNode can be shared across forks). Prompt is stored as a CAS blob and referenced via `refs[0]`.
```
CAS blob:
{
type: "start",
payload: {
name: "solve-issue",
hash: "BUNDLE_HASH",
maxRounds: 10,
depth: 0
},
refs: [
<prompt_hash> // refs[0]: initial task prompt (CAS blob)
]
}
```
- No `role`, `content`, `meta` — this is not a step, it's workflow metadata
- Prompt is **not** inline — it lives in CAS and is referenced by hash
#### StateNode
One per role step (including `__end__`).
```
CAS blob:
{
type: "state",
payload: {
role: "coder",
meta: { ... },
start: "<start_hash>",
content: "<content_merkle_hash>",
ancestors: ["<parent_hash>", "<grandparent_hash>", ...],
compact: null,
timestamp: 1234567890
},
refs: [<start_hash>, <content_hash>, <parent_hash>, ...]
}
```
**Payload is the source of truth.** Application code reads named fields from payload. `refs[]` is a **GC index** — automatically derived from payload by collecting all CAS hashes. GC only scans `refs[]` without understanding payload structure.
**Payload fields:**
| Field | Type | Meaning |
|-------|------|---------|
| `role` | `string` | Role name, or `"__end__"` for completion |
| `meta` | `object` | Structured metadata extracted from agent output |
| `start` | `string` | StartNode hash |
| `content` | `string` | Content Merkle node hash (carries role artifact refs) |
| `ancestors` | `string[]` | `[parent, grandparent, ...]` — up to 11 entries (1 parent + 10 skip-list). Empty for first step after start. `ancestors[0]` is the direct parent. |
| `compact` | `string \| null` | CAS hash of a compacted summary of all nodes before this one. When present, LLM context assembly can use this instead of walking the full chain. |
| `timestamp` | `number` | Unix timestamp in ms |
### Content Merkle Node
The content at `refs[2]` of each StateNode is itself a CAS Merkle node. This is where **role artifact references** live:
```
CAS blob:
{
type: "content",
payload: "<role output text>",
refs: [
<artifact_hash_1>, // e.g. a commit, a file, a sub-result
<artifact_hash_2>,
...
]
}
```
The Extractor is responsible for producing both `meta` and `refs` from raw agent output:
```
Agent raw output
Extractor → { meta, contentPayload, refs[] }
CAS put content Merkle: { type: "content", payload: contentPayload, refs }
↓ contentHash
StateNode: { ..., refs: [start, parent, contentHash, ...ancestors] }
```
This keeps StateNode refs fixed and simple. All role-specific artifact references are encapsulated in the content Merkle node. GC follows: `thread head → StateNode.refs → content Merkle.refs → artifacts`, full chain recursive.
### End Node
An end is just a StateNode with `role: "__end__"`:
```
{
type: "state",
payload: {
role: "__end__",
meta: { returnCode: 0, summary: "completed successfully" },
start: "<start_hash>",
content: "<content_hash>",
ancestors: ["<parent_hash>", ...],
compact: null,
timestamp: 1234567891
},
refs: [<start_hash>, <content_hash>, <parent_hash>, ...]
}
```
### Thread Index: `threads.json`
Per-bundle directory, one `threads.json` file. **Only active (in-progress) threads** live here:
```
~/.uncaged/workflow/bundles/<hash>/threads.json
```
```json
{
"01JTHREAD1AAAAAAAAAAAAAAA": {
"head": "<latest_state_node_hash>",
"start": "<start_node_hash>",
"updatedAt": 1234567891
}
}
```
When a thread completes (`__end__`), it is **removed from `threads.json`** and appended to a date-partitioned history file:
```
~/.uncaged/workflow/bundles/<hash>/history/{YYYY-MM-DD}.jsonl
```
Each line:
```json
{"threadId":"01JTHREAD1AAAAAAAAAAAAAAA","head":"<end_node_hash>","start":"<start_node_hash>","completedAt":1234567891}
```
Benefits:
- `threads.json` stays small — only in-flight threads
- Dashboard watches `threads.json` for real-time updates; completed threads don't trigger watches
- History is queryable by date but not actively monitored
- GC roots = all heads from `threads.json` + all heads from `history/*.jsonl`
### Ancestor Skip-List
Each StateNode carries up to 11 entries in `payload.ancestors` (1 parent + 10 skip-list, newest first):
```
Node 15: ancestors = [node14, node13, node12, node11, node10, node9, node8, node7, node6, node5, node4]
^parent ^--- skip-list (10 most recent) ---^
```
This enables:
- **Paginated fetch**: jump to any recent ancestor without walking the full chain
- **Partial replay**: fetch last N steps without loading the entire history
- The list is capped at 10 to keep node size bounded
### Fork
Forking a thread at step N:
1. Create new threadId
2. Create a new StateNode whose `parent` (refs[1]) points to the fork point's StateNode
3. Register the new threadId in `threads.json` with its own head
4. **Zero data duplication** — the forked thread shares all ancestor nodes via CAS
### Compact
When a StateNode has `payload.compact` set:
```json
{
"type": "state",
"payload": {
"role": "coder",
"meta": { ... },
"compact": "<cas_hash_of_summary>",
"timestamp": 1234
},
"refs": [...]
}
```
This means: "everything before this node has been summarized into the blob at `compact`". When building LLM context:
1. Walk back from head
2. If a node has `compact`, stop walking — use the compact summary + all nodes after it
3. If no compact found, use full chain
This enables long-running threads without unbounded context growth.
### GC
Simple mark-and-sweep:
1. **Roots**: all `head` and `start` hashes from `threads.json` + all `history/*.jsonl` files
2. **Mark**: from each root, recursively mark all reachable hashes via `refs[]` (including content Merkle → artifact refs)
3. **Sweep**: delete unmarked CAS blobs
No per-row format parsing needed. GC only needs to understand `refs[]`.
### refs[] Derivation
`refs[]` is auto-derived from payload at write time via a `collectRefs(payload)` function that extracts all CAS hash strings from named fields (`start`, `content`, `ancestors`, `compact`). Application code never reads `refs[]` — it reads named payload fields. This makes `refs[]` a pure GC optimization with zero semantic coupling.
### Extract Phase
The Extractor is expanded from the current design. Currently it only extracts `meta` from agent output. In the new design it extracts:
| Output | Purpose |
|--------|---------|
| `meta` | Structured metadata (same as before) |
| `contentPayload` | The text payload for the content Merkle node |
| `refs[]` | CAS hashes of artifacts produced by this role step |
The `refs[]` become the content Merkle node's refs, enabling GC to trace all role-produced artifacts.
## What Stays Unchanged
- `.info.jsonl` — debug logging stays as-is (high-frequency append, not suitable for CAS)
- CAS blob storage format (`~/.uncaged/workflow/cas/`)
- Bundle registry (`workflow.yaml`)
## Migration
Breaking change. Old `.data.jsonl` files become incompatible. No backward compat fallback (per project convention).
## Changes by Package
| Package | Changes |
|---------|---------|
| `workflow-protocol` | Replace `StartStep`, `RoleStep` types with `StartNode`, `StateNode`. Add `ContentMerkleNode` type. Expand `ExtractResult` to include `refs[]`. |
| `workflow-cas` | Add `findReachableHashes(roots)` for GC mark phase |
| `workflow-execute` | Rewrite engine to write CAS nodes + update `threads.json` instead of appending JSONL. Move completed threads to `history/`. Simplify `gc.ts`. Simplify `fork-thread.ts`. Expand extract phase to produce refs. |
| `workflow-runtime` | `ThreadContext` built by walking chain from head. `start.prompt` resolved from CAS via StartNode.refs[0]. |
| `cli-workflow` | `thread list/show/rm` read from `threads.json` + `history/`. SSE watches `threads.json`. |
| `workflow-dashboard` | Watch `threads.json` instead of `.data.jsonl` |
| Templates & Agents | Update extract definitions to produce `refs[]`. Update `ctx.start.content` → CAS resolved. |
@@ -2,10 +2,9 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, readFile, rm, unlink, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { createContentMerkleNode, serializeMerkleNode } from "@uncaged/workflow-cas";
import { getRegisteredWorkflow, readWorkflowRegistry } from "@uncaged/workflow-register";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { cmdCasGet, cmdCasList, cmdCasPut, cmdCasRm } from "../src/commands/cas/index.js";
import {
cmdAdd,
@@ -1,12 +1,16 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createCasStore, getContentMerklePayload } from "@uncaged/workflow-cas";
import { FORK_BRANCH_ROLE, walkStateFramesNewestFirst } from "@uncaged/workflow-execute";
import { END } from "@uncaged/workflow-runtime";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { cmdFork, cmdRun } from "../src/commands/thread/index.js";
import { cmdAdd } from "../src/commands/workflow/index.js";
import { pathExists } from "../src/fs-utils.js";
import { resolveThreadRecord } from "../src/thread-scan.js";
import { addCliArgs } from "./bundle-fixture.js";
import { ensureTestWorkflowRegistryConfig } from "./workflow-registry-fixture.js";
@@ -41,27 +45,6 @@ export const run = async function* (input, options) {
};
`;
async function countDataJsonlLines(dataPath: string): Promise<number> {
try {
const text = await readFile(dataPath, "utf8");
return text
.trim()
.split("\n")
.filter((l) => l !== "").length;
} catch {
return 0;
}
}
async function waitUntilMinDataLines(dataPath: string, minLines: number): Promise<void> {
for (let attempt = 0; attempt < 120; attempt++) {
if ((await countDataJsonlLines(dataPath)) >= minLines) {
return;
}
await new Promise((r) => setTimeout(r, 25));
}
}
async function waitUntilRunningAbsent(runningPath: string): Promise<void> {
for (let attempt = 0; attempt < 120; attempt++) {
if (!(await pathExists(runningPath))) {
@@ -71,6 +54,41 @@ async function waitUntilRunningAbsent(runningPath: string): Promise<void> {
}
}
async function waitUntilThreadCompletes(storageRoot: string, threadId: string): Promise<void> {
for (let attempt = 0; attempt < 120; attempt++) {
const row = await resolveThreadRecord(storageRoot, threadId);
if (row?.source === "history") {
return;
}
await new Promise((r) => setTimeout(r, 25));
}
}
async function listMeaningfulRoleContents(
storageRoot: string,
threadId: string,
): Promise<Array<{ role: string; content: string }>> {
const row = await resolveThreadRecord(storageRoot, threadId);
if (row === null) {
return [];
}
const cas = createCasStore(getGlobalCasDir(storageRoot));
const frames = await walkStateFramesNewestFirst(cas, row.head);
const chronological = [...frames].reverse();
const out: Array<{ role: string; content: string }> = [];
for (const fr of chronological) {
if (fr.payload.role === END || fr.payload.role === FORK_BRANCH_ROLE) {
continue;
}
const content = await getContentMerklePayload(cas, fr.payload.content);
out.push({
role: fr.payload.role,
content: content ?? "",
});
}
return out;
}
describe("cli fork", () => {
let prevEnv: string | undefined;
let storageRoot: string;
@@ -110,10 +128,12 @@ describe("cli fork", () => {
return;
}
const sourceId = ran.value.threadId;
const sourceData = join(storageRoot, "logs", hash, `${sourceId}.data.jsonl`);
const sourceRunning = join(storageRoot, "logs", hash, `${sourceId}.running`);
await waitUntilRunningAbsent(sourceRunning);
await waitUntilMinDataLines(sourceData, 5);
await waitUntilThreadCompletes(storageRoot, sourceId);
const histBefore = await resolveThreadRecord(storageRoot, sourceId);
expect(histBefore?.source).toBe("history");
const forked = await cmdFork(storageRoot, sourceId, "planner");
expect(forked.ok).toBe(true);
@@ -121,25 +141,18 @@ describe("cli fork", () => {
return;
}
const newId = forked.value.threadId;
const newData = join(storageRoot, "logs", hash, `${newId}.data.jsonl`);
const newRunning = join(storageRoot, "logs", hash, `${newId}.running`);
await waitUntilRunningAbsent(newRunning);
await waitUntilMinDataLines(newData, 5);
await waitUntilThreadCompletes(storageRoot, newId);
const text = await readFile(newData, "utf8");
const lines = text
.trim()
.split("\n")
.filter((l) => l !== "");
expect(lines.length).toBe(5);
const start = JSON.parse(lines[0] ?? "{}") as Record<string, unknown>;
expect(start.threadId).toBe(newId);
expect(start.forkFrom).toEqual({ threadId: sourceId });
const forkHist = await resolveThreadRecord(storageRoot, newId);
expect(forkHist?.source).toBe("history");
expect(forkHist?.start).toBe(histBefore?.start);
const lastRoleLine = JSON.parse(lines[lines.length - 2] ?? "{}") as Record<string, unknown>;
expect(lastRoleLine.role).toBe("reviewer");
const cas = createCasStore(getGlobalCasDir(storageRoot));
expect(await getContentMerklePayload(cas, String(lastRoleLine.contentHash))).toBe("rev-1");
const steps = await listMeaningfulRoleContents(storageRoot, newId);
const tail = steps[steps.length - 1];
expect(tail?.role).toBe("reviewer");
expect(tail?.content).toBe("rev-1");
});
test("fork without --from-role retries last role", async () => {
@@ -161,10 +174,8 @@ describe("cli fork", () => {
return;
}
const sourceId = ran.value.threadId;
const sourceData = join(storageRoot, "logs", hash, `${sourceId}.data.jsonl`);
const sourceRunning = join(storageRoot, "logs", hash, `${sourceId}.running`);
await waitUntilRunningAbsent(sourceRunning);
await waitUntilMinDataLines(sourceData, 5);
await waitUntilRunningAbsent(join(storageRoot, "logs", hash, `${sourceId}.running`));
await waitUntilThreadCompletes(storageRoot, sourceId);
const forked = await cmdFork(storageRoot, sourceId, null);
expect(forked.ok).toBe(true);
@@ -172,26 +183,17 @@ describe("cli fork", () => {
return;
}
const newId = forked.value.threadId;
const newData = join(storageRoot, "logs", hash, `${newId}.data.jsonl`);
const newRunning = join(storageRoot, "logs", hash, `${newId}.running`);
await waitUntilRunningAbsent(newRunning);
await waitUntilMinDataLines(newData, 5);
await waitUntilRunningAbsent(join(storageRoot, "logs", hash, `${newId}.running`));
await waitUntilThreadCompletes(storageRoot, newId);
const text = await readFile(newData, "utf8");
const lines = text
.trim()
.split("\n")
.filter((l) => l !== "");
expect(lines.length).toBe(5);
const replayCoder = JSON.parse(lines[2] ?? "{}") as Record<string, unknown>;
expect(replayCoder.role).toBe("coder");
const cas = createCasStore(getGlobalCasDir(storageRoot));
expect(await getContentMerklePayload(cas, String(replayCoder.contentHash))).toBe("c1");
const lastRoleLine = JSON.parse(lines[lines.length - 2] ?? "{}") as Record<string, unknown>;
expect(lastRoleLine.role).toBe("reviewer");
expect(await getContentMerklePayload(cas, String(lastRoleLine.contentHash))).toBe("rev-2");
const steps = await listMeaningfulRoleContents(storageRoot, newId);
expect(steps.length).toBeGreaterThanOrEqual(3);
const coderReplay = steps[steps.length - 2];
expect(coderReplay?.role).toBe("coder");
expect(coderReplay?.content).toBe("c1");
const tail = steps[steps.length - 1];
expect(tail?.role).toBe("reviewer");
expect(tail?.content).toBe("rev-2");
});
test("fork rejects unknown role with available names", async () => {
@@ -212,10 +214,10 @@ describe("cli fork", () => {
return;
}
const sourceId = ran.value.threadId;
const sourceData = join(storageRoot, "logs", added.value.hash, `${sourceId}.data.jsonl`);
const sourceRunning = join(storageRoot, "logs", added.value.hash, `${sourceId}.running`);
await waitUntilRunningAbsent(sourceRunning);
await waitUntilMinDataLines(sourceData, 5);
await waitUntilRunningAbsent(
join(storageRoot, "logs", added.value.hash, `${sourceId}.running`),
);
await waitUntilThreadCompletes(storageRoot, sourceId);
const bad = await cmdFork(storageRoot, sourceId, "ghost-role");
expect(bad.ok).toBe(false);
+60 -63
View File
@@ -1,45 +1,17 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { spawnSync } from "node:child_process";
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import { mkdir, mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { fileURLToPath } from "node:url";
import { createCasStore, putContentMerkleNode } from "@uncaged/workflow-cas";
import { createCasStore, putStartNode } from "@uncaged/workflow-cas";
import { garbageCollectCas, getBundleDir, upsertThreadEntry } from "@uncaged/workflow-execute";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { garbageCollectCas } from "@uncaged/workflow-execute";
import { cmdThreadRemove } from "../src/commands/thread/index.js";
import { pathExists } from "../src/fs-utils.js";
const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url));
async function writeDemoDataJsonl(params: {
path: string;
threadId: string;
bundleHash: string;
cas: ReturnType<typeof createCasStore>;
activeHash: string;
}): Promise<void> {
const bodyHash = await putContentMerkleNode(params.cas, "p");
const text = [
JSON.stringify({
name: "demo",
hash: params.bundleHash,
threadId: params.threadId,
parameters: { prompt: "hi", options: { maxRounds: 5 } },
timestamp: 100,
}),
JSON.stringify({
role: "planner",
contentHash: bodyHash,
meta: {},
refs: [params.activeHash, bodyHash],
timestamp: 101,
}),
"",
].join("\n");
await writeFile(params.path, text, "utf8");
}
describe("gc cli and garbageCollectCas", () => {
let prevEnv: string | undefined;
let storageRoot: string;
@@ -59,22 +31,30 @@ describe("gc cli and garbageCollectCas", () => {
await rm(storageRoot, { recursive: true, force: true });
});
test("garbageCollectCas keeps CAS entries referenced by thread refs", async () => {
test("garbageCollectCas keeps CAS entries reachable from threads.json roots", async () => {
const bundleHash = "C9NMV6V2TQT81";
const threadId = "01AAA1111111111111111111";
const logsDir = join(storageRoot, "logs", bundleHash);
await mkdir(logsDir, { recursive: true });
const bundleDir = getBundleDir(storageRoot, bundleHash);
await mkdir(bundleDir, { recursive: true });
const cas = createCasStore(getGlobalCasDir(storageRoot));
const activeHash = await cas.put("active-blob");
const orphanHash = await cas.put("orphan-blob");
await writeDemoDataJsonl({
path: join(logsDir, `${threadId}.data.jsonl`),
threadId,
bundleHash,
const promptHash = await cas.put("prompt-text");
const startHash = await putStartNode(
cas,
activeHash,
{
name: "demo",
hash: bundleHash,
maxRounds: 5,
depth: 0,
},
promptHash,
);
await upsertThreadEntry(bundleDir, threadId, {
head: startHash,
start: startHash,
updatedAt: 100,
});
const gc = await garbageCollectCas(storageRoot);
@@ -82,12 +62,12 @@ describe("gc cli and garbageCollectCas", () => {
if (!gc.ok) {
return;
}
expect(gc.value.scannedThreads).toBe(1);
expect(gc.value.activeRefs).toBe(2);
expect(gc.value.scannedThreads).toBe(2);
expect(gc.value.deletedEntries).toBe(1);
expect(gc.value.deletedHashes).toEqual([orphanHash]);
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${activeHash}.txt`))).toBe(true);
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${promptHash}.txt`))).toBe(true);
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${startHash}.txt`))).toBe(true);
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${orphanHash}.txt`))).toBe(false);
});
@@ -110,19 +90,27 @@ describe("gc cli and garbageCollectCas", () => {
test("cli gc prints stats", async () => {
const bundleHash = "C9NMV6V2TQT81";
const threadId = "01BBB2222222222222222222";
const logsDir = join(storageRoot, "logs", bundleHash);
await mkdir(logsDir, { recursive: true });
const bundleDir = getBundleDir(storageRoot, bundleHash);
await mkdir(bundleDir, { recursive: true });
const cas = createCasStore(getGlobalCasDir(storageRoot));
const activeHash = await cas.put("keep-me");
const promptHash = await cas.put("prompt-text");
const startHash = await putStartNode(
cas,
{
name: "demo",
hash: bundleHash,
maxRounds: 5,
depth: 0,
},
promptHash,
);
await cas.put("drop-me");
await writeDemoDataJsonl({
path: join(logsDir, `${threadId}.data.jsonl`),
threadId,
bundleHash,
cas,
activeHash,
await upsertThreadEntry(bundleDir, threadId, {
head: startHash,
start: startHash,
updatedAt: 100,
});
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
@@ -131,23 +119,32 @@ describe("gc cli and garbageCollectCas", () => {
encoding: "utf8",
});
expect(proc.status).toBe(0);
expect(String(proc.stdout).trim()).toBe("scanned 1 threads, 2 active refs, deleted 1 entries");
expect(String(proc.stdout).trim()).toBe("scanned 2 threads, 2 active refs, deleted 1 entries");
});
test("thread rm triggers gc so unreferenced CAS is removed", async () => {
const bundleHash = "C9NMV6V2TQT81";
const threadId = "01CCC3333333333333333333";
const logsDir = join(storageRoot, "logs", bundleHash);
await mkdir(logsDir, { recursive: true });
const bundleDir = getBundleDir(storageRoot, bundleHash);
await mkdir(bundleDir, { recursive: true });
const cas = createCasStore(getGlobalCasDir(storageRoot));
const activeHash = await cas.put("pinned-by-ref");
await writeDemoDataJsonl({
path: join(logsDir, `${threadId}.data.jsonl`),
threadId,
bundleHash,
const promptHash = await cas.put("prompt-text");
const startHash = await putStartNode(
cas,
activeHash,
{
name: "demo",
hash: bundleHash,
maxRounds: 5,
depth: 0,
},
promptHash,
);
await upsertThreadEntry(bundleDir, threadId, {
head: startHash,
start: startHash,
updatedAt: 100,
});
const orphanHash = await cas.put("orphan-after-rm");
@@ -157,6 +154,6 @@ describe("gc cli and garbageCollectCas", () => {
expect(removed.ok).toBe(true);
expect(await pathExists(orphanPath)).toBe(false);
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${activeHash}.txt`))).toBe(false);
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${promptHash}.txt`))).toBe(false);
});
});
+2 -241
View File
@@ -1,13 +1,10 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { spawn, spawnSync } from "node:child_process";
import { cp, mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { spawnSync } from "node:child_process";
import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { fileURLToPath } from "node:url";
import { createCasStore, putContentMerkleNode } from "@uncaged/workflow-cas";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import {
formatLiveDebugLine,
formatLiveTimeLabel,
@@ -18,11 +15,6 @@ import {
import { parseLiveArgv } from "../src/live-argv.js";
const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url));
const fixtureRoot = fileURLToPath(new URL("./fixtures/live", import.meta.url));
/** Bodies for Merkle content nodes; hashes must match `.data.jsonl` fixtures. */
const LIVE_FIXTURE_PLANNER_BODY =
"alpha\nbeta\ngamma\nLINE4\nLINE5\nLINE6\nLINE7\nLINE8\nLINE9\nLINE10\nLINE11";
describe("live helpers", () => {
test("formatLiveTimeLabel pads HH:MM:SS", () => {
@@ -86,28 +78,6 @@ describe("live CLI", () => {
prevEnv = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-live-"));
process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = storageRoot;
await mkdir(join(storageRoot, "logs", "C9NMV6V2TQT81"), { recursive: true });
await cp(
join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl"),
join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl"),
);
await cp(
join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.info.jsonl"),
join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.info.jsonl"),
);
await cp(
join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl"),
join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl"),
);
await cp(
join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVEOLDER01DDDDDDDDDDDDG.data.jsonl"),
join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVEOLDER01DDDDDDDDDDDDG.data.jsonl"),
);
const cas = createCasStore(getGlobalCasDir(storageRoot));
await putContentMerkleNode(cas, LIVE_FIXTURE_PLANNER_BODY);
await putContentMerkleNode(cas, "patch");
await putContentMerkleNode(cas, "still running");
});
afterEach(async () => {
@@ -119,170 +89,6 @@ describe("live CLI", () => {
await rm(storageRoot, { recursive: true, force: true });
});
test("prints role steps and summary for a completed thread", async () => {
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const proc = spawn(process.execPath, [cliEntryPath, "live", "01LIVECMPLT01DDDDDDDDDDDDG"], {
env,
stdio: ["ignore", "pipe", "pipe"],
});
const stdout = await new Promise<string>((resolve, reject) => {
let buf = "";
proc.stdout?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.stderr?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.on("error", reject);
proc.on("exit", (code: number | null) => {
if (code === 0) {
resolve(buf);
} else {
reject(new Error(`exit ${code}: ${buf}`));
}
});
});
expect(stdout).toContain("planner");
expect(stdout).toContain("coder");
expect(stdout).toContain("meta:");
expect(stdout).toContain('"phase":"plan"');
expect(stdout).toContain("LINE10");
expect(stdout).not.toContain("LINE11");
expect(stdout).toContain("more line");
expect(stdout).toContain("completed: returnCode=0");
expect(stdout).toContain("fixture completed");
});
test("--latest tails the newest thread by start timestamp", async () => {
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const proc = spawn(process.execPath, [cliEntryPath, "live", "--latest"], {
env,
stdio: ["ignore", "pipe", "pipe"],
});
const stdout = await new Promise<string>((resolve, reject) => {
let buf = "";
proc.stdout?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.stderr?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.on("error", reject);
proc.on("exit", (code: number | null) => {
if (code === 0) {
resolve(buf);
} else {
reject(new Error(`exit ${code}: ${buf}`));
}
});
});
expect(stdout).toContain("fixture completed");
expect(stdout).not.toContain("older thread");
});
test("--debug prints .info.jsonl records after data output", async () => {
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const proc = spawn(
process.execPath,
[cliEntryPath, "live", "01LIVECMPLT01DDDDDDDDDDDDG", "--debug"],
{
env,
stdio: ["ignore", "pipe", "pipe"],
},
);
const stdout = await new Promise<string>((resolve, reject) => {
let buf = "";
proc.stdout?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.stderr?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.on("error", reject);
proc.on("exit", (code: number | null) => {
if (code === 0) {
resolve(buf);
} else {
reject(new Error(`exit ${code}: ${buf}`));
}
});
});
expect(stdout).toContain("[DEBUGTAG1]");
expect(stdout).toContain("bundle loaded");
expect(stdout).toContain("[DEBUGTAG2]");
expect(stdout).toContain("multi line");
});
test("--role filters out non-matching roles", async () => {
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const proc = spawn(
process.execPath,
[cliEntryPath, "live", "01LIVECMPLT01DDDDDDDDDDDDG", "--role", "planner"],
{
env,
stdio: ["ignore", "pipe", "pipe"],
},
);
const stdout = await new Promise<string>((resolve, reject) => {
let buf = "";
proc.stdout?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.stderr?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.on("error", reject);
proc.on("exit", (code: number | null) => {
if (code === 0) {
resolve(buf);
} else {
reject(new Error(`exit ${code}: ${buf}`));
}
});
});
expect(stdout).toContain("planner");
expect(stdout).not.toContain("patch");
expect(stdout).toContain("completed: returnCode=0");
});
test("--latest --debug --role combine", async () => {
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const proc = spawn(
process.execPath,
[cliEntryPath, "live", "--latest", "--debug", "--role", "planner"],
{
env,
stdio: ["ignore", "pipe", "pipe"],
},
);
const stdout = await new Promise<string>((resolve, reject) => {
let buf = "";
proc.stdout?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.stderr?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.on("error", reject);
proc.on("exit", (code: number | null) => {
if (code === 0) {
resolve(buf);
} else {
reject(new Error(`exit ${code}: ${buf}`));
}
});
});
expect(stdout).toContain("[DEBUGTAG1]");
expect(stdout).toContain("planner");
expect(stdout).not.toContain("patch");
expect(stdout).toContain("fixture completed");
});
test("unknown thread id exits 1", () => {
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const r = spawnSync(process.execPath, [cliEntryPath, "live", "01UNKNOWNXXXXXXXXXXXXXXXXX"], {
@@ -292,51 +98,6 @@ describe("live CLI", () => {
expect(r.status).toBe(1);
expect(String(r.stderr ?? "")).toContain("thread not found");
});
test("follows file until WorkflowResult is appended", async () => {
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const dataPath = join(
storageRoot,
"logs",
"C9NMV6V2TQT81",
"01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl",
);
const proc = spawn(process.execPath, [cliEntryPath, "live", "01LIVEINFLY01DDDDDDDDDDDDG"], {
env,
stdio: ["ignore", "pipe", "pipe"],
});
await new Promise((r) => setTimeout(r, 120));
const prior = await readFile(dataPath, "utf8");
await writeFile(
dataPath,
`${prior.replace(/\s*$/, "")}\n${JSON.stringify({ returnCode: 0, summary: "caught up" })}\n`,
"utf8",
);
const stdout = await new Promise<string>((resolve, reject) => {
let buf = "";
proc.stdout?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.stderr?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.on("error", reject);
proc.on("exit", (code: number | null) => {
if (code === 0) {
resolve(buf);
} else {
reject(new Error(`exit ${code}: ${buf}`));
}
});
});
expect(stdout).toContain("planner");
expect(stdout).toContain("completed: returnCode=0");
expect(stdout).toContain("caught up");
});
});
describe("live --latest with empty storage", () => {
@@ -1,9 +1,10 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { spawnSync } from "node:child_process";
import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { dirname, join } from "node:path";
import { join } from "node:path";
import { fileURLToPath } from "node:url";
import { getBundleDir, readThreadsIndex } from "@uncaged/workflow-execute";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { cmdCasPut } from "../src/commands/cas/index.js";
import {
@@ -18,6 +19,7 @@ import {
} from "../src/commands/thread/index.js";
import { cmdAdd } from "../src/commands/workflow/index.js";
import { pathExists, readTextFileIfExists } from "../src/fs-utils.js";
import { resolveThreadRecord } from "../src/thread-scan.js";
import { addCliArgs } from "./bundle-fixture.js";
import { ensureTestWorkflowRegistryConfig } from "./workflow-registry-fixture.js";
@@ -101,34 +103,21 @@ export const run = async function* (_input, options) {
};
`;
async function countDataJsonlLines(dataPath: string): Promise<number> {
try {
const text = await readFile(dataPath, "utf8");
return text
.trim()
.split("\n")
.filter((l) => l !== "").length;
} catch {
return 0;
}
}
async function waitUntilMinDataLines(
dataPath: string,
minLines: number,
maxAttempts: number,
): Promise<void> {
async function waitUntilRunningFileAbsent(runningPath: string, maxAttempts: number): Promise<void> {
for (let attempt = 0; attempt < maxAttempts; attempt++) {
if ((await countDataJsonlLines(dataPath)) >= minLines) {
if (!(await pathExists(runningPath))) {
return;
}
await new Promise((r) => setTimeout(r, 25));
}
}
async function waitUntilRunningFileAbsent(runningPath: string, maxAttempts: number): Promise<void> {
async function waitUntilPredicate(
predicate: () => Promise<boolean>,
maxAttempts: number,
): Promise<void> {
for (let attempt = 0; attempt < maxAttempts; attempt++) {
if (!(await pathExists(runningPath))) {
if (await predicate()) {
return;
}
await new Promise((r) => setTimeout(r, 25));
@@ -200,8 +189,7 @@ describe("cli thread commands", () => {
const removed = await cmdThreadRemove(storageRoot, threadId);
expect(removed.ok).toBe(true);
const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`);
expect(await pathExists(dataPath)).toBe(false);
expect(await resolveThreadRecord(storageRoot, threadId)).toBeNull();
});
test("thread rm runs GC and removes CAS blobs not referenced by any remaining thread", async () => {
@@ -234,9 +222,9 @@ describe("cli thread commands", () => {
threads = await cmdThreads(storageRoot, []);
}
const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`);
const runningPath = join(dirname(dataPath), `${threadId}.running`);
const runningPath = join(storageRoot, "logs", added.value.hash, `${threadId}.running`);
await waitUntilRunningFileAbsent(runningPath, 120);
expect((await resolveThreadRecord(storageRoot, threadId))?.source).toBe("history");
const put = await cmdCasPut(storageRoot, "keep-after-thread-rm");
expect(put.ok).toBe(true);
@@ -323,24 +311,20 @@ describe("cli thread commands", () => {
const killed = await cmdKill(storageRoot, threadId);
expect(killed.ok).toBe(true);
await new Promise((r) => setTimeout(r, 900));
await waitUntilPredicate(async () => {
return (await resolveThreadRecord(storageRoot, threadId))?.source === "history";
}, 120);
const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`);
const text = await readFile(dataPath, "utf8");
const lines = text
.trim()
.split("\n")
.filter((l) => l !== "");
expect(lines.length).toBe(3);
expect((await resolveThreadRecord(storageRoot, threadId))?.source).toBe("history");
const runningPath = join(dirname(dataPath), `${threadId}.running`);
const runningPath = join(storageRoot, "logs", added.value.hash, `${threadId}.running`);
expect(await pathExists(runningPath)).toBe(false);
});
test("pause stops between yields and resume completes thread", async () => {
const bundleDir = join(storageRoot, "src");
await mkdir(bundleDir, { recursive: true });
const bundlePath = join(bundleDir, "demo.esm.js");
const srcDir = join(storageRoot, "src");
await mkdir(srcDir, { recursive: true });
const bundlePath = join(srcDir, "demo.esm.js");
await writeFile(bundlePath, pauseResumeBundleSource, "utf8");
const added = await cmdAdd(storageRoot, addCliArgs("solve-issue", bundlePath));
@@ -356,24 +340,33 @@ describe("cli thread commands", () => {
}
const threadId = ran.value.threadId;
const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`);
const bundleDir = getBundleDir(storageRoot, added.value.hash);
await waitUntilMinDataLines(dataPath, 2, 80);
expect(await countDataJsonlLines(dataPath)).toBe(2);
await waitUntilPredicate(async () => {
const idx = await readThreadsIndex(bundleDir);
const ent = idx[threadId];
return ent !== undefined && ent.head !== ent.start;
}, 80);
const idxBeforePause = await readThreadsIndex(bundleDir);
const headAtPause = idxBeforePause[threadId]?.head;
const paused = await cmdPause(storageRoot, threadId);
expect(paused.ok).toBe(true);
await new Promise((r) => setTimeout(r, 400));
expect(await countDataJsonlLines(dataPath)).toBe(2);
const idxPaused = await readThreadsIndex(bundleDir);
expect(idxPaused[threadId]?.head).toBe(headAtPause);
const resumed = await cmdResume(storageRoot, threadId);
expect(resumed.ok).toBe(true);
await waitUntilMinDataLines(dataPath, 4, 120);
expect(await countDataJsonlLines(dataPath)).toBe(4);
await waitUntilPredicate(async () => {
const row = await resolveThreadRecord(storageRoot, threadId);
return row?.source === "history";
}, 120);
const runningPath = join(dirname(dataPath), `${threadId}.running`);
const runningPath = join(storageRoot, "logs", added.value.hash, `${threadId}.running`);
await waitUntilRunningFileAbsent(runningPath, 100);
expect(await pathExists(runningPath)).toBe(false);
});
@@ -397,8 +390,7 @@ describe("cli thread commands", () => {
}
const threadId = ran.value.threadId;
const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`);
const runningPath = join(dirname(dataPath), `${threadId}.running`);
const runningPath = join(storageRoot, "logs", added.value.hash, `${threadId}.running`);
await waitUntilRunningFileAbsent(runningPath, 100);
expect(await pathExists(runningPath)).toBe(false);
@@ -9,9 +9,6 @@ import { createThreadDispatcher, dispatchLive, dispatchRun } from "./commands/th
import { createWorkflowDispatcher } from "./commands/workflow/index.js";
import { formatSkillIndex, formatSkillTopic, getSkillTopics } from "./skill.js";
export type { CommandEntry, CommandGroup, DispatchFn } from "./cli-command-types.js";
export { getCommandRegistry } from "./cli-registry.js";
function dispatchGroup(
tableName: string,
table: Record<string, CommandEntry>,
+1 -1
View File
@@ -1,5 +1,5 @@
import type { Result } from "@uncaged/workflow-protocol";
import { type GcResult, garbageCollectCas } from "@uncaged/workflow-execute";
import type { Result } from "@uncaged/workflow-protocol";
export async function cmdGc(storageRoot: string): Promise<Result<GcResult, string>> {
return garbageCollectCas(storageRoot);
@@ -1,6 +1,6 @@
import { createCasStore } from "@uncaged/workflow-cas";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { createCasStore } from "@uncaged/workflow-cas";
export async function cmdCasGet(
storageRoot: string,
@@ -1,6 +1,6 @@
import { createCasStore } from "@uncaged/workflow-cas";
import { ok, type Result } from "@uncaged/workflow-protocol";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { createCasStore } from "@uncaged/workflow-cas";
export async function cmdCasList(storageRoot: string): Promise<Result<string[], string>> {
const cas = createCasStore(getGlobalCasDir(storageRoot));
@@ -1,6 +1,6 @@
import { createCasStore } from "@uncaged/workflow-cas";
import { ok, type Result } from "@uncaged/workflow-protocol";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { createCasStore } from "@uncaged/workflow-cas";
export async function cmdCasPut(
storageRoot: string,
+1 -1
View File
@@ -1,6 +1,6 @@
import { createCasStore } from "@uncaged/workflow-cas";
import { ok, type Result } from "@uncaged/workflow-protocol";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { createCasStore } from "@uncaged/workflow-cas";
export async function cmdCasRm(storageRoot: string, hash: string): Promise<Result<void, string>> {
const cas = createCasStore(getGlobalCasDir(storageRoot));
@@ -8,7 +8,7 @@ import { createWorkflowRoutes } from "./routes-workflow.js";
const MAX_BODY_SIZE = 1_048_576; // 1 MB
export function createApp(storageRoot: string): Hono {
export function createApp(storageRoot: string, agentToken: string | null): Hono {
const app = new Hono();
app.onError((_err, c) => {
@@ -37,7 +37,19 @@ export function createApp(storageRoot: string): Hono {
await next();
});
// ── Agent token auth (skip healthz) ───────────────────────────────
if (agentToken !== null) {
app.use("/api/*", async (c, next) => {
const token = c.req.header("X-Agent-Token");
if (token !== agentToken) {
return c.json({ error: "unauthorized" }, 401);
}
await next();
});
}
app.get("/healthz", (c) => c.json({ ok: true }));
app.get("/api/healthz", (c) => c.json({ ok: true }));
app.route("/api/workflows", createWorkflowRoutes(storageRoot));
app.route("/api/threads", createThreadRoutes(storageRoot));
@@ -1,6 +1,6 @@
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { createCasStore } from "@uncaged/workflow-cas";
import { garbageCollectCas } from "@uncaged/workflow-execute";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { Hono } from "hono";
export function createCasRoutes(storageRoot: string): Hono {
@@ -1,9 +1,18 @@
import { statSync, watch } from "node:fs";
import { dirname, join } from "node:path";
import { existsSync, statSync, watch } from "node:fs";
import { join } from "node:path";
import { createCasStore, getContentMerklePayload } from "@uncaged/workflow-cas";
import {
FORK_BRANCH_ROLE,
readThreadsIndex,
type ThreadIndex,
walkStateFramesNewestFirst,
} from "@uncaged/workflow-execute";
import { END } from "@uncaged/workflow-runtime";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { Hono } from "hono";
import { streamSSE } from "hono/streaming";
import { resolveThreadDataPath } from "../../thread-scan.js";
import { resolveThreadRecord } from "../../thread-scan.js";
type PumpState = {
contentOffset: number;
@@ -21,7 +30,6 @@ function fileSize(path: string): number {
async function readNewBytes(path: string, state: PumpState): Promise<string | null> {
const size = fileSize(path);
if (size < state.contentOffset) {
// File was truncated — reset
state.contentOffset = 0;
state.carry = "";
}
@@ -42,15 +50,6 @@ function parseJsonLine(line: string): unknown {
}
}
function isWorkflowResult(record: unknown): boolean {
return (
record !== null &&
typeof record === "object" &&
"type" in (record as Record<string, unknown>) &&
(record as Record<string, unknown>).type === "workflow-result"
);
}
function parseNewLines(chunk: string, state: PumpState): string[] {
state.carry += chunk;
@@ -67,52 +66,198 @@ function parseNewLines(chunk: string, state: PumpState): string[] {
return lines;
}
type CasSseState = {
printedHashes: Set<string>;
lastHead: string | null;
completionEmitted: boolean;
};
type LiveSseStream = {
writeSSE: (opts: { event: string; data: string; id: string }) => Promise<void>;
};
function completionFromEndMeta(meta: Record<string, unknown>): {
returnCode: number;
summary: string;
} | null {
const returnCode = meta.returnCode;
const summary = meta.summary;
if (typeof returnCode !== "number" || typeof summary !== "string") {
return null;
}
return { returnCode, summary };
}
async function emitRecordsForHead(params: {
storageRoot: string;
bundleDir: string;
threadId: string;
headHash: string;
sseState: CasSseState;
stream: LiveSseStream;
eventId: { n: number };
}): Promise<boolean> {
const cas = createCasStore(getGlobalCasDir(params.storageRoot));
const frames = await walkStateFramesNewestFirst(cas, params.headHash);
const chronological = [...frames].reverse();
for (const fr of chronological) {
if (params.sseState.printedHashes.has(fr.hash)) {
continue;
}
params.sseState.printedHashes.add(fr.hash);
const role = fr.payload.role;
if (role === FORK_BRANCH_ROLE) {
continue;
}
if (role === END) {
const wf = completionFromEndMeta(fr.payload.meta);
if (wf !== null) {
params.eventId.n++;
await params.stream.writeSSE({
event: "record",
data: JSON.stringify({
type: "workflow-result",
returnCode: wf.returnCode,
content: wf.summary,
timestamp: null,
}),
id: String(params.eventId.n),
});
return true;
}
continue;
}
const payloadText = await getContentMerklePayload(cas, fr.payload.content);
const content =
payloadText !== null
? payloadText
: `(content not in CAS; contentHash=${fr.payload.content})`;
params.eventId.n++;
await params.stream.writeSSE({
event: "record",
data: JSON.stringify({
type: "role",
role: fr.payload.role,
contentHash: fr.payload.content,
content,
meta: fr.payload.meta,
timestamp: fr.payload.timestamp,
}),
id: String(params.eventId.n),
});
}
return false;
}
async function pumpThreadsJsonSse(params: {
storageRoot: string;
bundleDir: string;
threadId: string;
sseState: CasSseState;
stream: LiveSseStream;
eventId: { n: number };
}): Promise<boolean> {
let idx: ThreadIndex;
try {
idx = await readThreadsIndex(params.bundleDir);
} catch {
idx = {};
}
const active = idx[params.threadId];
if (active === undefined) {
if (params.sseState.completionEmitted) {
return false;
}
const hist = await resolveThreadRecord(params.storageRoot, params.threadId);
if (hist === null || hist.source !== "history") {
return false;
}
params.sseState.completionEmitted = true;
return await emitRecordsForHead({
storageRoot: params.storageRoot,
bundleDir: params.bundleDir,
threadId: params.threadId,
headHash: hist.head,
sseState: params.sseState,
stream: params.stream,
eventId: params.eventId,
});
}
const head = active.head;
if (params.sseState.lastHead === null) {
params.sseState.lastHead = head;
return await emitRecordsForHead({
storageRoot: params.storageRoot,
bundleDir: params.bundleDir,
threadId: params.threadId,
headHash: head,
sseState: params.sseState,
stream: params.stream,
eventId: params.eventId,
});
}
if (head !== params.sseState.lastHead) {
params.sseState.lastHead = head;
return await emitRecordsForHead({
storageRoot: params.storageRoot,
bundleDir: params.bundleDir,
threadId: params.threadId,
headHash: head,
sseState: params.sseState,
stream: params.stream,
eventId: params.eventId,
});
}
return false;
}
export function createLiveRoutes(storageRoot: string): Hono {
const app = new Hono();
app.get("/:threadId/live", async (c) => {
const threadId = c.req.param("threadId");
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
const resolved = await resolveThreadRecord(storageRoot, threadId);
if (resolved === null) {
return c.json({ error: `thread not found: ${threadId}` }, 404);
}
const resolvedDataPath = dataPath;
const infoPath = join(dirname(resolvedDataPath), `${threadId}.info.jsonl`);
const threadTarget = resolved;
const threadsJsonPath = join(threadTarget.bundleDir, "threads.json");
const infoPath = join(storageRoot, "logs", threadTarget.bundleHash, `${threadId}.info.jsonl`);
return streamSSE(c, async (stream) => {
const dataState: PumpState = { contentOffset: 0, carry: "" };
const infoState: PumpState = { contentOffset: 0, carry: "" };
let eventId = 0;
const sseThreadState: CasSseState = {
printedHashes: new Set<string>(),
lastHead: null,
completionEmitted: false,
};
const eventId = { n: 0 };
async function pumpData(): Promise<boolean> {
let chunk: string | null;
try {
chunk = await readNewBytes(resolvedDataPath, dataState);
} catch {
return false;
}
if (chunk === null) {
return false;
}
const lines = parseNewLines(chunk, dataState);
for (const line of lines) {
const record = parseJsonLine(line);
eventId++;
await stream.writeSSE({
event: "record",
data: JSON.stringify(record),
id: String(eventId),
});
if (isWorkflowResult(record)) {
return true;
}
}
return false;
const finished = await pumpThreadsJsonSse({
storageRoot,
bundleDir: threadTarget.bundleDir,
threadId,
sseState: sseThreadState,
stream,
eventId,
});
return finished;
}
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: SSE newline framing mirrors legacy pump
async function pumpInfo(): Promise<void> {
let chunk: string | null;
try {
@@ -134,28 +279,58 @@ export function createLiveRoutes(storageRoot: string): Hono {
) {
continue;
}
eventId++;
eventId.n++;
await stream.writeSSE({
event: "info",
data: JSON.stringify(record),
id: String(eventId),
id: String(eventId.n),
});
}
}
// Initial pump
eventId.n++;
await stream.writeSSE({
event: "record",
data: JSON.stringify({
type: "thread-start",
threadId: threadTarget.threadId,
bundleHash: threadTarget.bundleHash,
head: threadTarget.head,
start: threadTarget.start,
source: threadTarget.source,
}),
id: String(eventId.n),
});
const done = await pumpData();
await pumpInfo();
try {
await pumpInfo();
} catch {
// optional info file
}
if (done) {
return;
}
// Watch for changes
// If thread is not actively running, emit all records and close — don't keep SSE open
const runningPath = join(storageRoot, "logs", threadTarget.bundleHash, `${threadId}.running`);
if (!existsSync(runningPath)) {
eventId.n++;
await stream.writeSSE({
event: "done",
data: JSON.stringify({ reason: "not-running" }),
id: String(eventId.n),
});
return;
}
const controller = new AbortController();
let completed = false;
const dataWatcher = watch(resolvedDataPath, async () => {
if (completed) return;
const threadsJsonWatcher = watch(threadsJsonPath, async () => {
if (completed) {
return;
}
const finished = await pumpData();
if (finished) {
completed = true;
@@ -166,7 +341,9 @@ export function createLiveRoutes(storageRoot: string): Hono {
let infoWatcher: ReturnType<typeof watch> | null = null;
try {
infoWatcher = watch(infoPath, async () => {
if (completed) return;
if (completed) {
return;
}
await pumpInfo();
});
} catch {
@@ -175,11 +352,10 @@ export function createLiveRoutes(storageRoot: string): Hono {
stream.onAbort(() => {
completed = true;
dataWatcher.close();
threadsJsonWatcher.close();
infoWatcher?.close();
});
// Keep stream alive until completion or client disconnect
await new Promise<void>((resolve) => {
if (completed) {
resolve();
@@ -189,7 +365,7 @@ export function createLiveRoutes(storageRoot: string): Hono {
stream.onAbort(() => resolve());
});
dataWatcher.close();
threadsJsonWatcher.close();
infoWatcher?.close();
});
});
@@ -1,21 +1,119 @@
import { join } from "node:path";
import { createCasStore, getContentMerklePayload, parseCasThreadNode } from "@uncaged/workflow-cas";
import { FORK_BRANCH_ROLE, walkStateFramesNewestFirst } from "@uncaged/workflow-execute";
import { END } from "@uncaged/workflow-runtime";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { Hono } from "hono";
import { readTextFileIfExists } from "../../fs-utils.js";
import { pathExists } from "../../fs-utils.js";
import type { HistoricalThreadRow, ResolvedThreadRecord } from "../../thread-scan.js";
import {
listHistoricalThreads,
listRunningThreads,
resolveThreadDataPath,
resolveThreadListStatus,
resolveThreadRecord,
} from "../../thread-scan.js";
import { cmdKill, cmdPause, cmdResume } from "../thread/control.js";
import { cmdRun } from "../thread/run.js";
async function readStartInfo(
cas: ReturnType<typeof createCasStore>,
startHash: string,
): Promise<{ name: string | null; prompt: string | null }> {
const raw = await cas.get(startHash);
if (raw === null) return { name: null, prompt: null };
const parsed = parseCasThreadNode(raw);
if (parsed === null || parsed.kind !== "start") return { name: null, prompt: null };
const name = parsed.node.payload.name;
const promptHash = parsed.node.refs[0] ?? null;
let prompt: string | null = null;
if (promptHash !== null) {
prompt = await getContentMerklePayload(cas, promptHash);
}
return { name, prompt };
}
async function buildThreadDetailRecords(
storageRoot: string,
resolved: ResolvedThreadRecord,
runningMarkerPresent: boolean,
statusRow: HistoricalThreadRow,
): Promise<unknown[]> {
const cas = createCasStore(getGlobalCasDir(storageRoot));
const frames = await walkStateFramesNewestFirst(cas, resolved.head);
const chronological = [...frames].reverse();
const { name: workflowName, prompt } = await readStartInfo(cas, resolved.start);
const status = await resolveThreadListStatus(storageRoot, statusRow, runningMarkerPresent);
const records: unknown[] = [
{
type: "thread-start",
workflow: workflowName ?? "unknown",
prompt: prompt ?? null,
threadId: resolved.threadId,
status,
timestamp: null,
},
];
for (const fr of chronological) {
if (fr.payload.role === FORK_BRANCH_ROLE) {
continue;
}
if (fr.payload.role === END) {
const returnCode = fr.payload.meta.returnCode;
const summary = fr.payload.meta.summary;
if (typeof returnCode === "number" && typeof summary === "string") {
records.push({
type: "workflow-result",
returnCode,
content: summary,
timestamp: fr.payload.timestamp,
});
}
continue;
}
const payloadText = await getContentMerklePayload(cas, fr.payload.content);
const content =
payloadText !== null
? payloadText
: `(content not in CAS; contentHash=${fr.payload.content})`;
records.push({
type: "role",
role: fr.payload.role,
contentHash: fr.payload.content,
content,
meta: fr.payload.meta,
timestamp: fr.payload.timestamp,
});
}
return records;
}
export function createThreadRoutes(storageRoot: string): Hono {
const app = new Hono();
app.get("/", async (c) => {
const nameFilter = c.req.query("workflow") ?? null;
const rows = await listHistoricalThreads(storageRoot, nameFilter);
return c.json({ threads: rows });
const threads = await Promise.all(
rows.map(async (r) => {
const runningPath = join(storageRoot, "logs", r.hash, `${r.threadId}.running`);
const runningMarkerPresent = await pathExists(runningPath);
const status = await resolveThreadListStatus(storageRoot, r, runningMarkerPresent);
return {
threadId: r.threadId,
workflow: r.workflowName,
hash: r.hash,
startedAt: new Date(r.activityTs).toISOString(),
status,
};
}),
);
return c.json({ threads });
});
app.get("/running", async (c) => {
@@ -25,22 +123,26 @@ export function createThreadRoutes(storageRoot: string): Hono {
app.get("/:threadId", async (c) => {
const threadId = c.req.param("threadId");
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
const resolved = await resolveThreadRecord(storageRoot, threadId);
if (resolved === null) {
return c.json({ error: `thread not found: ${threadId}` }, 404);
}
const text = await readTextFileIfExists(dataPath);
if (text === null) {
return c.json({ error: `thread data missing: ${threadId}` }, 404);
}
const lines = text.trim().split("\n");
const records = lines.map((line) => {
try {
return JSON.parse(line) as unknown;
} catch {
return { raw: line };
}
});
const runningPath = join(storageRoot, "logs", resolved.bundleHash, `${threadId}.running`);
const runningMarkerPresent = await pathExists(runningPath);
const statusRow = {
threadId: resolved.threadId,
hash: resolved.bundleHash,
workflowName: null,
source: resolved.source,
activityTs: 0,
head: resolved.head,
};
const records = await buildThreadDetailRecords(
storageRoot,
resolved,
runningMarkerPresent,
statusRow,
);
return c.json({ threadId, records });
});
+107 -13
View File
@@ -1,12 +1,27 @@
import { randomUUID } from "node:crypto";
import { hostname as osHostname } from "node:os";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { serve } from "bun";
import { printCliLine } from "../../cli-output.js";
import { createApp } from "./app.js";
import {
registerWithGateway,
startHeartbeat,
startTunnel,
unregisterFromGateway,
} from "./tunnel.js";
import type { ServeOptions } from "./types.js";
export function startServer(storageRoot: string, options: ServeOptions): void {
const app = createApp(storageRoot);
const DEFAULT_GATEWAY_URL = "https://workflow-gateway.shazhou.workers.dev";
const HEARTBEAT_INTERVAL_MS = 60_000;
export function startServer(
storageRoot: string,
options: ServeOptions,
agentToken: string | null,
): void {
const app = createApp(storageRoot, agentToken);
const server = serve({
fetch: app.fetch,
@@ -28,30 +43,51 @@ function parsePortValue(value: string | undefined): Result<number, string> {
return ok(parsed);
}
function requireNextArg(argv: string[], i: number, flag: string): Result<string, string> {
const next = argv[i + 1];
if (next === undefined) {
return err(`${flag} requires a value`);
}
return ok(next);
}
function parseServeArgv(argv: string[]): Result<ServeOptions, string> {
let port = 7860;
let hostname = "127.0.0.1";
let name = osHostname().split(".")[0].toLowerCase();
let noTunnel = false;
let gatewayUrl = DEFAULT_GATEWAY_URL;
const gatewaySecret = process.env.WORKFLOW_GATEWAY_SECRET ?? "";
const stringFlags: Record<string, (v: string) => void> = {
"--host": (v) => {
hostname = v;
},
"--name": (v) => {
name = v;
},
"--gateway": (v) => {
gatewayUrl = v;
},
};
for (let i = 0; i < argv.length; i++) {
const arg = argv[i];
if (arg === "--port" || arg === "-p") {
const portResult = parsePortValue(argv[i + 1]);
if (!portResult.ok) {
return portResult;
}
if (!portResult.ok) return portResult;
port = portResult.value;
i++;
} else if (arg === "--host") {
const next = argv[i + 1];
if (next === undefined) {
return err("--host requires a value");
}
hostname = next;
} else if (arg === "--no-tunnel") {
noTunnel = true;
} else if (arg in stringFlags) {
const r = requireNextArg(argv, i, arg);
if (!r.ok) return r;
stringFlags[arg](r.value);
i++;
}
}
return ok({ port, hostname });
return ok({ port, hostname, name, noTunnel, gatewayUrl, gatewaySecret });
}
export async function dispatchServe(storageRoot: string, argv: string[]): Promise<number> {
@@ -61,7 +97,65 @@ export async function dispatchServe(storageRoot: string, argv: string[]): Promis
return 1;
}
startServer(storageRoot, parsed.value);
const options = parsed.value;
const agentToken = options.noTunnel ? null : randomUUID();
startServer(storageRoot, options, agentToken);
if (options.noTunnel) {
printCliLine("tunnel disabled (--no-tunnel)");
await new Promise(() => {});
return 0;
}
// Start cloudflared quick tunnel
printCliLine("starting cloudflared quick tunnel...");
const tunnel = await startTunnel(options.port);
if (!tunnel) {
printCliLine("failed to create tunnel — continuing without gateway registration");
await new Promise(() => {});
return 0;
}
printCliLine(`tunnel: ${tunnel.url}`);
// Register with gateway
if (options.gatewaySecret) {
const registered = await registerWithGateway(
options.gatewayUrl,
options.name,
tunnel.url,
options.gatewaySecret,
agentToken!,
);
if (registered) {
printCliLine(`registered with gateway as "${options.name}"`);
}
// Start heartbeat
const heartbeatTimer = startHeartbeat(
options.gatewayUrl,
options.name,
tunnel.url,
options.gatewaySecret,
agentToken!,
HEARTBEAT_INTERVAL_MS,
);
// Cleanup on exit
const cleanup = async () => {
clearInterval(heartbeatTimer);
printCliLine("unregistering from gateway...");
await unregisterFromGateway(options.gatewayUrl, options.name, options.gatewaySecret);
tunnel.process.kill();
process.exit(0);
};
process.on("SIGINT", cleanup);
process.on("SIGTERM", cleanup);
} else {
printCliLine("WORKFLOW_GATEWAY_SECRET not set — skipping gateway registration");
}
// Keep process alive
await new Promise(() => {});
@@ -0,0 +1,88 @@
import { printCliLine } from "../../cli-output.js";
type TunnelHandle = {
process: ReturnType<typeof Bun.spawn>;
url: string;
};
export async function startTunnel(port: number): Promise<TunnelHandle | null> {
const proc = Bun.spawn(["cloudflared", "tunnel", "--url", `http://localhost:${port}`], {
stdout: "pipe",
stderr: "pipe",
});
// cloudflared prints the URL to stderr
const reader = proc.stderr.getReader();
const decoder = new TextDecoder();
let buffer = "";
const deadline = Date.now() + 30_000;
while (Date.now() < deadline) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const match = buffer.match(/https:\/\/[a-z0-9-]+\.trycloudflare\.com/);
if (match) {
// Release the reader so stderr keeps flowing without backpressure
reader.releaseLock();
return { process: proc, url: match[0] };
}
}
reader.releaseLock();
proc.kill();
return null;
}
export async function registerWithGateway(
gatewayUrl: string,
name: string,
tunnelUrl: string,
secret: string,
agentToken: string,
): Promise<boolean> {
try {
const resp = await fetch(`${gatewayUrl}/api/gateway/register`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ name, url: tunnelUrl, secret, agentToken }),
});
if (!resp.ok) {
const body = await resp.text();
printCliLine(`gateway registration failed: ${resp.status} ${body}`);
return false;
}
return true;
} catch (e) {
printCliLine(`gateway registration error: ${e}`);
return false;
}
}
export async function unregisterFromGateway(
gatewayUrl: string,
name: string,
secret: string,
): Promise<void> {
try {
await fetch(`${gatewayUrl}/api/gateway/register/${name}`, {
method: "DELETE",
headers: { Authorization: `Bearer ${secret}` },
});
} catch {
// Best effort — process is exiting
}
}
export function startHeartbeat(
gatewayUrl: string,
name: string,
tunnelUrl: string,
secret: string,
agentToken: string,
intervalMs: number,
): ReturnType<typeof setInterval> {
return setInterval(() => {
registerWithGateway(gatewayUrl, name, tunnelUrl, secret, agentToken).catch(() => {});
}, intervalMs);
}
@@ -1,4 +1,8 @@
export type ServeOptions = {
port: number;
hostname: string;
name: string;
noTunnel: boolean;
gatewayUrl: string;
gatewaySecret: string;
};
@@ -1,11 +1,11 @@
import { join } from "node:path";
import { createCasStore } from "@uncaged/workflow-cas";
import { prepareCasFork } from "@uncaged/workflow-execute";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { generateUlid } from "@uncaged/workflow-util";
import { buildForkPlan } from "@uncaged/workflow-execute";
import { generateUlid, getGlobalCasDir } from "@uncaged/workflow-util";
import { pathExists, readTextFileIfExists } from "../../fs-utils.js";
import { resolveThreadDataPath } from "../../thread-scan.js";
import { pathExists } from "../../fs-utils.js";
import { resolveThreadRecord } from "../../thread-scan.js";
import { ensureWorkerForHash, sendWorkerTcpCommand } from "../../worker-spawn.js";
export async function cmdFork(
@@ -13,49 +13,51 @@ export async function cmdFork(
threadId: string,
fromRole: string | null,
): Promise<Result<{ threadId: string }, string>> {
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
const resolved = await resolveThreadRecord(storageRoot, threadId);
if (resolved === null) {
return err(`thread not found: ${threadId}`);
}
const text = await readTextFileIfExists(dataPath);
if (text === null) {
return err(`thread data missing: ${threadId}`);
const bundlePath = join(storageRoot, "bundles", `${resolved.bundleHash}.esm.js`);
if (!(await pathExists(bundlePath))) {
return err(`bundle file missing for thread hash ${resolved.bundleHash}`);
}
const plan = buildForkPlan(text, fromRole);
const cas = createCasStore(getGlobalCasDir(storageRoot));
const newThreadId = generateUlid(Date.now());
const plan = await prepareCasFork({
cas,
bundleDir: resolved.bundleDir,
bundleHash: resolved.bundleHash,
sourceThreadId: threadId,
headHash: resolved.head,
startHash: resolved.start,
newThreadId,
fromRole,
});
if (!plan.ok) {
return plan;
}
const bundlePath = join(storageRoot, "bundles", `${plan.value.hash}.esm.js`);
if (!(await pathExists(bundlePath))) {
return err(`bundle file missing for thread hash ${plan.value.hash}`);
}
const worker = await ensureWorkerForHash(storageRoot, plan.value.hash, bundlePath);
if (!worker.ok) {
return worker;
}
const newThreadId = generateUlid(Date.now());
const stepsOnWire = plan.value.historicalSteps.map((s) => ({
role: s.role,
contentHash: s.contentHash,
meta: s.meta,
refs: s.refs,
timestamp: s.timestamp,
}));
const p = plan.value;
const sent = await sendWorkerTcpCommand(
worker.value.port,
{
type: "run",
threadId: newThreadId,
workflowName: plan.value.workflowName,
prompt: plan.value.prompt,
options: plan.value.runOptions,
steps: stepsOnWire,
forkSourceThreadId: plan.value.sourceThreadId,
workflowName: p.workflowName,
prompt: p.prompt,
options: p.runOptions,
steps: p.steps,
stepTimestamps: p.stepTimestamps.length > 0 ? p.stepTimestamps : null,
forkSourceThreadId: threadId,
forkContinuation: p.forkContinuation,
},
{ awaitResponseLine: false },
);
+186 -119
View File
@@ -1,17 +1,26 @@
import { watch } from "node:fs";
import { readFile } from "node:fs/promises";
import { mkdir, readFile } from "node:fs/promises";
import { dirname, join } from "node:path";
import type { CasStore, WorkflowCompletion } from "@uncaged/workflow-protocol";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { createCasStore, getContentMerklePayload } from "@uncaged/workflow-cas";
import { tryParseRoleStepRecord, tryParseWorkflowResultRecord } from "@uncaged/workflow-execute";
import {
FORK_BRANCH_ROLE,
readThreadsIndex,
type ThreadIndex,
walkStateFramesNewestFirst,
} from "@uncaged/workflow-execute";
import type { CasStore, WorkflowCompletion } from "@uncaged/workflow-protocol";
import { END } from "@uncaged/workflow-runtime";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { dimGreyLine, highlightLiveRole } from "../../cli-color.js";
import { printCliError, printCliLine } from "../../cli-output.js";
import { pathExists } from "../../fs-utils.js";
import type { ParsedLiveArgv } from "../../live-argv.js";
import { findLatestThreadDataPath, resolveThreadDataPath } from "../../thread-scan.js";
import {
findLatestThreadBundleTarget,
type LatestThreadTarget,
resolveThreadRecord,
} from "../../thread-scan.js";
import type { LiveRoleRow } from "./types.js";
export const LIVE_CONTENT_MAX_LINES = 10;
@@ -49,16 +58,15 @@ function printSummary(result: WorkflowCompletion): void {
printCliLine(`completed: returnCode=${result.returnCode}${result.summary}`);
}
type LiveSessionState = {
sawStart: boolean;
completed: boolean;
type InfoLiveState = {
carry: string;
contentOffset: number;
};
type InfoLiveState = {
carry: string;
contentOffset: number;
type CasLiveState = {
printedHashes: Set<string>;
lastHead: string | null;
completionEmitted: boolean;
};
function tryParseInfoRecord(obj: Record<string, unknown>): {
@@ -80,102 +88,140 @@ function tryParseInfoRecord(obj: Record<string, unknown>): {
return { tag, content, timestamp };
}
async function handleJsonlLine(
rawLine: string,
state: LiveSessionState,
roleFilter: string | null,
cas: CasStore,
): Promise<{ parseError: string | null; workflowResult: WorkflowCompletion | null }> {
const trimmed = rawLine.trim();
if (trimmed === "") {
return { parseError: null, workflowResult: null };
function completionFromEndMeta(meta: Record<string, unknown>): WorkflowCompletion | null {
const returnCode = meta.returnCode;
const summary = meta.summary;
if (typeof returnCode !== "number" || typeof summary !== "string") {
return null;
}
return { returnCode, summary };
}
let rec: unknown;
try {
rec = JSON.parse(trimmed) as unknown;
} catch {
return { parseError: "invalid JSON in thread data file", workflowResult: null };
async function emitRoleStepPrint(params: {
cas: CasStore;
role: string;
contentHash: string;
meta: Record<string, unknown>;
timestamp: number;
roleFilter: string | null;
}): Promise<void> {
if (params.roleFilter !== null && params.role !== params.roleFilter) {
return;
}
if (rec === null || typeof rec !== "object") {
return { parseError: "invalid record in thread data file", workflowResult: null };
}
const obj = rec as Record<string, unknown>;
if (!state.sawStart) {
state.sawStart = true;
return { parseError: null, workflowResult: null };
}
const wf = tryParseWorkflowResultRecord(obj);
if (wf !== null) {
state.completed = true;
return { parseError: null, workflowResult: wf };
}
const roleRow = tryParseRoleStepRecord(obj);
if (roleRow === null) {
return {
parseError: "unrecognized record in thread data (expected role step or result)",
workflowResult: null,
};
}
if (roleFilter !== null && roleRow.role !== roleFilter) {
return { parseError: null, workflowResult: null };
}
const payload = await getContentMerklePayload(cas, roleRow.contentHash);
const payload = await getContentMerklePayload(params.cas, params.contentHash);
const content =
payload !== null ? payload : `(content not in CAS; contentHash=${roleRow.contentHash})`;
payload !== null ? payload : `(content not in CAS; contentHash=${params.contentHash})`;
const row: LiveRoleRow = {
role: roleRow.role,
role: params.role,
content,
meta: roleRow.meta,
timestamp: roleRow.timestamp,
meta: params.meta,
timestamp: params.timestamp,
};
for (const outLine of renderLiveRoleStepLines(row, highlightLiveRole(row.role))) {
printCliLine(outLine);
}
return { parseError: null, workflowResult: null };
}
async function pumpNewContent(
dataPath: string,
state: LiveSessionState,
roleFilter: string | null,
cas: CasStore,
): Promise<number | null> {
let text: string;
async function emitStatesReachableFromHead(params: {
cas: CasStore;
headHash: string;
state: CasLiveState;
roleFilter: string | null;
}): Promise<WorkflowCompletion | null> {
const frames = await walkStateFramesNewestFirst(params.cas, params.headHash);
const chronological = [...frames].reverse();
for (const fr of chronological) {
if (params.state.printedHashes.has(fr.hash)) {
continue;
}
params.state.printedHashes.add(fr.hash);
const role = fr.payload.role;
if (role === FORK_BRANCH_ROLE) {
continue;
}
if (role === END) {
const wf = completionFromEndMeta(fr.payload.meta);
if (wf !== null) {
printSummary(wf);
return wf;
}
continue;
}
await emitRoleStepPrint({
cas: params.cas,
role,
contentHash: fr.payload.content,
meta: fr.payload.meta,
timestamp: fr.payload.timestamp,
roleFilter: params.roleFilter,
});
}
return null;
}
async function pumpThreadsJson(params: {
storageRoot: string;
bundleDir: string;
bundleHash: string;
threadId: string;
state: CasLiveState;
roleFilter: string | null;
cas: CasStore;
}): Promise<number | null> {
let idx: ThreadIndex;
try {
text = await readFile(dataPath, "utf8");
idx = await readThreadsIndex(params.bundleDir);
} catch {
return null;
idx = {};
}
if (text.length < state.contentOffset) {
state.contentOffset = 0;
state.carry = "";
const active = idx[params.threadId];
if (active === undefined) {
if (params.state.completionEmitted) {
return null;
}
const hist = await resolveThreadRecord(params.storageRoot, params.threadId);
if (hist === null || hist.source !== "history") {
return null;
}
params.state.completionEmitted = true;
const wf = await emitStatesReachableFromHead({
cas: params.cas,
headHash: hist.head,
state: params.state,
roleFilter: params.roleFilter,
});
return wf !== null ? 0 : null;
}
const chunk = text.slice(state.contentOffset);
state.contentOffset = text.length;
state.carry += chunk;
const head = active.head;
if (params.state.lastHead === null) {
params.state.lastHead = head;
const wf = await emitStatesReachableFromHead({
cas: params.cas,
headHash: head,
state: params.state,
roleFilter: params.roleFilter,
});
return wf !== null ? 0 : null;
}
const parts = state.carry.split("\n");
state.carry = parts.pop() ?? "";
for (const line of parts) {
const { parseError, workflowResult } = await handleJsonlLine(line, state, roleFilter, cas);
if (parseError !== null) {
printCliError(parseError);
return 1;
}
if (workflowResult !== null) {
printSummary(workflowResult);
return 0;
}
if (head !== params.state.lastHead) {
params.state.lastHead = head;
const wf = await emitStatesReachableFromHead({
cas: params.cas,
headHash: head,
state: params.state,
roleFilter: params.roleFilter,
});
return wf !== null ? 0 : null;
}
return null;
@@ -292,9 +338,9 @@ function watchLivePaths(params: { tasks: WatchPumpTask[]; signal: AbortSignal })
schedulePump(path, pump);
});
watchers.push(watcher);
watcher.on("error", (err: Error) => {
watcher.on("error", (errObj: Error) => {
closeAll();
reject(err);
reject(errObj);
});
}
@@ -310,17 +356,14 @@ function watchLivePaths(params: { tasks: WatchPumpTask[]; signal: AbortSignal })
});
}
type LiveThreadTarget = {
threadId: string;
dataPath: string;
};
type LiveThreadTarget = LatestThreadTarget;
async function resolveLiveThreadTarget(
storageRoot: string,
parsed: ParsedLiveArgv,
): Promise<LiveThreadTarget | null> {
if (parsed.latest) {
const found = await findLatestThreadDataPath(storageRoot);
const found = await findLatestThreadBundleTarget(storageRoot);
if (found === null) {
printCliError("live: no threads found");
return null;
@@ -333,36 +376,56 @@ async function resolveLiveThreadTarget(
printCliError("live: internal error: missing thread id");
return null;
}
const resolved = await resolveThreadDataPath(storageRoot, id);
const resolved = await resolveThreadRecord(storageRoot, id);
if (resolved === null) {
printCliError(`thread not found: ${id}`);
return null;
}
return { threadId: id, dataPath: resolved };
return {
threadId: id,
bundleHash: resolved.bundleHash,
bundleDir: resolved.bundleDir,
threadsJsonPath: join(resolved.bundleDir, "threads.json"),
};
}
async function buildLiveWatchTasks(params: {
dataPath: string;
infoPath: string;
storageRoot: string;
target: LiveThreadTarget;
debug: boolean;
dataState: LiveSessionState;
dataState: CasLiveState;
infoState: InfoLiveState;
roleFilter: string | null;
cas: CasStore;
}): Promise<WatchPumpTask[]> {
const { dataPath, infoPath, debug, dataState, infoState, roleFilter, cas } = params;
const infoPath = join(
params.storageRoot,
"logs",
params.target.bundleHash,
`${params.target.threadId}.info.jsonl`,
);
const tasks: WatchPumpTask[] = [
{
path: dataPath,
pump: () => pumpNewContent(dataPath, dataState, roleFilter, cas),
path: params.target.threadsJsonPath,
pump: () =>
pumpThreadsJson({
storageRoot: params.storageRoot,
bundleDir: params.target.bundleDir,
bundleHash: params.target.bundleHash,
threadId: params.target.threadId,
state: params.dataState,
roleFilter: params.roleFilter,
cas: params.cas,
}),
},
];
if (debug && (await pathExists(infoPath))) {
if (params.debug && (await pathExists(infoPath))) {
tasks.push({
path: infoPath,
pump: async () => {
await pumpNewInfoContent(infoPath, infoState);
await pumpNewInfoContent(infoPath, params.infoState);
return null;
},
});
@@ -377,16 +440,13 @@ export async function cmdLive(storageRoot: string, parsed: ParsedLiveArgv): Prom
return 1;
}
const { threadId, dataPath } = target;
const roleFilter = parsed.role;
const infoPath = join(dirname(dataPath), `${threadId}.info.jsonl`);
const cas = createCasStore(getGlobalCasDir(storageRoot));
const dataState: LiveSessionState = {
sawStart: false,
completed: false,
carry: "",
contentOffset: 0,
const dataState: CasLiveState = {
printedHashes: new Set<string>(),
lastHead: null,
completionEmitted: false,
};
const infoState: InfoLiveState = {
@@ -401,22 +461,29 @@ export async function cmdLive(storageRoot: string, parsed: ParsedLiveArgv): Prom
process.on("SIGINT", onSigInt);
try {
const firstData = await pumpNewContent(dataPath, dataState, roleFilter, cas);
if (firstData === 1) {
return 1;
}
await mkdir(dirname(target.threadsJsonPath), { recursive: true });
const firstData = await pumpThreadsJson({
storageRoot,
bundleDir: target.bundleDir,
bundleHash: target.bundleHash,
threadId: target.threadId,
state: dataState,
roleFilter,
cas,
});
const infoPath = join(storageRoot, "logs", target.bundleHash, `${target.threadId}.info.jsonl`);
if (parsed.debug && (await pathExists(infoPath))) {
await pumpNewInfoContent(infoPath, infoState);
}
if (firstData === 0 || dataState.completed) {
if (firstData === 0) {
return 0;
}
const tasks = await buildLiveWatchTasks({
dataPath,
infoPath,
storageRoot,
target,
debug: parsed.debug,
dataState,
infoState,
+20 -10
View File
@@ -1,25 +1,35 @@
import { unlink } from "node:fs/promises";
import { dirname, join } from "node:path";
import { join } from "node:path";
import {
garbageCollectCas,
removeThreadEntry,
removeThreadHistoryEntries,
} from "@uncaged/workflow-execute";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { garbageCollectCas } from "@uncaged/workflow-execute";
import { resolveThreadDataPath } from "../../thread-scan.js";
import { resolveThreadRecord } from "../../thread-scan.js";
export async function cmdThreadRemove(
storageRoot: string,
threadId: string,
): Promise<Result<void, string>> {
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
const resolved = await resolveThreadRecord(storageRoot, threadId);
if (resolved === null) {
return err(`thread not found: ${threadId}`);
}
const dir = dirname(dataPath);
const infoPath = join(dir, `${threadId}.info.jsonl`);
const runningPath = join(dir, `${threadId}.running`);
if (resolved.source === "active") {
await removeThreadEntry(resolved.bundleDir, threadId);
} else {
const hist = await removeThreadHistoryEntries(resolved.bundleDir, threadId);
if (!hist.ok) {
return hist;
}
}
const infoPath = join(storageRoot, "logs", resolved.bundleHash, `${threadId}.info.jsonl`);
const runningPath = join(storageRoot, "logs", resolved.bundleHash, `${threadId}.running`);
await unlink(dataPath);
await unlink(infoPath).catch(() => {});
await unlink(runningPath).catch(() => {});
@@ -1,8 +1,8 @@
import { join } from "node:path";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { generateUlid } from "@uncaged/workflow-util";
import { getRegisteredWorkflow, readWorkflowRegistry } from "@uncaged/workflow-register";
import { generateUlid } from "@uncaged/workflow-util";
import { ensureWorkerForHash, sendWorkerTcpCommand } from "../../worker-spawn.js";
import { validateCliWorkflowName } from "../../workflow-name.js";
@@ -1,19 +1,49 @@
import { createCasStore, getContentMerklePayload } from "@uncaged/workflow-cas";
import { FORK_BRANCH_ROLE, walkStateFramesNewestFirst } from "@uncaged/workflow-execute";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { END } from "@uncaged/workflow-runtime";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { readTextFileIfExists } from "../../fs-utils.js";
import { resolveThreadDataPath } from "../../thread-scan.js";
import { resolveThreadRecord } from "../../thread-scan.js";
export async function cmdThreadShow(
storageRoot: string,
threadId: string,
): Promise<Result<string, string>> {
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
const resolved = await resolveThreadRecord(storageRoot, threadId);
if (resolved === null) {
return err(`thread not found: ${threadId}`);
}
const text = await readTextFileIfExists(dataPath);
if (text === null) {
return err(`thread data missing: ${threadId}`);
const cas = createCasStore(getGlobalCasDir(storageRoot));
const frames = await walkStateFramesNewestFirst(cas, resolved.head);
const chronological = [...frames].reverse();
const steps: Array<{ role: string; hash: string; timestamp: number; content: string }> = [];
for (const fr of chronological) {
if (fr.payload.role === END || fr.payload.role === FORK_BRANCH_ROLE) {
continue;
}
const payloadText = await getContentMerklePayload(cas, fr.payload.content);
steps.push({
role: fr.payload.role,
hash: fr.hash,
timestamp: fr.payload.timestamp,
content:
payloadText !== null
? payloadText
: `(content not in CAS; contentHash=${fr.payload.content})`,
});
}
return ok(text.endsWith("\n") ? text.slice(0, -1) : text);
const payload = {
threadId: resolved.threadId,
bundleHash: resolved.bundleHash,
head: resolved.head,
start: resolved.start,
source: resolved.source,
steps,
};
return ok(JSON.stringify(payload, null, 2));
}
@@ -1,8 +1,7 @@
import { readFile, stat } from "node:fs/promises";
import { basename, resolve } from "node:path";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { hashWorkflowBundleBytes } from "@uncaged/workflow-cas";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import {
extractBundleExports,
readWorkflowRegistry,
+1 -1
View File
@@ -34,7 +34,7 @@ function parseFlagAt(argv: string[], index: number): Result<FlagOk, string> | nu
export function parseRunArgv(argv: string[]): Result<ParsedRunArgv, string> {
let name: string | undefined;
let prompt = "";
let maxRounds = 5;
let maxRounds = 10;
let i = 0;
const first = argv[0];
+26 -3
View File
@@ -70,8 +70,8 @@ function formatSkillCli(): string {
|---------|-------------|
| **Workflow** | A single-file ESM bundle (\`.esm.js\`) that exports \`run\` and \`descriptor\`. Identified by name and XXH64 hash. |
| **Bundle** | The physical \`.esm.js\` file stored in the bundles directory. Immutable once written. |
| **Thread** | A single execution of a workflow, identified by a ULID. Persists state as JSONL files. |
| **CAS** | Content-Addressable Storage. Per-thread key-value store keyed by content hash. |
| **Thread** | A single execution of a workflow, identified by a ULID. CAS state chain; \`threads.json\` for active; \`history/*.jsonl\` when done; \`.info.jsonl\` for debug logs. |
| **CAS** | Global content-addressable blob store (\`cas/\`), keyed by hash. |
| **Registry** | \`workflow.yaml\` — maps workflow names to their current and historical bundle hashes. |
## Commands
@@ -85,6 +85,12 @@ ${commandSections.join("\n\n")}
| \`run\` | \`thread run\` | Shortcut to start a thread |
| \`live\` | \`thread live\` | Shortcut to attach to a thread |
### serve
| Command | Args | Description |
|---------|------|-------------|
| \`serve\` | \`[--port N] [--host ADDR] [--name NAME]\` | Start HTTP API server with auto-tunnel. \`--name\` registers with the gateway. |
## Typical Workflow
1. \`uncaged-workflow workflow add my-wf ./my-wf.esm.js\` — register a workflow
@@ -92,6 +98,21 @@ ${commandSections.join("\n\n")}
3. \`uncaged-workflow live --latest\` — attach and watch output
4. \`uncaged-workflow thread show <thread-id>\` — inspect completed thread
## Thread Status
| Status | Meaning |
|--------|---------|
| \`running\` | Worker process is alive (\`.running\` marker + live PID) |
| \`active\` | In \`threads.json\` but not currently running (paused or waiting) |
| \`completed\` | Finished with \`returnCode === 0\` (has \`__end__\` frame in CAS) |
| \`failed\` | Finished with non-zero return code, or worker crashed (dead PID / no ctl) |
## Defaults
| Setting | CLI | HTTP API |
|---------|-----|----------|
| \`maxRounds\` | 10 | 10 |
## Exit Codes
| Code | Meaning |
@@ -103,7 +124,9 @@ ${commandSections.join("\n\n")}
| Variable | Description |
|----------|-------------|
| \`UNCAGED_WORKFLOW_STORAGE_ROOT\` | Override the default storage directory for all workflow data |
| \`WORKFLOW_STORAGE_ROOT\` | Override the default storage directory for all workflow data |
| \`UNCAGED_WORKFLOW_STORAGE_ROOT\` | Same as above (takes priority) |
| \`WORKFLOW_LLM_API_KEY\` | API key for LLM calls during workflow execution |
`;
}
+371 -93
View File
@@ -1,23 +1,90 @@
import { readdir, stat } from "node:fs/promises";
import { join } from "node:path";
import { createCasStore, parseCasThreadNode } from "@uncaged/workflow-cas";
import {
readThreadsIndex,
type ThreadHistoryEntry,
type ThreadIndex,
walkStateFramesNewestFirst,
} from "@uncaged/workflow-execute";
import { END } from "@uncaged/workflow-runtime";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { pathExists, readTextFileIfExists } from "./fs-utils.js";
import { readWorkerCtl } from "./worker-spawn.js";
function parseFirstJsonLineObject(text: string): Record<string, unknown> | null {
const firstLine = text.split("\n")[0];
if (firstLine === undefined || firstLine.trim() === "") {
async function readWorkflowNameFromStartHash(
storageRoot: string,
startHash: string,
): Promise<string | null> {
const cas = createCasStore(getGlobalCasDir(storageRoot));
const yamlText = await cas.get(startHash);
if (yamlText === null) {
return null;
}
let parsed: unknown;
try {
parsed = JSON.parse(firstLine) as unknown;
} catch {
const parsed = parseCasThreadNode(yamlText);
if (parsed === null || parsed.kind !== "start") {
return null;
}
if (parsed === null || typeof parsed !== "object") {
return null;
return parsed.node.payload.name;
}
async function listBundleHashDirs(storageRoot: string): Promise<string[]> {
const bundlesRoot = join(storageRoot, "bundles");
if (!(await pathExists(bundlesRoot))) {
return [];
}
return parsed as Record<string, unknown>;
const names = await readdir(bundlesRoot);
const out: string[] = [];
for (const name of names) {
const p = join(bundlesRoot, name);
try {
const st = await stat(p);
if (st.isDirectory()) {
out.push(name);
}
} catch {}
}
out.sort();
return out;
}
async function parseHistoryFile(path: string): Promise<ThreadHistoryEntry[]> {
const text = await readTextFileIfExists(path);
if (text === null) {
return [];
}
const out: ThreadHistoryEntry[] = [];
for (const line of text.split("\n")) {
const trimmed = line.trim();
if (trimmed === "") {
continue;
}
let raw: unknown;
try {
raw = JSON.parse(trimmed) as unknown;
} catch {
continue;
}
if (raw === null || typeof raw !== "object") {
continue;
}
const rec = raw as Record<string, unknown>;
const threadId = rec.threadId;
const head = rec.head;
const start = rec.start;
const completedAt = rec.completedAt;
if (
typeof threadId !== "string" ||
typeof head !== "string" ||
typeof start !== "string" ||
typeof completedAt !== "number"
) {
continue;
}
out.push({ threadId, head, start, completedAt });
}
return out;
}
export type RunningThreadRow = {
@@ -30,32 +97,173 @@ export type HistoricalThreadRow = {
threadId: string;
hash: string;
workflowName: string | null;
/** Active entry from `threads.json` vs completed line from `history/*.jsonl`. */
source: "active" | "history";
/** `updatedAt` for active threads; `completedAt` for history (ms since epoch). */
activityTs: number;
/** Current CAS head (`threads.json` / history row). */
head: string;
};
async function readThreadStartTimestampMs(dataPath: string): Promise<number | null> {
const text = await readTextFileIfExists(dataPath);
if (text === null) {
return null;
export type ResolvedThreadRecord = {
threadId: string;
bundleHash: string;
bundleDir: string;
head: string;
start: string;
source: "active" | "history";
};
/** Resolve a thread via `threads.json` (active) or `history/*.jsonl` (completed). */
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: scans all bundle dirs for thread id
export async function resolveThreadRecord(
storageRoot: string,
threadId: string,
): Promise<ResolvedThreadRecord | null> {
const hashes = await listBundleHashDirs(storageRoot);
for (const bundleHash of hashes) {
const bundleDir = join(storageRoot, "bundles", bundleHash);
let index: ThreadIndex;
try {
index = await readThreadsIndex(bundleDir);
} catch {
continue;
}
const active = index[threadId];
if (active !== undefined) {
return {
threadId,
bundleHash,
bundleDir,
head: active.head,
start: active.start,
source: "active",
};
}
}
const parsed = parseFirstJsonLineObject(text);
if (parsed === null) {
return null;
for (const bundleHash of hashes) {
const bundleDir = join(storageRoot, "bundles", bundleHash);
const histDir = join(bundleDir, "history");
if (!(await pathExists(histDir))) {
continue;
}
let files: string[];
try {
files = await readdir(histDir);
} catch {
continue;
}
for (const name of files) {
if (!name.endsWith(".jsonl")) {
continue;
}
const entries = await parseHistoryFile(join(histDir, name));
for (const e of entries) {
if (e.threadId === threadId) {
return {
threadId,
bundleHash,
bundleDir,
head: e.head,
start: e.start,
source: "history",
};
}
}
}
}
const ts = parsed.timestamp;
return typeof ts === "number" && Number.isFinite(ts) ? ts : null;
return null;
}
async function readWorkflowNameFromDataJsonl(dataPath: string): Promise<string | null> {
const text = await readTextFileIfExists(dataPath);
if (text === null) {
return null;
export type ThreadHeadTerminal =
| { kind: "non-terminal" }
| { kind: "terminal"; returnCode: number };
/** True when the newest frame at `headHash` is `__end__` (workflow finished in CAS). */
export async function readThreadTerminalFromHead(
storageRoot: string,
headHash: string,
): Promise<ThreadHeadTerminal> {
const cas = createCasStore(getGlobalCasDir(storageRoot));
const frames = await walkStateFramesNewestFirst(cas, headHash);
const newest = frames[0];
if (newest === undefined) {
return { kind: "non-terminal" };
}
const parsed = parseFirstJsonLineObject(text);
if (parsed === null) {
return null;
if (newest.payload.role !== END) {
return { kind: "non-terminal" };
}
const name = parsed.name;
return typeof name === "string" ? name : null;
const rc = newest.payload.meta.returnCode;
if (typeof rc !== "number") {
return { kind: "terminal", returnCode: 1 };
}
return { kind: "terminal", returnCode: rc };
}
export type ThreadListStatus = "running" | "active" | "completed" | "failed";
/** Combines `.running` marker with CAS head: stale markers do not imply `running`. */
export async function resolveThreadListStatus(
storageRoot: string,
row: HistoricalThreadRow,
runningMarkerPresent: boolean,
): Promise<ThreadListStatus> {
const terminal = await readThreadTerminalFromHead(storageRoot, row.head);
if (terminal.kind === "terminal") {
return terminal.returnCode !== 0 ? "failed" : "completed";
}
if (row.source === "history") {
return "completed";
}
if (runningMarkerPresent) {
const ctlResult = await readWorkerCtl(storageRoot, row.hash);
if (ctlResult.ok) {
try {
process.kill(ctlResult.value.pid, 0);
return "running";
} catch {
// Worker PID is dead but .running marker remains — crashed thread
return "failed";
}
}
return "running";
}
// No .running marker + no __end__ + source "active" → check if worker is dead (crashed)
const ctlResult = await readWorkerCtl(storageRoot, row.hash);
if (!ctlResult.ok) {
// No ctl file means worker never registered or was already cleaned up — dead thread
return "failed";
}
try {
process.kill(ctlResult.value.pid, 0);
} catch {
// Worker PID is dead, thread never finished — crashed
return "failed";
}
return "active";
}
async function appendRunningThreadRowIfLive(
storageRoot: string,
hash: string,
threadId: string,
out: RunningThreadRow[],
): Promise<void> {
const resolved = await resolveThreadRecord(storageRoot, threadId);
if (resolved !== null && resolved.bundleHash !== hash) {
return;
}
if (resolved !== null) {
const terminal = await readThreadTerminalFromHead(storageRoot, resolved.head);
if (terminal.kind === "terminal") {
return;
}
}
const workflowName =
resolved !== null ? await readWorkflowNameFromStartHash(storageRoot, resolved.start) : null;
out.push({ threadId, hash, workflowName });
}
/** Threads currently executing — identified via `<threadId>.running` markers. */
@@ -82,9 +290,7 @@ export async function listRunningThreads(storageRoot: string): Promise<RunningTh
continue;
}
const threadId = fileName.slice(0, -".running".length);
const dataPath = join(dir, `${threadId}.data.jsonl`);
const workflowName = await readWorkflowNameFromDataJsonl(dataPath);
out.push({ threadId, hash, workflowName });
await appendRunningThreadRowIfLive(storageRoot, hash, threadId, out);
}
}
@@ -98,41 +304,84 @@ export async function listRunningThreads(storageRoot: string): Promise<RunningTh
}
/**
* Historical threads discovered via `*.data.jsonl`.
* When `workflowNameFilter` is non-null, only threads whose start record `name` matches are returned.
* Threads discovered via `threads.json` (active) and `history/*.jsonl` (completed).
* When `workflowNameFilter` is non-null, only threads whose StartNode `name` matches are returned.
*/
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: merges active index + partitioned history
export async function listHistoricalThreads(
storageRoot: string,
workflowNameFilter: string | null,
): Promise<HistoricalThreadRow[]> {
const logsRoot = join(storageRoot, "logs");
if (!(await pathExists(logsRoot))) {
return [];
}
const hashes = await readdir(logsRoot);
const hashes = await listBundleHashDirs(storageRoot);
const seen = new Set<string>();
const out: HistoricalThreadRow[] = [];
for (const hash of hashes) {
const dir = join(logsRoot, hash);
let entries: string[];
for (const bundleHash of hashes) {
const bundleDir = join(storageRoot, "bundles", bundleHash);
let index: ThreadIndex;
try {
entries = await readdir(dir);
index = await readThreadsIndex(bundleDir);
} catch {
continue;
}
for (const fileName of entries) {
if (!fileName.endsWith(".data.jsonl")) {
for (const threadId of Object.keys(index)) {
const key = `${bundleHash}/${threadId}`;
if (seen.has(key)) {
continue;
}
const threadId = fileName.slice(0, -".data.jsonl".length);
const dataPath = join(dir, fileName);
const workflowName = await readWorkflowNameFromDataJsonl(dataPath);
seen.add(key);
const entry = index[threadId];
if (entry === undefined) {
continue;
}
const workflowName = await readWorkflowNameFromStartHash(storageRoot, entry.start);
if (workflowNameFilter !== null && workflowName !== workflowNameFilter) {
continue;
}
out.push({ threadId, hash, workflowName });
out.push({
threadId,
hash: bundleHash,
workflowName,
source: "active",
activityTs: entry.updatedAt,
head: entry.head,
});
}
const histDir = join(bundleDir, "history");
if (!(await pathExists(histDir))) {
continue;
}
let files: string[];
try {
files = await readdir(histDir);
} catch {
continue;
}
for (const name of files) {
if (!name.endsWith(".jsonl")) {
continue;
}
const entries = await parseHistoryFile(join(histDir, name));
for (const e of entries) {
const key = `${bundleHash}/${e.threadId}`;
if (seen.has(key)) {
continue;
}
seen.add(key);
const workflowName = await readWorkflowNameFromStartHash(storageRoot, e.start);
if (workflowNameFilter !== null && workflowName !== workflowNameFilter) {
continue;
}
out.push({
threadId: e.threadId,
hash: bundleHash,
workflowName,
source: "history",
activityTs: e.completedAt,
head: e.head,
});
}
}
}
@@ -145,64 +394,93 @@ export async function listHistoricalThreads(
return out;
}
export type LatestThreadTarget = {
threadId: string;
bundleHash: string;
bundleDir: string;
threadsJsonPath: string;
};
/**
* Picks the thread whose `.data.jsonl` is newest by start-record `timestamp`,
* falling back to file `mtime` when the timestamp is missing.
* Tie-breaker: larger `mtime` wins when start timestamps are equal.
* Picks the newest thread by StartNode timestamp approximation (`updatedAt` active,
* else `completedAt` history), falling back to lexical thread id order.
*/
export async function findLatestThreadDataPath(
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: compares active heads vs history tails
export async function findLatestThreadBundleTarget(
storageRoot: string,
): Promise<{ threadId: string; dataPath: string } | null> {
const threads = await listHistoricalThreads(storageRoot, null);
if (threads.length === 0) {
return null;
}
): Promise<LatestThreadTarget | null> {
const hashes = await listBundleHashDirs(storageRoot);
let best: {
threadId: string;
dataPath: string;
primary: number;
secondary: number;
bundleHash: string;
bundleDir: string;
ts: number;
} | null = null;
for (const t of threads) {
const dataPath = join(storageRoot, "logs", t.hash, `${t.threadId}.data.jsonl`);
let mtimeMs = 0;
for (const bundleHash of hashes) {
const bundleDir = join(storageRoot, "bundles", bundleHash);
let index: ThreadIndex;
try {
const st = await stat(dataPath);
mtimeMs = st.mtimeMs;
index = await readThreadsIndex(bundleDir);
} catch {
continue;
}
const startTs = await readThreadStartTimestampMs(dataPath);
const primary = startTs !== null ? startTs : mtimeMs;
const secondary = mtimeMs;
if (
best === null ||
primary > best.primary ||
(primary === best.primary && secondary > best.secondary)
) {
best = { threadId: t.threadId, dataPath, primary, secondary };
for (const threadId of Object.keys(index)) {
const ent = index[threadId];
if (ent === undefined) {
continue;
}
const ts = ent.updatedAt;
const cand = { threadId, bundleHash, bundleDir, ts };
if (
best === null ||
cand.ts > best.ts ||
(cand.ts === best.ts &&
`${cand.bundleHash}/${cand.threadId}` > `${best.bundleHash}/${best.threadId}`)
) {
best = cand;
}
}
const histDir = join(bundleDir, "history");
if (!(await pathExists(histDir))) {
continue;
}
let files: string[];
try {
files = await readdir(histDir);
} catch {
continue;
}
for (const name of files) {
if (!name.endsWith(".jsonl")) {
continue;
}
const entries = await parseHistoryFile(join(histDir, name));
for (const e of entries) {
const ts = e.completedAt;
const cand = { threadId: e.threadId, bundleHash, bundleDir, ts };
if (
best === null ||
cand.ts > best.ts ||
(cand.ts === best.ts &&
`${cand.bundleHash}/${cand.threadId}` > `${best.bundleHash}/${best.threadId}`)
) {
best = cand;
}
}
}
}
return best === null ? null : { threadId: best.threadId, dataPath: best.dataPath };
}
export async function resolveThreadDataPath(
storageRoot: string,
threadId: string,
): Promise<string | null> {
const logsRoot = join(storageRoot, "logs");
if (!(await pathExists(logsRoot))) {
if (best === null) {
return null;
}
const hashes = await readdir(logsRoot);
for (const hash of hashes) {
const candidate = join(logsRoot, hash, `${threadId}.data.jsonl`);
if (await pathExists(candidate)) {
return candidate;
}
}
return null;
return {
threadId: best.threadId,
bundleHash: best.bundleHash,
bundleDir: best.bundleDir,
threadsJsonPath: join(best.bundleDir, "threads.json"),
};
}
+21 -3
View File
@@ -2,11 +2,11 @@ import { type ChildProcess, spawn } from "node:child_process";
import { mkdir, readdir, unlink, writeFile } from "node:fs/promises";
import { createConnection } from "node:net";
import { join } from "node:path";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { getWorkerHostScriptPath } from "@uncaged/workflow-execute";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { pathExists, readTextFileIfExists } from "./fs-utils.js";
import { readThreadTerminalFromHead, resolveThreadRecord } from "./thread-scan.js";
export type WorkerCtl = {
pid: number;
@@ -270,7 +270,25 @@ export async function resolveRunningHashForThread(
if (!(await pathExists(logsRoot))) {
return err(`thread not running (no logs dir): ${threadId}`);
}
const hashes = await readdir(logsRoot);
const resolved = await resolveThreadRecord(storageRoot, threadId);
if (resolved !== null) {
const runningPath = join(logsRoot, resolved.bundleHash, `${threadId}.running`);
if (!(await pathExists(runningPath))) {
return err(`thread not running: ${threadId}`);
}
const terminal = await readThreadTerminalFromHead(storageRoot, resolved.head);
if (terminal.kind === "terminal") {
return err(`thread not running: ${threadId}`);
}
return ok(resolved.bundleHash);
}
let hashes: string[];
try {
hashes = await readdir(logsRoot);
} catch {
return err(`thread not running: ${threadId}`);
}
for (const hash of hashes) {
const runningPath = join(logsRoot, hash, `${threadId}.running`);
if (await pathExists(runningPath)) {
-84
View File
@@ -1,84 +0,0 @@
const BASE = "/api";
async function postJson<T>(path: string, body: unknown): Promise<T> {
const res = await fetch(`${BASE}${path}`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(body),
});
if (!res.ok) {
const err = (await res.json().catch(() => ({ error: res.statusText }))) as { error: string };
throw new Error(err.error || `API ${res.status}`);
}
return res.json() as Promise<T>;
}
async function fetchJson<T>(path: string): Promise<T> {
const res = await fetch(`${BASE}${path}`);
if (!res.ok) {
throw new Error(`API ${res.status}: ${path}`);
}
return res.json() as Promise<T>;
}
export type WorkflowSummary = {
name: string;
currentHash: string;
versions: number;
};
export type ThreadSummary = {
threadId: string;
workflow: string | null;
hash: string | null;
startedAt: string | null;
status: string | null;
};
export type ThreadRecord = {
type: string;
role: string | null;
content: string | null;
timestamp: number | null;
[key: string]: unknown;
};
export function listWorkflows(): Promise<{ workflows: WorkflowSummary[] }> {
return fetchJson("/workflows");
}
export function listThreads(): Promise<{ threads: ThreadSummary[] }> {
return fetchJson("/threads");
}
export function listRunningThreads(): Promise<{ threads: ThreadSummary[] }> {
return fetchJson("/threads/running");
}
export function getThread(id: string): Promise<{ records: ThreadRecord[] }> {
return fetchJson(`/threads/${id}`);
}
export function runThread(
workflow: string,
prompt: string,
maxRounds: number = 10,
): Promise<{ threadId: string }> {
return postJson("/threads", { workflow, prompt, maxRounds });
}
export function killThread(threadId: string): Promise<{ ok: boolean }> {
return postJson(`/threads/${threadId}/kill`, {});
}
export function pauseThread(threadId: string): Promise<{ ok: boolean }> {
return postJson(`/threads/${threadId}/pause`, {});
}
export function resumeThread(threadId: string): Promise<{ ok: boolean }> {
return postJson(`/threads/${threadId}/resume`, {});
}
export function getHealth(): Promise<{ ok: boolean }> {
return fetchJson("/healthz");
}
-38
View File
@@ -1,38 +0,0 @@
import { useState } from "react";
import { RunDialog } from "./components/run-dialog.tsx";
import { Sidebar } from "./components/sidebar.tsx";
import { StatusBar } from "./components/status-bar.tsx";
import { ThreadDetail } from "./components/thread-detail.tsx";
import { ThreadList } from "./components/thread-list.tsx";
import { WorkflowList } from "./components/workflow-list.tsx";
import { useHashRoute } from "./use-hash-route.ts";
export function App() {
const { view, threadId, setView, setThreadId } = useHashRoute();
const [showRun, setShowRun] = useState(false);
return (
<div className="flex h-screen">
<Sidebar view={view} onViewChange={setView} />
<main className="flex-1 overflow-hidden flex flex-col">
<StatusBar onRun={() => setShowRun(true)} />
<div className="flex-1 overflow-auto p-6">
{view === "threads" && threadId === null && <ThreadList onSelect={setThreadId} />}
{view === "threads" && threadId !== null && (
<ThreadDetail threadId={threadId} onBack={() => setThreadId(null)} />
)}
{view === "workflows" && <WorkflowList />}
</div>
</main>
{showRun && (
<RunDialog
onClose={() => setShowRun(false)}
onCreated={(id) => {
setShowRun(false);
setThreadId(id);
}}
/>
)}
</div>
);
}
@@ -1,43 +0,0 @@
type Props = {
view: "threads" | "workflows";
onViewChange: (v: "threads" | "workflows") => void;
};
export function Sidebar({ view, onViewChange }: Props) {
const items = [
{ key: "threads" as const, label: "Threads", icon: "⚡" },
{ key: "workflows" as const, label: "Workflows", icon: "📦" },
];
return (
<aside
className="w-56 border-r flex flex-col"
style={{ borderColor: "var(--color-border)", background: "var(--color-surface)" }}
>
<div className="p-4 border-b" style={{ borderColor: "var(--color-border)" }}>
<h1 className="text-lg font-semibold" style={{ color: "var(--color-accent)" }}>
Workflow
</h1>
<p className="text-xs mt-1" style={{ color: "var(--color-text-muted)" }}>
Dashboard
</p>
</div>
<nav className="flex-1 p-2 space-y-1">
{items.map((item) => (
<button
type="button"
key={item.key}
onClick={() => onViewChange(item.key)}
className="w-full text-left px-3 py-2 rounded text-sm transition-colors"
style={{
background: view === item.key ? "var(--color-accent-dim)" : "transparent",
color: view === item.key ? "#fff" : "var(--color-text-muted)",
}}
>
{item.icon} {item.label}
</button>
))}
</nav>
</aside>
);
}
-64
View File
@@ -1,64 +0,0 @@
import { useCallback, useEffect, useState } from "react";
type View = "threads" | "workflows";
type HashRoute = {
view: View;
threadId: string | null;
};
function parseHash(hash: string): HashRoute {
const raw = hash.replace(/^#\/?/, "");
if (raw.startsWith("threads/")) {
const id = raw.slice("threads/".length);
if (id.length > 0) {
return { view: "threads", threadId: id };
}
}
if (raw === "workflows") {
return { view: "workflows", threadId: null };
}
return { view: "threads", threadId: null };
}
function buildHash(route: HashRoute): string {
if (route.view === "workflows") {
return "#workflows";
}
if (route.threadId !== null) {
return `#threads/${route.threadId}`;
}
return "#threads";
}
export function useHashRoute(): {
view: View;
threadId: string | null;
setView: (v: View) => void;
setThreadId: (id: string | null) => void;
} {
const [route, setRoute] = useState<HashRoute>(() => parseHash(window.location.hash));
useEffect(() => {
function onHashChange(): void {
setRoute(parseHash(window.location.hash));
}
window.addEventListener("hashchange", onHashChange);
return () => window.removeEventListener("hashchange", onHashChange);
}, []);
const navigate = useCallback((next: HashRoute) => {
const hash = buildHash(next);
window.location.hash = hash;
setRoute(next);
}, []);
const setView = useCallback((v: View) => navigate({ view: v, threadId: null }), [navigate]);
const setThreadId = useCallback(
(id: string | null) => navigate({ view: "threads", threadId: id }),
[navigate],
);
return { view: route.view, threadId: route.threadId, setView, setThreadId };
}
+3 -4
View File
@@ -7,10 +7,10 @@ The agent builds a full prompt (system + task + step history via `@uncaged/workf
## Install
```bash
bun add @uncaged/workflow-agent-cursor @uncaged/workflow @uncaged/workflow-util-agent zod
bun add @uncaged/workflow-agent-cursor @uncaged/workflow-runtime @uncaged/workflow-util-agent zod
```
In this monorepo: `"@uncaged/workflow-agent-cursor": "workspace:*"` plus `workspace:*` for `@uncaged/workflow` and `@uncaged/workflow-util-agent`.
In this monorepo: `"@uncaged/workflow-agent-cursor": "workspace:*"` plus `workspace:*` for `@uncaged/workflow-runtime` and `@uncaged/workflow-util-agent`, and `zod` ^4.
## Usage
@@ -28,9 +28,8 @@ const agent = createCursorAgent({
| Export | Description |
|--------|-------------|
| `createCursorAgent(config)` | Returns `AgentFn` that runs `cursor-agent` with `buildAgentPrompt(ctx)` |
| `createCursorAgent(config)` | Returns `AgentFn` that runs `cursor-agent` with `buildAgentPrompt(ctx)` from `@uncaged/workflow-util-agent` |
| `CursorAgentConfig` | `model`, `timeout`, `extract` (must supply workspace path) |
| `validateCursorAgentConfig` | Config validation result |
| `buildAgentPrompt` | Re-exported from `@uncaged/workflow-util-agent` |
Requires `cursor-agent` on `PATH` at runtime.
@@ -7,7 +7,11 @@ const testExtract: ExtractFn = async <T extends Record<string, unknown>>(
_schema: z.ZodType<T>,
_prompt: string,
_ctx: ExtractContext,
): Promise<T> => ({ workspace: "/tmp" }) as unknown as T;
): Promise<{ meta: T; contentPayload: string; refs: string[] }> => ({
meta: { workspace: "/tmp" } as unknown as T,
contentPayload: "",
refs: [],
});
describe("validateCursorAgentConfig", () => {
test("accepts valid config", () => {
+2 -2
View File
@@ -5,7 +5,6 @@ import * as z from "zod/v4";
import type { CursorAgentConfig } from "./types.js";
import { validateCursorAgentConfig } from "./validate-config.js";
export { buildAgentPrompt } from "@uncaged/workflow-util-agent";
export type { CursorAgentConfig } from "./types.js";
export { validateCursorAgentConfig } from "./validate-config.js";
@@ -49,11 +48,12 @@ export function createCursorAgent(config: CursorAgentConfig): AgentFn {
...ctx,
agentContent: "",
};
const { workspace } = await config.extract(
const extracted = await config.extract(
cursorWorkspaceSchema,
"From the thread context, determine the absolute filesystem path where the project/repository is located.",
extractCtx,
);
const { workspace } = extracted.meta;
const fullPrompt = await buildAgentPrompt(ctx);
const args = [
"-p",
+2 -3
View File
@@ -7,10 +7,10 @@ The agent composes the same thread-aware prompt as other CLI-backed agents via `
## Install
```bash
bun add @uncaged/workflow-agent-hermes @uncaged/workflow @uncaged/workflow-util-agent
bun add @uncaged/workflow-agent-hermes @uncaged/workflow-runtime @uncaged/workflow-util-agent
```
In this monorepo: use `workspace:*` for all three `@uncaged/*` packages.
In this monorepo: use `workspace:*` for `@uncaged/workflow-agent-hermes`, `@uncaged/workflow-runtime`, and `@uncaged/workflow-util-agent`.
## Usage
@@ -30,6 +30,5 @@ const agent = createHermesAgent({
| `createHermesAgent(config)` | Returns `AgentFn` wrapping `hermes chat -q ...` |
| `HermesAgentConfig` | `model`, `timeout` |
| `validateHermesAgentConfig` | Config validation result |
| `buildAgentPrompt` | Re-exported from `@uncaged/workflow-util-agent` |
Requires `hermes` on `PATH` at runtime.
@@ -6,7 +6,6 @@ import { validateHermesAgentConfig } from "./validate-config.js";
const HERMES_DEFAULT_MAX_TURNS = 90;
export { buildAgentPrompt } from "@uncaged/workflow-util-agent";
export type { HermesAgentConfig } from "./types.js";
export { validateHermesAgentConfig } from "./validate-config.js";
+3 -3
View File
@@ -1,16 +1,16 @@
# @uncaged/workflow-agent-llm
`AgentFn` adapter that calls an OpenAI-compatible `POST /chat/completions` endpoint using `@uncaged/workflow`’s `LlmProvider` (base URL, API key, model).
`AgentFn` adapter that calls an OpenAI-compatible `POST /chat/completions` endpoint using `LlmProvider` from `@uncaged/workflow-runtime`.
Single-turn: system text is the current role’s `systemPrompt`, user text is the thread’s initial prompt (`ctx.start.content`). Errors from HTTP, JSON, or empty choices are thrown as `Error` with a JSON payload string.
## Install
```bash
bun add @uncaged/workflow-agent-llm @uncaged/workflow
bun add @uncaged/workflow-agent-llm @uncaged/workflow-runtime zod
```
In this monorepo: `"@uncaged/workflow-agent-llm": "workspace:*"`, `"@uncaged/workflow": "workspace:*"`.
In this monorepo: `"@uncaged/workflow-agent-llm": "workspace:*"`, `"@uncaged/workflow-runtime": "workspace:*"` (and satisfy `zod` ^4 as required by `@uncaged/workflow-runtime`).
## Usage
+31
View File
@@ -0,0 +1,31 @@
# @uncaged/workflow-cas
Content-addressable storage implementation, bundle hashing, and Merkle helpers.
## What This Package Does
It implements `CasStore` from `@uncaged/workflow-protocol`, hashes workflow bundle bytes and strings with XXH64, and builds serializable Merkle nodes for thread/step/content payloads used when persisting execution artifacts.
## Key Exports
From `src/index.ts`:
- **CAS:** `createCasStore`
- **Hash:** `hashString`, `hashWorkflowBundleBytes`
- **Merkle:** `createContentMerkleNode`, `getContentMerklePayload`, `parseMerkleNode`, `putContentMerkleNode`, `putStepMerkleNode`, `putThreadMerkleNode`, `serializeMerkleNode`
- **Types:** `CasStore`, `MerkleNode`, `MerkleNodeType`, `StepMerklePayload`, `ThreadMerklePayload`
## Dependencies
- **Workspace:** `@uncaged/workflow-protocol` (`CasStore` contract), `@uncaged/workflow-util`
- **npm:** `xxhashjs`, `yaml`
## Usage
```typescript
import { createCasStore, hashWorkflowBundleBytes } from "@uncaged/workflow-cas";
import { getGlobalCasDir } from "@uncaged/workflow-util";
const store = createCasStore(getGlobalCasDir());
const hash = await hashWorkflowBundleBytes(esmJsBytes);
```
@@ -0,0 +1,65 @@
import { describe, expect, test } from "bun:test";
import type { StateNode } from "@uncaged/workflow-protocol";
import { collectRefs } from "../src/collect-refs.js";
function payload(
partial: Partial<StateNode["payload"]> & Pick<StateNode["payload"], "role">,
): StateNode["payload"] {
return {
role: partial.role,
meta: partial.meta ?? {},
start: partial.start ?? "STARTHASH000000000000001",
content: partial.content ?? "CONTENTHASH00000000000001",
ancestors: partial.ancestors ?? [],
compact: partial.compact ?? null,
timestamp: partial.timestamp ?? 0,
};
}
describe("collectRefs", () => {
test("collects start, content, ancestors, and compact hashes in order", () => {
const refs = collectRefs(
payload({
role: "coder",
start: "01START00000000000000001",
content: "01CONTENT0000000000000001",
ancestors: ["01PARENT0000000000000001", "01GRAND000000000000000001"],
compact: "01COMPACT0000000000000001",
}),
);
expect(refs).toEqual([
"01START00000000000000001",
"01CONTENT0000000000000001",
"01PARENT0000000000000001",
"01GRAND000000000000000001",
"01COMPACT0000000000000001",
]);
});
test("does not collect compact when compact is null", () => {
const refs = collectRefs(
payload({
role: "coder",
start: "S1",
content: "C1",
ancestors: ["A1"],
compact: null,
}),
);
expect(refs).toEqual(["S1", "C1", "A1"]);
});
test("returns only start and content when ancestors is empty", () => {
const refs = collectRefs(
payload({
role: "coder",
start: "S2",
content: "C2",
ancestors: [],
compact: null,
}),
);
expect(refs).toEqual(["S2", "C2"]);
});
});
@@ -0,0 +1,69 @@
import { describe, expect, test } from "bun:test";
import type { CasStore } from "@uncaged/workflow-protocol";
import { stringify } from "yaml";
import { findReachableHashes } from "../src/reachable.js";
function yamlBlob(refs: readonly string[]): string {
return stringify({ type: "node", payload: {}, refs: [...refs] }, { indent: 2 });
}
function memoryCas(entries: Record<string, string>): CasStore {
const map = { ...entries };
return {
async put(): Promise<string> {
throw new Error("memoryCas.put not used in tests");
},
async get(hash: string): Promise<string | null> {
return map[hash] ?? null;
},
async delete(): Promise<void> {},
async list(): Promise<string[]> {
return Object.keys(map);
},
};
}
describe("findReachableHashes", () => {
test("walks refs recursively from a single root", async () => {
const cas = memoryCas({
R1: yamlBlob(["R2"]),
R2: yamlBlob(["R3"]),
R3: yamlBlob([]),
});
const reachable = await findReachableHashes(["R1"], cas);
expect([...reachable].sort()).toEqual(["R1", "R2", "R3"]);
});
test("union of reachability from multiple roots", async () => {
const cas = memoryCas({
A: yamlBlob(["X"]),
B: yamlBlob(["Y"]),
X: yamlBlob([]),
Y: yamlBlob(["Z"]),
Z: yamlBlob([]),
});
const reachable = await findReachableHashes(["A", "B"], cas);
expect([...reachable].sort()).toEqual(["A", "B", "X", "Y", "Z"]);
});
test("handles cycles via visited set", async () => {
const cas = memoryCas({
C1: yamlBlob(["C2"]),
C2: yamlBlob(["C1"]),
});
const reachable = await findReachableHashes(["C1"], cas);
expect(reachable.size).toBe(2);
expect(reachable.has("C1")).toBe(true);
expect(reachable.has("C2")).toBe(true);
});
test("does not throw when a ref points to a missing blob", async () => {
const cas = memoryCas({
H1: yamlBlob(["MISSINGHASH0000000000001"]),
});
const reachable = await findReachableHashes(["H1"], cas);
expect(reachable.has("H1")).toBe(true);
expect(reachable.has("MISSINGHASH0000000000001")).toBe(false);
});
});
+3
View File
@@ -2,6 +2,9 @@
"name": "@uncaged/workflow-cas",
"version": "0.1.0",
"type": "module",
"scripts": {
"test": "bun test"
},
"exports": {
".": {
"types": "./dist/index.d.ts",
+4
View File
@@ -3,10 +3,14 @@ import { join } from "node:path";
import { hashString } from "./hash.js";
import { createContentMerkleNode, parseMerkleNode, serializeMerkleNode } from "./merkle.js";
import { isCasNodeYaml } from "./nodes.js";
import type { CasStore } from "./types.js";
/** Raw strings become content merkle YAML; already-valid merkle documents pass through. */
function normalizeCasPutContent(content: string): string {
if (isCasNodeYaml(content)) {
return content;
}
try {
parseMerkleNode(content);
return content;
+13
View File
@@ -0,0 +1,13 @@
import type { StateNode } from "@uncaged/workflow-protocol";
/** Collects CAS hashes from {@link StateNode} payload fields for GC `refs[]` derivation. */
export function collectRefs(payload: StateNode["payload"]): string[] {
const out: string[] = [payload.start, payload.content];
for (const h of payload.ancestors) {
out.push(h);
}
if (payload.compact !== null) {
out.push(payload.compact);
}
return out;
}
+1 -3
View File
@@ -1,8 +1,6 @@
import { Buffer } from "node:buffer";
import XXH from "xxhashjs";
import { encodeUint64AsCrockford } from "@uncaged/workflow-util";
import XXH from "xxhashjs";
function digestToUint64(digest: { toString(radix?: number): string }): bigint {
const hex = digest.toString(16).padStart(16, "0");
+11
View File
@@ -1,4 +1,5 @@
export { createCasStore } from "./cas.js";
export { collectRefs } from "./collect-refs.js";
export { hashString, hashWorkflowBundleBytes } from "./hash.js";
export {
createContentMerkleNode,
@@ -9,6 +10,16 @@ export {
putThreadMerkleNode,
serializeMerkleNode,
} from "./merkle.js";
export type { ParsedCasThreadNode } from "./nodes.js";
export {
isCasNodeYaml,
parseCasThreadNode,
putContentNodeWithRefs,
putStartNode,
putStateNode,
serializeCasNode,
} from "./nodes.js";
export { findReachableHashes } from "./reachable.js";
export type {
CasStore,
MerkleNode,
+53 -16
View File
@@ -1,8 +1,41 @@
import { parse, stringify } from "yaml";
import type { CasStore, MerkleNode, StepMerklePayload, ThreadMerklePayload } from "./types.js";
import type {
CasStore,
MerkleNode,
MerkleNodeType,
StepMerklePayload,
ThreadMerklePayload,
} from "./types.js";
function requireStringHashArray(value: unknown, notArrayMessage: string): string[] {
if (!Array.isArray(value)) {
throw new Error(notArrayMessage);
}
const out: string[] = [];
for (const c of value) {
if (typeof c !== "string") {
throw new Error("merkle: hash entry must be a string");
}
out.push(c);
}
return out;
}
function edgeListRaw(rec: Record<string, unknown>, type: MerkleNodeType): unknown {
if (type === "content") {
return rec.refs !== undefined ? rec.refs : rec.children;
}
return rec.children;
}
export function serializeMerkleNode(node: MerkleNode): string {
if (node.type === "content") {
return stringify(
{ type: node.type, payload: node.payload, refs: node.children },
{ indent: 2 },
);
}
return stringify(
{ type: node.type, payload: node.payload, children: node.children },
{ indent: 2 },
@@ -17,23 +50,18 @@ export function parseMerkleNode(yamlText: string): MerkleNode {
const rec = raw as Record<string, unknown>;
const type = rec.type;
const payload = rec.payload;
const children = rec.children;
if (type !== "content" && type !== "step" && type !== "thread") {
throw new Error("merkle: invalid or missing type");
}
if (typeof payload !== "string" && (payload === null || typeof payload !== "object")) {
throw new Error("merkle: payload must be a string or object");
}
if (!Array.isArray(children)) {
throw new Error("merkle: children must be an array");
}
const childHashes: string[] = [];
for (const c of children) {
if (typeof c !== "string") {
throw new Error("merkle: child hash must be a string");
}
childHashes.push(c);
}
const notArrayMsg =
type === "content"
? "merkle: content node requires refs or children array"
: "merkle: children must be an array";
const childHashes = requireStringHashArray(edgeListRaw(rec, type), notArrayMsg);
return {
type,
payload: typeof payload === "string" ? payload : (payload as Record<string, unknown>),
@@ -82,7 +110,12 @@ export async function putContentMerkleNode(store: CasStore, content: string): Pr
return store.put(content);
}
/** Loads a CAS blob and returns the payload string for a `content` Merkle node. */
/**
* Loads a CAS blob and returns the payload string for a `content` node.
*
* Accepts both the legacy `{ type:content, payload, children }` Merkle layout
* and the RFC-aligned `{ type:content, payload, refs }` content node layout.
*/
export async function getContentMerklePayload(
store: CasStore,
hash: string,
@@ -91,9 +124,13 @@ export async function getContentMerklePayload(
if (yamlText === null) {
return null;
}
const node = parseMerkleNode(yamlText);
if (node.type !== "content" || typeof node.payload !== "string") {
const raw = parse(yamlText) as unknown;
if (raw === null || typeof raw !== "object") {
return null;
}
return node.payload;
const rec = raw as Record<string, unknown>;
if (rec.type !== "content" || typeof rec.payload !== "string") {
return null;
}
return rec.payload;
}
+178
View File
@@ -0,0 +1,178 @@
import type {
ContentMerkleNode,
StartNode,
StartNodePayload,
StateNode,
StateNodePayload,
} from "@uncaged/workflow-protocol";
import { parse, stringify } from "yaml";
import { collectRefs } from "./collect-refs.js";
import type { CasStore } from "./types.js";
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
function isStartPayload(value: unknown): value is StartNodePayload {
if (!isRecord(value)) {
return false;
}
return (
typeof value.name === "string" &&
typeof value.hash === "string" &&
typeof value.maxRounds === "number" &&
typeof value.depth === "number"
);
}
function isStatePayload(value: unknown): value is StateNodePayload {
if (!isRecord(value)) {
return false;
}
const compact = value.compact;
if (!(compact === null || typeof compact === "string")) {
return false;
}
const ancestors = value.ancestors;
if (!Array.isArray(ancestors) || !ancestors.every((h) => typeof h === "string")) {
return false;
}
const meta = value.meta;
if (!isRecord(meta)) {
return false;
}
return (
typeof value.role === "string" &&
typeof value.start === "string" &&
typeof value.content === "string" &&
typeof value.timestamp === "number"
);
}
/** Parses a YAML CAS blob into a typed RFC v3 thread node (or legacy content layout with `children`). */
export function parseCasThreadNode(yamlText: string): ParsedCasThreadNode | null {
let raw: unknown;
try {
raw = parse(yamlText) as unknown;
} catch {
return null;
}
if (!isRecord(raw)) {
return null;
}
const type = raw.type;
if (type !== "start" && type !== "state" && type !== "content") {
return null;
}
let refsRaw: unknown = raw.refs;
if (refsRaw === undefined && type === "content") {
refsRaw = raw.children;
}
if (!Array.isArray(refsRaw) || !refsRaw.every((r) => typeof r === "string")) {
return null;
}
const refs = refsRaw as string[];
if (type === "content") {
if (typeof raw.payload !== "string") {
return null;
}
const node: ContentMerkleNode = { type: "content", payload: raw.payload, refs: [...refs] };
return { kind: "content", node };
}
if (type === "start") {
if (!isStartPayload(raw.payload)) {
return null;
}
const node: StartNode = { type: "start", payload: raw.payload, refs: [...refs] };
return { kind: "start", node };
}
if (!isStatePayload(raw.payload)) {
return null;
}
const node: StateNode = { type: "state", payload: raw.payload, refs: [...refs] };
return { kind: "state", node };
}
export type ParsedCasThreadNode =
| { kind: "start"; node: StartNode }
| { kind: "state"; node: StateNode }
| { kind: "content"; node: ContentMerkleNode };
/** YAML-serialize a CAS node carrying `{type, payload, refs}` (RFC v3 thread storage format). */
export function serializeCasNode(node: StartNode | StateNode | ContentMerkleNode): string {
return stringify({ type: node.type, payload: node.payload, refs: node.refs }, { indent: 2 });
}
/**
* Recognizes a YAML CAS blob with the `{type, payload, refs[]}` shape used by
* `start` / `state` / `content` thread nodes. Used by {@link createCasStore}
* to skip the legacy auto-wrap step when the caller already supplied a
* pre-serialized RFC v3 node.
*/
export function isCasNodeYaml(content: string): boolean {
let raw: unknown;
try {
raw = parse(content) as unknown;
} catch {
return false;
}
if (raw === null || typeof raw !== "object" || Array.isArray(raw)) {
return false;
}
const rec = raw as Record<string, unknown>;
if (typeof rec.type !== "string") {
return false;
}
if (!Array.isArray(rec.refs)) {
return false;
}
for (const r of rec.refs) {
if (typeof r !== "string") {
return false;
}
}
return true;
}
export async function putStartNode(
store: CasStore,
payload: StartNode["payload"],
promptHash: string,
): Promise<string> {
const node: StartNode = {
type: "start",
payload,
refs: [promptHash],
};
return store.put(serializeCasNode(node));
}
export async function putStateNode(
store: CasStore,
payload: StateNode["payload"],
): Promise<string> {
const node: StateNode = {
type: "state",
payload,
refs: collectRefs(payload),
};
return store.put(serializeCasNode(node));
}
export async function putContentNodeWithRefs(
store: CasStore,
payload: string,
refs: readonly string[],
): Promise<string> {
const node: ContentMerkleNode = {
type: "content",
payload,
refs: [...refs],
};
return store.put(serializeCasNode(node));
}
+58
View File
@@ -0,0 +1,58 @@
import { parse } from "yaml";
import type { CasStore } from "./types.js";
function refsFromBlob(content: string): string[] {
try {
const raw = parse(content) as unknown;
if (raw === null || typeof raw !== "object") {
return [];
}
const rec = raw as Record<string, unknown>;
let refs = rec.refs;
if (!Array.isArray(refs) && Array.isArray(rec.children)) {
refs = rec.children;
}
if (!Array.isArray(refs)) {
return [];
}
const out: string[] = [];
for (const r of refs) {
if (typeof r === "string") {
out.push(r);
}
}
return out;
} catch {
return [];
}
}
/** Recursively collects all CAS hashes reachable from `roots` via each blob's `refs[]`. */
export async function findReachableHashes(
roots: readonly string[],
cas: CasStore,
): Promise<ReadonlySet<string>> {
const visited = new Set<string>();
const stack = [...roots];
while (stack.length > 0) {
const hash = stack.pop();
if (hash === undefined) {
break;
}
if (visited.has(hash)) {
continue;
}
const blob = await cas.get(hash);
if (blob === null) {
continue;
}
visited.add(hash);
for (const ref of refsFromBlob(blob)) {
if (!visited.has(ref)) {
stack.push(ref);
}
}
}
return visited;
}
+1 -4
View File
@@ -5,8 +5,5 @@
"outDir": "dist"
},
"include": ["src"],
"references": [
{ "path": "../workflow-protocol" },
{ "path": "../workflow-util" }
]
"references": [{ "path": "../workflow-protocol" }, { "path": "../workflow-util" }]
}
@@ -0,0 +1 @@
VITE_GATEWAY_URL=https://workflow-gateway.shazhou.workers.dev
@@ -10,7 +10,9 @@
},
"dependencies": {
"react": "^19.2.6",
"react-dom": "^19.2.6"
"react-dom": "^19.2.6",
"react-markdown": "^10.1.0",
"shiki": "^4.0.2"
},
"devDependencies": {
"@tailwindcss/vite": "^4.2.4",
+155
View File
@@ -0,0 +1,155 @@
const GATEWAY_URL = import.meta.env.VITE_GATEWAY_URL || "";
export function getApiKey(): string | null {
try {
return localStorage.getItem("workflow-api-key");
} catch {
return null;
}
}
export function setApiKey(key: string): void {
localStorage.setItem("workflow-api-key", key);
}
export function clearApiKey(): void {
localStorage.removeItem("workflow-api-key");
}
export function hasApiKey(): boolean {
return getApiKey() !== null && getApiKey() !== "";
}
function authHeaders(): Record<string, string> {
const key = getApiKey();
if (key) return { Authorization: `Bearer ${key}` };
return {};
}
function agentBase(agent: string): string {
if (GATEWAY_URL) {
return `${GATEWAY_URL}/api/agents/${agent}`;
}
// Local dev: proxy via vite, no agent prefix
return "/api";
}
async function postJson<T>(base: string, path: string, body: unknown): Promise<T> {
const res = await fetch(`${base}${path}`, {
method: "POST",
headers: { "Content-Type": "application/json", ...authHeaders() },
body: JSON.stringify(body),
});
if (!res.ok) {
const err = (await res.json().catch(() => ({ error: res.statusText }))) as { error: string };
throw new Error(err.error || `API ${res.status}`);
}
return res.json() as Promise<T>;
}
async function fetchJson<T>(base: string, path: string): Promise<T> {
const res = await fetch(`${base}${path}`, { headers: authHeaders() });
if (!res.ok) {
throw new Error(`API ${res.status}: ${path}`);
}
return res.json() as Promise<T>;
}
// ── Endpoint types ──────────────────────────────────────────────────
export type AgentEndpoint = {
name: string;
url: string;
status: string;
lastHeartbeat: number;
};
export type WorkflowSummary = {
name: string;
currentHash: string;
versions: number;
};
export type ThreadSummary = {
threadId: string;
workflow: string | null;
hash: string | null;
startedAt: string | null;
status: string | null;
};
export type ThreadStartRecord = {
type: "thread-start";
workflow: string;
prompt: string | null;
threadId: string;
status: string;
timestamp: null;
};
export type RoleRecord = {
type: "role";
role: string;
content: string;
timestamp: number | null;
meta: Record<string, unknown>;
};
export type WorkflowResultRecord = {
type: "workflow-result";
returnCode: number;
content: string;
timestamp: number | null;
};
export type ThreadRecord = ThreadStartRecord | RoleRecord | WorkflowResultRecord;
// ── Gateway endpoints ───────────────────────────────────────────────
export function listAgents(): Promise<AgentEndpoint[]> {
const url = GATEWAY_URL || "";
return fetchJson(url, "/api/gateway/endpoints");
}
// ── Agent-scoped endpoints ──────────────────────────────────────────
export function listWorkflows(agent: string): Promise<{ workflows: WorkflowSummary[] }> {
return fetchJson(agentBase(agent), "/workflows");
}
export function listThreads(agent: string): Promise<{ threads: ThreadSummary[] }> {
return fetchJson(agentBase(agent), "/threads");
}
export function listRunningThreads(agent: string): Promise<{ threads: ThreadSummary[] }> {
return fetchJson(agentBase(agent), "/threads/running");
}
export function getThread(agent: string, id: string): Promise<{ records: ThreadRecord[] }> {
return fetchJson(agentBase(agent), `/threads/${id}`);
}
export function runThread(
agent: string,
workflow: string,
prompt: string,
maxRounds: number = 10,
): Promise<{ threadId: string }> {
return postJson(agentBase(agent), "/threads", { workflow, prompt, maxRounds });
}
export function killThread(agent: string, threadId: string): Promise<{ ok: boolean }> {
return postJson(agentBase(agent), `/threads/${threadId}/kill`, {});
}
export function pauseThread(agent: string, threadId: string): Promise<{ ok: boolean }> {
return postJson(agentBase(agent), `/threads/${threadId}/pause`, {});
}
export function resumeThread(agent: string, threadId: string): Promise<{ ok: boolean }> {
return postJson(agentBase(agent), `/threads/${threadId}/resume`, {});
}
export function getAgentHealth(agent: string): Promise<{ ok: boolean }> {
return fetchJson(agentBase(agent), "/healthz");
}
+64
View File
@@ -0,0 +1,64 @@
import { useState } from "react";
import { hasApiKey, clearApiKey } from "./api.ts";
import { LoginPage } from "./components/login.tsx";
import { RunDialog } from "./components/run-dialog.tsx";
import { Sidebar } from "./components/sidebar.tsx";
import { StatusBar } from "./components/status-bar.tsx";
import { ThreadDetail } from "./components/thread-detail.tsx";
import { ThreadList } from "./components/thread-list.tsx";
import { WorkflowList } from "./components/workflow-list.tsx";
import { useHashRoute } from "./use-hash-route.ts";
export function App() {
const [authed, setAuthed] = useState(hasApiKey());
const { view, agent, threadId, setView, setAgent, setThreadId } = useHashRoute();
const [showRun, setShowRun] = useState(false);
if (!authed) {
return <LoginPage onLogin={() => setAuthed(true)} />;
}
return (
<div className="flex h-screen">
<Sidebar
view={view}
agent={agent}
onViewChange={setView}
onAgentChange={setAgent}
onLogout={() => {
clearApiKey();
setAuthed(false);
}}
/>
<main className="flex-1 overflow-hidden flex flex-col">
<StatusBar agent={agent} onRun={() => setShowRun(true)} />
<div className="flex-1 overflow-auto p-6">
{!agent && (
<div className="flex items-center justify-center h-full">
<p style={{ color: "var(--color-text-muted)" }}>
Select an agent from the sidebar to get started.
</p>
</div>
)}
{agent && view === "threads" && threadId === null && (
<ThreadList agent={agent} onSelect={setThreadId} />
)}
{agent && view === "threads" && threadId !== null && (
<ThreadDetail agent={agent} threadId={threadId} onBack={() => setThreadId(null)} />
)}
{agent && view === "workflows" && <WorkflowList agent={agent} />}
</div>
</main>
{showRun && agent && (
<RunDialog
agent={agent}
onClose={() => setShowRun(false)}
onCreated={(id) => {
setShowRun(false);
setThreadId(id);
}}
/>
)}
</div>
);
}
@@ -0,0 +1,96 @@
import { useState } from "react";
import { setApiKey } from "../api.ts";
type Props = {
onLogin: () => void;
};
export function LoginPage({ onLogin }: Props) {
const [key, setKey] = useState("");
const [error, setError] = useState<string | null>(null);
const [loading, setLoading] = useState(false);
async function handleSubmit(e: React.FormEvent) {
e.preventDefault();
if (!key.trim()) return;
setLoading(true);
setError(null);
// Test the key by hitting the endpoints list
const gatewayUrl = import.meta.env.VITE_GATEWAY_URL || "";
try {
const res = await fetch(`${gatewayUrl}/endpoints`, {
headers: { Authorization: `Bearer ${key.trim()}` },
});
if (res.status === 401) {
setError("Invalid API key");
setLoading(false);
return;
}
if (!res.ok) {
setError(`Server error: ${res.status}`);
setLoading(false);
return;
}
} catch (err) {
setError(`Connection failed: ${err instanceof Error ? err.message : String(err)}`);
setLoading(false);
return;
}
setApiKey(key.trim());
onLogin();
}
return (
<div
className="min-h-screen flex items-center justify-center"
style={{ background: "var(--color-bg)" }}
>
<div
className="p-8 rounded-lg border w-full max-w-sm"
style={{ background: "var(--color-surface)", borderColor: "var(--color-border)" }}
>
<h1 className="text-xl font-bold mb-1" style={{ color: "var(--color-accent)" }}>
Workflow Dashboard
</h1>
<p className="text-sm mb-6" style={{ color: "var(--color-text-muted)" }}>
Enter your API key to continue
</p>
<form onSubmit={handleSubmit}>
<input
type="password"
value={key}
onChange={(e) => setKey(e.target.value)}
placeholder="API Key"
className="w-full px-3 py-2 rounded border text-sm mb-3 outline-none"
style={{
background: "var(--color-bg)",
borderColor: "var(--color-border)",
color: "var(--color-text)",
}}
autoFocus
/>
{error && (
<p className="text-xs mb-3" style={{ color: "var(--color-error)" }}>
{error}
</p>
)}
<button
type="submit"
disabled={loading || !key.trim()}
className="w-full px-3 py-2 rounded text-sm font-medium"
style={{
background: "var(--color-accent)",
color: "var(--color-bg)",
opacity: loading || !key.trim() ? 0.5 : 1,
}}
>
{loading ? "Verifying..." : "Login"}
</button>
</form>
</div>
</div>
);
}
@@ -0,0 +1,127 @@
import ReactMarkdown from "react-markdown";
import { useEffect, useState } from "react";
import {
createHighlighter,
type HighlighterGeneric,
type BundledLanguage,
type BundledTheme,
} from "shiki";
let highlighterPromise: Promise<HighlighterGeneric<BundledLanguage, BundledTheme>> | null = null;
const LANGS: BundledLanguage[] = [
"typescript",
"javascript",
"json",
"yaml",
"bash",
"python",
"markdown",
];
function getHighlighter(): Promise<HighlighterGeneric<BundledLanguage, BundledTheme>> {
if (highlighterPromise === null) {
highlighterPromise = createHighlighter({
themes: ["github-dark"],
langs: LANGS,
});
}
return highlighterPromise;
}
function CodeBlock({ className, children }: { className?: string; children?: React.ReactNode }) {
const [html, setHtml] = useState<string | null>(null);
const code = String(children).replace(/\n$/, "");
const lang = className?.replace("language-", "") ?? "text";
useEffect(() => {
let cancelled = false;
getHighlighter().then((hl) => {
if (cancelled) return;
try {
const result = hl.codeToHtml(code, { lang, theme: "github-dark" });
setHtml(result);
} catch {
setHtml(null);
}
});
return () => {
cancelled = true;
};
}, [code, lang]);
if (html !== null) {
return (
<div
className="rounded overflow-x-auto text-xs my-2"
// biome-ignore lint/security/noDangerouslySetInnerHtml: shiki output is safe
dangerouslySetInnerHTML={{ __html: html }}
/>
);
}
return (
<pre
className="rounded overflow-x-auto text-xs my-2 p-3"
style={{ background: "var(--color-bg)" }}
>
<code>{code}</code>
</pre>
);
}
export function Markdown({ content }: { content: string }) {
return (
<div className="prose prose-invert prose-sm max-w-none">
<ReactMarkdown
components={{
code({ className, children, ...props }) {
const isInline = !className;
if (isInline) {
return (
<code
className="text-xs px-1 py-0.5 rounded"
style={{ background: "var(--color-border)", color: "var(--color-accent)" }}
{...props}
>
{children}
</code>
);
}
return <CodeBlock className={className}>{children}</CodeBlock>;
},
p({ children }) {
return <p className="my-1.5 leading-relaxed">{children}</p>;
},
ul({ children }) {
return <ul className="list-disc pl-4 my-1.5">{children}</ul>;
},
ol({ children }) {
return <ol className="list-decimal pl-4 my-1.5">{children}</ol>;
},
h1({ children }) {
return <h1 className="text-lg font-bold mt-3 mb-1">{children}</h1>;
},
h2({ children }) {
return <h2 className="text-base font-bold mt-2 mb-1">{children}</h2>;
},
h3({ children }) {
return <h3 className="text-sm font-bold mt-2 mb-1">{children}</h3>;
},
blockquote({ children }) {
return (
<blockquote
className="border-l-2 pl-3 my-2 text-sm"
style={{ borderColor: "var(--color-accent)", color: "var(--color-text-muted)" }}
>
{children}
</blockquote>
);
},
}}
>
{content}
</ReactMarkdown>
</div>
);
}
@@ -0,0 +1,126 @@
import type { ThreadStartRecord, RoleRecord, WorkflowResultRecord, ThreadRecord } from "../api.ts";
import { Markdown } from "./markdown.tsx";
const ROLE_COLORS: Record<string, string> = {
preparer: "#8b5cf6",
agent: "#3b82f6",
extractor: "#f59e0b",
};
function roleColor(role: string): string {
return ROLE_COLORS[role] ?? "var(--color-accent)";
}
function formatTime(ts: number | null): string | null {
if (ts === null) return null;
return new Date(ts).toLocaleTimeString();
}
function StartCard({ record }: { record: ThreadStartRecord }) {
return (
<div
className="p-4 rounded-lg border"
style={{ background: "var(--color-surface)", borderColor: "var(--color-border)" }}
>
<div className="flex items-center gap-2 mb-2">
<span className="text-lg">🚀</span>
<span className="font-semibold" style={{ color: "var(--color-accent)" }}>
{record.workflow}
</span>
<span
className="text-xs px-2 py-0.5 rounded"
style={{
background: record.status === "active" ? "var(--color-success)" : "var(--color-border)",
color: record.status === "active" ? "var(--color-bg)" : "var(--color-text-muted)",
}}
>
{record.status}
</span>
</div>
{record.prompt !== null && (
<div
className="mt-2 p-3 rounded text-sm border-l-2"
style={{
background: "var(--color-bg)",
borderColor: "var(--color-accent)",
color: "var(--color-text)",
}}
>
<div className="text-xs mb-1" style={{ color: "var(--color-text-muted)" }}>
Prompt
</div>
<Markdown content={record.prompt} />
</div>
)}
</div>
);
}
function RoleMessage({ record }: { record: RoleRecord }) {
const color = roleColor(record.role);
return (
<div
className="p-3 rounded-lg border text-sm"
style={{ background: "var(--color-surface)", borderColor: "var(--color-border)" }}
>
<div className="flex items-center gap-2 mb-2">
<span
className="text-xs px-2 py-0.5 rounded font-mono font-medium"
style={{ background: color, color: "#fff" }}
>
{record.role}
</span>
{formatTime(record.timestamp) !== null && (
<span className="text-xs ml-auto" style={{ color: "var(--color-text-muted)" }}>
{formatTime(record.timestamp)}
</span>
)}
</div>
<Markdown content={record.content} />
</div>
);
}
function ResultCard({ record }: { record: WorkflowResultRecord }) {
const success = record.returnCode === 0;
return (
<div
className="p-4 rounded-lg border"
style={{
background: "var(--color-surface)",
borderColor: success ? "var(--color-success)" : "var(--color-error)",
}}
>
<div className="flex items-center gap-2 mb-2">
<span className="text-lg">{success ? "✅" : "❌"}</span>
<span className="font-semibold text-sm">{success ? "Completed" : "Failed"}</span>
<span
className="text-xs px-2 py-0.5 rounded font-mono"
style={{
background: success ? "var(--color-success)" : "var(--color-error)",
color: "#fff",
}}
>
exit {record.returnCode}
</span>
{formatTime(record.timestamp) !== null && (
<span className="text-xs ml-auto" style={{ color: "var(--color-text-muted)" }}>
{formatTime(record.timestamp)}
</span>
)}
</div>
<Markdown content={record.content} />
</div>
);
}
export function RecordCard({ record }: { record: ThreadRecord }) {
switch (record.type) {
case "thread-start":
return <StartCard record={record} />;
case "role":
return <RoleMessage record={record} />;
case "workflow-result":
return <ResultCard record={record} />;
}
}
@@ -3,12 +3,13 @@ import { listWorkflows, runThread } from "../api.ts";
import { useFetch } from "../hooks.ts";
type Props = {
agent: string;
onClose: () => void;
onCreated: (threadId: string) => void;
};
export function RunDialog({ onClose, onCreated }: Props) {
const workflows = useFetch(() => listWorkflows(), []);
export function RunDialog({ agent, onClose, onCreated }: Props) {
const workflows = useFetch(() => listWorkflows(agent), [agent]);
const [workflow, setWorkflow] = useState("");
const [prompt, setPrompt] = useState("");
const [maxRounds, setMaxRounds] = useState(10);
@@ -21,7 +22,7 @@ export function RunDialog({ onClose, onCreated }: Props) {
setSubmitting(true);
setError(null);
try {
const result = await runThread(workflow, prompt, maxRounds);
const result = await runThread(agent, workflow, prompt, maxRounds);
onCreated(result.threadId);
} catch (err) {
setError(err instanceof Error ? err.message : String(err));
@@ -38,7 +39,7 @@ export function RunDialog({ onClose, onCreated }: Props) {
className="w-full max-w-lg p-6 rounded-lg border"
style={{ background: "var(--color-surface)", borderColor: "var(--color-border)" }}
>
<h3 className="text-lg font-semibold mb-4">Run Thread</h3>
<h3 className="text-lg font-semibold mb-4">Run Thread on {agent}</h3>
<form onSubmit={handleSubmit} className="space-y-4">
<div>
<label
@@ -0,0 +1,110 @@
import { useEffect } from "react";
import type { AgentEndpoint } from "../api.ts";
import { listAgents } from "../api.ts";
import { useFetch } from "../hooks.ts";
type Props = {
view: "threads" | "workflows";
agent: string | null;
onViewChange: (v: "threads" | "workflows") => void;
onAgentChange: (a: string | null) => void;
onLogout: () => void;
};
export function Sidebar({ view, agent, onViewChange, onAgentChange, onLogout }: Props) {
const { status, data } = useFetch(() => listAgents(), []);
const agents: AgentEndpoint[] = status === "ok" ? data : [];
// Auto-select first agent when none is selected
useEffect(() => {
if (agent === null && agents.length > 0) {
onAgentChange(agents[0].name);
}
}, [agent, agents, onAgentChange]);
const viewItems = [
{ key: "threads" as const, label: "Threads", icon: "⚡" },
{ key: "workflows" as const, label: "Workflows", icon: "📦" },
];
return (
<aside
className="w-56 border-r flex flex-col"
style={{ borderColor: "var(--color-border)", background: "var(--color-surface)" }}
>
<div className="p-4 border-b" style={{ borderColor: "var(--color-border)" }}>
<h1 className="text-lg font-semibold" style={{ color: "var(--color-accent)" }}>
Workflow
</h1>
<p className="text-xs mt-1" style={{ color: "var(--color-text-muted)" }}>
Dashboard
</p>
</div>
{/* Agent selector */}
<div className="px-4 py-3 border-b" style={{ borderColor: "var(--color-border)" }}>
<label
className="block text-xs font-medium mb-1"
style={{ color: "var(--color-text-muted)" }}
htmlFor="agent-select"
>
Agent
</label>
<select
id="agent-select"
className="w-full rounded px-2 py-1.5 text-xs"
style={{
background: "var(--color-bg)",
color: "var(--color-text)",
border: "1px solid var(--color-border)",
}}
value={agent ?? ""}
onChange={(e) => onAgentChange(e.target.value || null)}
disabled={status === "loading"}
>
{status === "loading" ? (
<option value="">Loading</option>
) : agents.length === 0 ? (
<option value="">No agents online</option>
) : (
agents.map((a) => (
<option key={a.name} value={a.name}>
{a.status === "online" ? "🟢" : "🔴"} {a.name}
</option>
))
)}
</select>
</div>
{/* View navigation */}
<nav className="flex-1 p-2 space-y-1">
{viewItems.map((item) => (
<button
type="button"
key={item.key}
onClick={() => onViewChange(item.key)}
className="w-full text-left px-3 py-2 rounded text-sm transition-colors"
style={{
background: view === item.key ? "var(--color-accent-dim)" : "transparent",
color: view === item.key ? "#fff" : "var(--color-text-muted)",
}}
>
{item.icon} {item.label}
</button>
))}
</nav>
<div className="p-2 border-t" style={{ borderColor: "var(--color-border)" }}>
<button
type="button"
onClick={onLogout}
className="w-full text-left px-3 py-2 rounded text-xs transition-colors"
style={{ color: "var(--color-text-muted)" }}
>
🚪 Logout
</button>
</div>
</aside>
);
}
@@ -1,9 +1,10 @@
import { useCallback, useEffect, useRef, useState } from "react";
import { getHealth } from "../api.ts";
import { getAgentHealth } from "../api.ts";
type HealthStatus = "connected" | "disconnected" | "reconnecting";
type Props = {
agent: string | null;
onRun: () => void;
};
@@ -17,13 +18,17 @@ function statusLabel(status: HealthStatus): { text: string; color: string } {
return { text: "● Offline", color: "var(--color-error)" };
}
export function StatusBar({ onRun }: Props) {
export function StatusBar({ agent, onRun }: Props) {
const [status, setStatus] = useState<HealthStatus>("disconnected");
const wasConnectedRef = useRef(false);
const checkHealth = useCallback(async () => {
if (!agent) {
setStatus("disconnected");
return;
}
try {
await getHealth();
await getAgentHealth(agent);
wasConnectedRef.current = true;
setStatus("connected");
} catch {
@@ -33,9 +38,11 @@ export function StatusBar({ onRun }: Props) {
setStatus("disconnected");
}
}
}, []);
}, [agent]);
useEffect(() => {
wasConnectedRef.current = false;
setStatus("disconnected");
checkHealth();
const interval = setInterval(checkHealth, 10_000);
return () => clearInterval(interval);
@@ -49,12 +56,19 @@ export function StatusBar({ onRun }: Props) {
style={{ borderColor: "var(--color-border)", background: "var(--color-surface)" }}
>
<div className="flex items-center gap-4">
<span style={{ color: "var(--color-text-muted)" }}>Local API: 127.0.0.1:7860</span>
<span style={{ color: "var(--color-text-muted)" }}>
{agent ? `Agent: ${agent}` : "No agent selected"}
</span>
<button
type="button"
onClick={onRun}
disabled={!agent}
className="px-3 py-1 rounded text-xs font-medium"
style={{ background: "var(--color-accent)", color: "#fff" }}
style={{
background: agent ? "var(--color-accent)" : "var(--color-border)",
color: "#fff",
opacity: agent ? 1 : 0.5,
}}
>
Run Thread
</button>
@@ -2,15 +2,17 @@ import { useEffect, useRef, useState } from "react";
import { getThread, killThread, pauseThread, resumeThread } from "../api.ts";
import { useFetch } from "../hooks.ts";
import { useSSE } from "../use-sse.ts";
import { RecordCard } from "./record-card.tsx";
type Props = {
agent: string;
threadId: string;
onBack: () => void;
};
export function ThreadDetail({ threadId, onBack }: Props) {
const sse = useSSE(threadId);
const { status, data, error } = useFetch(() => getThread(threadId), [threadId]);
export function ThreadDetail({ agent, threadId, onBack }: Props) {
const sse = useSSE(agent, threadId);
const { status, data, error } = useFetch(() => getThread(agent, threadId), [agent, threadId]);
const [actionStatus, setActionStatus] = useState<string | null>(null);
const recordsEndRef = useRef<HTMLDivElement>(null);
@@ -30,7 +32,7 @@ export function ThreadDetail({ threadId, onBack }: Props) {
setActionStatus(`${action}ing...`);
try {
const fn = action === "kill" ? killThread : action === "pause" ? pauseThread : resumeThread;
await fn(threadId);
await fn(agent, threadId);
setActionStatus(`${action} sent ✓`);
} catch (e) {
setActionStatus(`${action} failed: ${e instanceof Error ? e.message : String(e)}`);
@@ -78,7 +80,7 @@ export function ThreadDetail({ threadId, onBack }: Props) {
<h2 className="text-xl font-semibold mb-2 font-mono flex items-center gap-2 flex-wrap">
<span>{threadId}</span>
{sse.connected && (
{sse.connected && !sse.completed && (
<span
className="text-xs font-medium px-2 py-0.5 rounded"
style={{ background: "var(--color-success)", color: "var(--color-bg)" }}
@@ -101,39 +103,8 @@ export function ThreadDetail({ threadId, onBack }: Props) {
)}
{(status === "ok" || liveActive || records.length > 0) && (
<div className="space-y-3">
{records.map((r) => (
<div
key={`${threadId}-${r.type}-${String(r.timestamp)}-${r.role ?? ""}-${r.content ?? ""}`}
className="p-3 rounded border text-sm"
style={{ background: "var(--color-surface)", borderColor: "var(--color-border)" }}
>
<div className="flex items-center gap-2 mb-1">
<span
className="text-xs px-1.5 py-0.5 rounded font-mono"
style={{ background: "var(--color-border)", color: "var(--color-accent)" }}
>
{r.type}
</span>
{r.role && (
<span className="text-xs" style={{ color: "var(--color-text-muted)" }}>
{r.role}
</span>
)}
{r.timestamp !== null && (
<span className="text-xs ml-auto" style={{ color: "var(--color-text-muted)" }}>
{new Date(r.timestamp).toLocaleTimeString()}
</span>
)}
</div>
{r.content && (
<pre
className="whitespace-pre-wrap text-xs mt-1"
style={{ color: "var(--color-text)" }}
>
{typeof r.content === "string" ? r.content : JSON.stringify(r.content, null, 2)}
</pre>
)}
</div>
{records.map((r, i) => (
<RecordCard key={`${threadId}-${i}`} record={r} />
))}
<div ref={recordsEndRef} aria-hidden />
</div>
@@ -2,11 +2,12 @@ import { listThreads } from "../api.ts";
import { useFetch } from "../hooks.ts";
type Props = {
agent: string;
onSelect: (id: string) => void;
};
export function ThreadList({ onSelect }: Props) {
const { status, data, error } = useFetch(() => listThreads(), []);
export function ThreadList({ agent, onSelect }: Props) {
const { status, data, error } = useFetch(() => listThreads(agent), [agent]);
if (status === "loading")
return <p style={{ color: "var(--color-text-muted)" }}>Loading threads...</p>;
@@ -1,8 +1,12 @@
import { listWorkflows } from "../api.ts";
import { useFetch } from "../hooks.ts";
export function WorkflowList() {
const { status, data, error } = useFetch(() => listWorkflows(), []);
type Props = {
agent: string;
};
export function WorkflowList({ agent }: Props) {
const { status, data, error } = useFetch(() => listWorkflows(agent), [agent]);
if (status === "loading")
return <p style={{ color: "var(--color-text-muted)" }}>Loading workflows...</p>;
@@ -0,0 +1,92 @@
import { useCallback, useEffect, useState } from "react";
type View = "threads" | "workflows";
type HashRoute = {
view: View;
agent: string | null;
threadId: string | null;
};
function parseHash(hash: string): HashRoute {
const raw = hash.replace(/^#\/?/, "");
// Format: #agent/threads/id or #agent/workflows or #threads or #workflows
const parts = raw.split("/");
// Check if first part is a known view
if (parts[0] === "threads" || parts[0] === "workflows") {
return {
view: parts[0] as View,
agent: null,
threadId: parts[0] === "threads" && parts.length > 1 ? parts.slice(1).join("/") : null,
};
}
// First part is agent name
const agent = parts[0] || null;
const viewPart = parts[1] ?? "threads";
const view: View = viewPart === "workflows" ? "workflows" : "threads";
const threadId = view === "threads" && parts.length > 2 ? parts.slice(2).join("/") : null;
return { view, agent, threadId };
}
function buildHash(route: HashRoute): string {
const prefix = route.agent ? `${route.agent}/` : "";
if (route.view === "workflows") {
return `#${prefix}workflows`;
}
if (route.threadId !== null) {
return `#${prefix}threads/${route.threadId}`;
}
return `#${prefix}threads`;
}
export function useHashRoute(): {
view: View;
agent: string | null;
threadId: string | null;
setView: (v: View) => void;
setAgent: (a: string | null) => void;
setThreadId: (id: string | null) => void;
} {
const [route, setRoute] = useState<HashRoute>(() => parseHash(window.location.hash));
useEffect(() => {
function onHashChange(): void {
setRoute(parseHash(window.location.hash));
}
window.addEventListener("hashchange", onHashChange);
return () => window.removeEventListener("hashchange", onHashChange);
}, []);
const navigate = useCallback((next: HashRoute) => {
const hash = buildHash(next);
window.location.hash = hash;
setRoute(next);
}, []);
const setView = useCallback(
(v: View) => navigate({ view: v, agent: route.agent, threadId: null }),
[navigate, route.agent],
);
const setAgent = useCallback(
(a: string | null) => navigate({ view: route.view, agent: a, threadId: null }),
[navigate, route.view],
);
const setThreadId = useCallback(
(id: string | null) => navigate({ view: "threads", agent: route.agent, threadId: id }),
[navigate, route.agent],
);
return {
view: route.view,
agent: route.agent,
threadId: route.threadId,
setView,
setAgent,
setThreadId,
};
}
@@ -8,6 +8,7 @@ import {
} from "react";
import type { ThreadRecord } from "./api.ts";
import { getApiKey } from "./api.ts";
export type UseSSEReturn = {
records: ThreadRecord[];
@@ -56,7 +57,17 @@ function handleRecordEvent(ev: Event, ctx: RecordEventContext): void {
ctx.cleanupEs();
}
export function useSSE(threadId: string | null): UseSSEReturn {
function sseUrl(agent: string, threadId: string): string {
const gatewayUrl = import.meta.env.VITE_GATEWAY_URL || "";
const key = getApiKey();
const keyParam = key ? `?key=${encodeURIComponent(key)}` : "";
if (gatewayUrl) {
return `${gatewayUrl}/api/${agent}/threads/${encodeURIComponent(threadId)}/live${keyParam}`;
}
return `/api/threads/${encodeURIComponent(threadId)}/live`;
}
export function useSSE(agent: string | null, threadId: string | null): UseSSEReturn {
const [records, setRecords] = useState<ThreadRecord[]>([]);
const [connected, setConnected] = useState(false);
const [completed, setCompleted] = useState(false);
@@ -65,7 +76,7 @@ export function useSSE(threadId: string | null): UseSSEReturn {
const reconnectAttemptsRef = useRef(0);
useEffect(() => {
if (threadId === null) {
if (threadId === null || agent === null) {
completedRef.current = false;
reconnectAttemptsRef.current = 0;
setRecords([]);
@@ -75,6 +86,7 @@ export function useSSE(threadId: string | null): UseSSEReturn {
}
const tid = threadId;
const agentName = agent;
completedRef.current = false;
reconnectAttemptsRef.current = 0;
@@ -113,7 +125,7 @@ export function useSSE(threadId: string | null): UseSSEReturn {
}
cleanupEs();
const url = `/api/threads/${encodeURIComponent(tid)}/live`;
const url = sseUrl(agentName, tid);
es = new EventSource(url);
es.onopen = () => {
@@ -136,6 +148,16 @@ export function useSSE(threadId: string | null): UseSSEReturn {
}),
);
es.addEventListener("done", () => {
if (cancelled) {
return;
}
completedRef.current = true;
setCompleted(true);
setConnected(false);
cleanupEs();
});
es.onerror = () => {
if (cancelled || completedRef.current) {
return;
@@ -155,7 +177,7 @@ export function useSSE(threadId: string | null): UseSSEReturn {
}
cleanupEs();
};
}, [threadId]);
}, [agent, threadId]);
return { records, connected, completed };
}
+33
View File
@@ -0,0 +1,33 @@
# @uncaged/workflow-execute
Thread engine: execution, fork/GC, extract pipeline, supervisor/worker wiring, and workflow-as-agent.
## What This Package Does
It runs `WorkflowFn` generators against disk-backed threads, integrates CAS and registry-backed extract (`createExtract`), coordinates LLM tool usage via `@uncaged/workflow-reactor`, handles fork plans and garbage collection, and exposes `workflowAsAgent` for nesting workflows.
## Key Exports
From `src/index.ts`:
- **Engine:** `createWorkflow` (engine-local re-export), `executeThread`, `getWorkerHostScriptPath`
- **Fork / parse:** `buildForkPlan`, `parseThreadDataJsonl`, `selectForkHistoricalSteps`, `tryParseRoleStepRecord`, `tryParseWorkflowResultRecord`
- **GC / pause:** `garbageCollectCas`, `createThreadPauseGate`
- **Engine types:** `ExecuteThreadIo`, `ExecuteThreadOptions`, `ForkHistoricalStep`, `ForkPlan`, `GcResult`, `ParsedThreadStartRecord`, `PrefilledDiskStep`, `SupervisorDecision`, `ThreadPauseGate`
- **Extract:** `buildExtractUserContent`, `createExtract`, `extractFunctionToolFromZodSchema`, `llmErrorToCause`, `llmExtract`, types `ExtractFn`, `ExtractThreadContext`, `LlmError`, `LlmExtractArgs`
- **Agent composition:** `workflowAsAgent`, `WorkflowAsAgentOptions`
## Dependencies
- **Workspace:** `@uncaged/workflow-protocol`, `@uncaged/workflow-runtime`, `@uncaged/workflow-util`, `@uncaged/workflow-cas`, `@uncaged/workflow-reactor`, `@uncaged/workflow-register`
- **npm:** `yaml`
- **Peer:** `zod` ^4
`@uncaged/workflow-reactor` is used for LLM-backed extract and supervisor flows (`extract-fn.ts`, `supervisor.ts`).
## Usage
```typescript
import { executeThread } from "@uncaged/workflow-execute";
// Typical callers are CLI/tests that supply ExecuteThreadIo (paths, CAS, abort, logger, …).
```
@@ -0,0 +1,319 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createCasStore } from "@uncaged/workflow-cas";
import type {
RoleOutput,
ThreadContext,
WorkflowCompletion,
WorkflowFn,
WorkflowRuntime,
} from "@uncaged/workflow-runtime";
import { parse as parseYaml } from "yaml";
import { executeThread } from "../src/engine/engine.js";
import type { ExecuteThreadIo, ExecuteThreadOptions } from "../src/engine/types.js";
const TEST_REGISTRY_YAML = `config:
maxDepth: 3
supervisorInterval: 0
providers:
stub:
baseUrl: http://127.0.0.1:9
apiKey: test
models:
default: stub/m
workflows: {}
`;
function noLogger(): (tag: string, content: string) => void {
return () => {};
}
function makeOptions(overrides: Partial<ExecuteThreadOptions>): ExecuteThreadOptions {
return {
maxRounds: 5,
depth: 0,
signal: new AbortController().signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: null,
prefilledDiskSteps: null,
forkContinuation: null,
replayTimestamps: null,
storageRoot: "/tmp/never",
...overrides,
};
}
async function setupStorage(): Promise<{
storageRoot: string;
casDir: string;
bundleHash: string;
bundleDir: string;
}> {
const storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-engine-"));
await writeFile(join(storageRoot, "workflow.yaml"), TEST_REGISTRY_YAML, "utf8");
const casDir = join(storageRoot, "cas");
await mkdir(casDir, { recursive: true });
const bundleHash = "TESTHASH00001";
const bundleDir = join(storageRoot, "bundles", bundleHash);
return { storageRoot, casDir, bundleHash, bundleDir };
}
function readCasNode(casDir: string, hash: string): Record<string, unknown> {
const text = require("node:fs").readFileSync(join(casDir, `${hash}.txt`), "utf8") as string;
return parseYaml(text) as Record<string, unknown>;
}
describe("executeThread (Phase 2 — CAS thread storage)", () => {
let storageRoot: string;
let casDir: string;
let bundleHash: string;
let bundleDir: string;
beforeEach(async () => {
const setup = await setupStorage();
storageRoot = setup.storageRoot;
casDir = setup.casDir;
bundleHash = setup.bundleHash;
bundleDir = setup.bundleDir;
});
afterEach(async () => {
await rm(storageRoot, { recursive: true, force: true });
});
test("writes a StartNode whose refs[0] is the prompt CAS hash", async () => {
const cas = createCasStore(casDir);
// biome-ignore lint/correctness/useYield: deliberately empty generator — exercises the start/end path with no role steps
const wf: WorkflowFn = async function* (
_thread: ThreadContext,
_runtime: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
return { returnCode: 0, summary: "no-op" };
};
const io: ExecuteThreadIo = {
threadId: "T01",
hash: bundleHash,
infoJsonlPath: join(storageRoot, "logs", bundleHash, "T01.info.jsonl"),
cas,
};
const result = await executeThread(
wf,
"demo",
{ prompt: "hello", steps: [] },
makeOptions({ storageRoot, maxRounds: 5 }),
io,
noLogger(),
);
expect(result.returnCode).toBe(0);
const historyText = await readFile(
(await import("node:fs/promises")).readdir ? await firstHistoryFile(bundleDir) : "",
"utf8",
);
const histLine = historyText.trim().split("\n")[0] ?? "";
const histEntry = JSON.parse(histLine) as Record<string, unknown>;
expect(histEntry.threadId).toBe("T01");
const startHash = histEntry.start as string;
const startNode = readCasNode(casDir, startHash);
expect(startNode.type).toBe("start");
expect((startNode.payload as Record<string, unknown>).name).toBe("demo");
expect((startNode.payload as Record<string, unknown>).hash).toBe(bundleHash);
expect((startNode.payload as Record<string, unknown>).maxRounds).toBe(5);
const refs = startNode.refs as string[];
expect(refs.length).toBe(1);
const promptBlob = await cas.get(refs[0] ?? "");
expect(promptBlob).not.toBeNull();
const promptParsed = parseYaml(promptBlob ?? "") as Record<string, unknown>;
expect(promptParsed.payload).toBe("hello");
});
test("each role yield produces a chained StateNode and updates threads.json head", async () => {
const cas = createCasStore(casDir);
const wf: WorkflowFn = async function* (
_thread: ThreadContext,
runtime: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
const h1 = await runtime.cas.put("plan-text");
yield { role: "planner", contentHash: h1, meta: { plan: 1 }, refs: [h1] };
const h2 = await runtime.cas.put("code-text");
yield { role: "coder", contentHash: h2, meta: { diff: "y" }, refs: [h2] };
return { returnCode: 0, summary: "done" };
};
const io: ExecuteThreadIo = {
threadId: "T02",
hash: bundleHash,
infoJsonlPath: join(storageRoot, "logs", bundleHash, "T02.info.jsonl"),
cas,
};
let observedHead: string | null = null;
let observedHeadAtSecondYield: string | null = null;
const opts = makeOptions({
storageRoot,
maxRounds: 5,
awaitAfterEachYield: async () => {
const text = await readFile(join(bundleDir, "threads.json"), "utf8");
const parsed = JSON.parse(text) as Record<string, { head: string }>;
const head = parsed.T02?.head ?? null;
if (observedHead === null) {
observedHead = head;
} else if (observedHeadAtSecondYield === null) {
observedHeadAtSecondYield = head;
}
},
});
const result = await executeThread(
wf,
"demo",
{ prompt: "p", steps: [] },
opts,
io,
noLogger(),
);
expect(result.returnCode).toBe(0);
expect(observedHead).not.toBeNull();
expect(observedHeadAtSecondYield).not.toBeNull();
expect(observedHead).not.toBe(observedHeadAtSecondYield);
const firstState = readCasNode(casDir, observedHead ?? "");
expect(firstState.type).toBe("state");
expect((firstState.payload as Record<string, unknown>).role).toBe("planner");
expect((firstState.payload as Record<string, unknown>).ancestors).toEqual([]);
const secondState = readCasNode(casDir, observedHeadAtSecondYield ?? "");
expect(secondState.type).toBe("state");
expect((secondState.payload as Record<string, unknown>).role).toBe("coder");
expect((secondState.payload as Record<string, unknown>).ancestors).toEqual([observedHead]);
expect((secondState.payload as Record<string, unknown>).start).toBe(
(firstState.payload as Record<string, unknown>).start,
);
});
test("on completion: removes threads.json entry, appends history with __end__ head", async () => {
const cas = createCasStore(casDir);
const wf: WorkflowFn = async function* (
_thread: ThreadContext,
runtime: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
const h = await runtime.cas.put("only-step");
yield { role: "only", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "completed" };
};
const io: ExecuteThreadIo = {
threadId: "T03",
hash: bundleHash,
infoJsonlPath: join(storageRoot, "logs", bundleHash, "T03.info.jsonl"),
cas,
};
const result = await executeThread(
wf,
"demo",
{ prompt: "p", steps: [] },
makeOptions({ storageRoot, maxRounds: 5 }),
io,
noLogger(),
);
expect(result.returnCode).toBe(0);
const indexText = await readFile(join(bundleDir, "threads.json"), "utf8");
const indexParsed = JSON.parse(indexText) as Record<string, unknown>;
expect(indexParsed).toEqual({});
const historyPath = await firstHistoryFile(bundleDir);
const historyText = await readFile(historyPath, "utf8");
const lines = historyText.trim().split("\n");
expect(lines.length).toBe(1);
const entry = JSON.parse(lines[0] ?? "") as Record<string, unknown>;
expect(entry.threadId).toBe("T03");
expect(entry.head).toBe(result.rootHash);
const endNode = readCasNode(casDir, String(entry.head));
expect(endNode.type).toBe("state");
expect((endNode.payload as Record<string, unknown>).role).toBe("__end__");
expect((endNode.payload as Record<string, unknown>).meta).toEqual({
returnCode: 0,
summary: "completed",
});
});
test("does not write any .data.jsonl file under storageRoot", async () => {
const cas = createCasStore(casDir);
const wf: WorkflowFn = async function* (
_thread: ThreadContext,
runtime: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
const h = await runtime.cas.put("step");
yield { role: "only", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "done" };
};
const io: ExecuteThreadIo = {
threadId: "T04",
hash: bundleHash,
infoJsonlPath: join(storageRoot, "logs", bundleHash, "T04.info.jsonl"),
cas,
};
await executeThread(
wf,
"demo",
{ prompt: "p", steps: [] },
makeOptions({ storageRoot, maxRounds: 5 }),
io,
noLogger(),
);
const fsp = await import("node:fs/promises");
const found: string[] = [];
async function walk(dir: string): Promise<void> {
let entries: { name: string; isDirectory: () => boolean; isFile: () => boolean }[];
try {
entries = await fsp.readdir(dir, { withFileTypes: true });
} catch {
return;
}
for (const ent of entries) {
const p = join(dir, ent.name);
if (ent.isDirectory()) {
await walk(p);
} else if (ent.isFile() && ent.name.endsWith(".data.jsonl")) {
found.push(p);
}
}
}
await walk(storageRoot);
expect(found).toEqual([]);
});
});
async function firstHistoryFile(bundleDir: string): Promise<string> {
const fsp = await import("node:fs/promises");
const dir = join(bundleDir, "history");
const entries = await fsp.readdir(dir);
const file = entries.find((n) => n.endsWith(".jsonl"));
if (file === undefined) {
throw new Error(`no history file under ${dir}`);
}
return join(dir, file);
}
@@ -0,0 +1,72 @@
import { afterEach, describe, expect, test } from "bun:test";
import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createCasStore } from "@uncaged/workflow-cas";
import { type ExtractContext, START } from "@uncaged/workflow-runtime";
import * as z from "zod/v4";
import { createExtract } from "../src/extract/extract-fn.js";
function installPlainJsonExtractMock(meta: Record<string, unknown>): () => void {
const origFetch = globalThis.fetch;
const mockFetch = async (): Promise<Response> =>
new Response(
JSON.stringify({
choices: [{ message: { content: JSON.stringify(meta) } }],
}),
{ status: 200, headers: { "Content-Type": "application/json" } },
);
globalThis.fetch = Object.assign(mockFetch, {
preconnect: origFetch.preconnect.bind(origFetch),
}) as typeof fetch;
return () => {
globalThis.fetch = origFetch;
};
}
describe("createExtract — ExtractResult shape", () => {
let restoreFetch: (() => void) | null = null;
afterEach(() => {
restoreFetch?.();
restoreFetch = null;
});
test("returns meta, contentPayload, and refs[]", async () => {
restoreFetch = installPlainJsonExtractMock({ confidence: 0.9 });
const dir = await mkdtemp(join(tmpdir(), "wf-extract-refs-"));
try {
const cas = createCasStore(join(dir, "cas"));
const extract = createExtract(
{ baseUrl: "http://127.0.0.1:9", apiKey: "key", model: "m" },
{ cas },
);
const schema = z.object({ confidence: z.number() });
const ctx: ExtractContext = {
threadId: "01THREADTESTAAAAAAAAAAAAAA",
depth: 0,
start: {
role: START,
content: "task text",
meta: { maxRounds: 10 },
timestamp: 100,
},
steps: [],
currentRole: { name: "analyst", systemPrompt: "be precise" },
agentContent: "model says hello",
};
const out = await extract(schema, "extract fields", ctx);
expect(out.meta).toEqual({ confidence: 0.9 });
expect(out.contentPayload).toBe("model says hello");
expect(Array.isArray(out.refs)).toBe(true);
expect(out.refs).toEqual([]);
} finally {
await rm(dir, { recursive: true, force: true });
}
});
});
@@ -0,0 +1,112 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import {
createCasStore,
putContentNodeWithRefs,
putStartNode,
putStateNode,
} from "@uncaged/workflow-cas";
import type { StateNodePayload } from "@uncaged/workflow-protocol";
import { FORK_BRANCH_ROLE } from "../src/engine/fork-thread.js";
import { garbageCollectCas } from "../src/engine/gc.js";
import { getBundleDir, removeThreadEntry, upsertThreadEntry } from "../src/engine/threads-index.js";
describe("garbageCollectCas (mark-and-sweep)", () => {
let storageRoot: string;
let casDir: string;
beforeEach(async () => {
storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-gc-ms-"));
casDir = join(storageRoot, "cas");
await mkdir(casDir, { recursive: true });
await writeFile(
join(storageRoot, "workflow.yaml"),
"config:\n maxDepth: 1\n supervisorInterval: 0\n providers: {}\n models: {}\nworkflows: {}\n",
"utf8",
);
});
afterEach(async () => {
await rm(storageRoot, { recursive: true, force: true });
});
test("shared CAS prefix survives when one fork thread index entry is removed", async () => {
const bundleHash = "TESTGC0000001";
const bundleDir = getBundleDir(storageRoot, bundleHash);
await mkdir(bundleDir, { recursive: true });
const cas = createCasStore(casDir);
const promptHash = await cas.put("prompt");
const startHash = await putStartNode(
cas,
{
name: "demo",
hash: bundleHash,
maxRounds: 5,
depth: 0,
},
promptHash,
);
const c1 = await putContentNodeWithRefs(cas, "p1", []);
const h1 = await putStateNode(cas, {
role: "planner",
meta: {},
start: startHash,
content: c1,
ancestors: [],
compact: null,
timestamp: 1,
} satisfies StateNodePayload);
const c2 = await putContentNodeWithRefs(cas, "c1", []);
const h2 = await putStateNode(cas, {
role: "coder",
meta: {},
start: startHash,
content: c2,
ancestors: [h1],
compact: null,
timestamp: 2,
} satisfies StateNodePayload);
const ec = await putContentNodeWithRefs(cas, "", []);
const fm = await putStateNode(cas, {
role: FORK_BRANCH_ROLE,
meta: {},
start: startHash,
content: ec,
ancestors: [h1],
compact: null,
timestamp: 3,
} satisfies StateNodePayload);
await upsertThreadEntry(bundleDir, "THREAD_AAAAAAA", {
head: h2,
start: startHash,
updatedAt: 10,
});
await upsertThreadEntry(bundleDir, "THREAD_BBBBBBB", {
head: fm,
start: startHash,
updatedAt: 20,
});
await removeThreadEntry(bundleDir, "THREAD_AAAAAAA");
const gc = await garbageCollectCas(storageRoot);
expect(gc.ok).toBe(true);
if (!gc.ok) {
return;
}
expect(await cas.get(h2)).toBeNull();
expect(await cas.get(h1)).not.toBeNull();
expect(await cas.get(startHash)).not.toBeNull();
expect(await cas.get(promptHash)).not.toBeNull();
expect(await cas.get(fm)).not.toBeNull();
});
});
@@ -0,0 +1,91 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdtemp, readdir, readFile, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import {
appendThreadHistoryEntry,
removeThreadEntry,
upsertThreadEntry,
} from "../src/engine/threads-index.js";
describe("threads-index", () => {
let bundleDir: string;
beforeEach(async () => {
bundleDir = await mkdtemp(join(tmpdir(), "uncaged-wf-threads-"));
});
afterEach(async () => {
await rm(bundleDir, { recursive: true, force: true });
});
test("upsertThreadEntry creates threads.json and persists entries", async () => {
await upsertThreadEntry(bundleDir, "T1", { head: "H1", start: "S1", updatedAt: 100 });
const text = await readFile(join(bundleDir, "threads.json"), "utf8");
const parsed = JSON.parse(text) as Record<string, unknown>;
expect(parsed).toEqual({
T1: { head: "H1", start: "S1", updatedAt: 100 },
});
});
test("upsertThreadEntry overwrites the head while preserving siblings", async () => {
await upsertThreadEntry(bundleDir, "T1", { head: "H1", start: "S1", updatedAt: 100 });
await upsertThreadEntry(bundleDir, "T2", { head: "H2", start: "S2", updatedAt: 200 });
await upsertThreadEntry(bundleDir, "T1", { head: "H1B", start: "S1", updatedAt: 300 });
const text = await readFile(join(bundleDir, "threads.json"), "utf8");
const parsed = JSON.parse(text) as Record<string, unknown>;
expect(parsed).toEqual({
T1: { head: "H1B", start: "S1", updatedAt: 300 },
T2: { head: "H2", start: "S2", updatedAt: 200 },
});
});
test("removeThreadEntry deletes the entry but keeps the file", async () => {
await upsertThreadEntry(bundleDir, "T1", { head: "H1", start: "S1", updatedAt: 100 });
await upsertThreadEntry(bundleDir, "T2", { head: "H2", start: "S2", updatedAt: 200 });
await removeThreadEntry(bundleDir, "T1");
const text = await readFile(join(bundleDir, "threads.json"), "utf8");
const parsed = JSON.parse(text) as Record<string, unknown>;
expect(parsed).toEqual({
T2: { head: "H2", start: "S2", updatedAt: 200 },
});
});
test("removeThreadEntry on a missing thread is a no-op", async () => {
await removeThreadEntry(bundleDir, "MISSING");
const dirEntries = await readdir(bundleDir);
expect(dirEntries.includes("threads.json")).toBe(false);
});
test("appendThreadHistoryEntry writes one JSONL line per call into a date-keyed file", async () => {
const ts = Date.UTC(2026, 4, 9, 12, 0, 0);
await appendThreadHistoryEntry(bundleDir, {
threadId: "T1",
head: "H1",
start: "S1",
completedAt: ts,
});
await appendThreadHistoryEntry(bundleDir, {
threadId: "T2",
head: "H2",
start: "S2",
completedAt: ts,
});
const text = await readFile(join(bundleDir, "history", "2026-05-09.jsonl"), "utf8");
const lines = text.trim().split("\n");
expect(lines.length).toBe(2);
expect(JSON.parse(lines[0] ?? "{}")).toEqual({
threadId: "T1",
head: "H1",
start: "S1",
completedAt: ts,
});
expect(JSON.parse(lines[1] ?? "{}")).toEqual({
threadId: "T2",
head: "H2",
start: "S2",
completedAt: ts,
});
});
});
+241 -122
View File
@@ -1,5 +1,18 @@
import { appendFile, mkdir } from "node:fs/promises";
import { mkdir } from "node:fs/promises";
import { dirname } from "node:path";
import {
type CasStore,
getContentMerklePayload,
putContentNodeWithRefs,
putStartNode,
putStateNode,
} from "@uncaged/workflow-cas";
import type { StateNode } from "@uncaged/workflow-protocol";
import {
readWorkflowRegistry,
resolveModel,
type WorkflowConfig,
} from "@uncaged/workflow-register";
import type {
LlmProvider,
RoleOutput,
@@ -9,20 +22,29 @@ import type {
WorkflowResult,
WorkflowRuntime,
} from "@uncaged/workflow-runtime";
import { START } from "@uncaged/workflow-runtime";
import {
type CasStore,
getContentMerklePayload,
putStepMerkleNode,
putThreadMerkleNode,
} from "@uncaged/workflow-cas";
import { resolveModel } from "@uncaged/workflow-register";
import { createExtract } from "../extract/index.js";
import { readWorkflowRegistry, type WorkflowConfig } from "@uncaged/workflow-register";
import { err, type LogFn, normalizeRefsField, ok, type Result } from "@uncaged/workflow-util";
import { END, START } from "@uncaged/workflow-runtime";
import { err, type LogFn, ok, type Result } from "@uncaged/workflow-util";
import { createExtract } from "../extract/index.js";
import { runSupervisor } from "./supervisor.js";
import type { ExecuteThreadIo, ExecuteThreadOptions } from "./types.js";
import {
appendThreadHistoryEntry,
getBundleDir,
removeThreadEntry,
upsertThreadEntry,
} from "./threads-index.js";
import type { ChainState, ExecuteThreadIo, ExecuteThreadOptions } from "./types.js";
import { EMPTY_CHAIN_STATE } from "./types.js";
/** Cap for {@link StateNode}.payload.ancestors: 1 parent + 10 skip-list. */
const ANCESTORS_CAP = 11;
function computeAncestors(chain: ChainState): string[] {
if (chain.parentStateHash === null) {
return [];
}
return [chain.parentStateHash, ...chain.parentAncestors].slice(0, ANCESTORS_CAP);
}
async function resolveEngineRegistryRuntime(
storageRoot: string,
@@ -57,51 +79,108 @@ async function resolveEngineRegistryRuntime(
return ok({ extract: createExtract(llmProvider, { cas }), workflowConfig: cfg });
}
async function appendDataLine(path: string, record: unknown): Promise<void> {
const line = `${JSON.stringify(record)}\n`;
await appendFile(path, line, "utf8");
async function appendStateForStep(params: {
cas: CasStore;
startHash: string;
chain: ChainState;
role: string;
contentHash: string;
meta: Record<string, unknown>;
refs: readonly string[];
timestamp: number;
}): Promise<{ stateHash: string; chain: ChainState }> {
const text = await getContentMerklePayload(params.cas, params.contentHash);
if (text === null) {
throw new Error(
`role step ${params.role}: CAS blob missing for contentHash ${params.contentHash}`,
);
}
const artifactRefs = params.refs.filter((r) => r !== params.contentHash);
const contentHash = await putContentNodeWithRefs(params.cas, text, artifactRefs);
const ancestors = computeAncestors(params.chain);
const payload: StateNode["payload"] = {
role: params.role,
meta: params.meta,
start: params.startHash,
content: contentHash,
ancestors,
compact: null,
timestamp: params.timestamp,
};
const stateHash = await putStateNode(params.cas, payload);
return {
stateHash,
chain: { parentStateHash: stateHash, parentAncestors: ancestors },
};
}
async function finalizeThreadResult(params: {
async function appendEndState(params: {
cas: CasStore;
workflowName: string;
startHash: string;
chain: ChainState;
completion: WorkflowCompletion;
timestamp: number;
}): Promise<string> {
const contentHash = await putContentNodeWithRefs(params.cas, params.completion.summary, []);
const ancestors = computeAncestors(params.chain);
const payload: StateNode["payload"] = {
role: END,
meta: { returnCode: params.completion.returnCode, summary: params.completion.summary },
start: params.startHash,
content: contentHash,
ancestors,
compact: null,
timestamp: params.timestamp,
};
return putStateNode(params.cas, payload);
}
async function finalizeThread(params: {
cas: CasStore;
bundleDir: string;
threadId: string;
stepMerkleHashes: readonly string[];
startHash: string;
chain: ChainState;
completion: WorkflowCompletion;
}): Promise<WorkflowResult> {
const rootHash = await putThreadMerkleNode(
params.cas,
{
workflow: params.workflowName,
threadId: params.threadId,
result: {
returnCode: params.completion.returnCode,
summary: params.completion.summary,
},
},
params.stepMerkleHashes,
);
const ts = Date.now();
const endHash = await appendEndState({
cas: params.cas,
startHash: params.startHash,
chain: params.chain,
completion: params.completion,
timestamp: ts,
});
await removeThreadEntry(params.bundleDir, params.threadId);
await appendThreadHistoryEntry(params.bundleDir, {
threadId: params.threadId,
head: endHash,
start: params.startHash,
completedAt: ts,
});
return {
returnCode: params.completion.returnCode,
summary: params.completion.summary,
rootHash,
rootHash: endHash,
};
}
async function finalizeAbortedThread(params: {
cas: CasStore;
workflowName: string;
bundleDir: string;
threadId: string;
stepMerkleHashes: string[];
startHash: string;
chain: ChainState;
logger: LogFn;
abortLogTag: string;
}): Promise<WorkflowResult> {
params.logger(params.abortLogTag, `thread ${params.threadId} aborted`);
return finalizeThreadResult({
return finalizeThread({
cas: params.cas,
workflowName: params.workflowName,
bundleDir: params.bundleDir,
threadId: params.threadId,
stepMerkleHashes: params.stepMerkleHashes,
startHash: params.startHash,
chain: params.chain,
completion: { returnCode: 130, summary: "thread aborted" },
});
}
@@ -114,8 +193,9 @@ async function maybeSupervisorHaltsThread(params: {
logger: LogFn;
threadId: string;
cas: CasStore;
workflowName: string;
stepMerkleHashes: string[];
bundleDir: string;
startHash: string;
chain: ChainState;
}): Promise<WorkflowResult | null> {
const interval = params.workflowConfig.supervisorInterval;
if (interval <= 0 || params.written % interval !== 0) {
@@ -131,45 +211,59 @@ async function maybeSupervisorHaltsThread(params: {
params.logger("K6PW9NYT", `supervisor skipped: ${sup.error}`);
return null;
}
if (sup.value !== "stop") {
if (sup.value !== "kill") {
return null;
}
params.logger("M4QX8VHN", `thread ${params.threadId} stopped by supervisor`);
return finalizeThreadResult({
params.logger("M4QX8VHN", `thread ${params.threadId} killed by supervisor`);
return finalizeThread({
cas: params.cas,
workflowName: params.workflowName,
bundleDir: params.bundleDir,
threadId: params.threadId,
stepMerkleHashes: params.stepMerkleHashes,
completion: { returnCode: 0, summary: "completed: supervisor stopped thread" },
startHash: params.startHash,
chain: params.chain,
completion: { returnCode: 1, summary: "killed: supervisor detected pathological behavior" },
});
}
async function publishHead(params: {
bundleDir: string;
threadId: string;
startHash: string;
headHash: string;
}): Promise<void> {
await upsertThreadEntry(params.bundleDir, params.threadId, {
head: params.headHash,
start: params.startHash,
updatedAt: Date.now(),
});
}
async function driveWorkflowGenerator(params: {
fn: WorkflowFn;
workflowName: string;
workflowConfig: WorkflowConfig;
thread: ThreadContext;
runtime: WorkflowRuntime;
executeOptions: ExecuteThreadOptions;
dataJsonlPath: string;
threadId: string;
logger: LogFn;
cas: CasStore;
stepMerkleHashes: string[];
bundleDir: string;
startHash: string;
chain: ChainState;
}): Promise<WorkflowResult> {
const {
fn,
workflowName,
workflowConfig,
thread,
runtime,
executeOptions,
dataJsonlPath,
threadId,
logger,
cas,
stepMerkleHashes,
bundleDir,
startHash,
} = params;
let chain: ChainState = params.chain;
const gen = fn(thread, runtime);
let written = 0;
const recentSupervisorSteps: { role: string; summary: string }[] = thread.steps.map((s) => ({
@@ -181,9 +275,10 @@ async function driveWorkflowGenerator(params: {
if (executeOptions.signal.aborted) {
return await finalizeAbortedThread({
cas,
workflowName,
bundleDir,
threadId,
stepMerkleHashes,
startHash,
chain,
logger,
abortLogTag: "V8JX4NP2",
});
@@ -191,11 +286,12 @@ async function driveWorkflowGenerator(params: {
if (written >= executeOptions.maxRounds) {
logger("R3CW7YBQ", `thread ${threadId} stopped at maxRounds=${executeOptions.maxRounds}`);
return await finalizeThreadResult({
return await finalizeThread({
cas,
workflowName,
bundleDir,
threadId,
stepMerkleHashes,
startHash,
chain,
completion: {
returnCode: 0,
summary: `completed: reached maxRounds (${executeOptions.maxRounds})`,
@@ -207,39 +303,31 @@ async function driveWorkflowGenerator(params: {
if (iterResult.done) {
logger("F3HN8QKP", `thread ${threadId} generator finished`);
const completion = iterResult.value;
return await finalizeThreadResult({
return await finalizeThread({
cas,
workflowName,
bundleDir,
threadId,
stepMerkleHashes,
completion,
startHash,
chain,
completion: iterResult.value,
});
}
written++;
const step = iterResult.value;
const resolved = await getContentMerklePayload(cas, step.contentHash);
if (resolved === null) {
throw new Error(
`role step ${step.role}: CAS blob missing for contentHash ${step.contentHash}`,
);
}
const ts = Date.now();
await appendDataLine(dataJsonlPath, {
const written_ = await appendStateForStep({
cas,
startHash,
chain,
role: step.role,
contentHash: step.contentHash,
meta: step.meta,
refs: normalizeRefsField(step.refs),
refs: step.refs,
timestamp: ts,
});
const stepNodeHash = await putStepMerkleNode(
cas,
{ role: step.role, meta: step.meta },
step.contentHash,
);
stepMerkleHashes.push(stepNodeHash);
chain = written_.chain;
await publishHead({ bundleDir, threadId, startHash, headHash: written_.stateHash });
logger("N7BW4YHQ", `thread ${threadId} wrote role ${step.role}`);
@@ -262,9 +350,10 @@ async function driveWorkflowGenerator(params: {
if (executeOptions.signal.aborted) {
return await finalizeAbortedThread({
cas,
workflowName,
bundleDir,
threadId,
stepMerkleHashes,
startHash,
chain,
logger,
abortLogTag: "V8JX4NP4",
});
@@ -278,8 +367,9 @@ async function driveWorkflowGenerator(params: {
logger,
threadId,
cas,
workflowName,
stepMerkleHashes,
bundleDir,
startHash,
chain,
});
if (supervised !== null) {
return supervised;
@@ -288,8 +378,16 @@ async function driveWorkflowGenerator(params: {
}
/**
* Execute a workflow thread: drive the bundle's AsyncGenerator, RFC-001 `.data.jsonl` records,
* debug lines via `logger` to `.info.jsonl`.
* Execute a workflow thread by driving the bundle's `AsyncGenerator`.
*
* Persistence layout (RFC v3 CAS-based thread storage):
* - Thread chain is written as immutable CAS blobs: a single {@link StartNode}
* plus one {@link StateNode} per role step (including a final `__end__`
* state on completion / abort / `maxRounds`).
* - The active thread head is published in `<bundleDir>/threads.json`; on
* completion it is removed and a record is appended to
* `<bundleDir>/history/{YYYY-MM-DD}.jsonl`.
* - Debug logging continues to flow through `logger` to `.info.jsonl`.
*/
export async function executeThread(
fn: WorkflowFn,
@@ -299,71 +397,92 @@ export async function executeThread(
io: ExecuteThreadIo,
logger: LogFn,
): Promise<WorkflowResult> {
await mkdir(dirname(io.dataJsonlPath), { recursive: true });
await mkdir(dirname(io.infoJsonlPath), { recursive: true });
const prefilled = options.prefilledDiskSteps;
const fork = options.forkContinuation;
if (fork !== null && prefilled !== null) {
throw new Error("forkContinuation and prefilledDiskSteps cannot both be set");
}
if (prefilled !== null && prefilled.length !== input.steps.length) {
throw new Error(
`prefilledDiskSteps length (${prefilled.length}) must match input.steps length (${input.steps.length})`,
);
}
const nowMs = Date.now();
const startRecord: Record<string, unknown> = {
name: workflowName,
hash: io.hash,
threadId: io.threadId,
parameters: {
prompt: input.prompt,
options: {
const replayTs = options.replayTimestamps;
if (replayTs !== null && replayTs.length !== input.steps.length) {
throw new Error(
`replayTimestamps length (${replayTs.length}) must match input.steps length (${input.steps.length})`,
);
}
const bundleDir = getBundleDir(options.storageRoot, io.hash);
let startHash: string;
if (fork !== null) {
startHash = fork.startHash;
logger("T9HQ2KHM", `thread ${io.threadId} continued fork for workflow ${workflowName}`);
} else {
const promptHash = await io.cas.put(input.prompt);
startHash = await putStartNode(
io.cas,
{
name: workflowName,
hash: io.hash,
maxRounds: options.maxRounds,
depth: options.depth,
},
},
timestamp: nowMs,
};
if (options.forkSourceThreadId !== null) {
startRecord.forkFrom = { threadId: options.forkSourceThreadId };
promptHash,
);
await publishHead({
bundleDir,
threadId: io.threadId,
startHash,
headHash: startHash,
});
logger("T9HQ2KHM", `thread ${io.threadId} started for workflow ${workflowName}`);
}
await appendDataLine(io.dataJsonlPath, startRecord);
logger("T9HQ2KHM", `thread ${io.threadId} started for workflow ${workflowName}`);
const stepMerkleHashes: string[] = [];
let chain: ChainState = fork !== null ? fork.initialChain : EMPTY_CHAIN_STATE;
if (prefilled !== null) {
for (const row of prefilled) {
const prefilledPayload = await getContentMerklePayload(io.cas, row.contentHash);
if (prefilledPayload === null) {
throw new Error(
`prefilled step ${row.role}: CAS blob missing for contentHash ${row.contentHash}`,
);
}
await appendDataLine(io.dataJsonlPath, {
const written = await appendStateForStep({
cas: io.cas,
startHash,
chain,
role: row.role,
contentHash: row.contentHash,
meta: row.meta,
refs: normalizeRefsField(row.refs),
refs: row.refs,
timestamp: row.timestamp,
});
const stepNodeHash = await putStepMerkleNode(
io.cas,
{ role: row.role, meta: row.meta },
row.contentHash,
);
stepMerkleHashes.push(stepNodeHash);
chain = written.chain;
await publishHead({
bundleDir,
threadId: io.threadId,
startHash,
headHash: written.stateHash,
});
}
}
const nowMs = Date.now();
if (options.maxRounds <= 0) {
logger("R3CW7YBQ", `thread ${io.threadId} stopped at maxRounds=${options.maxRounds}`);
return await finalizeThreadResult({
return await finalizeThread({
cas: io.cas,
workflowName,
bundleDir,
threadId: io.threadId,
stepMerkleHashes,
startHash,
chain,
completion: {
returnCode: 0,
summary: `completed: reached maxRounds (${options.maxRounds})`,
@@ -390,7 +509,7 @@ export async function executeThread(
contentHash: out.contentHash,
meta: out.meta,
refs: out.refs,
timestamp: prefilled?.[i]?.timestamp ?? nowMs + i,
timestamp: replayTs?.[i] ?? prefilled?.[i]?.timestamp ?? nowMs + i,
})),
};
@@ -401,15 +520,15 @@ export async function executeThread(
return await driveWorkflowGenerator({
fn,
workflowName,
workflowConfig: registryRuntime.value.workflowConfig,
thread,
runtime,
executeOptions: options,
dataJsonlPath: io.dataJsonlPath,
threadId: io.threadId,
logger,
cas: io.cas,
stepMerkleHashes,
bundleDir,
startHash,
chain,
});
}
@@ -1,9 +1,29 @@
import type { WorkflowCompletion } from "@uncaged/workflow-runtime";
import { err, normalizeRefsField, ok, type Result } from "@uncaged/workflow-util";
import type { CasStore } from "@uncaged/workflow-cas";
import { parseCasThreadNode, putContentNodeWithRefs, putStateNode } from "@uncaged/workflow-cas";
import type { StateNodePayload } from "@uncaged/workflow-protocol";
import type { RoleOutput, WorkflowCompletion } from "@uncaged/workflow-runtime";
import { END } from "@uncaged/workflow-runtime";
import { err, ok, type Result } from "@uncaged/workflow-util";
import { parse as parseYaml } from "yaml";
import type { ForkHistoricalStep, ForkPlan, ParsedThreadStartRecord } from "./types.js";
import { upsertThreadEntry } from "./threads-index.js";
import type { CasForkPlan, ChainState, ForkContinuationOptions } from "./types.js";
import { EMPTY_CHAIN_STATE } from "./types.js";
/** Recognizes a persisted workflow completion line (no `role`; has numeric `returnCode` and string `summary`). Omits `rootHash` when absent. */
/** Internal branch marker; skipped when presenting fork selection / replay slices. */
export const FORK_BRANCH_ROLE = "__fork__";
/** Cap for {@link StateNodePayload}.ancestors: 1 parent + 10 skip-list. */
const ANCESTORS_CAP = 11;
function computeAncestors(chain: ChainState): string[] {
if (chain.parentStateHash === null) {
return [];
}
return [chain.parentStateHash, ...chain.parentAncestors].slice(0, ANCESTORS_CAP);
}
/** Recognizes a persisted workflow completion line (no `role`; has numeric `returnCode` and string `summary`). */
export function tryParseWorkflowResultRecord(
obj: Record<string, unknown>,
): WorkflowCompletion | null {
@@ -18,227 +38,288 @@ export function tryParseWorkflowResultRecord(
return { returnCode, summary };
}
export function tryParseRoleStepRecord(obj: Record<string, unknown>): ForkHistoricalStep | null {
const role = obj.role;
const contentHash = obj.contentHash;
const meta = obj.meta;
const timestamp = obj.timestamp;
if (typeof role !== "string") {
return null;
}
if (typeof contentHash !== "string") {
return null;
}
if (meta === null || typeof meta !== "object") {
return null;
}
if (typeof timestamp !== "number") {
return null;
}
return {
role,
contentHash,
meta: meta as Record<string, unknown>,
refs: normalizeRefsField(obj.refs),
timestamp,
};
}
function parseRoleLine(
obj: Record<string, unknown>,
lineIndex: number,
): Result<ForkHistoricalStep, string> {
const parsed = tryParseRoleStepRecord(obj);
if (parsed === null) {
return err(`invalid role record at line ${lineIndex}`);
}
return ok(parsed);
}
function parseStartRecordLine(firstLine: string): Result<ParsedThreadStartRecord, string> {
let startParsed: unknown;
try {
startParsed = JSON.parse(firstLine) as unknown;
} catch {
return err("invalid JSON on line 1 (start record)");
}
if (startParsed === null || typeof startParsed !== "object") {
return err("invalid start record shape");
}
const startRec = startParsed as Record<string, unknown>;
const name = startRec.name;
const hash = startRec.hash;
const threadId = startRec.threadId;
const parameters = startRec.parameters;
if (typeof name !== "string" || typeof hash !== "string" || typeof threadId !== "string") {
return err("start record missing name, hash, or threadId");
}
if (parameters === null || typeof parameters !== "object") {
return err("start record missing parameters");
}
const paramsRec = parameters as Record<string, unknown>;
const prompt = paramsRec.prompt;
const options = paramsRec.options;
if (typeof prompt !== "string") {
return err("start record missing parameters.prompt");
}
if (options === null || typeof options !== "object") {
return err("start record missing parameters.options");
}
const optRec = options as Record<string, unknown>;
const maxRounds = optRec.maxRounds;
if (typeof maxRounds !== "number") {
return err("start record missing parameters.options.maxRounds");
}
const depthRaw = optRec.depth;
const depth =
typeof depthRaw === "number" && Number.isFinite(depthRaw) ? Math.trunc(depthRaw) : 0;
return ok({
workflowName: name,
hash,
threadId,
prompt,
maxRounds,
depth,
});
}
function parseFollowingRoleLines(lines: string[]): Result<ForkHistoricalStep[], string> {
const roleSteps: ForkHistoricalStep[] = [];
for (let i = 1; i < lines.length; i++) {
const line = lines[i];
if (line === undefined) {
/** Walk {@link StateNode} hashes from head toward the first step (newest → oldest). */
export async function walkStateFramesNewestFirst(
cas: CasStore,
headHash: string,
): Promise<Array<{ hash: string; payload: StateNodePayload }>> {
const frames: Array<{ hash: string; payload: StateNodePayload }> = [];
let cur = headHash;
while (true) {
const yamlText = await cas.get(cur);
if (yamlText === null) {
break;
}
let rec: unknown;
try {
rec = JSON.parse(line) as unknown;
} catch {
return err(`invalid JSON at line ${i + 1}`);
}
if (rec === null || typeof rec !== "object") {
return err(`invalid record at line ${i + 1}`);
}
const recObj = rec as Record<string, unknown>;
const wf = tryParseWorkflowResultRecord(recObj);
if (wf !== null) {
if (i !== lines.length - 1) {
return err("WorkflowResult record must be the final line in `.data.jsonl`");
}
const parsed = parseCasThreadNode(yamlText);
if (parsed === null || parsed.kind !== "state") {
break;
}
const parsed = parseRoleLine(recObj, i + 1);
if (!parsed.ok) {
return parsed;
frames.push({ hash: cur, payload: parsed.node.payload });
const ancestors = parsed.node.payload.ancestors;
if (ancestors.length === 0) {
break;
}
roleSteps.push(parsed.value);
const parent = ancestors[0];
if (parent === undefined || parent === "") {
break;
}
cur = parent;
}
return ok(roleSteps);
return frames;
}
/**
* Parse RFC-001 `.data.jsonl`: line 1 start record, line 2+ role outputs.
*/
export function parseThreadDataJsonl(text: string): Result<
{
start: ParsedThreadStartRecord;
roleSteps: ForkHistoricalStep[];
},
string
> {
const lines = text
.split("\n")
.map((l) => l.trim())
.filter((l) => l !== "");
if (lines.length === 0) {
return err("thread data is empty");
}
const firstLine = lines[0];
if (firstLine === undefined) {
return err("thread data is empty");
}
const start = parseStartRecordLine(firstLine);
if (!start.ok) {
return start;
}
const roleSteps = parseFollowingRoleLines(lines);
if (!roleSteps.ok) {
return roleSteps;
}
return ok({
start: start.value,
roleSteps: roleSteps.value,
});
}
function orderedUniqueRoles(roleSteps: ForkHistoricalStep[]): string[] {
function orderedUniqueRoles(roles: string[]): string[] {
const seen = new Set<string>();
const out: string[] = [];
for (const s of roleSteps) {
if (!seen.has(s.role)) {
seen.add(s.role);
out.push(s.role);
for (const r of roles) {
if (!seen.has(r)) {
seen.add(r);
out.push(r);
}
}
return out;
}
/**
* Select historical steps for a fork:
* - `fromRole === null`: drop the last step (retry the last role).
* - `fromRole !== null`: keep steps through the first occurrence of that role (inclusive).
*/
export function selectForkHistoricalSteps(
roleSteps: ForkHistoricalStep[],
async function readPromptText(cas: CasStore, promptHash: string): Promise<Result<string, string>> {
const yamlText = await cas.get(promptHash);
if (yamlText === null) {
return err(`prompt CAS blob missing: ${promptHash}`);
}
let raw: unknown;
try {
raw = parseYaml(yamlText) as unknown;
} catch {
return err(`prompt CAS blob is not valid YAML: ${promptHash}`);
}
if (raw === null || typeof raw !== "object") {
return err(`prompt CAS blob has unexpected shape: ${promptHash}`);
}
const payload = (raw as Record<string, unknown>).payload;
if (typeof payload !== "string") {
return err(`prompt CAS blob missing string payload: ${promptHash}`);
}
return ok(payload);
}
async function readStartWorkflowIdentity(params: {
cas: CasStore;
startHash: string;
}): Promise<
Result<{ workflowName: string; maxRounds: number; depth: number; prompt: string }, string>
> {
const yamlText = await params.cas.get(params.startHash);
if (yamlText === null) {
return err(`start node missing in CAS: ${params.startHash}`);
}
const parsed = parseCasThreadNode(yamlText);
if (parsed === null || parsed.kind !== "start") {
return err(`CAS blob is not a StartNode: ${params.startHash}`);
}
const refs = parsed.node.refs;
const promptHash = refs[0];
if (typeof promptHash !== "string") {
return err("StartNode refs[0] must be the prompt hash");
}
const prompt = await readPromptText(params.cas, promptHash);
if (!prompt.ok) {
return prompt;
}
const p = parsed.node.payload;
return ok({
workflowName: p.name,
maxRounds: p.maxRounds,
depth: p.depth,
prompt: prompt.value,
});
}
async function payloadToRoleOutput(cas: CasStore, payload: StateNodePayload): Promise<RoleOutput> {
let refs: string[] = [];
const blob = await cas.get(payload.content);
if (blob !== null) {
const cn = parseCasThreadNode(blob);
if (cn?.kind === "content") {
refs = [...cn.node.refs];
}
}
return {
role: payload.role,
contentHash: payload.content,
meta: payload.meta,
refs,
};
}
function meaningfulFramesOldestFirst(
newestFirst: Array<{ hash: string; payload: StateNodePayload }>,
): Array<{ hash: string; payload: StateNodePayload }> {
const chronological = [...newestFirst].reverse();
return chronological.filter((f) => f.payload.role !== END && f.payload.role !== FORK_BRANCH_ROLE);
}
function selectForkPointStateHash(
meaningfulOldestFirst: Array<{ hash: string; payload: StateNodePayload }>,
fromRole: string | null,
): Result<ForkHistoricalStep[], string> {
if (roleSteps.length === 0) {
): Result<string | null, string> {
if (meaningfulOldestFirst.length === 0) {
return err("thread has no completed role steps to fork from");
}
if (fromRole === null) {
if (roleSteps.length === 1) {
return ok([]);
if (meaningfulOldestFirst.length === 1) {
return ok(null);
}
return ok(roleSteps.slice(0, -1));
const forkFrame = meaningfulOldestFirst[meaningfulOldestFirst.length - 2];
if (forkFrame === undefined) {
return err("thread has no completed role steps to fork from");
}
return ok(forkFrame.hash);
}
const idx = roleSteps.findIndex((s) => s.role === fromRole);
const idx = meaningfulOldestFirst.findIndex((f) => f.payload.role === fromRole);
if (idx < 0) {
const available = orderedUniqueRoles(roleSteps);
const available = orderedUniqueRoles(meaningfulOldestFirst.map((f) => f.payload.role));
return err(`role not found in thread: ${fromRole} (available: ${available.join(", ")})`);
}
return ok(roleSteps.slice(0, idx + 1));
const forkFrame = meaningfulOldestFirst[idx];
if (forkFrame === undefined) {
return err("fork frame missing");
}
return ok(forkFrame.hash);
}
function replayFramesThroughForkPoint(
meaningfulOldestFirst: Array<{ hash: string; payload: StateNodePayload }>,
forkPointHash: string | null,
): Array<{ hash: string; payload: StateNodePayload }> {
if (forkPointHash === null) {
return [];
}
const idx = meaningfulOldestFirst.findIndex((f) => f.hash === forkPointHash);
if (idx < 0) {
return [];
}
return meaningfulOldestFirst.slice(0, idx + 1);
}
async function buildForkContinuation(params: {
cas: CasStore;
sourceThreadId: string;
startHash: string;
forkPointStateHash: string | null;
}): Promise<Result<ForkContinuationOptions, string>> {
const { cas, sourceThreadId, startHash, forkPointStateHash } = params;
if (forkPointStateHash === null) {
return ok({
startHash,
forkHeadHash: startHash,
initialChain: EMPTY_CHAIN_STATE,
});
}
const yamlText = await cas.get(forkPointStateHash);
if (yamlText === null) {
return err(`fork point state missing in CAS: ${forkPointStateHash}`);
}
const parsed = parseCasThreadNode(yamlText);
if (parsed === null || parsed.kind !== "state") {
return err(`fork point blob is not a StateNode: ${forkPointStateHash}`);
}
const fpPayload = parsed.node.payload;
const chainBefore: ChainState = {
parentStateHash: forkPointStateHash,
parentAncestors: fpPayload.ancestors,
};
const ancestorsMarker = computeAncestors(chainBefore);
const emptyContentHash = await putContentNodeWithRefs(cas, "", []);
const markerPayload: StateNodePayload = {
role: FORK_BRANCH_ROLE,
meta: { forkFrom: sourceThreadId },
start: startHash,
content: emptyContentHash,
ancestors: ancestorsMarker,
compact: null,
timestamp: Date.now(),
};
const markerHash = await putStateNode(cas, markerPayload);
const initialChain: ChainState = {
parentStateHash: markerHash,
parentAncestors: ancestorsMarker,
};
return ok({
startHash,
forkHeadHash: markerHash,
initialChain,
});
}
/**
* Read `.data.jsonl` text and compute fork payload for the worker `run` command.
* Prepare a CAS fork: writes the branch marker {@link StateNode}, registers `threads.json`,
* and returns worker payload fields (shared {@link StartNode}, zero ancestor duplication).
*/
export function buildForkPlan(
dataJsonlText: string,
fromRole: string | null,
): Result<ForkPlan, string> {
const parsed = parseThreadDataJsonl(dataJsonlText);
if (!parsed.ok) {
return parsed;
export async function prepareCasFork(params: {
cas: CasStore;
bundleDir: string;
bundleHash: string;
sourceThreadId: string;
headHash: string;
startHash: string;
newThreadId: string;
fromRole: string | null;
}): Promise<Result<CasForkPlan, string>> {
const id = await readStartWorkflowIdentity({
cas: params.cas,
startHash: params.startHash,
});
if (!id.ok) {
return id;
}
const selected = selectForkHistoricalSteps(parsed.value.roleSteps, fromRole);
if (!selected.ok) {
return selected;
const newestFirst = await walkStateFramesNewestFirst(params.cas, params.headHash);
const meaningful = meaningfulFramesOldestFirst(newestFirst);
const forkPoint = selectForkPointStateHash(meaningful, params.fromRole);
if (!forkPoint.ok) {
return forkPoint;
}
const { start } = parsed.value;
const replayFrames = replayFramesThroughForkPoint(meaningful, forkPoint.value);
const steps: RoleOutput[] = [];
const stepTimestamps: number[] = [];
for (const fr of replayFrames) {
steps.push(await payloadToRoleOutput(params.cas, fr.payload));
stepTimestamps.push(fr.payload.timestamp);
}
const cont = await buildForkContinuation({
cas: params.cas,
sourceThreadId: params.sourceThreadId,
startHash: params.startHash,
forkPointStateHash: forkPoint.value,
});
if (!cont.ok) {
return cont;
}
await upsertThreadEntry(params.bundleDir, params.newThreadId, {
head: cont.value.forkHeadHash,
start: params.startHash,
updatedAt: Date.now(),
});
return ok({
workflowName: start.workflowName,
hash: start.hash,
sourceThreadId: start.threadId,
prompt: start.prompt,
runOptions: { maxRounds: start.maxRounds, depth: start.depth },
historicalSteps: selected.value,
workflowName: id.value.workflowName,
hash: params.bundleHash,
sourceThreadId: params.sourceThreadId,
prompt: id.value.prompt,
runOptions: { maxRounds: id.value.maxRounds, depth: id.value.depth },
steps,
stepTimestamps,
forkContinuation: cont.value,
});
}
+129 -69
View File
@@ -1,122 +1,182 @@
import { readdir, readFile } from "node:fs/promises";
import type { Stats } from "node:fs";
import { readdir, readFile, stat } from "node:fs/promises";
import { join } from "node:path";
import { type CasStore, createCasStore } from "@uncaged/workflow-cas";
import { type CasStore, createCasStore, findReachableHashes } from "@uncaged/workflow-cas";
import { err, getGlobalCasDir, ok, type Result } from "@uncaged/workflow-util";
import { parseThreadDataJsonl } from "./fork-thread.js";
import type { ThreadHistoryEntry, ThreadIndex } from "./threads-index.js";
import { readThreadsIndex } from "./threads-index.js";
import type { GcResult } from "./types.js";
async function listThreadDataJsonlPaths(storageRoot: string): Promise<Result<string[], string>> {
const logsRoot = join(storageRoot, "logs");
const paths: string[] = [];
let hashes: string[];
function isPlainObject(v: unknown): v is Record<string, unknown> {
return v !== null && typeof v === "object" && !Array.isArray(v);
}
function parseHistoryLine(jsonLine: string): ThreadHistoryEntry | null {
let raw: unknown;
try {
hashes = await readdir(logsRoot);
raw = JSON.parse(jsonLine) as unknown;
} catch {
return null;
}
if (!isPlainObject(raw)) {
return null;
}
const threadId = raw.threadId;
const head = raw.head;
const start = raw.start;
const completedAt = raw.completedAt;
if (
typeof threadId !== "string" ||
typeof head !== "string" ||
typeof start !== "string" ||
typeof completedAt !== "number"
) {
return null;
}
return { threadId, head, start, completedAt };
}
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: walks threads index + optional history dir
async function collectGcRootsFromBundle(bundleDir: string): Promise<Result<string[], string>> {
const roots: string[] = [];
let activeIndex: ThreadIndex;
try {
activeIndex = await readThreadsIndex(bundleDir);
} catch (e) {
return err(`failed to read threads.json under ${bundleDir}: ${String(e)}`);
}
for (const entry of Object.values(activeIndex)) {
roots.push(entry.head);
roots.push(entry.start);
}
const histDir = join(bundleDir, "history");
let histFiles: string[];
try {
histFiles = await readdir(histDir);
} catch (e) {
const errObj = e as NodeJS.ErrnoException;
if (errObj.code === "ENOENT") {
return ok(roots);
}
return err(`failed to read history directory ${histDir}: ${String(e)}`);
}
for (const name of histFiles) {
if (!name.endsWith(".jsonl")) {
continue;
}
let text: string;
try {
text = await readFile(join(histDir, name), "utf8");
} catch (e) {
return err(`failed to read history file ${name}: ${String(e)}`);
}
for (const line of text.split("\n")) {
const trimmed = line.trim();
if (trimmed === "") {
continue;
}
const entry = parseHistoryLine(trimmed);
if (entry === null) {
continue;
}
roots.push(entry.head);
roots.push(entry.start);
}
}
return ok(roots);
}
async function collectAllGcRoots(storageRoot: string): Promise<Result<string[], string>> {
const bundlesRoot = join(storageRoot, "bundles");
let entries: string[];
try {
entries = await readdir(bundlesRoot);
} catch (e) {
const errObj = e as NodeJS.ErrnoException;
if (errObj.code === "ENOENT") {
return ok([]);
}
return err(`failed to read logs directory: ${String(e)}`);
return err(`failed to read bundles directory: ${String(e)}`);
}
for (const hash of hashes) {
const dir = join(logsRoot, hash);
let entries: string[];
const roots: string[] = [];
for (const name of entries) {
const bundleDir = join(bundlesRoot, name);
let st: Stats;
try {
entries = await readdir(dir);
st = await stat(bundleDir);
} catch {
continue;
}
for (const fileName of entries) {
if (fileName.endsWith(".data.jsonl")) {
paths.push(join(dir, fileName));
}
if (!st.isDirectory()) {
continue;
}
const chunk = await collectGcRootsFromBundle(bundleDir);
if (!chunk.ok) {
return chunk;
}
roots.push(...chunk.value);
}
paths.sort();
return ok(paths);
return ok(roots);
}
async function collectActiveRefsFromDataPaths(
dataPaths: string[],
): Promise<Result<Set<string>, string>> {
const activeRefs = new Set<string>();
for (const dataPath of dataPaths) {
let text: string;
try {
text = await readFile(dataPath, "utf8");
} catch (e) {
return err(`failed to read ${dataPath}: ${String(e)}`);
}
const parsed = parseThreadDataJsonl(text);
if (!parsed.ok) {
return err(`${dataPath}: ${parsed.error}`);
}
for (const step of parsed.value.roleSteps) {
for (const ref of step.refs) {
activeRefs.add(ref);
}
}
}
return ok(activeRefs);
}
async function deleteCasNotInSet(
cas: CasStore,
activeRefs: Set<string>,
): Promise<Result<string[], string>> {
async function deleteCasNotMarked(cas: CasStore, marked: ReadonlySet<string>): Promise<string[]> {
let listed: string[];
try {
listed = await cas.list();
} catch (e) {
return err(`failed to list cas entries: ${String(e)}`);
throw new Error(`failed to list cas entries: ${String(e)}`);
}
const deletedHashes: string[] = [];
for (const hash of listed) {
if (activeRefs.has(hash)) {
if (marked.has(hash)) {
continue;
}
try {
await cas.delete(hash);
} catch (e) {
return err(`failed to delete cas ${hash}: ${String(e)}`);
throw new Error(`failed to delete cas ${hash}: ${String(e)}`);
}
deletedHashes.push(hash);
}
deletedHashes.sort();
return ok(deletedHashes);
return deletedHashes;
}
/**
* Mark-and-sweep CAS GC: collect `refs` from all thread `.data.jsonl` files under `storageRoot`,
* then delete CAS blobs not referenced by any surviving thread data.
* Mark-and-sweep CAS GC: roots are every `head` / `start` hash from `threads.json` and
* `history/*.jsonl` across bundle dirs; marks closure via `refs[]`; deletes unreachable blobs.
*/
export async function garbageCollectCas(storageRoot: string): Promise<Result<GcResult, string>> {
const pathsResult = await listThreadDataJsonlPaths(storageRoot);
if (!pathsResult.ok) {
return pathsResult;
const rootsResult = await collectAllGcRoots(storageRoot);
if (!rootsResult.ok) {
return rootsResult;
}
const paths = pathsResult.value;
const refsResult = await collectActiveRefsFromDataPaths(paths);
if (!refsResult.ok) {
return refsResult;
}
const activeRefs = refsResult.value;
const roots = rootsResult.value;
const cas = createCasStore(getGlobalCasDir(storageRoot));
const deletedResult = await deleteCasNotInSet(cas, activeRefs);
if (!deletedResult.ok) {
return deletedResult;
const marked = await findReachableHashes(roots, cas);
let deletedHashes: string[];
try {
deletedHashes = await deleteCasNotMarked(cas, marked);
} catch (e) {
return err(String(e));
}
const deletedHashes = deletedResult.value;
return ok({
scannedThreads: paths.length,
activeRefs: activeRefs.size,
scannedThreads: roots.length,
activeRefs: marked.size,
deletedEntries: deletedHashes.length,
deletedHashes,
});
+17 -7
View File
@@ -1,23 +1,33 @@
export { createWorkflow } from "./create-workflow.js";
export { executeThread } from "./engine.js";
export {
buildForkPlan,
parseThreadDataJsonl,
selectForkHistoricalSteps,
tryParseRoleStepRecord,
FORK_BRANCH_ROLE,
prepareCasFork,
tryParseWorkflowResultRecord,
walkStateFramesNewestFirst,
} from "./fork-thread.js";
export { garbageCollectCas } from "./gc.js";
export { createThreadPauseGate } from "./thread-pause-gate.js";
export type { ThreadHistoryEntry, ThreadIndex, ThreadIndexEntry } from "./threads-index.js";
export {
appendThreadHistoryEntry,
getBundleDir,
readThreadsIndex,
removeThreadEntry,
removeThreadHistoryEntries,
upsertThreadEntry,
writeThreadsIndex,
} from "./threads-index.js";
export type {
CasForkPlan,
ChainState,
ExecuteThreadIo,
ExecuteThreadOptions,
ForkHistoricalStep,
ForkPlan,
ForkContinuationOptions,
GcResult,
ParsedThreadStartRecord,
PrefilledDiskStep,
SupervisorDecision,
ThreadPauseGate,
} from "./types.js";
export { EMPTY_CHAIN_STATE } from "./types.js";
export { getWorkerHostScriptPath } from "./worker-entry-path.js";
@@ -1,10 +1,9 @@
import * as z from "zod/v4";
import { resolveModel } from "@uncaged/workflow-register";
import { extractFunctionToolFromZodSchema } from "../extract/index.js";
import { createLlmFn, createThreadReactor } from "@uncaged/workflow-reactor";
import type { WorkflowConfig } from "@uncaged/workflow-register";
import { resolveModel } from "@uncaged/workflow-register";
import { err, type LogFn, ok, type Result } from "@uncaged/workflow-util";
import * as z from "zod/v4";
import { extractFunctionToolFromZodSchema } from "../extract/index.js";
import type { SupervisorDecision } from "./types.js";
@@ -13,12 +12,12 @@ const SUPERVISOR_MAX_REACT_ROUNDS = 4;
const supervisorDecisionSchema = z
.object({
decision: z.enum(["continue", "stop"]),
decision: z.enum(["continue", "kill"]),
})
.meta({
title: "supervisor_decision",
description:
'Workflow supervisor decision. "continue" when the thread is making progress; "stop" when done, looping, or stuck.',
'Workflow supervisor decision. "continue" when the thread is making progress or following its normal role sequence; "kill" only when the thread is stuck in an infinite loop, producing no meaningful progress, or has gone off the rails. Normal workflow completion is handled by the moderator — the supervisor should NOT kill a thread just because it looks done.',
});
type SupervisorThreadContext = Record<string, never>;
@@ -64,7 +63,7 @@ export async function runSupervisor(
};
},
systemPromptForStructuredTool: (structuredToolName) =>
`You supervise a multi-step workflow. Decide whether the thread should keep running or halt. Reply with "continue" when the thread is making progress toward the task, or "stop" when it is finished, looping, or no longer making progress. Call the ${structuredToolName} tool with JSON arguments matching the schema, or reply with only a JSON object such as {"decision":"stop"}.`,
`You supervise a multi-step workflow. Your job is to detect pathological situations — NOT to decide when the workflow is "done" (that is the moderator's job). Reply with "continue" when the thread is making progress or following its normal role sequence. Reply with "kill" ONLY when the thread is stuck in an infinite loop, producing repetitive/meaningless output, or has clearly gone off the rails. Call the ${structuredToolName} tool with JSON arguments matching the schema, or reply with only a JSON object such as {"decision":"continue"}.`,
toolHandler: async (call) => `Unknown tool: ${call.function.name}`,
});
@@ -75,7 +74,7 @@ export async function runSupervisor(
});
if (!result.ok) {
args.logger("R9CW4PLM", `supervisor failed: ${result.error}`);
args.logger("R9CW4PHM", `supervisor failed: ${result.error}`);
return err(`supervisor: ${result.error}`);
}
@@ -0,0 +1,199 @@
import { appendFile, mkdir, readdir, readFile, rename, writeFile } from "node:fs/promises";
import { dirname, join } from "node:path";
import { err, ok, type Result } from "@uncaged/workflow-util";
/**
* Active-thread index entry stored in `<bundleDir>/threads.json`.
*
* Once the thread reaches `__end__`, the entry is removed from `threads.json`
* and a corresponding line is appended to `history/{YYYY-MM-DD}.jsonl`.
*/
export type ThreadIndexEntry = {
head: string;
start: string;
updatedAt: number;
};
export type ThreadHistoryEntry = {
threadId: string;
head: string;
start: string;
completedAt: number;
};
export type ThreadIndex = Record<string, ThreadIndexEntry>;
export function getBundleDir(storageRoot: string, bundleHash: string): string {
return join(storageRoot, "bundles", bundleHash);
}
function threadsJsonPath(bundleDir: string): string {
return join(bundleDir, "threads.json");
}
function isPlainObject(v: unknown): v is Record<string, unknown> {
return v !== null && typeof v === "object" && !Array.isArray(v);
}
function parseThreadIndexEntry(raw: unknown): ThreadIndexEntry | null {
if (!isPlainObject(raw)) {
return null;
}
const head = raw.head;
const start = raw.start;
const updatedAt = raw.updatedAt;
if (typeof head !== "string" || typeof start !== "string" || typeof updatedAt !== "number") {
return null;
}
return { head, start, updatedAt };
}
function parseThreadIndex(text: string): ThreadIndex {
const trimmed = text.trim();
if (trimmed === "") {
return {};
}
let raw: unknown;
try {
raw = JSON.parse(trimmed) as unknown;
} catch {
return {};
}
if (!isPlainObject(raw)) {
return {};
}
const out: ThreadIndex = {};
for (const [k, v] of Object.entries(raw)) {
const entry = parseThreadIndexEntry(v);
if (entry !== null) {
out[k] = entry;
}
}
return out;
}
/** Read `<bundleDir>/threads.json` (empty object when missing or invalid). */
export async function readThreadsIndex(bundleDir: string): Promise<ThreadIndex> {
const path = threadsJsonPath(bundleDir);
let text: string;
try {
text = await readFile(path, "utf8");
} catch (e) {
const errObj = e as NodeJS.ErrnoException;
if (errObj.code === "ENOENT") {
return {};
}
throw e;
}
return parseThreadIndex(text);
}
export async function writeThreadsIndex(bundleDir: string, index: ThreadIndex): Promise<void> {
const path = threadsJsonPath(bundleDir);
await mkdir(dirname(path), { recursive: true });
const tmp = `${path}.tmp.${process.pid}.${Date.now()}`;
const json = `${JSON.stringify(index, null, 2)}\n`;
await writeFile(tmp, json, "utf8");
await rename(tmp, path);
}
/** Insert/update a thread entry in `threads.json`. */
export async function upsertThreadEntry(
bundleDir: string,
threadId: string,
entry: ThreadIndexEntry,
): Promise<void> {
const index = await readThreadsIndex(bundleDir);
index[threadId] = entry;
await writeThreadsIndex(bundleDir, index);
}
/** Remove a thread entry from `threads.json` (no-op when absent). */
export async function removeThreadEntry(bundleDir: string, threadId: string): Promise<void> {
const index = await readThreadsIndex(bundleDir);
if (!(threadId in index)) {
return;
}
delete index[threadId];
await writeThreadsIndex(bundleDir, index);
}
function dateKey(epochMs: number): string {
const d = new Date(epochMs);
const y = d.getUTCFullYear().toString().padStart(4, "0");
const m = (d.getUTCMonth() + 1).toString().padStart(2, "0");
const day = d.getUTCDate().toString().padStart(2, "0");
return `${y}-${m}-${day}`;
}
/** Append a completion record to `history/{YYYY-MM-DD}.jsonl` keyed off `completedAt`. */
export async function appendThreadHistoryEntry(
bundleDir: string,
entry: ThreadHistoryEntry,
): Promise<void> {
const path = join(bundleDir, "history", `${dateKey(entry.completedAt)}.jsonl`);
await mkdir(dirname(path), { recursive: true });
const line = `${JSON.stringify(entry)}\n`;
await appendFile(path, line, "utf8");
}
/** Removes every `history/*.jsonl` line whose `threadId` matches (rewrite files in place). */
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: per-file JSONL filtering keeps RM deterministic
export async function removeThreadHistoryEntries(
bundleDir: string,
threadId: string,
): Promise<Result<number, string>> {
const histRoot = join(bundleDir, "history");
let files: string[];
try {
files = await readdir(histRoot);
} catch (e) {
const errObj = e as NodeJS.ErrnoException;
if (errObj.code === "ENOENT") {
return ok(0);
}
return err(`failed to read history directory: ${String(e)}`);
}
let removed = 0;
for (const name of files) {
if (!name.endsWith(".jsonl")) {
continue;
}
const path = join(histRoot, name);
let text: string;
try {
text = await readFile(path, "utf8");
} catch {
continue;
}
const kept: string[] = [];
for (const line of text.split("\n")) {
const trimmed = line.trim();
if (trimmed === "") {
continue;
}
let rec: unknown;
try {
rec = JSON.parse(trimmed) as unknown;
} catch {
kept.push(`${trimmed}\n`);
continue;
}
if (rec === null || typeof rec !== "object") {
kept.push(`${trimmed}\n`);
continue;
}
const id = (rec as Record<string, unknown>).threadId;
if (id === threadId) {
removed++;
continue;
}
kept.push(`${trimmed}\n`);
}
await writeFile(path, kept.join(""), "utf8");
}
return ok(removed);
}
+35 -19
View File
@@ -1,18 +1,35 @@
import type { RoleOutput } from "@uncaged/workflow-runtime";
import type { CasStore } from "@uncaged/workflow-cas";
import type { RoleOutput } from "@uncaged/workflow-runtime";
import type { Result } from "@uncaged/workflow-util";
export type SupervisorDecision = "continue" | "stop";
export type SupervisorDecision = "continue" | "kill";
export type ExecuteThreadIo = {
threadId: string;
hash: string;
dataJsonlPath: string;
infoJsonlPath: string;
cas: CasStore;
};
/** One persisted role line in `.data.jsonl` (engine adds these for fork replay before running the generator). */
/** CAS chain tail state before the next appended {@link StateNode}. */
export type ChainState = {
parentStateHash: string | null;
parentAncestors: readonly string[];
};
export const EMPTY_CHAIN_STATE: ChainState = { parentStateHash: null, parentAncestors: [] };
/**
* When forking, the worker continues from an existing {@link StartNode} plus an optional
* branch marker {@link StateNode} instead of allocating a new start blob.
*/
export type ForkContinuationOptions = {
startHash: string;
forkHeadHash: string;
initialChain: ChainState;
};
/** One replayed role step (prefill) before the generator runs (same layout as disk replay rows). */
export type PrefilledDiskStep = {
role: string;
contentHash: string;
@@ -31,37 +48,36 @@ export type ExecuteThreadOptions = {
/** When non-null, written into the start record so tooling can trace lineage. */
forkSourceThreadId: string | null;
/**
* Written to `.data.jsonl` immediately after the start record, before the generator runs.
* When non-null, replays these steps into CAS before the generator runs.
* Must match `input.steps` length and order when present.
*/
prefilledDiskSteps: PrefilledDiskStep[] | null;
/** When non-null, skip creating a new {@link StartNode} and continue this CAS chain. */
forkContinuation: ForkContinuationOptions | null;
/**
* When non-null, must match `input.steps.length`; supplies persisted timestamps for
* {@link ThreadContext.steps} (used when restoring history without prefilled CAS replay).
*/
replayTimestamps: readonly number[] | null;
/** Workspace root containing `workflow.yaml`; used to resolve the `extract` scene for meta extraction. */
storageRoot: string;
};
/** Role steps replayed from `.data.jsonl`, including persisted timestamps. */
export type ForkHistoricalStep = RoleOutput & { timestamp: number };
export type ParsedThreadStartRecord = {
workflowName: string;
hash: string;
threadId: string;
prompt: string;
maxRounds: number;
depth: number;
};
export type ForkPlan = {
export type CasForkPlan = {
workflowName: string;
hash: string;
sourceThreadId: string;
prompt: string;
runOptions: { maxRounds: number; depth: number };
historicalSteps: ForkHistoricalStep[];
steps: RoleOutput[];
stepTimestamps: number[];
forkContinuation: ForkContinuationOptions;
};
export type GcResult = {
/** Count of root hashes seeded from thread indexes (`head`/`start` per entry). */
scannedThreads: number;
/** Reachable CAS blobs after the mark phase. */
activeRefs: number;
deletedEntries: number;
deletedHashes: string[];
+101 -14
View File
@@ -1,9 +1,13 @@
import { appendFile, mkdir, unlink, writeFile } from "node:fs/promises";
import { unlinkSync } from "node:fs";
import { mkdir, unlink, writeFile } from "node:fs/promises";
import { createServer, type Socket } from "node:net";
import { dirname, join } from "node:path";
import type { RoleOutput, WorkflowFn, WorkflowResult } from "@uncaged/workflow-runtime";
import { ensureUncagedWorkflowSymlink, importWorkflowBundleModule } from "@uncaged/workflow-register";
import { createCasStore } from "@uncaged/workflow-cas";
import {
ensureUncagedWorkflowSymlink,
importWorkflowBundleModule,
} from "@uncaged/workflow-register";
import type { RoleOutput, WorkflowFn } from "@uncaged/workflow-runtime";
import {
createLogger,
err,
@@ -14,7 +18,12 @@ import {
} from "@uncaged/workflow-util";
import { executeThread } from "./engine.js";
import { createThreadPauseGate } from "./thread-pause-gate.js";
import type { ExecuteThreadIo, PrefilledDiskStep, ThreadPauseGate } from "./types.js";
import type {
ExecuteThreadIo,
ForkContinuationOptions,
PrefilledDiskStep,
ThreadPauseGate,
} from "./types.js";
const bootLog = createLogger({ sink: { kind: "stderr" } });
@@ -25,9 +34,10 @@ type RunCommand = {
prompt: string;
options: { maxRounds: number; depth: number };
steps: RoleOutput[];
/** Timestamps aligned with `steps` for `.data.jsonl` replay; length must match `steps` when non-null. */
/** Timestamps aligned with `steps` for replay / fork restore; length must match `steps` when steps are non-empty. */
stepTimestamps: number[] | null;
forkSourceThreadId: string | null;
forkContinuation: ForkContinuationOptions | null;
};
type KillCommand = {
@@ -70,6 +80,7 @@ function parseRoleOutputRecord(obj: Record<string, unknown>): RoleOutput | null
};
}
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: mirrors permissive worker IPC decoding shape checks
function parseRunStepsPayload(rec: Record<string, unknown>): {
steps: RoleOutput[];
stepTimestamps: number[] | null;
@@ -104,12 +115,60 @@ function parseRunStepsPayload(rec: Record<string, unknown>): {
return null;
}
}
const parallelTsRaw = rec.stepTimestamps;
if (
steps.length > 0 &&
Array.isArray(parallelTsRaw) &&
parallelTsRaw.length === steps.length &&
parallelTsRaw.every((x): x is number => typeof x === "number")
) {
return { steps, stepTimestamps: [...parallelTsRaw] };
}
return {
steps,
stepTimestamps: anyTimestamp ? timestamps : null,
};
}
function parseForkContinuation(rec: Record<string, unknown>): ForkContinuationOptions | null {
const raw = rec.forkContinuation;
if (raw === undefined || raw === null) {
return null;
}
if (typeof raw !== "object") {
return null;
}
const o = raw as Record<string, unknown>;
const startHash = o.startHash;
const forkHeadHash = o.forkHeadHash;
const ic = o.initialChain;
if (typeof startHash !== "string" || typeof forkHeadHash !== "string") {
return null;
}
if (ic === null || typeof ic !== "object") {
return null;
}
const ich = ic as Record<string, unknown>;
const pph = ich.parentStateHash;
const pa = ich.parentAncestors;
if (!(pph === null || typeof pph === "string")) {
return null;
}
if (!Array.isArray(pa) || !pa.every((x) => typeof x === "string")) {
return null;
}
return {
startHash,
forkHeadHash,
initialChain: {
parentStateHash: pph,
parentAncestors: pa,
},
};
}
function parseRunControlPayload(rec: Record<string, unknown>): RunCommand | null {
const threadId = rec.threadId;
const workflowName = rec.workflowName;
@@ -145,6 +204,7 @@ function parseRunControlPayload(rec: Record<string, unknown>): RunCommand | null
}
forkSourceThreadId = rawFork;
}
const forkContinuation = parseForkContinuation(rec);
return {
type: "run",
threadId,
@@ -154,6 +214,7 @@ function parseRunControlPayload(rec: Record<string, unknown>): RunCommand | null
steps: parsedSteps.steps,
stepTimestamps: parsedSteps.stepTimestamps,
forkSourceThreadId,
forkContinuation,
};
}
@@ -322,6 +383,23 @@ async function main(): Promise<void> {
let activeThreads = 0;
let shutdownTimer: ReturnType<typeof setTimeout> | null = null;
function cleanupAllRunningMarkersSync(): void {
for (const threadId of threads.keys()) {
try {
unlinkSync(join(storageRoot, "logs", hash, `${threadId}.running`));
} catch {
// ignore missing file or other fs errors
}
}
}
for (const sig of ["SIGINT", "SIGTERM"] as const) {
process.on(sig, () => {
cleanupAllRunningMarkersSync();
process.exit(sig === "SIGINT" ? 130 : 143);
});
}
const cas = createCasStore(getGlobalCasDir(storageRoot));
const workerCtlPath = join(storageRoot, "workers", `${hash}.json`);
@@ -354,6 +432,7 @@ async function main(): Promise<void> {
}
}
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: TCP worker multiplexes lifecycle + runs
async function dispatchCommand(cmd: ControlCommand, socket: Socket | null): Promise<void> {
if (cmd.type !== "run") {
dispatchThreadLifecycleCommand(threads, socket, cmd);
@@ -364,13 +443,11 @@ async function main(): Promise<void> {
const threadId = cmd.threadId;
const runningPath = join(storageRoot, "logs", hash, `${threadId}.running`);
const dataJsonlPath = join(storageRoot, "logs", hash, `${threadId}.data.jsonl`);
const infoJsonlPath = join(storageRoot, "logs", hash, `${threadId}.info.jsonl`);
const io: ExecuteThreadIo = {
threadId,
hash,
dataJsonlPath,
infoJsonlPath,
cas,
};
@@ -387,14 +464,25 @@ async function main(): Promise<void> {
try {
await mkdir(dirname(runningPath), { recursive: true });
await mkdir(dirname(dataJsonlPath), { recursive: true });
await writeFile(runningPath, "", "utf8");
const logger = createLogger({ sink: { kind: "file", path: infoJsonlPath } });
const baseTs = Date.now();
let prefilledDiskSteps: PrefilledDiskStep[] | null = null;
if (cmd.steps.length > 0) {
let replayTimestamps: readonly number[] | null = null;
if (cmd.forkContinuation !== null) {
if (
cmd.steps.length > 0 &&
(cmd.stepTimestamps === null || cmd.stepTimestamps.length !== cmd.steps.length)
) {
bootLog("J5WQ8NXT", "forkContinuation requires stepTimestamps aligned with steps");
throw new Error("forkContinuation requires stepTimestamps aligned with steps");
}
replayTimestamps =
cmd.steps.length === 0 ? null : (cmd.stepTimestamps as readonly number[]);
} else if (cmd.steps.length > 0) {
prefilledDiskSteps = cmd.steps.map((step, i) => {
const ts = cmd.stepTimestamps?.[i];
return {
@@ -407,7 +495,7 @@ async function main(): Promise<void> {
});
}
const runResult = await executeThread(
await executeThread(
workflowFn,
cmd.workflowName,
{ prompt: cmd.prompt, steps: cmd.steps },
@@ -417,20 +505,19 @@ async function main(): Promise<void> {
awaitAfterEachYield: () => pauseGate.awaitAfterYield(),
forkSourceThreadId: cmd.forkSourceThreadId,
prefilledDiskSteps,
forkContinuation: cmd.forkContinuation,
replayTimestamps,
storageRoot,
},
io,
logger,
);
await appendFile(dataJsonlPath, `${JSON.stringify(runResult)}\n`, "utf8");
} catch (e) {
const message = e instanceof Error ? e.message : String(e);
bootLog("Q3MN8YKW", `thread ${threadId} failed: ${message}`);
const failure: WorkflowResult = { returnCode: 1, summary: message, rootHash: "" };
await appendFile(dataJsonlPath, `${JSON.stringify(failure)}\n`, "utf8").catch(() => {});
} finally {
threads.delete(threadId);
await unlink(runningPath).catch(() => {});
threads.delete(threadId);
bumpDone();
socket?.end();
}
@@ -1,7 +1,12 @@
import type { ExtractContext, ExtractFn, LlmProvider } from "@uncaged/workflow-runtime";
import type * as z from "zod/v4";
import { type CasStore, getContentMerklePayload } from "@uncaged/workflow-cas";
import { createLlmFn, createThreadReactor } from "@uncaged/workflow-reactor";
import type {
ExtractContext,
ExtractFn,
ExtractResult,
LlmProvider,
} from "@uncaged/workflow-runtime";
import type * as z from "zod/v4";
import { extractFunctionToolFromZodSchema } from "./llm-extract.js";
export type ExtractDeps = {
@@ -15,7 +20,7 @@ const CAS_GET_TOOL_DEFINITION = {
function: {
name: "cas_get",
description:
"Read a Merkle DAG node from content-addressed storage by its hash. Returns YAML-formatted node with type, payload, and children fields.",
"Read a Merkle DAG node from content-addressed storage by its hash. Returns YAML-formatted node with type, payload, and refs or children fields (content nodes use refs).",
parameters: {
type: "object",
properties: {
@@ -97,7 +102,7 @@ export function createExtract(provider: LlmProvider, deps: ExtractDeps): Extract
};
},
systemPromptForStructuredTool: (structuredToolName) =>
`You extract structured metadata from the agent output below. Use cas_get to read Merkle DAG nodes from CAS (YAML: type, payload, children) when the agent output references hashes you must traverse. When you have the complete structured object, call the ${structuredToolName} tool with JSON arguments matching the schema. You may instead reply with only a JSON object (no prose) when no tools are needed.`,
`You extract structured metadata from the agent output below. Use cas_get to read Merkle DAG nodes from CAS (YAML: type, payload, refs for content nodes or children for step/thread legacy nodes) when the agent output references hashes you must traverse. When you have the complete structured object, call the ${structuredToolName} tool with JSON arguments matching the schema. You may instead reply with only a JSON object (no prose) when no tools are needed.`,
toolHandler: async (call, thread) => {
if (call.function.name !== "cas_get") {
return `Unexpected tool routed to handler: ${call.function.name}`;
@@ -121,7 +126,7 @@ export function createExtract(provider: LlmProvider, deps: ExtractDeps): Extract
schema: z.ZodType<T>,
prompt: string,
ctx: ExtractContext,
): Promise<T> => {
): Promise<ExtractResult<T>> => {
const text = await buildExtractUserContent(ctx, prompt, deps);
const result = await reactor({
thread: { cas: deps.cas },
@@ -131,6 +136,10 @@ export function createExtract(provider: LlmProvider, deps: ExtractDeps): Extract
if (!result.ok) {
throw new Error(`extract failed: ${result.error}`);
}
return result.value;
return {
meta: result.value,
contentPayload: ctx.agentContent,
refs: [],
};
};
}
@@ -1,6 +1,5 @@
import * as z from "zod/v4";
import { err, ok, type Result } from "@uncaged/workflow-util";
import * as z from "zod/v4";
import type { LlmError, LlmExtractArgs } from "./types.js";
+23 -11
View File
@@ -1,35 +1,47 @@
export { createWorkflow } from "./engine/create-workflow.js";
export { executeThread } from "./engine/engine.js";
export {
buildForkPlan,
parseThreadDataJsonl,
selectForkHistoricalSteps,
tryParseRoleStepRecord,
FORK_BRANCH_ROLE,
prepareCasFork,
tryParseWorkflowResultRecord,
walkStateFramesNewestFirst,
} from "./engine/fork-thread.js";
export { garbageCollectCas } from "./engine/gc.js";
export { createThreadPauseGate } from "./engine/thread-pause-gate.js";
export type {
ThreadHistoryEntry,
ThreadIndex,
ThreadIndexEntry,
} from "./engine/threads-index.js";
export {
appendThreadHistoryEntry,
getBundleDir,
readThreadsIndex,
removeThreadEntry,
removeThreadHistoryEntries,
upsertThreadEntry,
writeThreadsIndex,
} from "./engine/threads-index.js";
export type {
CasForkPlan,
ChainState,
ExecuteThreadIo,
ExecuteThreadOptions,
ForkHistoricalStep,
ForkPlan,
ForkContinuationOptions,
GcResult,
ParsedThreadStartRecord,
PrefilledDiskStep,
SupervisorDecision,
ThreadPauseGate,
} from "./engine/types.js";
export { EMPTY_CHAIN_STATE } from "./engine/types.js";
export { getWorkerHostScriptPath } from "./engine/worker-entry-path.js";
export type { ExtractFn, LlmError, LlmExtractArgs } from "./extract/index.js";
export {
buildExtractUserContent,
createExtract,
type ExtractThreadContext,
} from "./extract/index.js";
export {
extractFunctionToolFromZodSchema,
llmErrorToCause,
llmExtract,
} from "./extract/index.js";
export type { ExtractFn, LlmError, LlmExtractArgs } from "./extract/index.js";
export { workflowAsAgent, type WorkflowAsAgentOptions } from "./workflow-as-agent.js";
export { type WorkflowAsAgentOptions, workflowAsAgent } from "./workflow-as-agent.js";
@@ -1,17 +1,20 @@
import { join } from "node:path";
import type { AgentContext, AgentFn } from "@uncaged/workflow-runtime";
import { extractBundleExports } from "@uncaged/workflow-register";
import { createCasStore } from "@uncaged/workflow-cas";
import type { ExecuteThreadIo } from "./engine/index.js";
import { executeThread } from "./engine/index.js";
import type { WorkflowConfig } from "@uncaged/workflow-register";
import { getRegisteredWorkflow, readWorkflowRegistry } from "@uncaged/workflow-register";
import {
extractBundleExports,
getRegisteredWorkflow,
readWorkflowRegistry,
} from "@uncaged/workflow-register";
import type { AgentContext, AgentFn } from "@uncaged/workflow-runtime";
import {
createLogger,
generateUlid,
getDefaultWorkflowStorageRoot,
getGlobalCasDir,
} from "@uncaged/workflow-util";
import type { ExecuteThreadIo } from "./engine/index.js";
import { executeThread } from "./engine/index.js";
const DEFAULT_WORKFLOW_AS_AGENT_MAX_DEPTH = 3;
@@ -74,13 +77,11 @@ export function workflowAsAgent(
};
const childThreadId = generateUlid(Date.now());
const dataJsonlPath = join(storageRoot, "logs", entry.hash, `${childThreadId}.data.jsonl`);
const infoJsonlPath = join(storageRoot, "logs", entry.hash, `${childThreadId}.info.jsonl`);
const io: ExecuteThreadIo = {
threadId: childThreadId,
hash: entry.hash,
dataJsonlPath,
infoJsonlPath,
cas: createCasStore(getGlobalCasDir(storageRoot)),
};
@@ -100,6 +101,8 @@ export function workflowAsAgent(
awaitAfterEachYield: async () => {},
forkSourceThreadId: ctx.threadId,
prefilledDiskSteps: null,
forkContinuation: null,
replayTimestamps: null,
storageRoot,
},
io,
+17
View File
@@ -0,0 +1,17 @@
{
"name": "@uncaged/workflow-gateway",
"version": "0.1.0",
"private": true,
"type": "module",
"scripts": {
"dev": "wrangler dev",
"deploy": "wrangler deploy"
},
"dependencies": {
"hono": "^4.7.11"
},
"devDependencies": {
"@cloudflare/workers-types": "^4.20260425.1",
"wrangler": "^4.20.0"
}
}
+152
View File
@@ -0,0 +1,152 @@
import { Hono } from "hono";
import { cors } from "hono/cors";
type Env = {
Bindings: {
ENDPOINTS: KVNamespace;
GATEWAY_SECRET: string;
DASHBOARD_API_KEY: string;
};
};
type EndpointRecord = {
name: string;
url: string;
agentToken: string;
registeredAt: number;
lastHeartbeat: number;
};
const TTL_SECONDS = 300; // 5 min — offline if no heartbeat
const app = new Hono<Env>();
app.use("*", cors());
function checkDashboardAuth(c: {
req: { header: (n: string) => string | undefined; query: (n: string) => string | undefined };
env: Env["Bindings"];
}): boolean {
const bearer = c.req.header("Authorization")?.replace("Bearer ", "");
const query = c.req.query("key");
const key = bearer ?? query;
return key === c.env.DASHBOARD_API_KEY;
}
// ── Health ──────────────────────────────────────────────────────────
app.get("/healthz", (c) => c.json({ ok: true }));
// ── Gateway management (GATEWAY_SECRET auth) ────────────────────────
const gateway = new Hono<Env>();
gateway.post("/register", async (c) => {
const body = await c.req.json<{
name?: string;
url?: string;
secret?: string;
agentToken?: string;
}>();
const { name, url, secret, agentToken } = body;
if (!name || !url) {
return c.json({ error: "name and url required" }, 400);
}
if (secret !== c.env.GATEWAY_SECRET) {
return c.json({ error: "unauthorized" }, 401);
}
const existing = await c.env.ENDPOINTS.get<EndpointRecord>(name, "json");
const now = Date.now();
const record: EndpointRecord = {
name,
url: url.replace(/\/+$/, ""), // strip trailing slash
agentToken: agentToken ?? existing?.agentToken ?? "",
registeredAt: existing?.registeredAt ?? now,
lastHeartbeat: now,
};
await c.env.ENDPOINTS.put(name, JSON.stringify(record), {
expirationTtl: TTL_SECONDS,
});
const status = existing ? 200 : 201;
return c.json({ registered: name }, status);
});
gateway.delete("/register/:name", async (c) => {
const auth = c.req.header("Authorization");
if (auth !== `Bearer ${c.env.GATEWAY_SECRET}`) {
return c.json({ error: "unauthorized" }, 401);
}
const name = c.req.param("name");
await c.env.ENDPOINTS.delete(name);
return c.json({ unregistered: name });
});
// endpoints requires dashboard auth
gateway.get("/endpoints", async (c) => {
if (!checkDashboardAuth(c)) return c.json({ error: "unauthorized" }, 401);
const list = await c.env.ENDPOINTS.list();
const endpoints: Array<{ name: string; url: string; status: string; lastHeartbeat: number }> = [];
for (const key of list.keys) {
const record = await c.env.ENDPOINTS.get<EndpointRecord>(key.name, "json");
if (record) {
const age = Date.now() - record.lastHeartbeat;
endpoints.push({
name: record.name,
url: record.url,
status: age < TTL_SECONDS * 1000 ? "online" : "offline",
lastHeartbeat: record.lastHeartbeat,
});
}
}
return c.json(endpoints);
});
app.route("/api/gateway", gateway);
// ── API proxy: /api/agents/:agent/* → agent's tunnel URL (dashboard auth) ──
app.all("/api/agents/:agent/*", async (c) => {
if (!checkDashboardAuth(c)) return c.json({ error: "unauthorized" }, 401);
const agent = c.req.param("agent");
const record = await c.env.ENDPOINTS.get<EndpointRecord>(agent, "json");
if (!record) {
return c.json({ error: "agent not found" }, 404);
}
// Build target URL: strip /api/:agent prefix, forward the rest
const url = new URL(c.req.url);
const pathAfterAgent = url.pathname.replace(`/api/agents/${agent}`, "");
const targetUrl = `${record.url}/api${pathAfterAgent}${url.search}`;
const headers = new Headers(c.req.raw.headers);
headers.delete("host");
headers.delete("Authorization"); // don't forward dashboard key to agent
if (record.agentToken) {
headers.set("X-Agent-Token", record.agentToken);
}
try {
const resp = await fetch(targetUrl, {
method: c.req.method,
headers,
body: c.req.method !== "GET" && c.req.method !== "HEAD" ? c.req.raw.body : undefined,
});
// Stream response back
return new Response(resp.body, {
status: resp.status,
headers: resp.headers,
});
} catch (err) {
return c.json({ error: "agent unreachable", detail: String(err) }, 502);
}
});
export default app;
+12
View File
@@ -0,0 +1,12 @@
{
"compilerOptions": {
"target": "ES2022",
"module": "ES2022",
"moduleResolution": "bundler",
"types": ["@cloudflare/workers-types"],
"strict": true,
"noEmit": true,
"skipLibCheck": true
},
"include": ["src"]
}
+9
View File
@@ -0,0 +1,9 @@
name = "workflow-gateway"
main = "src/index.ts"
compatibility_date = "2025-04-01"
[[kv_namespaces]]
binding = "ENDPOINTS"
id = "88b118d1cfab4c049f9c1684848811a3"
# GATEWAY_SECRET is set via `wrangler secret put`
+29
View File
@@ -0,0 +1,29 @@
# @uncaged/workflow-protocol
Shared workflow types, sentinel constants, and `Result` helpers.
## What This Package Does
It defines the cross-package contract for bundles and the engine: thread/step shapes, `WorkflowFn`, agent/extract contexts, descriptor types, and `CasStore` as an interface. Implementations (CAS store, CLI, extract) depend on these types so bundles stay decoupled from Node APIs.
## Key Exports
From `src/index.ts`:
- **Types:** `Result`, `CasStore`, `WorkflowRoleSchema`, `WorkflowRoleDescriptor`, `WorkflowDescriptor`, `RoleMeta`, `RoleOutput`, `StartStep`, `RoleStep`, `ThreadContext`, `ModeratorContext`, `AgentContext`, `ExtractContext`, `WorkflowCompletion`, `WorkflowResult`, `LlmProvider`, `ProviderConfig`, `ResolvedModel`, `WorkflowConfig`, `ExtractFn`, `AgentFn`, `AgentBinding`, `WorkflowRuntime`, `WorkflowFn`, `RoleDefinition`, `Moderator`, `WorkflowDefinition`, `AdvanceOutcome`
- **Constants:** `START`, `END`
- **Functions:** `ok`, `err`
## Dependencies
- **Peer:** `zod` ^4 — used in type positions for schemas (`ExtractFn`, `RoleDefinition`, etc.)
No workspace packages; this is the bottom layer.
## Usage
```typescript
import { END, START, type WorkflowFn, type ThreadContext } from "@uncaged/workflow-protocol";
```
Concrete `WorkflowFn` implementations are built with `@uncaged/workflow-runtime` (`createWorkflow`).

Some files were not shown because too many files have changed in this diff Show More