diff --git a/.workflows/solve-issue.yaml b/.workflows/solve-issue.yaml index bc1f910..2089f07 100644 --- a/.workflows/solve-issue.yaml +++ b/.workflows/solve-issue.yaml @@ -228,7 +228,7 @@ graph: $START: _: { role: "planner", prompt: "Analyze the issue and produce an implementation plan." } planner: - insufficient_info: { role: "$END", prompt: "Insufficient information to proceed; end the workflow." } + insufficient_info: { role: "$SUSPEND", prompt: "信息不足,需要补充:{{{reason}}}" } ready: { role: "developer", prompt: "Implement the TDD test spec (CAS hash: {{{plan}}}) in repo {{{repoPath}}}. Repo remote: {{{repoRemote}}}." } developer: done: { role: "reviewer", prompt: "Review branch {{{branch}}} at {{{worktree}}} for code standards compliance. Repo remote: {{{repoRemote}}}." } diff --git a/examples/solve-issue.yaml b/examples/solve-issue.yaml index 66b604b..6edbbce 100644 --- a/examples/solve-issue.yaml +++ b/examples/solve-issue.yaml @@ -216,7 +216,7 @@ graph: $START: _: { role: "planner", prompt: "Analyze the issue and produce an implementation plan." } planner: - insufficient_info: { role: "$END", prompt: "Insufficient information to proceed; end the workflow." } + insufficient_info: { role: "$SUSPEND", prompt: "信息不足,需要补充:{{{reason}}}" } ready: { role: "developer", prompt: "Implement the TDD test spec (CAS hash: {{{plan}}}) in repo {{{repoPath}}}." } continue: { role: "developer", prompt: "Continue work on existing branch {{{branch}}} at worktree {{{worktree}}}. Implement the revised TDD test spec (CAS hash: {{{plan}}}) in repo {{{repoPath}}}. Do NOT create a new branch or worktree — cd into the existing worktree and work there." } developer: diff --git a/packages/cli-workflow/src/__tests__/thread-list-filters.test.ts b/packages/cli-workflow/src/__tests__/thread-list-filters.test.ts index 0a29d83..fa4d9fc 100644 --- a/packages/cli-workflow/src/__tests__/thread-list-filters.test.ts +++ b/packages/cli-workflow/src/__tests__/thread-list-filters.test.ts @@ -2,6 +2,7 @@ import { mkdir, mkdtemp, rm } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; import type { CasRef, ThreadId } from "@uncaged/workflow-protocol"; +import { createThreadIndexEntry } from "@uncaged/workflow-protocol"; import { extractUlidTimestamp, generateUlid } from "@uncaged/workflow-util"; import { afterEach, beforeEach, describe, expect, test } from "vitest"; import { createMarker, deleteMarker } from "../background/index.js"; @@ -45,9 +46,15 @@ async function createTestThread( const startPayload = { workflow: workflowHash, prompt: "test prompt", + cwd: storageRoot, }; const headHash = await uwf.store.put(uwf.schemas.startNode, startPayload); - await saveThreadsIndex(storageRoot, { [threadId]: headHash }); + + // Load existing index and add new thread + const existingIndex = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot)); + existingIndex[threadId] = createThreadIndexEntry(headHash); + await saveThreadsIndex(storageRoot, existingIndex); + return threadId; } diff --git a/packages/cli-workflow/src/__tests__/thread-resume.test.ts b/packages/cli-workflow/src/__tests__/thread-resume.test.ts new file mode 100644 index 0000000..b539244 --- /dev/null +++ b/packages/cli-workflow/src/__tests__/thread-resume.test.ts @@ -0,0 +1,442 @@ +import { execFileSync } from "node:child_process"; +import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { putSchema } from "@ocas/core"; +import { createFsStore } from "@ocas/fs"; +import type { CasRef, StepNodePayload, ThreadId } from "@uncaged/workflow-protocol"; +import { afterEach, beforeEach, describe, expect, test } from "vitest"; +import { parse } from "yaml"; +import { cmdThreadShow } from "../commands/thread.js"; +import { registerUwfSchemas } from "../schemas.js"; +import { saveThreadsIndex } from "../store.js"; + +const OUTPUT_SCHEMA = { + type: "object" as const, + properties: { + $status: { type: "string" as const }, + question: { type: "string" as const }, + }, + required: ["$status"], + additionalProperties: false, +}; + +const THREAD_ID = "01RESUMESTEPTEST0000000" as ThreadId; +const SUSPEND_MESSAGE = "Please clarify: Which API?"; + +type MockAgentMode = "suspend" | "ok"; + +let tmpDir: string; + +beforeEach(async () => { + tmpDir = await mkdtemp(join(tmpdir(), "cli-uwf-resume-test-")); +}); + +afterEach(async () => { + await rm(tmpDir, { recursive: true, force: true }); +}); + +async function setupSuspendedThread(mode: MockAgentMode): Promise<{ + casDir: string; + mockAgentPath: string; + promptCapturePath: string; +}> { + const casDir = join(tmpDir, "cas"); + await mkdir(casDir, { recursive: true }); + + const store = createFsStore(casDir); + const schemas = await registerUwfSchemas(store); + const outputSchemaHash = await putSchema(store, OUTPUT_SCHEMA); + + const workflowHash = await store.put(schemas.workflow, { + name: "test-resume", + description: "resume command integration test", + roles: { + worker: { + description: "Worker role", + goal: "Work", + capabilities: [], + procedure: "work", + output: "result", + frontmatter: outputSchemaHash, + }, + reviewer: { + description: "Reviewer role", + goal: "Review", + capabilities: [], + procedure: "review", + output: "result", + frontmatter: outputSchemaHash, + }, + }, + graph: { + $START: { _: { role: "worker", prompt: "Start work", location: null } }, + worker: { + needs_input: { + role: "$SUSPEND", + prompt: "Please clarify: {{{question}}}", + location: null, + }, + ok: { role: "reviewer", prompt: "Review the work", location: null }, + }, + reviewer: { _: { role: "$END", prompt: "Done", location: null } }, + }, + }); + + const startHash = await store.put(schemas.startNode, { + workflow: workflowHash, + prompt: "Test resume task", + cwd: tmpDir, + }); + + await saveThreadsIndex(tmpDir, { [THREAD_ID]: startHash }); + + const outputHash = await store.put(outputSchemaHash, { + $status: "needs_input", + question: "Which API?", + }); + const detailHash = await store.put(schemas.text, "mock detail"); + + const startedAtMs = 1716600000000; + const completedAtMs = 1716600001500; + + const stepHash = await store.put(schemas.stepNode, { + start: startHash, + prev: null, + role: "worker", + output: outputHash, + detail: detailHash, + agent: "uwf-mock", + edgePrompt: "Start work", + startedAtMs, + completedAtMs, + cwd: tmpDir, + assembledPrompt: null, + }); + + await saveThreadsIndex(tmpDir, { + [THREAD_ID]: { + head: stepHash, + suspendedRole: "worker", + suspendMessage: SUSPEND_MESSAGE, + }, + }); + + const promptCapturePath = join(tmpDir, "captured-prompt.txt"); + const mockAgentPath = join(tmpDir, "mock-agent.sh"); + + const frontmatter = + mode === "suspend" ? { $status: "needs_input", question: "Which API?" } : { $status: "ok" }; + + const adapterJson = JSON.stringify({ + stepHash: await store.put(schemas.stepNode, { + start: startHash, + prev: stepHash, + role: "worker", + output: await store.put(outputSchemaHash, frontmatter), + detail: detailHash, + agent: "uwf-mock", + edgePrompt: "resume prompt placeholder", + startedAtMs: completedAtMs + 1, + completedAtMs: completedAtMs + 2, + cwd: tmpDir, + assembledPrompt: null, + }), + detailHash, + role: "worker", + frontmatter, + body: "", + startedAtMs: completedAtMs + 1, + completedAtMs: completedAtMs + 2, + }); + + await writeFile( + mockAgentPath, + `#!/bin/sh +prompt="" +while [ $# -gt 0 ]; do + if [ "$1" = "--prompt" ]; then + prompt="$2" + shift 2 + else + shift + fi +done +printf '%s' "$prompt" > '${promptCapturePath}' +echo '${adapterJson}' +`, + { mode: 0o755 }, + ); + + const configPath = join(tmpDir, "config.yaml"); + await writeFile( + configPath, + `defaultAgent: uwf-hermes\ndefaultModel: test-model\nagentOverrides: null\nagents: {}\nproviders: {}\nmodels: {}\n`, + ); + + return { casDir, mockAgentPath, promptCapturePath }; +} + +function runUwf( + args: string[], + casDir: string, +): { stdout: string; stderr: string; status: number } { + const cliPath = join(import.meta.dirname, "..", "cli.js"); + try { + const stdout = execFileSync("bun", ["run", cliPath, ...args], { + encoding: "utf8", + stdio: ["ignore", "pipe", "pipe"], + env: { + ...process.env, + WORKFLOW_STORAGE_ROOT: tmpDir, + UNCAGED_CAS_DIR: casDir, + }, + cwd: tmpDir, + timeout: 30000, + }); + return { stdout, stderr: "", status: 0 }; + } catch (error) { + const err = error as NodeJS.ErrnoException & { + stdout?: string | Buffer; + stderr?: string | Buffer; + status?: number; + }; + return { + stdout: typeof err.stdout === "string" ? err.stdout : (err.stdout?.toString("utf8") ?? ""), + stderr: typeof err.stderr === "string" ? err.stderr : (err.stderr?.toString("utf8") ?? ""), + status: err.status ?? 1, + }; + } +} + +describe("uwf thread resume", () => { + test("resume non-suspended thread returns error", async () => { + const casDir = join(tmpDir, "cas"); + await mkdir(casDir, { recursive: true }); + const store = createFsStore(casDir); + const schemas = await registerUwfSchemas(store); + + const workflowHash = await store.put(schemas.workflow, { + name: "idle-workflow", + description: "idle thread", + roles: { + worker: { + description: "Worker", + goal: "Work", + capabilities: [], + procedure: "work", + output: "result", + frontmatter: await putSchema(store, OUTPUT_SCHEMA), + }, + }, + graph: { + $START: { _: { role: "worker", prompt: "Start", location: null } }, + worker: { _: { role: "$END", prompt: "Done", location: null } }, + }, + }); + + const startHash = await store.put(schemas.startNode, { + workflow: workflowHash, + prompt: "task", + cwd: tmpDir, + }); + + await saveThreadsIndex(tmpDir, { [THREAD_ID]: startHash }); + + const result = runUwf(["thread", "resume", THREAD_ID], casDir); + expect(result.status).not.toBe(0); + expect(result.stderr).toContain("thread is not suspended"); + }); + + test("resume suspended thread executes step and becomes idle", async () => { + const originalCasDir = process.env.UNCAGED_CAS_DIR; + const { casDir, mockAgentPath } = await setupSuspendedThread("ok"); + process.env.UNCAGED_CAS_DIR = casDir; + + try { + const result = runUwf(["thread", "resume", THREAD_ID, "--agent", mockAgentPath], casDir); + expect(result.status).toBe(0); + + const cliOutput = JSON.parse(result.stdout.trim()); + expect(cliOutput.status).toBe("idle"); + expect(cliOutput.currentRole).toBe("reviewer"); + expect(cliOutput.suspendedRole).toBeNull(); + expect(cliOutput.suspendMessage).toBeNull(); + expect(cliOutput.done).toBe(false); + + const threadsYaml = await readFile(join(tmpDir, "threads.yaml"), "utf8"); + const threadsIndex = parse(threadsYaml) as Record; + expect(threadsIndex[THREAD_ID]).toBe(cliOutput.head); + + const showResult = await cmdThreadShow(tmpDir, THREAD_ID); + expect(showResult.status).toBe("idle"); + expect(showResult.suspendedRole).toBeNull(); + expect(showResult.suspendMessage).toBeNull(); + } finally { + if (originalCasDir === undefined) { + delete process.env.UNCAGED_CAS_DIR; + } else { + process.env.UNCAGED_CAS_DIR = originalCasDir; + } + } + }); + + test("resume without -p uses suspend message as agent prompt", async () => { + const originalCasDir = process.env.UNCAGED_CAS_DIR; + const { casDir, mockAgentPath, promptCapturePath } = await setupSuspendedThread("ok"); + process.env.UNCAGED_CAS_DIR = casDir; + + try { + const result = runUwf(["thread", "resume", THREAD_ID, "--agent", mockAgentPath], casDir); + expect(result.status).toBe(0); + + const capturedPrompt = await readFile(promptCapturePath, "utf8"); + expect(capturedPrompt).toBe(SUSPEND_MESSAGE); + } finally { + if (originalCasDir === undefined) { + delete process.env.UNCAGED_CAS_DIR; + } else { + process.env.UNCAGED_CAS_DIR = originalCasDir; + } + } + }); + + test("resume with -p appends supplementary info to agent prompt", async () => { + const originalCasDir = process.env.UNCAGED_CAS_DIR; + const { casDir, mockAgentPath, promptCapturePath } = await setupSuspendedThread("ok"); + process.env.UNCAGED_CAS_DIR = casDir; + + try { + const supplement = "Use the REST API."; + const result = runUwf( + ["thread", "resume", THREAD_ID, "-p", supplement, "--agent", mockAgentPath], + casDir, + ); + expect(result.status).toBe(0); + + const capturedPrompt = await readFile(promptCapturePath, "utf8"); + expect(capturedPrompt).toBe(`${SUSPEND_MESSAGE}\n\n${supplement}`); + } finally { + if (originalCasDir === undefined) { + delete process.env.UNCAGED_CAS_DIR; + } else { + process.env.UNCAGED_CAS_DIR = originalCasDir; + } + } + }); + + test("multiple suspend/resume cycles", async () => { + const originalCasDir = process.env.UNCAGED_CAS_DIR; + const { casDir, mockAgentPath, promptCapturePath } = await setupSuspendedThread("suspend"); + process.env.UNCAGED_CAS_DIR = casDir; + + try { + const firstResult = runUwf(["thread", "resume", THREAD_ID, "--agent", mockAgentPath], casDir); + expect(firstResult.status).toBe(0); + const firstResume = JSON.parse(firstResult.stdout.trim()); + expect(firstResume.status).toBe("suspended"); + expect(firstResume.suspendedRole).toBe("worker"); + expect(firstResume.suspendMessage).toBe(SUSPEND_MESSAGE); + + const threadsAfterFirst = parse( + await readFile(join(tmpDir, "threads.yaml"), "utf8"), + ) as Record; + expect(threadsAfterFirst[THREAD_ID]).toEqual({ + head: firstResume.head, + suspendedRole: "worker", + suspendMessage: SUSPEND_MESSAGE, + }); + + const { mockAgentPath: okMockAgentPath } = await setupOkMockAgent( + casDir, + firstResume.head as CasRef, + ); + + const secondResult = runUwf( + ["thread", "resume", THREAD_ID, "--agent", okMockAgentPath], + casDir, + ); + expect(secondResult.status).toBe(0); + const secondResume = JSON.parse(secondResult.stdout.trim()); + expect(secondResume.status).toBe("idle"); + expect(secondResume.currentRole).toBe("reviewer"); + expect(secondResume.suspendedRole).toBeNull(); + expect(secondResume.suspendMessage).toBeNull(); + + const capturedPrompt = await readFile(promptCapturePath, "utf8"); + expect(capturedPrompt).toBe(SUSPEND_MESSAGE); + } finally { + if (originalCasDir === undefined) { + delete process.env.UNCAGED_CAS_DIR; + } else { + process.env.UNCAGED_CAS_DIR = originalCasDir; + } + } + }); +}); + +async function setupOkMockAgent( + casDir: string, + prevHead: CasRef, +): Promise<{ mockAgentPath: string }> { + const store = createFsStore(casDir); + const schemas = await registerUwfSchemas(store); + const outputSchemaHash = await putSchema(store, OUTPUT_SCHEMA); + + const prevNode = store.get(prevHead); + if (prevNode === null || prevNode.type !== schemas.stepNode) { + throw new Error(`expected StepNode at ${prevHead}`); + } + const prevPayload = prevNode.payload as StepNodePayload; + + const outputHash = await store.put(outputSchemaHash, { $status: "ok" }); + const detailHash = await store.put(schemas.text, "ok detail"); + const startedAtMs = Date.now(); + const completedAtMs = startedAtMs + 1; + + const stepHash = await store.put(schemas.stepNode, { + start: prevPayload.start, + prev: prevHead, + role: "worker", + output: outputHash, + detail: detailHash, + agent: "uwf-mock", + edgePrompt: "resume", + startedAtMs, + completedAtMs, + cwd: tmpDir, + assembledPrompt: null, + }); + + const promptCapturePath = join(tmpDir, "captured-prompt.txt"); + const mockAgentPath = join(tmpDir, "mock-agent-ok.sh"); + const adapterJson = JSON.stringify({ + stepHash, + detailHash, + role: "worker", + frontmatter: { $status: "ok" }, + body: "", + startedAtMs, + completedAtMs, + }); + + await writeFile( + mockAgentPath, + `#!/bin/sh +prompt="" +while [ $# -gt 0 ]; do + if [ "$1" = "--prompt" ]; then + prompt="$2" + shift 2 + else + shift + fi +done +printf '%s' "$prompt" > '${promptCapturePath}' +echo '${adapterJson}' +`, + { mode: 0o755 }, + ); + + return { mockAgentPath }; +} diff --git a/packages/cli-workflow/src/__tests__/thread-suspended-display.test.ts b/packages/cli-workflow/src/__tests__/thread-suspended-display.test.ts new file mode 100644 index 0000000..3477bad --- /dev/null +++ b/packages/cli-workflow/src/__tests__/thread-suspended-display.test.ts @@ -0,0 +1,286 @@ +import { mkdir, mkdtemp, rm } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { putSchema } from "@ocas/core"; +import type { ThreadId } from "@uncaged/workflow-protocol"; +import { createThreadIndexEntry, markThreadSuspended } from "@uncaged/workflow-protocol"; +import { afterEach, beforeEach, describe, expect, test } from "vitest"; +import { cmdThreadList, cmdThreadShow } from "../commands/thread.js"; +import { createUwfStore, saveThreadsIndex } from "../store.js"; + +const OUTPUT_SCHEMA = { + type: "object" as const, + properties: { + $status: { type: "string" as const }, + question: { type: "string" as const }, + }, + required: ["$status"], + additionalProperties: false, +}; + +let tmpDir: string; + +beforeEach(async () => { + tmpDir = await mkdtemp(join(tmpdir(), "cli-uwf-suspended-display-test-")); +}); + +afterEach(async () => { + await rm(tmpDir, { recursive: true, force: true }); +}); + +describe("suspended thread display", () => { + test("thread list shows [suspended] marker for suspended threads", async () => { + const casDir = join(tmpDir, "cas"); + await mkdir(casDir, { recursive: true }); + const originalCasDir = process.env.UNCAGED_CAS_DIR; + process.env.UNCAGED_CAS_DIR = casDir; + + try { + const uwf = await createUwfStore(tmpDir); + const outputSchemaHash = await putSchema(uwf.store, OUTPUT_SCHEMA); + + // Create test workflow with suspend capability + const workflowHash = await uwf.store.put(uwf.schemas.workflow, { + name: "test-suspend-display", + description: "test suspended display", + roles: { + worker: { + description: "Worker role", + goal: "Work and potentially suspend", + capabilities: [], + procedure: "work", + output: "result", + frontmatter: outputSchemaHash, + }, + }, + graph: { + $START: { _: { role: "worker", prompt: "Start work", location: null } }, + worker: { + needs_input: { + role: "$SUSPEND", + prompt: "Please provide more details: {{{question}}}", + location: null, + }, + }, + }, + }); + + const startHash = await uwf.store.put(uwf.schemas.startNode, { + workflow: workflowHash, + prompt: "Test task requiring input", + cwd: tmpDir, + }); + + // Create suspended thread + const suspendedThreadId = "01SUSPENDEDTHREAD0000000" as ThreadId; + const outputHash = await uwf.store.put(outputSchemaHash, { + $status: "needs_input", + question: "What is the target API?", + }); + const detailHash = await uwf.store.put(uwf.schemas.text, "mock detail"); + + const stepHash = await uwf.store.put(uwf.schemas.stepNode, { + start: startHash, + prev: null, + role: "worker", + output: outputHash, + detail: detailHash, + agent: "uwf-mock", + edgePrompt: "Start work", + startedAtMs: 1716600000000, + completedAtMs: 1716600001500, + cwd: tmpDir, + assembledPrompt: null, + }); + + // Create suspended thread entry in threads.yaml + const suspendedEntry = markThreadSuspended( + createThreadIndexEntry(stepHash), + "worker", + "Please provide more details: What is the target API?", + ); + + // Create normal (idle) thread + const idleThreadId = "01IDLETHREAD00000000000" as ThreadId; + const idleStartHash = await uwf.store.put(uwf.schemas.startNode, { + workflow: workflowHash, + prompt: "Normal task", + cwd: tmpDir, + }); + const idleEntry = createThreadIndexEntry(idleStartHash); + + await saveThreadsIndex(tmpDir, { + [suspendedThreadId]: suspendedEntry, + [idleThreadId]: idleEntry, + }); + + // Test thread list + const listResult = await cmdThreadList(tmpDir, null, null, null, null, null); + + // Find the suspended and idle threads in results + const suspendedItem = listResult.find((item) => item.thread === suspendedThreadId); + const idleItem = listResult.find((item) => item.thread === idleThreadId); + + expect(suspendedItem).toBeDefined(); + expect(suspendedItem!.status).toBe("suspended"); + expect(suspendedItem!.statusDisplay).toBe("suspended [suspended]"); + + expect(idleItem).toBeDefined(); + expect(idleItem!.status).toBe("idle"); + expect(idleItem!.statusDisplay).toBe("idle"); + } finally { + if (originalCasDir === undefined) { + delete process.env.UNCAGED_CAS_DIR; + } else { + process.env.UNCAGED_CAS_DIR = originalCasDir; + } + } + }); + + test("thread show displays suspend info and resume hint", async () => { + const casDir = join(tmpDir, "cas"); + await mkdir(casDir, { recursive: true }); + const originalCasDir = process.env.UNCAGED_CAS_DIR; + process.env.UNCAGED_CAS_DIR = casDir; + + try { + const uwf = await createUwfStore(tmpDir); + const outputSchemaHash = await putSchema(uwf.store, OUTPUT_SCHEMA); + + const workflowHash = await uwf.store.put(uwf.schemas.workflow, { + name: "test-suspend-show", + description: "test suspended show", + roles: { + worker: { + description: "Worker role", + goal: "Work and potentially suspend", + capabilities: [], + procedure: "work", + output: "result", + frontmatter: outputSchemaHash, + }, + }, + graph: { + $START: { _: { role: "worker", prompt: "Start work", location: null } }, + worker: { + needs_input: { + role: "$SUSPEND", + prompt: "Need clarification: {{{question}}}", + location: null, + }, + }, + }, + }); + + const startHash = await uwf.store.put(uwf.schemas.startNode, { + workflow: workflowHash, + prompt: "Test task", + cwd: tmpDir, + }); + + const threadId = "01SUSPENDSHOW000000000" as ThreadId; + const outputHash = await uwf.store.put(outputSchemaHash, { + $status: "needs_input", + question: "Which database to use?", + }); + const detailHash = await uwf.store.put(uwf.schemas.text, "mock detail"); + + const stepHash = await uwf.store.put(uwf.schemas.stepNode, { + start: startHash, + prev: null, + role: "worker", + output: outputHash, + detail: detailHash, + agent: "uwf-mock", + edgePrompt: "Start work", + startedAtMs: 1716600000000, + completedAtMs: 1716600001500, + cwd: tmpDir, + assembledPrompt: null, + }); + + const suspendedEntry = markThreadSuspended( + createThreadIndexEntry(stepHash), + "worker", + "Need clarification: Which database to use?", + ); + + await saveThreadsIndex(tmpDir, { [threadId]: suspendedEntry }); + + // Test thread show + const showResult = await cmdThreadShow(tmpDir, threadId); + + expect(showResult.status).toBe("suspended"); + expect(showResult.suspendedRole).toBe("worker"); + expect(showResult.suspendMessage).toBe("Need clarification: Which database to use?"); + expect(showResult.hint).toBe( + `Thread is suspended. Resume with: uwf thread resume ${threadId}`, + ); + } finally { + if (originalCasDir === undefined) { + delete process.env.UNCAGED_CAS_DIR; + } else { + process.env.UNCAGED_CAS_DIR = originalCasDir; + } + } + }); + + test("non-suspended threads do not show suspend markers or hints", async () => { + const casDir = join(tmpDir, "cas"); + await mkdir(casDir, { recursive: true }); + const originalCasDir = process.env.UNCAGED_CAS_DIR; + process.env.UNCAGED_CAS_DIR = casDir; + + try { + const uwf = await createUwfStore(tmpDir); + + const workflowHash = await uwf.store.put(uwf.schemas.workflow, { + name: "test-normal", + description: "test normal thread", + roles: { + worker: { + description: "Worker role", + goal: "Work normally", + capabilities: [], + procedure: "work", + output: "result", + }, + }, + graph: { + $START: { _: { role: "worker", prompt: "Start work", location: null } }, + }, + }); + + const startHash = await uwf.store.put(uwf.schemas.startNode, { + workflow: workflowHash, + prompt: "Normal task", + cwd: tmpDir, + }); + + const threadId = "01NORMALTHREAD000000000" as ThreadId; + await saveThreadsIndex(tmpDir, { [threadId]: createThreadIndexEntry(startHash) }); + + // Test thread show + const showResult = await cmdThreadShow(tmpDir, threadId); + + expect(showResult.status).toBe("idle"); + expect(showResult.suspendedRole).toBeNull(); + expect(showResult.suspendMessage).toBeNull(); + expect(showResult.hint).toBeNull(); + + // Test thread list + const listResult = await cmdThreadList(tmpDir, null, null, null, null, null); + const threadItem = listResult.find((item) => item.thread === threadId); + + expect(threadItem).toBeDefined(); + expect(threadItem!.status).toBe("idle"); + expect(threadItem!.statusDisplay).toBe("idle"); + } finally { + if (originalCasDir === undefined) { + delete process.env.UNCAGED_CAS_DIR; + } else { + process.env.UNCAGED_CAS_DIR = originalCasDir; + } + } + }); +}); diff --git a/packages/cli-workflow/src/cli.ts b/packages/cli-workflow/src/cli.ts index 26dc0d8..1ba0fed 100755 --- a/packages/cli-workflow/src/cli.ts +++ b/packages/cli-workflow/src/cli.ts @@ -30,6 +30,7 @@ import { cmdThreadExec, cmdThreadList, cmdThreadRead, + cmdThreadResume, cmdThreadShow, cmdThreadStart, cmdThreadStop, @@ -280,6 +281,27 @@ thread }, ); +thread + .command("resume") + .description("Resume a suspended thread and re-run the suspended role") + .argument("", "Thread ULID") + .option("-p, --prompt ", "Supplementary info to append to the resume prompt") + .option("--agent ", "Override agent command") + .action((threadId: string, opts: { prompt: string | undefined; agent: string | undefined }) => { + const storageRoot = resolveStorageRoot(); + runAction(async () => { + const supplement = opts.prompt ?? null; + const agentOverride = opts.agent ?? null; + const result = await cmdThreadResume( + storageRoot, + threadId as ThreadId, + supplement, + agentOverride, + ); + writeOutput(result); + }); + }); + thread .command("stop") .description("Stop background execution of a thread (keep thread active)") diff --git a/packages/cli-workflow/src/commands/thread.ts b/packages/cli-workflow/src/commands/thread.ts index 5d62a01..f9cc665 100644 --- a/packages/cli-workflow/src/commands/thread.ts +++ b/packages/cli-workflow/src/commands/thread.ts @@ -200,6 +200,25 @@ const PL_AGENT_DONE = "C6P9E3H7"; const PL_THREAD_ARCHIVED = "F4D8Q2K5"; const PL_STEP_ERROR = "B8T5N1V6"; const PL_BACKGROUND_START = "X7Q4W9M2"; +const PL_THREAD_RESUME = "K2R7M4N8"; + +type ResumeStepConfig = { + role: string; + prompt: string; +}; + +type AgentStepTarget = { + role: string; + edgePrompt: string; + effectiveCwd: string; +}; + +function buildResumePrompt(graphPrompt: string, supplement: string | null): string { + if (supplement === null || supplement === "") { + return graphPrompt; + } + return `${graphPrompt}\n\n${supplement}`; +} function failStep(plog: ProcessLogger, message: string): never { plog.log(PL_STEP_ERROR, message, null); @@ -462,7 +481,10 @@ export async function cmdThreadStart( return { workflow: workflowHash, thread: threadId }; } -export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Promise { +export async function cmdThreadShow( + storageRoot: string, + threadId: ThreadId, +): Promise { const index = await loadThreadsIndex(storageRoot); const entry = index[threadId]; if (entry !== undefined) { @@ -483,6 +505,11 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr const currentRole = resolveCurrentRole(uwf, activeHead, workflow); const suspendFields = resolveSuspendFieldsForShow(entry, status, uwf, activeHead, workflow); + const hint = + status === "suspended" + ? `Thread is suspended. Resume with: uwf thread resume ${threadId}` + : null; + return { workflow, thread: threadId, @@ -493,6 +520,7 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr suspendMessage: suspendFields.suspendMessage, done: false, background: null, + hint, }; } @@ -510,6 +538,7 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr suspendMessage: null, done: true, background: null, + hint: null, }; } @@ -519,6 +548,13 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr export type ThreadListItemWithStatus = ThreadListItem & { status: ThreadStatus; currentRole: string | null; + /** Display label with status marker for suspended threads */ + statusDisplay: string; +}; + +export type ThreadShowOutput = StepOutput & { + /** Hint message for suspended threads */ + hint: string | null; }; async function threadListItemFromActive( @@ -533,6 +569,7 @@ async function threadListItemFromActive( } const status = await resolveActiveThreadStatus(storageRoot, threadId, uwf, head, workflow); + const statusDisplay = status === "suspended" ? `${status} [suspended]` : status; return { thread: threadId, @@ -540,6 +577,7 @@ async function threadListItemFromActive( head, status, currentRole: resolveCurrentRole(uwf, head, workflow), + statusDisplay, }; } @@ -568,12 +606,14 @@ async function collectCompletedThreads( for (const entry of history) { if (!activeIds.has(entry.thread) && !seen.has(entry.thread)) { seen.add(entry.thread); + const status = entry.reason === "cancelled" ? "cancelled" : "completed"; items.push({ thread: entry.thread, workflow: entry.workflow, head: entry.head, - status: entry.reason === "cancelled" ? "cancelled" : "completed", + status, currentRole: null, + statusDisplay: status, }); } } @@ -1016,6 +1056,65 @@ async function archiveThread( }); } +export async function cmdThreadResume( + storageRoot: string, + threadId: ThreadId, + supplement: string | null, + agentOverride: string | null, +): Promise { + const runningMarker = await isThreadRunning(storageRoot, threadId); + if (runningMarker !== null) { + fail(`thread already executing in background (PID: ${runningMarker.pid})`); + } + + const index = await loadThreadsIndex(storageRoot); + const entry = index[threadId]; + if (entry === undefined) { + fail(`thread not active: ${threadId}`); + } + + const uwf = await createUwfStore(storageRoot); + const headHash = entry.head; + const chain = walkChain(uwf, headHash); + const workflowHash = chain.start.workflow; + + const status = await resolveActiveThreadStatus( + storageRoot, + threadId, + uwf, + headHash, + workflowHash, + ); + if (status !== "suspended") { + fail(`thread is not suspended: ${threadId} (status: ${status})`); + } + + const suspendFields = resolveSuspendFieldsForShow(entry, status, uwf, headHash, workflowHash); + if (suspendFields.suspendedRole === null) { + fail(`thread is suspended but suspendedRole is missing: ${threadId}`); + } + if (suspendFields.suspendMessage === null) { + fail(`thread is suspended but suspendMessage is missing: ${threadId}`); + } + + const resumePrompt = buildResumePrompt(suspendFields.suspendMessage, supplement); + const plog = createProcessLogger({ + storageRoot, + context: { thread: threadId, workflow: workflowHash }, + }); + + plog.log( + PL_THREAD_RESUME, + `resume role=${suspendFields.suspendedRole} supplement=${supplement !== null}`, + null, + ); + + return cmdThreadStepOnce(storageRoot, threadId, agentOverride, plog, { + role: suspendFields.suspendedRole, + prompt: resumePrompt, + }); +} + export async function cmdThreadExec( storageRoot: string, threadId: ThreadId, @@ -1149,25 +1248,34 @@ async function cmdThreadStepBackground( ]; } -async function cmdThreadStepOnce( +function resolveResumeStepTarget( + resume: ResumeStepConfig, + chain: ChainState, + threadCwd: string, + plog: ProcessLogger, +): AgentStepTarget { + const lastStep = chain.stepsNewestFirst[0]; + plog.log(PL_MODERATOR, `resume role=${resume.role} prompt=${resume.prompt}`, null); + return { + role: resume.role, + edgePrompt: resume.prompt, + effectiveCwd: lastStep !== undefined && lastStep.cwd !== "" ? lastStep.cwd : threadCwd, + }; +} + +async function resolveModeratorStepTarget( storageRoot: string, threadId: ThreadId, - agentOverride: string | null, + entry: ThreadIndexEntry, + headHash: CasRef, + workflowHash: CasRef, + workflow: WorkflowPayload, + uwf: UwfStore, + chain: ChainState, + threadCwd: string, plog: ProcessLogger, -): Promise { - const index = await loadThreadsIndex(storageRoot); - const entry = index[threadId]; - if (entry === undefined) { - failStep(plog, `thread not active: ${threadId}`); - } - const headHash = entry.head; - - const uwf = await createUwfStore(storageRoot); - const chain = walkChain(uwf, headHash); - const workflowHash = chain.start.workflow; - const workflow = loadWorkflowPayload(uwf, workflowHash); +): Promise { const { lastRole, lastOutput } = resolveEvaluateArgs(uwf, chain); - const nextResult = evaluate(workflow.graph, lastRole, lastOutput); if (!nextResult.ok) { failStep(plog, `moderator evaluate failed: ${nextResult.error.message}`); @@ -1217,34 +1325,22 @@ async function cmdThreadStepOnce( }; } - const role = nextResult.value.role; - const edgePrompt = nextResult.value.prompt; + return { + role: nextResult.value.role, + edgePrompt: nextResult.value.prompt, + effectiveCwd: nextResult.value.location !== null ? nextResult.value.location : threadCwd, + }; +} - // Resolve cwd: use edge location if provided, otherwise inherit thread.cwd - const threadCwd = chain.start.cwd; - const effectiveCwd = nextResult.value.location !== null ? nextResult.value.location : threadCwd; - - const config = await loadWorkflowConfig(storageRoot); - const agent = resolveAgentConfig(config, workflow, role, agentOverride); - - plog.log(PL_AGENT_SPAWN, `spawning agent command=${agent.command}`, { - args: [...agent.args, threadId, role].join(" "), - }); - - loadDotenv({ path: getEnvPath(storageRoot) }); - const agentResult = spawnAgent(plog, agent, threadId, role, edgePrompt, effectiveCwd); - const newHead = agentResult.stepHash as CasRef; - - plog.log(PL_AGENT_DONE, `agent returned head=${newHead}`, null); - - // Re-create store to pick up nodes written by the agent subprocess - const uwfAfter = await createUwfStore(storageRoot); - const newNode = uwfAfter.store.get(newHead); - if (newNode === null || newNode.type !== uwfAfter.schemas.stepNode) { - failStep(plog, `agent returned hash that is not a StepNode: ${newHead}`); - } - - // Reload threads index to avoid overwriting changes made by the agent subprocess +async function finalizeAgentStep( + storageRoot: string, + threadId: ThreadId, + workflowHash: CasRef, + workflow: WorkflowPayload, + newHead: CasRef, + uwfAfter: UwfStore, + plog: ProcessLogger, +): Promise { const freshIndex = await loadThreadsIndex(storageRoot); const priorEntry = freshIndex[threadId] ?? createThreadIndexEntry(newHead); freshIndex[threadId] = updateThreadHead(priorEntry, newHead); @@ -1283,7 +1379,6 @@ async function cmdThreadStepOnce( await archiveThread(storageRoot, threadId, workflowHash, newHead); } - // Determine status based on whether thread is done and running state const status: ThreadStatus = done ? "completed" : "idle"; const currentRole = done ? null : afterResult.value.role; @@ -1300,6 +1395,70 @@ async function cmdThreadStepOnce( }; } +async function cmdThreadStepOnce( + storageRoot: string, + threadId: ThreadId, + agentOverride: string | null, + plog: ProcessLogger, + resume: ResumeStepConfig | null = null, +): Promise { + const index = await loadThreadsIndex(storageRoot); + const entry = index[threadId]; + if (entry === undefined) { + failStep(plog, `thread not active: ${threadId}`); + } + const headHash = entry.head; + + const uwf = await createUwfStore(storageRoot); + const chain = walkChain(uwf, headHash); + const workflowHash = chain.start.workflow; + const workflow = loadWorkflowPayload(uwf, workflowHash); + const threadCwd = chain.start.cwd; + + const targetOrOutput = + resume !== null + ? resolveResumeStepTarget(resume, chain, threadCwd, plog) + : await resolveModeratorStepTarget( + storageRoot, + threadId, + entry, + headHash, + workflowHash, + workflow, + uwf, + chain, + threadCwd, + plog, + ); + + if ("status" in targetOrOutput) { + return targetOrOutput; + } + + const { role, edgePrompt, effectiveCwd } = targetOrOutput; + + const config = await loadWorkflowConfig(storageRoot); + const agent = resolveAgentConfig(config, workflow, role, agentOverride); + + plog.log(PL_AGENT_SPAWN, `spawning agent command=${agent.command}`, { + args: [...agent.args, threadId, role].join(" "), + }); + + loadDotenv({ path: getEnvPath(storageRoot) }); + const agentResult = spawnAgent(plog, agent, threadId, role, edgePrompt, effectiveCwd); + const newHead = agentResult.stepHash as CasRef; + + plog.log(PL_AGENT_DONE, `agent returned head=${newHead}`, null); + + const uwfAfter = await createUwfStore(storageRoot); + const newNode = uwfAfter.store.get(newHead); + if (newNode === null || newNode.type !== uwfAfter.schemas.stepNode) { + failStep(plog, `agent returned hash that is not a StepNode: ${newHead}`); + } + + return finalizeAgentStep(storageRoot, threadId, workflowHash, workflow, newHead, uwfAfter, plog); +} + async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise { const index = await loadThreadsIndex(storageRoot); const activeHead = index[threadId]?.head; diff --git a/workflows/solve-issue.yaml b/workflows/solve-issue.yaml index 6cee7f4..1b3b38c 100644 --- a/workflows/solve-issue.yaml +++ b/workflows/solve-issue.yaml @@ -285,8 +285,8 @@ graph: prompt: Analyze the issue and produce an implementation plan. planner: insufficient_info: - role: $END - prompt: Insufficient information to proceed; end the workflow. + role: $SUSPEND + prompt: "信息不足,需要补充:{{{reason}}}" ready: role: developer prompt: 'Implement the TDD test spec (CAS hash: {{{plan}}}) in repo {{{repoPath}}}. Repo remote: {{{repoRemote}}}.'