Compare commits

...

27 Commits

Author SHA1 Message Date
xiaoju 669af841e1 refactor: address review feedback for CLI restructure
- Extract shared module (shared.ts) — walkChain, expandDeep, etc. deduplicated
- Hide step read command (half-baked, not ready for users)
- Remove cmdThreadKill dead code
- Revert unrelated protocol type change
- Revert unrelated package.json change
- Fix unused imports (biome)

Refs #463
2026-05-24 11:32:47 +00:00
xiaoju 650313b1c2 feat(step): expand detail CAS refs by default in step list
Previously step list showed raw CAS refs for detail fields.
Now detail is recursively expanded (like output already was),
since every turn is individually hashed and walkable.

Refs #463
2026-05-24 11:12:22 +00:00
xiaoju c40007eeaf fix(agent-claude-code): add missing workflow-util dependency
The claude-code agent imports createLogger from @uncaged/workflow-util
but was missing the dependency declaration, causing test failures.
2026-05-24 11:04:02 +00:00
xiaoju 1f13b1e79c fix(cli): resolve lint errors and unused imports (#463)
Fix all lint errors flagged by biome check to ensure clean codebase.

## Changes

### Removed Unused Imports
- `packages/cli-workflow/src/commands/thread.ts`:
  - Removed `StartEntry` (moved to step.ts)
  - Removed `StepEntry` (moved to step.ts)
  - Removed `ThreadForkOutput` (moved to step.ts)
  - Removed `ThreadStepsOutput` (moved to step.ts)

- `packages/cli-workflow/src/cli.ts`:
  - Removed unused `yamlStringify` import from yaml package

### Fixed Unused Parameter
- `packages/cli-workflow/src/commands/step.ts`:
  - Prefixed unused `before` parameter with underscore in `cmdStepRead`
  - Parameter is part of the function signature for future use (awaiting #462)

### Fixed Import Order
- `packages/cli-workflow/src/__tests__/thread.test.ts`:
  - Reordered imports to follow biome's organization rules
  - Moved cmdStepShow import before cmdThreadRead imports

## Test Results
-  `bun run check` passes (typecheck + lint + log tags)
-  All 124 tests passing
-  Build completes successfully

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-24 10:50:49 +00:00
xiaoju 031c3aa632 docs(cli): add deprecation handlers and update documentation (#463)
Complete the CLI refactoring with deprecation error handlers, updated
help text, and comprehensive migration guide.

## Changes

### Deprecation Handlers
Add error handlers for all removed commands with helpful migration messages:
- `workflow put` → suggests `workflow add`
- `thread step` → suggests `thread exec`
- `thread steps` → suggests `step list`
- `thread step-details` → suggests `step show`
- `thread fork` → suggests `step fork`
- `thread kill` → suggests `thread stop` or `thread cancel`
- `thread running` → suggests `thread list --status running`

Error messages follow the format:
```
Error: Command 'X' has been removed.
Use 'Y' instead.

For more information, see: uwf help Y
```

### Help Documentation
Updated CLI help text to explain four-layer architecture:
- Main help shows architecture diagram with Chinese labels
- Command group descriptions reference layers:
  - `workflow` → "Workflow definitions (layer 1: templates)"
  - `thread` → "Thread execution (layer 2: instances)"
  - `step` → "Step results (layer 3: single cycle)"
- Deprecated commands appear in help with [DEPRECATED] tag

### README Updates
Comprehensive documentation updates:
- Added "Four-Layer Architecture" section with diagram
- Updated all command tables with new command names
- Added complete migration guide with:
  - Renamed commands table
  - Merged commands table
  - Split commands table
  - Moved commands table
  - Example deprecation error output
- Updated "Internal Structure" to show new step.ts module

## Testing
-  All 124 tests pass
-  Build completes successfully
-  Deprecation handlers tested manually
-  Help output verified for main, thread, and step commands

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-24 10:46:31 +00:00
xiaoju 7b50969307 refactor(cli): reorganize CLI commands into four-layer model (#463)
Implement comprehensive CLI refactoring to clarify the four-layer model:
workflow → thread → step → turn

## Breaking Changes

### Renamed Commands
- `uwf workflow put` → `uwf workflow add`
- `uwf thread step` → `uwf thread exec`

### Removed Commands
- `uwf thread running` (merged into `thread list --status running`)
- `uwf thread kill` (split into `thread stop` and `thread cancel`)

### Moved Commands
- `uwf thread steps` → `uwf step list`
- `uwf thread step-details` → `uwf step show`
- `uwf thread fork` → `uwf step fork`

## New Commands

### Thread Commands
- `uwf thread list --status <idle|running|completed>` - Filter threads by status
- `uwf thread stop <thread-id>` - Stop background execution (keep thread active)
- `uwf thread cancel <thread-id>` - Cancel thread (stop + archive to history)

### Step Command Group (New)
- `uwf step list <thread-id>` - List all steps in a thread
- `uwf step show <step-hash>` - Show step details
- `uwf step read <step-hash> [--before N]` - Read step output as markdown
- `uwf step fork <step-hash>` - Fork thread from a step

## Implementation Details

### Files Modified
- `packages/cli-workflow/src/commands/workflow.ts` - Renamed cmdWorkflowPut → cmdWorkflowAdd
- `packages/cli-workflow/src/commands/thread.ts`:
  - Renamed cmdThreadStep → cmdThreadExec
  - Added cmdThreadStop and cmdThreadCancel (split from cmdThreadKill)
  - Updated cmdThreadList to support --status filter with idle/running/completed
  - Removed cmdThreadSteps, cmdThreadStepDetails, cmdThreadFork
- `packages/cli-workflow/src/commands/step.ts` - New module with:
  - cmdStepList (moved from cmdThreadSteps)
  - cmdStepShow (moved from cmdThreadStepDetails)
  - cmdStepFork (moved from cmdThreadFork)
  - cmdStepRead (new, stub implementation pending #462)
- `packages/cli-workflow/src/cli.ts` - Updated all CLI command registrations

### Tests Updated
- `packages/cli-workflow/src/__tests__/thread-step-count.test.ts` - Updated references from "thread step" to "thread exec"
- `packages/cli-workflow/src/__tests__/thread.test.ts` - Updated imports to use cmdStepShow from step.ts

## Test Results
All 124 tests pass in cli-workflow package.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-24 10:40:32 +00:00
xiaoju fc6072c28c Merge pull request 'feat: use git worktree for isolated development in solve-issue workflow' (#465) from fix/464-worktree-isolation into main 2026-05-24 10:27:51 +00:00
xiaoju b0e3f4a363 feat: use git worktree for isolated development in solve-issue workflow
All roles (developer, reviewer, tester, committer) now work in
~/repos/workflow-worktrees/fix/<issue>-<slug> instead of modifying
the main working directory. Prevents self-destructive edits.

Fixes #464
2026-05-24 10:22:25 +00:00
xiaoju 38112053a0 Merge pull request 'fix(agent-kit): separate session cache per agent' (#462) from fix/461-per-agent-session-cache into main 2026-05-24 09:19:50 +00:00
xiaoju 1d174ee5c9 fix(agent-kit): separate session cache per agent
Each agent now maintains its own session cache file instead of sharing
a single agent-sessions.json. This prevents session ID conflicts when
multiple agents operate on the same thread+role pair.

Changes:
- getCachePath() now takes agentName parameter
- getCachedSessionId/setCachedSessionId require agentName as first param
- Cache files named <agent>-sessions.json (e.g., hermes-sessions.json)
- Agent wrappers inject their agent name into cache calls
- Add comprehensive tests for session cache isolation
- Handle malformed JSON gracefully (treat as empty cache)

Fixes #461
2026-05-24 09:16:06 +00:00
xiaoju 6e3b32ca34 Merge pull request 'fix(cli): replace markdown headings with XML tags in thread read output' (#460) from fix/459-xml-tag-isolation into main 2026-05-24 08:44:47 +00:00
xiaoju 932bbe5c41 fix(cli): replace markdown headings with XML tags in thread read output
Changed uwf thread read to wrap role prompts and agent outputs in XML tags
(<prompt> and <output>) instead of markdown headings (### Prompt, ### Content).
This prevents Claude Code from treating step outputs as structural headings.

- Updated formatStepPrompt to use <prompt>...</prompt> tags
- Updated formatStepContent to use <output>...</output> tags
- Added comprehensive test suite in thread-read-xml-tags.test.ts
- Updated existing tests to verify XML tag behavior

Fixes #459

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-24 08:04:34 +00:00
xiaomo 9440b9af82 Merge pull request 'chore: fix biome noExcessiveCognitiveComplexity warnings' (#458) from fix/444-biome-complexity-warnings into main 2026-05-24 07:30:41 +00:00
xiaoju f96d6eb7c4 refactor(agent-builtin): reduce cognitive complexity in loop.ts
Refactored runBuiltinLoop function to reduce cognitive complexity from 30 to below 15 by extracting helper functions:

- shouldInjectDeadlineWarning: checks if deadline warning should be shown
- shouldProcessToolCalls: determines if tool calls should be processed
- extractFinalText: extracts last assistant message content
- injectDeadlineWarning: injects deadline warning message
- handleTextOnlyTurn: handles text-only turn logic
- handleToolCallTurn: handles tool call turn logic
- processLoopIteration: processes a single loop iteration

Added 24 new unit tests for the extracted helper functions, bringing total test count to 41 (all passing). All existing behavior is preserved.

Fixes #444

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-24 05:53:55 +00:00
xiaomo 95102941f1 Merge pull request 'feat(cli): thread step --background + thread running' (#457) from fix/456-thread-step-background into main 2026-05-24 05:33:56 +00:00
xiaoju 521d908719 feat(cli): add background thread execution and running threads query
This commit implements issue #456, adding two related capabilities to the uwf CLI:

1. **Background execution mode** for `uwf thread step` (via `--background` flag)
   - Spawns agent execution in a detached background process
   - Returns immediately with thread ID and background status
   - Maintains marker files to track running processes
   - Supports `--count` option to run multiple steps in background
   - Prevents concurrent execution of the same thread

2. **Running threads query** command (`uwf thread running`)
   - Lists all threads currently executing in background
   - Returns thread ID, workflow, current role, PID, and start time
   - Automatically filters out stale markers (dead processes)
   - Empty list when no threads are running

**Key changes:**

- **workflow-protocol**: Added `RunningThreadItem`, `RunningThreadsOutput` types
  Updated `StepOutput` to include `background: boolean | null` field

- **cli-workflow/background**: New module for process management
  - Marker file creation/deletion (atomic operations)
  - PID liveness checking
  - Stale marker cleanup
  - Running threads query

- **cli-workflow/commands/thread**:
  - Updated `cmdThreadStep` to support `--background` and `--_background-worker` flags
  - Added `cmdThreadStepBackground` for spawning detached processes
  - Added `cmdThreadRunning` to list running threads
  - Updated `cmdThreadKill` to terminate background processes

- **cli-workflow/cli**: Added CLI routing for new commands and flags

**Integration:**
- `uwf thread kill` now terminates background processes before archiving
- Foreground execution checks for existing background process and fails if found
- Background worker creates/cleans up marker files automatically
- Marker files stored in `~/.uncaged/workflow/running/*.json`

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-24 05:28:29 +00:00
xiaoju 02a2c00175 refactor: replace UWF_EDGE_PROMPT env var with named CLI args
Agent adapters now use named parameters:
  uwf-<agent> --thread <id> --role <role> --prompt <text>

Instead of positional args + env var:
  UWF_EDGE_PROMPT=... uwf-<agent> <thread-id> <role>

Changes:
- workflow-agent-kit/src/run.ts: parseArgv uses named --thread/--role/--prompt
- workflow-agent-kit/src/context.ts: edgePrompt passed as parameter, not read from env
- cli-workflow/src/commands/thread.ts: spawnAgent passes named args

小橘 <xiaoju@shazhou.work>
2026-05-24 04:31:44 +00:00
xiaoju 8ca7708a12 fix: add cas_ref format to claude-code-detail turns schema
The turns array items in CLAUDE_CODE_DETAIL_SCHEMA were missing
format: 'cas_ref', so expandDeep in step-details couldn't resolve
turn hashes to their payloads. Hermes schema already had this.

小橘 <xiaoju@shazhou.work>
2026-05-24 04:17:29 +00:00
xiaomo 0fdc0fdec3 Merge pull request 'refactor(workflow-dashboard): reduce cyclomatic complexity in editor' (#455) from fix/449-reduce-dashboard-complexity into main 2026-05-24 03:44:08 +00:00
xiaomo d6eaf3fdc7 Merge pull request 'refactor: reduce cognitive complexity in session-detail and acp-client' (#454) from fix/448-reduce-complexity into main 2026-05-24 03:44:06 +00:00
xingyue 5dc2352ac5 fix(workflow-dashboard): replace optional properties with T | null in handlers.ts
Per CLAUDE.md convention, use `string | null` instead of `?:` in the
isFirstConditionalSibling helper function parameter types.

Co-Authored-By: Claude Sonnet 4 <noreply@anthropic.com>
2026-05-24 00:52:54 +08:00
xingyue 39e2ab7f0d refactor(workflow-dashboard): reduce cyclomatic complexity in editor (#449)
- Extract helpers in assignLayers (bfsLayers, processTarget, placeIsolatedNodes, maxLayerExcludingEnd) to reduce complexity from 26 → ≤15
- Extract isProtectedNode and isFirstConditionalSibling helpers in onBeforeDelete (20 → ≤15)
- Extract handleEscape and handleUndoRedo in handleKeyDown (23 → ≤15)
- Extract buildNodeMap, sortTransitions, buildStepEdges, pushStepEdges, assignTargetHandles in transIn (33 → ≤15)
- Extract validateRoleNodeEdges and hasEmptyConditionOnIfEdge in validateRoleNodes (22 → ≤15)
- Remove unused state parameter from Form component in add-node.tsx
- Add vitest + 19 tests covering all refactored functions

Co-Authored-By: Claude Sonnet 4 <noreply@anthropic.com>
2026-05-24 00:50:15 +08:00
xingyue 221919448e refactor: reduce cognitive complexity in session-detail and acp-client
Extract helper functions to bring parseClaudeCodeStreamOutput (37→≤15)
and handleSessionUpdate (24→≤15) within complexity limits. Add tests.

Fixes #448
2026-05-24 00:41:39 +08:00
xingyue 68b82c9574 style: use dot notation for process.env.CLAUDE_MODEL 2026-05-24 00:25:08 +08:00
xiaomo 335b8a4ae6 Merge pull request 'refactor(cli): reduce cognitive complexity in setup.ts' (#453) from fix/445-reduce-setup-complexity into main 2026-05-23 16:18:10 +00:00
xingyue bf31fa0d03 refactor(cli): reduce cognitive complexity in setup.ts
Extracts inline logic into focused helper functions to bring
each function under the complexity threshold.

Fixes #445
2026-05-24 00:14:15 +08:00
xiaomo c39f2f3e63 Merge pull request 'refactor(cli): reduce cognitive complexity in thread.ts' (#452) from fix/446-reduce-thread-complexity into main 2026-05-23 15:55:03 +00:00
43 changed files with 4050 additions and 1055 deletions
+27 -11
View File
@@ -38,19 +38,26 @@ roles:
capabilities:
- coding
procedure: |
Before starting any work, ensure a clean worktree:
1. `git checkout main && git pull` to get the latest code
2. `git checkout -b fix/<issue-number>-<short-description>` to create a fresh branch
- If bounced back from reviewer or tester, reuse the existing branch and rebase onto latest main:
`git checkout main && git pull && git checkout <branch> && git rebase main`
IMPORTANT: Always work in a git worktree, NEVER modify the main working directory directly.
Before starting any work, set up an isolated worktree:
1. `cd ~/repos/workflow && git fetch origin` to get latest refs
2. First time (no existing branch):
- `git worktree add ~/repos/workflow-worktrees/fix/<issue-number>-<short-slug> -b fix/<issue-number>-<short-slug> origin/main`
- `cd ~/repos/workflow-worktrees/fix/<issue-number>-<short-slug> && bun install`
3. If bounced back from reviewer or tester (branch already exists):
- The worktree should already exist at `~/repos/workflow-worktrees/fix/<issue-number>-<short-slug>`
- `cd ~/repos/workflow-worktrees/fix/<issue-number>-<short-slug>`
- `git fetch origin && git rebase origin/main`
4. ALL subsequent work must happen inside the worktree directory.
Then implement TDD:
3. Read the test spec from CAS: `uwf cas get <plan hash>` (find the hash from the latest planner step's frontmatter.plan)
4. If bounced back from reviewer or tester: read the previous role's output to understand what needs fixing
5. Write tests first based on the spec
6. Implement the code to make tests pass
7. Ensure `bun run build` passes with no errors
8. Run `bun test` to verify all tests pass
5. Read the test spec from CAS: `uwf cas get <plan hash>` (find the hash from the latest planner step's frontmatter.plan)
6. If bounced back from reviewer or tester: read the previous role's output to understand what needs fixing
7. Write tests first based on the spec
8. Implement the code to make tests pass
9. Ensure `bun run build` passes with no errors
10. Run `bun test` to verify all tests pass
output: "List all files changed and provide a summary. Frontmatter must include: status (done or failed)."
frontmatter:
type: object
@@ -66,6 +73,8 @@ roles:
- code-review
- static-analysis
procedure: |
First, cd into the worktree: `cd ~/repos/workflow-worktrees/fix/<issue-number>-*` (find the exact directory)
Before reviewing, verify the git branch:
1. Run `git branch --show-current` — confirm the branch name references the issue number being worked on
2. If the branch doesn't correspond to the issue, flag it in your output and reject
@@ -99,6 +108,8 @@ roles:
capabilities:
- testing
procedure: |
First, cd into the worktree: `cd ~/repos/workflow-worktrees/fix/<issue-number>-*` (find the exact directory)
1. Run `bun test` for automated test verification
2. Read the test spec from CAS: `uwf cas get <plan hash>` (find the hash from the latest planner step's frontmatter.plan)
3. Verify each scenario in the spec is covered and passing
@@ -119,6 +130,8 @@ roles:
goal: "You are a committer agent. You create a clean commit and push a PR linking the original issue."
capabilities: []
procedure: |
First, cd into the worktree: `cd ~/repos/workflow-worktrees/fix/<issue-number>-*` (find the exact directory)
Note: You inherit the developer's worktree and branch. Do NOT create a new branch.
1. Stage all changes: `git add -A`
2. Commit with a descriptive message referencing the issue: `git commit -m "type: description\n\nFixes #N"`
@@ -126,6 +139,9 @@ roles:
- If push hook fails: capture the error log in your output, mark hook_failed
4. On push success: create a PR via `tea pr create --title "..." --description "..."`
- PR description must follow the project template: What / Why / Changes / Ref sections, with `Fixes #N` in Ref
5. After PR creation, clean up the worktree:
- `cd ~/repos/workflow`
- `git worktree remove ~/repos/workflow-worktrees/fix/<issue-number>-<slug>`
output: "Include PR URL on success or error log on failure. Frontmatter must include: success (true or false)."
frontmatter:
type: object
+91 -13
View File
@@ -6,6 +6,18 @@
Layer 4 entry point for the workflow engine. The `uwf` binary orchestrates one step per invocation: load thread head from `threads.yaml`, run the moderator, spawn the configured agent CLI, run extract, append a CAS step node, and update the head pointer (or archive when `$END`).
### Four-Layer Architecture
```
workflow → thread → step → turn
模板定义 执行实例 单步结果 agent内部交互
```
- **Workflow** (layer 1): YAML template with roles and routing graph
- **Thread** (layer 2): Single workflow execution instance
- **Step** (layer 3): One moderator→agent→extract cycle
- **Turn** (layer 4): Agent-internal interactions (use `step read` or CAS to inspect)
This package has no library `src/index.ts` — it is consumed as a CLI binary only.
**Dependencies:** `@uncaged/json-cas`, `@uncaged/json-cas-fs`, `@uncaged/workflow-agent-kit`, `@uncaged/workflow-moderator`, `@uncaged/workflow-protocol`, `@uncaged/workflow-util`, `commander`, `dotenv`, `yaml`
@@ -30,34 +42,53 @@ bun link packages/cli-workflow
-h, --help Show help
```
### Thread
### Thread (Layer 2: Execution Instances)
| Command | Description |
|---------|-------------|
| `uwf thread start <workflow> -p <prompt>` | Create a thread without executing |
| `uwf thread step <thread-id> [--agent <cmd>] [-c <count>]` | Execute one or more moderator→agent→extract cycles |
| `uwf thread exec <thread-id> [--agent <cmd>] [-c <count>] [--background]` | Execute one or more moderator→agent→extract cycles |
| `uwf thread show <thread-id>` | Show thread head pointer |
| `uwf thread list [--all]` | List active threads (`--all` includes archived) |
| `uwf thread steps <thread-id>` | List all steps chronologically |
| `uwf thread list [--status <idle\|running\|completed>]` | List threads, optionally filtered by status |
| `uwf thread read <thread-id> [--quota N] [--before <hash>] [--start]` | Render thread as readable markdown |
| `uwf thread fork <step-hash>` | Fork from a specific step |
| `uwf thread step-details <step-hash>` | Dump full detail node as YAML |
| `uwf thread kill <thread-id>` | Terminate and archive |
| `uwf thread stop <thread-id>` | Stop background execution (keep thread active) |
| `uwf thread cancel <thread-id>` | Cancel thread (stop + archive to history) |
Examples:
```bash
uwf thread start solve-issue -p "Fix the login redirect bug"
uwf thread step 01ARZ3NDEKTSV4RRFFQ69G5FAV
uwf thread step 01ARZ3NDEKTSV4RRFFQ69G5FAV -c 3 --agent uwf-builtin
uwf thread exec 01ARZ3NDEKTSV4RRFFQ69G5FAV
uwf thread exec 01ARZ3NDEKTSV4RRFFQ69G5FAV -c 3 --agent uwf-builtin
uwf thread exec 01ARZ3NDEKTSV4RRFFQ69G5FAV --background
uwf thread list --status running
uwf thread read 01ARZ3NDEKTSV4RRFFQ69G5FAV --quota 8000
uwf thread stop 01ARZ3NDEKTSV4RRFFQ69G5FAV
```
### Workflow
### Step (Layer 3: Single Cycle Results)
| Command | Description |
|---------|-------------|
| `uwf workflow put <file.yaml>` | Register a workflow from YAML |
| `uwf step list <thread-id>` | List all steps in a thread chronologically |
| `uwf step show <step-hash>` | Show step metadata and frontmatter |
| `uwf step read <step-hash> [--before N]` | Read step output as markdown |
| `uwf step fork <step-hash>` | Fork a thread from a specific step |
Examples:
```bash
uwf step list 01ARZ3NDEKTSV4RRFFQ69G5FAV
uwf step show 32GCDE899RRQ3
uwf step read 32GCDE899RRQ3 --before 3
uwf step fork 32GCDE899RRQ3
```
### Workflow (Layer 1: Templates)
| Command | Description |
|---------|-------------|
| `uwf workflow add <file.yaml>` | Register a workflow from YAML |
| `uwf workflow show <name-or-hash>` | Show workflow definition |
| `uwf workflow list` | List registered workflows |
@@ -99,6 +130,52 @@ Config: `~/.uncaged/workflow/config.yaml`. API keys: `~/.uncaged/workflow/.env`.
| `uwf log show [--thread <id>] [--process <pid>] [--date YYYY-MM-DD]` | Show filtered log entries |
| `uwf log clean [--before YYYY-MM-DD]` | Delete old log files |
## Migration Guide
### Breaking Changes (v0.x → v1.x)
The CLI was reorganized to clarify the four-layer architecture. **No backward compatibility** — old commands have been removed.
#### Renamed Commands
| Old Command | New Command | Notes |
|------------|-------------|-------|
| `workflow put` | `workflow add` | More intuitive verb |
| `thread step` | `thread exec` | Eliminates ambiguity with "step" noun |
| `thread list --all` | `thread list --status completed` | Unified status filtering |
#### Removed Commands (Merged)
| Old Command | New Command | Notes |
|------------|-------------|-------|
| `thread running` | `thread list --status running` | Merged into unified list |
#### Removed Commands (Split)
| Old Command | New Commands | Notes |
|------------|-------------|-------|
| `thread kill` | `thread stop` or `thread cancel` | `stop` keeps thread active, `cancel` archives it |
#### Moved Commands
| Old Command | New Command | Notes |
|------------|-------------|-------|
| `thread steps` | `step list` | Moved to step layer |
| `thread step-details` | `step show` | Moved to step layer |
| `thread fork` | `step fork` | Moved to step layer (forks are step-based) |
#### Deprecation Errors
Old commands now show helpful error messages:
```bash
$ uwf thread step 01ARZ3NDEKTSV4RRFFQ69G5FAV
Error: Command 'thread step' has been removed.
Use 'thread exec' instead.
For more information, see: uwf help thread exec
```
## Internal Structure
```
@@ -109,8 +186,9 @@ src/
├── validate.ts Workflow YAML validation
├── schemas.ts CLI-local schema registration
└── commands/
├── thread.ts Thread lifecycle and step execution
├── workflow.ts Workflow registry (put/show/list)
├── thread.ts Thread lifecycle and exec
├── step.ts Step operations (list/show/read/fork)
├── workflow.ts Workflow registry (add/show/list)
├── cas.ts CAS inspection and schema ops
├── setup.ts Interactive/non-interactive setup
├── skill.ts Built-in skill references
@@ -0,0 +1,381 @@
import { mkdirSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, describe, expect, test, vi } from "vitest";
import {
_discoverAgents,
_isBackspace,
_isTerminator,
_parseWhichOutput,
_printModelMenu,
_printProviderMenu,
_printValidationResult,
_resolveModelChoice,
_resolveProviderChoice,
_searchPathDirs,
} from "../commands/setup.js";
// ──────────────────────────────────────────────────────────────────────────────
// 1a. _searchPathDirs
// ──────────────────────────────────────────────────────────────────────────────
describe("_searchPathDirs", () => {
test("returns empty array for empty PATH", async () => {
const result = await _searchPathDirs("");
expect(result).toEqual([]);
});
test("finds uwf-hermes in a single dir", async () => {
const dir = mkdirSync(join(tmpdir(), `uwf-test-${Date.now()}`), { recursive: true }) as
| string
| undefined;
const actualDir = dir ?? join(tmpdir(), `uwf-test-${Date.now()}`);
mkdirSync(actualDir, { recursive: true });
const filePath = join(actualDir, "uwf-hermes");
writeFileSync(filePath, "#!/bin/sh\n", { mode: 0o755 });
const result = await _searchPathDirs(actualDir);
expect(result).toContain("uwf-hermes");
});
test("skips non-uwf- prefixed binaries", async () => {
const dir = join(tmpdir(), `uwf-test-${Date.now()}-2`);
mkdirSync(dir, { recursive: true });
writeFileSync(join(dir, "hermes"), "#!/bin/sh\n", { mode: 0o755 });
writeFileSync(join(dir, "uwf-hermes"), "#!/bin/sh\n", { mode: 0o755 });
const result = await _searchPathDirs(dir);
expect(result).toEqual(["uwf-hermes"]);
});
test("skips entry named exactly 'uwf'", async () => {
const dir = join(tmpdir(), `uwf-test-${Date.now()}-3`);
mkdirSync(dir, { recursive: true });
writeFileSync(join(dir, "uwf"), "#!/bin/sh\n", { mode: 0o755 });
writeFileSync(join(dir, "uwf-hermes"), "#!/bin/sh\n", { mode: 0o755 });
const result = await _searchPathDirs(dir);
expect(result).toEqual(["uwf-hermes"]);
});
test("skips non-executable files", async () => {
const dir = join(tmpdir(), `uwf-test-${Date.now()}-4`);
mkdirSync(dir, { recursive: true });
writeFileSync(join(dir, "uwf-foo"), "#!/bin/sh\n", { mode: 0o644 });
const result = await _searchPathDirs(dir);
expect(result).toEqual([]);
});
test("deduplicates across PATH dirs", async () => {
const dir1 = join(tmpdir(), `uwf-test-${Date.now()}-5a`);
const dir2 = join(tmpdir(), `uwf-test-${Date.now()}-5b`);
mkdirSync(dir1, { recursive: true });
mkdirSync(dir2, { recursive: true });
writeFileSync(join(dir1, "uwf-hermes"), "#!/bin/sh\n", { mode: 0o755 });
writeFileSync(join(dir2, "uwf-hermes"), "#!/bin/sh\n", { mode: 0o755 });
const result = await _searchPathDirs(`${dir1}:${dir2}`);
expect(result).toEqual(["uwf-hermes"]);
});
test("returns sorted array", async () => {
const dir = join(tmpdir(), `uwf-test-${Date.now()}-6`);
mkdirSync(dir, { recursive: true });
writeFileSync(join(dir, "uwf-zoo"), "#!/bin/sh\n", { mode: 0o755 });
writeFileSync(join(dir, "uwf-alpha"), "#!/bin/sh\n", { mode: 0o755 });
writeFileSync(join(dir, "uwf-mid"), "#!/bin/sh\n", { mode: 0o755 });
const result = await _searchPathDirs(dir);
expect(result).toEqual(["uwf-alpha", "uwf-mid", "uwf-zoo"]);
});
test("skips inaccessible/nonexistent directories silently", async () => {
const result = await _searchPathDirs("/nonexistent-dir-xyz-abc-12345");
expect(result).toEqual([]);
});
});
// ──────────────────────────────────────────────────────────────────────────────
// 1b. _parseWhichOutput
// ──────────────────────────────────────────────────────────────────────────────
describe("_parseWhichOutput", () => {
test("returns empty array for empty string", () => {
expect(_parseWhichOutput("")).toEqual([]);
});
test("parses single path", () => {
expect(_parseWhichOutput("/usr/local/bin/uwf-hermes")).toEqual(["uwf-hermes"]);
});
test("parses multiple paths", () => {
expect(_parseWhichOutput("/usr/local/bin/uwf-hermes\n/usr/bin/uwf-claude-code")).toEqual([
"uwf-claude-code",
"uwf-hermes",
]);
});
test("deduplicates identical basenames from different dirs", () => {
expect(_parseWhichOutput("/a/uwf-hermes\n/b/uwf-hermes")).toEqual(["uwf-hermes"]);
});
test("skips blank lines", () => {
expect(_parseWhichOutput("/a/uwf-hermes\n\n/b/uwf-cursor")).toEqual([
"uwf-cursor",
"uwf-hermes",
]);
});
test("skips entry named exactly 'uwf'", () => {
expect(_parseWhichOutput("/usr/bin/uwf")).toEqual([]);
});
test("skips basenames not starting with uwf-", () => {
expect(_parseWhichOutput("/usr/bin/node")).toEqual([]);
});
test("returns sorted array", () => {
expect(_parseWhichOutput("/a/uwf-zoo\n/a/uwf-alpha")).toEqual(["uwf-alpha", "uwf-zoo"]);
});
});
// ──────────────────────────────────────────────────────────────────────────────
// 2a. _isTerminator
// ──────────────────────────────────────────────────────────────────────────────
describe("_isTerminator", () => {
test("\\n is a terminator", () => {
expect(_isTerminator("\n")).toBe(true);
});
test("\\r is a terminator", () => {
expect(_isTerminator("\r")).toBe(true);
});
test("\\u0004 (EOT) is a terminator", () => {
expect(_isTerminator("")).toBe(true);
});
test("regular char is not a terminator", () => {
expect(_isTerminator("a")).toBe(false);
});
test("empty string is not a terminator", () => {
expect(_isTerminator("")).toBe(false);
});
});
// ──────────────────────────────────────────────────────────────────────────────
// 2b. _isBackspace
// ──────────────────────────────────────────────────────────────────────────────
describe("_isBackspace", () => {
test("\\u007F is a backspace", () => {
expect(_isBackspace("")).toBe(true);
});
test("\\b is a backspace", () => {
expect(_isBackspace("\b")).toBe(true);
});
test("regular char is not a backspace", () => {
expect(_isBackspace("x")).toBe(false);
});
});
// ──────────────────────────────────────────────────────────────────────────────
// 3a. _printProviderMenu
// ──────────────────────────────────────────────────────────────────────────────
describe("_printProviderMenu", () => {
afterEach(() => {
vi.restoreAllMocks();
});
const providers = [
{ name: "openai", label: "OpenAI", baseUrl: "https://api.openai.com/v1" },
{ name: "xai", label: "xAI", baseUrl: "https://api.x.ai/v1" },
] as const;
test("prints correct number of lines (one per provider + custom)", () => {
const lines: string[] = [];
vi.spyOn(console, "log").mockImplementation((msg: string) => {
lines.push(msg);
});
_printProviderMenu(providers);
// 2 providers + 1 custom = 3 lines
expect(lines.length).toBe(3);
});
test("custom option number = providers.length + 1", () => {
const lines: string[] = [];
vi.spyOn(console, "log").mockImplementation((msg: string) => {
lines.push(msg);
});
_printProviderMenu(providers);
const lastLine = lines[lines.length - 1] ?? "";
expect(lastLine).toMatch(/3\)/);
});
test("each provider line contains its label and baseUrl", () => {
const lines: string[] = [];
vi.spyOn(console, "log").mockImplementation((msg: string) => {
lines.push(msg);
});
_printProviderMenu(providers);
expect(lines[0]).toContain("OpenAI");
expect(lines[0]).toContain("https://api.openai.com/v1");
expect(lines[1]).toContain("xAI");
expect(lines[1]).toContain("https://api.x.ai/v1");
});
});
// ──────────────────────────────────────────────────────────────────────────────
// 3b. _resolveProviderChoice
// ──────────────────────────────────────────────────────────────────────────────
describe("_resolveProviderChoice", () => {
const providers = [
{ name: "openai", label: "OpenAI", baseUrl: "https://api.openai.com/v1" },
{ name: "xai", label: "xAI", baseUrl: "https://api.x.ai/v1" },
{ name: "deepseek", label: "DeepSeek", baseUrl: "https://api.deepseek.com/v1" },
] as const;
test("valid index 1 returns first provider", () => {
const result = _resolveProviderChoice("1", providers);
expect(result).toEqual({ providerName: "openai", baseUrl: "https://api.openai.com/v1" });
});
test("valid index N (last preset) returns last provider", () => {
const result = _resolveProviderChoice("3", providers);
expect(result).toEqual({ providerName: "deepseek", baseUrl: "https://api.deepseek.com/v1" });
});
test("index providers.length+1 (custom) returns null", () => {
const result = _resolveProviderChoice("4", providers);
expect(result).toBeNull();
});
test("non-numeric string returns null", () => {
expect(_resolveProviderChoice("abc", providers)).toBeNull();
});
test("0 returns null (out of range)", () => {
expect(_resolveProviderChoice("0", providers)).toBeNull();
});
test("N+2 returns null (out of range)", () => {
expect(_resolveProviderChoice("5", providers)).toBeNull();
});
test("negative number returns null", () => {
expect(_resolveProviderChoice("-1", providers)).toBeNull();
});
});
// ──────────────────────────────────────────────────────────────────────────────
// 3c. _resolveModelChoice
// ──────────────────────────────────────────────────────────────────────────────
describe("_resolveModelChoice", () => {
test("numeric input within range returns model at that index", () => {
expect(_resolveModelChoice("2", ["a", "b", "c"])).toBe("b");
});
test("numeric input out of range returns input as-is", () => {
expect(_resolveModelChoice("5", ["a"])).toBe("5");
});
test("non-numeric input returns input as-is", () => {
expect(_resolveModelChoice("gpt-4o", ["a", "b"])).toBe("gpt-4o");
});
test("numeric input 1 returns first model", () => {
expect(_resolveModelChoice("1", ["alpha", "beta"])).toBe("alpha");
});
test("empty models list with numeric input returns input as-is", () => {
expect(_resolveModelChoice("1", [])).toBe("1");
});
});
// ──────────────────────────────────────────────────────────────────────────────
// 3d. _printModelMenu
// ──────────────────────────────────────────────────────────────────────────────
describe("_printModelMenu", () => {
afterEach(() => {
vi.restoreAllMocks();
});
test("prints all models — each model name appears in output", () => {
const output: string[] = [];
vi.spyOn(console, "log").mockImplementation((msg: string) => {
output.push(msg);
});
const models = ["model-a", "model-b", "model-c"];
_printModelMenu(models, 100);
const combined = output.join("\n");
for (const m of models) {
expect(combined).toContain(m);
}
});
test("single column when termCols is very small", () => {
const output: string[] = [];
vi.spyOn(console, "log").mockImplementation((msg: string) => {
output.push(msg);
});
_printModelMenu(["a", "b", "c"], 1);
// Each model on its own row → 3 lines
expect(output.length).toBe(3);
});
test("wide terminal fits multiple columns", () => {
const output: string[] = [];
vi.spyOn(console, "log").mockImplementation((msg: string) => {
output.push(msg);
});
const models = Array.from({ length: 6 }, (_, i) => `m${i}`);
_printModelMenu(models, 200);
// With wide terminal and short names, should fit in fewer than 6 rows
expect(output.length).toBeLessThan(6);
});
});
// ──────────────────────────────────────────────────────────────────────────────
// 3e. _printValidationResult
// ──────────────────────────────────────────────────────────────────────────────
describe("_printValidationResult", () => {
afterEach(() => {
vi.restoreAllMocks();
});
test("ok=true prints success message containing '✓'", () => {
const lines: string[] = [];
vi.spyOn(console, "log").mockImplementation((msg: string) => {
lines.push(msg);
});
_printValidationResult({ ok: true, error: null });
expect(lines.join("\n")).toContain("✓");
});
test("ok=false prints warning message containing '⚠'", () => {
const lines: string[] = [];
vi.spyOn(console, "log").mockImplementation((msg: string) => {
lines.push(msg);
});
_printValidationResult({ ok: false, error: "HTTP 401" });
expect(lines.join("\n")).toContain("⚠");
});
test("ok=false includes the error string in output", () => {
const lines: string[] = [];
vi.spyOn(console, "log").mockImplementation((msg: string) => {
lines.push(msg);
});
_printValidationResult({ ok: false, error: "HTTP 401" });
expect(lines.join("\n")).toContain("HTTP 401");
});
});
// ──────────────────────────────────────────────────────────────────────────────
// 4. Regression
// ──────────────────────────────────────────────────────────────────────────────
describe("_discoverAgents regression", () => {
test("returns an array (may be empty) — never throws", async () => {
const result = await _discoverAgents();
expect(Array.isArray(result)).toBe(true);
});
});
@@ -0,0 +1,683 @@
import { mkdir, mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { bootstrap, putSchema } from "@uncaged/json-cas";
import { createFsStore } from "@uncaged/json-cas-fs";
import type { CasRef, ThreadId } from "@uncaged/workflow-protocol";
import { afterEach, beforeEach, describe, expect, test } from "vitest";
import { cmdThreadRead, THREAD_READ_DEFAULT_QUOTA } from "../commands/thread.js";
import { registerUwfSchemas } from "../schemas.js";
import type { UwfStore } from "../store.js";
import { saveThreadsIndex } from "../store.js";
// ── schemas used in tests ────────────────────────────────────────────────────
const TURN_SCHEMA = {
title: "hermes-turn",
type: "object" as const,
required: ["index", "role", "content"],
properties: {
index: { type: "integer" as const },
role: { type: "string" as const },
content: { type: "string" as const },
toolCalls: {
anyOf: [
{ type: "array" as const, items: { type: "object" as const } },
{ type: "null" as const },
],
},
reasoning: { anyOf: [{ type: "string" as const }, { type: "null" as const }] },
},
additionalProperties: false,
};
const DETAIL_SCHEMA = {
title: "hermes-detail",
type: "object" as const,
required: ["sessionId", "model", "duration", "turnCount", "turns"],
properties: {
sessionId: { type: "string" as const },
model: { type: "string" as const },
duration: { type: "integer" as const },
turnCount: { type: "integer" as const },
turns: {
type: "array" as const,
items: { type: "string" as const, format: "cas_ref" },
},
},
additionalProperties: false,
};
// ── helpers ───────────────────────────────────────────────────────────────────
async function makeUwfStore(storageRoot: string): Promise<UwfStore> {
const casDir = join(storageRoot, "cas");
await mkdir(casDir, { recursive: true });
const store = createFsStore(casDir);
const schemas = await registerUwfSchemas(store);
return { storageRoot, store, schemas };
}
async function registerDetailSchemas(store: ReturnType<typeof createFsStore>) {
await bootstrap(store);
const [turn, detail] = await Promise.all([
putSchema(store, TURN_SCHEMA),
putSchema(store, DETAIL_SCHEMA),
]);
return { turn, detail };
}
// ── fixture ───────────────────────────────────────────────────────────────────
let tmpDir: string;
beforeEach(async () => {
tmpDir = await mkdtemp(join(tmpdir(), "cli-uwf-test-"));
});
afterEach(async () => {
await rm(tmpDir, { recursive: true, force: true });
});
// ── thread read XML tag isolation ─────────────────────────────────────────────
describe("thread read XML tag isolation", () => {
test("scenario 1: wraps output in XML tags instead of heading", async () => {
const uwf = await makeUwfStore(tmpDir);
const detailSchemas = await registerDetailSchemas(uwf.store);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "test-wf",
description: "desc",
roles: {
planner: {
description: "Planner",
goal: "You are a planning agent. Your task is to...",
capabilities: [],
procedure: "Plan the work.",
output: "Summarize the plan.",
meta: "placeholder00" as CasRef,
},
},
conditions: {},
graph: {},
});
const startHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "Fix issue #459",
});
const outputHash = await uwf.store.put(uwf.schemas.workflow, {
name: "out",
description: "",
roles: {},
conditions: {},
graph: {},
});
const turnHash = await uwf.store.put(detailSchemas.turn, {
index: 0,
role: "assistant",
content:
"---\nstatus: ready\nplan: CMWGHQKT58RY4\n---\n\n# Analysis Complete\n## Issue Summary\nThe issue requires XML tag isolation.",
toolCalls: null,
reasoning: null,
});
const detailHash = await uwf.store.put(detailSchemas.detail, {
sessionId: "sx",
model: "mx",
duration: 500,
turnCount: 1,
turns: [turnHash],
});
const stepHash = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: null,
role: "planner",
output: outputHash,
detail: detailHash,
agent: "uwf-claude-code",
});
const threadId = "01JTEST0000000000000001" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
// Should wrap output in XML tags
expect(markdown).toContain("<output>");
expect(markdown).toContain("</output>");
// Should not have ### Content heading
expect(markdown).not.toContain("### Content");
// Should preserve markdown headings inside output tags
expect(markdown).toContain("# Analysis Complete");
expect(markdown).toContain("## Issue Summary");
});
test("scenario 2: wraps prompt in XML tags", async () => {
const uwf = await makeUwfStore(tmpDir);
const detailSchemas = await registerDetailSchemas(uwf.store);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "test-wf",
description: "desc",
roles: {
planner: {
description: "Planner",
goal: "You are a planning agent. Your task is to analyze and plan.",
capabilities: [],
procedure: "Plan the work.",
output: "Summarize the plan.",
meta: "placeholder00" as CasRef,
},
},
conditions: {},
graph: {},
});
const startHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "Fix issue",
});
const outputHash = await uwf.store.put(uwf.schemas.workflow, {
name: "out",
description: "",
roles: {},
conditions: {},
graph: {},
});
const turnHash = await uwf.store.put(detailSchemas.turn, {
index: 0,
role: "assistant",
content: "---\nstatus: ready\n---\n\nContent here...",
toolCalls: null,
reasoning: null,
});
const detailHash = await uwf.store.put(detailSchemas.detail, {
sessionId: "sx",
model: "mx",
duration: 500,
turnCount: 1,
turns: [turnHash],
});
const stepHash = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: null,
role: "planner",
output: outputHash,
detail: detailHash,
agent: "uwf-claude-code",
});
const threadId = "01JTEST0000000000000002" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
// Should wrap prompt in XML tags
expect(markdown).toContain("<prompt>");
expect(markdown).toContain("</prompt>");
expect(markdown).toContain("You are a planning agent. Your task is to analyze and plan.");
// Should not have ### Prompt heading
expect(markdown).not.toContain("### Prompt");
// Should wrap output in XML tags
expect(markdown).toContain("<output>");
expect(markdown).toContain("</output>");
});
test("scenario 3: same role repeated does not show prompt twice", async () => {
const uwf = await makeUwfStore(tmpDir);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "test-wf",
description: "desc",
roles: {
writer: {
description: "Writer",
goal: "You are a writer agent.",
capabilities: [],
procedure: "Write content.",
output: "Summarize writing.",
meta: "placeholder00" as CasRef,
},
},
conditions: {},
graph: {},
});
const startHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "Write something",
});
const outputHash = await uwf.store.put(uwf.schemas.workflow, {
name: "out",
description: "",
roles: {},
conditions: {},
graph: {},
});
const step1 = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: null,
role: "writer",
output: outputHash,
detail: null,
agent: "uwf-test",
});
const step2 = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: step1 as CasRef,
role: "writer",
output: outputHash,
detail: null,
agent: "uwf-test",
});
const threadId = "01JTEST0000000000000003" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: step2 });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
// Should only show prompt tags once
const promptCount = (markdown.match(/<prompt>/g) ?? []).length;
expect(promptCount).toBe(1);
});
test("scenario 4: step with no detail shows no output tags", async () => {
const uwf = await makeUwfStore(tmpDir);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "test-wf",
description: "desc",
roles: {
worker: {
description: "Worker",
goal: "You are a worker agent.",
capabilities: [],
procedure: "Do work.",
output: "Summarize work.",
meta: "placeholder00" as CasRef,
},
},
conditions: {},
graph: {},
});
const startHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "Do stuff",
});
const outputHash = await uwf.store.put(uwf.schemas.workflow, {
name: "out",
description: "",
roles: {},
conditions: {},
graph: {},
});
const stepHash = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: null,
role: "worker",
output: outputHash,
detail: null,
agent: "uwf-test",
});
const threadId = "01JTEST0000000000000004" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
// Should not have output tags
expect(markdown).not.toContain("<output>");
expect(markdown).not.toContain("</output>");
// Step header should still be displayed
expect(markdown).toContain("## Step 1: worker");
// Prompt should still be shown
expect(markdown).toContain("<prompt>");
});
test("scenario 5: empty content shows no output tags", async () => {
const uwf = await makeUwfStore(tmpDir);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "test-wf",
description: "desc",
roles: {},
conditions: {},
graph: {},
});
const startHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "Do stuff",
});
const outputHash = await uwf.store.put(uwf.schemas.workflow, {
name: "out",
description: "",
roles: {},
conditions: {},
graph: {},
});
// A detail ref that doesn't exist → extractLastAssistantContent returns null
const missingDetailRef = "missingdetail0" as CasRef;
const stepHash = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: null,
role: "worker",
output: outputHash,
detail: missingDetailRef,
agent: "uwf-test",
});
const threadId = "01JTEST0000000000000005" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
// Should not have output tags
expect(markdown).not.toContain("<output>");
expect(markdown).not.toContain("</output>");
});
test("scenario 6: thread read with --start flag shows task section", async () => {
const uwf = await makeUwfStore(tmpDir);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "test-wf",
description: "desc",
roles: {
roleA: {
description: "Role A",
goal: "Goal for roleA",
capabilities: [],
procedure: "Do stuff.",
output: "Output.",
meta: "placeholder00" as CasRef,
},
},
conditions: {},
graph: {},
});
const startHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "Initial prompt",
});
const outputHash = await uwf.store.put(uwf.schemas.workflow, {
name: "out",
description: "",
roles: {},
conditions: {},
graph: {},
});
const stepHash = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: null,
role: "roleA",
output: outputHash,
detail: null,
agent: "uwf-test",
});
const threadId = "01JTEST0000000000000006" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, true);
// Should include task section
expect(markdown).toContain("# Thread");
expect(markdown).toContain("## Task");
expect(markdown).toContain("Initial prompt");
// Prompts should use XML tags
expect(markdown).toContain("<prompt>");
});
test("scenario 7: thread read with --before parameter", async () => {
const uwf = await makeUwfStore(tmpDir);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "test-wf",
description: "desc",
roles: {
roleA: {
description: "Role A",
goal: "Goal for roleA",
capabilities: [],
procedure: "Do stuff.",
output: "Output.",
meta: "placeholder00" as CasRef,
},
roleB: {
description: "Role B",
goal: "Goal for roleB",
capabilities: [],
procedure: "Do stuff.",
output: "Output.",
meta: "placeholder00" as CasRef,
},
roleC: {
description: "Role C",
goal: "Goal for roleC",
capabilities: [],
procedure: "Do stuff.",
output: "Output.",
meta: "placeholder00" as CasRef,
},
},
conditions: {},
graph: {},
});
const startHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "Initial prompt",
});
const outputHash = await uwf.store.put(uwf.schemas.workflow, {
name: "out",
description: "",
roles: {},
conditions: {},
graph: {},
});
const step1 = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: null,
role: "roleA",
output: outputHash,
detail: null,
agent: "uwf-test",
});
const step2 = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: step1 as CasRef,
role: "roleB",
output: outputHash,
detail: null,
agent: "uwf-test",
});
const step3 = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: step2 as CasRef,
role: "roleC",
output: outputHash,
detail: null,
agent: "uwf-test",
});
const threadId = "01JTEST0000000000000007" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: step3 });
const markdown = await cmdThreadRead(
tmpDir,
threadId,
THREAD_READ_DEFAULT_QUOTA,
step2 as CasRef,
false,
);
// Should only show roleA
expect(markdown).toContain("roleA");
expect(markdown).not.toContain("roleB");
expect(markdown).not.toContain("roleC");
// Should use XML tags
expect(markdown).toContain("<prompt>");
});
test("scenario 9: special characters in content are preserved", async () => {
const uwf = await makeUwfStore(tmpDir);
const detailSchemas = await registerDetailSchemas(uwf.store);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "test-wf",
description: "desc",
roles: {
writer: {
description: "Writer",
goal: "You are a writer.",
capabilities: [],
procedure: "Write content.",
output: "Summarize.",
meta: "placeholder00" as CasRef,
},
},
conditions: {},
graph: {},
});
const startHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "Write something",
});
const outputHash = await uwf.store.put(uwf.schemas.workflow, {
name: "out",
description: "",
roles: {},
conditions: {},
graph: {},
});
const turnHash = await uwf.store.put(detailSchemas.turn, {
index: 0,
role: "assistant",
content: "Content with <special> & characters > like <this>",
toolCalls: null,
reasoning: null,
});
const detailHash = await uwf.store.put(detailSchemas.detail, {
sessionId: "sx",
model: "mx",
duration: 500,
turnCount: 1,
turns: [turnHash],
});
const stepHash = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: null,
role: "writer",
output: outputHash,
detail: detailHash,
agent: "uwf-test",
});
const threadId = "01JTEST0000000000000008" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
// Special characters should be preserved as-is
expect(markdown).toContain("Content with <special> & characters > like <this>");
});
test("scenario 10: quota limit with XML tags", async () => {
const uwf = await makeUwfStore(tmpDir);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "test-wf",
description: "desc",
roles: {
roleA: {
description: "Role A",
goal: "Goal for roleA",
capabilities: [],
procedure: "Do stuff.",
output: "Output.",
meta: "placeholder00" as CasRef,
},
},
conditions: {},
graph: {},
});
const startHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "Initial prompt",
});
const outputHash = await uwf.store.put(uwf.schemas.workflow, {
name: "out",
description: "",
roles: {},
conditions: {},
graph: {},
});
const steps: CasRef[] = [];
let prev: CasRef | null = null;
for (let i = 0; i < 5; i++) {
const step = (await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev,
role: "roleA",
output: outputHash,
detail: null,
agent: "uwf-test",
})) as CasRef;
steps.push(step);
prev = step;
}
const threadId = "01JTEST0000000000000009" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: steps[steps.length - 1]! });
// Use very small quota
const markdown = await cmdThreadRead(tmpDir, threadId, 1, null, false);
// Should have skip hint
expect(markdown).toContain("earlier step");
// Should have XML tags for displayed steps
if (markdown.includes("<prompt>")) {
expect(markdown).toContain("</prompt>");
}
});
});
@@ -22,48 +22,48 @@ function runCli(args: string[]): { stdout: string; stderr: string; exitCode: num
}
}
describe("thread step --count CLI parsing", () => {
describe("thread exec --count CLI parsing", () => {
test("--help shows -c/--count option", () => {
const result = runCli(["thread", "step", "--help"]);
const result = runCli(["thread", "exec", "--help"]);
expect(result.stdout).toContain("--count");
expect(result.stdout).toContain("-c");
});
test("description says 'one or more steps'", () => {
const result = runCli(["thread", "step", "--help"]);
const result = runCli(["thread", "exec", "--help"]);
expect(result.stdout).toContain("one or more steps");
});
});
describe("cmdThreadStep count logic", () => {
describe("cmdThreadExec count logic", () => {
test("count=0 fails with validation error", () => {
const result = runCli(["thread", "step", "FAKE_THREAD_ID", "-c", "0"]);
const result = runCli(["thread", "exec", "FAKE_THREAD_ID", "-c", "0"]);
expect(result.exitCode).not.toBe(0);
expect(result.stderr).toContain("positive integer");
});
test("negative count fails with validation error", () => {
const result = runCli(["thread", "step", "FAKE_THREAD_ID", "-c", "-1"]);
const result = runCli(["thread", "exec", "FAKE_THREAD_ID", "-c", "-1"]);
expect(result.exitCode).not.toBe(0);
expect(result.stderr).toContain("positive integer");
});
test("non-integer count fails with validation error", () => {
const result = runCli(["thread", "step", "FAKE_THREAD_ID", "-c", "1.5"]);
const result = runCli(["thread", "exec", "FAKE_THREAD_ID", "-c", "1.5"]);
expect(result.exitCode).not.toBe(0);
expect(result.stderr).toContain("positive integer");
});
test("count=1 is the default (no -c flag)", () => {
// Without -c, it should attempt to run 1 step (failing on missing thread, not on count validation)
const result = runCli(["thread", "step", "FAKE_THREAD_ID"]);
const result = runCli(["thread", "exec", "FAKE_THREAD_ID"]);
expect(result.exitCode).not.toBe(0);
// Should NOT contain "positive integer" error — should fail on thread lookup instead
expect(result.stderr).not.toContain("positive integer");
});
test("count=3 passes validation (fails on thread lookup)", () => {
const result = runCli(["thread", "step", "FAKE_THREAD_ID", "-c", "3"]);
const result = runCli(["thread", "exec", "FAKE_THREAD_ID", "-c", "3"]);
expect(result.exitCode).not.toBe(0);
// Should NOT contain "positive integer" error — should fail on thread/storage lookup
expect(result.stderr).not.toContain("positive integer");
@@ -5,9 +5,9 @@ import { bootstrap, putSchema } from "@uncaged/json-cas";
import { createFsStore } from "@uncaged/json-cas-fs";
import type { CasRef, ThreadId } from "@uncaged/workflow-protocol";
import { afterEach, beforeEach, describe, expect, test } from "vitest";
import { cmdStepShow } from "../commands/step.js";
import {
cmdThreadRead,
cmdThreadStepDetails,
extractLastAssistantContent,
THREAD_READ_DEFAULT_QUOTA,
} from "../commands/thread.js";
@@ -198,10 +198,10 @@ describe("extractLastAssistantContent", () => {
});
});
// ── cmdThreadRead: ### Content section ───────────────────────────────────────
// ── cmdThreadRead: <output> section ──────────────────────────────────────────
describe("cmdThreadRead ### Content section", () => {
test("includes ### Content before ### Output when detail has assistant turns", async () => {
describe("cmdThreadRead <output> section", () => {
test("includes <output> tags when detail has assistant turns", async () => {
const uwf = await makeUwfStore(tmpDir);
const detailSchemas = await registerDetailSchemas(uwf.store);
@@ -264,12 +264,13 @@ describe("cmdThreadRead ### Content section", () => {
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
expect(markdown).toContain("### Content");
expect(markdown).toContain("<output>");
expect(markdown).toContain("</output>");
expect(markdown).toContain("The assistant response text");
expect(markdown).not.toContain("### Output");
expect(markdown).not.toContain("### Content");
});
test("omits ### Content when detail has no matching assistant turns", async () => {
test("omits <output> tags when detail has no matching assistant turns", async () => {
const uwf = await makeUwfStore(tmpDir);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
@@ -308,14 +309,15 @@ describe("cmdThreadRead ### Content section", () => {
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
expect(markdown).not.toContain("<output>");
expect(markdown).not.toContain("</output>");
expect(markdown).not.toContain("### Content");
expect(markdown).not.toContain("### Output");
});
});
// ── cmdThreadStepDetails ──────────────────────────────────────────────────────
// ── cmdStepShow ───────────────────────────────────────────────────────────────
describe("cmdThreadStepDetails", () => {
describe("cmdStepShow", () => {
test("returns expanded detail node with turns inlined", async () => {
const uwf = await makeUwfStore(tmpDir);
const detailSchemas = await registerDetailSchemas(uwf.store);
@@ -363,7 +365,7 @@ describe("cmdThreadStepDetails", () => {
agent: "uwf-hermes",
});
const result = await cmdThreadStepDetails(tmpDir, stepHash);
const result = await cmdStepShow(tmpDir, stepHash);
expect(result).toMatchObject({
sessionId: "sess42",
@@ -384,9 +386,9 @@ describe("cmdThreadStepDetails", () => {
});
});
// ── cmdThreadRead: ### Prompt deduplication ───────────────────────────────────
// ── cmdThreadRead: <prompt> deduplication ───────────────────────────────────
describe("cmdThreadRead ### Prompt deduplication", () => {
describe("cmdThreadRead <prompt> deduplication", () => {
async function makeThreadWithRoles(uwf: UwfStore, roles: string[]): Promise<string> {
const roleMap: Record<string, unknown> = {};
for (const r of [...new Set(roles)]) {
@@ -434,36 +436,36 @@ describe("cmdThreadRead ### Prompt deduplication", () => {
return stepHash;
}
test("same consecutive role shows ### Prompt once", async () => {
test("same consecutive role shows <prompt> once", async () => {
const uwf = await makeUwfStore(tmpDir);
const headHash = await makeThreadWithRoles(uwf, ["writer", "writer"]);
const threadId = "01JTEST0000000000000003" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: headHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
const count = (markdown.match(/### Prompt/g) ?? []).length;
const count = (markdown.match(/<prompt>/g) ?? []).length;
expect(count).toBe(1);
});
test("different consecutive roles each show ### Prompt", async () => {
test("different consecutive roles each show <prompt>", async () => {
const uwf = await makeUwfStore(tmpDir);
const headHash = await makeThreadWithRoles(uwf, ["planner", "coder"]);
const threadId = "01JTEST0000000000000004" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: headHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
const count = (markdown.match(/### Prompt/g) ?? []).length;
const count = (markdown.match(/<prompt>/g) ?? []).length;
expect(count).toBe(2);
});
test("non-consecutive same role shows ### Prompt twice", async () => {
test("non-consecutive same role shows <prompt> twice", async () => {
const uwf = await makeUwfStore(tmpDir);
const headHash = await makeThreadWithRoles(uwf, ["roleA", "roleB", "roleA"]);
const threadId = "01JTEST0000000000000005" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: headHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
const count = (markdown.match(/### Prompt/g) ?? []).length;
const count = (markdown.match(/<prompt>/g) ?? []).length;
expect(count).toBe(2);
});
});
@@ -584,9 +586,9 @@ describe("cmdThreadRead start section / before / quota", () => {
// ── Tests that call process.exit must be last ─────────────────────────────────
describe("cmdThreadStepDetails (process.exit tests - must be last)", () => {
describe("cmdStepShow (process.exit tests - must be last)", () => {
test("throws when step hash does not exist", async () => {
await expect(cmdThreadStepDetails(tmpDir, "nonexistenth0" as CasRef)).rejects.toThrow();
await expect(cmdStepShow(tmpDir, "nonexistenth0" as CasRef)).rejects.toThrow();
});
test("before with unknown hash rejects", async () => {
@@ -0,0 +1,147 @@
import { mkdir, readdir, readFile, rename, rm, writeFile } from "node:fs/promises";
import { join } from "node:path";
import type { RunningThreadItem, ThreadId } from "@uncaged/workflow-protocol";
import type { RunningMarker } from "./types.js";
/**
* Get the path to the running markers directory.
*/
export function getRunningDir(storageRoot: string): string {
return join(storageRoot, "running");
}
/**
* Get the path to a specific thread's marker file.
*/
export function getMarkerPath(storageRoot: string, threadId: ThreadId): string {
return join(getRunningDir(storageRoot), `${threadId}.json`);
}
/**
* Check if a PID is still running.
* Returns true if the process exists, false otherwise.
*/
export function isPidAlive(pid: number): boolean {
try {
// process.kill with signal 0 checks existence without killing
process.kill(pid, 0);
return true;
} catch {
// ESRCH means process doesn't exist
return false;
}
}
/**
* Create a marker file for a running thread.
* Writes to a temp file in the same directory, then atomically renames.
*/
export async function createMarker(storageRoot: string, marker: RunningMarker): Promise<void> {
const runningDir = getRunningDir(storageRoot);
await mkdir(runningDir, { recursive: true });
const markerPath = getMarkerPath(storageRoot, marker.thread);
const tempPath = join(runningDir, `.${marker.thread}-${process.pid}.tmp`);
const content = JSON.stringify(marker, null, 2);
await writeFile(tempPath, content, "utf8");
await rename(tempPath, markerPath);
}
/**
* Delete a marker file for a thread.
*/
export async function deleteMarker(storageRoot: string, threadId: ThreadId): Promise<void> {
const markerPath = getMarkerPath(storageRoot, threadId);
try {
await rm(markerPath);
} catch {
// Ignore errors if file doesn't exist
}
}
/**
* Read a marker file. Returns null if file doesn't exist or is invalid.
*/
export async function readMarker(
storageRoot: string,
threadId: ThreadId,
): Promise<RunningMarker | null> {
const markerPath = getMarkerPath(storageRoot, threadId);
try {
const content = await readFile(markerPath, "utf8");
const marker = JSON.parse(content) as RunningMarker;
return marker;
} catch {
return null;
}
}
/**
* List all running threads, filtering out stale markers.
*/
export async function listRunningThreads(storageRoot: string): Promise<RunningThreadItem[]> {
const runningDir = getRunningDir(storageRoot);
let files: string[];
try {
files = await readdir(runningDir);
} catch {
// Directory doesn't exist or can't be read
return [];
}
const results: RunningThreadItem[] = [];
for (const filename of files) {
if (!filename.endsWith(".json")) {
continue;
}
const threadId = filename.slice(0, -5) as ThreadId;
const marker = await readMarker(storageRoot, threadId);
if (marker === null) {
// Invalid marker file
continue;
}
if (!isPidAlive(marker.pid)) {
// Stale marker - process no longer exists
await deleteMarker(storageRoot, threadId);
continue;
}
results.push({
thread: marker.thread,
workflow: marker.workflow,
pid: marker.pid,
startedAt: marker.startedAt,
});
}
return results;
}
/**
* Check if a thread is currently executing in the background.
* Returns the marker if running, null otherwise.
*/
export async function isThreadRunning(
storageRoot: string,
threadId: ThreadId,
): Promise<RunningMarker | null> {
const marker = await readMarker(storageRoot, threadId);
if (marker === null) {
return null;
}
if (!isPidAlive(marker.pid)) {
// Stale marker
await deleteMarker(storageRoot, threadId);
return null;
}
return marker;
}
@@ -0,0 +1,11 @@
export {
createMarker,
deleteMarker,
getMarkerPath,
getRunningDir,
isPidAlive,
isThreadRunning,
listRunningThreads,
readMarker,
} from "./background.js";
export type { RunningMarker } from "./types.js";
@@ -0,0 +1,9 @@
import type { CasRef, ThreadId } from "@uncaged/workflow-protocol";
/** Marker file stored at ~/.uncaged/workflow/running/<thread-id>.json */
export type RunningMarker = {
thread: ThreadId;
workflow: CasRef;
pid: number;
startedAt: number;
};
+201 -47
View File
@@ -1,8 +1,7 @@
#!/usr/bin/env bun
import type { ThreadId } from "@uncaged/workflow-protocol";
import type { CasRef, ThreadId } from "@uncaged/workflow-protocol";
import { Command } from "commander";
import { stringify as yamlStringify } from "yaml";
import {
cmdCasGet,
cmdCasHas,
@@ -17,19 +16,19 @@ import {
import { cmdLogClean, cmdLogList, cmdLogShow } from "./commands/log.js";
import { cmdSetup, cmdSetupInteractive } from "./commands/setup.js";
import { cmdSkillCli } from "./commands/skill.js";
import { cmdStepFork, cmdStepList, cmdStepShow } from "./commands/step.js";
import {
cmdThreadFork,
cmdThreadKill,
cmdThreadCancel,
cmdThreadExec,
cmdThreadList,
cmdThreadRead,
cmdThreadShow,
cmdThreadStart,
cmdThreadStep,
cmdThreadStepDetails,
cmdThreadSteps,
cmdThreadStop,
THREAD_READ_DEFAULT_QUOTA,
type ThreadStatus,
} from "./commands/thread.js";
import { cmdWorkflowList, cmdWorkflowPut, cmdWorkflowShow } from "./commands/workflow.js";
import { cmdWorkflowAdd, cmdWorkflowList, cmdWorkflowShow } from "./commands/workflow.js";
import { formatOutput, type OutputFormat } from "./format.js";
import { resolveStorageRoot } from "./store.js";
@@ -52,20 +51,27 @@ const program = new Command();
const pkg = await import("../package.json", { with: { type: "json" } });
program
.name("uwf")
.description("Stateless workflow CLI")
.description(
"Stateless workflow CLI\n\n" +
"Four-layer architecture:\n" +
" workflow → thread → step → turn\n" +
" 模板定义 执行实例 单步结果 agent内部交互",
)
.version(pkg.default.version, "-V, --version");
program.option("--format <fmt>", "Output format: json or yaml", "json");
const workflow = program.command("workflow").description("Workflow registry and CAS");
const workflow = program
.command("workflow")
.description("Workflow definitions (layer 1: templates)");
workflow
.command("put")
.command("add")
.description("Register a workflow from YAML")
.argument("<file>", "Workflow YAML file")
.action((file: string) => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const result = await cmdWorkflowPut(storageRoot, file);
const result = await cmdWorkflowAdd(storageRoot, file);
writeOutput(result);
});
});
@@ -93,7 +99,7 @@ workflow
});
});
const thread = program.command("thread").description("Thread lifecycle and execution");
const thread = program.command("thread").description("Thread execution (layer 2: instances)");
thread
.command("start")
@@ -109,24 +115,46 @@ thread
});
thread
.command("step")
.command("exec")
.description("Execute one or more steps")
.argument("<thread-id>", "Thread ULID")
.option("--agent <cmd>", "Override agent command")
.option("-c, --count <number>", "Number of steps to run (default: 1)")
.action((threadId: string, opts: { agent: string | undefined; count: string | undefined }) => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const agentOverride = opts.agent ?? null;
const count = opts.count !== undefined ? Number(opts.count) : 1;
const results = await cmdThreadStep(storageRoot, threadId, agentOverride, count);
if (results.length === 1) {
writeOutput(results[0]);
} else {
writeOutput(results);
}
});
});
.option("--background", "Run in background and return immediately")
.option("--_background-worker", "Internal flag for background worker process", false)
.action(
(
threadId: string,
opts: {
agent: string | undefined;
count: string | undefined;
background: boolean;
_backgroundWorker: boolean;
},
) => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const agentOverride = opts.agent ?? null;
const count = opts.count !== undefined ? Number(opts.count) : 1;
const background = opts.background ?? false;
const backgroundWorker = opts._backgroundWorker ?? false;
const results = await cmdThreadExec(
storageRoot,
threadId,
agentOverride,
count,
background,
backgroundWorker,
);
if (results.length === 1) {
writeOutput(results[0]);
} else {
writeOutput(results);
}
});
},
);
thread
.command("show")
@@ -142,36 +170,49 @@ thread
thread
.command("list")
.description("List active threads")
.option("--all", "Include archived threads")
.action((opts: { all: boolean }) => {
.description("List threads")
.option("--status <status>", "Filter by status: idle, running, or completed")
.action((opts: { status: string | undefined }) => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const result = await cmdThreadList(storageRoot, opts.all);
const validStatuses: ThreadStatus[] = ["idle", "running", "completed"];
let statusFilter: ThreadStatus | null = null;
if (opts.status !== undefined) {
if (!validStatuses.includes(opts.status as ThreadStatus)) {
process.stderr.write(
`Invalid status: ${opts.status}. Must be one of: idle, running, completed\n`,
);
process.exit(1);
}
statusFilter = opts.status as ThreadStatus;
}
const result = await cmdThreadList(storageRoot, statusFilter);
writeOutput(result);
});
});
thread
.command("kill")
.description("Terminate and archive a thread")
.command("stop")
.description("Stop background execution of a thread (keep thread active)")
.argument("<thread-id>", "Thread ULID")
.action((threadId: string) => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const result = await cmdThreadKill(storageRoot, threadId);
const result = await cmdThreadStop(storageRoot, threadId);
writeOutput(result);
});
});
thread
.command("steps")
.description("List all steps in a thread")
.command("cancel")
.description("Cancel a thread (stop execution and move to history)")
.argument("<thread-id>", "Thread ULID")
.action((threadId: string) => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const result = await cmdThreadSteps(storageRoot, threadId);
const result = await cmdThreadCancel(storageRoot, threadId);
writeOutput(result);
});
});
@@ -205,28 +246,141 @@ thread
},
);
thread
const step = program.command("step").description("Step results (layer 3: single cycle)");
step
.command("list")
.description("List all steps in a thread")
.argument("<thread-id>", "Thread ULID")
.action((threadId: string) => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const result = await cmdStepList(storageRoot, threadId);
writeOutput(result);
});
});
step
.command("show")
.description("Show details of a specific step")
.argument("<step-hash>", "CAS hash of the StepNode")
.action((stepHash: string) => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const detail = await cmdStepShow(storageRoot, stepHash as CasRef);
writeOutput(detail);
});
});
// step read is not yet registered (half-baked, see step.ts cmdStepRead)
step
.command("fork")
.description("Fork a thread from a specific step")
.argument("<step-hash>", "CAS hash of the StartNode or StepNode to fork from")
.action((stepHash: string) => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const result = await cmdThreadFork(storageRoot, stepHash);
const result = await cmdStepFork(storageRoot, stepHash as CasRef);
writeOutput(result);
});
});
// ── Deprecation Handlers ──────────────────────────────────────────────────────
// These commands have been removed. Show helpful error messages.
workflow
.command("put")
.description("[DEPRECATED] Use 'workflow add' instead")
.argument("<file>", "Workflow YAML file")
.action(() => {
process.stderr.write(`Error: Command 'workflow put' has been removed.
Use 'workflow add' instead.
For more information, see: uwf help workflow add
`);
process.exit(1);
});
thread
.command("step")
.description("[DEPRECATED] Use 'thread exec' instead")
.argument("<thread-id>", "Thread ULID")
.allowUnknownOption()
.action(() => {
process.stderr.write(`Error: Command 'thread step' has been removed.
Use 'thread exec' instead.
For more information, see: uwf help thread exec
`);
process.exit(1);
});
thread
.command("steps")
.description("[DEPRECATED] Use 'step list' instead")
.argument("<thread-id>", "Thread ULID")
.action(() => {
process.stderr.write(`Error: Command 'thread steps' has been removed.
Use 'step list' instead.
For more information, see: uwf help step list
`);
process.exit(1);
});
thread
.command("step-details")
.description("Dump the full detail node of a step as YAML")
.argument("<step-hash>", "CAS hash of the StepNode")
.action((stepHash: string) => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const detail = await cmdThreadStepDetails(storageRoot, stepHash);
process.stdout.write(yamlStringify(detail));
});
.description("[DEPRECATED] Use 'step show' instead")
.argument("<step-hash>", "Step hash")
.action(() => {
process.stderr.write(`Error: Command 'thread step-details' has been removed.
Use 'step show' instead.
For more information, see: uwf help step show
`);
process.exit(1);
});
thread
.command("fork")
.description("[DEPRECATED] Use 'step fork' instead")
.argument("<step-hash>", "Step hash")
.action(() => {
process.stderr.write(`Error: Command 'thread fork' has been removed.
Use 'step fork' instead.
For more information, see: uwf help step fork
`);
process.exit(1);
});
thread
.command("kill")
.description("[DEPRECATED] Use 'thread stop' or 'thread cancel' instead")
.argument("<thread-id>", "Thread ULID")
.action(() => {
process.stderr.write(`Error: Command 'thread kill' has been removed.
Use 'thread stop' to stop background execution (keep thread active),
or 'thread cancel' to cancel and archive the thread.
For more information, see:
uwf help thread stop
uwf help thread cancel
`);
process.exit(1);
});
thread
.command("running")
.description("[DEPRECATED] Use 'thread list --status running' instead")
.action(() => {
process.stderr.write(`Error: Command 'thread running' has been removed.
Use 'thread list --status running' instead.
For more information, see: uwf help thread list
`);
process.exit(1);
});
const skill = program.command("skill").description("Built-in skill references for agents");
+259 -162
View File
@@ -1,4 +1,4 @@
import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs";
import { existsSync, mkdirSync, readdirSync, readFileSync, statSync, writeFileSync } from "node:fs";
import { join } from "node:path";
import { stdin as input, stdout as output } from "node:process";
import { createInterface } from "node:readline/promises";
@@ -137,75 +137,182 @@ function apiKeyEnvName(providerName: string): string {
return `${providerName.toUpperCase().replace(/[^A-Z0-9]/g, "_")}_API_KEY`;
}
// ──────────────────────────────────────────────────────────────────────────────
// Extracted helpers — _discoverAgents
// ──────────────────────────────────────────────────────────────────────────────
/**
* Scans directories from a PATH string for uwf-* executables.
*/
export async function _searchPathDirs(pathEnv: string): Promise<string[]> {
if (!pathEnv) return [];
const dirs = pathEnv.split(":").filter((d) => d.length > 0);
const agents = new Set<string>();
for (const dir of dirs) {
_scanDirForAgents(dir, agents);
}
return Array.from(agents).sort();
}
function _scanDirForAgents(dir: string, agents: Set<string>): void {
try {
if (!existsSync(dir)) return;
const entries = readdirSync(dir);
for (const entry of entries) {
if (!entry.startsWith("uwf-") || entry === "uwf") continue;
if (_isExecutableFile(join(dir, entry))) {
agents.add(entry);
}
}
} catch {
// Skip inaccessible directories
}
}
function _isExecutableFile(fullPath: string): boolean {
try {
const s = statSync(fullPath);
return s.isFile() && (s.mode & 0o111) !== 0;
} catch {
return false;
}
}
/**
* Parses the stdout of `which -a` into sorted unique basenames.
*/
export function _parseWhichOutput(text: string): string[] {
if (!text) return [];
const agents = new Set<string>();
for (const line of text.trim().split("\n")) {
if (!line) continue;
const basename = line.split("/").pop() ?? "";
if (basename.startsWith("uwf-") && basename !== "uwf") {
agents.add(basename);
}
}
return Array.from(agents).sort();
}
/**
* Discover uwf-* agent binaries in PATH.
* Returns sorted list of binary names (e.g., ["uwf-hermes", "uwf-claude-code"]).
*/
async function _discoverAgents(): Promise<string[]> {
export async function _discoverAgents(): Promise<string[]> {
try {
const agents = await _tryWhichDiscovery();
if (agents !== null) return agents;
return await _searchPathDirs(process.env.PATH ?? "");
} catch {
return [];
}
}
async function _tryWhichDiscovery(): Promise<string[] | null> {
try {
// Use which -a to find all uwf-* binaries in PATH
const proc = Bun.spawn(["which", "-a", "uwf-hermes", "uwf-claude-code", "uwf-cursor"], {
stdout: "pipe",
stderr: "pipe",
});
const text = await new Response(proc.stdout).text();
await proc.exited;
if (proc.exitCode !== 0) {
// Try alternative approach: search PATH directories manually
const pathEnv = process.env.PATH || "";
const pathDirs = pathEnv.split(":").filter((d) => d.length > 0);
const agents = new Set<string>();
for (const dir of pathDirs) {
try {
if (!existsSync(dir)) continue;
const { readdirSync, statSync } = await import("node:fs");
const entries = readdirSync(dir);
for (const entry of entries) {
if (!entry.startsWith("uwf-") || entry === "uwf") continue;
const fullPath = join(dir, entry);
try {
const stat = statSync(fullPath);
// Check if executable (owner, group, or other has execute bit)
if (stat.isFile() && (stat.mode & 0o111) !== 0) {
agents.add(entry);
}
} catch {
// Skip if can't stat
}
}
} catch {
// Skip inaccessible directories
}
}
return Array.from(agents).sort();
}
// Parse which output - each line is a path to a binary
const paths = text
.trim()
.split("\n")
.filter((line) => line.length > 0);
const agents = new Set<string>();
for (const path of paths) {
const basename = path.split("/").pop();
if (basename?.startsWith("uwf-") && basename !== "uwf") {
agents.add(basename);
}
}
return Array.from(agents).sort();
if (proc.exitCode !== 0) return null;
return _parseWhichOutput(text);
} catch {
// If all fails, return empty array
return [];
return null;
}
}
// ──────────────────────────────────────────────────────────────────────────────
// Extracted helpers — onData closure (promptSecret)
// ──────────────────────────────────────────────────────────────────────────────
/** Returns true for newline, carriage return, or EOF (EOT). */
export function _isTerminator(c: string): boolean {
return c === "\n" || c === "\r" || c === "";
}
/** Returns true for DEL or backspace. */
export function _isBackspace(c: string): boolean {
return c === "" || c === "\b";
}
// ──────────────────────────────────────────────────────────────────────────────
// Extracted helpers — cmdSetupInteractive
// ──────────────────────────────────────────────────────────────────────────────
type ProviderEntry = { name: string; label: string; baseUrl: string };
/** Prints the numbered provider list and custom option to stdout. */
export function _printProviderMenu(providers: readonly ProviderEntry[]): void {
const numWidth = String(providers.length + 1).length;
for (let i = 0; i < providers.length; i++) {
const p = providers[i];
if (!p) continue;
const num = String(i + 1).padStart(numWidth);
console.log(` ${num}) ${p.label.padEnd(28)} ${p.baseUrl}`);
}
const customNum = String(providers.length + 1).padStart(numWidth);
console.log(` ${customNum}) Custom (enter name and URL manually)\n`);
}
/** Resolves a numeric choice string to a preset provider, or null for custom/invalid. */
export function _resolveProviderChoice(
choice: string,
providers: readonly ProviderEntry[],
): { providerName: string; baseUrl: string } | null {
const n = Number.parseInt(choice, 10);
if (Number.isNaN(n) || n < 1 || n > providers.length) return null;
const p = providers[n - 1];
if (!p) return null;
return { providerName: p.name, baseUrl: p.baseUrl };
}
/** Resolves numeric index or literal model name to a model string. */
export function _resolveModelChoice(input: string, models: string[]): string {
const n = Number.parseInt(input, 10);
if (!Number.isNaN(n) && n >= 1 && n <= models.length) {
return models[n - 1] ?? input;
}
return input;
}
/** Prints the multi-column model list to stdout. */
export function _printModelMenu(models: string[], termCols: number): void {
const nw = String(models.length).length;
const maxLen = models.reduce((m, s) => Math.max(m, s.length), 0);
const colWidth = nw + 2 + maxLen + 4;
const cols = Math.max(1, Math.floor(termCols / colWidth));
const rows = Math.ceil(models.length / cols);
for (let r = 0; r < rows; r++) {
let line = "";
for (let c = 0; c < cols; c++) {
const idx = c * rows + r;
if (idx >= models.length) break;
const num = String(idx + 1).padStart(nw);
const name = (models[idx] ?? "").padEnd(maxLen);
line += ` ${num}) ${name} `;
}
console.log(line.trimEnd());
}
}
type ValidationResult = { ok: boolean; error: string | null };
/** Prints the model validation result to stdout. */
export function _printValidationResult(validation: ValidationResult): void {
if (validation.ok) {
console.log("✓ Model verified — connection successful.\n");
} else {
console.log(`\n⚠ Warning: Could not reach model — ${validation.error}`);
console.log(
" Config saved, but you may want to try a different model or check your API key.\n",
);
}
}
// ──────────────────────────────────────────────────────────────────────────────
/**
* Merge setup args into config.yaml structure. Non-destructive — preserves existing entries.
*/
@@ -281,6 +388,46 @@ export async function cmdSetup(args: SetupArgs): Promise<Record<string, unknown>
};
}
type SecretState = {
buf: string;
rawWasSet: boolean;
resolve: (value: string) => void;
onData: (chunk: string) => void;
};
function _handleSecretTerminator(state: SecretState): void {
if (process.stdin.isTTY) process.stdin.setRawMode(state.rawWasSet);
process.stdin.pause();
process.stdin.removeListener("data", state.onData);
process.stdout.write("\n");
state.resolve(state.buf.trim());
}
function _handleSecretBackspace(state: SecretState): void {
if (state.buf.length > 0) {
state.buf = state.buf.slice(0, -1);
process.stdout.write("\b \b");
}
}
function _handleSecretChar(c: string, state: SecretState): boolean {
if (_isTerminator(c)) {
_handleSecretTerminator(state);
return true;
}
if (_isBackspace(c)) {
_handleSecretBackspace(state);
return false;
}
if (c === "") {
if (process.stdin.isTTY) process.stdin.setRawMode(state.rawWasSet);
process.exit(130);
}
state.buf += c;
process.stdout.write("*");
return false;
}
/** Read a line with terminal echo disabled (for secrets). */
async function promptSecret(label: string): Promise<string> {
process.stdout.write(label);
@@ -292,33 +439,13 @@ async function promptSecret(label: string): Promise<string> {
process.stdin.resume();
process.stdin.setEncoding("utf8");
let buf = "";
const onData = (chunk: string) => {
const state: SecretState = { buf: "", rawWasSet, resolve, onData: () => {} };
state.onData = (chunk: string) => {
for (const c of chunk.toString()) {
if (c === "\n" || c === "\r" || c === "\u0004") {
if (process.stdin.isTTY) process.stdin.setRawMode(rawWasSet);
process.stdin.pause();
process.stdin.removeListener("data", onData);
process.stdout.write("\n");
resolve(buf.trim());
return;
}
if (c === "\u007F" || c === "\b") {
if (buf.length > 0) {
buf = buf.slice(0, -1);
process.stdout.write("\b \b");
}
continue;
}
if (c === "\u0003") {
if (process.stdin.isTTY) process.stdin.setRawMode(rawWasSet);
process.exit(130);
}
buf += c;
process.stdout.write("*");
if (_handleSecretChar(c, state)) return;
}
};
process.stdin.on("data", onData);
process.stdin.on("data", state.onData);
});
}
@@ -344,6 +471,56 @@ async function fetchModels(baseUrl: string, apiKey: string): Promise<string[]> {
}
}
async function _promptProviderSelection(
rl: ReturnType<typeof createInterface>,
): Promise<{ providerName: string; baseUrl: string }> {
console.log("Select a provider:\n");
_printProviderMenu(PRESET_PROVIDERS);
const choice = (await rl.question(`Choose [1-${PRESET_PROVIDERS.length + 1}]: `)).trim();
const choiceNum = Number.parseInt(choice, 10);
if (Number.isNaN(choiceNum) || choiceNum < 1 || choiceNum > PRESET_PROVIDERS.length + 1) {
throw new Error(`Invalid choice: ${choice}`);
}
const preset = _resolveProviderChoice(choice, PRESET_PROVIDERS);
if (preset) {
const selected = PRESET_PROVIDERS[choiceNum - 1];
if (selected) {
console.log(`\n → ${selected.label} (${selected.baseUrl})\n`);
}
return preset;
}
const providerName = (await rl.question("Provider name (e.g. my-proxy): ")).trim();
if (!providerName) throw new Error("Provider name required");
const baseUrl = (await rl.question("OpenAI-compatible API base URL: ")).trim();
if (!baseUrl) throw new Error("Base URL required");
return { providerName, baseUrl };
}
async function _promptModelSelection(
rl: ReturnType<typeof createInterface>,
baseUrl: string,
apiKey: string,
): Promise<string> {
console.log("\nFetching available models...");
const models = await fetchModels(baseUrl, apiKey);
if (models.length === 0) {
console.log("Could not fetch models. Enter model name manually.");
const model = (await rl.question("Default model (e.g. qwen-plus, gpt-4o): ")).trim();
if (!model) throw new Error("Model required");
return model;
}
console.log(`\nAvailable models (${models.length}):\n`);
_printModelMenu(models, process.stdout.columns || 100);
console.log(`\nChoose a number, or type a model name directly.`);
const modelInput = (await rl.question(`Default model [1-${models.length}]: `)).trim();
if (!modelInput) throw new Error("Model required");
return _resolveModelChoice(modelInput, models);
}
/**
* Interactive setup — prompts user for provider, API key, model.
*/
@@ -353,39 +530,7 @@ export async function cmdSetupInteractive(storageRoot: string): Promise<Record<s
try {
console.log("Configure LLM provider for uwf workflow agents.\n");
// 1. Provider selection
const numWidth = String(PRESET_PROVIDERS.length + 1).length;
console.log("Select a provider:\n");
for (let i = 0; i < PRESET_PROVIDERS.length; i++) {
const p = PRESET_PROVIDERS[i];
if (!p) continue;
const num = String(i + 1).padStart(numWidth);
console.log(` ${num}) ${p.label.padEnd(28)} ${p.baseUrl}`);
}
const customNum = String(PRESET_PROVIDERS.length + 1).padStart(numWidth);
console.log(` ${customNum}) Custom (enter name and URL manually)\n`);
const choice = (await rl.question(`Choose [1-${PRESET_PROVIDERS.length + 1}]: `)).trim();
const choiceNum = Number.parseInt(choice, 10);
if (Number.isNaN(choiceNum) || choiceNum < 1 || choiceNum > PRESET_PROVIDERS.length + 1) {
throw new Error(`Invalid choice: ${choice}`);
}
let providerName: string;
let baseUrl: string;
if (choiceNum <= PRESET_PROVIDERS.length) {
const selected = PRESET_PROVIDERS[choiceNum - 1];
if (!selected) throw new Error("Invalid selection");
providerName = selected.name;
baseUrl = selected.baseUrl;
console.log(`\n → ${selected.label} (${selected.baseUrl})\n`);
} else {
providerName = (await rl.question("Provider name (e.g. my-proxy): ")).trim();
if (!providerName) throw new Error("Provider name required");
baseUrl = (await rl.question("OpenAI-compatible API base URL: ")).trim();
if (!baseUrl) throw new Error("Base URL required");
}
const { providerName, baseUrl } = await _promptProviderSelection(rl);
// 2. API key
rl.close();
@@ -394,47 +539,8 @@ export async function cmdSetupInteractive(storageRoot: string): Promise<Record<s
// 3. Model selection
const rl2 = createInterface({ input, output });
console.log("\nFetching available models...");
const models = await fetchModels(baseUrl, apiKey);
let model: string;
if (models.length > 0) {
console.log(`\nAvailable models (${models.length}):\n`);
const nw = String(models.length).length;
// Multi-column layout
const maxLen = models.reduce((m, s) => Math.max(m, s.length), 0);
const colWidth = nw + 2 + maxLen + 4; // " N) name "
const termCols = process.stdout.columns || 100;
const cols = Math.max(1, Math.floor(termCols / colWidth));
const rows = Math.ceil(models.length / cols);
for (let r = 0; r < rows; r++) {
let line = "";
for (let c = 0; c < cols; c++) {
const idx = c * rows + r;
if (idx >= models.length) break;
const num = String(idx + 1).padStart(nw);
const name = (models[idx] ?? "").padEnd(maxLen);
line += ` ${num}) ${name} `;
}
console.log(line.trimEnd());
}
console.log(`\nChoose a number, or type a model name directly.`);
const modelInput = (await rl2.question(`Default model [1-${models.length}]: `)).trim();
if (!modelInput) throw new Error("Model required");
const modelNum = Number.parseInt(modelInput, 10);
if (!Number.isNaN(modelNum) && modelNum >= 1 && modelNum <= models.length) {
model = models[modelNum - 1] ?? modelInput;
} else {
model = modelInput;
}
} else {
console.log("Could not fetch models. Enter model name manually.");
model = (await rl2.question("Default model (e.g. qwen-plus, gpt-4o): ")).trim();
if (!model) throw new Error("Model required");
}
const model = await _promptModelSelection(rl2, baseUrl, apiKey);
rl2.close();
console.log(`${providerName}/${model}\n`);
const setupResult = await cmdSetup({
@@ -447,17 +553,8 @@ export async function cmdSetupInteractive(storageRoot: string): Promise<Record<s
// Show validation result
if (setupResult.validation && typeof setupResult.validation === "object") {
const v = setupResult.validation as { ok: boolean; error?: string };
if (v.ok) {
console.log("✓ Model verified — connection successful.\n");
} else {
console.log(`\n⚠ Warning: Could not reach model — ${v.error}`);
console.log(
" Config saved, but you may want to try a different model or check your API key.\n",
);
}
_printValidationResult(setupResult.validation as ValidationResult);
}
console.log("Setup complete! Get started:\n");
console.log(" uwf workflow put <workflow.yaml> Register a workflow");
console.log(' uwf thread start <name> -p "..." Start a thread');
@@ -0,0 +1,227 @@
import type { Store as CasStore, JSONSchema } from "@uncaged/json-cas";
import { getSchema } from "@uncaged/json-cas";
import type {
CasRef,
StartNodePayload,
StepNodePayload,
ThreadId,
} from "@uncaged/workflow-protocol";
import { loadThreadsIndex, type UwfStore } from "../store.js";
type ChainState = {
startHash: CasRef;
start: StartNodePayload;
stepsNewestFirst: StepNodePayload[];
headIsStart: boolean;
};
type OrderedStepItem = {
hash: CasRef;
payload: StepNodePayload;
timestamp: number;
};
function fail(message: string): never {
process.stderr.write(`${message}\n`);
process.exit(1);
}
function walkChain(uwf: UwfStore, headHash: CasRef): ChainState {
const headNode = uwf.store.get(headHash);
if (headNode === null) {
fail(`CAS node not found: ${headHash}`);
}
if (headNode.type === uwf.schemas.startNode) {
return {
startHash: headHash,
start: headNode.payload as StartNodePayload,
stepsNewestFirst: [],
headIsStart: true,
};
}
if (headNode.type !== uwf.schemas.stepNode) {
fail(`head ${headHash} is not a StartNode or StepNode`);
}
const stepsNewestFirst: StepNodePayload[] = [];
let hash: CasRef | null = headHash;
while (hash !== null) {
const node = uwf.store.get(hash);
if (node === null) {
fail(`CAS node not found while walking chain: ${hash}`);
}
if (node.type !== uwf.schemas.stepNode) {
break;
}
const payload = node.payload as StepNodePayload;
stepsNewestFirst.push(payload);
hash = payload.prev;
}
const newest = stepsNewestFirst[0];
if (newest === undefined) {
fail(`empty step chain at head ${headHash}`);
}
const startNode = uwf.store.get(newest.start);
if (startNode === null || startNode.type !== uwf.schemas.startNode) {
fail(`StartNode not found: ${newest.start}`);
}
return {
startHash: newest.start,
start: startNode.payload as StartNodePayload,
stepsNewestFirst,
headIsStart: false,
};
}
function expandOutput(uwf: UwfStore, outputRef: CasRef): unknown {
const node = uwf.store.get(outputRef);
if (node === null) {
return {};
}
return node.payload;
}
/**
* Recursively expand all cas_ref fields in a CAS node's payload,
* replacing hash strings with the referenced node's expanded payload.
*/
function expandDeep(store: CasStore, hash: CasRef, visited?: Set<string>): unknown {
const seen = visited ?? new Set<string>();
if (seen.has(hash)) return hash; // cycle guard
seen.add(hash);
const node = store.get(hash);
if (node === null) return hash;
const schema = getSchema(store, node.type);
if (schema === null) return node.payload;
return expandValue(store, schema, node.payload, seen);
}
function expandCasRefField(store: CasStore, value: unknown, visited: Set<string>): unknown {
if (typeof value === "string") {
return expandDeep(store, value as CasRef, visited);
}
return value;
}
function expandAnyOfField(
store: CasStore,
schema: JSONSchema,
value: unknown,
visited: Set<string>,
): unknown {
if (!Array.isArray(schema.anyOf)) return value;
for (const sub of schema.anyOf as JSONSchema[]) {
if (sub.format === "cas_ref" && typeof value === "string") {
return expandDeep(store, value as CasRef, visited);
}
}
return value;
}
function expandArrayField(
store: CasStore,
schema: JSONSchema,
value: unknown,
visited: Set<string>,
): unknown {
if (!schema.items || !Array.isArray(value)) return value;
const itemSchema = schema.items as JSONSchema;
return (value as unknown[]).map((item) => expandValue(store, itemSchema, item, visited));
}
function expandObjectField(
store: CasStore,
schema: JSONSchema,
value: unknown,
visited: Set<string>,
): unknown {
if (value === null || typeof value !== "object" || Array.isArray(value) || !schema.properties) {
return value;
}
const props = schema.properties as Record<string, JSONSchema>;
const obj = value as Record<string, unknown>;
const result: Record<string, unknown> = {};
for (const [key, val] of Object.entries(obj)) {
const propSchema = props[key];
result[key] = propSchema ? expandValue(store, propSchema, val, visited) : val;
}
return result;
}
function expandValue(
store: CasStore,
schema: JSONSchema,
value: unknown,
visited: Set<string>,
): unknown {
if (schema.format === "cas_ref") return expandCasRefField(store, value, visited);
if (Array.isArray(schema.anyOf)) return expandAnyOfField(store, schema, value, visited);
if (schema.type === "array") return expandArrayField(store, schema, value, visited);
return expandObjectField(store, schema, value, visited);
}
function collectOrderedSteps(
uwf: UwfStore,
headHash: CasRef,
chain: ChainState,
): OrderedStepItem[] {
let hash: CasRef | null = headHash;
const hashToNode = new Map<string, { payload: StepNodePayload; timestamp: number }>();
while (hash !== null) {
const node = uwf.store.get(hash);
if (node === null || node.type !== uwf.schemas.stepNode) {
break;
}
const payload = node.payload as StepNodePayload;
hashToNode.set(hash, { payload, timestamp: node.timestamp });
hash = payload.prev;
}
let cur: CasRef | null = chain.headIsStart ? null : headHash;
const ordered: OrderedStepItem[] = [];
while (cur !== null) {
const entry = hashToNode.get(cur);
if (entry === undefined) {
break;
}
ordered.push({ hash: cur, ...entry });
cur = entry.payload.prev;
}
ordered.reverse();
return ordered;
}
async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise<CasRef> {
const index = await loadThreadsIndex(storageRoot);
const head = index[threadId];
if (head === undefined) {
fail(`thread not active: ${threadId}`);
}
return head;
}
export {
type ChainState,
collectOrderedSteps,
expandAnyOfField,
expandArrayField,
expandCasRefField,
expandDeep,
expandObjectField,
expandOutput,
expandValue,
fail,
type OrderedStepItem,
resolveHeadHash,
walkChain,
};
+145
View File
@@ -0,0 +1,145 @@
import type {
CasRef,
StartEntry,
StepEntry,
StepNodePayload,
ThreadForkOutput,
ThreadId,
ThreadStepsOutput,
} from "@uncaged/workflow-protocol";
import { generateUlid } from "@uncaged/workflow-util";
import { createUwfStore, loadThreadsIndex, saveThreadsIndex } from "../store.js";
import {
collectOrderedSteps,
expandDeep,
expandOutput,
fail,
resolveHeadHash,
walkChain,
} from "./shared.js";
/**
* List all steps in a thread (previously: thread steps)
*/
export async function cmdStepList(
storageRoot: string,
threadId: ThreadId,
): Promise<ThreadStepsOutput> {
const headHash = await resolveHeadHash(storageRoot, threadId);
const uwf = await createUwfStore(storageRoot);
const chain = walkChain(uwf, headHash);
const startNode = uwf.store.get(chain.startHash);
if (startNode === null) {
fail(`StartNode not found: ${chain.startHash}`);
}
const startEntry: StartEntry = {
hash: chain.startHash,
workflow: chain.start.workflow,
prompt: chain.start.prompt,
timestamp: startNode.timestamp,
};
const stepEntries: StepEntry[] = [];
const ordered = collectOrderedSteps(uwf, headHash, chain);
for (const item of ordered) {
stepEntries.push({
hash: item.hash,
role: item.payload.role,
output: expandOutput(uwf, item.payload.output),
detail: item.payload.detail ?? null,
agent: item.payload.agent,
timestamp: item.timestamp,
});
}
return {
thread: threadId,
workflow: chain.start.workflow,
steps: [startEntry, ...stepEntries],
};
}
/**
* Show details of a specific step (previously: thread step-details)
*/
export async function cmdStepShow(storageRoot: string, stepHash: CasRef): Promise<unknown> {
const uwf = await createUwfStore(storageRoot);
const node = uwf.store.get(stepHash);
if (node === null) {
fail(`CAS node not found: ${stepHash}`);
}
if (node.type !== uwf.schemas.stepNode) {
fail(`node ${stepHash} is not a StepNode`);
}
const payload = node.payload as StepNodePayload;
if (!payload.detail) {
fail(`step ${stepHash} has no detail`);
}
return expandDeep(uwf.store, payload.detail);
}
/**
* Fork a thread from a specific step (previously: thread fork)
*/
export async function cmdStepFork(
storageRoot: string,
stepHash: CasRef,
): Promise<ThreadForkOutput> {
const uwf = await createUwfStore(storageRoot);
const node = uwf.store.get(stepHash);
if (node === null) {
fail(`CAS node not found: ${stepHash}`);
}
if (node.type !== uwf.schemas.startNode && node.type !== uwf.schemas.stepNode) {
fail(`node ${stepHash} is not a StartNode or StepNode`);
}
const newThreadId = generateUlid(Date.now()) as ThreadId;
const index = await loadThreadsIndex(storageRoot);
index[newThreadId] = stepHash;
await saveThreadsIndex(storageRoot, index);
return {
thread: newThreadId,
forkedFrom: {
step: stepHash,
},
};
}
/**
* Read a step's agent output as markdown (new command - requires #462)
* TODO: Implement once unified agent detail/turn schema is available
*/
export async function cmdStepRead(
storageRoot: string,
stepHash: CasRef,
_before: number | null = null,
): Promise<string> {
const uwf = await createUwfStore(storageRoot);
const node = uwf.store.get(stepHash);
if (node === null) {
fail(`CAS node not found: ${stepHash}`);
}
if (node.type !== uwf.schemas.stepNode) {
fail(`node ${stepHash} is not a StepNode`);
}
const payload = node.payload as StepNodePayload;
if (!payload.output) {
fail(`step ${stepHash} has no output`);
}
// TODO: Implement progressive turn reading with --before N
// For now, return a placeholder
const outputNode = uwf.store.get(payload.output);
if (outputNode === null) {
fail(`output node not found: ${payload.output}`);
}
// Return the output as JSON for now
// Once #462 is implemented, this will properly format frontmatter + markdown
return JSON.stringify(outputNode.payload, null, 2);
}
+213 -322
View File
@@ -1,8 +1,7 @@
import { execFileSync } from "node:child_process";
import { execFileSync, spawn } from "node:child_process";
import { access, readFile } from "node:fs/promises";
import { dirname, isAbsolute, resolve as resolvePath } from "node:path";
import type { Store as CasStore, JSONSchema } from "@uncaged/json-cas";
import { getSchema, validate } from "@uncaged/json-cas";
import { validate } from "@uncaged/json-cas";
import { getEnvPath, loadWorkflowConfig } from "@uncaged/workflow-agent-kit";
import { evaluate } from "@uncaged/workflow-moderator";
import type {
@@ -10,24 +9,26 @@ import type {
AgentConfig,
CasRef,
ModeratorContext,
StartEntry,
RunningThreadsOutput,
StartNodePayload,
StartOutput,
StepContext,
StepEntry,
StepNodePayload,
StepOutput,
ThreadForkOutput,
ThreadId,
ThreadListItem,
ThreadStepsOutput,
WorkflowConfig,
WorkflowPayload,
} from "@uncaged/workflow-protocol";
import { createProcessLogger, generateUlid, type ProcessLogger } from "@uncaged/workflow-util";
import { config as loadDotenv } from "dotenv";
import { parse, stringify } from "yaml";
import {
createMarker,
deleteMarker,
isThreadRunning,
listRunningThreads,
} from "../background/index.js";
import {
appendThreadHistory,
createUwfStore,
@@ -41,6 +42,14 @@ import {
type UwfStore,
} from "../store.js";
import { checkWorkflowFilenameConsistency, isCasRef, parseWorkflowPayload } from "../validate.js";
import {
type ChainState,
collectOrderedSteps,
expandOutput,
fail,
type OrderedStepItem,
walkChain,
} from "./shared.js";
import { materializeWorkflowPayload } from "./workflow.js";
const END_ROLE = "$END";
@@ -52,35 +61,13 @@ const PL_AGENT_SPAWN = "R5J2W8N4";
const PL_AGENT_DONE = "C6P9E3H7";
const PL_THREAD_ARCHIVED = "F4D8Q2K5";
const PL_STEP_ERROR = "B8T5N1V6";
const PL_BACKGROUND_START = "X7Q4W9M2";
function failStep(plog: ProcessLogger, message: string): never {
plog.log(PL_STEP_ERROR, message, null);
fail(message);
}
type ChainState = {
startHash: CasRef;
start: StartNodePayload;
stepsNewestFirst: StepNodePayload[];
headIsStart: boolean;
};
type OrderedStepItem = {
hash: CasRef;
payload: StepNodePayload;
timestamp: number;
};
export type KillOutput = {
thread: ThreadId;
archived: boolean;
};
function fail(message: string): never {
process.stderr.write(`${message}\n`);
process.exit(1);
}
/**
* Check if a string looks like a file path (contains path separators or has .yaml/.yml extension).
*/
@@ -321,6 +308,7 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr
thread: threadId,
head: activeHead,
done: false,
background: null,
};
}
@@ -331,232 +319,77 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr
thread: threadId,
head: hist.head,
done: true,
background: null,
};
}
fail(`thread not found: ${threadId}`);
}
export type ThreadStatus = "idle" | "running" | "completed";
export type ThreadListItemWithStatus = ThreadListItem & {
status: ThreadStatus;
};
async function threadListItemFromActive(
storageRoot: string,
uwf: UwfStore,
threadId: ThreadId,
head: CasRef,
): Promise<ThreadListItem | null> {
): Promise<ThreadListItemWithStatus | null> {
const workflow = resolveWorkflowFromHead(uwf, head);
if (workflow === null) {
return null;
}
return { thread: threadId, workflow, head };
// Check if thread is currently running in background
const runningMarker = await isThreadRunning(storageRoot, threadId);
const status: ThreadStatus = runningMarker !== null ? "running" : "idle";
return { thread: threadId, workflow, head, status };
}
export async function cmdThreadList(
storageRoot: string,
includeAll: boolean,
): Promise<ThreadListItem[]> {
statusFilter: ThreadStatus | null,
): Promise<ThreadListItemWithStatus[]> {
const uwf = await createUwfStore(storageRoot);
const index = await loadThreadsIndex(storageRoot);
const items: ThreadListItem[] = [];
const items: ThreadListItemWithStatus[] = [];
// Add active threads
for (const [threadId, head] of Object.entries(index)) {
const item = await threadListItemFromActive(uwf, threadId as ThreadId, head);
const item = await threadListItemFromActive(storageRoot, uwf, threadId as ThreadId, head);
if (item !== null) {
items.push(item);
}
}
if (!includeAll) {
return items;
// Add completed threads if requested
if (statusFilter === "completed" || statusFilter === null) {
const activeIds = new Set(items.map((i) => i.thread));
const history = await loadThreadHistory(storageRoot);
for (const entry of history) {
if (!activeIds.has(entry.thread)) {
items.push({
thread: entry.thread,
workflow: entry.workflow,
head: entry.head,
status: "completed",
});
}
}
}
const activeIds = new Set(items.map((i) => i.thread));
const history = await loadThreadHistory(storageRoot);
for (const entry of history) {
if (!activeIds.has(entry.thread)) {
items.push({
thread: entry.thread,
workflow: entry.workflow,
head: entry.head,
});
}
// Apply status filter if provided
if (statusFilter !== null) {
return items.filter((item) => item.status === statusFilter);
}
return items;
}
function walkChain(uwf: UwfStore, headHash: CasRef): ChainState {
const headNode = uwf.store.get(headHash);
if (headNode === null) {
fail(`CAS node not found: ${headHash}`);
}
if (headNode.type === uwf.schemas.startNode) {
return {
startHash: headHash,
start: headNode.payload as StartNodePayload,
stepsNewestFirst: [],
headIsStart: true,
};
}
if (headNode.type !== uwf.schemas.stepNode) {
fail(`head ${headHash} is not a StartNode or StepNode`);
}
const stepsNewestFirst: StepNodePayload[] = [];
let hash: CasRef | null = headHash;
while (hash !== null) {
const node = uwf.store.get(hash);
if (node === null) {
fail(`CAS node not found while walking chain: ${hash}`);
}
if (node.type !== uwf.schemas.stepNode) {
break;
}
const payload = node.payload as StepNodePayload;
stepsNewestFirst.push(payload);
hash = payload.prev;
}
const newest = stepsNewestFirst[0];
if (newest === undefined) {
fail(`empty step chain at head ${headHash}`);
}
const startNode = uwf.store.get(newest.start);
if (startNode === null || startNode.type !== uwf.schemas.startNode) {
fail(`StartNode not found: ${newest.start}`);
}
return {
startHash: newest.start,
start: startNode.payload as StartNodePayload,
stepsNewestFirst,
headIsStart: false,
};
}
function expandOutput(uwf: UwfStore, outputRef: CasRef): unknown {
const node = uwf.store.get(outputRef);
if (node === null) {
return {};
}
return node.payload;
}
/**
* Recursively expand all cas_ref fields in a CAS node's payload,
* replacing hash strings with the referenced node's expanded payload.
*/
function expandDeep(store: CasStore, hash: CasRef, visited?: Set<string>): unknown {
const seen = visited ?? new Set<string>();
if (seen.has(hash)) return hash; // cycle guard
seen.add(hash);
const node = store.get(hash);
if (node === null) return hash;
const schema = getSchema(store, node.type);
if (schema === null) return node.payload;
return expandValue(store, schema, node.payload, seen);
}
function expandCasRefField(store: CasStore, value: unknown, visited: Set<string>): unknown {
if (typeof value === "string") {
return expandDeep(store, value as CasRef, visited);
}
return value;
}
function expandAnyOfField(
store: CasStore,
schema: JSONSchema,
value: unknown,
visited: Set<string>,
): unknown {
if (!Array.isArray(schema.anyOf)) return value;
for (const sub of schema.anyOf as JSONSchema[]) {
if (sub.format === "cas_ref" && typeof value === "string") {
return expandDeep(store, value as CasRef, visited);
}
}
return value;
}
function expandArrayField(
store: CasStore,
schema: JSONSchema,
value: unknown,
visited: Set<string>,
): unknown {
if (!schema.items || !Array.isArray(value)) return value;
const itemSchema = schema.items as JSONSchema;
return (value as unknown[]).map((item) => expandValue(store, itemSchema, item, visited));
}
function expandObjectField(
store: CasStore,
schema: JSONSchema,
value: unknown,
visited: Set<string>,
): unknown {
if (value === null || typeof value !== "object" || Array.isArray(value) || !schema.properties) {
return value;
}
const props = schema.properties as Record<string, JSONSchema>;
const obj = value as Record<string, unknown>;
const result: Record<string, unknown> = {};
for (const [key, val] of Object.entries(obj)) {
const propSchema = props[key];
result[key] = propSchema ? expandValue(store, propSchema, val, visited) : val;
}
return result;
}
function expandValue(
store: CasStore,
schema: JSONSchema,
value: unknown,
visited: Set<string>,
): unknown {
if (schema.format === "cas_ref") return expandCasRefField(store, value, visited);
if (Array.isArray(schema.anyOf)) return expandAnyOfField(store, schema, value, visited);
if (schema.type === "array") return expandArrayField(store, schema, value, visited);
return expandObjectField(store, schema, value, visited);
}
function collectOrderedSteps(
uwf: UwfStore,
headHash: CasRef,
chain: ChainState,
): OrderedStepItem[] {
let hash: CasRef | null = headHash;
const hashToNode = new Map<string, { payload: StepNodePayload; timestamp: number }>();
while (hash !== null) {
const node = uwf.store.get(hash);
if (node === null || node.type !== uwf.schemas.stepNode) {
break;
}
const payload = node.payload as StepNodePayload;
hashToNode.set(hash, { payload, timestamp: node.timestamp });
hash = payload.prev;
}
let cur: CasRef | null = chain.headIsStart ? null : headHash;
const ordered: OrderedStepItem[] = [];
while (cur !== null) {
const entry = hashToNode.get(cur);
if (entry === undefined) {
break;
}
ordered.push({ hash: cur, ...entry });
cur = entry.payload.prev;
}
ordered.reverse();
return ordered;
}
function formatYaml(value: unknown): string {
return stringify(value, { aliasDuplicateObjects: false }).trimEnd();
}
@@ -656,14 +489,14 @@ function formatStepPrompt(
): string {
if (!roleDef || shownPromptRoles.has(role)) return "";
shownPromptRoles.add(role);
return ["", "", "### Prompt", "", roleDef.goal].join("\n");
return ["", "", "<prompt>", roleDef.goal, "</prompt>"].join("\n");
}
function formatStepContent(uwf: UwfStore, item: OrderedStepItem): string {
if (!item.payload.detail) return "";
const content = extractLastAssistantContent(uwf, item.payload.detail);
if (content === null) return "";
return ["", "", "### Content", "", content].join("\n");
return ["", "", "<output>", content, "</output>"].join("\n");
}
function formatStartSection(options: {
@@ -804,13 +637,11 @@ function spawnAgent(
role: string,
edgePrompt: string,
): CasRef {
const argv = [...agent.args, threadId, role];
const env = { ...process.env, UWF_EDGE_PROMPT: edgePrompt };
const argv = [...agent.args, "--thread", threadId, "--role", role, "--prompt", edgePrompt];
let stdout: string;
try {
stdout = execFileSync(agent.command, argv, {
encoding: "utf8",
env,
stdio: ["ignore", "pipe", "pipe"],
maxBuffer: 50 * 1024 * 1024, // 50 MB — stream-json output can be large
});
@@ -850,31 +681,65 @@ async function archiveThread(
});
}
export async function cmdThreadStep(
export async function cmdThreadExec(
storageRoot: string,
threadId: ThreadId,
agentOverride: string | null,
count: number,
background: boolean,
backgroundWorker: boolean,
): Promise<StepOutput[]> {
if (count < 1 || !Number.isInteger(count)) {
fail(`--count must be a positive integer, got: ${count}`);
}
// Check if thread is already running in background (unless we ARE the background worker)
if (!backgroundWorker) {
const runningMarker = await isThreadRunning(storageRoot, threadId);
if (runningMarker !== null) {
fail(`thread already executing in background (PID: ${runningMarker.pid})`);
}
}
const workflowHash = await resolveActiveThreadWorkflowHash(storageRoot, threadId);
const plog = createProcessLogger({
storageRoot,
context: { thread: threadId, workflow: workflowHash },
});
const results: StepOutput[] = [];
for (let i = 0; i < count; i++) {
const result = await cmdThreadStepOnce(storageRoot, threadId, agentOverride, plog);
results.push(result);
if (result.done) {
break;
if (background && !backgroundWorker) {
// Spawn background process
return cmdThreadStepBackground(storageRoot, threadId, agentOverride, count, plog, workflowHash);
}
// If we're the background worker, create marker before execution
let markerCreated = false;
if (backgroundWorker) {
await createMarker(storageRoot, {
thread: threadId,
workflow: workflowHash,
pid: process.pid,
startedAt: Date.now(),
});
markerCreated = true;
}
try {
const results: StepOutput[] = [];
for (let i = 0; i < count; i++) {
const result = await cmdThreadStepOnce(storageRoot, threadId, agentOverride, plog);
results.push(result);
if (result.done) {
break;
}
}
return results;
} finally {
// Cleanup marker if we created one
if (markerCreated) {
await deleteMarker(storageRoot, threadId);
}
}
return results;
}
async function resolveActiveThreadWorkflowHash(
@@ -891,6 +756,57 @@ async function resolveActiveThreadWorkflowHash(
return chain.start.workflow;
}
async function cmdThreadStepBackground(
storageRoot: string,
threadId: ThreadId,
agentOverride: string | null,
count: number,
plog: ProcessLogger,
workflowHash: CasRef,
): Promise<StepOutput[]> {
// Get current head to return to caller
const index = await loadThreadsIndex(storageRoot);
const headHash = index[threadId];
if (headHash === undefined) {
failStep(plog, `thread not active: ${threadId}`);
}
// Spawn detached background process
const scriptPath = process.argv[1];
if (scriptPath === undefined) {
failStep(plog, "unable to determine script path for background execution");
}
const args = ["thread", "exec", threadId, "--count", String(count)];
if (agentOverride !== null) {
args.push("--agent", agentOverride);
}
// Internal flag to signal the background worker to create/cleanup markers
args.push("--_background-worker");
plog.log(PL_BACKGROUND_START, `spawning background process count=${count}`, null);
const child = spawn(scriptPath, args, {
detached: true,
stdio: "ignore",
});
child.unref();
// Return immediately with current state and background flag
return [
{
workflow: workflowHash,
thread: threadId,
head: headHash,
done: false,
background: true,
},
];
}
async function cmdThreadStepOnce(
storageRoot: string,
threadId: ThreadId,
@@ -928,6 +844,7 @@ async function cmdThreadStepOnce(
thread: threadId,
head: headHash,
done: true,
background: null,
};
}
@@ -975,6 +892,7 @@ async function cmdThreadStepOnce(
thread: threadId,
head: newHead,
done,
background: null,
};
}
@@ -991,47 +909,6 @@ async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise
fail(`thread not found: ${threadId}`);
}
export async function cmdThreadSteps(
storageRoot: string,
threadId: ThreadId,
): Promise<ThreadStepsOutput> {
const headHash = await resolveHeadHash(storageRoot, threadId);
const uwf = await createUwfStore(storageRoot);
const chain = walkChain(uwf, headHash);
const startNode = uwf.store.get(chain.startHash);
if (startNode === null) {
fail(`StartNode not found: ${chain.startHash}`);
}
const startEntry: StartEntry = {
hash: chain.startHash,
workflow: chain.start.workflow,
prompt: chain.start.prompt,
timestamp: startNode.timestamp,
};
const stepEntries: StepEntry[] = [];
const ordered = collectOrderedSteps(uwf, headHash, chain);
for (const item of ordered) {
stepEntries.push({
hash: item.hash,
role: item.payload.role,
output: expandOutput(uwf, item.payload.output),
detail: item.payload.detail,
agent: item.payload.agent,
timestamp: item.timestamp,
});
}
return {
thread: threadId,
workflow: chain.start.workflow,
steps: [startEntry, ...stepEntries],
};
}
export async function cmdThreadRead(
storageRoot: string,
threadId: ThreadId,
@@ -1059,58 +936,67 @@ export async function cmdThreadRead(
});
}
export async function cmdThreadFork(
storageRoot: string,
stepHash: CasRef,
): Promise<ThreadForkOutput> {
const uwf = await createUwfStore(storageRoot);
const node = uwf.store.get(stepHash);
if (node === null) {
fail(`CAS node not found: ${stepHash}`);
}
if (node.type !== uwf.schemas.startNode && node.type !== uwf.schemas.stepNode) {
fail(`node ${stepHash} is not a StartNode or StepNode`);
}
export type StopOutput = {
thread: ThreadId;
stopped: boolean;
};
const newThreadId = generateUlid(Date.now()) as ThreadId;
const index = await loadThreadsIndex(storageRoot);
index[newThreadId] = stepHash;
await saveThreadsIndex(storageRoot, index);
export type CancelOutput = {
thread: ThreadId;
cancelled: boolean;
};
return {
thread: newThreadId,
forkedFrom: {
step: stepHash,
},
};
}
export async function cmdThreadStepDetails(
storageRoot: string,
stepHash: CasRef,
): Promise<unknown> {
const uwf = await createUwfStore(storageRoot);
const node = uwf.store.get(stepHash);
if (node === null) {
fail(`CAS node not found: ${stepHash}`);
}
if (node.type !== uwf.schemas.stepNode) {
fail(`node ${stepHash} is not a StepNode`);
}
const payload = node.payload as StepNodePayload;
if (!payload.detail) {
fail(`step ${stepHash} has no detail`);
}
return expandDeep(uwf.store, payload.detail);
}
export async function cmdThreadKill(storageRoot: string, threadId: ThreadId): Promise<KillOutput> {
/**
* Stop background execution of a thread (but keep thread active)
*/
export async function cmdThreadStop(storageRoot: string, threadId: ThreadId): Promise<StopOutput> {
const index = await loadThreadsIndex(storageRoot);
const head = index[threadId];
if (head === undefined) {
fail(`thread not active: ${threadId}`);
}
// Check if thread is running in background and terminate it
const runningMarker = await isThreadRunning(storageRoot, threadId);
if (runningMarker === null) {
process.stderr.write(`Warning: thread ${threadId} is not currently running\n`);
return { thread: threadId, stopped: false };
}
try {
process.kill(runningMarker.pid, "SIGTERM");
} catch {
// Process may have already exited, ignore error
}
await deleteMarker(storageRoot, threadId);
return { thread: threadId, stopped: true };
}
/**
* Cancel a thread (stop execution + move to history)
*/
export async function cmdThreadCancel(
storageRoot: string,
threadId: ThreadId,
): Promise<CancelOutput> {
const index = await loadThreadsIndex(storageRoot);
const head = index[threadId];
if (head === undefined) {
fail(`thread not active: ${threadId}`);
}
// Check if thread is running in background and terminate it
const runningMarker = await isThreadRunning(storageRoot, threadId);
if (runningMarker !== null) {
try {
process.kill(runningMarker.pid, "SIGTERM");
} catch {
// Process may have already exited, ignore error
}
await deleteMarker(storageRoot, threadId);
}
const uwf = await createUwfStore(storageRoot);
const workflow = resolveWorkflowFromHead(uwf, head);
if (workflow === null) {
@@ -1128,5 +1014,10 @@ export async function cmdThreadKill(storageRoot: string, threadId: ThreadId): Pr
};
await appendThreadHistory(storageRoot, historyEntry);
return { thread: threadId, archived: true };
return { thread: threadId, cancelled: true };
}
export async function cmdThreadRunning(storageRoot: string): Promise<RunningThreadsOutput> {
const threads = await listRunningThreads(storageRoot);
return { threads };
}
@@ -29,7 +29,7 @@ export type WorkflowListEntry = {
origin: WorkflowOrigin;
};
export type WorkflowPutOutput = {
export type WorkflowAddOutput = {
name: string;
hash: CasRef;
};
@@ -111,10 +111,10 @@ export async function materializeWorkflowPayload(
};
}
export async function cmdWorkflowPut(
export async function cmdWorkflowAdd(
storageRoot: string,
filePath: string,
): Promise<WorkflowPutOutput> {
): Promise<WorkflowAddOutput> {
let text: string;
try {
text = await readFile(filePath, "utf8");
@@ -19,7 +19,14 @@ mock.module("../src/tools/index.js", () => ({
getBuiltinTools: () => [],
}));
import { executeTurnTools, runBuiltinLoop, shouldNudge } from "../src/loop.js";
import {
executeTurnTools,
extractFinalText,
runBuiltinLoop,
shouldInjectDeadlineWarning,
shouldNudge,
shouldProcessToolCalls,
} from "../src/loop.js";
const fakeProvider = {} as any;
const fakeToolCtx = {} as any;
@@ -154,3 +161,96 @@ describe("runBuiltinLoop integration", () => {
expect(original.length).toBe(1);
});
});
describe("shouldInjectDeadlineWarning", () => {
test("5.1 returns true when turn count reaches warning threshold and not yet warned", () => {
expect(shouldInjectDeadlineWarning(7, 10, false, false)).toBe(true);
});
test("5.2 returns false when already warned", () => {
expect(shouldInjectDeadlineWarning(7, 10, true, false)).toBe(false);
});
test("5.3 returns false when noTools is true", () => {
expect(shouldInjectDeadlineWarning(7, 10, false, true)).toBe(false);
});
test("5.4 returns false when turns remaining > DEADLINE_WARNING_TURNS", () => {
expect(shouldInjectDeadlineWarning(5, 10, false, false)).toBe(false);
});
test("5.5 returns true when exactly at warning threshold", () => {
expect(shouldInjectDeadlineWarning(7, 10, false, false)).toBe(true);
});
test("5.6 returns false when turns remaining is 0", () => {
expect(shouldInjectDeadlineWarning(10, 10, false, false)).toBe(false);
});
});
describe("shouldProcessToolCalls", () => {
test("6.1 returns true when toolCalls present and noTools=false", () => {
expect(shouldProcessToolCalls([{ id: "x", name: "read", arguments: "{}" }], false)).toBe(true);
});
test("6.2 returns false when toolCalls is null", () => {
expect(shouldProcessToolCalls(null, false)).toBe(false);
});
test("6.3 returns false when toolCalls is empty array", () => {
expect(shouldProcessToolCalls([], false)).toBe(false);
});
test("6.4 returns false when noTools=true", () => {
expect(shouldProcessToolCalls([{ id: "x", name: "read", arguments: "{}" }], true)).toBe(false);
});
test("6.5 returns true when multiple tool calls present", () => {
expect(
shouldProcessToolCalls(
[
{ id: "x1", name: "read", arguments: "{}" },
{ id: "x2", name: "write", arguments: "{}" },
],
false,
),
).toBe(true);
});
});
describe("extractFinalText", () => {
test("7.1 returns last assistant message content", () => {
const messages = [
{ role: "system" as const, content: "sys", tool_calls: null },
{ role: "assistant" as const, content: "first", tool_calls: null },
{ role: "assistant" as const, content: "last", tool_calls: null },
];
expect(extractFinalText(messages)).toBe("last");
});
test("7.2 returns empty string when no assistant messages", () => {
expect(extractFinalText([{ role: "system" as const, content: "sys", tool_calls: null }])).toBe(
"",
);
});
test("7.3 skips assistant messages with null content", () => {
const messages = [
{ role: "assistant" as const, content: "first", tool_calls: null },
{
role: "assistant" as const,
content: null,
tool_calls: [{ id: "x", name: "t", arguments: "{}" }],
},
{ role: "assistant" as const, content: "second", tool_calls: null },
];
expect(extractFinalText(messages)).toBe("second");
});
test("7.4 skips assistant messages with empty content", () => {
const messages = [
{ role: "assistant" as const, content: "first", tool_calls: null },
{ role: "assistant" as const, content: "", tool_calls: null },
{ role: "user" as const, content: "nudge", tool_calls: null },
];
expect(extractFinalText(messages)).toBe("first");
});
test("7.5 handles empty messages array", () => {
expect(extractFinalText([])).toBe("");
});
test("7.6 handles messages with only user and system roles", () => {
const messages = [
{ role: "system" as const, content: "sys", tool_calls: null },
{ role: "user" as const, content: "query", tool_calls: null },
];
expect(extractFinalText(messages)).toBe("");
});
});
+191 -82
View File
@@ -1,7 +1,12 @@
import type { ResolvedLlmProvider } from "@uncaged/workflow-agent-kit";
import { createLogger } from "@uncaged/workflow-util";
import { type ChatMessage, chatCompletionWithTools, type LlmToolCall } from "./llm/index.js";
import {
type ChatMessage,
chatCompletionWithTools,
type LlmToolCall,
type OpenAiToolDefinition,
} from "./llm/index.js";
import { appendSessionTurn } from "./session.js";
import {
builtinToolsToOpenAi,
@@ -80,10 +85,184 @@ export type ShouldNudgeOptions = {
const MAX_NUDGES = 3;
const DEADLINE_WARNING_TURNS = 3;
export function shouldInjectDeadlineWarning(
turn: number,
maxTurns: number,
alreadyWarned: boolean,
noTools: boolean,
): boolean {
const turnsRemaining = maxTurns - turn;
return (
!noTools && !alreadyWarned && turnsRemaining > 0 && turnsRemaining <= DEADLINE_WARNING_TURNS
);
}
export function shouldProcessToolCalls(toolCalls: LlmToolCall[] | null, noTools: boolean): boolean {
return !noTools && toolCalls !== null && toolCalls.length > 0;
}
export function extractFinalText(messages: ChatMessage[]): string {
for (let i = messages.length - 1; i >= 0; i--) {
const msg = messages[i];
if (
msg !== undefined &&
msg.role === "assistant" &&
msg.content !== null &&
msg.content.trim() !== ""
) {
return msg.content;
}
}
return "";
}
function injectDeadlineWarning(messages: ChatMessage[], turnsRemaining: number): void {
log("4NRXW6KT", `${turnsRemaining} turns remaining, injecting deadline warning`);
messages.push({
role: "user",
content:
`⚠️ You have ${turnsRemaining} turns remaining. ` +
"Wrap up your work and output the YAML frontmatter starting with `---`. " +
"If you cannot finish in time, output frontmatter with `status: failed` and describe what remains.",
});
}
type HandleTextOnlyTurnResult = {
shouldBreak: boolean;
finalText: string;
turnCount: number;
nudgeCount: number;
turnAdjustment: number;
};
async function handleTextOnlyTurn(
text: string,
messages: ChatMessage[],
storageRoot: string,
sessionId: string,
noTools: boolean,
turn: number,
maxTurns: number,
currentNudgeCount: number,
): Promise<HandleTextOnlyTurnResult> {
await appendTurn(storageRoot, sessionId, {
role: "assistant",
content: text,
toolCalls: null,
reasoning: null,
});
const turnCount = 1;
let nudgeCount = currentNudgeCount;
let turnAdjustment = 0;
if (shouldNudge({ noTools, text, turn, maxTurns })) {
nudgeCount += 1;
log("7FXQM2KN", `text-only turn without frontmatter, nudge ${nudgeCount}/${MAX_NUDGES}`);
const nudge =
"You stopped calling tools but your response does not start with the required `---` YAML frontmatter. " +
"Either continue using tools to complete your work, or output your final response starting with `---`.";
messages.push({ role: "user", content: nudge });
// Nudge doesn't consume turn budget (up to MAX_NUDGES)
if (nudgeCount <= MAX_NUDGES) {
turnAdjustment = -1;
}
return { shouldBreak: false, finalText: "", turnCount, nudgeCount, turnAdjustment };
}
return { shouldBreak: true, finalText: text, turnCount, nudgeCount, turnAdjustment };
}
async function handleToolCallTurn(
content: string,
toolCalls: LlmToolCall[],
messages: ChatMessage[],
storageRoot: string,
sessionId: string,
toolCtx: ToolContext,
): Promise<number> {
await appendTurn(storageRoot, sessionId, {
role: "assistant",
content,
toolCalls: mapToolCallsForPayload(toolCalls),
reasoning: null,
});
let turnCount = 1;
// Execute tools
turnCount += await executeTurnTools(toolCalls, toolCtx, messages, storageRoot, sessionId);
return turnCount;
}
export function shouldNudge({ noTools, text, turn, maxTurns }: ShouldNudgeOptions): boolean {
return !noTools && !text.trimStart().startsWith("---") && turn < maxTurns - 1;
}
type ProcessLoopIterationResult = {
shouldBreak: boolean;
finalText: string;
turnCount: number;
nudgeCount: number;
turnAdjustment: number;
};
async function processLoopIteration(
options: RunBuiltinLoopOptions,
messages: ChatMessage[],
openAiTools: OpenAiToolDefinition[],
turn: number,
nudgeCount: number,
): Promise<ProcessLoopIterationResult> {
const response = await chatCompletionWithTools(
options.provider,
messages,
openAiTools.length > 0 ? openAiTools : null,
);
// When noTools is set, ignore any tool_calls the LLM might still return
const effectiveToolCalls = options.noTools ? null : (response.toolCalls ?? null);
const assistantMessage: ChatMessage = {
role: "assistant",
content: response.content,
tool_calls: effectiveToolCalls,
};
messages.push(assistantMessage);
if (!shouldProcessToolCalls(effectiveToolCalls, options.noTools)) {
const text = response.content ?? "";
const result = await handleTextOnlyTurn(
text,
messages,
options.storageRoot,
options.sessionId,
options.noTools,
turn,
options.maxTurns,
nudgeCount,
);
return result;
}
// At this point, effectiveToolCalls is guaranteed to be non-null and non-empty
const turnCount = await handleToolCallTurn(
response.content ?? "",
effectiveToolCalls as LlmToolCall[],
messages,
options.storageRoot,
options.sessionId,
options.toolCtx,
);
return {
shouldBreak: false,
finalText: "",
turnCount,
nudgeCount,
turnAdjustment: 0,
};
}
/** Agent run loop: LLM ↔ tools until no tool_calls or maxTurns. */
export async function runBuiltinLoop(
options: RunBuiltinLoopOptions,
@@ -99,95 +278,25 @@ export async function runBuiltinLoop(
log("8K2M4N7P", `builtin loop turn ${turn + 1}/${options.maxTurns}`);
// Warn agent when approaching turn limit
const turnsRemaining = options.maxTurns - turn;
if (!options.noTools && !deadlineWarned && turnsRemaining <= DEADLINE_WARNING_TURNS) {
if (shouldInjectDeadlineWarning(turn, options.maxTurns, deadlineWarned, options.noTools)) {
deadlineWarned = true;
log("4NRXW6KT", `${turnsRemaining} turns remaining, injecting deadline warning`);
messages.push({
role: "user",
content:
`⚠️ You have ${turnsRemaining} turns remaining. ` +
"Wrap up your work and output the YAML frontmatter starting with `---`. " +
"If you cannot finish in time, output frontmatter with `status: failed` and describe what remains.",
});
const turnsRemaining = options.maxTurns - turn;
injectDeadlineWarning(messages, turnsRemaining);
}
const response = await chatCompletionWithTools(
options.provider,
messages,
openAiTools.length > 0 ? openAiTools : null,
);
const result = await processLoopIteration(options, messages, openAiTools, turn, nudgeCount);
turnCount += result.turnCount;
nudgeCount = result.nudgeCount;
turn += result.turnAdjustment;
// When noTools is set, ignore any tool_calls the LLM might still return
const effectiveToolCalls = options.noTools ? null : (response.toolCalls ?? null);
const assistantMessage: ChatMessage = {
role: "assistant",
content: response.content,
tool_calls: effectiveToolCalls,
};
messages.push(assistantMessage);
if (effectiveToolCalls === null || effectiveToolCalls.length === 0) {
const text = response.content ?? "";
await appendTurn(options.storageRoot, options.sessionId, {
role: "assistant",
content: text,
toolCalls: null,
reasoning: null,
});
turnCount += 1;
if (shouldNudge({ noTools: options.noTools, text, turn, maxTurns: options.maxTurns })) {
nudgeCount += 1;
log("7FXQM2KN", `text-only turn without frontmatter, nudge ${nudgeCount}/${MAX_NUDGES}`);
const nudge =
"You stopped calling tools but your response does not start with the required `---` YAML frontmatter. " +
"Either continue using tools to complete your work, or output your final response starting with `---`.";
messages.push({ role: "user", content: nudge });
// Nudge doesn't consume turn budget (up to MAX_NUDGES)
if (nudgeCount <= MAX_NUDGES) {
turn -= 1;
}
continue;
}
finalText = text;
if (result.shouldBreak) {
finalText = result.finalText;
break;
}
// Assistant turn with tool calls
await appendTurn(options.storageRoot, options.sessionId, {
role: "assistant",
content: response.content ?? "",
toolCalls: mapToolCallsForPayload(effectiveToolCalls),
reasoning: null,
});
turnCount += 1;
// Execute tools
turnCount += await executeTurnTools(
effectiveToolCalls,
options.toolCtx,
messages,
options.storageRoot,
options.sessionId,
);
}
if (finalText === "" && messages.length > 0) {
for (let i = messages.length - 1; i >= 0; i--) {
const msg = messages[i];
if (
msg !== undefined &&
msg.role === "assistant" &&
msg.content !== null &&
msg.content.trim() !== ""
) {
finalText = msg.content;
break;
}
}
if (finalText === "") {
finalText = extractFinalText(messages);
}
return { finalText, messages, turnCount };
@@ -154,6 +154,99 @@ describe("parseClaudeCodeStreamOutput", () => {
});
});
describe("parseClaudeCodeStreamOutput — helper extraction", () => {
test("processSystemLine sets model from system message", () => {
const lines = [
JSON.stringify({ type: "system", model: "claude-opus-4" }),
JSON.stringify({
type: "result",
subtype: "success",
result: "ok",
session_id: "s1",
num_turns: 0,
total_cost_usd: 0,
duration_ms: 0,
stop_reason: "end_turn",
}),
];
const parsed = parseClaudeCodeStreamOutput(lines.join("\n"));
expect(parsed).not.toBeNull();
expect(parsed!.model).toBe("claude-opus-4");
});
test("processAssistantLine skips empty content", () => {
const lines = [
JSON.stringify({ type: "assistant", message: { role: "assistant", content: [] } }),
JSON.stringify({
type: "result",
subtype: "success",
result: "ok",
session_id: "s1",
num_turns: 0,
total_cost_usd: 0,
duration_ms: 0,
stop_reason: "end_turn",
}),
];
const parsed = parseClaudeCodeStreamOutput(lines.join("\n"));
expect(parsed).not.toBeNull();
expect(parsed!.turns).toHaveLength(0);
});
test("processUserLine skips when no tool_result items", () => {
const lines = [
JSON.stringify({
type: "user",
message: { role: "user", content: [{ type: "text", text: "hi" }] },
}),
JSON.stringify({
type: "result",
subtype: "success",
result: "ok",
session_id: "s1",
num_turns: 0,
total_cost_usd: 0,
duration_ms: 0,
stop_reason: "end_turn",
}),
];
const parsed = parseClaudeCodeStreamOutput(lines.join("\n"));
expect(parsed).not.toBeNull();
expect(parsed!.turns).toHaveLength(0);
});
test("turn indices are sequential across mixed assistant and user lines", () => {
const lines = [
JSON.stringify({
type: "assistant",
message: { role: "assistant", content: [{ type: "text", text: "A" }] },
}),
JSON.stringify({
type: "user",
message: { role: "user", content: [{ type: "tool_result", content: "R" }] },
}),
JSON.stringify({
type: "assistant",
message: { role: "assistant", content: [{ type: "text", text: "B" }] },
}),
JSON.stringify({
type: "result",
subtype: "success",
result: "ok",
session_id: "s1",
num_turns: 3,
total_cost_usd: 0,
duration_ms: 0,
stop_reason: "end_turn",
}),
];
const parsed = parseClaudeCodeStreamOutput(lines.join("\n"));
expect(parsed).not.toBeNull();
expect(parsed!.turns).toHaveLength(3);
expect(parsed!.turns.map((t) => t.index)).toEqual([0, 1, 2]);
});
});
describe("storeClaudeCodeDetail", () => {
const baseParsed: ClaudeCodeParsedResult = {
type: "result",
@@ -16,7 +16,7 @@ const log = createLogger({ sink: { kind: "stderr" } });
const CLAUDE_COMMAND = "claude";
const CLAUDE_MAX_TURNS = 90;
const CLAUDE_MODEL = process.env["CLAUDE_MODEL"] ?? null;
const CLAUDE_MODEL = process.env.CLAUDE_MODEL ?? null;
function buildHistorySummary(steps: AgentContext["steps"]): string {
if (steps.length === 0) {
@@ -146,13 +146,13 @@ async function runClaudeCode(ctx: AgentContext): Promise<AgentRunResult> {
// Try resuming a cached session for re-entry scenarios (e.g. reviewer reject → developer re-entry).
if (!ctx.isFirstVisit) {
const cachedSessionId = await getCachedSessionId(ctx.threadId, ctx.role);
const cachedSessionId = await getCachedSessionId("claude-code", ctx.threadId, ctx.role);
if (cachedSessionId !== null) {
try {
const { stdout } = await spawnClaudeResume(cachedSessionId, fullPrompt);
const result = await processClaudeOutput(stdout, ctx.store);
if (result.sessionId !== undefined && result.sessionId !== "") {
await setCachedSessionId(ctx.threadId, ctx.role, result.sessionId);
await setCachedSessionId("claude-code", ctx.threadId, ctx.role, result.sessionId);
}
return result;
} catch (err) {
@@ -169,7 +169,7 @@ async function runClaudeCode(ctx: AgentContext): Promise<AgentRunResult> {
const { stdout } = await spawnClaudeRun(fullPrompt);
const result = await processClaudeOutput(stdout, ctx.store);
if (result.sessionId !== undefined && result.sessionId !== "") {
await setCachedSessionId(ctx.threadId, ctx.role, result.sessionId);
await setCachedSessionId("claude-code", ctx.threadId, ctx.role, result.sessionId);
}
return result;
}
@@ -34,7 +34,7 @@ export const CLAUDE_CODE_DETAIL_SCHEMA: JSONSchema = {
},
turns: {
type: "array",
items: { type: "string" },
items: { type: "string", format: "cas_ref" },
},
},
additionalProperties: false,
@@ -67,101 +67,105 @@ function extractToolResultContent(content: unknown[]): string {
return results.join("\n");
}
/**
* Parse Claude Code stream-json (NDJSON) output.
* Each line is a JSON object with type: "system" | "assistant" | "user" | "result".
*/
export function parseClaudeCodeStreamOutput(stdout: string): ClaudeCodeParsedResult | null {
const lines = stdout.trim().split("\n");
const turns: ClaudeCodeTurnPayload[] = [];
let resultLine: Record<string, unknown> | null = null;
let model = "";
let turnIndex = 0;
type ParseState = {
turns: ClaudeCodeTurnPayload[];
resultLine: Record<string, unknown> | null;
model: string;
turnIndex: number;
};
for (const line of lines) {
let parsed: unknown;
try {
parsed = JSON.parse(line);
} catch {
continue;
}
if (!isRecord(parsed)) continue;
const type = parsed.type;
if (type === "system" && typeof parsed.model === "string") {
model = parsed.model;
}
if (type === "assistant" && isRecord(parsed.message)) {
const msg = parsed.message;
const content = Array.isArray(msg.content) ? msg.content : [];
const textContent = extractTextContent(content as unknown[]);
const toolCalls = extractToolCalls(content as unknown[]);
// Only record turns that have actual content
if (textContent !== "" || toolCalls.length > 0) {
turns.push({
index: turnIndex++,
role: "assistant",
content: textContent,
toolCalls: toolCalls.length > 0 ? toolCalls : null,
});
}
}
if (type === "user" && isRecord(parsed.message)) {
const msg = parsed.message;
const content = Array.isArray(msg.content) ? msg.content : [];
const resultContent = extractToolResultContent(content as unknown[]);
if (resultContent !== "") {
turns.push({
index: turnIndex++,
role: "tool_result",
content: resultContent,
toolCalls: null,
});
}
}
if (type === "result") {
resultLine = parsed;
}
function processSystemLine(parsed: Record<string, unknown>, state: ParseState): void {
if (typeof parsed.model === "string") {
state.model = parsed.model;
}
}
if (resultLine === null) return null;
function processAssistantLine(parsed: Record<string, unknown>, state: ParseState): void {
if (!isRecord(parsed.message)) return;
const content = Array.isArray(parsed.message.content) ? parsed.message.content : [];
const textContent = extractTextContent(content as unknown[]);
const toolCalls = extractToolCalls(content as unknown[]);
if (textContent !== "" || toolCalls.length > 0) {
state.turns.push({
index: state.turnIndex++,
role: "assistant",
content: textContent,
toolCalls: toolCalls.length > 0 ? toolCalls : null,
});
}
}
const sessionId = resultLine.session_id;
const result = resultLine.result;
const subtype = resultLine.subtype;
function processUserLine(parsed: Record<string, unknown>, state: ParseState): void {
if (!isRecord(parsed.message)) return;
const content = Array.isArray(parsed.message.content) ? parsed.message.content : [];
const resultContent = extractToolResultContent(content as unknown[]);
if (resultContent !== "") {
state.turns.push({
index: state.turnIndex++,
role: "tool_result",
content: resultContent,
toolCalls: null,
});
}
}
function processLine(line: string, state: ParseState): void {
let parsed: unknown;
try {
parsed = JSON.parse(line);
} catch {
return;
}
if (!isRecord(parsed)) return;
const type = parsed.type;
if (type === "system") processSystemLine(parsed, state);
else if (type === "assistant") processAssistantLine(parsed, state);
else if (type === "user") processUserLine(parsed, state);
else if (type === "result") state.resultLine = parsed;
}
function assembleResult(state: ParseState): ClaudeCodeParsedResult | null {
if (state.resultLine === null) return null;
const sessionId = state.resultLine.session_id;
const result = state.resultLine.result;
const subtype = state.resultLine.subtype;
if (typeof sessionId !== "string" || typeof result !== "string" || typeof subtype !== "string") {
return null;
}
const usage = isRecord(resultLine.usage) ? resultLine.usage : {};
const usage = isRecord(state.resultLine.usage) ? state.resultLine.usage : {};
return {
type: safeString(resultLine.type, "result"),
type: safeString(state.resultLine.type, "result"),
subtype: subtype as ClaudeCodeParsedResult["subtype"],
result,
sessionId,
numTurns: safeNumber(resultLine.num_turns),
totalCostUsd: safeNumber(resultLine.total_cost_usd),
durationMs: safeNumber(resultLine.duration_ms),
model,
stopReason: safeString(resultLine.stop_reason),
numTurns: safeNumber(state.resultLine.num_turns),
totalCostUsd: safeNumber(state.resultLine.total_cost_usd),
durationMs: safeNumber(state.resultLine.duration_ms),
model: state.model,
stopReason: safeString(state.resultLine.stop_reason),
usage: {
inputTokens: safeNumber(usage.input_tokens),
outputTokens: safeNumber(usage.output_tokens),
cacheReadInputTokens: safeNumber(usage.cache_read_input_tokens),
cacheCreationInputTokens: safeNumber(usage.cache_creation_input_tokens),
},
turns,
turns: state.turns,
};
}
/**
* Parse Claude Code stream-json (NDJSON) output.
* Each line is a JSON object with type: "system" | "assistant" | "user" | "result".
*/
export function parseClaudeCodeStreamOutput(stdout: string): ClaudeCodeParsedResult | null {
const lines = stdout.trim().split("\n");
const state: ParseState = { turns: [], resultLine: null, model: "", turnIndex: 0 };
for (const line of lines) {
processLine(line, state);
}
return assembleResult(state);
}
/**
* Legacy: parse Claude Code plain JSON output (non-streaming).
* Falls back when stream-json is not available.
@@ -4,6 +4,96 @@ import { HermesAcpClient } from "../src/acp-client.js";
const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
describe("handleSessionUpdate — helper extraction", () => {
let client: HermesAcpClient;
beforeEach(() => {
client = new HermesAcpClient();
});
afterEach(async () => {
await client.close();
});
it("agent_message_chunk accumulates text in messageChunks", () => {
(client as any).handleSessionUpdate({
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "hello" },
});
(client as any).handleSessionUpdate({
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: " world" },
});
expect((client as any).messageChunks).toEqual(["hello", " world"]);
});
it("agent_thought_chunk accumulates reasoning in reasoningChunks", () => {
(client as any).handleSessionUpdate({
sessionUpdate: "agent_thought_chunk",
content: { type: "text", text: "thinking" },
});
expect((client as any).reasoningChunks).toEqual(["thinking"]);
});
it("tool_call registers a pending tool and flushes message chunks", () => {
(client as any).messageChunks = ["pre-tool text"];
(client as any).handleSessionUpdate({
sessionUpdate: "tool_call",
title: "Bash",
rawInput: { command: "ls" },
toolCallId: "tc-1",
});
expect((client as any).pendingTools.get("tc-1")).toEqual({
name: "Bash",
args: JSON.stringify({ command: "ls" }),
});
expect((client as any).messageChunks).toEqual([]);
expect((client as any).messages).toHaveLength(1);
expect((client as any).messages[0].role).toBe("assistant");
});
it("tool_call_update completed pushes tool_call and tool messages", () => {
(client as any).pendingTools.set("tc-2", { name: "Read", args: '{"path":"/foo"}' });
(client as any).handleSessionUpdate({
sessionUpdate: "tool_call_update",
status: "completed",
toolCallId: "tc-2",
rawOutput: "file contents",
});
const msgs = (client as any).messages as Array<{
role: string;
tool_calls: unknown;
content: string | null;
}>;
expect(msgs).toHaveLength(2);
expect(msgs[0].role).toBe("assistant");
expect(msgs[0].tool_calls).toEqual([
{ function: { name: "Read", arguments: '{"path":"/foo"}' } },
]);
expect(msgs[1].role).toBe("tool");
expect(msgs[1].content).toBe("file contents");
expect((client as any).pendingTools.has("tc-2")).toBe(false);
});
it("tool_call_update with non-string rawOutput JSON-stringifies it", () => {
(client as any).pendingTools.set("tc-3", { name: "Fetch", args: "" });
(client as any).handleSessionUpdate({
sessionUpdate: "tool_call_update",
status: "completed",
toolCallId: "tc-3",
rawOutput: { html: "<p>page</p>" },
});
const msgs = (client as any).messages as Array<{ role: string; content: string | null }>;
expect(msgs[1].content).toBe(JSON.stringify({ html: "<p>page</p>" }));
});
it("unknown updateType is a no-op", () => {
(client as any).handleSessionUpdate({ sessionUpdate: "unknown_type", data: {} });
expect((client as any).messages).toHaveLength(0);
expect((client as any).messageChunks).toHaveLength(0);
});
});
describe("HermesAcpClient", () => {
let client: HermesAcpClient;
@@ -245,72 +245,75 @@ export class HermesAcpClient {
// ---- Session update → structured messages ----
private handleSessionUpdate(update: Record<string, unknown>): void {
const updateType = update.sessionUpdate as string;
switch (updateType) {
case "agent_message_chunk": {
const content = update.content as { type?: string; text?: string } | undefined;
if (content?.type === "text" && typeof content.text === "string") {
this.messageChunks.push(content.text);
}
switch (update.sessionUpdate as string) {
case "agent_message_chunk":
this.handleAgentMessageChunk(update);
break;
}
case "agent_thought_chunk": {
const content = update.content as { type?: string; text?: string } | undefined;
if (content?.type === "text" && typeof content.text === "string") {
this.reasoningChunks.push(content.text);
}
case "agent_thought_chunk":
this.handleAgentThoughtChunk(update);
break;
}
case "tool_call": {
const title = (update.title as string) ?? "";
const rawInput = update.rawInput;
const args = rawInput !== undefined && rawInput !== null ? JSON.stringify(rawInput) : "";
const toolCallId = update.toolCallId as string;
this.pendingTools.set(toolCallId, { name: title, args });
// Flush accumulated assistant text before tool call
this.flushAssistantMessage();
case "tool_call":
this.handleToolCall(update);
break;
}
case "tool_call_update": {
const status = update.status as string | undefined;
if (status === "completed" || status === "failed") {
const toolCallId = update.toolCallId as string;
const pending = this.pendingTools.get(toolCallId);
const toolName = pending?.name ?? toolCallId;
const rawOutput = update.rawOutput;
const outputStr =
rawOutput !== undefined && rawOutput !== null
? typeof rawOutput === "string"
? rawOutput
: JSON.stringify(rawOutput)
: "";
this.messages.push({
role: "assistant",
content: null,
reasoning: null,
tool_calls: [{ function: { name: toolName, arguments: pending?.args ?? "" } }],
});
this.messages.push({
role: "tool",
content: outputStr,
reasoning: null,
tool_calls: null,
});
this.pendingTools.delete(toolCallId);
}
case "tool_call_update":
this.handleToolCallUpdate(update);
break;
}
default:
break;
}
}
private handleAgentMessageChunk(update: Record<string, unknown>): void {
const content = update.content as { type?: string; text?: string } | undefined;
if (content?.type === "text" && typeof content.text === "string") {
this.messageChunks.push(content.text);
}
}
private handleAgentThoughtChunk(update: Record<string, unknown>): void {
const content = update.content as { type?: string; text?: string } | undefined;
if (content?.type === "text" && typeof content.text === "string") {
this.reasoningChunks.push(content.text);
}
}
private handleToolCall(update: Record<string, unknown>): void {
const title = (update.title as string) ?? "";
const rawInput = update.rawInput;
const args = rawInput !== undefined && rawInput !== null ? JSON.stringify(rawInput) : "";
const toolCallId = update.toolCallId as string;
this.pendingTools.set(toolCallId, { name: title, args });
this.flushAssistantMessage();
}
private handleToolCallUpdate(update: Record<string, unknown>): void {
const status = update.status as string | undefined;
if (status !== "completed" && status !== "failed") return;
const toolCallId = update.toolCallId as string;
const pending = this.pendingTools.get(toolCallId);
const toolName = pending?.name ?? toolCallId;
const rawOutput = update.rawOutput;
const outputStr =
rawOutput !== undefined && rawOutput !== null
? typeof rawOutput === "string"
? rawOutput
: JSON.stringify(rawOutput)
: "";
this.messages.push({
role: "assistant",
content: null,
reasoning: null,
tool_calls: [{ function: { name: toolName, arguments: pending?.args ?? "" } }],
});
this.messages.push({
role: "tool",
content: outputStr,
reasoning: null,
tool_calls: null,
});
this.pendingTools.delete(toolCallId);
}
/** Flush any accumulated text/reasoning into an assistant message. */
private flushAssistantMessage(): void {
const text = this.messageChunks.join("");
@@ -1,5 +1,22 @@
// Re-export session cache from the shared agent-kit package.
export { getCachedSessionId, setCachedSessionId } from "@uncaged/workflow-agent-kit";
// Re-export session cache from the shared agent-kit package with agent name injected.
import {
getCachedSessionId as getCachedSessionIdBase,
setCachedSessionId as setCachedSessionIdBase,
} from "@uncaged/workflow-agent-kit";
import type { ThreadId } from "@uncaged/workflow-protocol";
export async function getCachedSessionId(threadId: ThreadId, role: string): Promise<string | null> {
return getCachedSessionIdBase("hermes", threadId, role);
}
export async function setCachedSessionId(
threadId: ThreadId,
role: string,
sessionId: string,
): Promise<void> {
return setCachedSessionIdBase("hermes", threadId, role, sessionId);
}
export function isResumeDisabled(): boolean {
// Hermes ACP session/resume is broken: _restore fails for custom providers
@@ -0,0 +1,247 @@
import { mkdir, readdir, readFile, rm, stat, writeFile } from "node:fs/promises";
import { dirname, join } from "node:path";
import type { ThreadId } from "@uncaged/workflow-protocol";
import { afterEach, beforeEach, describe, expect, test } from "vitest";
import { getCachedSessionId, getCachePath, setCachedSessionId } from "../src/session-cache.js";
import { resolveStorageRoot } from "../src/storage.js";
describe("session-cache", () => {
let originalStorageRoot: string;
let testStorageRoot: string;
beforeEach(async () => {
// Create a temporary test storage root
originalStorageRoot = resolveStorageRoot();
testStorageRoot = join(originalStorageRoot, "test-cache", `test-${Date.now()}`);
await mkdir(testStorageRoot, { recursive: true });
// Override the storage root for testing
process.env.WORKFLOW_STORAGE_ROOT = testStorageRoot;
});
afterEach(async () => {
// Clean up test storage root
await rm(testStorageRoot, { recursive: true, force: true });
delete process.env.WORKFLOW_STORAGE_ROOT;
});
describe("getCachePath", () => {
test("returns agent-specific file path", () => {
const path = getCachePath("claude-code");
expect(path).toMatch(/\/cache\/claude-code-sessions\.json$/);
});
test("returns different paths for different agents", () => {
const pathClaudeCode = getCachePath("claude-code");
const pathHermes = getCachePath("hermes");
expect(pathClaudeCode).not.toBe(pathHermes);
expect(pathClaudeCode).toMatch(/claude-code-sessions\.json$/);
expect(pathHermes).toMatch(/hermes-sessions\.json$/);
});
test("handles agent names with special characters", () => {
const path1 = getCachePath("my-agent");
const path2 = getCachePath("my_agent");
expect(path1).toMatch(/my-agent-sessions\.json$/);
expect(path2).toMatch(/my_agent-sessions\.json$/);
});
});
describe("session isolation", () => {
const threadId = "01234567890123456789012345" as ThreadId;
const role = "developer";
test("sessions are isolated per agent", async () => {
// Cache different session IDs for each agent
await setCachedSessionId("claude-code", threadId, role, "session-cc-001");
await setCachedSessionId("hermes", threadId, role, "session-hermes-001");
// Each agent should retrieve its own session ID
const sessionCC = await getCachedSessionId("claude-code", threadId, role);
const sessionHermes = await getCachedSessionId("hermes", threadId, role);
expect(sessionCC).toBe("session-cc-001");
expect(sessionHermes).toBe("session-hermes-001");
});
test("updating one agent's cache does not affect another", async () => {
// Set initial sessions for both agents
await setCachedSessionId("claude-code", threadId, role, "session-cc-001");
await setCachedSessionId("hermes", threadId, role, "session-hermes-001");
// Update claude-code's session
await setCachedSessionId("claude-code", threadId, role, "session-cc-002");
// Hermes's session should remain unchanged
const sessionHermes = await getCachedSessionId("hermes", threadId, role);
expect(sessionHermes).toBe("session-hermes-001");
// Claude-code should have the new session
const sessionCC = await getCachedSessionId("claude-code", threadId, role);
expect(sessionCC).toBe("session-cc-002");
});
test("missing session returns null for specific agent", async () => {
const session = await getCachedSessionId("claude-code", threadId, role);
expect(session).toBeNull();
});
test("empty session ID is treated as missing", async () => {
await setCachedSessionId("claude-code", threadId, role, "");
const session = await getCachedSessionId("claude-code", threadId, role);
expect(session).toBeNull();
});
});
describe("file system operations", () => {
const threadId = "01234567890123456789012345" as ThreadId;
const role = "developer";
test("cache directory is created if missing", async () => {
const cachePath = getCachePath("claude-code");
const cacheDir = dirname(cachePath);
// Ensure cache dir doesn't exist
await rm(cacheDir, { recursive: true, force: true });
// Write a session
await setCachedSessionId("claude-code", threadId, role, "session-001");
// Cache directory should be created
const stats = await stat(cacheDir);
expect(stats.isDirectory()).toBe(true);
});
test("multiple agents create separate cache files", async () => {
// Cache sessions for multiple agents
await setCachedSessionId("claude-code", threadId, role, "session-cc-001");
await setCachedSessionId("hermes", threadId, role, "session-hermes-001");
// Separate cache files should exist
const pathCC = getCachePath("claude-code");
const pathHermes = getCachePath("hermes");
const contentCC = JSON.parse(await readFile(pathCC, "utf8")) as Record<string, string>;
const contentHermes = JSON.parse(await readFile(pathHermes, "utf8")) as Record<
string,
string
>;
expect(contentCC).toHaveProperty(`${threadId}:${role}`, "session-cc-001");
expect(contentHermes).toHaveProperty(`${threadId}:${role}`, "session-hermes-001");
});
test("atomic writes prevent partial reads", async () => {
// Write a session
await setCachedSessionId("claude-code", threadId, role, "session-001");
// The final file should exist (no .tmp files left behind)
const cachePath = getCachePath("claude-code");
const dir = dirname(cachePath);
const files = await readdir(dir);
expect(files).toContain("claude-code-sessions.json");
expect(files.every((f) => !f.endsWith(".tmp"))).toBe(true);
});
});
describe("legacy migration", () => {
const threadId = "01234567890123456789012345" as ThreadId;
const role = "developer";
test("old agent-sessions.json is ignored", async () => {
// Create old agent-sessions.json file
const oldCachePath = join(resolveStorageRoot(), "cache", "agent-sessions.json");
await mkdir(dirname(oldCachePath), { recursive: true });
await writeFile(
oldCachePath,
JSON.stringify({
"01234567890123456789012345:developer": "old-session-001",
}),
"utf8",
);
// Query with the new per-agent cache
const session = await getCachedSessionId("claude-code", threadId, role);
// Should return null (old cache is ignored)
expect(session).toBeNull();
});
test("new per-agent cache takes precedence", async () => {
// Create both old and new cache files
const oldPath = join(resolveStorageRoot(), "cache", "agent-sessions.json");
await mkdir(dirname(oldPath), { recursive: true });
await writeFile(
oldPath,
JSON.stringify({
[`${threadId}:${role}`]: "old-session",
}),
"utf8",
);
await setCachedSessionId("claude-code", threadId, role, "new-session");
// The new per-agent cache value should be returned
const session = await getCachedSessionId("claude-code", threadId, role);
expect(session).toBe("new-session");
});
});
describe("error handling", () => {
const threadId = "01234567890123456789012345" as ThreadId;
const role = "developer";
test("invalid JSON in cache file returns empty cache", async () => {
// Create a corrupted cache file
const cachePath = getCachePath("claude-code");
await mkdir(dirname(cachePath), { recursive: true });
await writeFile(cachePath, "{ invalid json }", "utf8");
// Should return null (treating corrupted cache as empty)
const session = await getCachedSessionId("claude-code", threadId, role);
expect(session).toBeNull();
});
test("non-object JSON in cache file returns empty cache", async () => {
// Create a cache file with non-object JSON
const cachePath = getCachePath("claude-code");
await mkdir(dirname(cachePath), { recursive: true });
await writeFile(cachePath, JSON.stringify(["not", "an", "object"]), "utf8");
// Should return null
const session = await getCachedSessionId("claude-code", threadId, role);
expect(session).toBeNull();
});
test("cache entries with non-string values are ignored", async () => {
// Create a cache file with mixed types
const cachePath = getCachePath("claude-code");
const cacheData = {
"thread1:role1": "valid-session",
"thread2:role2": 12345, // number
"thread3:role3": null, // null
"thread4:role4": "", // empty string
};
await mkdir(dirname(cachePath), { recursive: true });
await writeFile(cachePath, JSON.stringify(cacheData), "utf8");
// Valid string entries should be returned
const session1 = await getCachedSessionId("claude-code", "thread1" as ThreadId, "role1");
expect(session1).toBe("valid-session");
// Invalid entries should return null
const session2 = await getCachedSessionId("claude-code", "thread2" as ThreadId, "role2");
const session3 = await getCachedSessionId("claude-code", "thread3" as ThreadId, "role3");
const session4 = await getCachedSessionId("claude-code", "thread4" as ThreadId, "role4");
expect(session2).toBeNull();
expect(session3).toBeNull();
expect(session4).toBeNull(); // empty string is treated as missing
});
});
});
+6 -11
View File
@@ -21,14 +21,6 @@ function fail(message: string): never {
throw new Error(message);
}
function readEdgePrompt(): string {
const value = process.env.UWF_EDGE_PROMPT;
if (value === undefined || value === "") {
fail("UWF_EDGE_PROMPT environment variable is required");
}
return value;
}
function walkChain(store: Store, schemas: AgentStore["schemas"], headHash: CasRef): ChainState {
const headNode = store.get(headHash);
if (headNode === null) {
@@ -123,7 +115,11 @@ async function loadWorkflow(store: Store, schemas: AgentStore["schemas"], workfl
* Build agent execution context from thread head in threads.yaml.
* Walks the CAS chain from head to StartNode and expands step outputs.
*/
export async function buildContext(threadId: ThreadId, role: string): Promise<AgentContext> {
export async function buildContext(
threadId: ThreadId,
role: string,
edgePrompt: string,
): Promise<AgentContext> {
const storageRoot = resolveStorageRoot();
const agentStore = await createAgentStore(storageRoot);
const { store, schemas } = agentStore;
@@ -142,7 +138,6 @@ export async function buildContext(threadId: ThreadId, role: string): Promise<Ag
}
const steps = await buildHistory(store, chain.stepsNewestFirst);
const edgePrompt = readEdgePrompt();
const isFirstVisit = !steps.some((s) => s.role === role);
return {
@@ -172,6 +167,7 @@ export type BuildContextMeta = {
export async function buildContextWithMeta(
threadId: ThreadId,
role: string,
edgePrompt: string,
): Promise<AgentContext & { meta: BuildContextMeta }> {
const storageRoot = resolveStorageRoot();
const agentStore = await createAgentStore(storageRoot);
@@ -191,7 +187,6 @@ export async function buildContextWithMeta(
}
const steps = await buildHistory(store, chain.stepsNewestFirst);
const edgePrompt = readEdgePrompt();
const isFirstVisit = !steps.some((s) => s.role === role);
return {
+1 -1
View File
@@ -12,7 +12,7 @@ export {
export type { FrontmatterFastPathResult } from "./frontmatter.js";
export { tryFrontmatterFastPath } from "./frontmatter.js";
export { createAgent } from "./run.js";
export { getCachedSessionId, setCachedSessionId } from "./session-cache.js";
export { getCachedSessionId, getCachePath, setCachedSessionId } from "./session-cache.js";
export { getConfigPath, getEnvPath, loadWorkflowConfig, resolveStorageRoot } from "./storage.js";
export type {
AgentContext,
+19 -11
View File
@@ -22,16 +22,24 @@ function agentLabel(name: string): string {
return `uwf-${name}`;
}
function parseArgv(argv: string[]): { threadId: ThreadId; role: string } {
const threadId = argv[2];
const role = argv[3];
if (threadId === undefined || threadId === "") {
fail("usage: <agent-cli> <thread-id> <role>");
const USAGE = "usage: <agent-cli> --thread <id> --role <role> --prompt <text>";
function getNamedArg(argv: string[], name: string): string {
const idx = argv.indexOf(name);
if (idx === -1 || idx + 1 >= argv.length) {
return "";
}
if (role === undefined || role === "") {
fail("usage: <agent-cli> <thread-id> <role>");
}
return { threadId: threadId as ThreadId, role };
return argv[idx + 1];
}
function parseArgv(argv: string[]): { threadId: ThreadId; role: string; prompt: string } {
const threadId = getNamedArg(argv, "--thread");
const role = getNamedArg(argv, "--role");
const prompt = getNamedArg(argv, "--prompt");
if (threadId === "") fail(USAGE);
if (role === "") fail(USAGE);
if (prompt === "") fail(USAGE);
return { threadId: threadId as ThreadId, role, prompt };
}
function runWithMessage<T>(label: string, fn: () => Promise<T>): Promise<T> {
@@ -103,11 +111,11 @@ async function persistStep(options: {
export function createAgent(options: AgentOptions): () => Promise<void> {
return async function main(): Promise<void> {
const { threadId, role } = parseArgv(process.argv);
const { threadId, role, prompt } = parseArgv(process.argv);
const storageRoot = resolveStorageRoot();
loadDotenv({ path: getEnvPath(storageRoot) });
const ctx = await runWithMessage("context", () => buildContextWithMeta(threadId, role));
const ctx = await runWithMessage("context", () => buildContextWithMeta(threadId, role, prompt));
const roleDef = ctx.workflow.roles[role];
if (roleDef === undefined) {
@@ -8,8 +8,8 @@ import { resolveStorageRoot } from "./storage.js";
type SessionCache = Record<string, string>;
function getCachePath(): string {
return join(resolveStorageRoot(), "cache", "agent-sessions.json");
export function getCachePath(agentName: string): string {
return join(resolveStorageRoot(), "cache", `${agentName}-sessions.json`);
}
function cacheKey(threadId: ThreadId, role: string): string {
@@ -20,8 +20,8 @@ function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
async function readCache(): Promise<SessionCache> {
const path = getCachePath();
async function readCache(agentName: string): Promise<SessionCache> {
const path = getCachePath(agentName);
try {
const text = await readFile(path, "utf8");
const raw = JSON.parse(text) as unknown;
@@ -40,36 +40,45 @@ async function readCache(): Promise<SessionCache> {
if (err.code === "ENOENT") {
return {};
}
// Treat JSON parse errors as empty cache
if (err.name === "SyntaxError") {
return {};
}
throw e;
}
}
async function writeCache(cache: SessionCache): Promise<void> {
const path = getCachePath();
async function writeCache(agentName: string, cache: SessionCache): Promise<void> {
const path = getCachePath(agentName);
const dir = dirname(path);
await mkdir(dir, { recursive: true });
// Atomic write: write to temp file then rename to avoid partial reads on concurrent access.
// NOTE: Current workflow execution is serial (execFileSync), so true concurrency doesn't occur.
// This is a safety net for future parallel execution.
const tmpPath = join(dir, `.agent-sessions.${randomBytes(4).toString("hex")}.tmp`);
const tmpPath = join(dir, `.${agentName}-sessions.${randomBytes(4).toString("hex")}.tmp`);
await writeFile(tmpPath, `${JSON.stringify(cache, null, 2)}\n`, "utf8");
await rename(tmpPath, path);
}
/** Read the cached session ID for a thread+role pair. */
export async function getCachedSessionId(threadId: ThreadId, role: string): Promise<string | null> {
const cache = await readCache();
export async function getCachedSessionId(
agentName: string,
threadId: ThreadId,
role: string,
): Promise<string | null> {
const cache = await readCache(agentName);
const sessionId = cache[cacheKey(threadId, role)];
return sessionId ?? null;
}
/** Write the session ID for a thread+role pair into the cache. */
export async function setCachedSessionId(
agentName: string,
threadId: ThreadId,
role: string,
sessionId: string,
): Promise<void> {
const cache = await readCache();
const cache = await readCache(agentName);
cache[cacheKey(threadId, role)] = sessionId;
await writeCache(cache);
await writeCache(agentName, cache);
}
+1 -1
View File
@@ -13,7 +13,7 @@ export type AgentContext = ModeratorContext & {
*/
outputFormatInstruction: string;
/**
* Edge prompt from the graph transition that led to this role (UWF_EDGE_PROMPT).
* Edge prompt from the graph transition that led to this role (--prompt CLI arg).
* Always the real moderator instruction for this step.
*/
edgePrompt: string;
+3 -1
View File
@@ -31,8 +31,10 @@
"@types/react": "^19.2.14",
"@types/react-dom": "^19.2.3",
"@vitejs/plugin-react": "^6.0.2",
"@vitest/ui": "^4.1.7",
"tailwindcss": "^4.2.4",
"typescript": "^5.8.3",
"vite": "^8.0.13"
"vite": "^8.0.13",
"vitest": "^4.1.7"
}
}
@@ -0,0 +1,83 @@
import type { Edge, Node } from "@xyflow/react";
import { describe, expect, it } from "vitest";
import { LayoutLR } from "../index.js";
function makeNode(id: string): Node {
return { id, type: "role", data: {}, position: { x: 0, y: 0 } } as Node;
}
function makeEdge(source: string, target: string): Edge {
return { id: `${source}-${target}`, source, target } as Edge;
}
describe("LayoutLR / assignLayers", () => {
it("1.1 Empty graph: start gets layer 0, end gets higher layer", () => {
const nodes = [makeNode("start"), makeNode("end")];
const result = LayoutLR(nodes, []);
const start = result.find((n) => n.id === "start");
const end = result.find((n) => n.id === "end");
// start has no position change necessarily, but positions should be assigned
expect(start).toBeDefined();
expect(end).toBeDefined();
// end should be to the right of start
expect((end?.position.x ?? 0) > (start?.position.x ?? 0)).toBe(true);
});
it("1.2 Linear chain: start → A → B → end — layers assigned in order", () => {
const nodes = [makeNode("start"), makeNode("A"), makeNode("B"), makeNode("end")];
const edges = [makeEdge("start", "A"), makeEdge("A", "B"), makeEdge("B", "end")];
const result = LayoutLR(nodes, edges);
const xOf = (id: string) => result.find((n) => n.id === id)?.position.x ?? 0;
expect(xOf("start") < xOf("A")).toBe(true);
expect(xOf("A") < xOf("B")).toBe(true);
expect(xOf("B") < xOf("end")).toBe(true);
});
it("1.3 Diamond: A and B share same layer", () => {
const nodes = [makeNode("start"), makeNode("A"), makeNode("B"), makeNode("C"), makeNode("end")];
const edges = [
makeEdge("start", "A"),
makeEdge("start", "B"),
makeEdge("A", "C"),
makeEdge("B", "C"),
makeEdge("C", "end"),
];
const result = LayoutLR(nodes, edges);
const xOf = (id: string) => result.find((n) => n.id === id)?.position.x ?? 0;
expect(xOf("A")).toBe(xOf("B")); // same layer
expect(xOf("A") < xOf("C")).toBe(true);
expect(xOf("C") < xOf("end")).toBe(true);
});
it("1.4 Isolated node placed in middle layer (not layer 0, not end layer)", () => {
const nodes = [makeNode("start"), makeNode("A"), makeNode("isolated"), makeNode("end")];
const edges = [makeEdge("start", "A"), makeEdge("A", "end")];
const result = LayoutLR(nodes, edges);
const xOf = (id: string) => result.find((n) => n.id === id)?.position.x ?? 0;
const xIsolated = xOf("isolated");
expect(xIsolated > xOf("start")).toBe(true);
expect(xIsolated < xOf("end")).toBe(true);
});
it("1.5 end node is always last (highest x)", () => {
const nodes = [makeNode("start"), makeNode("A"), makeNode("B"), makeNode("end")];
const edges = [makeEdge("start", "A"), makeEdge("A", "B"), makeEdge("B", "end")];
const result = LayoutLR(nodes, edges);
const endX = result.find((n) => n.id === "end")?.position.x ?? 0;
for (const node of result) {
if (node.id !== "end") {
expect(node.position.x < endX).toBe(true);
}
}
});
it("1.6 start node is always first (x = 0 or smallest x)", () => {
const nodes = [makeNode("start"), makeNode("A"), makeNode("end")];
const edges = [makeEdge("start", "A"), makeEdge("A", "end")];
const result = LayoutLR(nodes, edges);
const startX = result.find((n) => n.id === "start")?.position.x ?? 0;
for (const node of result) {
expect(node.position.x >= startX).toBe(true);
}
});
});
@@ -43,6 +43,65 @@ function buildGraph(nodes: Node[], edges: Edge[]) {
return { outgoing, incoming, inDegree };
}
function processTarget(
target: string,
newLayer: number,
layers: Map<string, number>,
inDegree: Map<string, number>,
queue: string[],
): void {
const existingLayer = layers.get(target);
if (existingLayer === undefined) {
layers.set(target, newLayer);
inDegree.set(target, (inDegree.get(target) ?? 1) - 1);
if (inDegree.get(target) === 0) queue.push(target);
} else {
layers.set(target, Math.max(existingLayer, newLayer));
}
}
/**
* BFS 分层(排除 end 节点,稍后单独处理)
*/
function bfsLayers(
outgoing: Map<string, string[]>,
inDegree: Map<string, number>,
layers: Map<string, number>,
): void {
const queue: string[] = ["start"];
while (queue.length > 0) {
const current = queue.shift() ?? "";
const currentLayer = layers.get(current) ?? 0;
for (const target of outgoing.get(current) ?? []) {
if (target === "end") continue;
processTarget(target, currentLayer + 1, layers, inDegree, queue);
}
}
}
/**
* 处理孤立节点(没有被分配层级的非 start/end 节点),放在中间层
*/
function placeIsolatedNodes(nodes: Node[], layers: Map<string, number>, maxLayer: number): void {
const middleLayer = Math.max(1, Math.floor((maxLayer + 1) / 2));
for (const node of nodes) {
if (node.id !== "start" && node.id !== "end" && !layers.has(node.id)) {
layers.set(node.id, middleLayer);
}
}
}
/**
* 计算最大层级(排除 end 节点)
*/
function maxLayerExcludingEnd(layers: Map<string, number>): number {
let max = 0;
for (const [id, layer] of layers) {
if (id !== "end") max = Math.max(max, layer);
}
return max;
}
/**
* 使用拓扑排序将节点分层
* - 'start' 节点固定在第 0 层
@@ -52,62 +111,15 @@ function buildGraph(nodes: Node[], edges: Edge[]) {
function assignLayers(nodes: Node[], edges: Edge[]): Map<string, number> {
const { outgoing, inDegree } = buildGraph(nodes, edges);
const layers = new Map<string, number>();
const queue: string[] = [];
// 1. start 节点固定在第 0 层
layers.set("start", 0);
queue.push("start");
bfsLayers(outgoing, inDegree, layers);
// 2. BFS 分层(排除 end 节点,稍后单独处理)
while (queue.length > 0) {
const current = queue.shift() ?? "";
const currentLayer = layers.get(current) ?? 0;
const afterBfsMax = maxLayerExcludingEnd(layers);
placeIsolatedNodes(nodes, layers, afterBfsMax);
for (const target of outgoing.get(current) ?? []) {
// 跳过 end 节点,稍后处理
if (target === "end") continue;
const newLayer = currentLayer + 1;
const existingLayer = layers.get(target);
if (existingLayer === undefined) {
layers.set(target, newLayer);
inDegree.set(target, (inDegree.get(target) ?? 1) - 1);
if (inDegree.get(target) === 0) {
queue.push(target);
}
} else {
// 如果已有层级,取更大的值(确保所有前驱都在前面)
layers.set(target, Math.max(existingLayer, newLayer));
}
}
}
// 3. 找到当前最大层级
let maxLayer = 0;
for (const layer of layers.values()) {
maxLayer = Math.max(maxLayer, layer);
}
// 4. 处理孤立节点(没有被分配层级的非 start/end 节点)
// 把它们放在中间层
const middleLayer = Math.max(1, Math.floor((maxLayer + 1) / 2));
for (const node of nodes) {
if (node.id !== "start" && node.id !== "end" && !layers.has(node.id)) {
layers.set(node.id, middleLayer);
}
}
// 5. 重新计算最大层级(可能因为孤立节点而变化)
maxLayer = 0;
for (const [id, layer] of layers) {
if (id !== "end") {
maxLayer = Math.max(maxLayer, layer);
}
}
// 6. end 节点固定在最后一层
layers.set("end", maxLayer + 1);
const finalMax = maxLayerExcludingEnd(layers);
layers.set("end", finalMax + 1);
return layers;
}
@@ -30,23 +30,24 @@ export const handlers = define.memoize((use, model) => {
});
};
function isProtectedNode(node: AnyWorkNode): boolean {
return node.type === "start" || node.type === "end";
}
function isFirstConditionalSibling(
edge: { id: string; source: string; type: string | null },
allEdges: { id: string; source: string; type: string | null }[],
): boolean {
if (edge.type !== "conditional") return false;
const siblings = allEdges.filter((e) => e.source === edge.source && e.type === "conditional");
return siblings.length >= 2 && siblings[0].id === edge.id;
}
const onBeforeDelete: OnBeforeDelete<AnyWorkNode> = async ({ nodes, edges }) => {
for (const node of nodes) {
if (node.type === "start" || node.type === "end") {
return false;
}
}
if (nodes.some(isProtectedNode)) return false;
if (edges.length > 0) {
const allEdges = use(edgesModel)[0];
for (const edge of edges) {
if (edge.type !== "conditional") continue;
const siblings = allEdges.filter(
(e) => e.source === edge.source && e.type === "conditional",
);
if (siblings.length >= 2 && siblings[0].id === edge.id) {
return false;
}
}
if (edges.some((e) => isFirstConditionalSibling(e, allEdges))) return false;
}
model.startTransaction();
return true;
@@ -96,25 +97,28 @@ export const handlers = define.memoize((use, model) => {
use(editNodeViewModel)[1].cancel();
}
function handleEscape() {
const [addView, addViewActions] = use(addNodeViewModel);
const [editView, editViewActions] = use(editNodeViewModel);
if (addView) addViewActions.cancel();
if (editView) editViewActions.cancel();
}
function handleUndoRedo(event: React.KeyboardEvent<HTMLDivElement>) {
if (event.code === "KeyZ" && (event.ctrlKey || event.metaKey)) {
if (event.shiftKey) model.redo();
else model.undo();
} else if (event.code === "KeyY" && (event.ctrlKey || event.metaKey)) {
model.redo();
}
}
function handleKeyDown(event: React.KeyboardEvent<HTMLDivElement>) {
if (event.code === "Escape") {
const [addView, addViewActions] = use(addNodeViewModel);
const [editView, editViewActions] = use(editNodeViewModel);
if (addView) addViewActions.cancel();
if (editView) editViewActions.cancel();
handleEscape();
return;
}
if (event.code === "KeyZ") {
if (event.ctrlKey || event.metaKey) {
if (event.shiftKey) model.redo();
else model.undo();
}
} else if (event.code === "KeyY") {
if (event.ctrlKey || event.metaKey) {
model.redo();
}
}
handleUndoRedo(event);
}
function loadSteps(steps: WorkFlowSteps) {
@@ -10,16 +10,15 @@ import {
import { Input } from "../../components/ui/input.tsx";
import { Label } from "../../components/ui/label.tsx";
import { Textarea } from "../../components/ui/textarea.tsx";
import { type AddNodeState, addNodeViewModel } from "../model/index.ts";
import { addNodeViewModel } from "../model/index.ts";
import type { RoleNodeData } from "../type.ts";
type FormProps = {
state: AddNodeState;
onSubmit: (params: { data: RoleNodeData }) => void;
onCancel: () => void;
};
function Form({ state, onSubmit, onCancel }: FormProps): ReactNode {
function Form({ onSubmit, onCancel }: FormProps): ReactNode {
const [name, setName] = useState("新角色");
const [description, setDescription] = useState("");
const [identity, setIdentity] = useState("");
@@ -137,7 +136,7 @@ export function AddNodeDialog(): ReactNode {
}}
>
<DialogContent showCloseButton={false} className="sm:max-w-md">
{state && <Form state={state} onSubmit={commit} onCancel={cancel} />}
{state && <Form onSubmit={commit} onCancel={cancel} />}
</DialogContent>
</Dialog>
);
@@ -0,0 +1,109 @@
import { describe, expect, it } from "vitest";
import { transIn } from "../trans-in.js";
import type { WorkFlowStep } from "../type.js";
function makeStep(name: string, transitions: WorkFlowStep["transitions"]): WorkFlowStep {
return {
role: {
name,
description: "",
identity: "",
prepare: "",
execute: "",
report: "",
},
transitions,
};
}
describe("transIn", () => {
it("4.1 Empty steps → start + end nodes, no edges", () => {
const { nodes, edges } = transIn([]);
expect(nodes).toHaveLength(2);
expect(nodes.find((n) => n.id === "start")).toBeDefined();
expect(nodes.find((n) => n.id === "end")).toBeDefined();
expect(edges).toHaveLength(0);
});
it("4.2 Single step with no END transition → start→role edge exists", () => {
const steps = [makeStep("A", [])];
const { nodes, edges } = transIn(steps);
expect(nodes).toHaveLength(3); // start, end, role-A
const startEdge = edges.find((e) => e.source === "start");
expect(startEdge).toBeDefined();
const roleNode = nodes.find((n) => n.type === "role");
expect(startEdge?.target).toBe(roleNode?.id);
});
it("4.3 Single step with END transition → edge to end node exists", () => {
const steps = [makeStep("A", [{ condition: null, target: "END" }])];
const { edges } = transIn(steps);
const endEdge = edges.find((e) => e.target === "end");
expect(endEdge).toBeDefined();
});
it("4.4 Two steps with default transitions chain", () => {
const steps = [
makeStep("A", [{ condition: null, target: "B" }]),
makeStep("B", [{ condition: null, target: "END" }]),
];
const { edges } = transIn(steps);
// Should have start→A, A→B, B→end
expect(edges.find((e) => e.source === "start")).toBeDefined();
const nodeAId = edges.find((e) => e.source === "start")?.target;
expect(edges.find((e) => e.source === nodeAId && e.target !== "end")).toBeDefined();
expect(edges.find((e) => e.target === "end")).toBeDefined();
// No conditional edges
expect(edges.every((e) => e.type !== "conditional")).toBe(true);
});
it("4.5 Step with multiple transitions → conditional edges", () => {
const steps = [
makeStep("A", [
{ condition: null, target: "B" },
{ condition: "x>0", target: "C" },
]),
makeStep("B", []),
makeStep("C", []),
];
const { edges } = transIn(steps);
const nodeAId = edges.find((e) => e.source === "start")?.target;
const outEdges = edges.filter((e) => e.source === nodeAId);
expect(outEdges.every((e) => e.type === "conditional")).toBe(true);
// else-branch has empty condition
const elseEdge = outEdges.find(
(e) => (e as { data?: { condition?: string } }).data?.condition === "",
);
expect(elseEdge).toBeDefined();
// if-branch has condition
const ifEdge = outEdges.find(
(e) => (e as { data?: { condition?: string } }).data?.condition === "x>0",
);
expect(ifEdge).toBeDefined();
});
it("4.6 With 1 incoming edge: targetHandle = 'input'; with 2: first gets 'input'", () => {
const steps = [
makeStep("A", [{ condition: null, target: "END" }]),
makeStep("B", [{ condition: null, target: "END" }]),
];
const { edges } = transIn(steps);
// start→A and start→B; end has 2 incoming edges
const incomingToEnd = edges.filter((e) => e.target === "end");
expect(incomingToEnd[0].targetHandle).toBe("input");
});
it("4.7 Same role name maps to same node id across steps", () => {
const steps = [
makeStep("A", [{ condition: null, target: "B" }]),
makeStep("B", [{ condition: null, target: "A" }]),
];
const { edges } = transIn(steps);
const aId = edges.find((e) => e.source === "start")?.target;
// B→A edge target should be same node as start→A edge target
const bToAEdge = edges.find(
(e) => e.source !== "start" && e.target === aId && e.target !== "end",
);
expect(bToAEdge).toBeDefined();
});
});
@@ -0,0 +1,137 @@
import { describe, expect, it } from "vitest";
import type { AnyWorkEdge, AnyWorkNode } from "../../type.js";
import { validate } from "../validate.js";
function roleNode(id: string): AnyWorkNode {
return {
id,
type: "role",
data: { name: id, description: "", identity: "", prepare: "", execute: "", report: "" },
position: { x: 0, y: 0 },
} as AnyWorkNode;
}
function startNode(): AnyWorkNode {
return {
id: "start",
type: "start",
data: { label: "Start" },
position: { x: 0, y: 0 },
} as AnyWorkNode;
}
function endNode(): AnyWorkNode {
return {
id: "end",
type: "end",
data: { label: "End" },
position: { x: 0, y: 0 },
} as AnyWorkNode;
}
function defaultEdge(source: string, target: string): AnyWorkEdge {
return { id: `${source}-${target}`, source, target, animated: true } as AnyWorkEdge;
}
function conditionalEdge(source: string, target: string, condition: string): AnyWorkEdge {
return {
id: `${source}-${target}-cond`,
source,
target,
type: "conditional" as const,
data: { condition },
animated: true,
} as AnyWorkEdge;
}
// Helper: build a minimal valid graph with 2 role nodes for validateRoleNodes tests
function baseNodes(...roles: AnyWorkNode[]): AnyWorkNode[] {
return [startNode(), ...roles, endNode()];
}
describe("validateRoleNodes (via validate)", () => {
it("5.1 Role node with no incoming edge → error about missing input", () => {
const n1 = roleNode("n1");
const n2 = roleNode("n2");
const nodes = baseNodes(n1, n2);
// n1 has no incoming, n2 has incoming+outgoing
const edges = [defaultEdge("start", "n2"), defaultEdge("n1", "end"), defaultEdge("n2", "end")];
const result = validate(nodes, edges);
const nodeErrors = result.errors.filter((e) => e.nodeId === "n1");
expect(nodeErrors.some((e) => e.message.includes("缺少输入连接"))).toBe(true);
});
it("5.2 Role node with no outgoing edge → error about missing output", () => {
const n1 = roleNode("n1");
const n2 = roleNode("n2");
const nodes = baseNodes(n1, n2);
const edges = [
defaultEdge("start", "n1"),
defaultEdge("start", "n2"),
defaultEdge("n2", "end"),
// n1 has no outgoing
];
const result = validate(nodes, edges);
const nodeErrors = result.errors.filter((e) => e.nodeId === "n1");
expect(nodeErrors.some((e) => e.message.includes("缺少输出连接"))).toBe(true);
});
it("5.3 Empty condition on non-first conditional edge → error", () => {
const n1 = roleNode("n1");
const n2 = roleNode("n2");
const n3 = roleNode("n3");
const nodes = baseNodes(n1, n2, n3);
const edges = [
defaultEdge("start", "n1"),
conditionalEdge("n1", "n2", ""), // else-branch (index 0) - exempt
conditionalEdge("n1", "n3", ""), // if-branch (index 1) - empty condition → error
defaultEdge("n2", "end"),
defaultEdge("n3", "end"),
];
const result = validate(nodes, edges);
expect(result.errors.some((e) => e.message.includes("条件表达式不能为空"))).toBe(true);
});
it("5.4 Mix of conditional and non-conditional outgoing → error", () => {
const n1 = roleNode("n1");
const n2 = roleNode("n2");
const n3 = roleNode("n3");
const nodes = baseNodes(n1, n2, n3);
const edges = [
defaultEdge("start", "n1"),
conditionalEdge("n1", "n2", "x>0"),
defaultEdge("n1", "n3"), // mix → error
defaultEdge("n2", "end"),
defaultEdge("n3", "end"),
];
const result = validate(nodes, edges);
expect(result.errors.some((e) => e.message.includes("所有出边必须附带条件"))).toBe(true);
});
it("5.5 Valid role node (1 in, 1 out default) → no errors for that node", () => {
const n1 = roleNode("n1");
const n2 = roleNode("n2");
const nodes = baseNodes(n1, n2);
const edges = [defaultEdge("start", "n1"), defaultEdge("n1", "n2"), defaultEdge("n2", "end")];
const result = validate(nodes, edges);
const roleErrors = result.errors.filter((e) => e.nodeId === "n1" || e.nodeId === "n2");
expect(roleErrors).toHaveLength(0);
});
it("5.6 Valid role node (1 in, 2 conditional out with conditions) → no errors", () => {
const n1 = roleNode("n1");
const n2 = roleNode("n2");
const n3 = roleNode("n3");
const nodes = baseNodes(n1, n2, n3);
const edges = [
defaultEdge("start", "n1"),
conditionalEdge("n1", "n2", ""), // else-branch
conditionalEdge("n1", "n3", "x>0"), // if-branch
defaultEdge("n2", "end"),
defaultEdge("n3", "end"),
];
const result = validate(nodes, edges);
const n1Errors = result.errors.filter((e) => e.nodeId === "n1");
expect(n1Errors).toHaveLength(0);
});
});
@@ -28,6 +28,109 @@ function assignHandles(
}
}
function buildNodeMap(
steps: WorkFlowStep[],
nodes: AnyWorkNode[],
): { nameToId: Map<string, string>; idToOrder: Map<string, number> } {
const nameToId = new Map<string, string>();
const idToOrder = new Map<string, number>();
nameToId.set("END", "end");
idToOrder.set("start", -1);
idToOrder.set("end", steps.length);
for (let si = 0; si < steps.length; si++) {
const step = steps[si];
const nodeId = `n${uuid()}`;
nameToId.set(step.role.name, nodeId);
idToOrder.set(nodeId, si);
nodes.push({ id: nodeId, type: "role", data: { ...step.role }, position: { x: 0, y: 0 } });
}
return { nameToId, idToOrder };
}
function sortTransitions(step: WorkFlowStep): WorkFlowStep["transitions"] {
if (step.transitions.length <= 1) return step.transitions;
return [...step.transitions].sort((a, b) => {
if (a.condition === null && b.condition !== null) return -1;
if (a.condition !== null && b.condition === null) return 1;
return 0;
});
}
function buildStepEdges(
sourceId: string,
step: WorkFlowStep,
nameToId: Map<string, string>,
): { elseEdges: AnyWorkEdge[]; ifEdges: AnyWorkEdge[] } {
const hasMultiple = step.transitions.length > 1;
const sorted = sortTransitions(step);
const elseEdges: AnyWorkEdge[] = [];
const ifEdges: AnyWorkEdge[] = [];
for (let i = 0; i < sorted.length; i++) {
const t = sorted[i];
const targetId = nameToId.get(t.target);
if (!targetId) continue;
const edgeId = `e-${sourceId}-${targetId}-${i}`;
if (hasMultiple || t.condition !== null) {
const edge: ConditionalEdge = {
id: edgeId,
source: sourceId,
target: targetId,
sourceHandle: "output",
targetHandle: "input",
type: "conditional",
data: { condition: t.condition ?? "" },
animated: true,
};
if (hasMultiple && i === 0) elseEdges.push(edge);
else ifEdges.push(edge);
} else {
elseEdges.push({
id: edgeId,
source: sourceId,
target: targetId,
sourceHandle: "output",
targetHandle: "input",
animated: true,
});
}
}
return { elseEdges, ifEdges };
}
function pushStepEdges(
edges: AnyWorkEdge[],
elseEdges: AnyWorkEdge[],
ifEdges: AnyWorkEdge[],
idToOrder: Map<string, number>,
): void {
for (const e of elseEdges) edges.push({ ...e, sourceHandle: "output" });
if (ifEdges.length > 0) {
const ifHandles = ["output-top", "output-bottom"] as const;
const sorted = [...ifEdges].sort(
(a, b) => (idToOrder.get(b.target) ?? 0) - (idToOrder.get(a.target) ?? 0),
);
for (let i = 0; i < sorted.length; i++) {
edges.push({ ...sorted[i], sourceHandle: ifHandles[i % ifHandles.length] });
}
}
}
function assignTargetHandles(edges: AnyWorkEdge[], idToOrder: Map<string, number>): void {
const incomingByTarget = new Map<string, number[]>();
for (let i = 0; i < edges.length; i++) {
const target = edges[i].target;
if (!incomingByTarget.has(target)) incomingByTarget.set(target, []);
incomingByTarget.get(target)?.push(i);
}
for (const indices of incomingByTarget.values()) {
indices.sort(
(a, b) => (idToOrder.get(edges[a].source) ?? 0) - (idToOrder.get(edges[b].source) ?? 0),
);
assignHandles(indices, edges, IN_HANDLES, "targetHandle");
}
}
export function transIn(steps: WorkFlowStep[]): Result {
const startNode: AnyWorkNode = {
id: "start",
@@ -42,30 +145,12 @@ export function transIn(steps: WorkFlowStep[]): Result {
position: { x: 250, y: 0 },
};
if (steps.length === 0) {
return { nodes: [startNode, endNode], edges: [] };
}
if (steps.length === 0) return { nodes: [startNode, endNode], edges: [] };
const nodes: AnyWorkNode[] = [startNode, endNode];
const edges: AnyWorkEdge[] = [];
const nameToId = new Map<string, string>();
const idToOrder = new Map<string, number>();
nameToId.set("END", "end");
idToOrder.set("start", -1);
idToOrder.set("end", steps.length);
for (let si = 0; si < steps.length; si++) {
const step = steps[si];
const nodeId = `n${uuid()}`;
nameToId.set(step.role.name, nodeId);
idToOrder.set(nodeId, si);
nodes.push({
id: nodeId,
type: "role",
data: { ...step.role },
position: { x: 0, y: 0 },
});
}
const { nameToId, idToOrder } = buildNodeMap(steps, nodes);
const firstStepId = nameToId.get(steps[0].role.name) ?? "";
edges.push({
@@ -79,88 +164,11 @@ export function transIn(steps: WorkFlowStep[]): Result {
for (const step of steps) {
const sourceId = nameToId.get(step.role.name) ?? "";
const _sourceOrder = idToOrder.get(sourceId) ?? 0;
const hasMultipleTransitions = step.transitions.length > 1;
const sorted = hasMultipleTransitions
? [...step.transitions].sort((a, b) => {
if (a.condition === null && b.condition !== null) return -1;
if (a.condition !== null && b.condition === null) return 1;
return 0;
})
: step.transitions;
const elseEdges: AnyWorkEdge[] = [];
const ifEdges: AnyWorkEdge[] = [];
for (let i = 0; i < sorted.length; i++) {
const t = sorted[i];
const targetId = nameToId.get(t.target);
if (!targetId) continue;
const edgeId = `e-${sourceId}-${targetId}-${i}`;
if (hasMultipleTransitions || t.condition !== null) {
const edge: ConditionalEdge = {
id: edgeId,
source: sourceId,
target: targetId,
sourceHandle: "output",
targetHandle: "input",
type: "conditional",
data: { condition: t.condition ?? "" },
animated: true,
};
if (hasMultipleTransitions && i === 0) {
elseEdges.push(edge);
} else {
ifEdges.push(edge);
}
} else {
elseEdges.push({
id: edgeId,
source: sourceId,
target: targetId,
sourceHandle: "output",
targetHandle: "input",
animated: true,
});
}
}
// out: else → output (right); if → sort by target order desc (rightmost first), then top/bottom
for (const e of elseEdges) {
edges.push({ ...e, sourceHandle: "output" });
}
if (ifEdges.length > 0) {
const sortedIf = [...ifEdges].sort((a, b) => {
const oa = idToOrder.get(a.target) ?? 0;
const ob = idToOrder.get(b.target) ?? 0;
return ob - oa;
});
const ifHandles = ["output-top", "output-bottom"] as const;
for (let i = 0; i < sortedIf.length; i++) {
edges.push({ ...sortedIf[i], sourceHandle: ifHandles[i % ifHandles.length] });
}
}
const { elseEdges, ifEdges } = buildStepEdges(sourceId, step, nameToId);
pushStepEdges(edges, elseEdges, ifEdges, idToOrder);
}
// in: group by target, sort by source order asc (leftmost first), assign input > input-top > input-bottom
const incomingByTarget = new Map<string, number[]>();
for (let i = 0; i < edges.length; i++) {
const target = edges[i].target;
if (!incomingByTarget.has(target)) incomingByTarget.set(target, []);
incomingByTarget.get(target)?.push(i);
}
for (const indices of incomingByTarget.values()) {
indices.sort((a, b) => {
const oa = idToOrder.get(edges[a].source) ?? 0;
const ob = idToOrder.get(edges[b].source) ?? 0;
return oa - ob;
});
assignHandles(indices, edges, IN_HANDLES, "targetHandle");
}
assignTargetHandles(edges, idToOrder);
return { nodes, edges };
}
@@ -91,6 +91,36 @@ function validateEndNode(
}
}
function hasEmptyConditionOnIfEdge(conditionalEdges: AnyWorkEdge[]): boolean {
return conditionalEdges.slice(1).some((edge) => {
const cond = (edge as ConditionalEdge).data?.condition?.trim();
return !cond;
});
}
function validateRoleNodeEdges(
node: AnyWorkNode,
outEdges: AnyWorkEdge[],
inEdges: AnyWorkEdge[],
errors: ValidationError[],
): void {
if (inEdges.length === 0) {
errors.push({ nodeId: node.id, message: "角色节点缺少输入连接" });
}
if (outEdges.length === 0) {
errors.push({ nodeId: node.id, message: "角色节点缺少输出连接" });
return;
}
if (outEdges.length <= 1) return;
const conditionalEdges = outEdges.filter((e) => e.type === "conditional");
if (conditionalEdges.length !== outEdges.length) {
errors.push({ nodeId: node.id, message: "多输出节点的所有出边必须附带条件" });
} else if (hasEmptyConditionOnIfEdge(conditionalEdges)) {
errors.push({ nodeId: node.id, message: "条件边的条件表达式不能为空" });
}
}
function validateRoleNodes(
roleNodes: AnyWorkNode[],
outgoing: Map<string, AnyWorkEdge[]>,
@@ -98,31 +128,7 @@ function validateRoleNodes(
errors: ValidationError[],
): void {
for (const node of roleNodes) {
const inEdges = incoming.get(node.id) ?? [];
const outEdges = outgoing.get(node.id) ?? [];
if (inEdges.length === 0) {
errors.push({ nodeId: node.id, message: "角色节点缺少输入连接" });
}
if (outEdges.length === 0) {
errors.push({ nodeId: node.id, message: "角色节点缺少输出连接" });
}
if (outEdges.length > 1) {
const conditionalEdges = outEdges.filter((e) => e.type === "conditional");
if (conditionalEdges.length !== outEdges.length) {
errors.push({ nodeId: node.id, message: "多输出节点的所有出边必须附带条件" });
} else {
const ifEdges = conditionalEdges.slice(1);
for (const edge of ifEdges) {
const condEdge = edge as ConditionalEdge;
if (!condEdge.data?.condition?.trim()) {
errors.push({ nodeId: node.id, message: "条件边的条件表达式不能为空" });
break;
}
}
}
}
validateRoleNodeEdges(node, outgoing.get(node.id) ?? [], incoming.get(node.id) ?? [], errors);
}
}
@@ -0,0 +1,14 @@
import path from "node:path";
import { defineConfig } from "vitest/config";
export default defineConfig({
test: {
environment: "node",
include: ["src/**/__tests__/**/*.test.ts"],
},
resolve: {
alias: {
"@": path.resolve(import.meta.dirname, "./src"),
},
},
});
+2
View File
@@ -15,6 +15,8 @@ export type {
ProviderConfig,
RoleDefinition,
RoleName,
RunningThreadItem,
RunningThreadsOutput,
Scenario,
StartEntry,
StartNodePayload,
+14
View File
@@ -84,6 +84,7 @@ export type StepOutput = {
thread: ThreadId;
head: CasRef;
done: boolean;
background: boolean | null;
};
/** uwf thread steps — single step entry */
@@ -126,6 +127,19 @@ export type ThreadListItem = {
head: CasRef;
};
/** uwf thread running — single running thread entry */
export type RunningThreadItem = {
thread: ThreadId;
workflow: CasRef;
pid: number;
startedAt: number;
};
/** uwf thread running output */
export type RunningThreadsOutput = {
threads: RunningThreadItem[];
};
// ── 4.6 配置 ────────────────────────────────────────────────────────
/** Alias types for config references */
+89
View File
@@ -0,0 +1,89 @@
#!/usr/bin/env bash
# batch-solve.sh — solve multiple Gitea issues via solve-issue workflow
#
# Usage:
# ./scripts/batch-solve.sh [--agent CMD] [--repo OWNER/REPO] [--count N] ISSUE_NUM...
#
# Examples:
# ./scripts/batch-solve.sh 448 449
# ./scripts/batch-solve.sh --agent "bun run $(pwd)/packages/workflow-agent-claude-code/src/cli.ts" 448 449
# ./scripts/batch-solve.sh --repo uncaged/workflow --count 15 448 449
set -euo pipefail
AGENT=""
REPO="uncaged/workflow"
COUNT=10
ISSUES=()
while [[ $# -gt 0 ]]; do
case "$1" in
--agent) AGENT="$2"; shift 2 ;;
--repo) REPO="$2"; shift 2 ;;
--count) COUNT="$2"; shift 2 ;;
*) ISSUES+=("$1"); shift ;;
esac
done
if [[ ${#ISSUES[@]} -eq 0 ]]; then
echo "Usage: $0 [--agent CMD] [--repo OWNER/REPO] [--count N] ISSUE_NUM..." >&2
exit 1
fi
AGENT_FLAG=""
if [[ -n "$AGENT" ]]; then
AGENT_FLAG="--agent $AGENT"
fi
TOTAL=${#ISSUES[@]}
PASSED=0
FAILED=0
RESULTS=()
echo "━━━ Batch solve: ${TOTAL} issues ━━━"
echo ""
for i in "${!ISSUES[@]}"; do
ISSUE="${ISSUES[$i]}"
NUM=$((i + 1))
echo "┌─── [$NUM/$TOTAL] Issue #${ISSUE} ───"
# Read issue title
TITLE=$(tea issues "$ISSUE" -r "$REPO" 2>/dev/null | head -1 | sed 's/^# #[0-9]* //' | sed 's/ (.*//' || echo "unknown")
echo "│ Title: $TITLE"
# Start thread
PROMPT="Fix issue #${ISSUE} in ${REPO}. Read the issue first with 'tea issues ${ISSUE} -r ${REPO}' for full spec."
THREAD_JSON=$(uwf thread start solve-issue -p "$PROMPT" 2>&1)
THREAD_ID=$(echo "$THREAD_JSON" | python3 -c "import json,sys; print(json.load(sys.stdin)['thread'])")
echo "│ Thread: $THREAD_ID"
# Run steps
echo "│ Running (max $COUNT steps)..."
# shellcheck disable=SC2086
if STEP_OUTPUT=$(uwf thread step "$THREAD_ID" $AGENT_FLAG -c "$COUNT" 2>&1); then
# Check if done
LAST_DONE=$(echo "$STEP_OUTPUT" | python3 -c "import json,sys; lines=sys.stdin.read().strip(); data=json.loads(lines); print(data[-1].get('done', False))")
if [[ "$LAST_DONE" == "True" ]]; then
echo "│ ✅ Done!"
PASSED=$((PASSED + 1))
RESULTS+=("✅ #${ISSUE}${TITLE}")
else
echo "│ ⚠️ Ran out of steps (not done)"
FAILED=$((FAILED + 1))
RESULTS+=("⚠️ #${ISSUE}${TITLE} (incomplete)")
fi
else
echo "│ ❌ Failed"
FAILED=$((FAILED + 1))
RESULTS+=("❌ #${ISSUE}${TITLE} (error)")
fi
echo "└───"
echo ""
done
echo "━━━ Results: ${PASSED}/${TOTAL} passed, ${FAILED} failed ━━━"
for R in "${RESULTS[@]}"; do
echo " $R"
done