diff --git a/packages/cli-workflow/src/__tests__/thread-list-filters.test.ts b/packages/cli-workflow/src/__tests__/thread-list-filters.test.ts index 0a29d83..fa4d9fc 100644 --- a/packages/cli-workflow/src/__tests__/thread-list-filters.test.ts +++ b/packages/cli-workflow/src/__tests__/thread-list-filters.test.ts @@ -2,6 +2,7 @@ import { mkdir, mkdtemp, rm } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; import type { CasRef, ThreadId } from "@uncaged/workflow-protocol"; +import { createThreadIndexEntry } from "@uncaged/workflow-protocol"; import { extractUlidTimestamp, generateUlid } from "@uncaged/workflow-util"; import { afterEach, beforeEach, describe, expect, test } from "vitest"; import { createMarker, deleteMarker } from "../background/index.js"; @@ -45,9 +46,15 @@ async function createTestThread( const startPayload = { workflow: workflowHash, prompt: "test prompt", + cwd: storageRoot, }; const headHash = await uwf.store.put(uwf.schemas.startNode, startPayload); - await saveThreadsIndex(storageRoot, { [threadId]: headHash }); + + // Load existing index and add new thread + const existingIndex = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot)); + existingIndex[threadId] = createThreadIndexEntry(headHash); + await saveThreadsIndex(storageRoot, existingIndex); + return threadId; } diff --git a/packages/cli-workflow/src/__tests__/thread-suspended-display.test.ts b/packages/cli-workflow/src/__tests__/thread-suspended-display.test.ts new file mode 100644 index 0000000..3477bad --- /dev/null +++ b/packages/cli-workflow/src/__tests__/thread-suspended-display.test.ts @@ -0,0 +1,286 @@ +import { mkdir, mkdtemp, rm } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { putSchema } from "@ocas/core"; +import type { ThreadId } from "@uncaged/workflow-protocol"; +import { createThreadIndexEntry, markThreadSuspended } from "@uncaged/workflow-protocol"; +import { afterEach, beforeEach, describe, expect, test } from "vitest"; +import { cmdThreadList, cmdThreadShow } from "../commands/thread.js"; +import { createUwfStore, saveThreadsIndex } from "../store.js"; + +const OUTPUT_SCHEMA = { + type: "object" as const, + properties: { + $status: { type: "string" as const }, + question: { type: "string" as const }, + }, + required: ["$status"], + additionalProperties: false, +}; + +let tmpDir: string; + +beforeEach(async () => { + tmpDir = await mkdtemp(join(tmpdir(), "cli-uwf-suspended-display-test-")); +}); + +afterEach(async () => { + await rm(tmpDir, { recursive: true, force: true }); +}); + +describe("suspended thread display", () => { + test("thread list shows [suspended] marker for suspended threads", async () => { + const casDir = join(tmpDir, "cas"); + await mkdir(casDir, { recursive: true }); + const originalCasDir = process.env.UNCAGED_CAS_DIR; + process.env.UNCAGED_CAS_DIR = casDir; + + try { + const uwf = await createUwfStore(tmpDir); + const outputSchemaHash = await putSchema(uwf.store, OUTPUT_SCHEMA); + + // Create test workflow with suspend capability + const workflowHash = await uwf.store.put(uwf.schemas.workflow, { + name: "test-suspend-display", + description: "test suspended display", + roles: { + worker: { + description: "Worker role", + goal: "Work and potentially suspend", + capabilities: [], + procedure: "work", + output: "result", + frontmatter: outputSchemaHash, + }, + }, + graph: { + $START: { _: { role: "worker", prompt: "Start work", location: null } }, + worker: { + needs_input: { + role: "$SUSPEND", + prompt: "Please provide more details: {{{question}}}", + location: null, + }, + }, + }, + }); + + const startHash = await uwf.store.put(uwf.schemas.startNode, { + workflow: workflowHash, + prompt: "Test task requiring input", + cwd: tmpDir, + }); + + // Create suspended thread + const suspendedThreadId = "01SUSPENDEDTHREAD0000000" as ThreadId; + const outputHash = await uwf.store.put(outputSchemaHash, { + $status: "needs_input", + question: "What is the target API?", + }); + const detailHash = await uwf.store.put(uwf.schemas.text, "mock detail"); + + const stepHash = await uwf.store.put(uwf.schemas.stepNode, { + start: startHash, + prev: null, + role: "worker", + output: outputHash, + detail: detailHash, + agent: "uwf-mock", + edgePrompt: "Start work", + startedAtMs: 1716600000000, + completedAtMs: 1716600001500, + cwd: tmpDir, + assembledPrompt: null, + }); + + // Create suspended thread entry in threads.yaml + const suspendedEntry = markThreadSuspended( + createThreadIndexEntry(stepHash), + "worker", + "Please provide more details: What is the target API?", + ); + + // Create normal (idle) thread + const idleThreadId = "01IDLETHREAD00000000000" as ThreadId; + const idleStartHash = await uwf.store.put(uwf.schemas.startNode, { + workflow: workflowHash, + prompt: "Normal task", + cwd: tmpDir, + }); + const idleEntry = createThreadIndexEntry(idleStartHash); + + await saveThreadsIndex(tmpDir, { + [suspendedThreadId]: suspendedEntry, + [idleThreadId]: idleEntry, + }); + + // Test thread list + const listResult = await cmdThreadList(tmpDir, null, null, null, null, null); + + // Find the suspended and idle threads in results + const suspendedItem = listResult.find((item) => item.thread === suspendedThreadId); + const idleItem = listResult.find((item) => item.thread === idleThreadId); + + expect(suspendedItem).toBeDefined(); + expect(suspendedItem!.status).toBe("suspended"); + expect(suspendedItem!.statusDisplay).toBe("suspended [suspended]"); + + expect(idleItem).toBeDefined(); + expect(idleItem!.status).toBe("idle"); + expect(idleItem!.statusDisplay).toBe("idle"); + } finally { + if (originalCasDir === undefined) { + delete process.env.UNCAGED_CAS_DIR; + } else { + process.env.UNCAGED_CAS_DIR = originalCasDir; + } + } + }); + + test("thread show displays suspend info and resume hint", async () => { + const casDir = join(tmpDir, "cas"); + await mkdir(casDir, { recursive: true }); + const originalCasDir = process.env.UNCAGED_CAS_DIR; + process.env.UNCAGED_CAS_DIR = casDir; + + try { + const uwf = await createUwfStore(tmpDir); + const outputSchemaHash = await putSchema(uwf.store, OUTPUT_SCHEMA); + + const workflowHash = await uwf.store.put(uwf.schemas.workflow, { + name: "test-suspend-show", + description: "test suspended show", + roles: { + worker: { + description: "Worker role", + goal: "Work and potentially suspend", + capabilities: [], + procedure: "work", + output: "result", + frontmatter: outputSchemaHash, + }, + }, + graph: { + $START: { _: { role: "worker", prompt: "Start work", location: null } }, + worker: { + needs_input: { + role: "$SUSPEND", + prompt: "Need clarification: {{{question}}}", + location: null, + }, + }, + }, + }); + + const startHash = await uwf.store.put(uwf.schemas.startNode, { + workflow: workflowHash, + prompt: "Test task", + cwd: tmpDir, + }); + + const threadId = "01SUSPENDSHOW000000000" as ThreadId; + const outputHash = await uwf.store.put(outputSchemaHash, { + $status: "needs_input", + question: "Which database to use?", + }); + const detailHash = await uwf.store.put(uwf.schemas.text, "mock detail"); + + const stepHash = await uwf.store.put(uwf.schemas.stepNode, { + start: startHash, + prev: null, + role: "worker", + output: outputHash, + detail: detailHash, + agent: "uwf-mock", + edgePrompt: "Start work", + startedAtMs: 1716600000000, + completedAtMs: 1716600001500, + cwd: tmpDir, + assembledPrompt: null, + }); + + const suspendedEntry = markThreadSuspended( + createThreadIndexEntry(stepHash), + "worker", + "Need clarification: Which database to use?", + ); + + await saveThreadsIndex(tmpDir, { [threadId]: suspendedEntry }); + + // Test thread show + const showResult = await cmdThreadShow(tmpDir, threadId); + + expect(showResult.status).toBe("suspended"); + expect(showResult.suspendedRole).toBe("worker"); + expect(showResult.suspendMessage).toBe("Need clarification: Which database to use?"); + expect(showResult.hint).toBe( + `Thread is suspended. Resume with: uwf thread resume ${threadId}`, + ); + } finally { + if (originalCasDir === undefined) { + delete process.env.UNCAGED_CAS_DIR; + } else { + process.env.UNCAGED_CAS_DIR = originalCasDir; + } + } + }); + + test("non-suspended threads do not show suspend markers or hints", async () => { + const casDir = join(tmpDir, "cas"); + await mkdir(casDir, { recursive: true }); + const originalCasDir = process.env.UNCAGED_CAS_DIR; + process.env.UNCAGED_CAS_DIR = casDir; + + try { + const uwf = await createUwfStore(tmpDir); + + const workflowHash = await uwf.store.put(uwf.schemas.workflow, { + name: "test-normal", + description: "test normal thread", + roles: { + worker: { + description: "Worker role", + goal: "Work normally", + capabilities: [], + procedure: "work", + output: "result", + }, + }, + graph: { + $START: { _: { role: "worker", prompt: "Start work", location: null } }, + }, + }); + + const startHash = await uwf.store.put(uwf.schemas.startNode, { + workflow: workflowHash, + prompt: "Normal task", + cwd: tmpDir, + }); + + const threadId = "01NORMALTHREAD000000000" as ThreadId; + await saveThreadsIndex(tmpDir, { [threadId]: createThreadIndexEntry(startHash) }); + + // Test thread show + const showResult = await cmdThreadShow(tmpDir, threadId); + + expect(showResult.status).toBe("idle"); + expect(showResult.suspendedRole).toBeNull(); + expect(showResult.suspendMessage).toBeNull(); + expect(showResult.hint).toBeNull(); + + // Test thread list + const listResult = await cmdThreadList(tmpDir, null, null, null, null, null); + const threadItem = listResult.find((item) => item.thread === threadId); + + expect(threadItem).toBeDefined(); + expect(threadItem!.status).toBe("idle"); + expect(threadItem!.statusDisplay).toBe("idle"); + } finally { + if (originalCasDir === undefined) { + delete process.env.UNCAGED_CAS_DIR; + } else { + process.env.UNCAGED_CAS_DIR = originalCasDir; + } + } + }); +}); diff --git a/packages/cli-workflow/src/commands/thread.ts b/packages/cli-workflow/src/commands/thread.ts index 018ce87..f9cc665 100644 --- a/packages/cli-workflow/src/commands/thread.ts +++ b/packages/cli-workflow/src/commands/thread.ts @@ -481,7 +481,10 @@ export async function cmdThreadStart( return { workflow: workflowHash, thread: threadId }; } -export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Promise { +export async function cmdThreadShow( + storageRoot: string, + threadId: ThreadId, +): Promise { const index = await loadThreadsIndex(storageRoot); const entry = index[threadId]; if (entry !== undefined) { @@ -502,6 +505,11 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr const currentRole = resolveCurrentRole(uwf, activeHead, workflow); const suspendFields = resolveSuspendFieldsForShow(entry, status, uwf, activeHead, workflow); + const hint = + status === "suspended" + ? `Thread is suspended. Resume with: uwf thread resume ${threadId}` + : null; + return { workflow, thread: threadId, @@ -512,6 +520,7 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr suspendMessage: suspendFields.suspendMessage, done: false, background: null, + hint, }; } @@ -529,6 +538,7 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr suspendMessage: null, done: true, background: null, + hint: null, }; } @@ -538,6 +548,13 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr export type ThreadListItemWithStatus = ThreadListItem & { status: ThreadStatus; currentRole: string | null; + /** Display label with status marker for suspended threads */ + statusDisplay: string; +}; + +export type ThreadShowOutput = StepOutput & { + /** Hint message for suspended threads */ + hint: string | null; }; async function threadListItemFromActive( @@ -552,6 +569,7 @@ async function threadListItemFromActive( } const status = await resolveActiveThreadStatus(storageRoot, threadId, uwf, head, workflow); + const statusDisplay = status === "suspended" ? `${status} [suspended]` : status; return { thread: threadId, @@ -559,6 +577,7 @@ async function threadListItemFromActive( head, status, currentRole: resolveCurrentRole(uwf, head, workflow), + statusDisplay, }; } @@ -587,12 +606,14 @@ async function collectCompletedThreads( for (const entry of history) { if (!activeIds.has(entry.thread) && !seen.has(entry.thread)) { seen.add(entry.thread); + const status = entry.reason === "cancelled" ? "cancelled" : "completed"; items.push({ thread: entry.thread, workflow: entry.workflow, head: entry.head, - status: entry.reason === "cancelled" ? "cancelled" : "completed", + status, currentRole: null, + statusDisplay: status, }); } }