diff --git a/packages/cli-workflow/README.md b/packages/cli-workflow/README.md index 12affdc..32b7e7a 100644 --- a/packages/cli-workflow/README.md +++ b/packages/cli-workflow/README.md @@ -6,6 +6,18 @@ Layer 4 entry point for the workflow engine. The `uwf` binary orchestrates one step per invocation: load thread head from `threads.yaml`, run the moderator, spawn the configured agent CLI, run extract, append a CAS step node, and update the head pointer (or archive when `$END`). +### Four-Layer Architecture + +``` +workflow → thread → step → turn +模板定义 执行实例 单步结果 agent内部交互 +``` + +- **Workflow** (layer 1): YAML template with roles and routing graph +- **Thread** (layer 2): Single workflow execution instance +- **Step** (layer 3): One moderator→agent→extract cycle +- **Turn** (layer 4): Agent-internal interactions (use `step read` or CAS to inspect) + This package has no library `src/index.ts` — it is consumed as a CLI binary only. **Dependencies:** `@uncaged/json-cas`, `@uncaged/json-cas-fs`, `@uncaged/workflow-agent-kit`, `@uncaged/workflow-moderator`, `@uncaged/workflow-protocol`, `@uncaged/workflow-util`, `commander`, `dotenv`, `yaml` @@ -30,34 +42,53 @@ bun link packages/cli-workflow -h, --help Show help ``` -### Thread +### Thread (Layer 2: Execution Instances) | Command | Description | |---------|-------------| | `uwf thread start -p ` | Create a thread without executing | -| `uwf thread step [--agent ] [-c ]` | Execute one or more moderator→agent→extract cycles | +| `uwf thread exec [--agent ] [-c ] [--background]` | Execute one or more moderator→agent→extract cycles | | `uwf thread show ` | Show thread head pointer | -| `uwf thread list [--all]` | List active threads (`--all` includes archived) | -| `uwf thread steps ` | List all steps chronologically | +| `uwf thread list [--status ]` | List threads, optionally filtered by status | | `uwf thread read [--quota N] [--before ] [--start]` | Render thread as readable markdown | -| `uwf thread fork ` | Fork from a specific step | -| `uwf thread step-details ` | Dump full detail node as YAML | -| `uwf thread kill ` | Terminate and archive | +| `uwf thread stop ` | Stop background execution (keep thread active) | +| `uwf thread cancel ` | Cancel thread (stop + archive to history) | Examples: ```bash uwf thread start solve-issue -p "Fix the login redirect bug" -uwf thread step 01ARZ3NDEKTSV4RRFFQ69G5FAV -uwf thread step 01ARZ3NDEKTSV4RRFFQ69G5FAV -c 3 --agent uwf-builtin +uwf thread exec 01ARZ3NDEKTSV4RRFFQ69G5FAV +uwf thread exec 01ARZ3NDEKTSV4RRFFQ69G5FAV -c 3 --agent uwf-builtin +uwf thread exec 01ARZ3NDEKTSV4RRFFQ69G5FAV --background +uwf thread list --status running uwf thread read 01ARZ3NDEKTSV4RRFFQ69G5FAV --quota 8000 +uwf thread stop 01ARZ3NDEKTSV4RRFFQ69G5FAV ``` -### Workflow +### Step (Layer 3: Single Cycle Results) | Command | Description | |---------|-------------| -| `uwf workflow put ` | Register a workflow from YAML | +| `uwf step list ` | List all steps in a thread chronologically | +| `uwf step show ` | Show step metadata and frontmatter | +| `uwf step read [--before N]` | Read step output as markdown | +| `uwf step fork ` | Fork a thread from a specific step | + +Examples: + +```bash +uwf step list 01ARZ3NDEKTSV4RRFFQ69G5FAV +uwf step show 32GCDE899RRQ3 +uwf step read 32GCDE899RRQ3 --before 3 +uwf step fork 32GCDE899RRQ3 +``` + +### Workflow (Layer 1: Templates) + +| Command | Description | +|---------|-------------| +| `uwf workflow add ` | Register a workflow from YAML | | `uwf workflow show ` | Show workflow definition | | `uwf workflow list` | List registered workflows | @@ -99,6 +130,52 @@ Config: `~/.uncaged/workflow/config.yaml`. API keys: `~/.uncaged/workflow/.env`. | `uwf log show [--thread ] [--process ] [--date YYYY-MM-DD]` | Show filtered log entries | | `uwf log clean [--before YYYY-MM-DD]` | Delete old log files | +## Migration Guide + +### Breaking Changes (v0.x → v1.x) + +The CLI was reorganized to clarify the four-layer architecture. **No backward compatibility** — old commands have been removed. + +#### Renamed Commands + +| Old Command | New Command | Notes | +|------------|-------------|-------| +| `workflow put` | `workflow add` | More intuitive verb | +| `thread step` | `thread exec` | Eliminates ambiguity with "step" noun | +| `thread list --all` | `thread list --status completed` | Unified status filtering | + +#### Removed Commands (Merged) + +| Old Command | New Command | Notes | +|------------|-------------|-------| +| `thread running` | `thread list --status running` | Merged into unified list | + +#### Removed Commands (Split) + +| Old Command | New Commands | Notes | +|------------|-------------|-------| +| `thread kill` | `thread stop` or `thread cancel` | `stop` keeps thread active, `cancel` archives it | + +#### Moved Commands + +| Old Command | New Command | Notes | +|------------|-------------|-------| +| `thread steps` | `step list` | Moved to step layer | +| `thread step-details` | `step show` | Moved to step layer | +| `thread fork` | `step fork` | Moved to step layer (forks are step-based) | + +#### Deprecation Errors + +Old commands now show helpful error messages: + +```bash +$ uwf thread step 01ARZ3NDEKTSV4RRFFQ69G5FAV +Error: Command 'thread step' has been removed. +Use 'thread exec' instead. + +For more information, see: uwf help thread exec +``` + ## Internal Structure ``` @@ -109,8 +186,9 @@ src/ ├── validate.ts Workflow YAML validation ├── schemas.ts CLI-local schema registration └── commands/ - ├── thread.ts Thread lifecycle and step execution - ├── workflow.ts Workflow registry (put/show/list) + ├── thread.ts Thread lifecycle and exec + ├── step.ts Step operations (list/show/read/fork) + ├── workflow.ts Workflow registry (add/show/list) ├── cas.ts CAS inspection and schema ops ├── setup.ts Interactive/non-interactive setup ├── skill.ts Built-in skill references diff --git a/packages/cli-workflow/src/__tests__/thread-step-count.test.ts b/packages/cli-workflow/src/__tests__/thread-step-count.test.ts index 2340f0c..1077de3 100644 --- a/packages/cli-workflow/src/__tests__/thread-step-count.test.ts +++ b/packages/cli-workflow/src/__tests__/thread-step-count.test.ts @@ -22,48 +22,48 @@ function runCli(args: string[]): { stdout: string; stderr: string; exitCode: num } } -describe("thread step --count CLI parsing", () => { +describe("thread exec --count CLI parsing", () => { test("--help shows -c/--count option", () => { - const result = runCli(["thread", "step", "--help"]); + const result = runCli(["thread", "exec", "--help"]); expect(result.stdout).toContain("--count"); expect(result.stdout).toContain("-c"); }); test("description says 'one or more steps'", () => { - const result = runCli(["thread", "step", "--help"]); + const result = runCli(["thread", "exec", "--help"]); expect(result.stdout).toContain("one or more steps"); }); }); -describe("cmdThreadStep count logic", () => { +describe("cmdThreadExec count logic", () => { test("count=0 fails with validation error", () => { - const result = runCli(["thread", "step", "FAKE_THREAD_ID", "-c", "0"]); + const result = runCli(["thread", "exec", "FAKE_THREAD_ID", "-c", "0"]); expect(result.exitCode).not.toBe(0); expect(result.stderr).toContain("positive integer"); }); test("negative count fails with validation error", () => { - const result = runCli(["thread", "step", "FAKE_THREAD_ID", "-c", "-1"]); + const result = runCli(["thread", "exec", "FAKE_THREAD_ID", "-c", "-1"]); expect(result.exitCode).not.toBe(0); expect(result.stderr).toContain("positive integer"); }); test("non-integer count fails with validation error", () => { - const result = runCli(["thread", "step", "FAKE_THREAD_ID", "-c", "1.5"]); + const result = runCli(["thread", "exec", "FAKE_THREAD_ID", "-c", "1.5"]); expect(result.exitCode).not.toBe(0); expect(result.stderr).toContain("positive integer"); }); test("count=1 is the default (no -c flag)", () => { // Without -c, it should attempt to run 1 step (failing on missing thread, not on count validation) - const result = runCli(["thread", "step", "FAKE_THREAD_ID"]); + const result = runCli(["thread", "exec", "FAKE_THREAD_ID"]); expect(result.exitCode).not.toBe(0); // Should NOT contain "positive integer" error — should fail on thread lookup instead expect(result.stderr).not.toContain("positive integer"); }); test("count=3 passes validation (fails on thread lookup)", () => { - const result = runCli(["thread", "step", "FAKE_THREAD_ID", "-c", "3"]); + const result = runCli(["thread", "exec", "FAKE_THREAD_ID", "-c", "3"]); expect(result.exitCode).not.toBe(0); // Should NOT contain "positive integer" error — should fail on thread/storage lookup expect(result.stderr).not.toContain("positive integer"); diff --git a/packages/cli-workflow/src/__tests__/thread.test.ts b/packages/cli-workflow/src/__tests__/thread.test.ts index 3ca456b..8f40d77 100644 --- a/packages/cli-workflow/src/__tests__/thread.test.ts +++ b/packages/cli-workflow/src/__tests__/thread.test.ts @@ -5,9 +5,9 @@ import { bootstrap, putSchema } from "@uncaged/json-cas"; import { createFsStore } from "@uncaged/json-cas-fs"; import type { CasRef, ThreadId } from "@uncaged/workflow-protocol"; import { afterEach, beforeEach, describe, expect, test } from "vitest"; +import { cmdStepShow } from "../commands/step.js"; import { cmdThreadRead, - cmdThreadStepDetails, extractLastAssistantContent, THREAD_READ_DEFAULT_QUOTA, } from "../commands/thread.js"; @@ -315,9 +315,9 @@ describe("cmdThreadRead section", () => { }); }); -// ── cmdThreadStepDetails ────────────────────────────────────────────────────── +// ── cmdStepShow ─────────────────────────────────────────────────────────────── -describe("cmdThreadStepDetails", () => { +describe("cmdStepShow", () => { test("returns expanded detail node with turns inlined", async () => { const uwf = await makeUwfStore(tmpDir); const detailSchemas = await registerDetailSchemas(uwf.store); @@ -365,7 +365,7 @@ describe("cmdThreadStepDetails", () => { agent: "uwf-hermes", }); - const result = await cmdThreadStepDetails(tmpDir, stepHash); + const result = await cmdStepShow(tmpDir, stepHash); expect(result).toMatchObject({ sessionId: "sess42", @@ -586,9 +586,9 @@ describe("cmdThreadRead start section / before / quota", () => { // ── Tests that call process.exit must be last ───────────────────────────────── -describe("cmdThreadStepDetails (process.exit tests - must be last)", () => { +describe("cmdStepShow (process.exit tests - must be last)", () => { test("throws when step hash does not exist", async () => { - await expect(cmdThreadStepDetails(tmpDir, "nonexistenth0" as CasRef)).rejects.toThrow(); + await expect(cmdStepShow(tmpDir, "nonexistenth0" as CasRef)).rejects.toThrow(); }); test("before with unknown hash rejects", async () => { diff --git a/packages/cli-workflow/src/cli.ts b/packages/cli-workflow/src/cli.ts index 3bff36d..c7d7fa4 100755 --- a/packages/cli-workflow/src/cli.ts +++ b/packages/cli-workflow/src/cli.ts @@ -1,8 +1,7 @@ #!/usr/bin/env bun -import type { ThreadId } from "@uncaged/workflow-protocol"; +import type { CasRef, ThreadId } from "@uncaged/workflow-protocol"; import { Command } from "commander"; -import { stringify as yamlStringify } from "yaml"; import { cmdCasGet, cmdCasHas, @@ -17,20 +16,19 @@ import { import { cmdLogClean, cmdLogList, cmdLogShow } from "./commands/log.js"; import { cmdSetup, cmdSetupInteractive } from "./commands/setup.js"; import { cmdSkillCli } from "./commands/skill.js"; +import { cmdStepFork, cmdStepList, cmdStepShow } from "./commands/step.js"; import { - cmdThreadFork, - cmdThreadKill, + cmdThreadCancel, + cmdThreadExec, cmdThreadList, cmdThreadRead, - cmdThreadRunning, cmdThreadShow, cmdThreadStart, - cmdThreadStep, - cmdThreadStepDetails, - cmdThreadSteps, + cmdThreadStop, THREAD_READ_DEFAULT_QUOTA, + type ThreadStatus, } from "./commands/thread.js"; -import { cmdWorkflowList, cmdWorkflowPut, cmdWorkflowShow } from "./commands/workflow.js"; +import { cmdWorkflowAdd, cmdWorkflowList, cmdWorkflowShow } from "./commands/workflow.js"; import { formatOutput, type OutputFormat } from "./format.js"; import { resolveStorageRoot } from "./store.js"; @@ -53,20 +51,27 @@ const program = new Command(); const pkg = await import("../package.json", { with: { type: "json" } }); program .name("uwf") - .description("Stateless workflow CLI") + .description( + "Stateless workflow CLI\n\n" + + "Four-layer architecture:\n" + + " workflow → thread → step → turn\n" + + " 模板定义 执行实例 单步结果 agent内部交互", + ) .version(pkg.default.version, "-V, --version"); program.option("--format ", "Output format: json or yaml", "json"); -const workflow = program.command("workflow").description("Workflow registry and CAS"); +const workflow = program + .command("workflow") + .description("Workflow definitions (layer 1: templates)"); workflow - .command("put") + .command("add") .description("Register a workflow from YAML") .argument("", "Workflow YAML file") .action((file: string) => { const storageRoot = resolveStorageRoot(); runAction(async () => { - const result = await cmdWorkflowPut(storageRoot, file); + const result = await cmdWorkflowAdd(storageRoot, file); writeOutput(result); }); }); @@ -94,7 +99,7 @@ workflow }); }); -const thread = program.command("thread").description("Thread lifecycle and execution"); +const thread = program.command("thread").description("Thread execution (layer 2: instances)"); thread .command("start") @@ -110,7 +115,7 @@ thread }); thread - .command("step") + .command("exec") .description("Execute one or more steps") .argument("", "Thread ULID") .option("--agent ", "Override agent command") @@ -134,7 +139,7 @@ thread const background = opts.background ?? false; const backgroundWorker = opts._backgroundWorker ?? false; - const results = await cmdThreadStep( + const results = await cmdThreadExec( storageRoot, threadId, agentOverride, @@ -165,47 +170,49 @@ thread thread .command("list") - .description("List active threads") - .option("--all", "Include archived threads") - .action((opts: { all: boolean }) => { + .description("List threads") + .option("--status ", "Filter by status: idle, running, or completed") + .action((opts: { status: string | undefined }) => { const storageRoot = resolveStorageRoot(); runAction(async () => { - const result = await cmdThreadList(storageRoot, opts.all); + const validStatuses: ThreadStatus[] = ["idle", "running", "completed"]; + let statusFilter: ThreadStatus | null = null; + + if (opts.status !== undefined) { + if (!validStatuses.includes(opts.status as ThreadStatus)) { + process.stderr.write( + `Invalid status: ${opts.status}. Must be one of: idle, running, completed\n`, + ); + process.exit(1); + } + statusFilter = opts.status as ThreadStatus; + } + + const result = await cmdThreadList(storageRoot, statusFilter); writeOutput(result); }); }); thread - .command("running") - .description("List threads currently executing in the background") - .action(() => { - const storageRoot = resolveStorageRoot(); - runAction(async () => { - const result = await cmdThreadRunning(storageRoot); - writeOutput(result); - }); - }); - -thread - .command("kill") - .description("Terminate and archive a thread") + .command("stop") + .description("Stop background execution of a thread (keep thread active)") .argument("", "Thread ULID") .action((threadId: string) => { const storageRoot = resolveStorageRoot(); runAction(async () => { - const result = await cmdThreadKill(storageRoot, threadId); + const result = await cmdThreadStop(storageRoot, threadId); writeOutput(result); }); }); thread - .command("steps") - .description("List all steps in a thread") + .command("cancel") + .description("Cancel a thread (stop execution and move to history)") .argument("", "Thread ULID") .action((threadId: string) => { const storageRoot = resolveStorageRoot(); runAction(async () => { - const result = await cmdThreadSteps(storageRoot, threadId); + const result = await cmdThreadCancel(storageRoot, threadId); writeOutput(result); }); }); @@ -239,28 +246,141 @@ thread }, ); -thread +const step = program.command("step").description("Step results (layer 3: single cycle)"); + +step + .command("list") + .description("List all steps in a thread") + .argument("", "Thread ULID") + .action((threadId: string) => { + const storageRoot = resolveStorageRoot(); + runAction(async () => { + const result = await cmdStepList(storageRoot, threadId); + writeOutput(result); + }); + }); + +step + .command("show") + .description("Show details of a specific step") + .argument("", "CAS hash of the StepNode") + .action((stepHash: string) => { + const storageRoot = resolveStorageRoot(); + runAction(async () => { + const detail = await cmdStepShow(storageRoot, stepHash as CasRef); + writeOutput(detail); + }); + }); + +// step read is not yet registered (half-baked, see step.ts cmdStepRead) + +step .command("fork") .description("Fork a thread from a specific step") .argument("", "CAS hash of the StartNode or StepNode to fork from") .action((stepHash: string) => { const storageRoot = resolveStorageRoot(); runAction(async () => { - const result = await cmdThreadFork(storageRoot, stepHash); + const result = await cmdStepFork(storageRoot, stepHash as CasRef); writeOutput(result); }); }); +// ── Deprecation Handlers ────────────────────────────────────────────────────── +// These commands have been removed. Show helpful error messages. + +workflow + .command("put") + .description("[DEPRECATED] Use 'workflow add' instead") + .argument("", "Workflow YAML file") + .action(() => { + process.stderr.write(`Error: Command 'workflow put' has been removed. +Use 'workflow add' instead. + +For more information, see: uwf help workflow add +`); + process.exit(1); + }); + +thread + .command("step") + .description("[DEPRECATED] Use 'thread exec' instead") + .argument("", "Thread ULID") + .allowUnknownOption() + .action(() => { + process.stderr.write(`Error: Command 'thread step' has been removed. +Use 'thread exec' instead. + +For more information, see: uwf help thread exec +`); + process.exit(1); + }); + +thread + .command("steps") + .description("[DEPRECATED] Use 'step list' instead") + .argument("", "Thread ULID") + .action(() => { + process.stderr.write(`Error: Command 'thread steps' has been removed. +Use 'step list' instead. + +For more information, see: uwf help step list +`); + process.exit(1); + }); + thread .command("step-details") - .description("Dump the full detail node of a step as YAML") - .argument("", "CAS hash of the StepNode") - .action((stepHash: string) => { - const storageRoot = resolveStorageRoot(); - runAction(async () => { - const detail = await cmdThreadStepDetails(storageRoot, stepHash); - process.stdout.write(yamlStringify(detail)); - }); + .description("[DEPRECATED] Use 'step show' instead") + .argument("", "Step hash") + .action(() => { + process.stderr.write(`Error: Command 'thread step-details' has been removed. +Use 'step show' instead. + +For more information, see: uwf help step show +`); + process.exit(1); + }); + +thread + .command("fork") + .description("[DEPRECATED] Use 'step fork' instead") + .argument("", "Step hash") + .action(() => { + process.stderr.write(`Error: Command 'thread fork' has been removed. +Use 'step fork' instead. + +For more information, see: uwf help step fork +`); + process.exit(1); + }); + +thread + .command("kill") + .description("[DEPRECATED] Use 'thread stop' or 'thread cancel' instead") + .argument("", "Thread ULID") + .action(() => { + process.stderr.write(`Error: Command 'thread kill' has been removed. +Use 'thread stop' to stop background execution (keep thread active), +or 'thread cancel' to cancel and archive the thread. + +For more information, see: + uwf help thread stop + uwf help thread cancel +`); + process.exit(1); + }); + +thread + .command("running") + .description("[DEPRECATED] Use 'thread list --status running' instead") + .action(() => { + process.stderr.write(`Error: Command 'thread running' has been removed. +Use 'thread list --status running' instead. + +For more information, see: uwf help thread list +`); + process.exit(1); }); const skill = program.command("skill").description("Built-in skill references for agents"); diff --git a/packages/cli-workflow/src/commands/shared.ts b/packages/cli-workflow/src/commands/shared.ts new file mode 100644 index 0000000..6579de6 --- /dev/null +++ b/packages/cli-workflow/src/commands/shared.ts @@ -0,0 +1,227 @@ +import type { Store as CasStore, JSONSchema } from "@uncaged/json-cas"; +import { getSchema } from "@uncaged/json-cas"; +import type { + CasRef, + StartNodePayload, + StepNodePayload, + ThreadId, +} from "@uncaged/workflow-protocol"; +import { loadThreadsIndex, type UwfStore } from "../store.js"; + +type ChainState = { + startHash: CasRef; + start: StartNodePayload; + stepsNewestFirst: StepNodePayload[]; + headIsStart: boolean; +}; + +type OrderedStepItem = { + hash: CasRef; + payload: StepNodePayload; + timestamp: number; +}; + +function fail(message: string): never { + process.stderr.write(`${message}\n`); + process.exit(1); +} + +function walkChain(uwf: UwfStore, headHash: CasRef): ChainState { + const headNode = uwf.store.get(headHash); + if (headNode === null) { + fail(`CAS node not found: ${headHash}`); + } + + if (headNode.type === uwf.schemas.startNode) { + return { + startHash: headHash, + start: headNode.payload as StartNodePayload, + stepsNewestFirst: [], + headIsStart: true, + }; + } + + if (headNode.type !== uwf.schemas.stepNode) { + fail(`head ${headHash} is not a StartNode or StepNode`); + } + + const stepsNewestFirst: StepNodePayload[] = []; + let hash: CasRef | null = headHash; + + while (hash !== null) { + const node = uwf.store.get(hash); + if (node === null) { + fail(`CAS node not found while walking chain: ${hash}`); + } + if (node.type !== uwf.schemas.stepNode) { + break; + } + const payload = node.payload as StepNodePayload; + stepsNewestFirst.push(payload); + hash = payload.prev; + } + + const newest = stepsNewestFirst[0]; + if (newest === undefined) { + fail(`empty step chain at head ${headHash}`); + } + + const startNode = uwf.store.get(newest.start); + if (startNode === null || startNode.type !== uwf.schemas.startNode) { + fail(`StartNode not found: ${newest.start}`); + } + + return { + startHash: newest.start, + start: startNode.payload as StartNodePayload, + stepsNewestFirst, + headIsStart: false, + }; +} + +function expandOutput(uwf: UwfStore, outputRef: CasRef): unknown { + const node = uwf.store.get(outputRef); + if (node === null) { + return {}; + } + return node.payload; +} + +/** + * Recursively expand all cas_ref fields in a CAS node's payload, + * replacing hash strings with the referenced node's expanded payload. + */ +function expandDeep(store: CasStore, hash: CasRef, visited?: Set): unknown { + const seen = visited ?? new Set(); + if (seen.has(hash)) return hash; // cycle guard + seen.add(hash); + + const node = store.get(hash); + if (node === null) return hash; + + const schema = getSchema(store, node.type); + if (schema === null) return node.payload; + + return expandValue(store, schema, node.payload, seen); +} + +function expandCasRefField(store: CasStore, value: unknown, visited: Set): unknown { + if (typeof value === "string") { + return expandDeep(store, value as CasRef, visited); + } + return value; +} + +function expandAnyOfField( + store: CasStore, + schema: JSONSchema, + value: unknown, + visited: Set, +): unknown { + if (!Array.isArray(schema.anyOf)) return value; + for (const sub of schema.anyOf as JSONSchema[]) { + if (sub.format === "cas_ref" && typeof value === "string") { + return expandDeep(store, value as CasRef, visited); + } + } + return value; +} + +function expandArrayField( + store: CasStore, + schema: JSONSchema, + value: unknown, + visited: Set, +): unknown { + if (!schema.items || !Array.isArray(value)) return value; + const itemSchema = schema.items as JSONSchema; + return (value as unknown[]).map((item) => expandValue(store, itemSchema, item, visited)); +} + +function expandObjectField( + store: CasStore, + schema: JSONSchema, + value: unknown, + visited: Set, +): unknown { + if (value === null || typeof value !== "object" || Array.isArray(value) || !schema.properties) { + return value; + } + const props = schema.properties as Record; + const obj = value as Record; + const result: Record = {}; + for (const [key, val] of Object.entries(obj)) { + const propSchema = props[key]; + result[key] = propSchema ? expandValue(store, propSchema, val, visited) : val; + } + return result; +} + +function expandValue( + store: CasStore, + schema: JSONSchema, + value: unknown, + visited: Set, +): unknown { + if (schema.format === "cas_ref") return expandCasRefField(store, value, visited); + if (Array.isArray(schema.anyOf)) return expandAnyOfField(store, schema, value, visited); + if (schema.type === "array") return expandArrayField(store, schema, value, visited); + return expandObjectField(store, schema, value, visited); +} + +function collectOrderedSteps( + uwf: UwfStore, + headHash: CasRef, + chain: ChainState, +): OrderedStepItem[] { + let hash: CasRef | null = headHash; + const hashToNode = new Map(); + while (hash !== null) { + const node = uwf.store.get(hash); + if (node === null || node.type !== uwf.schemas.stepNode) { + break; + } + const payload = node.payload as StepNodePayload; + hashToNode.set(hash, { payload, timestamp: node.timestamp }); + hash = payload.prev; + } + + let cur: CasRef | null = chain.headIsStart ? null : headHash; + const ordered: OrderedStepItem[] = []; + while (cur !== null) { + const entry = hashToNode.get(cur); + if (entry === undefined) { + break; + } + ordered.push({ hash: cur, ...entry }); + cur = entry.payload.prev; + } + + ordered.reverse(); + return ordered; +} + +async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise { + const index = await loadThreadsIndex(storageRoot); + const head = index[threadId]; + if (head === undefined) { + fail(`thread not active: ${threadId}`); + } + return head; +} + +export { + type ChainState, + collectOrderedSteps, + expandAnyOfField, + expandArrayField, + expandCasRefField, + expandDeep, + expandObjectField, + expandOutput, + expandValue, + fail, + type OrderedStepItem, + resolveHeadHash, + walkChain, +}; diff --git a/packages/cli-workflow/src/commands/step.ts b/packages/cli-workflow/src/commands/step.ts new file mode 100644 index 0000000..33b5d47 --- /dev/null +++ b/packages/cli-workflow/src/commands/step.ts @@ -0,0 +1,145 @@ +import type { + CasRef, + StartEntry, + StepEntry, + StepNodePayload, + ThreadForkOutput, + ThreadId, + ThreadStepsOutput, +} from "@uncaged/workflow-protocol"; +import { generateUlid } from "@uncaged/workflow-util"; +import { createUwfStore, loadThreadsIndex, saveThreadsIndex } from "../store.js"; +import { + collectOrderedSteps, + expandDeep, + expandOutput, + fail, + resolveHeadHash, + walkChain, +} from "./shared.js"; + +/** + * List all steps in a thread (previously: thread steps) + */ +export async function cmdStepList( + storageRoot: string, + threadId: ThreadId, +): Promise { + const headHash = await resolveHeadHash(storageRoot, threadId); + const uwf = await createUwfStore(storageRoot); + const chain = walkChain(uwf, headHash); + + const startNode = uwf.store.get(chain.startHash); + if (startNode === null) { + fail(`StartNode not found: ${chain.startHash}`); + } + + const startEntry: StartEntry = { + hash: chain.startHash, + workflow: chain.start.workflow, + prompt: chain.start.prompt, + timestamp: startNode.timestamp, + }; + + const stepEntries: StepEntry[] = []; + const ordered = collectOrderedSteps(uwf, headHash, chain); + + for (const item of ordered) { + stepEntries.push({ + hash: item.hash, + role: item.payload.role, + output: expandOutput(uwf, item.payload.output), + detail: item.payload.detail ?? null, + agent: item.payload.agent, + timestamp: item.timestamp, + }); + } + + return { + thread: threadId, + workflow: chain.start.workflow, + steps: [startEntry, ...stepEntries], + }; +} + +/** + * Show details of a specific step (previously: thread step-details) + */ +export async function cmdStepShow(storageRoot: string, stepHash: CasRef): Promise { + const uwf = await createUwfStore(storageRoot); + const node = uwf.store.get(stepHash); + if (node === null) { + fail(`CAS node not found: ${stepHash}`); + } + if (node.type !== uwf.schemas.stepNode) { + fail(`node ${stepHash} is not a StepNode`); + } + const payload = node.payload as StepNodePayload; + if (!payload.detail) { + fail(`step ${stepHash} has no detail`); + } + return expandDeep(uwf.store, payload.detail); +} + +/** + * Fork a thread from a specific step (previously: thread fork) + */ +export async function cmdStepFork( + storageRoot: string, + stepHash: CasRef, +): Promise { + const uwf = await createUwfStore(storageRoot); + const node = uwf.store.get(stepHash); + if (node === null) { + fail(`CAS node not found: ${stepHash}`); + } + if (node.type !== uwf.schemas.startNode && node.type !== uwf.schemas.stepNode) { + fail(`node ${stepHash} is not a StartNode or StepNode`); + } + + const newThreadId = generateUlid(Date.now()) as ThreadId; + const index = await loadThreadsIndex(storageRoot); + index[newThreadId] = stepHash; + await saveThreadsIndex(storageRoot, index); + + return { + thread: newThreadId, + forkedFrom: { + step: stepHash, + }, + }; +} + +/** + * Read a step's agent output as markdown (new command - requires #462) + * TODO: Implement once unified agent detail/turn schema is available + */ +export async function cmdStepRead( + storageRoot: string, + stepHash: CasRef, + _before: number | null = null, +): Promise { + const uwf = await createUwfStore(storageRoot); + const node = uwf.store.get(stepHash); + if (node === null) { + fail(`CAS node not found: ${stepHash}`); + } + if (node.type !== uwf.schemas.stepNode) { + fail(`node ${stepHash} is not a StepNode`); + } + const payload = node.payload as StepNodePayload; + if (!payload.output) { + fail(`step ${stepHash} has no output`); + } + + // TODO: Implement progressive turn reading with --before N + // For now, return a placeholder + const outputNode = uwf.store.get(payload.output); + if (outputNode === null) { + fail(`output node not found: ${payload.output}`); + } + + // Return the output as JSON for now + // Once #462 is implemented, this will properly format frontmatter + markdown + return JSON.stringify(outputNode.payload, null, 2); +} diff --git a/packages/cli-workflow/src/commands/thread.ts b/packages/cli-workflow/src/commands/thread.ts index f71900a..556801f 100644 --- a/packages/cli-workflow/src/commands/thread.ts +++ b/packages/cli-workflow/src/commands/thread.ts @@ -1,8 +1,7 @@ import { execFileSync, spawn } from "node:child_process"; import { access, readFile } from "node:fs/promises"; import { dirname, isAbsolute, resolve as resolvePath } from "node:path"; -import type { Store as CasStore, JSONSchema } from "@uncaged/json-cas"; -import { getSchema, validate } from "@uncaged/json-cas"; +import { validate } from "@uncaged/json-cas"; import { getEnvPath, loadWorkflowConfig } from "@uncaged/workflow-agent-kit"; import { evaluate } from "@uncaged/workflow-moderator"; import type { @@ -11,17 +10,13 @@ import type { CasRef, ModeratorContext, RunningThreadsOutput, - StartEntry, StartNodePayload, StartOutput, StepContext, - StepEntry, StepNodePayload, StepOutput, - ThreadForkOutput, ThreadId, ThreadListItem, - ThreadStepsOutput, WorkflowConfig, WorkflowPayload, } from "@uncaged/workflow-protocol"; @@ -47,6 +42,14 @@ import { type UwfStore, } from "../store.js"; import { checkWorkflowFilenameConsistency, isCasRef, parseWorkflowPayload } from "../validate.js"; +import { + type ChainState, + collectOrderedSteps, + expandOutput, + fail, + type OrderedStepItem, + walkChain, +} from "./shared.js"; import { materializeWorkflowPayload } from "./workflow.js"; const END_ROLE = "$END"; @@ -65,29 +68,6 @@ function failStep(plog: ProcessLogger, message: string): never { fail(message); } -type ChainState = { - startHash: CasRef; - start: StartNodePayload; - stepsNewestFirst: StepNodePayload[]; - headIsStart: boolean; -}; - -type OrderedStepItem = { - hash: CasRef; - payload: StepNodePayload; - timestamp: number; -}; - -export type KillOutput = { - thread: ThreadId; - archived: boolean; -}; - -function fail(message: string): never { - process.stderr.write(`${message}\n`); - process.exit(1); -} - /** * Check if a string looks like a file path (contains path separators or has .yaml/.yml extension). */ @@ -346,226 +326,70 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr fail(`thread not found: ${threadId}`); } +export type ThreadStatus = "idle" | "running" | "completed"; + +export type ThreadListItemWithStatus = ThreadListItem & { + status: ThreadStatus; +}; + async function threadListItemFromActive( + storageRoot: string, uwf: UwfStore, threadId: ThreadId, head: CasRef, -): Promise { +): Promise { const workflow = resolveWorkflowFromHead(uwf, head); if (workflow === null) { return null; } - return { thread: threadId, workflow, head }; + + // Check if thread is currently running in background + const runningMarker = await isThreadRunning(storageRoot, threadId); + const status: ThreadStatus = runningMarker !== null ? "running" : "idle"; + + return { thread: threadId, workflow, head, status }; } export async function cmdThreadList( storageRoot: string, - includeAll: boolean, -): Promise { + statusFilter: ThreadStatus | null, +): Promise { const uwf = await createUwfStore(storageRoot); const index = await loadThreadsIndex(storageRoot); - const items: ThreadListItem[] = []; + const items: ThreadListItemWithStatus[] = []; + // Add active threads for (const [threadId, head] of Object.entries(index)) { - const item = await threadListItemFromActive(uwf, threadId as ThreadId, head); + const item = await threadListItemFromActive(storageRoot, uwf, threadId as ThreadId, head); if (item !== null) { items.push(item); } } - if (!includeAll) { - return items; + // Add completed threads if requested + if (statusFilter === "completed" || statusFilter === null) { + const activeIds = new Set(items.map((i) => i.thread)); + const history = await loadThreadHistory(storageRoot); + for (const entry of history) { + if (!activeIds.has(entry.thread)) { + items.push({ + thread: entry.thread, + workflow: entry.workflow, + head: entry.head, + status: "completed", + }); + } + } } - const activeIds = new Set(items.map((i) => i.thread)); - const history = await loadThreadHistory(storageRoot); - for (const entry of history) { - if (!activeIds.has(entry.thread)) { - items.push({ - thread: entry.thread, - workflow: entry.workflow, - head: entry.head, - }); - } + // Apply status filter if provided + if (statusFilter !== null) { + return items.filter((item) => item.status === statusFilter); } return items; } -function walkChain(uwf: UwfStore, headHash: CasRef): ChainState { - const headNode = uwf.store.get(headHash); - if (headNode === null) { - fail(`CAS node not found: ${headHash}`); - } - - if (headNode.type === uwf.schemas.startNode) { - return { - startHash: headHash, - start: headNode.payload as StartNodePayload, - stepsNewestFirst: [], - headIsStart: true, - }; - } - - if (headNode.type !== uwf.schemas.stepNode) { - fail(`head ${headHash} is not a StartNode or StepNode`); - } - - const stepsNewestFirst: StepNodePayload[] = []; - let hash: CasRef | null = headHash; - - while (hash !== null) { - const node = uwf.store.get(hash); - if (node === null) { - fail(`CAS node not found while walking chain: ${hash}`); - } - if (node.type !== uwf.schemas.stepNode) { - break; - } - const payload = node.payload as StepNodePayload; - stepsNewestFirst.push(payload); - hash = payload.prev; - } - - const newest = stepsNewestFirst[0]; - if (newest === undefined) { - fail(`empty step chain at head ${headHash}`); - } - - const startNode = uwf.store.get(newest.start); - if (startNode === null || startNode.type !== uwf.schemas.startNode) { - fail(`StartNode not found: ${newest.start}`); - } - - return { - startHash: newest.start, - start: startNode.payload as StartNodePayload, - stepsNewestFirst, - headIsStart: false, - }; -} - -function expandOutput(uwf: UwfStore, outputRef: CasRef): unknown { - const node = uwf.store.get(outputRef); - if (node === null) { - return {}; - } - return node.payload; -} - -/** - * Recursively expand all cas_ref fields in a CAS node's payload, - * replacing hash strings with the referenced node's expanded payload. - */ -function expandDeep(store: CasStore, hash: CasRef, visited?: Set): unknown { - const seen = visited ?? new Set(); - if (seen.has(hash)) return hash; // cycle guard - seen.add(hash); - - const node = store.get(hash); - if (node === null) return hash; - - const schema = getSchema(store, node.type); - if (schema === null) return node.payload; - - return expandValue(store, schema, node.payload, seen); -} - -function expandCasRefField(store: CasStore, value: unknown, visited: Set): unknown { - if (typeof value === "string") { - return expandDeep(store, value as CasRef, visited); - } - return value; -} - -function expandAnyOfField( - store: CasStore, - schema: JSONSchema, - value: unknown, - visited: Set, -): unknown { - if (!Array.isArray(schema.anyOf)) return value; - for (const sub of schema.anyOf as JSONSchema[]) { - if (sub.format === "cas_ref" && typeof value === "string") { - return expandDeep(store, value as CasRef, visited); - } - } - return value; -} - -function expandArrayField( - store: CasStore, - schema: JSONSchema, - value: unknown, - visited: Set, -): unknown { - if (!schema.items || !Array.isArray(value)) return value; - const itemSchema = schema.items as JSONSchema; - return (value as unknown[]).map((item) => expandValue(store, itemSchema, item, visited)); -} - -function expandObjectField( - store: CasStore, - schema: JSONSchema, - value: unknown, - visited: Set, -): unknown { - if (value === null || typeof value !== "object" || Array.isArray(value) || !schema.properties) { - return value; - } - const props = schema.properties as Record; - const obj = value as Record; - const result: Record = {}; - for (const [key, val] of Object.entries(obj)) { - const propSchema = props[key]; - result[key] = propSchema ? expandValue(store, propSchema, val, visited) : val; - } - return result; -} - -function expandValue( - store: CasStore, - schema: JSONSchema, - value: unknown, - visited: Set, -): unknown { - if (schema.format === "cas_ref") return expandCasRefField(store, value, visited); - if (Array.isArray(schema.anyOf)) return expandAnyOfField(store, schema, value, visited); - if (schema.type === "array") return expandArrayField(store, schema, value, visited); - return expandObjectField(store, schema, value, visited); -} - -function collectOrderedSteps( - uwf: UwfStore, - headHash: CasRef, - chain: ChainState, -): OrderedStepItem[] { - let hash: CasRef | null = headHash; - const hashToNode = new Map(); - while (hash !== null) { - const node = uwf.store.get(hash); - if (node === null || node.type !== uwf.schemas.stepNode) { - break; - } - const payload = node.payload as StepNodePayload; - hashToNode.set(hash, { payload, timestamp: node.timestamp }); - hash = payload.prev; - } - - let cur: CasRef | null = chain.headIsStart ? null : headHash; - const ordered: OrderedStepItem[] = []; - while (cur !== null) { - const entry = hashToNode.get(cur); - if (entry === undefined) { - break; - } - ordered.push({ hash: cur, ...entry }); - cur = entry.payload.prev; - } - ordered.reverse(); - return ordered; -} - function formatYaml(value: unknown): string { return stringify(value, { aliasDuplicateObjects: false }).trimEnd(); } @@ -857,7 +681,7 @@ async function archiveThread( }); } -export async function cmdThreadStep( +export async function cmdThreadExec( storageRoot: string, threadId: ThreadId, agentOverride: string | null, @@ -953,7 +777,7 @@ async function cmdThreadStepBackground( failStep(plog, "unable to determine script path for background execution"); } - const args = ["thread", "step", threadId, "--count", String(count)]; + const args = ["thread", "exec", threadId, "--count", String(count)]; if (agentOverride !== null) { args.push("--agent", agentOverride); @@ -1085,47 +909,6 @@ async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise fail(`thread not found: ${threadId}`); } -export async function cmdThreadSteps( - storageRoot: string, - threadId: ThreadId, -): Promise { - const headHash = await resolveHeadHash(storageRoot, threadId); - const uwf = await createUwfStore(storageRoot); - const chain = walkChain(uwf, headHash); - - const startNode = uwf.store.get(chain.startHash); - if (startNode === null) { - fail(`StartNode not found: ${chain.startHash}`); - } - - const startEntry: StartEntry = { - hash: chain.startHash, - workflow: chain.start.workflow, - prompt: chain.start.prompt, - timestamp: startNode.timestamp, - }; - - const stepEntries: StepEntry[] = []; - const ordered = collectOrderedSteps(uwf, headHash, chain); - - for (const item of ordered) { - stepEntries.push({ - hash: item.hash, - role: item.payload.role, - output: expandOutput(uwf, item.payload.output), - detail: item.payload.detail, - agent: item.payload.agent, - timestamp: item.timestamp, - }); - } - - return { - thread: threadId, - workflow: chain.start.workflow, - steps: [startEntry, ...stepEntries], - }; -} - export async function cmdThreadRead( storageRoot: string, threadId: ThreadId, @@ -1153,52 +936,50 @@ export async function cmdThreadRead( }); } -export async function cmdThreadFork( - storageRoot: string, - stepHash: CasRef, -): Promise { - const uwf = await createUwfStore(storageRoot); - const node = uwf.store.get(stepHash); - if (node === null) { - fail(`CAS node not found: ${stepHash}`); - } - if (node.type !== uwf.schemas.startNode && node.type !== uwf.schemas.stepNode) { - fail(`node ${stepHash} is not a StartNode or StepNode`); - } +export type StopOutput = { + thread: ThreadId; + stopped: boolean; +}; - const newThreadId = generateUlid(Date.now()) as ThreadId; +export type CancelOutput = { + thread: ThreadId; + cancelled: boolean; +}; + +/** + * Stop background execution of a thread (but keep thread active) + */ +export async function cmdThreadStop(storageRoot: string, threadId: ThreadId): Promise { const index = await loadThreadsIndex(storageRoot); - index[newThreadId] = stepHash; - await saveThreadsIndex(storageRoot, index); + const head = index[threadId]; + if (head === undefined) { + fail(`thread not active: ${threadId}`); + } - return { - thread: newThreadId, - forkedFrom: { - step: stepHash, - }, - }; + // Check if thread is running in background and terminate it + const runningMarker = await isThreadRunning(storageRoot, threadId); + if (runningMarker === null) { + process.stderr.write(`Warning: thread ${threadId} is not currently running\n`); + return { thread: threadId, stopped: false }; + } + + try { + process.kill(runningMarker.pid, "SIGTERM"); + } catch { + // Process may have already exited, ignore error + } + await deleteMarker(storageRoot, threadId); + + return { thread: threadId, stopped: true }; } -export async function cmdThreadStepDetails( +/** + * Cancel a thread (stop execution + move to history) + */ +export async function cmdThreadCancel( storageRoot: string, - stepHash: CasRef, -): Promise { - const uwf = await createUwfStore(storageRoot); - const node = uwf.store.get(stepHash); - if (node === null) { - fail(`CAS node not found: ${stepHash}`); - } - if (node.type !== uwf.schemas.stepNode) { - fail(`node ${stepHash} is not a StepNode`); - } - const payload = node.payload as StepNodePayload; - if (!payload.detail) { - fail(`step ${stepHash} has no detail`); - } - return expandDeep(uwf.store, payload.detail); -} - -export async function cmdThreadKill(storageRoot: string, threadId: ThreadId): Promise { + threadId: ThreadId, +): Promise { const index = await loadThreadsIndex(storageRoot); const head = index[threadId]; if (head === undefined) { @@ -1233,7 +1014,7 @@ export async function cmdThreadKill(storageRoot: string, threadId: ThreadId): Pr }; await appendThreadHistory(storageRoot, historyEntry); - return { thread: threadId, archived: true }; + return { thread: threadId, cancelled: true }; } export async function cmdThreadRunning(storageRoot: string): Promise { diff --git a/packages/cli-workflow/src/commands/workflow.ts b/packages/cli-workflow/src/commands/workflow.ts index 23c8368..ea72e94 100644 --- a/packages/cli-workflow/src/commands/workflow.ts +++ b/packages/cli-workflow/src/commands/workflow.ts @@ -29,7 +29,7 @@ export type WorkflowListEntry = { origin: WorkflowOrigin; }; -export type WorkflowPutOutput = { +export type WorkflowAddOutput = { name: string; hash: CasRef; }; @@ -111,10 +111,10 @@ export async function materializeWorkflowPayload( }; } -export async function cmdWorkflowPut( +export async function cmdWorkflowAdd( storageRoot: string, filePath: string, -): Promise { +): Promise { let text: string; try { text = await readFile(filePath, "utf8");