Merge pull request 'feat: thread list/show displays suspended state and message' (#596) from feat/591-thread-list-suspended into main

This commit is contained in:
2026-06-02 05:12:12 +00:00
3 changed files with 317 additions and 3 deletions
@@ -2,6 +2,7 @@ import { mkdir, mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os"; import { tmpdir } from "node:os";
import { join } from "node:path"; import { join } from "node:path";
import type { CasRef, ThreadId } from "@uncaged/workflow-protocol"; import type { CasRef, ThreadId } from "@uncaged/workflow-protocol";
import { createThreadIndexEntry } from "@uncaged/workflow-protocol";
import { extractUlidTimestamp, generateUlid } from "@uncaged/workflow-util"; import { extractUlidTimestamp, generateUlid } from "@uncaged/workflow-util";
import { afterEach, beforeEach, describe, expect, test } from "vitest"; import { afterEach, beforeEach, describe, expect, test } from "vitest";
import { createMarker, deleteMarker } from "../background/index.js"; import { createMarker, deleteMarker } from "../background/index.js";
@@ -45,9 +46,15 @@ async function createTestThread(
const startPayload = { const startPayload = {
workflow: workflowHash, workflow: workflowHash,
prompt: "test prompt", prompt: "test prompt",
cwd: storageRoot,
}; };
const headHash = await uwf.store.put(uwf.schemas.startNode, startPayload); const headHash = await uwf.store.put(uwf.schemas.startNode, startPayload);
await saveThreadsIndex(storageRoot, { [threadId]: headHash });
// Load existing index and add new thread
const existingIndex = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot));
existingIndex[threadId] = createThreadIndexEntry(headHash);
await saveThreadsIndex(storageRoot, existingIndex);
return threadId; return threadId;
} }
@@ -0,0 +1,286 @@
import { mkdir, mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { putSchema } from "@ocas/core";
import type { ThreadId } from "@uncaged/workflow-protocol";
import { createThreadIndexEntry, markThreadSuspended } from "@uncaged/workflow-protocol";
import { afterEach, beforeEach, describe, expect, test } from "vitest";
import { cmdThreadList, cmdThreadShow } from "../commands/thread.js";
import { createUwfStore, saveThreadsIndex } from "../store.js";
const OUTPUT_SCHEMA = {
type: "object" as const,
properties: {
$status: { type: "string" as const },
question: { type: "string" as const },
},
required: ["$status"],
additionalProperties: false,
};
let tmpDir: string;
beforeEach(async () => {
tmpDir = await mkdtemp(join(tmpdir(), "cli-uwf-suspended-display-test-"));
});
afterEach(async () => {
await rm(tmpDir, { recursive: true, force: true });
});
describe("suspended thread display", () => {
test("thread list shows [suspended] marker for suspended threads", async () => {
const casDir = join(tmpDir, "cas");
await mkdir(casDir, { recursive: true });
const originalCasDir = process.env.UNCAGED_CAS_DIR;
process.env.UNCAGED_CAS_DIR = casDir;
try {
const uwf = await createUwfStore(tmpDir);
const outputSchemaHash = await putSchema(uwf.store, OUTPUT_SCHEMA);
// Create test workflow with suspend capability
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "test-suspend-display",
description: "test suspended display",
roles: {
worker: {
description: "Worker role",
goal: "Work and potentially suspend",
capabilities: [],
procedure: "work",
output: "result",
frontmatter: outputSchemaHash,
},
},
graph: {
$START: { _: { role: "worker", prompt: "Start work", location: null } },
worker: {
needs_input: {
role: "$SUSPEND",
prompt: "Please provide more details: {{{question}}}",
location: null,
},
},
},
});
const startHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "Test task requiring input",
cwd: tmpDir,
});
// Create suspended thread
const suspendedThreadId = "01SUSPENDEDTHREAD0000000" as ThreadId;
const outputHash = await uwf.store.put(outputSchemaHash, {
$status: "needs_input",
question: "What is the target API?",
});
const detailHash = await uwf.store.put(uwf.schemas.text, "mock detail");
const stepHash = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: null,
role: "worker",
output: outputHash,
detail: detailHash,
agent: "uwf-mock",
edgePrompt: "Start work",
startedAtMs: 1716600000000,
completedAtMs: 1716600001500,
cwd: tmpDir,
assembledPrompt: null,
});
// Create suspended thread entry in threads.yaml
const suspendedEntry = markThreadSuspended(
createThreadIndexEntry(stepHash),
"worker",
"Please provide more details: What is the target API?",
);
// Create normal (idle) thread
const idleThreadId = "01IDLETHREAD00000000000" as ThreadId;
const idleStartHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "Normal task",
cwd: tmpDir,
});
const idleEntry = createThreadIndexEntry(idleStartHash);
await saveThreadsIndex(tmpDir, {
[suspendedThreadId]: suspendedEntry,
[idleThreadId]: idleEntry,
});
// Test thread list
const listResult = await cmdThreadList(tmpDir, null, null, null, null, null);
// Find the suspended and idle threads in results
const suspendedItem = listResult.find((item) => item.thread === suspendedThreadId);
const idleItem = listResult.find((item) => item.thread === idleThreadId);
expect(suspendedItem).toBeDefined();
expect(suspendedItem!.status).toBe("suspended");
expect(suspendedItem!.statusDisplay).toBe("suspended [suspended]");
expect(idleItem).toBeDefined();
expect(idleItem!.status).toBe("idle");
expect(idleItem!.statusDisplay).toBe("idle");
} finally {
if (originalCasDir === undefined) {
delete process.env.UNCAGED_CAS_DIR;
} else {
process.env.UNCAGED_CAS_DIR = originalCasDir;
}
}
});
test("thread show displays suspend info and resume hint", async () => {
const casDir = join(tmpDir, "cas");
await mkdir(casDir, { recursive: true });
const originalCasDir = process.env.UNCAGED_CAS_DIR;
process.env.UNCAGED_CAS_DIR = casDir;
try {
const uwf = await createUwfStore(tmpDir);
const outputSchemaHash = await putSchema(uwf.store, OUTPUT_SCHEMA);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "test-suspend-show",
description: "test suspended show",
roles: {
worker: {
description: "Worker role",
goal: "Work and potentially suspend",
capabilities: [],
procedure: "work",
output: "result",
frontmatter: outputSchemaHash,
},
},
graph: {
$START: { _: { role: "worker", prompt: "Start work", location: null } },
worker: {
needs_input: {
role: "$SUSPEND",
prompt: "Need clarification: {{{question}}}",
location: null,
},
},
},
});
const startHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "Test task",
cwd: tmpDir,
});
const threadId = "01SUSPENDSHOW000000000" as ThreadId;
const outputHash = await uwf.store.put(outputSchemaHash, {
$status: "needs_input",
question: "Which database to use?",
});
const detailHash = await uwf.store.put(uwf.schemas.text, "mock detail");
const stepHash = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: null,
role: "worker",
output: outputHash,
detail: detailHash,
agent: "uwf-mock",
edgePrompt: "Start work",
startedAtMs: 1716600000000,
completedAtMs: 1716600001500,
cwd: tmpDir,
assembledPrompt: null,
});
const suspendedEntry = markThreadSuspended(
createThreadIndexEntry(stepHash),
"worker",
"Need clarification: Which database to use?",
);
await saveThreadsIndex(tmpDir, { [threadId]: suspendedEntry });
// Test thread show
const showResult = await cmdThreadShow(tmpDir, threadId);
expect(showResult.status).toBe("suspended");
expect(showResult.suspendedRole).toBe("worker");
expect(showResult.suspendMessage).toBe("Need clarification: Which database to use?");
expect(showResult.hint).toBe(
`Thread is suspended. Resume with: uwf thread resume ${threadId}`,
);
} finally {
if (originalCasDir === undefined) {
delete process.env.UNCAGED_CAS_DIR;
} else {
process.env.UNCAGED_CAS_DIR = originalCasDir;
}
}
});
test("non-suspended threads do not show suspend markers or hints", async () => {
const casDir = join(tmpDir, "cas");
await mkdir(casDir, { recursive: true });
const originalCasDir = process.env.UNCAGED_CAS_DIR;
process.env.UNCAGED_CAS_DIR = casDir;
try {
const uwf = await createUwfStore(tmpDir);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "test-normal",
description: "test normal thread",
roles: {
worker: {
description: "Worker role",
goal: "Work normally",
capabilities: [],
procedure: "work",
output: "result",
},
},
graph: {
$START: { _: { role: "worker", prompt: "Start work", location: null } },
},
});
const startHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "Normal task",
cwd: tmpDir,
});
const threadId = "01NORMALTHREAD000000000" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: createThreadIndexEntry(startHash) });
// Test thread show
const showResult = await cmdThreadShow(tmpDir, threadId);
expect(showResult.status).toBe("idle");
expect(showResult.suspendedRole).toBeNull();
expect(showResult.suspendMessage).toBeNull();
expect(showResult.hint).toBeNull();
// Test thread list
const listResult = await cmdThreadList(tmpDir, null, null, null, null, null);
const threadItem = listResult.find((item) => item.thread === threadId);
expect(threadItem).toBeDefined();
expect(threadItem!.status).toBe("idle");
expect(threadItem!.statusDisplay).toBe("idle");
} finally {
if (originalCasDir === undefined) {
delete process.env.UNCAGED_CAS_DIR;
} else {
process.env.UNCAGED_CAS_DIR = originalCasDir;
}
}
});
});
+23 -2
View File
@@ -481,7 +481,10 @@ export async function cmdThreadStart(
return { workflow: workflowHash, thread: threadId }; return { workflow: workflowHash, thread: threadId };
} }
export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Promise<StepOutput> { export async function cmdThreadShow(
storageRoot: string,
threadId: ThreadId,
): Promise<ThreadShowOutput> {
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const entry = index[threadId]; const entry = index[threadId];
if (entry !== undefined) { if (entry !== undefined) {
@@ -502,6 +505,11 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr
const currentRole = resolveCurrentRole(uwf, activeHead, workflow); const currentRole = resolveCurrentRole(uwf, activeHead, workflow);
const suspendFields = resolveSuspendFieldsForShow(entry, status, uwf, activeHead, workflow); const suspendFields = resolveSuspendFieldsForShow(entry, status, uwf, activeHead, workflow);
const hint =
status === "suspended"
? `Thread is suspended. Resume with: uwf thread resume ${threadId}`
: null;
return { return {
workflow, workflow,
thread: threadId, thread: threadId,
@@ -512,6 +520,7 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr
suspendMessage: suspendFields.suspendMessage, suspendMessage: suspendFields.suspendMessage,
done: false, done: false,
background: null, background: null,
hint,
}; };
} }
@@ -529,6 +538,7 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr
suspendMessage: null, suspendMessage: null,
done: true, done: true,
background: null, background: null,
hint: null,
}; };
} }
@@ -538,6 +548,13 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr
export type ThreadListItemWithStatus = ThreadListItem & { export type ThreadListItemWithStatus = ThreadListItem & {
status: ThreadStatus; status: ThreadStatus;
currentRole: string | null; currentRole: string | null;
/** Display label with status marker for suspended threads */
statusDisplay: string;
};
export type ThreadShowOutput = StepOutput & {
/** Hint message for suspended threads */
hint: string | null;
}; };
async function threadListItemFromActive( async function threadListItemFromActive(
@@ -552,6 +569,7 @@ async function threadListItemFromActive(
} }
const status = await resolveActiveThreadStatus(storageRoot, threadId, uwf, head, workflow); const status = await resolveActiveThreadStatus(storageRoot, threadId, uwf, head, workflow);
const statusDisplay = status === "suspended" ? `${status} [suspended]` : status;
return { return {
thread: threadId, thread: threadId,
@@ -559,6 +577,7 @@ async function threadListItemFromActive(
head, head,
status, status,
currentRole: resolveCurrentRole(uwf, head, workflow), currentRole: resolveCurrentRole(uwf, head, workflow),
statusDisplay,
}; };
} }
@@ -587,12 +606,14 @@ async function collectCompletedThreads(
for (const entry of history) { for (const entry of history) {
if (!activeIds.has(entry.thread) && !seen.has(entry.thread)) { if (!activeIds.has(entry.thread) && !seen.has(entry.thread)) {
seen.add(entry.thread); seen.add(entry.thread);
const status = entry.reason === "cancelled" ? "cancelled" : "completed";
items.push({ items.push({
thread: entry.thread, thread: entry.thread,
workflow: entry.workflow, workflow: entry.workflow,
head: entry.head, head: entry.head,
status: entry.reason === "cancelled" ? "cancelled" : "completed", status,
currentRole: null, currentRole: null,
statusDisplay: status,
}); });
} }
} }