Compare commits

..

4 Commits

Author SHA1 Message Date
xingyue 318f8c7fa6 ci: test runner v4 2026-05-25 19:42:50 +08:00
xingyue 32569e4248 ci: test runner v3 2026-05-25 19:41:54 +08:00
xingyue 3e4cc4cd33 ci: retry actions runner test 2026-05-25 19:38:54 +08:00
xingyue 1d4692db50 ci: add gitea actions workflow 2026-05-25 19:36:04 +08:00
33 changed files with 717 additions and 2831 deletions
+28
View File
@@ -0,0 +1,28 @@
name: CI
on:
push:
branches: ['*']
pull_request:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Bun
uses: oven-sh/setup-bun@v2
- name: Install dependencies
run: bun install
- name: Lint
run: bun run lint
- name: Type check
run: bun run typecheck
- name: Test
run: bun test
+11 -27
View File
@@ -38,26 +38,19 @@ roles:
capabilities:
- coding
procedure: |
IMPORTANT: Always work in a git worktree, NEVER modify the main working directory directly.
Before starting any work, set up an isolated worktree:
1. `cd ~/repos/workflow && git fetch origin` to get latest refs
2. First time (no existing branch):
- `git worktree add ~/repos/workflow-worktrees/fix/<issue-number>-<short-slug> -b fix/<issue-number>-<short-slug> origin/main`
- `cd ~/repos/workflow-worktrees/fix/<issue-number>-<short-slug> && bun install`
3. If bounced back from reviewer or tester (branch already exists):
- The worktree should already exist at `~/repos/workflow-worktrees/fix/<issue-number>-<short-slug>`
- `cd ~/repos/workflow-worktrees/fix/<issue-number>-<short-slug>`
- `git fetch origin && git rebase origin/main`
4. ALL subsequent work must happen inside the worktree directory.
Before starting any work, ensure a clean worktree:
1. `git checkout main && git pull` to get the latest code
2. `git checkout -b fix/<issue-number>-<short-description>` to create a fresh branch
- If bounced back from reviewer or tester, reuse the existing branch and rebase onto latest main:
`git checkout main && git pull && git checkout <branch> && git rebase main`
Then implement TDD:
5. Read the test spec from CAS: `uwf cas get <plan hash>` (find the hash from the latest planner step's frontmatter.plan)
6. If bounced back from reviewer or tester: read the previous role's output to understand what needs fixing
7. Write tests first based on the spec
8. Implement the code to make tests pass
9. Ensure `bun run build` passes with no errors
10. Run `bun test` to verify all tests pass
3. Read the test spec from CAS: `uwf cas get <plan hash>` (find the hash from the latest planner step's frontmatter.plan)
4. If bounced back from reviewer or tester: read the previous role's output to understand what needs fixing
5. Write tests first based on the spec
6. Implement the code to make tests pass
7. Ensure `bun run build` passes with no errors
8. Run `bun test` to verify all tests pass
output: "List all files changed and provide a summary. Frontmatter must include: status (done or failed)."
frontmatter:
type: object
@@ -73,8 +66,6 @@ roles:
- code-review
- static-analysis
procedure: |
First, cd into the worktree: `cd ~/repos/workflow-worktrees/fix/<issue-number>-*` (find the exact directory)
Before reviewing, verify the git branch:
1. Run `git branch --show-current` — confirm the branch name references the issue number being worked on
2. If the branch doesn't correspond to the issue, flag it in your output and reject
@@ -108,8 +99,6 @@ roles:
capabilities:
- testing
procedure: |
First, cd into the worktree: `cd ~/repos/workflow-worktrees/fix/<issue-number>-*` (find the exact directory)
1. Run `bun test` for automated test verification
2. Read the test spec from CAS: `uwf cas get <plan hash>` (find the hash from the latest planner step's frontmatter.plan)
3. Verify each scenario in the spec is covered and passing
@@ -130,8 +119,6 @@ roles:
goal: "You are a committer agent. You create a clean commit and push a PR linking the original issue."
capabilities: []
procedure: |
First, cd into the worktree: `cd ~/repos/workflow-worktrees/fix/<issue-number>-*` (find the exact directory)
Note: You inherit the developer's worktree and branch. Do NOT create a new branch.
1. Stage all changes: `git add -A`
2. Commit with a descriptive message referencing the issue: `git commit -m "type: description\n\nFixes #N"`
@@ -139,9 +126,6 @@ roles:
- If push hook fails: capture the error log in your output, mark hook_failed
4. On push success: create a PR via `tea pr create --title "..." --description "..."`
- PR description must follow the project template: What / Why / Changes / Ref sections, with `Fixes #N` in Ref
5. After PR creation, clean up the worktree:
- `cd ~/repos/workflow`
- `git worktree remove ~/repos/workflow-worktrees/fix/<issue-number>-<slug>`
output: "Include PR URL on success or error log on failure. Frontmatter must include: success (true or false)."
frontmatter:
type: object
+13 -91
View File
@@ -6,18 +6,6 @@
Layer 4 entry point for the workflow engine. The `uwf` binary orchestrates one step per invocation: load thread head from `threads.yaml`, run the moderator, spawn the configured agent CLI, run extract, append a CAS step node, and update the head pointer (or archive when `$END`).
### Four-Layer Architecture
```
workflow → thread → step → turn
模板定义 执行实例 单步结果 agent内部交互
```
- **Workflow** (layer 1): YAML template with roles and routing graph
- **Thread** (layer 2): Single workflow execution instance
- **Step** (layer 3): One moderator→agent→extract cycle
- **Turn** (layer 4): Agent-internal interactions (use `step read` or CAS to inspect)
This package has no library `src/index.ts` — it is consumed as a CLI binary only.
**Dependencies:** `@uncaged/json-cas`, `@uncaged/json-cas-fs`, `@uncaged/workflow-agent-kit`, `@uncaged/workflow-moderator`, `@uncaged/workflow-protocol`, `@uncaged/workflow-util`, `commander`, `dotenv`, `yaml`
@@ -42,53 +30,34 @@ bun link packages/cli-workflow
-h, --help Show help
```
### Thread (Layer 2: Execution Instances)
### Thread
| Command | Description |
|---------|-------------|
| `uwf thread start <workflow> -p <prompt>` | Create a thread without executing |
| `uwf thread exec <thread-id> [--agent <cmd>] [-c <count>] [--background]` | Execute one or more moderator→agent→extract cycles |
| `uwf thread step <thread-id> [--agent <cmd>] [-c <count>]` | Execute one or more moderator→agent→extract cycles |
| `uwf thread show <thread-id>` | Show thread head pointer |
| `uwf thread list [--status <idle\|running\|completed>]` | List threads, optionally filtered by status |
| `uwf thread list [--all]` | List active threads (`--all` includes archived) |
| `uwf thread steps <thread-id>` | List all steps chronologically |
| `uwf thread read <thread-id> [--quota N] [--before <hash>] [--start]` | Render thread as readable markdown |
| `uwf thread stop <thread-id>` | Stop background execution (keep thread active) |
| `uwf thread cancel <thread-id>` | Cancel thread (stop + archive to history) |
| `uwf thread fork <step-hash>` | Fork from a specific step |
| `uwf thread step-details <step-hash>` | Dump full detail node as YAML |
| `uwf thread kill <thread-id>` | Terminate and archive |
Examples:
```bash
uwf thread start solve-issue -p "Fix the login redirect bug"
uwf thread exec 01ARZ3NDEKTSV4RRFFQ69G5FAV
uwf thread exec 01ARZ3NDEKTSV4RRFFQ69G5FAV -c 3 --agent uwf-builtin
uwf thread exec 01ARZ3NDEKTSV4RRFFQ69G5FAV --background
uwf thread list --status running
uwf thread step 01ARZ3NDEKTSV4RRFFQ69G5FAV
uwf thread step 01ARZ3NDEKTSV4RRFFQ69G5FAV -c 3 --agent uwf-builtin
uwf thread read 01ARZ3NDEKTSV4RRFFQ69G5FAV --quota 8000
uwf thread stop 01ARZ3NDEKTSV4RRFFQ69G5FAV
```
### Step (Layer 3: Single Cycle Results)
### Workflow
| Command | Description |
|---------|-------------|
| `uwf step list <thread-id>` | List all steps in a thread chronologically |
| `uwf step show <step-hash>` | Show step metadata and frontmatter |
| `uwf step read <step-hash> [--before N]` | Read step output as markdown |
| `uwf step fork <step-hash>` | Fork a thread from a specific step |
Examples:
```bash
uwf step list 01ARZ3NDEKTSV4RRFFQ69G5FAV
uwf step show 32GCDE899RRQ3
uwf step read 32GCDE899RRQ3 --before 3
uwf step fork 32GCDE899RRQ3
```
### Workflow (Layer 1: Templates)
| Command | Description |
|---------|-------------|
| `uwf workflow add <file.yaml>` | Register a workflow from YAML |
| `uwf workflow put <file.yaml>` | Register a workflow from YAML |
| `uwf workflow show <name-or-hash>` | Show workflow definition |
| `uwf workflow list` | List registered workflows |
@@ -130,52 +99,6 @@ Config: `~/.uncaged/workflow/config.yaml`. API keys: `~/.uncaged/workflow/.env`.
| `uwf log show [--thread <id>] [--process <pid>] [--date YYYY-MM-DD]` | Show filtered log entries |
| `uwf log clean [--before YYYY-MM-DD]` | Delete old log files |
## Migration Guide
### Breaking Changes (v0.x → v1.x)
The CLI was reorganized to clarify the four-layer architecture. **No backward compatibility** — old commands have been removed.
#### Renamed Commands
| Old Command | New Command | Notes |
|------------|-------------|-------|
| `workflow put` | `workflow add` | More intuitive verb |
| `thread step` | `thread exec` | Eliminates ambiguity with "step" noun |
| `thread list --all` | `thread list --status completed` | Unified status filtering |
#### Removed Commands (Merged)
| Old Command | New Command | Notes |
|------------|-------------|-------|
| `thread running` | `thread list --status running` | Merged into unified list |
#### Removed Commands (Split)
| Old Command | New Commands | Notes |
|------------|-------------|-------|
| `thread kill` | `thread stop` or `thread cancel` | `stop` keeps thread active, `cancel` archives it |
#### Moved Commands
| Old Command | New Command | Notes |
|------------|-------------|-------|
| `thread steps` | `step list` | Moved to step layer |
| `thread step-details` | `step show` | Moved to step layer |
| `thread fork` | `step fork` | Moved to step layer (forks are step-based) |
#### Deprecation Errors
Old commands now show helpful error messages:
```bash
$ uwf thread step 01ARZ3NDEKTSV4RRFFQ69G5FAV
Error: Command 'thread step' has been removed.
Use 'thread exec' instead.
For more information, see: uwf help thread exec
```
## Internal Structure
```
@@ -186,9 +109,8 @@ src/
├── validate.ts Workflow YAML validation
├── schemas.ts CLI-local schema registration
└── commands/
├── thread.ts Thread lifecycle and exec
├── step.ts Step operations (list/show/read/fork)
├── workflow.ts Workflow registry (add/show/list)
├── thread.ts Thread lifecycle and step execution
├── workflow.ts Workflow registry (put/show/list)
├── cas.ts CAS inspection and schema ops
├── setup.ts Interactive/non-interactive setup
├── skill.ts Built-in skill references
@@ -1,683 +0,0 @@
import { mkdir, mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { bootstrap, putSchema } from "@uncaged/json-cas";
import { createFsStore } from "@uncaged/json-cas-fs";
import type { CasRef, ThreadId } from "@uncaged/workflow-protocol";
import { afterEach, beforeEach, describe, expect, test } from "vitest";
import { cmdThreadRead, THREAD_READ_DEFAULT_QUOTA } from "../commands/thread.js";
import { registerUwfSchemas } from "../schemas.js";
import type { UwfStore } from "../store.js";
import { saveThreadsIndex } from "../store.js";
// ── schemas used in tests ────────────────────────────────────────────────────
const TURN_SCHEMA = {
title: "hermes-turn",
type: "object" as const,
required: ["index", "role", "content"],
properties: {
index: { type: "integer" as const },
role: { type: "string" as const },
content: { type: "string" as const },
toolCalls: {
anyOf: [
{ type: "array" as const, items: { type: "object" as const } },
{ type: "null" as const },
],
},
reasoning: { anyOf: [{ type: "string" as const }, { type: "null" as const }] },
},
additionalProperties: false,
};
const DETAIL_SCHEMA = {
title: "hermes-detail",
type: "object" as const,
required: ["sessionId", "model", "duration", "turnCount", "turns"],
properties: {
sessionId: { type: "string" as const },
model: { type: "string" as const },
duration: { type: "integer" as const },
turnCount: { type: "integer" as const },
turns: {
type: "array" as const,
items: { type: "string" as const, format: "cas_ref" },
},
},
additionalProperties: false,
};
// ── helpers ───────────────────────────────────────────────────────────────────
async function makeUwfStore(storageRoot: string): Promise<UwfStore> {
const casDir = join(storageRoot, "cas");
await mkdir(casDir, { recursive: true });
const store = createFsStore(casDir);
const schemas = await registerUwfSchemas(store);
return { storageRoot, store, schemas };
}
async function registerDetailSchemas(store: ReturnType<typeof createFsStore>) {
await bootstrap(store);
const [turn, detail] = await Promise.all([
putSchema(store, TURN_SCHEMA),
putSchema(store, DETAIL_SCHEMA),
]);
return { turn, detail };
}
// ── fixture ───────────────────────────────────────────────────────────────────
let tmpDir: string;
beforeEach(async () => {
tmpDir = await mkdtemp(join(tmpdir(), "cli-uwf-test-"));
});
afterEach(async () => {
await rm(tmpDir, { recursive: true, force: true });
});
// ── thread read XML tag isolation ─────────────────────────────────────────────
describe("thread read XML tag isolation", () => {
test("scenario 1: wraps output in XML tags instead of heading", async () => {
const uwf = await makeUwfStore(tmpDir);
const detailSchemas = await registerDetailSchemas(uwf.store);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "test-wf",
description: "desc",
roles: {
planner: {
description: "Planner",
goal: "You are a planning agent. Your task is to...",
capabilities: [],
procedure: "Plan the work.",
output: "Summarize the plan.",
meta: "placeholder00" as CasRef,
},
},
conditions: {},
graph: {},
});
const startHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "Fix issue #459",
});
const outputHash = await uwf.store.put(uwf.schemas.workflow, {
name: "out",
description: "",
roles: {},
conditions: {},
graph: {},
});
const turnHash = await uwf.store.put(detailSchemas.turn, {
index: 0,
role: "assistant",
content:
"---\nstatus: ready\nplan: CMWGHQKT58RY4\n---\n\n# Analysis Complete\n## Issue Summary\nThe issue requires XML tag isolation.",
toolCalls: null,
reasoning: null,
});
const detailHash = await uwf.store.put(detailSchemas.detail, {
sessionId: "sx",
model: "mx",
duration: 500,
turnCount: 1,
turns: [turnHash],
});
const stepHash = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: null,
role: "planner",
output: outputHash,
detail: detailHash,
agent: "uwf-claude-code",
});
const threadId = "01JTEST0000000000000001" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
// Should wrap output in XML tags
expect(markdown).toContain("<output>");
expect(markdown).toContain("</output>");
// Should not have ### Content heading
expect(markdown).not.toContain("### Content");
// Should preserve markdown headings inside output tags
expect(markdown).toContain("# Analysis Complete");
expect(markdown).toContain("## Issue Summary");
});
test("scenario 2: wraps prompt in XML tags", async () => {
const uwf = await makeUwfStore(tmpDir);
const detailSchemas = await registerDetailSchemas(uwf.store);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "test-wf",
description: "desc",
roles: {
planner: {
description: "Planner",
goal: "You are a planning agent. Your task is to analyze and plan.",
capabilities: [],
procedure: "Plan the work.",
output: "Summarize the plan.",
meta: "placeholder00" as CasRef,
},
},
conditions: {},
graph: {},
});
const startHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "Fix issue",
});
const outputHash = await uwf.store.put(uwf.schemas.workflow, {
name: "out",
description: "",
roles: {},
conditions: {},
graph: {},
});
const turnHash = await uwf.store.put(detailSchemas.turn, {
index: 0,
role: "assistant",
content: "---\nstatus: ready\n---\n\nContent here...",
toolCalls: null,
reasoning: null,
});
const detailHash = await uwf.store.put(detailSchemas.detail, {
sessionId: "sx",
model: "mx",
duration: 500,
turnCount: 1,
turns: [turnHash],
});
const stepHash = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: null,
role: "planner",
output: outputHash,
detail: detailHash,
agent: "uwf-claude-code",
});
const threadId = "01JTEST0000000000000002" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
// Should wrap prompt in XML tags
expect(markdown).toContain("<prompt>");
expect(markdown).toContain("</prompt>");
expect(markdown).toContain("You are a planning agent. Your task is to analyze and plan.");
// Should not have ### Prompt heading
expect(markdown).not.toContain("### Prompt");
// Should wrap output in XML tags
expect(markdown).toContain("<output>");
expect(markdown).toContain("</output>");
});
test("scenario 3: same role repeated does not show prompt twice", async () => {
const uwf = await makeUwfStore(tmpDir);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "test-wf",
description: "desc",
roles: {
writer: {
description: "Writer",
goal: "You are a writer agent.",
capabilities: [],
procedure: "Write content.",
output: "Summarize writing.",
meta: "placeholder00" as CasRef,
},
},
conditions: {},
graph: {},
});
const startHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "Write something",
});
const outputHash = await uwf.store.put(uwf.schemas.workflow, {
name: "out",
description: "",
roles: {},
conditions: {},
graph: {},
});
const step1 = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: null,
role: "writer",
output: outputHash,
detail: null,
agent: "uwf-test",
});
const step2 = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: step1 as CasRef,
role: "writer",
output: outputHash,
detail: null,
agent: "uwf-test",
});
const threadId = "01JTEST0000000000000003" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: step2 });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
// Should only show prompt tags once
const promptCount = (markdown.match(/<prompt>/g) ?? []).length;
expect(promptCount).toBe(1);
});
test("scenario 4: step with no detail shows no output tags", async () => {
const uwf = await makeUwfStore(tmpDir);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "test-wf",
description: "desc",
roles: {
worker: {
description: "Worker",
goal: "You are a worker agent.",
capabilities: [],
procedure: "Do work.",
output: "Summarize work.",
meta: "placeholder00" as CasRef,
},
},
conditions: {},
graph: {},
});
const startHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "Do stuff",
});
const outputHash = await uwf.store.put(uwf.schemas.workflow, {
name: "out",
description: "",
roles: {},
conditions: {},
graph: {},
});
const stepHash = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: null,
role: "worker",
output: outputHash,
detail: null,
agent: "uwf-test",
});
const threadId = "01JTEST0000000000000004" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
// Should not have output tags
expect(markdown).not.toContain("<output>");
expect(markdown).not.toContain("</output>");
// Step header should still be displayed
expect(markdown).toContain("## Step 1: worker");
// Prompt should still be shown
expect(markdown).toContain("<prompt>");
});
test("scenario 5: empty content shows no output tags", async () => {
const uwf = await makeUwfStore(tmpDir);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "test-wf",
description: "desc",
roles: {},
conditions: {},
graph: {},
});
const startHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "Do stuff",
});
const outputHash = await uwf.store.put(uwf.schemas.workflow, {
name: "out",
description: "",
roles: {},
conditions: {},
graph: {},
});
// A detail ref that doesn't exist → extractLastAssistantContent returns null
const missingDetailRef = "missingdetail0" as CasRef;
const stepHash = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: null,
role: "worker",
output: outputHash,
detail: missingDetailRef,
agent: "uwf-test",
});
const threadId = "01JTEST0000000000000005" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
// Should not have output tags
expect(markdown).not.toContain("<output>");
expect(markdown).not.toContain("</output>");
});
test("scenario 6: thread read with --start flag shows task section", async () => {
const uwf = await makeUwfStore(tmpDir);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "test-wf",
description: "desc",
roles: {
roleA: {
description: "Role A",
goal: "Goal for roleA",
capabilities: [],
procedure: "Do stuff.",
output: "Output.",
meta: "placeholder00" as CasRef,
},
},
conditions: {},
graph: {},
});
const startHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "Initial prompt",
});
const outputHash = await uwf.store.put(uwf.schemas.workflow, {
name: "out",
description: "",
roles: {},
conditions: {},
graph: {},
});
const stepHash = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: null,
role: "roleA",
output: outputHash,
detail: null,
agent: "uwf-test",
});
const threadId = "01JTEST0000000000000006" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, true);
// Should include task section
expect(markdown).toContain("# Thread");
expect(markdown).toContain("## Task");
expect(markdown).toContain("Initial prompt");
// Prompts should use XML tags
expect(markdown).toContain("<prompt>");
});
test("scenario 7: thread read with --before parameter", async () => {
const uwf = await makeUwfStore(tmpDir);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "test-wf",
description: "desc",
roles: {
roleA: {
description: "Role A",
goal: "Goal for roleA",
capabilities: [],
procedure: "Do stuff.",
output: "Output.",
meta: "placeholder00" as CasRef,
},
roleB: {
description: "Role B",
goal: "Goal for roleB",
capabilities: [],
procedure: "Do stuff.",
output: "Output.",
meta: "placeholder00" as CasRef,
},
roleC: {
description: "Role C",
goal: "Goal for roleC",
capabilities: [],
procedure: "Do stuff.",
output: "Output.",
meta: "placeholder00" as CasRef,
},
},
conditions: {},
graph: {},
});
const startHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "Initial prompt",
});
const outputHash = await uwf.store.put(uwf.schemas.workflow, {
name: "out",
description: "",
roles: {},
conditions: {},
graph: {},
});
const step1 = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: null,
role: "roleA",
output: outputHash,
detail: null,
agent: "uwf-test",
});
const step2 = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: step1 as CasRef,
role: "roleB",
output: outputHash,
detail: null,
agent: "uwf-test",
});
const step3 = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: step2 as CasRef,
role: "roleC",
output: outputHash,
detail: null,
agent: "uwf-test",
});
const threadId = "01JTEST0000000000000007" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: step3 });
const markdown = await cmdThreadRead(
tmpDir,
threadId,
THREAD_READ_DEFAULT_QUOTA,
step2 as CasRef,
false,
);
// Should only show roleA
expect(markdown).toContain("roleA");
expect(markdown).not.toContain("roleB");
expect(markdown).not.toContain("roleC");
// Should use XML tags
expect(markdown).toContain("<prompt>");
});
test("scenario 9: special characters in content are preserved", async () => {
const uwf = await makeUwfStore(tmpDir);
const detailSchemas = await registerDetailSchemas(uwf.store);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "test-wf",
description: "desc",
roles: {
writer: {
description: "Writer",
goal: "You are a writer.",
capabilities: [],
procedure: "Write content.",
output: "Summarize.",
meta: "placeholder00" as CasRef,
},
},
conditions: {},
graph: {},
});
const startHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "Write something",
});
const outputHash = await uwf.store.put(uwf.schemas.workflow, {
name: "out",
description: "",
roles: {},
conditions: {},
graph: {},
});
const turnHash = await uwf.store.put(detailSchemas.turn, {
index: 0,
role: "assistant",
content: "Content with <special> & characters > like <this>",
toolCalls: null,
reasoning: null,
});
const detailHash = await uwf.store.put(detailSchemas.detail, {
sessionId: "sx",
model: "mx",
duration: 500,
turnCount: 1,
turns: [turnHash],
});
const stepHash = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: null,
role: "writer",
output: outputHash,
detail: detailHash,
agent: "uwf-test",
});
const threadId = "01JTEST0000000000000008" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
// Special characters should be preserved as-is
expect(markdown).toContain("Content with <special> & characters > like <this>");
});
test("scenario 10: quota limit with XML tags", async () => {
const uwf = await makeUwfStore(tmpDir);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "test-wf",
description: "desc",
roles: {
roleA: {
description: "Role A",
goal: "Goal for roleA",
capabilities: [],
procedure: "Do stuff.",
output: "Output.",
meta: "placeholder00" as CasRef,
},
},
conditions: {},
graph: {},
});
const startHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "Initial prompt",
});
const outputHash = await uwf.store.put(uwf.schemas.workflow, {
name: "out",
description: "",
roles: {},
conditions: {},
graph: {},
});
const steps: CasRef[] = [];
let prev: CasRef | null = null;
for (let i = 0; i < 5; i++) {
const step = (await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev,
role: "roleA",
output: outputHash,
detail: null,
agent: "uwf-test",
})) as CasRef;
steps.push(step);
prev = step;
}
const threadId = "01JTEST0000000000000009" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: steps[steps.length - 1]! });
// Use very small quota
const markdown = await cmdThreadRead(tmpDir, threadId, 1, null, false);
// Should have skip hint
expect(markdown).toContain("earlier step");
// Should have XML tags for displayed steps
if (markdown.includes("<prompt>")) {
expect(markdown).toContain("</prompt>");
}
});
});
@@ -22,48 +22,48 @@ function runCli(args: string[]): { stdout: string; stderr: string; exitCode: num
}
}
describe("thread exec --count CLI parsing", () => {
describe("thread step --count CLI parsing", () => {
test("--help shows -c/--count option", () => {
const result = runCli(["thread", "exec", "--help"]);
const result = runCli(["thread", "step", "--help"]);
expect(result.stdout).toContain("--count");
expect(result.stdout).toContain("-c");
});
test("description says 'one or more steps'", () => {
const result = runCli(["thread", "exec", "--help"]);
const result = runCli(["thread", "step", "--help"]);
expect(result.stdout).toContain("one or more steps");
});
});
describe("cmdThreadExec count logic", () => {
describe("cmdThreadStep count logic", () => {
test("count=0 fails with validation error", () => {
const result = runCli(["thread", "exec", "FAKE_THREAD_ID", "-c", "0"]);
const result = runCli(["thread", "step", "FAKE_THREAD_ID", "-c", "0"]);
expect(result.exitCode).not.toBe(0);
expect(result.stderr).toContain("positive integer");
});
test("negative count fails with validation error", () => {
const result = runCli(["thread", "exec", "FAKE_THREAD_ID", "-c", "-1"]);
const result = runCli(["thread", "step", "FAKE_THREAD_ID", "-c", "-1"]);
expect(result.exitCode).not.toBe(0);
expect(result.stderr).toContain("positive integer");
});
test("non-integer count fails with validation error", () => {
const result = runCli(["thread", "exec", "FAKE_THREAD_ID", "-c", "1.5"]);
const result = runCli(["thread", "step", "FAKE_THREAD_ID", "-c", "1.5"]);
expect(result.exitCode).not.toBe(0);
expect(result.stderr).toContain("positive integer");
});
test("count=1 is the default (no -c flag)", () => {
// Without -c, it should attempt to run 1 step (failing on missing thread, not on count validation)
const result = runCli(["thread", "exec", "FAKE_THREAD_ID"]);
const result = runCli(["thread", "step", "FAKE_THREAD_ID"]);
expect(result.exitCode).not.toBe(0);
// Should NOT contain "positive integer" error — should fail on thread lookup instead
expect(result.stderr).not.toContain("positive integer");
});
test("count=3 passes validation (fails on thread lookup)", () => {
const result = runCli(["thread", "exec", "FAKE_THREAD_ID", "-c", "3"]);
const result = runCli(["thread", "step", "FAKE_THREAD_ID", "-c", "3"]);
expect(result.exitCode).not.toBe(0);
// Should NOT contain "positive integer" error — should fail on thread/storage lookup
expect(result.stderr).not.toContain("positive integer");
@@ -5,9 +5,9 @@ import { bootstrap, putSchema } from "@uncaged/json-cas";
import { createFsStore } from "@uncaged/json-cas-fs";
import type { CasRef, ThreadId } from "@uncaged/workflow-protocol";
import { afterEach, beforeEach, describe, expect, test } from "vitest";
import { cmdStepShow } from "../commands/step.js";
import {
cmdThreadRead,
cmdThreadStepDetails,
extractLastAssistantContent,
THREAD_READ_DEFAULT_QUOTA,
} from "../commands/thread.js";
@@ -198,10 +198,10 @@ describe("extractLastAssistantContent", () => {
});
});
// ── cmdThreadRead: <output> section ──────────────────────────────────────────
// ── cmdThreadRead: ### Content section ───────────────────────────────────────
describe("cmdThreadRead <output> section", () => {
test("includes <output> tags when detail has assistant turns", async () => {
describe("cmdThreadRead ### Content section", () => {
test("includes ### Content before ### Output when detail has assistant turns", async () => {
const uwf = await makeUwfStore(tmpDir);
const detailSchemas = await registerDetailSchemas(uwf.store);
@@ -264,13 +264,12 @@ describe("cmdThreadRead <output> section", () => {
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
expect(markdown).toContain("<output>");
expect(markdown).toContain("</output>");
expect(markdown).toContain("### Content");
expect(markdown).toContain("The assistant response text");
expect(markdown).not.toContain("### Content");
expect(markdown).not.toContain("### Output");
});
test("omits <output> tags when detail has no matching assistant turns", async () => {
test("omits ### Content when detail has no matching assistant turns", async () => {
const uwf = await makeUwfStore(tmpDir);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
@@ -309,15 +308,14 @@ describe("cmdThreadRead <output> section", () => {
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
expect(markdown).not.toContain("<output>");
expect(markdown).not.toContain("</output>");
expect(markdown).not.toContain("### Content");
expect(markdown).not.toContain("### Output");
});
});
// ── cmdStepShow ───────────────────────────────────────────────────────────────
// ── cmdThreadStepDetails ──────────────────────────────────────────────────────
describe("cmdStepShow", () => {
describe("cmdThreadStepDetails", () => {
test("returns expanded detail node with turns inlined", async () => {
const uwf = await makeUwfStore(tmpDir);
const detailSchemas = await registerDetailSchemas(uwf.store);
@@ -365,7 +363,7 @@ describe("cmdStepShow", () => {
agent: "uwf-hermes",
});
const result = await cmdStepShow(tmpDir, stepHash);
const result = await cmdThreadStepDetails(tmpDir, stepHash);
expect(result).toMatchObject({
sessionId: "sess42",
@@ -386,9 +384,9 @@ describe("cmdStepShow", () => {
});
});
// ── cmdThreadRead: <prompt> deduplication ───────────────────────────────────
// ── cmdThreadRead: ### Prompt deduplication ───────────────────────────────────
describe("cmdThreadRead <prompt> deduplication", () => {
describe("cmdThreadRead ### Prompt deduplication", () => {
async function makeThreadWithRoles(uwf: UwfStore, roles: string[]): Promise<string> {
const roleMap: Record<string, unknown> = {};
for (const r of [...new Set(roles)]) {
@@ -436,36 +434,36 @@ describe("cmdThreadRead <prompt> deduplication", () => {
return stepHash;
}
test("same consecutive role shows <prompt> once", async () => {
test("same consecutive role shows ### Prompt once", async () => {
const uwf = await makeUwfStore(tmpDir);
const headHash = await makeThreadWithRoles(uwf, ["writer", "writer"]);
const threadId = "01JTEST0000000000000003" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: headHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
const count = (markdown.match(/<prompt>/g) ?? []).length;
const count = (markdown.match(/### Prompt/g) ?? []).length;
expect(count).toBe(1);
});
test("different consecutive roles each show <prompt>", async () => {
test("different consecutive roles each show ### Prompt", async () => {
const uwf = await makeUwfStore(tmpDir);
const headHash = await makeThreadWithRoles(uwf, ["planner", "coder"]);
const threadId = "01JTEST0000000000000004" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: headHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
const count = (markdown.match(/<prompt>/g) ?? []).length;
const count = (markdown.match(/### Prompt/g) ?? []).length;
expect(count).toBe(2);
});
test("non-consecutive same role shows <prompt> twice", async () => {
test("non-consecutive same role shows ### Prompt twice", async () => {
const uwf = await makeUwfStore(tmpDir);
const headHash = await makeThreadWithRoles(uwf, ["roleA", "roleB", "roleA"]);
const threadId = "01JTEST0000000000000005" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: headHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
const count = (markdown.match(/<prompt>/g) ?? []).length;
const count = (markdown.match(/### Prompt/g) ?? []).length;
expect(count).toBe(2);
});
});
@@ -586,9 +584,9 @@ describe("cmdThreadRead start section / before / quota", () => {
// ── Tests that call process.exit must be last ─────────────────────────────────
describe("cmdStepShow (process.exit tests - must be last)", () => {
describe("cmdThreadStepDetails (process.exit tests - must be last)", () => {
test("throws when step hash does not exist", async () => {
await expect(cmdStepShow(tmpDir, "nonexistenth0" as CasRef)).rejects.toThrow();
await expect(cmdThreadStepDetails(tmpDir, "nonexistenth0" as CasRef)).rejects.toThrow();
});
test("before with unknown hash rejects", async () => {
@@ -1,147 +0,0 @@
import { mkdir, readdir, readFile, rename, rm, writeFile } from "node:fs/promises";
import { join } from "node:path";
import type { RunningThreadItem, ThreadId } from "@uncaged/workflow-protocol";
import type { RunningMarker } from "./types.js";
/**
* Get the path to the running markers directory.
*/
export function getRunningDir(storageRoot: string): string {
return join(storageRoot, "running");
}
/**
* Get the path to a specific thread's marker file.
*/
export function getMarkerPath(storageRoot: string, threadId: ThreadId): string {
return join(getRunningDir(storageRoot), `${threadId}.json`);
}
/**
* Check if a PID is still running.
* Returns true if the process exists, false otherwise.
*/
export function isPidAlive(pid: number): boolean {
try {
// process.kill with signal 0 checks existence without killing
process.kill(pid, 0);
return true;
} catch {
// ESRCH means process doesn't exist
return false;
}
}
/**
* Create a marker file for a running thread.
* Writes to a temp file in the same directory, then atomically renames.
*/
export async function createMarker(storageRoot: string, marker: RunningMarker): Promise<void> {
const runningDir = getRunningDir(storageRoot);
await mkdir(runningDir, { recursive: true });
const markerPath = getMarkerPath(storageRoot, marker.thread);
const tempPath = join(runningDir, `.${marker.thread}-${process.pid}.tmp`);
const content = JSON.stringify(marker, null, 2);
await writeFile(tempPath, content, "utf8");
await rename(tempPath, markerPath);
}
/**
* Delete a marker file for a thread.
*/
export async function deleteMarker(storageRoot: string, threadId: ThreadId): Promise<void> {
const markerPath = getMarkerPath(storageRoot, threadId);
try {
await rm(markerPath);
} catch {
// Ignore errors if file doesn't exist
}
}
/**
* Read a marker file. Returns null if file doesn't exist or is invalid.
*/
export async function readMarker(
storageRoot: string,
threadId: ThreadId,
): Promise<RunningMarker | null> {
const markerPath = getMarkerPath(storageRoot, threadId);
try {
const content = await readFile(markerPath, "utf8");
const marker = JSON.parse(content) as RunningMarker;
return marker;
} catch {
return null;
}
}
/**
* List all running threads, filtering out stale markers.
*/
export async function listRunningThreads(storageRoot: string): Promise<RunningThreadItem[]> {
const runningDir = getRunningDir(storageRoot);
let files: string[];
try {
files = await readdir(runningDir);
} catch {
// Directory doesn't exist or can't be read
return [];
}
const results: RunningThreadItem[] = [];
for (const filename of files) {
if (!filename.endsWith(".json")) {
continue;
}
const threadId = filename.slice(0, -5) as ThreadId;
const marker = await readMarker(storageRoot, threadId);
if (marker === null) {
// Invalid marker file
continue;
}
if (!isPidAlive(marker.pid)) {
// Stale marker - process no longer exists
await deleteMarker(storageRoot, threadId);
continue;
}
results.push({
thread: marker.thread,
workflow: marker.workflow,
pid: marker.pid,
startedAt: marker.startedAt,
});
}
return results;
}
/**
* Check if a thread is currently executing in the background.
* Returns the marker if running, null otherwise.
*/
export async function isThreadRunning(
storageRoot: string,
threadId: ThreadId,
): Promise<RunningMarker | null> {
const marker = await readMarker(storageRoot, threadId);
if (marker === null) {
return null;
}
if (!isPidAlive(marker.pid)) {
// Stale marker
await deleteMarker(storageRoot, threadId);
return null;
}
return marker;
}
@@ -1,11 +0,0 @@
export {
createMarker,
deleteMarker,
getMarkerPath,
getRunningDir,
isPidAlive,
isThreadRunning,
listRunningThreads,
readMarker,
} from "./background.js";
export type { RunningMarker } from "./types.js";
@@ -1,9 +0,0 @@
import type { CasRef, ThreadId } from "@uncaged/workflow-protocol";
/** Marker file stored at ~/.uncaged/workflow/running/<thread-id>.json */
export type RunningMarker = {
thread: ThreadId;
workflow: CasRef;
pid: number;
startedAt: number;
};
+47 -201
View File
@@ -1,7 +1,8 @@
#!/usr/bin/env bun
import type { CasRef, ThreadId } from "@uncaged/workflow-protocol";
import type { ThreadId } from "@uncaged/workflow-protocol";
import { Command } from "commander";
import { stringify as yamlStringify } from "yaml";
import {
cmdCasGet,
cmdCasHas,
@@ -16,19 +17,19 @@ import {
import { cmdLogClean, cmdLogList, cmdLogShow } from "./commands/log.js";
import { cmdSetup, cmdSetupInteractive } from "./commands/setup.js";
import { cmdSkillCli } from "./commands/skill.js";
import { cmdStepFork, cmdStepList, cmdStepShow } from "./commands/step.js";
import {
cmdThreadCancel,
cmdThreadExec,
cmdThreadFork,
cmdThreadKill,
cmdThreadList,
cmdThreadRead,
cmdThreadShow,
cmdThreadStart,
cmdThreadStop,
cmdThreadStep,
cmdThreadStepDetails,
cmdThreadSteps,
THREAD_READ_DEFAULT_QUOTA,
type ThreadStatus,
} from "./commands/thread.js";
import { cmdWorkflowAdd, cmdWorkflowList, cmdWorkflowShow } from "./commands/workflow.js";
import { cmdWorkflowList, cmdWorkflowPut, cmdWorkflowShow } from "./commands/workflow.js";
import { formatOutput, type OutputFormat } from "./format.js";
import { resolveStorageRoot } from "./store.js";
@@ -51,27 +52,20 @@ const program = new Command();
const pkg = await import("../package.json", { with: { type: "json" } });
program
.name("uwf")
.description(
"Stateless workflow CLI\n\n" +
"Four-layer architecture:\n" +
" workflow → thread → step → turn\n" +
" 模板定义 执行实例 单步结果 agent内部交互",
)
.description("Stateless workflow CLI")
.version(pkg.default.version, "-V, --version");
program.option("--format <fmt>", "Output format: json or yaml", "json");
const workflow = program
.command("workflow")
.description("Workflow definitions (layer 1: templates)");
const workflow = program.command("workflow").description("Workflow registry and CAS");
workflow
.command("add")
.command("put")
.description("Register a workflow from YAML")
.argument("<file>", "Workflow YAML file")
.action((file: string) => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const result = await cmdWorkflowAdd(storageRoot, file);
const result = await cmdWorkflowPut(storageRoot, file);
writeOutput(result);
});
});
@@ -99,7 +93,7 @@ workflow
});
});
const thread = program.command("thread").description("Thread execution (layer 2: instances)");
const thread = program.command("thread").description("Thread lifecycle and execution");
thread
.command("start")
@@ -115,46 +109,24 @@ thread
});
thread
.command("exec")
.command("step")
.description("Execute one or more steps")
.argument("<thread-id>", "Thread ULID")
.option("--agent <cmd>", "Override agent command")
.option("-c, --count <number>", "Number of steps to run (default: 1)")
.option("--background", "Run in background and return immediately")
.option("--_background-worker", "Internal flag for background worker process", false)
.action(
(
threadId: string,
opts: {
agent: string | undefined;
count: string | undefined;
background: boolean;
_backgroundWorker: boolean;
},
) => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const agentOverride = opts.agent ?? null;
const count = opts.count !== undefined ? Number(opts.count) : 1;
const background = opts.background ?? false;
const backgroundWorker = opts._backgroundWorker ?? false;
const results = await cmdThreadExec(
storageRoot,
threadId,
agentOverride,
count,
background,
backgroundWorker,
);
if (results.length === 1) {
writeOutput(results[0]);
} else {
writeOutput(results);
}
});
},
);
.action((threadId: string, opts: { agent: string | undefined; count: string | undefined }) => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const agentOverride = opts.agent ?? null;
const count = opts.count !== undefined ? Number(opts.count) : 1;
const results = await cmdThreadStep(storageRoot, threadId, agentOverride, count);
if (results.length === 1) {
writeOutput(results[0]);
} else {
writeOutput(results);
}
});
});
thread
.command("show")
@@ -170,49 +142,36 @@ thread
thread
.command("list")
.description("List threads")
.option("--status <status>", "Filter by status: idle, running, or completed")
.action((opts: { status: string | undefined }) => {
.description("List active threads")
.option("--all", "Include archived threads")
.action((opts: { all: boolean }) => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const validStatuses: ThreadStatus[] = ["idle", "running", "completed"];
let statusFilter: ThreadStatus | null = null;
if (opts.status !== undefined) {
if (!validStatuses.includes(opts.status as ThreadStatus)) {
process.stderr.write(
`Invalid status: ${opts.status}. Must be one of: idle, running, completed\n`,
);
process.exit(1);
}
statusFilter = opts.status as ThreadStatus;
}
const result = await cmdThreadList(storageRoot, statusFilter);
const result = await cmdThreadList(storageRoot, opts.all);
writeOutput(result);
});
});
thread
.command("stop")
.description("Stop background execution of a thread (keep thread active)")
.command("kill")
.description("Terminate and archive a thread")
.argument("<thread-id>", "Thread ULID")
.action((threadId: string) => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const result = await cmdThreadStop(storageRoot, threadId);
const result = await cmdThreadKill(storageRoot, threadId);
writeOutput(result);
});
});
thread
.command("cancel")
.description("Cancel a thread (stop execution and move to history)")
.command("steps")
.description("List all steps in a thread")
.argument("<thread-id>", "Thread ULID")
.action((threadId: string) => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const result = await cmdThreadCancel(storageRoot, threadId);
const result = await cmdThreadSteps(storageRoot, threadId);
writeOutput(result);
});
});
@@ -246,141 +205,28 @@ thread
},
);
const step = program.command("step").description("Step results (layer 3: single cycle)");
step
.command("list")
.description("List all steps in a thread")
.argument("<thread-id>", "Thread ULID")
.action((threadId: string) => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const result = await cmdStepList(storageRoot, threadId);
writeOutput(result);
});
});
step
.command("show")
.description("Show details of a specific step")
.argument("<step-hash>", "CAS hash of the StepNode")
.action((stepHash: string) => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const detail = await cmdStepShow(storageRoot, stepHash as CasRef);
writeOutput(detail);
});
});
// step read is not yet registered (half-baked, see step.ts cmdStepRead)
step
thread
.command("fork")
.description("Fork a thread from a specific step")
.argument("<step-hash>", "CAS hash of the StartNode or StepNode to fork from")
.action((stepHash: string) => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const result = await cmdStepFork(storageRoot, stepHash as CasRef);
const result = await cmdThreadFork(storageRoot, stepHash);
writeOutput(result);
});
});
// ── Deprecation Handlers ──────────────────────────────────────────────────────
// These commands have been removed. Show helpful error messages.
workflow
.command("put")
.description("[DEPRECATED] Use 'workflow add' instead")
.argument("<file>", "Workflow YAML file")
.action(() => {
process.stderr.write(`Error: Command 'workflow put' has been removed.
Use 'workflow add' instead.
For more information, see: uwf help workflow add
`);
process.exit(1);
});
thread
.command("step")
.description("[DEPRECATED] Use 'thread exec' instead")
.argument("<thread-id>", "Thread ULID")
.allowUnknownOption()
.action(() => {
process.stderr.write(`Error: Command 'thread step' has been removed.
Use 'thread exec' instead.
For more information, see: uwf help thread exec
`);
process.exit(1);
});
thread
.command("steps")
.description("[DEPRECATED] Use 'step list' instead")
.argument("<thread-id>", "Thread ULID")
.action(() => {
process.stderr.write(`Error: Command 'thread steps' has been removed.
Use 'step list' instead.
For more information, see: uwf help step list
`);
process.exit(1);
});
thread
.command("step-details")
.description("[DEPRECATED] Use 'step show' instead")
.argument("<step-hash>", "Step hash")
.action(() => {
process.stderr.write(`Error: Command 'thread step-details' has been removed.
Use 'step show' instead.
For more information, see: uwf help step show
`);
process.exit(1);
});
thread
.command("fork")
.description("[DEPRECATED] Use 'step fork' instead")
.argument("<step-hash>", "Step hash")
.action(() => {
process.stderr.write(`Error: Command 'thread fork' has been removed.
Use 'step fork' instead.
For more information, see: uwf help step fork
`);
process.exit(1);
});
thread
.command("kill")
.description("[DEPRECATED] Use 'thread stop' or 'thread cancel' instead")
.argument("<thread-id>", "Thread ULID")
.action(() => {
process.stderr.write(`Error: Command 'thread kill' has been removed.
Use 'thread stop' to stop background execution (keep thread active),
or 'thread cancel' to cancel and archive the thread.
For more information, see:
uwf help thread stop
uwf help thread cancel
`);
process.exit(1);
});
thread
.command("running")
.description("[DEPRECATED] Use 'thread list --status running' instead")
.action(() => {
process.stderr.write(`Error: Command 'thread running' has been removed.
Use 'thread list --status running' instead.
For more information, see: uwf help thread list
`);
process.exit(1);
.description("Dump the full detail node of a step as YAML")
.argument("<step-hash>", "CAS hash of the StepNode")
.action((stepHash: string) => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const detail = await cmdThreadStepDetails(storageRoot, stepHash);
process.stdout.write(yamlStringify(detail));
});
});
const skill = program.command("skill").description("Built-in skill references for agents");
@@ -1,227 +0,0 @@
import type { Store as CasStore, JSONSchema } from "@uncaged/json-cas";
import { getSchema } from "@uncaged/json-cas";
import type {
CasRef,
StartNodePayload,
StepNodePayload,
ThreadId,
} from "@uncaged/workflow-protocol";
import { loadThreadsIndex, type UwfStore } from "../store.js";
type ChainState = {
startHash: CasRef;
start: StartNodePayload;
stepsNewestFirst: StepNodePayload[];
headIsStart: boolean;
};
type OrderedStepItem = {
hash: CasRef;
payload: StepNodePayload;
timestamp: number;
};
function fail(message: string): never {
process.stderr.write(`${message}\n`);
process.exit(1);
}
function walkChain(uwf: UwfStore, headHash: CasRef): ChainState {
const headNode = uwf.store.get(headHash);
if (headNode === null) {
fail(`CAS node not found: ${headHash}`);
}
if (headNode.type === uwf.schemas.startNode) {
return {
startHash: headHash,
start: headNode.payload as StartNodePayload,
stepsNewestFirst: [],
headIsStart: true,
};
}
if (headNode.type !== uwf.schemas.stepNode) {
fail(`head ${headHash} is not a StartNode or StepNode`);
}
const stepsNewestFirst: StepNodePayload[] = [];
let hash: CasRef | null = headHash;
while (hash !== null) {
const node = uwf.store.get(hash);
if (node === null) {
fail(`CAS node not found while walking chain: ${hash}`);
}
if (node.type !== uwf.schemas.stepNode) {
break;
}
const payload = node.payload as StepNodePayload;
stepsNewestFirst.push(payload);
hash = payload.prev;
}
const newest = stepsNewestFirst[0];
if (newest === undefined) {
fail(`empty step chain at head ${headHash}`);
}
const startNode = uwf.store.get(newest.start);
if (startNode === null || startNode.type !== uwf.schemas.startNode) {
fail(`StartNode not found: ${newest.start}`);
}
return {
startHash: newest.start,
start: startNode.payload as StartNodePayload,
stepsNewestFirst,
headIsStart: false,
};
}
function expandOutput(uwf: UwfStore, outputRef: CasRef): unknown {
const node = uwf.store.get(outputRef);
if (node === null) {
return {};
}
return node.payload;
}
/**
* Recursively expand all cas_ref fields in a CAS node's payload,
* replacing hash strings with the referenced node's expanded payload.
*/
function expandDeep(store: CasStore, hash: CasRef, visited?: Set<string>): unknown {
const seen = visited ?? new Set<string>();
if (seen.has(hash)) return hash; // cycle guard
seen.add(hash);
const node = store.get(hash);
if (node === null) return hash;
const schema = getSchema(store, node.type);
if (schema === null) return node.payload;
return expandValue(store, schema, node.payload, seen);
}
function expandCasRefField(store: CasStore, value: unknown, visited: Set<string>): unknown {
if (typeof value === "string") {
return expandDeep(store, value as CasRef, visited);
}
return value;
}
function expandAnyOfField(
store: CasStore,
schema: JSONSchema,
value: unknown,
visited: Set<string>,
): unknown {
if (!Array.isArray(schema.anyOf)) return value;
for (const sub of schema.anyOf as JSONSchema[]) {
if (sub.format === "cas_ref" && typeof value === "string") {
return expandDeep(store, value as CasRef, visited);
}
}
return value;
}
function expandArrayField(
store: CasStore,
schema: JSONSchema,
value: unknown,
visited: Set<string>,
): unknown {
if (!schema.items || !Array.isArray(value)) return value;
const itemSchema = schema.items as JSONSchema;
return (value as unknown[]).map((item) => expandValue(store, itemSchema, item, visited));
}
function expandObjectField(
store: CasStore,
schema: JSONSchema,
value: unknown,
visited: Set<string>,
): unknown {
if (value === null || typeof value !== "object" || Array.isArray(value) || !schema.properties) {
return value;
}
const props = schema.properties as Record<string, JSONSchema>;
const obj = value as Record<string, unknown>;
const result: Record<string, unknown> = {};
for (const [key, val] of Object.entries(obj)) {
const propSchema = props[key];
result[key] = propSchema ? expandValue(store, propSchema, val, visited) : val;
}
return result;
}
function expandValue(
store: CasStore,
schema: JSONSchema,
value: unknown,
visited: Set<string>,
): unknown {
if (schema.format === "cas_ref") return expandCasRefField(store, value, visited);
if (Array.isArray(schema.anyOf)) return expandAnyOfField(store, schema, value, visited);
if (schema.type === "array") return expandArrayField(store, schema, value, visited);
return expandObjectField(store, schema, value, visited);
}
function collectOrderedSteps(
uwf: UwfStore,
headHash: CasRef,
chain: ChainState,
): OrderedStepItem[] {
let hash: CasRef | null = headHash;
const hashToNode = new Map<string, { payload: StepNodePayload; timestamp: number }>();
while (hash !== null) {
const node = uwf.store.get(hash);
if (node === null || node.type !== uwf.schemas.stepNode) {
break;
}
const payload = node.payload as StepNodePayload;
hashToNode.set(hash, { payload, timestamp: node.timestamp });
hash = payload.prev;
}
let cur: CasRef | null = chain.headIsStart ? null : headHash;
const ordered: OrderedStepItem[] = [];
while (cur !== null) {
const entry = hashToNode.get(cur);
if (entry === undefined) {
break;
}
ordered.push({ hash: cur, ...entry });
cur = entry.payload.prev;
}
ordered.reverse();
return ordered;
}
async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise<CasRef> {
const index = await loadThreadsIndex(storageRoot);
const head = index[threadId];
if (head === undefined) {
fail(`thread not active: ${threadId}`);
}
return head;
}
export {
type ChainState,
collectOrderedSteps,
expandAnyOfField,
expandArrayField,
expandCasRefField,
expandDeep,
expandObjectField,
expandOutput,
expandValue,
fail,
type OrderedStepItem,
resolveHeadHash,
walkChain,
};
-145
View File
@@ -1,145 +0,0 @@
import type {
CasRef,
StartEntry,
StepEntry,
StepNodePayload,
ThreadForkOutput,
ThreadId,
ThreadStepsOutput,
} from "@uncaged/workflow-protocol";
import { generateUlid } from "@uncaged/workflow-util";
import { createUwfStore, loadThreadsIndex, saveThreadsIndex } from "../store.js";
import {
collectOrderedSteps,
expandDeep,
expandOutput,
fail,
resolveHeadHash,
walkChain,
} from "./shared.js";
/**
* List all steps in a thread (previously: thread steps)
*/
export async function cmdStepList(
storageRoot: string,
threadId: ThreadId,
): Promise<ThreadStepsOutput> {
const headHash = await resolveHeadHash(storageRoot, threadId);
const uwf = await createUwfStore(storageRoot);
const chain = walkChain(uwf, headHash);
const startNode = uwf.store.get(chain.startHash);
if (startNode === null) {
fail(`StartNode not found: ${chain.startHash}`);
}
const startEntry: StartEntry = {
hash: chain.startHash,
workflow: chain.start.workflow,
prompt: chain.start.prompt,
timestamp: startNode.timestamp,
};
const stepEntries: StepEntry[] = [];
const ordered = collectOrderedSteps(uwf, headHash, chain);
for (const item of ordered) {
stepEntries.push({
hash: item.hash,
role: item.payload.role,
output: expandOutput(uwf, item.payload.output),
detail: item.payload.detail ?? null,
agent: item.payload.agent,
timestamp: item.timestamp,
});
}
return {
thread: threadId,
workflow: chain.start.workflow,
steps: [startEntry, ...stepEntries],
};
}
/**
* Show details of a specific step (previously: thread step-details)
*/
export async function cmdStepShow(storageRoot: string, stepHash: CasRef): Promise<unknown> {
const uwf = await createUwfStore(storageRoot);
const node = uwf.store.get(stepHash);
if (node === null) {
fail(`CAS node not found: ${stepHash}`);
}
if (node.type !== uwf.schemas.stepNode) {
fail(`node ${stepHash} is not a StepNode`);
}
const payload = node.payload as StepNodePayload;
if (!payload.detail) {
fail(`step ${stepHash} has no detail`);
}
return expandDeep(uwf.store, payload.detail);
}
/**
* Fork a thread from a specific step (previously: thread fork)
*/
export async function cmdStepFork(
storageRoot: string,
stepHash: CasRef,
): Promise<ThreadForkOutput> {
const uwf = await createUwfStore(storageRoot);
const node = uwf.store.get(stepHash);
if (node === null) {
fail(`CAS node not found: ${stepHash}`);
}
if (node.type !== uwf.schemas.startNode && node.type !== uwf.schemas.stepNode) {
fail(`node ${stepHash} is not a StartNode or StepNode`);
}
const newThreadId = generateUlid(Date.now()) as ThreadId;
const index = await loadThreadsIndex(storageRoot);
index[newThreadId] = stepHash;
await saveThreadsIndex(storageRoot, index);
return {
thread: newThreadId,
forkedFrom: {
step: stepHash,
},
};
}
/**
* Read a step's agent output as markdown (new command - requires #462)
* TODO: Implement once unified agent detail/turn schema is available
*/
export async function cmdStepRead(
storageRoot: string,
stepHash: CasRef,
_before: number | null = null,
): Promise<string> {
const uwf = await createUwfStore(storageRoot);
const node = uwf.store.get(stepHash);
if (node === null) {
fail(`CAS node not found: ${stepHash}`);
}
if (node.type !== uwf.schemas.stepNode) {
fail(`node ${stepHash} is not a StepNode`);
}
const payload = node.payload as StepNodePayload;
if (!payload.output) {
fail(`step ${stepHash} has no output`);
}
// TODO: Implement progressive turn reading with --before N
// For now, return a placeholder
const outputNode = uwf.store.get(payload.output);
if (outputNode === null) {
fail(`output node not found: ${payload.output}`);
}
// Return the output as JSON for now
// Once #462 is implemented, this will properly format frontmatter + markdown
return JSON.stringify(outputNode.payload, null, 2);
}
+318 -209
View File
@@ -1,7 +1,8 @@
import { execFileSync, spawn } from "node:child_process";
import { execFileSync } from "node:child_process";
import { access, readFile } from "node:fs/promises";
import { dirname, isAbsolute, resolve as resolvePath } from "node:path";
import { validate } from "@uncaged/json-cas";
import type { Store as CasStore, JSONSchema } from "@uncaged/json-cas";
import { getSchema, validate } from "@uncaged/json-cas";
import { getEnvPath, loadWorkflowConfig } from "@uncaged/workflow-agent-kit";
import { evaluate } from "@uncaged/workflow-moderator";
import type {
@@ -9,26 +10,24 @@ import type {
AgentConfig,
CasRef,
ModeratorContext,
RunningThreadsOutput,
StartEntry,
StartNodePayload,
StartOutput,
StepContext,
StepEntry,
StepNodePayload,
StepOutput,
ThreadForkOutput,
ThreadId,
ThreadListItem,
ThreadStepsOutput,
WorkflowConfig,
WorkflowPayload,
} from "@uncaged/workflow-protocol";
import { createProcessLogger, generateUlid, type ProcessLogger } from "@uncaged/workflow-util";
import { config as loadDotenv } from "dotenv";
import { parse, stringify } from "yaml";
import {
createMarker,
deleteMarker,
isThreadRunning,
listRunningThreads,
} from "../background/index.js";
import {
appendThreadHistory,
createUwfStore,
@@ -42,14 +41,6 @@ import {
type UwfStore,
} from "../store.js";
import { checkWorkflowFilenameConsistency, isCasRef, parseWorkflowPayload } from "../validate.js";
import {
type ChainState,
collectOrderedSteps,
expandOutput,
fail,
type OrderedStepItem,
walkChain,
} from "./shared.js";
import { materializeWorkflowPayload } from "./workflow.js";
const END_ROLE = "$END";
@@ -61,13 +52,35 @@ const PL_AGENT_SPAWN = "R5J2W8N4";
const PL_AGENT_DONE = "C6P9E3H7";
const PL_THREAD_ARCHIVED = "F4D8Q2K5";
const PL_STEP_ERROR = "B8T5N1V6";
const PL_BACKGROUND_START = "X7Q4W9M2";
function failStep(plog: ProcessLogger, message: string): never {
plog.log(PL_STEP_ERROR, message, null);
fail(message);
}
type ChainState = {
startHash: CasRef;
start: StartNodePayload;
stepsNewestFirst: StepNodePayload[];
headIsStart: boolean;
};
type OrderedStepItem = {
hash: CasRef;
payload: StepNodePayload;
timestamp: number;
};
export type KillOutput = {
thread: ThreadId;
archived: boolean;
};
function fail(message: string): never {
process.stderr.write(`${message}\n`);
process.exit(1);
}
/**
* Check if a string looks like a file path (contains path separators or has .yaml/.yml extension).
*/
@@ -308,7 +321,6 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr
thread: threadId,
head: activeHead,
done: false,
background: null,
};
}
@@ -319,75 +331,230 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr
thread: threadId,
head: hist.head,
done: true,
background: null,
};
}
fail(`thread not found: ${threadId}`);
}
export type ThreadStatus = "idle" | "running" | "completed";
export type ThreadListItemWithStatus = ThreadListItem & {
status: ThreadStatus;
};
async function threadListItemFromActive(
storageRoot: string,
uwf: UwfStore,
threadId: ThreadId,
head: CasRef,
): Promise<ThreadListItemWithStatus | null> {
): Promise<ThreadListItem | null> {
const workflow = resolveWorkflowFromHead(uwf, head);
if (workflow === null) {
return null;
}
// Check if thread is currently running in background
const runningMarker = await isThreadRunning(storageRoot, threadId);
const status: ThreadStatus = runningMarker !== null ? "running" : "idle";
return { thread: threadId, workflow, head, status };
return { thread: threadId, workflow, head };
}
export async function cmdThreadList(
storageRoot: string,
statusFilter: ThreadStatus | null,
): Promise<ThreadListItemWithStatus[]> {
includeAll: boolean,
): Promise<ThreadListItem[]> {
const uwf = await createUwfStore(storageRoot);
const index = await loadThreadsIndex(storageRoot);
const items: ThreadListItemWithStatus[] = [];
const items: ThreadListItem[] = [];
// Add active threads
for (const [threadId, head] of Object.entries(index)) {
const item = await threadListItemFromActive(storageRoot, uwf, threadId as ThreadId, head);
const item = await threadListItemFromActive(uwf, threadId as ThreadId, head);
if (item !== null) {
items.push(item);
}
}
// Add completed threads if requested
if (statusFilter === "completed" || statusFilter === null) {
const activeIds = new Set(items.map((i) => i.thread));
const history = await loadThreadHistory(storageRoot);
for (const entry of history) {
if (!activeIds.has(entry.thread)) {
items.push({
thread: entry.thread,
workflow: entry.workflow,
head: entry.head,
status: "completed",
});
}
if (!includeAll) {
return items;
}
const activeIds = new Set(items.map((i) => i.thread));
const history = await loadThreadHistory(storageRoot);
for (const entry of history) {
if (!activeIds.has(entry.thread)) {
items.push({
thread: entry.thread,
workflow: entry.workflow,
head: entry.head,
});
}
}
// Apply status filter if provided
if (statusFilter !== null) {
return items.filter((item) => item.status === statusFilter);
return items;
}
function walkChain(uwf: UwfStore, headHash: CasRef): ChainState {
const headNode = uwf.store.get(headHash);
if (headNode === null) {
fail(`CAS node not found: ${headHash}`);
}
return items;
if (headNode.type === uwf.schemas.startNode) {
return {
startHash: headHash,
start: headNode.payload as StartNodePayload,
stepsNewestFirst: [],
headIsStart: true,
};
}
if (headNode.type !== uwf.schemas.stepNode) {
fail(`head ${headHash} is not a StartNode or StepNode`);
}
const stepsNewestFirst: StepNodePayload[] = [];
let hash: CasRef | null = headHash;
while (hash !== null) {
const node = uwf.store.get(hash);
if (node === null) {
fail(`CAS node not found while walking chain: ${hash}`);
}
if (node.type !== uwf.schemas.stepNode) {
break;
}
const payload = node.payload as StepNodePayload;
stepsNewestFirst.push(payload);
hash = payload.prev;
}
const newest = stepsNewestFirst[0];
if (newest === undefined) {
fail(`empty step chain at head ${headHash}`);
}
const startNode = uwf.store.get(newest.start);
if (startNode === null || startNode.type !== uwf.schemas.startNode) {
fail(`StartNode not found: ${newest.start}`);
}
return {
startHash: newest.start,
start: startNode.payload as StartNodePayload,
stepsNewestFirst,
headIsStart: false,
};
}
function expandOutput(uwf: UwfStore, outputRef: CasRef): unknown {
const node = uwf.store.get(outputRef);
if (node === null) {
return {};
}
return node.payload;
}
/**
* Recursively expand all cas_ref fields in a CAS node's payload,
* replacing hash strings with the referenced node's expanded payload.
*/
function expandDeep(store: CasStore, hash: CasRef, visited?: Set<string>): unknown {
const seen = visited ?? new Set<string>();
if (seen.has(hash)) return hash; // cycle guard
seen.add(hash);
const node = store.get(hash);
if (node === null) return hash;
const schema = getSchema(store, node.type);
if (schema === null) return node.payload;
return expandValue(store, schema, node.payload, seen);
}
function expandCasRefField(store: CasStore, value: unknown, visited: Set<string>): unknown {
if (typeof value === "string") {
return expandDeep(store, value as CasRef, visited);
}
return value;
}
function expandAnyOfField(
store: CasStore,
schema: JSONSchema,
value: unknown,
visited: Set<string>,
): unknown {
if (!Array.isArray(schema.anyOf)) return value;
for (const sub of schema.anyOf as JSONSchema[]) {
if (sub.format === "cas_ref" && typeof value === "string") {
return expandDeep(store, value as CasRef, visited);
}
}
return value;
}
function expandArrayField(
store: CasStore,
schema: JSONSchema,
value: unknown,
visited: Set<string>,
): unknown {
if (!schema.items || !Array.isArray(value)) return value;
const itemSchema = schema.items as JSONSchema;
return (value as unknown[]).map((item) => expandValue(store, itemSchema, item, visited));
}
function expandObjectField(
store: CasStore,
schema: JSONSchema,
value: unknown,
visited: Set<string>,
): unknown {
if (value === null || typeof value !== "object" || Array.isArray(value) || !schema.properties) {
return value;
}
const props = schema.properties as Record<string, JSONSchema>;
const obj = value as Record<string, unknown>;
const result: Record<string, unknown> = {};
for (const [key, val] of Object.entries(obj)) {
const propSchema = props[key];
result[key] = propSchema ? expandValue(store, propSchema, val, visited) : val;
}
return result;
}
function expandValue(
store: CasStore,
schema: JSONSchema,
value: unknown,
visited: Set<string>,
): unknown {
if (schema.format === "cas_ref") return expandCasRefField(store, value, visited);
if (Array.isArray(schema.anyOf)) return expandAnyOfField(store, schema, value, visited);
if (schema.type === "array") return expandArrayField(store, schema, value, visited);
return expandObjectField(store, schema, value, visited);
}
function collectOrderedSteps(
uwf: UwfStore,
headHash: CasRef,
chain: ChainState,
): OrderedStepItem[] {
let hash: CasRef | null = headHash;
const hashToNode = new Map<string, { payload: StepNodePayload; timestamp: number }>();
while (hash !== null) {
const node = uwf.store.get(hash);
if (node === null || node.type !== uwf.schemas.stepNode) {
break;
}
const payload = node.payload as StepNodePayload;
hashToNode.set(hash, { payload, timestamp: node.timestamp });
hash = payload.prev;
}
let cur: CasRef | null = chain.headIsStart ? null : headHash;
const ordered: OrderedStepItem[] = [];
while (cur !== null) {
const entry = hashToNode.get(cur);
if (entry === undefined) {
break;
}
ordered.push({ hash: cur, ...entry });
cur = entry.payload.prev;
}
ordered.reverse();
return ordered;
}
function formatYaml(value: unknown): string {
@@ -489,14 +656,14 @@ function formatStepPrompt(
): string {
if (!roleDef || shownPromptRoles.has(role)) return "";
shownPromptRoles.add(role);
return ["", "", "<prompt>", roleDef.goal, "</prompt>"].join("\n");
return ["", "", "### Prompt", "", roleDef.goal].join("\n");
}
function formatStepContent(uwf: UwfStore, item: OrderedStepItem): string {
if (!item.payload.detail) return "";
const content = extractLastAssistantContent(uwf, item.payload.detail);
if (content === null) return "";
return ["", "", "<output>", content, "</output>"].join("\n");
return ["", "", "### Content", "", content].join("\n");
}
function formatStartSection(options: {
@@ -637,11 +804,13 @@ function spawnAgent(
role: string,
edgePrompt: string,
): CasRef {
const argv = [...agent.args, "--thread", threadId, "--role", role, "--prompt", edgePrompt];
const argv = [...agent.args, threadId, role];
const env = { ...process.env, UWF_EDGE_PROMPT: edgePrompt };
let stdout: string;
try {
stdout = execFileSync(agent.command, argv, {
encoding: "utf8",
env,
stdio: ["ignore", "pipe", "pipe"],
maxBuffer: 50 * 1024 * 1024, // 50 MB — stream-json output can be large
});
@@ -681,65 +850,31 @@ async function archiveThread(
});
}
export async function cmdThreadExec(
export async function cmdThreadStep(
storageRoot: string,
threadId: ThreadId,
agentOverride: string | null,
count: number,
background: boolean,
backgroundWorker: boolean,
): Promise<StepOutput[]> {
if (count < 1 || !Number.isInteger(count)) {
fail(`--count must be a positive integer, got: ${count}`);
}
// Check if thread is already running in background (unless we ARE the background worker)
if (!backgroundWorker) {
const runningMarker = await isThreadRunning(storageRoot, threadId);
if (runningMarker !== null) {
fail(`thread already executing in background (PID: ${runningMarker.pid})`);
}
}
const workflowHash = await resolveActiveThreadWorkflowHash(storageRoot, threadId);
const plog = createProcessLogger({
storageRoot,
context: { thread: threadId, workflow: workflowHash },
});
if (background && !backgroundWorker) {
// Spawn background process
return cmdThreadStepBackground(storageRoot, threadId, agentOverride, count, plog, workflowHash);
}
// If we're the background worker, create marker before execution
let markerCreated = false;
if (backgroundWorker) {
await createMarker(storageRoot, {
thread: threadId,
workflow: workflowHash,
pid: process.pid,
startedAt: Date.now(),
});
markerCreated = true;
}
try {
const results: StepOutput[] = [];
for (let i = 0; i < count; i++) {
const result = await cmdThreadStepOnce(storageRoot, threadId, agentOverride, plog);
results.push(result);
if (result.done) {
break;
}
}
return results;
} finally {
// Cleanup marker if we created one
if (markerCreated) {
await deleteMarker(storageRoot, threadId);
const results: StepOutput[] = [];
for (let i = 0; i < count; i++) {
const result = await cmdThreadStepOnce(storageRoot, threadId, agentOverride, plog);
results.push(result);
if (result.done) {
break;
}
}
return results;
}
async function resolveActiveThreadWorkflowHash(
@@ -756,57 +891,6 @@ async function resolveActiveThreadWorkflowHash(
return chain.start.workflow;
}
async function cmdThreadStepBackground(
storageRoot: string,
threadId: ThreadId,
agentOverride: string | null,
count: number,
plog: ProcessLogger,
workflowHash: CasRef,
): Promise<StepOutput[]> {
// Get current head to return to caller
const index = await loadThreadsIndex(storageRoot);
const headHash = index[threadId];
if (headHash === undefined) {
failStep(plog, `thread not active: ${threadId}`);
}
// Spawn detached background process
const scriptPath = process.argv[1];
if (scriptPath === undefined) {
failStep(plog, "unable to determine script path for background execution");
}
const args = ["thread", "exec", threadId, "--count", String(count)];
if (agentOverride !== null) {
args.push("--agent", agentOverride);
}
// Internal flag to signal the background worker to create/cleanup markers
args.push("--_background-worker");
plog.log(PL_BACKGROUND_START, `spawning background process count=${count}`, null);
const child = spawn(scriptPath, args, {
detached: true,
stdio: "ignore",
});
child.unref();
// Return immediately with current state and background flag
return [
{
workflow: workflowHash,
thread: threadId,
head: headHash,
done: false,
background: true,
},
];
}
async function cmdThreadStepOnce(
storageRoot: string,
threadId: ThreadId,
@@ -844,7 +928,6 @@ async function cmdThreadStepOnce(
thread: threadId,
head: headHash,
done: true,
background: null,
};
}
@@ -892,7 +975,6 @@ async function cmdThreadStepOnce(
thread: threadId,
head: newHead,
done,
background: null,
};
}
@@ -909,6 +991,47 @@ async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise
fail(`thread not found: ${threadId}`);
}
export async function cmdThreadSteps(
storageRoot: string,
threadId: ThreadId,
): Promise<ThreadStepsOutput> {
const headHash = await resolveHeadHash(storageRoot, threadId);
const uwf = await createUwfStore(storageRoot);
const chain = walkChain(uwf, headHash);
const startNode = uwf.store.get(chain.startHash);
if (startNode === null) {
fail(`StartNode not found: ${chain.startHash}`);
}
const startEntry: StartEntry = {
hash: chain.startHash,
workflow: chain.start.workflow,
prompt: chain.start.prompt,
timestamp: startNode.timestamp,
};
const stepEntries: StepEntry[] = [];
const ordered = collectOrderedSteps(uwf, headHash, chain);
for (const item of ordered) {
stepEntries.push({
hash: item.hash,
role: item.payload.role,
output: expandOutput(uwf, item.payload.output),
detail: item.payload.detail,
agent: item.payload.agent,
timestamp: item.timestamp,
});
}
return {
thread: threadId,
workflow: chain.start.workflow,
steps: [startEntry, ...stepEntries],
};
}
export async function cmdThreadRead(
storageRoot: string,
threadId: ThreadId,
@@ -936,67 +1059,58 @@ export async function cmdThreadRead(
});
}
export type StopOutput = {
thread: ThreadId;
stopped: boolean;
};
export async function cmdThreadFork(
storageRoot: string,
stepHash: CasRef,
): Promise<ThreadForkOutput> {
const uwf = await createUwfStore(storageRoot);
const node = uwf.store.get(stepHash);
if (node === null) {
fail(`CAS node not found: ${stepHash}`);
}
if (node.type !== uwf.schemas.startNode && node.type !== uwf.schemas.stepNode) {
fail(`node ${stepHash} is not a StartNode or StepNode`);
}
export type CancelOutput = {
thread: ThreadId;
cancelled: boolean;
};
/**
* Stop background execution of a thread (but keep thread active)
*/
export async function cmdThreadStop(storageRoot: string, threadId: ThreadId): Promise<StopOutput> {
const newThreadId = generateUlid(Date.now()) as ThreadId;
const index = await loadThreadsIndex(storageRoot);
const head = index[threadId];
if (head === undefined) {
fail(`thread not active: ${threadId}`);
}
index[newThreadId] = stepHash;
await saveThreadsIndex(storageRoot, index);
// Check if thread is running in background and terminate it
const runningMarker = await isThreadRunning(storageRoot, threadId);
if (runningMarker === null) {
process.stderr.write(`Warning: thread ${threadId} is not currently running\n`);
return { thread: threadId, stopped: false };
}
try {
process.kill(runningMarker.pid, "SIGTERM");
} catch {
// Process may have already exited, ignore error
}
await deleteMarker(storageRoot, threadId);
return { thread: threadId, stopped: true };
return {
thread: newThreadId,
forkedFrom: {
step: stepHash,
},
};
}
/**
* Cancel a thread (stop execution + move to history)
*/
export async function cmdThreadCancel(
export async function cmdThreadStepDetails(
storageRoot: string,
threadId: ThreadId,
): Promise<CancelOutput> {
stepHash: CasRef,
): Promise<unknown> {
const uwf = await createUwfStore(storageRoot);
const node = uwf.store.get(stepHash);
if (node === null) {
fail(`CAS node not found: ${stepHash}`);
}
if (node.type !== uwf.schemas.stepNode) {
fail(`node ${stepHash} is not a StepNode`);
}
const payload = node.payload as StepNodePayload;
if (!payload.detail) {
fail(`step ${stepHash} has no detail`);
}
return expandDeep(uwf.store, payload.detail);
}
export async function cmdThreadKill(storageRoot: string, threadId: ThreadId): Promise<KillOutput> {
const index = await loadThreadsIndex(storageRoot);
const head = index[threadId];
if (head === undefined) {
fail(`thread not active: ${threadId}`);
}
// Check if thread is running in background and terminate it
const runningMarker = await isThreadRunning(storageRoot, threadId);
if (runningMarker !== null) {
try {
process.kill(runningMarker.pid, "SIGTERM");
} catch {
// Process may have already exited, ignore error
}
await deleteMarker(storageRoot, threadId);
}
const uwf = await createUwfStore(storageRoot);
const workflow = resolveWorkflowFromHead(uwf, head);
if (workflow === null) {
@@ -1014,10 +1128,5 @@ export async function cmdThreadCancel(
};
await appendThreadHistory(storageRoot, historyEntry);
return { thread: threadId, cancelled: true };
}
export async function cmdThreadRunning(storageRoot: string): Promise<RunningThreadsOutput> {
const threads = await listRunningThreads(storageRoot);
return { threads };
return { thread: threadId, archived: true };
}
@@ -29,7 +29,7 @@ export type WorkflowListEntry = {
origin: WorkflowOrigin;
};
export type WorkflowAddOutput = {
export type WorkflowPutOutput = {
name: string;
hash: CasRef;
};
@@ -111,10 +111,10 @@ export async function materializeWorkflowPayload(
};
}
export async function cmdWorkflowAdd(
export async function cmdWorkflowPut(
storageRoot: string,
filePath: string,
): Promise<WorkflowAddOutput> {
): Promise<WorkflowPutOutput> {
let text: string;
try {
text = await readFile(filePath, "utf8");
@@ -19,14 +19,7 @@ mock.module("../src/tools/index.js", () => ({
getBuiltinTools: () => [],
}));
import {
executeTurnTools,
extractFinalText,
runBuiltinLoop,
shouldInjectDeadlineWarning,
shouldNudge,
shouldProcessToolCalls,
} from "../src/loop.js";
import { executeTurnTools, runBuiltinLoop, shouldNudge } from "../src/loop.js";
const fakeProvider = {} as any;
const fakeToolCtx = {} as any;
@@ -161,96 +154,3 @@ describe("runBuiltinLoop integration", () => {
expect(original.length).toBe(1);
});
});
describe("shouldInjectDeadlineWarning", () => {
test("5.1 returns true when turn count reaches warning threshold and not yet warned", () => {
expect(shouldInjectDeadlineWarning(7, 10, false, false)).toBe(true);
});
test("5.2 returns false when already warned", () => {
expect(shouldInjectDeadlineWarning(7, 10, true, false)).toBe(false);
});
test("5.3 returns false when noTools is true", () => {
expect(shouldInjectDeadlineWarning(7, 10, false, true)).toBe(false);
});
test("5.4 returns false when turns remaining > DEADLINE_WARNING_TURNS", () => {
expect(shouldInjectDeadlineWarning(5, 10, false, false)).toBe(false);
});
test("5.5 returns true when exactly at warning threshold", () => {
expect(shouldInjectDeadlineWarning(7, 10, false, false)).toBe(true);
});
test("5.6 returns false when turns remaining is 0", () => {
expect(shouldInjectDeadlineWarning(10, 10, false, false)).toBe(false);
});
});
describe("shouldProcessToolCalls", () => {
test("6.1 returns true when toolCalls present and noTools=false", () => {
expect(shouldProcessToolCalls([{ id: "x", name: "read", arguments: "{}" }], false)).toBe(true);
});
test("6.2 returns false when toolCalls is null", () => {
expect(shouldProcessToolCalls(null, false)).toBe(false);
});
test("6.3 returns false when toolCalls is empty array", () => {
expect(shouldProcessToolCalls([], false)).toBe(false);
});
test("6.4 returns false when noTools=true", () => {
expect(shouldProcessToolCalls([{ id: "x", name: "read", arguments: "{}" }], true)).toBe(false);
});
test("6.5 returns true when multiple tool calls present", () => {
expect(
shouldProcessToolCalls(
[
{ id: "x1", name: "read", arguments: "{}" },
{ id: "x2", name: "write", arguments: "{}" },
],
false,
),
).toBe(true);
});
});
describe("extractFinalText", () => {
test("7.1 returns last assistant message content", () => {
const messages = [
{ role: "system" as const, content: "sys", tool_calls: null },
{ role: "assistant" as const, content: "first", tool_calls: null },
{ role: "assistant" as const, content: "last", tool_calls: null },
];
expect(extractFinalText(messages)).toBe("last");
});
test("7.2 returns empty string when no assistant messages", () => {
expect(extractFinalText([{ role: "system" as const, content: "sys", tool_calls: null }])).toBe(
"",
);
});
test("7.3 skips assistant messages with null content", () => {
const messages = [
{ role: "assistant" as const, content: "first", tool_calls: null },
{
role: "assistant" as const,
content: null,
tool_calls: [{ id: "x", name: "t", arguments: "{}" }],
},
{ role: "assistant" as const, content: "second", tool_calls: null },
];
expect(extractFinalText(messages)).toBe("second");
});
test("7.4 skips assistant messages with empty content", () => {
const messages = [
{ role: "assistant" as const, content: "first", tool_calls: null },
{ role: "assistant" as const, content: "", tool_calls: null },
{ role: "user" as const, content: "nudge", tool_calls: null },
];
expect(extractFinalText(messages)).toBe("first");
});
test("7.5 handles empty messages array", () => {
expect(extractFinalText([])).toBe("");
});
test("7.6 handles messages with only user and system roles", () => {
const messages = [
{ role: "system" as const, content: "sys", tool_calls: null },
{ role: "user" as const, content: "query", tool_calls: null },
];
expect(extractFinalText(messages)).toBe("");
});
});
+82 -191
View File
@@ -1,12 +1,7 @@
import type { ResolvedLlmProvider } from "@uncaged/workflow-agent-kit";
import { createLogger } from "@uncaged/workflow-util";
import {
type ChatMessage,
chatCompletionWithTools,
type LlmToolCall,
type OpenAiToolDefinition,
} from "./llm/index.js";
import { type ChatMessage, chatCompletionWithTools, type LlmToolCall } from "./llm/index.js";
import { appendSessionTurn } from "./session.js";
import {
builtinToolsToOpenAi,
@@ -85,184 +80,10 @@ export type ShouldNudgeOptions = {
const MAX_NUDGES = 3;
const DEADLINE_WARNING_TURNS = 3;
export function shouldInjectDeadlineWarning(
turn: number,
maxTurns: number,
alreadyWarned: boolean,
noTools: boolean,
): boolean {
const turnsRemaining = maxTurns - turn;
return (
!noTools && !alreadyWarned && turnsRemaining > 0 && turnsRemaining <= DEADLINE_WARNING_TURNS
);
}
export function shouldProcessToolCalls(toolCalls: LlmToolCall[] | null, noTools: boolean): boolean {
return !noTools && toolCalls !== null && toolCalls.length > 0;
}
export function extractFinalText(messages: ChatMessage[]): string {
for (let i = messages.length - 1; i >= 0; i--) {
const msg = messages[i];
if (
msg !== undefined &&
msg.role === "assistant" &&
msg.content !== null &&
msg.content.trim() !== ""
) {
return msg.content;
}
}
return "";
}
function injectDeadlineWarning(messages: ChatMessage[], turnsRemaining: number): void {
log("4NRXW6KT", `${turnsRemaining} turns remaining, injecting deadline warning`);
messages.push({
role: "user",
content:
`⚠️ You have ${turnsRemaining} turns remaining. ` +
"Wrap up your work and output the YAML frontmatter starting with `---`. " +
"If you cannot finish in time, output frontmatter with `status: failed` and describe what remains.",
});
}
type HandleTextOnlyTurnResult = {
shouldBreak: boolean;
finalText: string;
turnCount: number;
nudgeCount: number;
turnAdjustment: number;
};
async function handleTextOnlyTurn(
text: string,
messages: ChatMessage[],
storageRoot: string,
sessionId: string,
noTools: boolean,
turn: number,
maxTurns: number,
currentNudgeCount: number,
): Promise<HandleTextOnlyTurnResult> {
await appendTurn(storageRoot, sessionId, {
role: "assistant",
content: text,
toolCalls: null,
reasoning: null,
});
const turnCount = 1;
let nudgeCount = currentNudgeCount;
let turnAdjustment = 0;
if (shouldNudge({ noTools, text, turn, maxTurns })) {
nudgeCount += 1;
log("7FXQM2KN", `text-only turn without frontmatter, nudge ${nudgeCount}/${MAX_NUDGES}`);
const nudge =
"You stopped calling tools but your response does not start with the required `---` YAML frontmatter. " +
"Either continue using tools to complete your work, or output your final response starting with `---`.";
messages.push({ role: "user", content: nudge });
// Nudge doesn't consume turn budget (up to MAX_NUDGES)
if (nudgeCount <= MAX_NUDGES) {
turnAdjustment = -1;
}
return { shouldBreak: false, finalText: "", turnCount, nudgeCount, turnAdjustment };
}
return { shouldBreak: true, finalText: text, turnCount, nudgeCount, turnAdjustment };
}
async function handleToolCallTurn(
content: string,
toolCalls: LlmToolCall[],
messages: ChatMessage[],
storageRoot: string,
sessionId: string,
toolCtx: ToolContext,
): Promise<number> {
await appendTurn(storageRoot, sessionId, {
role: "assistant",
content,
toolCalls: mapToolCallsForPayload(toolCalls),
reasoning: null,
});
let turnCount = 1;
// Execute tools
turnCount += await executeTurnTools(toolCalls, toolCtx, messages, storageRoot, sessionId);
return turnCount;
}
export function shouldNudge({ noTools, text, turn, maxTurns }: ShouldNudgeOptions): boolean {
return !noTools && !text.trimStart().startsWith("---") && turn < maxTurns - 1;
}
type ProcessLoopIterationResult = {
shouldBreak: boolean;
finalText: string;
turnCount: number;
nudgeCount: number;
turnAdjustment: number;
};
async function processLoopIteration(
options: RunBuiltinLoopOptions,
messages: ChatMessage[],
openAiTools: OpenAiToolDefinition[],
turn: number,
nudgeCount: number,
): Promise<ProcessLoopIterationResult> {
const response = await chatCompletionWithTools(
options.provider,
messages,
openAiTools.length > 0 ? openAiTools : null,
);
// When noTools is set, ignore any tool_calls the LLM might still return
const effectiveToolCalls = options.noTools ? null : (response.toolCalls ?? null);
const assistantMessage: ChatMessage = {
role: "assistant",
content: response.content,
tool_calls: effectiveToolCalls,
};
messages.push(assistantMessage);
if (!shouldProcessToolCalls(effectiveToolCalls, options.noTools)) {
const text = response.content ?? "";
const result = await handleTextOnlyTurn(
text,
messages,
options.storageRoot,
options.sessionId,
options.noTools,
turn,
options.maxTurns,
nudgeCount,
);
return result;
}
// At this point, effectiveToolCalls is guaranteed to be non-null and non-empty
const turnCount = await handleToolCallTurn(
response.content ?? "",
effectiveToolCalls as LlmToolCall[],
messages,
options.storageRoot,
options.sessionId,
options.toolCtx,
);
return {
shouldBreak: false,
finalText: "",
turnCount,
nudgeCount,
turnAdjustment: 0,
};
}
/** Agent run loop: LLM ↔ tools until no tool_calls or maxTurns. */
export async function runBuiltinLoop(
options: RunBuiltinLoopOptions,
@@ -278,25 +99,95 @@ export async function runBuiltinLoop(
log("8K2M4N7P", `builtin loop turn ${turn + 1}/${options.maxTurns}`);
// Warn agent when approaching turn limit
if (shouldInjectDeadlineWarning(turn, options.maxTurns, deadlineWarned, options.noTools)) {
const turnsRemaining = options.maxTurns - turn;
if (!options.noTools && !deadlineWarned && turnsRemaining <= DEADLINE_WARNING_TURNS) {
deadlineWarned = true;
const turnsRemaining = options.maxTurns - turn;
injectDeadlineWarning(messages, turnsRemaining);
log("4NRXW6KT", `${turnsRemaining} turns remaining, injecting deadline warning`);
messages.push({
role: "user",
content:
`⚠️ You have ${turnsRemaining} turns remaining. ` +
"Wrap up your work and output the YAML frontmatter starting with `---`. " +
"If you cannot finish in time, output frontmatter with `status: failed` and describe what remains.",
});
}
const result = await processLoopIteration(options, messages, openAiTools, turn, nudgeCount);
turnCount += result.turnCount;
nudgeCount = result.nudgeCount;
turn += result.turnAdjustment;
const response = await chatCompletionWithTools(
options.provider,
messages,
openAiTools.length > 0 ? openAiTools : null,
);
if (result.shouldBreak) {
finalText = result.finalText;
// When noTools is set, ignore any tool_calls the LLM might still return
const effectiveToolCalls = options.noTools ? null : (response.toolCalls ?? null);
const assistantMessage: ChatMessage = {
role: "assistant",
content: response.content,
tool_calls: effectiveToolCalls,
};
messages.push(assistantMessage);
if (effectiveToolCalls === null || effectiveToolCalls.length === 0) {
const text = response.content ?? "";
await appendTurn(options.storageRoot, options.sessionId, {
role: "assistant",
content: text,
toolCalls: null,
reasoning: null,
});
turnCount += 1;
if (shouldNudge({ noTools: options.noTools, text, turn, maxTurns: options.maxTurns })) {
nudgeCount += 1;
log("7FXQM2KN", `text-only turn without frontmatter, nudge ${nudgeCount}/${MAX_NUDGES}`);
const nudge =
"You stopped calling tools but your response does not start with the required `---` YAML frontmatter. " +
"Either continue using tools to complete your work, or output your final response starting with `---`.";
messages.push({ role: "user", content: nudge });
// Nudge doesn't consume turn budget (up to MAX_NUDGES)
if (nudgeCount <= MAX_NUDGES) {
turn -= 1;
}
continue;
}
finalText = text;
break;
}
// Assistant turn with tool calls
await appendTurn(options.storageRoot, options.sessionId, {
role: "assistant",
content: response.content ?? "",
toolCalls: mapToolCallsForPayload(effectiveToolCalls),
reasoning: null,
});
turnCount += 1;
// Execute tools
turnCount += await executeTurnTools(
effectiveToolCalls,
options.toolCtx,
messages,
options.storageRoot,
options.sessionId,
);
}
if (finalText === "") {
finalText = extractFinalText(messages);
if (finalText === "" && messages.length > 0) {
for (let i = messages.length - 1; i >= 0; i--) {
const msg = messages[i];
if (
msg !== undefined &&
msg.role === "assistant" &&
msg.content !== null &&
msg.content.trim() !== ""
) {
finalText = msg.content;
break;
}
}
}
return { finalText, messages, turnCount };
@@ -154,99 +154,6 @@ describe("parseClaudeCodeStreamOutput", () => {
});
});
describe("parseClaudeCodeStreamOutput — helper extraction", () => {
test("processSystemLine sets model from system message", () => {
const lines = [
JSON.stringify({ type: "system", model: "claude-opus-4" }),
JSON.stringify({
type: "result",
subtype: "success",
result: "ok",
session_id: "s1",
num_turns: 0,
total_cost_usd: 0,
duration_ms: 0,
stop_reason: "end_turn",
}),
];
const parsed = parseClaudeCodeStreamOutput(lines.join("\n"));
expect(parsed).not.toBeNull();
expect(parsed!.model).toBe("claude-opus-4");
});
test("processAssistantLine skips empty content", () => {
const lines = [
JSON.stringify({ type: "assistant", message: { role: "assistant", content: [] } }),
JSON.stringify({
type: "result",
subtype: "success",
result: "ok",
session_id: "s1",
num_turns: 0,
total_cost_usd: 0,
duration_ms: 0,
stop_reason: "end_turn",
}),
];
const parsed = parseClaudeCodeStreamOutput(lines.join("\n"));
expect(parsed).not.toBeNull();
expect(parsed!.turns).toHaveLength(0);
});
test("processUserLine skips when no tool_result items", () => {
const lines = [
JSON.stringify({
type: "user",
message: { role: "user", content: [{ type: "text", text: "hi" }] },
}),
JSON.stringify({
type: "result",
subtype: "success",
result: "ok",
session_id: "s1",
num_turns: 0,
total_cost_usd: 0,
duration_ms: 0,
stop_reason: "end_turn",
}),
];
const parsed = parseClaudeCodeStreamOutput(lines.join("\n"));
expect(parsed).not.toBeNull();
expect(parsed!.turns).toHaveLength(0);
});
test("turn indices are sequential across mixed assistant and user lines", () => {
const lines = [
JSON.stringify({
type: "assistant",
message: { role: "assistant", content: [{ type: "text", text: "A" }] },
}),
JSON.stringify({
type: "user",
message: { role: "user", content: [{ type: "tool_result", content: "R" }] },
}),
JSON.stringify({
type: "assistant",
message: { role: "assistant", content: [{ type: "text", text: "B" }] },
}),
JSON.stringify({
type: "result",
subtype: "success",
result: "ok",
session_id: "s1",
num_turns: 3,
total_cost_usd: 0,
duration_ms: 0,
stop_reason: "end_turn",
}),
];
const parsed = parseClaudeCodeStreamOutput(lines.join("\n"));
expect(parsed).not.toBeNull();
expect(parsed!.turns).toHaveLength(3);
expect(parsed!.turns.map((t) => t.index)).toEqual([0, 1, 2]);
});
});
describe("storeClaudeCodeDetail", () => {
const baseParsed: ClaudeCodeParsedResult = {
type: "result",
@@ -146,13 +146,13 @@ async function runClaudeCode(ctx: AgentContext): Promise<AgentRunResult> {
// Try resuming a cached session for re-entry scenarios (e.g. reviewer reject → developer re-entry).
if (!ctx.isFirstVisit) {
const cachedSessionId = await getCachedSessionId("claude-code", ctx.threadId, ctx.role);
const cachedSessionId = await getCachedSessionId(ctx.threadId, ctx.role);
if (cachedSessionId !== null) {
try {
const { stdout } = await spawnClaudeResume(cachedSessionId, fullPrompt);
const result = await processClaudeOutput(stdout, ctx.store);
if (result.sessionId !== undefined && result.sessionId !== "") {
await setCachedSessionId("claude-code", ctx.threadId, ctx.role, result.sessionId);
await setCachedSessionId(ctx.threadId, ctx.role, result.sessionId);
}
return result;
} catch (err) {
@@ -169,7 +169,7 @@ async function runClaudeCode(ctx: AgentContext): Promise<AgentRunResult> {
const { stdout } = await spawnClaudeRun(fullPrompt);
const result = await processClaudeOutput(stdout, ctx.store);
if (result.sessionId !== undefined && result.sessionId !== "") {
await setCachedSessionId("claude-code", ctx.threadId, ctx.role, result.sessionId);
await setCachedSessionId(ctx.threadId, ctx.role, result.sessionId);
}
return result;
}
@@ -34,7 +34,7 @@ export const CLAUDE_CODE_DETAIL_SCHEMA: JSONSchema = {
},
turns: {
type: "array",
items: { type: "string", format: "cas_ref" },
items: { type: "string" },
},
},
additionalProperties: false,
@@ -67,103 +67,99 @@ function extractToolResultContent(content: unknown[]): string {
return results.join("\n");
}
type ParseState = {
turns: ClaudeCodeTurnPayload[];
resultLine: Record<string, unknown> | null;
model: string;
turnIndex: number;
};
function processSystemLine(parsed: Record<string, unknown>, state: ParseState): void {
if (typeof parsed.model === "string") {
state.model = parsed.model;
}
}
function processAssistantLine(parsed: Record<string, unknown>, state: ParseState): void {
if (!isRecord(parsed.message)) return;
const content = Array.isArray(parsed.message.content) ? parsed.message.content : [];
const textContent = extractTextContent(content as unknown[]);
const toolCalls = extractToolCalls(content as unknown[]);
if (textContent !== "" || toolCalls.length > 0) {
state.turns.push({
index: state.turnIndex++,
role: "assistant",
content: textContent,
toolCalls: toolCalls.length > 0 ? toolCalls : null,
});
}
}
function processUserLine(parsed: Record<string, unknown>, state: ParseState): void {
if (!isRecord(parsed.message)) return;
const content = Array.isArray(parsed.message.content) ? parsed.message.content : [];
const resultContent = extractToolResultContent(content as unknown[]);
if (resultContent !== "") {
state.turns.push({
index: state.turnIndex++,
role: "tool_result",
content: resultContent,
toolCalls: null,
});
}
}
function processLine(line: string, state: ParseState): void {
let parsed: unknown;
try {
parsed = JSON.parse(line);
} catch {
return;
}
if (!isRecord(parsed)) return;
const type = parsed.type;
if (type === "system") processSystemLine(parsed, state);
else if (type === "assistant") processAssistantLine(parsed, state);
else if (type === "user") processUserLine(parsed, state);
else if (type === "result") state.resultLine = parsed;
}
function assembleResult(state: ParseState): ClaudeCodeParsedResult | null {
if (state.resultLine === null) return null;
const sessionId = state.resultLine.session_id;
const result = state.resultLine.result;
const subtype = state.resultLine.subtype;
if (typeof sessionId !== "string" || typeof result !== "string" || typeof subtype !== "string") {
return null;
}
const usage = isRecord(state.resultLine.usage) ? state.resultLine.usage : {};
return {
type: safeString(state.resultLine.type, "result"),
subtype: subtype as ClaudeCodeParsedResult["subtype"],
result,
sessionId,
numTurns: safeNumber(state.resultLine.num_turns),
totalCostUsd: safeNumber(state.resultLine.total_cost_usd),
durationMs: safeNumber(state.resultLine.duration_ms),
model: state.model,
stopReason: safeString(state.resultLine.stop_reason),
usage: {
inputTokens: safeNumber(usage.input_tokens),
outputTokens: safeNumber(usage.output_tokens),
cacheReadInputTokens: safeNumber(usage.cache_read_input_tokens),
cacheCreationInputTokens: safeNumber(usage.cache_creation_input_tokens),
},
turns: state.turns,
};
}
/**
* Parse Claude Code stream-json (NDJSON) output.
* Each line is a JSON object with type: "system" | "assistant" | "user" | "result".
*/
export function parseClaudeCodeStreamOutput(stdout: string): ClaudeCodeParsedResult | null {
const lines = stdout.trim().split("\n");
const state: ParseState = { turns: [], resultLine: null, model: "", turnIndex: 0 };
const turns: ClaudeCodeTurnPayload[] = [];
let resultLine: Record<string, unknown> | null = null;
let model = "";
let turnIndex = 0;
for (const line of lines) {
processLine(line, state);
let parsed: unknown;
try {
parsed = JSON.parse(line);
} catch {
continue;
}
if (!isRecord(parsed)) continue;
const type = parsed.type;
if (type === "system" && typeof parsed.model === "string") {
model = parsed.model;
}
if (type === "assistant" && isRecord(parsed.message)) {
const msg = parsed.message;
const content = Array.isArray(msg.content) ? msg.content : [];
const textContent = extractTextContent(content as unknown[]);
const toolCalls = extractToolCalls(content as unknown[]);
// Only record turns that have actual content
if (textContent !== "" || toolCalls.length > 0) {
turns.push({
index: turnIndex++,
role: "assistant",
content: textContent,
toolCalls: toolCalls.length > 0 ? toolCalls : null,
});
}
}
if (type === "user" && isRecord(parsed.message)) {
const msg = parsed.message;
const content = Array.isArray(msg.content) ? msg.content : [];
const resultContent = extractToolResultContent(content as unknown[]);
if (resultContent !== "") {
turns.push({
index: turnIndex++,
role: "tool_result",
content: resultContent,
toolCalls: null,
});
}
}
if (type === "result") {
resultLine = parsed;
}
}
return assembleResult(state);
if (resultLine === null) return null;
const sessionId = resultLine.session_id;
const result = resultLine.result;
const subtype = resultLine.subtype;
if (typeof sessionId !== "string" || typeof result !== "string" || typeof subtype !== "string") {
return null;
}
const usage = isRecord(resultLine.usage) ? resultLine.usage : {};
return {
type: safeString(resultLine.type, "result"),
subtype: subtype as ClaudeCodeParsedResult["subtype"],
result,
sessionId,
numTurns: safeNumber(resultLine.num_turns),
totalCostUsd: safeNumber(resultLine.total_cost_usd),
durationMs: safeNumber(resultLine.duration_ms),
model,
stopReason: safeString(resultLine.stop_reason),
usage: {
inputTokens: safeNumber(usage.input_tokens),
outputTokens: safeNumber(usage.output_tokens),
cacheReadInputTokens: safeNumber(usage.cache_read_input_tokens),
cacheCreationInputTokens: safeNumber(usage.cache_creation_input_tokens),
},
turns,
};
}
/**
@@ -4,96 +4,6 @@ import { HermesAcpClient } from "../src/acp-client.js";
const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
describe("handleSessionUpdate — helper extraction", () => {
let client: HermesAcpClient;
beforeEach(() => {
client = new HermesAcpClient();
});
afterEach(async () => {
await client.close();
});
it("agent_message_chunk accumulates text in messageChunks", () => {
(client as any).handleSessionUpdate({
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "hello" },
});
(client as any).handleSessionUpdate({
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: " world" },
});
expect((client as any).messageChunks).toEqual(["hello", " world"]);
});
it("agent_thought_chunk accumulates reasoning in reasoningChunks", () => {
(client as any).handleSessionUpdate({
sessionUpdate: "agent_thought_chunk",
content: { type: "text", text: "thinking" },
});
expect((client as any).reasoningChunks).toEqual(["thinking"]);
});
it("tool_call registers a pending tool and flushes message chunks", () => {
(client as any).messageChunks = ["pre-tool text"];
(client as any).handleSessionUpdate({
sessionUpdate: "tool_call",
title: "Bash",
rawInput: { command: "ls" },
toolCallId: "tc-1",
});
expect((client as any).pendingTools.get("tc-1")).toEqual({
name: "Bash",
args: JSON.stringify({ command: "ls" }),
});
expect((client as any).messageChunks).toEqual([]);
expect((client as any).messages).toHaveLength(1);
expect((client as any).messages[0].role).toBe("assistant");
});
it("tool_call_update completed pushes tool_call and tool messages", () => {
(client as any).pendingTools.set("tc-2", { name: "Read", args: '{"path":"/foo"}' });
(client as any).handleSessionUpdate({
sessionUpdate: "tool_call_update",
status: "completed",
toolCallId: "tc-2",
rawOutput: "file contents",
});
const msgs = (client as any).messages as Array<{
role: string;
tool_calls: unknown;
content: string | null;
}>;
expect(msgs).toHaveLength(2);
expect(msgs[0].role).toBe("assistant");
expect(msgs[0].tool_calls).toEqual([
{ function: { name: "Read", arguments: '{"path":"/foo"}' } },
]);
expect(msgs[1].role).toBe("tool");
expect(msgs[1].content).toBe("file contents");
expect((client as any).pendingTools.has("tc-2")).toBe(false);
});
it("tool_call_update with non-string rawOutput JSON-stringifies it", () => {
(client as any).pendingTools.set("tc-3", { name: "Fetch", args: "" });
(client as any).handleSessionUpdate({
sessionUpdate: "tool_call_update",
status: "completed",
toolCallId: "tc-3",
rawOutput: { html: "<p>page</p>" },
});
const msgs = (client as any).messages as Array<{ role: string; content: string | null }>;
expect(msgs[1].content).toBe(JSON.stringify({ html: "<p>page</p>" }));
});
it("unknown updateType is a no-op", () => {
(client as any).handleSessionUpdate({ sessionUpdate: "unknown_type", data: {} });
expect((client as any).messages).toHaveLength(0);
expect((client as any).messageChunks).toHaveLength(0);
});
});
describe("HermesAcpClient", () => {
let client: HermesAcpClient;
@@ -245,75 +245,72 @@ export class HermesAcpClient {
// ---- Session update → structured messages ----
private handleSessionUpdate(update: Record<string, unknown>): void {
switch (update.sessionUpdate as string) {
case "agent_message_chunk":
this.handleAgentMessageChunk(update);
const updateType = update.sessionUpdate as string;
switch (updateType) {
case "agent_message_chunk": {
const content = update.content as { type?: string; text?: string } | undefined;
if (content?.type === "text" && typeof content.text === "string") {
this.messageChunks.push(content.text);
}
break;
case "agent_thought_chunk":
this.handleAgentThoughtChunk(update);
}
case "agent_thought_chunk": {
const content = update.content as { type?: string; text?: string } | undefined;
if (content?.type === "text" && typeof content.text === "string") {
this.reasoningChunks.push(content.text);
}
break;
case "tool_call":
this.handleToolCall(update);
}
case "tool_call": {
const title = (update.title as string) ?? "";
const rawInput = update.rawInput;
const args = rawInput !== undefined && rawInput !== null ? JSON.stringify(rawInput) : "";
const toolCallId = update.toolCallId as string;
this.pendingTools.set(toolCallId, { name: title, args });
// Flush accumulated assistant text before tool call
this.flushAssistantMessage();
break;
case "tool_call_update":
this.handleToolCallUpdate(update);
}
case "tool_call_update": {
const status = update.status as string | undefined;
if (status === "completed" || status === "failed") {
const toolCallId = update.toolCallId as string;
const pending = this.pendingTools.get(toolCallId);
const toolName = pending?.name ?? toolCallId;
const rawOutput = update.rawOutput;
const outputStr =
rawOutput !== undefined && rawOutput !== null
? typeof rawOutput === "string"
? rawOutput
: JSON.stringify(rawOutput)
: "";
this.messages.push({
role: "assistant",
content: null,
reasoning: null,
tool_calls: [{ function: { name: toolName, arguments: pending?.args ?? "" } }],
});
this.messages.push({
role: "tool",
content: outputStr,
reasoning: null,
tool_calls: null,
});
this.pendingTools.delete(toolCallId);
}
break;
}
default:
break;
}
}
private handleAgentMessageChunk(update: Record<string, unknown>): void {
const content = update.content as { type?: string; text?: string } | undefined;
if (content?.type === "text" && typeof content.text === "string") {
this.messageChunks.push(content.text);
}
}
private handleAgentThoughtChunk(update: Record<string, unknown>): void {
const content = update.content as { type?: string; text?: string } | undefined;
if (content?.type === "text" && typeof content.text === "string") {
this.reasoningChunks.push(content.text);
}
}
private handleToolCall(update: Record<string, unknown>): void {
const title = (update.title as string) ?? "";
const rawInput = update.rawInput;
const args = rawInput !== undefined && rawInput !== null ? JSON.stringify(rawInput) : "";
const toolCallId = update.toolCallId as string;
this.pendingTools.set(toolCallId, { name: title, args });
this.flushAssistantMessage();
}
private handleToolCallUpdate(update: Record<string, unknown>): void {
const status = update.status as string | undefined;
if (status !== "completed" && status !== "failed") return;
const toolCallId = update.toolCallId as string;
const pending = this.pendingTools.get(toolCallId);
const toolName = pending?.name ?? toolCallId;
const rawOutput = update.rawOutput;
const outputStr =
rawOutput !== undefined && rawOutput !== null
? typeof rawOutput === "string"
? rawOutput
: JSON.stringify(rawOutput)
: "";
this.messages.push({
role: "assistant",
content: null,
reasoning: null,
tool_calls: [{ function: { name: toolName, arguments: pending?.args ?? "" } }],
});
this.messages.push({
role: "tool",
content: outputStr,
reasoning: null,
tool_calls: null,
});
this.pendingTools.delete(toolCallId);
}
/** Flush any accumulated text/reasoning into an assistant message. */
private flushAssistantMessage(): void {
const text = this.messageChunks.join("");
@@ -1,22 +1,5 @@
// Re-export session cache from the shared agent-kit package with agent name injected.
import {
getCachedSessionId as getCachedSessionIdBase,
setCachedSessionId as setCachedSessionIdBase,
} from "@uncaged/workflow-agent-kit";
import type { ThreadId } from "@uncaged/workflow-protocol";
export async function getCachedSessionId(threadId: ThreadId, role: string): Promise<string | null> {
return getCachedSessionIdBase("hermes", threadId, role);
}
export async function setCachedSessionId(
threadId: ThreadId,
role: string,
sessionId: string,
): Promise<void> {
return setCachedSessionIdBase("hermes", threadId, role, sessionId);
}
// Re-export session cache from the shared agent-kit package.
export { getCachedSessionId, setCachedSessionId } from "@uncaged/workflow-agent-kit";
export function isResumeDisabled(): boolean {
// Hermes ACP session/resume is broken: _restore fails for custom providers
@@ -1,247 +0,0 @@
import { mkdir, readdir, readFile, rm, stat, writeFile } from "node:fs/promises";
import { dirname, join } from "node:path";
import type { ThreadId } from "@uncaged/workflow-protocol";
import { afterEach, beforeEach, describe, expect, test } from "vitest";
import { getCachedSessionId, getCachePath, setCachedSessionId } from "../src/session-cache.js";
import { resolveStorageRoot } from "../src/storage.js";
describe("session-cache", () => {
let originalStorageRoot: string;
let testStorageRoot: string;
beforeEach(async () => {
// Create a temporary test storage root
originalStorageRoot = resolveStorageRoot();
testStorageRoot = join(originalStorageRoot, "test-cache", `test-${Date.now()}`);
await mkdir(testStorageRoot, { recursive: true });
// Override the storage root for testing
process.env.WORKFLOW_STORAGE_ROOT = testStorageRoot;
});
afterEach(async () => {
// Clean up test storage root
await rm(testStorageRoot, { recursive: true, force: true });
delete process.env.WORKFLOW_STORAGE_ROOT;
});
describe("getCachePath", () => {
test("returns agent-specific file path", () => {
const path = getCachePath("claude-code");
expect(path).toMatch(/\/cache\/claude-code-sessions\.json$/);
});
test("returns different paths for different agents", () => {
const pathClaudeCode = getCachePath("claude-code");
const pathHermes = getCachePath("hermes");
expect(pathClaudeCode).not.toBe(pathHermes);
expect(pathClaudeCode).toMatch(/claude-code-sessions\.json$/);
expect(pathHermes).toMatch(/hermes-sessions\.json$/);
});
test("handles agent names with special characters", () => {
const path1 = getCachePath("my-agent");
const path2 = getCachePath("my_agent");
expect(path1).toMatch(/my-agent-sessions\.json$/);
expect(path2).toMatch(/my_agent-sessions\.json$/);
});
});
describe("session isolation", () => {
const threadId = "01234567890123456789012345" as ThreadId;
const role = "developer";
test("sessions are isolated per agent", async () => {
// Cache different session IDs for each agent
await setCachedSessionId("claude-code", threadId, role, "session-cc-001");
await setCachedSessionId("hermes", threadId, role, "session-hermes-001");
// Each agent should retrieve its own session ID
const sessionCC = await getCachedSessionId("claude-code", threadId, role);
const sessionHermes = await getCachedSessionId("hermes", threadId, role);
expect(sessionCC).toBe("session-cc-001");
expect(sessionHermes).toBe("session-hermes-001");
});
test("updating one agent's cache does not affect another", async () => {
// Set initial sessions for both agents
await setCachedSessionId("claude-code", threadId, role, "session-cc-001");
await setCachedSessionId("hermes", threadId, role, "session-hermes-001");
// Update claude-code's session
await setCachedSessionId("claude-code", threadId, role, "session-cc-002");
// Hermes's session should remain unchanged
const sessionHermes = await getCachedSessionId("hermes", threadId, role);
expect(sessionHermes).toBe("session-hermes-001");
// Claude-code should have the new session
const sessionCC = await getCachedSessionId("claude-code", threadId, role);
expect(sessionCC).toBe("session-cc-002");
});
test("missing session returns null for specific agent", async () => {
const session = await getCachedSessionId("claude-code", threadId, role);
expect(session).toBeNull();
});
test("empty session ID is treated as missing", async () => {
await setCachedSessionId("claude-code", threadId, role, "");
const session = await getCachedSessionId("claude-code", threadId, role);
expect(session).toBeNull();
});
});
describe("file system operations", () => {
const threadId = "01234567890123456789012345" as ThreadId;
const role = "developer";
test("cache directory is created if missing", async () => {
const cachePath = getCachePath("claude-code");
const cacheDir = dirname(cachePath);
// Ensure cache dir doesn't exist
await rm(cacheDir, { recursive: true, force: true });
// Write a session
await setCachedSessionId("claude-code", threadId, role, "session-001");
// Cache directory should be created
const stats = await stat(cacheDir);
expect(stats.isDirectory()).toBe(true);
});
test("multiple agents create separate cache files", async () => {
// Cache sessions for multiple agents
await setCachedSessionId("claude-code", threadId, role, "session-cc-001");
await setCachedSessionId("hermes", threadId, role, "session-hermes-001");
// Separate cache files should exist
const pathCC = getCachePath("claude-code");
const pathHermes = getCachePath("hermes");
const contentCC = JSON.parse(await readFile(pathCC, "utf8")) as Record<string, string>;
const contentHermes = JSON.parse(await readFile(pathHermes, "utf8")) as Record<
string,
string
>;
expect(contentCC).toHaveProperty(`${threadId}:${role}`, "session-cc-001");
expect(contentHermes).toHaveProperty(`${threadId}:${role}`, "session-hermes-001");
});
test("atomic writes prevent partial reads", async () => {
// Write a session
await setCachedSessionId("claude-code", threadId, role, "session-001");
// The final file should exist (no .tmp files left behind)
const cachePath = getCachePath("claude-code");
const dir = dirname(cachePath);
const files = await readdir(dir);
expect(files).toContain("claude-code-sessions.json");
expect(files.every((f) => !f.endsWith(".tmp"))).toBe(true);
});
});
describe("legacy migration", () => {
const threadId = "01234567890123456789012345" as ThreadId;
const role = "developer";
test("old agent-sessions.json is ignored", async () => {
// Create old agent-sessions.json file
const oldCachePath = join(resolveStorageRoot(), "cache", "agent-sessions.json");
await mkdir(dirname(oldCachePath), { recursive: true });
await writeFile(
oldCachePath,
JSON.stringify({
"01234567890123456789012345:developer": "old-session-001",
}),
"utf8",
);
// Query with the new per-agent cache
const session = await getCachedSessionId("claude-code", threadId, role);
// Should return null (old cache is ignored)
expect(session).toBeNull();
});
test("new per-agent cache takes precedence", async () => {
// Create both old and new cache files
const oldPath = join(resolveStorageRoot(), "cache", "agent-sessions.json");
await mkdir(dirname(oldPath), { recursive: true });
await writeFile(
oldPath,
JSON.stringify({
[`${threadId}:${role}`]: "old-session",
}),
"utf8",
);
await setCachedSessionId("claude-code", threadId, role, "new-session");
// The new per-agent cache value should be returned
const session = await getCachedSessionId("claude-code", threadId, role);
expect(session).toBe("new-session");
});
});
describe("error handling", () => {
const threadId = "01234567890123456789012345" as ThreadId;
const role = "developer";
test("invalid JSON in cache file returns empty cache", async () => {
// Create a corrupted cache file
const cachePath = getCachePath("claude-code");
await mkdir(dirname(cachePath), { recursive: true });
await writeFile(cachePath, "{ invalid json }", "utf8");
// Should return null (treating corrupted cache as empty)
const session = await getCachedSessionId("claude-code", threadId, role);
expect(session).toBeNull();
});
test("non-object JSON in cache file returns empty cache", async () => {
// Create a cache file with non-object JSON
const cachePath = getCachePath("claude-code");
await mkdir(dirname(cachePath), { recursive: true });
await writeFile(cachePath, JSON.stringify(["not", "an", "object"]), "utf8");
// Should return null
const session = await getCachedSessionId("claude-code", threadId, role);
expect(session).toBeNull();
});
test("cache entries with non-string values are ignored", async () => {
// Create a cache file with mixed types
const cachePath = getCachePath("claude-code");
const cacheData = {
"thread1:role1": "valid-session",
"thread2:role2": 12345, // number
"thread3:role3": null, // null
"thread4:role4": "", // empty string
};
await mkdir(dirname(cachePath), { recursive: true });
await writeFile(cachePath, JSON.stringify(cacheData), "utf8");
// Valid string entries should be returned
const session1 = await getCachedSessionId("claude-code", "thread1" as ThreadId, "role1");
expect(session1).toBe("valid-session");
// Invalid entries should return null
const session2 = await getCachedSessionId("claude-code", "thread2" as ThreadId, "role2");
const session3 = await getCachedSessionId("claude-code", "thread3" as ThreadId, "role3");
const session4 = await getCachedSessionId("claude-code", "thread4" as ThreadId, "role4");
expect(session2).toBeNull();
expect(session3).toBeNull();
expect(session4).toBeNull(); // empty string is treated as missing
});
});
});
+11 -6
View File
@@ -21,6 +21,14 @@ function fail(message: string): never {
throw new Error(message);
}
function readEdgePrompt(): string {
const value = process.env.UWF_EDGE_PROMPT;
if (value === undefined || value === "") {
fail("UWF_EDGE_PROMPT environment variable is required");
}
return value;
}
function walkChain(store: Store, schemas: AgentStore["schemas"], headHash: CasRef): ChainState {
const headNode = store.get(headHash);
if (headNode === null) {
@@ -115,11 +123,7 @@ async function loadWorkflow(store: Store, schemas: AgentStore["schemas"], workfl
* Build agent execution context from thread head in threads.yaml.
* Walks the CAS chain from head to StartNode and expands step outputs.
*/
export async function buildContext(
threadId: ThreadId,
role: string,
edgePrompt: string,
): Promise<AgentContext> {
export async function buildContext(threadId: ThreadId, role: string): Promise<AgentContext> {
const storageRoot = resolveStorageRoot();
const agentStore = await createAgentStore(storageRoot);
const { store, schemas } = agentStore;
@@ -138,6 +142,7 @@ export async function buildContext(
}
const steps = await buildHistory(store, chain.stepsNewestFirst);
const edgePrompt = readEdgePrompt();
const isFirstVisit = !steps.some((s) => s.role === role);
return {
@@ -167,7 +172,6 @@ export type BuildContextMeta = {
export async function buildContextWithMeta(
threadId: ThreadId,
role: string,
edgePrompt: string,
): Promise<AgentContext & { meta: BuildContextMeta }> {
const storageRoot = resolveStorageRoot();
const agentStore = await createAgentStore(storageRoot);
@@ -187,6 +191,7 @@ export async function buildContextWithMeta(
}
const steps = await buildHistory(store, chain.stepsNewestFirst);
const edgePrompt = readEdgePrompt();
const isFirstVisit = !steps.some((s) => s.role === role);
return {
+1 -1
View File
@@ -12,7 +12,7 @@ export {
export type { FrontmatterFastPathResult } from "./frontmatter.js";
export { tryFrontmatterFastPath } from "./frontmatter.js";
export { createAgent } from "./run.js";
export { getCachedSessionId, getCachePath, setCachedSessionId } from "./session-cache.js";
export { getCachedSessionId, setCachedSessionId } from "./session-cache.js";
export { getConfigPath, getEnvPath, loadWorkflowConfig, resolveStorageRoot } from "./storage.js";
export type {
AgentContext,
+11 -19
View File
@@ -22,24 +22,16 @@ function agentLabel(name: string): string {
return `uwf-${name}`;
}
const USAGE = "usage: <agent-cli> --thread <id> --role <role> --prompt <text>";
function getNamedArg(argv: string[], name: string): string {
const idx = argv.indexOf(name);
if (idx === -1 || idx + 1 >= argv.length) {
return "";
function parseArgv(argv: string[]): { threadId: ThreadId; role: string } {
const threadId = argv[2];
const role = argv[3];
if (threadId === undefined || threadId === "") {
fail("usage: <agent-cli> <thread-id> <role>");
}
return argv[idx + 1];
}
function parseArgv(argv: string[]): { threadId: ThreadId; role: string; prompt: string } {
const threadId = getNamedArg(argv, "--thread");
const role = getNamedArg(argv, "--role");
const prompt = getNamedArg(argv, "--prompt");
if (threadId === "") fail(USAGE);
if (role === "") fail(USAGE);
if (prompt === "") fail(USAGE);
return { threadId: threadId as ThreadId, role, prompt };
if (role === undefined || role === "") {
fail("usage: <agent-cli> <thread-id> <role>");
}
return { threadId: threadId as ThreadId, role };
}
function runWithMessage<T>(label: string, fn: () => Promise<T>): Promise<T> {
@@ -111,11 +103,11 @@ async function persistStep(options: {
export function createAgent(options: AgentOptions): () => Promise<void> {
return async function main(): Promise<void> {
const { threadId, role, prompt } = parseArgv(process.argv);
const { threadId, role } = parseArgv(process.argv);
const storageRoot = resolveStorageRoot();
loadDotenv({ path: getEnvPath(storageRoot) });
const ctx = await runWithMessage("context", () => buildContextWithMeta(threadId, role, prompt));
const ctx = await runWithMessage("context", () => buildContextWithMeta(threadId, role));
const roleDef = ctx.workflow.roles[role];
if (roleDef === undefined) {
@@ -8,8 +8,8 @@ import { resolveStorageRoot } from "./storage.js";
type SessionCache = Record<string, string>;
export function getCachePath(agentName: string): string {
return join(resolveStorageRoot(), "cache", `${agentName}-sessions.json`);
function getCachePath(): string {
return join(resolveStorageRoot(), "cache", "agent-sessions.json");
}
function cacheKey(threadId: ThreadId, role: string): string {
@@ -20,8 +20,8 @@ function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
async function readCache(agentName: string): Promise<SessionCache> {
const path = getCachePath(agentName);
async function readCache(): Promise<SessionCache> {
const path = getCachePath();
try {
const text = await readFile(path, "utf8");
const raw = JSON.parse(text) as unknown;
@@ -40,45 +40,36 @@ async function readCache(agentName: string): Promise<SessionCache> {
if (err.code === "ENOENT") {
return {};
}
// Treat JSON parse errors as empty cache
if (err.name === "SyntaxError") {
return {};
}
throw e;
}
}
async function writeCache(agentName: string, cache: SessionCache): Promise<void> {
const path = getCachePath(agentName);
async function writeCache(cache: SessionCache): Promise<void> {
const path = getCachePath();
const dir = dirname(path);
await mkdir(dir, { recursive: true });
// Atomic write: write to temp file then rename to avoid partial reads on concurrent access.
// NOTE: Current workflow execution is serial (execFileSync), so true concurrency doesn't occur.
// This is a safety net for future parallel execution.
const tmpPath = join(dir, `.${agentName}-sessions.${randomBytes(4).toString("hex")}.tmp`);
const tmpPath = join(dir, `.agent-sessions.${randomBytes(4).toString("hex")}.tmp`);
await writeFile(tmpPath, `${JSON.stringify(cache, null, 2)}\n`, "utf8");
await rename(tmpPath, path);
}
/** Read the cached session ID for a thread+role pair. */
export async function getCachedSessionId(
agentName: string,
threadId: ThreadId,
role: string,
): Promise<string | null> {
const cache = await readCache(agentName);
export async function getCachedSessionId(threadId: ThreadId, role: string): Promise<string | null> {
const cache = await readCache();
const sessionId = cache[cacheKey(threadId, role)];
return sessionId ?? null;
}
/** Write the session ID for a thread+role pair into the cache. */
export async function setCachedSessionId(
agentName: string,
threadId: ThreadId,
role: string,
sessionId: string,
): Promise<void> {
const cache = await readCache(agentName);
const cache = await readCache();
cache[cacheKey(threadId, role)] = sessionId;
await writeCache(agentName, cache);
await writeCache(cache);
}
+1 -1
View File
@@ -13,7 +13,7 @@ export type AgentContext = ModeratorContext & {
*/
outputFormatInstruction: string;
/**
* Edge prompt from the graph transition that led to this role (--prompt CLI arg).
* Edge prompt from the graph transition that led to this role (UWF_EDGE_PROMPT).
* Always the real moderator instruction for this step.
*/
edgePrompt: string;
@@ -1,6 +1,7 @@
import path from "node:path";
import { defineConfig } from "vitest/config";
// biome-ignore lint/style/noDefaultExport: Vitest loads config from default export.
export default defineConfig({
test: {
environment: "node",
-2
View File
@@ -15,8 +15,6 @@ export type {
ProviderConfig,
RoleDefinition,
RoleName,
RunningThreadItem,
RunningThreadsOutput,
Scenario,
StartEntry,
StartNodePayload,
-14
View File
@@ -84,7 +84,6 @@ export type StepOutput = {
thread: ThreadId;
head: CasRef;
done: boolean;
background: boolean | null;
};
/** uwf thread steps — single step entry */
@@ -127,19 +126,6 @@ export type ThreadListItem = {
head: CasRef;
};
/** uwf thread running — single running thread entry */
export type RunningThreadItem = {
thread: ThreadId;
workflow: CasRef;
pid: number;
startedAt: number;
};
/** uwf thread running output */
export type RunningThreadsOutput = {
threads: RunningThreadItem[];
};
// ── 4.6 配置 ────────────────────────────────────────────────────────
/** Alias types for config references */
-89
View File
@@ -1,89 +0,0 @@
#!/usr/bin/env bash
# batch-solve.sh — solve multiple Gitea issues via solve-issue workflow
#
# Usage:
# ./scripts/batch-solve.sh [--agent CMD] [--repo OWNER/REPO] [--count N] ISSUE_NUM...
#
# Examples:
# ./scripts/batch-solve.sh 448 449
# ./scripts/batch-solve.sh --agent "bun run $(pwd)/packages/workflow-agent-claude-code/src/cli.ts" 448 449
# ./scripts/batch-solve.sh --repo uncaged/workflow --count 15 448 449
set -euo pipefail
AGENT=""
REPO="uncaged/workflow"
COUNT=10
ISSUES=()
while [[ $# -gt 0 ]]; do
case "$1" in
--agent) AGENT="$2"; shift 2 ;;
--repo) REPO="$2"; shift 2 ;;
--count) COUNT="$2"; shift 2 ;;
*) ISSUES+=("$1"); shift ;;
esac
done
if [[ ${#ISSUES[@]} -eq 0 ]]; then
echo "Usage: $0 [--agent CMD] [--repo OWNER/REPO] [--count N] ISSUE_NUM..." >&2
exit 1
fi
AGENT_FLAG=""
if [[ -n "$AGENT" ]]; then
AGENT_FLAG="--agent $AGENT"
fi
TOTAL=${#ISSUES[@]}
PASSED=0
FAILED=0
RESULTS=()
echo "━━━ Batch solve: ${TOTAL} issues ━━━"
echo ""
for i in "${!ISSUES[@]}"; do
ISSUE="${ISSUES[$i]}"
NUM=$((i + 1))
echo "┌─── [$NUM/$TOTAL] Issue #${ISSUE} ───"
# Read issue title
TITLE=$(tea issues "$ISSUE" -r "$REPO" 2>/dev/null | head -1 | sed 's/^# #[0-9]* //' | sed 's/ (.*//' || echo "unknown")
echo "│ Title: $TITLE"
# Start thread
PROMPT="Fix issue #${ISSUE} in ${REPO}. Read the issue first with 'tea issues ${ISSUE} -r ${REPO}' for full spec."
THREAD_JSON=$(uwf thread start solve-issue -p "$PROMPT" 2>&1)
THREAD_ID=$(echo "$THREAD_JSON" | python3 -c "import json,sys; print(json.load(sys.stdin)['thread'])")
echo "│ Thread: $THREAD_ID"
# Run steps
echo "│ Running (max $COUNT steps)..."
# shellcheck disable=SC2086
if STEP_OUTPUT=$(uwf thread step "$THREAD_ID" $AGENT_FLAG -c "$COUNT" 2>&1); then
# Check if done
LAST_DONE=$(echo "$STEP_OUTPUT" | python3 -c "import json,sys; lines=sys.stdin.read().strip(); data=json.loads(lines); print(data[-1].get('done', False))")
if [[ "$LAST_DONE" == "True" ]]; then
echo "│ ✅ Done!"
PASSED=$((PASSED + 1))
RESULTS+=("✅ #${ISSUE}${TITLE}")
else
echo "│ ⚠️ Ran out of steps (not done)"
FAILED=$((FAILED + 1))
RESULTS+=("⚠️ #${ISSUE}${TITLE} (incomplete)")
fi
else
echo "│ ❌ Failed"
FAILED=$((FAILED + 1))
RESULTS+=("❌ #${ISSUE}${TITLE} (error)")
fi
echo "└───"
echo ""
done
echo "━━━ Results: ${PASSED}/${TOTAL} passed, ${FAILED} failed ━━━"
for R in "${RESULTS[@]}"; do
echo " $R"
done