From bda3e3a861d04cd29d2863b28e34f96119b997da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=98=9F=E6=9C=88?= Date: Thu, 4 Jun 2026 15:13:47 +0800 Subject: [PATCH] =?UTF-8?q?feat(cli):=20resume=20completed=20threads=20(?= =?UTF-8?q?=E8=A1=94=E5=B0=BE=E8=9B=87:=20end=20=E2=86=92=20start)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit uwf thread resume now supports completed threads: - Evaluates workflow graph from $START to find first role - Clears completed state (status → idle, completedAt → null) - Builds resume prompt with supplement context - Full CAS chain preserved for rich context Suspended resume behavior unchanged. Cancelled/idle threads still rejected. 425 tests pass. Part of #39, closes #43 --- .../cli/src/__tests__/thread-resume.test.ts | 265 +++++++++++++++++- packages/cli/src/commands/thread.ts | 82 ++++-- 2 files changed, 327 insertions(+), 20 deletions(-) diff --git a/packages/cli/src/__tests__/thread-resume.test.ts b/packages/cli/src/__tests__/thread-resume.test.ts index af9b044..9199137 100644 --- a/packages/cli/src/__tests__/thread-resume.test.ts +++ b/packages/cli/src/__tests__/thread-resume.test.ts @@ -249,7 +249,7 @@ describe("uwf thread resume", () => { const result = runUwf(["thread", "resume", THREAD_ID], casDir); expect(result.status).not.toBe(0); - expect(result.stderr).toContain("thread is not suspended"); + expect(result.stderr).toContain("thread cannot be resumed"); }); test("resume suspended thread executes step and becomes idle", async () => { @@ -448,3 +448,266 @@ echo '${adapterJson}' return { mockAgentPath }; } + +describe("uwf thread resume - completed threads", () => { + test("resume completed thread starts from $START role", async () => { + const casDir = join(tmpDir, "cas"); + await mkdir(casDir, { recursive: true }); + const store = await openStore(casDir); + const schemas = await registerUwfSchemas(store); + const outputSchemaHash = await putSchema(store, OUTPUT_SCHEMA); + + const workflowHash = await store.cas.put(schemas.workflow, { + name: "test-completed-resume", + description: "completed thread resume test", + roles: { + worker: { + description: "Worker role", + goal: "Work", + capabilities: [], + procedure: "work", + output: "result", + frontmatter: outputSchemaHash, + }, + reviewer: { + description: "Reviewer role", + goal: "Review", + capabilities: [], + procedure: "review", + output: "result", + frontmatter: outputSchemaHash, + }, + }, + graph: { + $START: { _: { role: "worker", prompt: "Start work", location: null } }, + worker: { _: { role: "reviewer", prompt: "Review the work", location: null } }, + reviewer: { _: { role: "$END", prompt: "Done", location: null } }, + }, + }); + + const startHash = await store.cas.put(schemas.startNode, { + workflow: workflowHash, + prompt: "Initial task", + cwd: tmpDir, + }); + + process.env.OCAS_DIR = casDir; + + const workerOutputHash = await store.cas.put(outputSchemaHash, { $status: "_" }); + const reviewerOutputHash = await store.cas.put(outputSchemaHash, { $status: "_" }); + const detailHash = await store.cas.put(schemas.text, "mock detail"); + + const workerStepHash = await store.cas.put(schemas.stepNode, { + start: startHash, + prev: null, + role: "worker", + output: workerOutputHash, + detail: detailHash, + agent: "uwf-mock", + edgePrompt: "Start work", + startedAtMs: 1716600000000, + completedAtMs: 1716600001000, + cwd: tmpDir, + assembledPrompt: null, + }); + + const reviewerStepHash = await store.cas.put(schemas.stepNode, { + start: startHash, + prev: workerStepHash, + role: "reviewer", + output: reviewerOutputHash, + detail: detailHash, + agent: "uwf-mock", + edgePrompt: "Review the work", + startedAtMs: 1716600001000, + completedAtMs: 1716600002000, + cwd: tmpDir, + assembledPrompt: null, + }); + + await seedThreads(tmpDir, { + [THREAD_ID]: { + head: reviewerStepHash, + status: "completed", + suspendedRole: null, + suspendMessage: null, + completedAt: 1716600002000, + }, + }); + + // Verify the status was actually set + const { createUwfStore, getThread } = await import("../store.js"); + const verifyUwf = await createUwfStore(tmpDir); + const verifyEntry = getThread(verifyUwf.varStore, THREAD_ID); + // biome-ignore lint/nursery/noConsole: test debugging + console.log("Seeded entry status:", verifyEntry?.status); + // biome-ignore lint/nursery/noConsole: test debugging + console.log("Seeded entry:", JSON.stringify(verifyEntry, null, 2)); + + const promptCapturePath = join(tmpDir, "captured-prompt-completed.txt"); + const mockAgentPath = join(tmpDir, "mock-agent-completed.sh"); + + const newWorkerStepHash = await store.cas.put(schemas.stepNode, { + start: startHash, + prev: reviewerStepHash, + role: "worker", + output: workerOutputHash, + detail: detailHash, + agent: "uwf-mock", + edgePrompt: "Start work", + startedAtMs: 1716600003000, + completedAtMs: 1716600004000, + cwd: tmpDir, + assembledPrompt: null, + }); + + const adapterJson = JSON.stringify({ + stepHash: newWorkerStepHash, + detailHash, + role: "worker", + frontmatter: { $status: "_" }, + body: "", + startedAtMs: 1716600003000, + completedAtMs: 1716600004000, + }); + + await writeFile( + mockAgentPath, + `#!/bin/sh +prompt="" +while [ $# -gt 0 ]; do + if [ "$1" = "--prompt" ]; then + prompt="$2" + shift 2 + else + shift + fi +done +printf '%s' "$prompt" > '${promptCapturePath}' +echo '${adapterJson}' +`, + { mode: 0o755 }, + ); + + const configPath = join(tmpDir, "config.yaml"); + await writeFile( + configPath, + `defaultAgent: uwf-hermes\ndefaultModel: test-model\nagentOverrides: null\nagents: {}\nproviders: {}\nmodels: {}\n`, + ); + + const result = runUwf( + ["thread", "resume", THREAD_ID, "-p", "Additional context", "--agent", mockAgentPath], + casDir, + ); + + if (result.status !== 0) { + // biome-ignore lint/nursery/noConsole: test debugging + console.error("Command failed:", result.stderr); + } + + expect(result.status).toBe(0); + + const cliOutput = JSON.parse(result.stdout.trim()); + expect(cliOutput.status).toBe("idle"); + expect(cliOutput.currentRole).toBe("reviewer"); + expect(cliOutput.done).toBe(false); + + const capturedPrompt = await readFile(promptCapturePath, "utf8"); + expect(capturedPrompt).toContain("Previous run completed"); + expect(capturedPrompt).toContain("Additional context"); + + const storeModule = await import("../store.js"); + const uwf2 = await storeModule.createUwfStore(tmpDir); + const entry2 = storeModule.getThread(uwf2.varStore, THREAD_ID); + expect(entry2?.status).toBe("idle"); + expect(entry2?.completedAt).toBeNull(); + }); + + test("resume cancelled thread returns error", async () => { + const casDir = join(tmpDir, "cas"); + await mkdir(casDir, { recursive: true }); + const store = await openStore(casDir); + const schemas = await registerUwfSchemas(store); + + const workflowHash = await store.cas.put(schemas.workflow, { + name: "cancelled-workflow", + description: "cancelled thread", + roles: { + worker: { + description: "Worker", + goal: "Work", + capabilities: [], + procedure: "work", + output: "result", + frontmatter: await putSchema(store, OUTPUT_SCHEMA), + }, + }, + graph: { + $START: { _: { role: "worker", prompt: "Start", location: null } }, + worker: { _: { role: "$END", prompt: "Done", location: null } }, + }, + }); + + const startHash = await store.cas.put(schemas.startNode, { + workflow: workflowHash, + prompt: "task", + cwd: tmpDir, + }); + + process.env.OCAS_DIR = casDir; + await seedThreads(tmpDir, { + [THREAD_ID]: { + head: startHash, + status: "cancelled", + suspendedRole: null, + suspendMessage: null, + completedAt: null, + }, + }); + + const result = runUwf(["thread", "resume", THREAD_ID], casDir); + expect(result.status).not.toBe(0); + expect(result.stderr).toContain("thread cannot be resumed"); + expect(result.stderr).toContain("cancelled"); + }); + + test("resume idle thread returns error", async () => { + const casDir = join(tmpDir, "cas"); + await mkdir(casDir, { recursive: true }); + const store = await openStore(casDir); + const schemas = await registerUwfSchemas(store); + + const workflowHash = await store.cas.put(schemas.workflow, { + name: "idle-workflow", + description: "idle thread", + roles: { + worker: { + description: "Worker", + goal: "Work", + capabilities: [], + procedure: "work", + output: "result", + frontmatter: await putSchema(store, OUTPUT_SCHEMA), + }, + }, + graph: { + $START: { _: { role: "worker", prompt: "Start", location: null } }, + worker: { _: { role: "$END", prompt: "Done", location: null } }, + }, + }); + + const startHash = await store.cas.put(schemas.startNode, { + workflow: workflowHash, + prompt: "task", + cwd: tmpDir, + }); + + process.env.OCAS_DIR = casDir; + await seedThreads(tmpDir, { [THREAD_ID]: startHash }); + + const result = runUwf(["thread", "resume", THREAD_ID], casDir); + expect(result.status).not.toBe(0); + expect(result.stderr).toContain("thread cannot be resumed"); + expect(result.stderr).toContain("idle"); + }); +}); diff --git a/packages/cli/src/commands/thread.ts b/packages/cli/src/commands/thread.ts index 53c6493..d2c06d6 100644 --- a/packages/cli/src/commands/thread.ts +++ b/packages/cli/src/commands/thread.ts @@ -1051,40 +1051,84 @@ export async function cmdThreadResume( const chain = walkChain(uwf, headHash); const workflowHash = chain.start.workflow; - const status = await resolveActiveThreadStatus( - storageRoot, - threadId, - uwf, - headHash, - workflowHash, - ); - if (status !== "suspended") { - fail(`thread is not suspended: ${threadId} (status: ${status})`); + // Check entry.status first for completed/cancelled (like in cmdThreadShow) + let status: ThreadStatus; + if (entry.status === "completed" || entry.status === "cancelled") { + status = entry.status; + } else { + status = await resolveActiveThreadStatus( + storageRoot, + threadId, + uwf, + headHash, + workflowHash, + ); } - const suspendFields = resolveSuspendFieldsForShow(entry, status, uwf, headHash, workflowHash); - if (suspendFields.suspendedRole === null) { - fail(`thread is suspended but suspendedRole is missing: ${threadId}`); - } - if (suspendFields.suspendMessage === null) { - fail(`thread is suspended but suspendMessage is missing: ${threadId}`); + if (status !== "suspended" && status !== "completed") { + fail(`thread cannot be resumed: ${threadId} (status: ${status})`); } - const resumePrompt = buildResumePrompt(suspendFields.suspendMessage, supplement); const plog = createProcessLogger({ storageRoot, context: { thread: threadId, workflow: workflowHash }, }); + if (status === "suspended") { + const suspendFields = resolveSuspendFieldsForShow(entry, status, uwf, headHash, workflowHash); + if (suspendFields.suspendedRole === null) { + fail(`thread is suspended but suspendedRole is missing: ${threadId}`); + } + if (suspendFields.suspendMessage === null) { + fail(`thread is suspended but suspendMessage is missing: ${threadId}`); + } + + const resumePrompt = buildResumePrompt(suspendFields.suspendMessage, supplement); + + plog.log( + PL_THREAD_RESUME, + `resume role=${suspendFields.suspendedRole} supplement=${supplement !== null}`, + null, + ); + + return cmdThreadStepOnce(storageRoot, threadId, agentOverride, plog, { + role: suspendFields.suspendedRole, + prompt: resumePrompt, + }); + } + + // status === "completed" + const workflow = loadWorkflowPayload(uwf, workflowHash); + const startResult = evaluate(workflow.graph, START_ROLE, {}); + if (!startResult.ok) { + fail(`failed to evaluate $START: ${startResult.error.message}`); + } + if (isSuspendResult(startResult.value)) { + fail("workflow cannot start with $SUSPEND"); + } + if (startResult.value.role === END_ROLE) { + fail("workflow cannot start with $END"); + } + + const startRole = startResult.value.role; + const completedPromptPrefix = "Previous run completed. Resuming with additional context."; + const completedResumePrompt = + supplement !== null && supplement !== "" + ? `${completedPromptPrefix}\n\n${supplement}` + : completedPromptPrefix; + + const updatedEntry = { ...entry, status: "idle" as const, completedAt: null }; + setThread(uwf.varStore, threadId, updatedEntry); + plog.log( PL_THREAD_RESUME, - `resume role=${suspendFields.suspendedRole} supplement=${supplement !== null}`, + `resume completed role=${startRole} supplement=${supplement !== null}`, null, ); return cmdThreadStepOnce(storageRoot, threadId, agentOverride, plog, { - role: suspendFields.suspendedRole, - prompt: resumePrompt, + role: startRole, + prompt: completedResumePrompt, }); }