From f61b3957274db217de461e47445ca9fce6874643 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Thu, 28 May 2026 01:52:09 +0000 Subject: [PATCH] feat(cli): add currentRole field to thread show and thread list output (#571) Add currentRole: string | null to StepOutput and ThreadListItemWithStatus. - idle/running: derives next role via evaluate() on workflow graph - completed/cancelled: null - as next role: null Includes 9 test cases covering all status combinations and conditional routing. --- .../src/__tests__/current-role.test.ts | 442 ++++++++++++++++++ packages/cli-workflow/src/commands/thread.ts | 34 +- packages/workflow-protocol/src/types.ts | 2 + 3 files changed, 477 insertions(+), 1 deletion(-) create mode 100644 packages/cli-workflow/src/__tests__/current-role.test.ts diff --git a/packages/cli-workflow/src/__tests__/current-role.test.ts b/packages/cli-workflow/src/__tests__/current-role.test.ts new file mode 100644 index 0000000..05652e2 --- /dev/null +++ b/packages/cli-workflow/src/__tests__/current-role.test.ts @@ -0,0 +1,442 @@ +import { mkdir, rm, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { putSchema } from "@uncaged/json-cas"; +import type { CasRef, ThreadId } from "@uncaged/workflow-protocol"; +import { describe, expect, test } from "vitest"; +import { createMarker, deleteMarker } from "../background/index.js"; +import { cmdThreadList, cmdThreadShow, cmdThreadStart } from "../commands/thread.js"; +import { + appendThreadHistory, + createUwfStore, + loadThreadsIndex, + saveThreadsIndex, +} from "../store.js"; + +const OUTPUT_SCHEMA = { + type: "object" as const, + properties: { + $status: { type: "string" as const }, + }, +}; + +const SIMPLE_WORKFLOW_YAML = ` +name: test-current-role +description: Test workflow for currentRole +roles: + roleA: + description: First role + goal: Do A + capabilities: ["coding"] + procedure: Do A + output: | + $status: "ready" + frontmatter: + type: object + required: ["$status"] + properties: + $status: { type: string, enum: ["ready", "not-ready"] } + roleB: + description: Second role + goal: Do B + capabilities: ["coding"] + procedure: Do B + output: | + $status: "done" + frontmatter: + type: object + required: ["$status"] + properties: + $status: { type: string } +graph: + $START: + _: + role: roleA + prompt: "Do A" + location: null + roleA: + ready: + role: roleB + prompt: "Do B" + location: null + not-ready: + role: roleA + prompt: "Try again" + location: null + roleB: + _: + role: $END + prompt: "Done" + location: null +`; + +const CONDITIONAL_WORKFLOW_YAML = ` +name: test-conditional-role +description: Conditional routing workflow +roles: + roleA: + description: First role + goal: Do A + capabilities: ["coding"] + procedure: Do A + output: | + $status: "pass" + frontmatter: + type: object + required: ["$status"] + properties: + $status: { type: string, enum: ["pass", "fail"] } + roleB: + description: Pass role + goal: Do B + capabilities: ["coding"] + procedure: Do B + output: | + $status: "done" + frontmatter: + type: object + required: ["$status"] + properties: + $status: { type: string } + roleC: + description: Fail role + goal: Do C + capabilities: ["coding"] + procedure: Do C + output: | + $status: "done" + frontmatter: + type: object + required: ["$status"] + properties: + $status: { type: string } +graph: + $START: + _: + role: roleA + prompt: "Do A" + location: null + roleA: + pass: + role: roleB + prompt: "Do B (pass)" + location: null + fail: + role: roleC + prompt: "Do C (fail)" + location: null + roleB: + _: + role: $END + prompt: "Done" + location: null + roleC: + _: + role: $END + prompt: "Done" + location: null +`; + +const SINGLE_ROLE_WORKFLOW_YAML = ` +name: test-single-role +description: Single role that goes to END +roles: + worker: + description: Worker + goal: Work + capabilities: ["coding"] + procedure: Work + output: | + $status: "done" + frontmatter: + type: object + required: ["$status"] + properties: + $status: { type: string } +graph: + $START: + _: + role: worker + prompt: "Work" + location: null + worker: + _: + role: $END + prompt: "Done" + location: null +`; + +/** Helper: insert a completed step node after the current head. */ +async function insertStepNode( + storageRoot: string, + threadId: ThreadId, + role: string, + outputPayload: Record, +): Promise { + const uwf = await createUwfStore(storageRoot); + const index = await loadThreadsIndex(storageRoot); + const head = index[threadId]; + if (head === undefined) throw new Error(`thread ${threadId} not in index`); + + const outputSchemaHash = await putSchema(uwf.store, OUTPUT_SCHEMA); + const outputHash = await uwf.store.put(outputSchemaHash, outputPayload); + + // Use text schema for detail (simple placeholder) + const detailHash = await uwf.store.put(uwf.schemas.text, "detail-placeholder"); + + // Resolve start hash from head + const headNode = uwf.store.get(head); + if (headNode === null) throw new Error(`head ${head} not found`); + const isStart = headNode.type === uwf.schemas.startNode; + const startHash = isStart ? head : (headNode.payload as { start: CasRef }).start; + + const stepHash = (await uwf.store.put(uwf.schemas.stepNode, { + start: startHash, + prev: isStart ? null : head, + role, + prompt: `Do ${role}`, + output: outputHash, + detail: detailHash, + })) as CasRef; + + index[threadId] = stepHash; + await saveThreadsIndex(storageRoot, index); +} + +describe("currentRole field", () => { + let tmpDir: string; + let storageRoot: string; + + async function setup() { + tmpDir = join( + tmpdir(), + `uwf-test-current-role-${Date.now()}-${Math.random().toString(36).slice(2)}`, + ); + storageRoot = join(tmpDir, "storage"); + await mkdir(storageRoot, { recursive: true }); + } + + async function teardown() { + if (tmpDir) { + await rm(tmpDir, { recursive: true, force: true }); + } + } + + // T1: idle at start — currentRole = first role from graph + test("thread show — idle at start returns first role as currentRole", async () => { + await setup(); + try { + const wf = join(tmpDir, "test-current-role.yaml"); + await writeFile(wf, SIMPLE_WORKFLOW_YAML, "utf8"); + const { thread } = await cmdThreadStart(storageRoot, wf, "test", tmpDir); + + const result = await cmdThreadShow(storageRoot, thread as ThreadId); + expect(result.status).toBe("idle"); + expect(result.currentRole).toBe("roleA"); + } finally { + await teardown(); + } + }); + + // T2: idle after one step — currentRole = next role + test("thread show — idle after step returns next role as currentRole", async () => { + await setup(); + try { + const wf = join(tmpDir, "test-current-role.yaml"); + await writeFile(wf, SIMPLE_WORKFLOW_YAML, "utf8"); + const { thread } = await cmdThreadStart(storageRoot, wf, "test", tmpDir); + + await insertStepNode(storageRoot, thread as ThreadId, "roleA", { $status: "ready" }); + + const result = await cmdThreadShow(storageRoot, thread as ThreadId); + expect(result.status).toBe("idle"); + expect(result.currentRole).toBe("roleB"); + } finally { + await teardown(); + } + }); + + // T3: completed → currentRole = null + test("thread show — completed thread returns null currentRole", async () => { + await setup(); + try { + const wf = join(tmpDir, "test-current-role.yaml"); + await writeFile(wf, SIMPLE_WORKFLOW_YAML, "utf8"); + const { thread, workflow } = await cmdThreadStart(storageRoot, wf, "test", tmpDir); + const tid = thread as ThreadId; + + const index = await loadThreadsIndex(storageRoot); + const head = index[tid]!; + delete index[tid]; + await saveThreadsIndex(storageRoot, index); + await appendThreadHistory(storageRoot, { + thread: tid, + workflow, + head, + completedAt: Date.now(), + reason: "completed", + }); + + const result = await cmdThreadShow(storageRoot, tid); + expect(result.status).toBe("completed"); + expect(result.currentRole).toBe(null); + } finally { + await teardown(); + } + }); + + // T4: cancelled → currentRole = null + test("thread show — cancelled thread returns null currentRole", async () => { + await setup(); + try { + const wf = join(tmpDir, "test-current-role.yaml"); + await writeFile(wf, SIMPLE_WORKFLOW_YAML, "utf8"); + const { thread, workflow } = await cmdThreadStart(storageRoot, wf, "test", tmpDir); + const tid = thread as ThreadId; + + const index = await loadThreadsIndex(storageRoot); + const head = index[tid]!; + delete index[tid]; + await saveThreadsIndex(storageRoot, index); + await appendThreadHistory(storageRoot, { + thread: tid, + workflow, + head, + completedAt: Date.now(), + reason: "cancelled", + }); + + const result = await cmdThreadShow(storageRoot, tid); + expect(result.status).toBe("cancelled"); + expect(result.currentRole).toBe(null); + } finally { + await teardown(); + } + }); + + // T5: running → currentRole = role being executed + test("thread show — running thread returns current role", async () => { + await setup(); + try { + const wf = join(tmpDir, "test-current-role.yaml"); + await writeFile(wf, SIMPLE_WORKFLOW_YAML, "utf8"); + const { thread, workflow } = await cmdThreadStart(storageRoot, wf, "test", tmpDir); + const tid = thread as ThreadId; + + await createMarker(storageRoot, { + thread: tid, + workflow, + pid: process.pid, + startedAt: Date.now(), + }); + + try { + const result = await cmdThreadShow(storageRoot, tid); + expect(result.status).toBe("running"); + expect(result.currentRole).toBe("roleA"); + } finally { + await deleteMarker(storageRoot, tid); + } + } finally { + await teardown(); + } + }); + + // T6: thread list — mixed statuses with correct currentRole + test("thread list — returns correct currentRole for each status", async () => { + await setup(); + try { + const wf = join(tmpDir, "test-current-role.yaml"); + await writeFile(wf, SIMPLE_WORKFLOW_YAML, "utf8"); + + // idle thread + const idle = await cmdThreadStart(storageRoot, wf, "idle", tmpDir); + const idleId = idle.thread as ThreadId; + + // completed thread + const comp = await cmdThreadStart(storageRoot, wf, "completed", tmpDir); + const compId = comp.thread as ThreadId; + const index = await loadThreadsIndex(storageRoot); + const compHead = index[compId]!; + delete index[compId]; + await saveThreadsIndex(storageRoot, index); + await appendThreadHistory(storageRoot, { + thread: compId, + workflow: comp.workflow, + head: compHead, + completedAt: Date.now(), + reason: "completed", + }); + + const list = await cmdThreadList(storageRoot, null, null, null, 0, 100); + + const idleItem = list.find((i) => i.thread === idleId); + expect(idleItem).toBeDefined(); + expect(idleItem!.currentRole).toBe("roleA"); + + const compItem = list.find((i) => i.thread === compId); + expect(compItem).toBeDefined(); + expect(compItem!.currentRole).toBe(null); + } finally { + await teardown(); + } + }); + + // T7: thread list — idle at start has correct currentRole + test("thread list — idle thread at start has correct currentRole", async () => { + await setup(); + try { + const wf = join(tmpDir, "test-current-role.yaml"); + await writeFile(wf, SIMPLE_WORKFLOW_YAML, "utf8"); + const { thread } = await cmdThreadStart(storageRoot, wf, "test", tmpDir); + + const list = await cmdThreadList(storageRoot, null, null, null, 0, 100); + const item = list.find((i) => i.thread === (thread as ThreadId)); + expect(item).toBeDefined(); + expect(item!.currentRole).toBe("roleA"); + } finally { + await teardown(); + } + }); + + // T8: conditional routing — $status=pass vs fail + test("thread show — conditional routing selects correct next role", async () => { + await setup(); + try { + const wf = join(tmpDir, "test-conditional-role.yaml"); + await writeFile(wf, CONDITIONAL_WORKFLOW_YAML, "utf8"); + + // pass path + const t1 = await cmdThreadStart(storageRoot, wf, "pass test", tmpDir); + await insertStepNode(storageRoot, t1.thread as ThreadId, "roleA", { $status: "pass" }); + const r1 = await cmdThreadShow(storageRoot, t1.thread as ThreadId); + expect(r1.currentRole).toBe("roleB"); + + // fail path + const t2 = await cmdThreadStart(storageRoot, wf, "fail test", tmpDir); + await insertStepNode(storageRoot, t2.thread as ThreadId, "roleA", { $status: "fail" }); + const r2 = await cmdThreadShow(storageRoot, t2.thread as ThreadId); + expect(r2.currentRole).toBe("roleC"); + } finally { + await teardown(); + } + }); + + // T9: next role is $END → currentRole = null + test("thread show — when next is $END, currentRole is null", async () => { + await setup(); + try { + const wf = join(tmpDir, "test-single-role.yaml"); + await writeFile(wf, SINGLE_ROLE_WORKFLOW_YAML, "utf8"); + + const { thread } = await cmdThreadStart(storageRoot, wf, "test", tmpDir); + // worker → _ maps to $END + await insertStepNode(storageRoot, thread as ThreadId, "worker", {}); + + const result = await cmdThreadShow(storageRoot, thread as ThreadId); + expect(result.currentRole).toBe(null); + } finally { + await teardown(); + } + }); +}); diff --git a/packages/cli-workflow/src/commands/thread.ts b/packages/cli-workflow/src/commands/thread.ts index c3b245d..67dc39e 100644 --- a/packages/cli-workflow/src/commands/thread.ts +++ b/packages/cli-workflow/src/commands/thread.ts @@ -57,6 +57,21 @@ const END_ROLE = "$END"; const START_ROLE = "$START"; export const THREAD_READ_DEFAULT_QUOTA = 4000; +/** + * Derive the current/next role from the workflow graph and chain state. + * Returns null when the next role is $END or evaluation fails. + */ +function resolveCurrentRole(uwf: UwfStore, head: CasRef, workflowRef: CasRef): string | null { + const chain = walkChain(uwf, head); + const { lastRole, lastOutput } = resolveEvaluateArgs(uwf, chain); + const workflow = loadWorkflowPayload(uwf, workflowRef); + const result = evaluate(workflow.graph, lastRole, lastOutput); + if (!result.ok) { + return null; + } + return result.value.role === END_ROLE ? null : result.value.role; +} + const PL_THREAD_START = "7HNQ4B2X"; const PL_MODERATOR = "M3K8V9T1"; const PL_AGENT_SPAWN = "R5J2W8N4"; @@ -321,12 +336,14 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr // Check if thread is running const runningMarker = await isThreadRunning(storageRoot, threadId); const status: ThreadStatus = runningMarker !== null ? "running" : "idle"; + const currentRole = resolveCurrentRole(uwf, activeHead, workflow); return { workflow, thread: threadId, head: activeHead, status, + currentRole, done: false, background: null, }; @@ -341,6 +358,7 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr thread: threadId, head: hist.head, status, + currentRole: null, done: true, background: null, }; @@ -351,6 +369,7 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr export type ThreadListItemWithStatus = ThreadListItem & { status: ThreadStatus; + currentRole: string | null; }; async function threadListItemFromActive( @@ -368,7 +387,13 @@ async function threadListItemFromActive( const runningMarker = await isThreadRunning(storageRoot, threadId); const status: ThreadStatus = runningMarker !== null ? "running" : "idle"; - return { thread: threadId, workflow, head, status }; + return { + thread: threadId, + workflow, + head, + status, + currentRole: resolveCurrentRole(uwf, head, workflow), + }; } async function collectActiveThreads( @@ -406,6 +431,7 @@ async function collectCompletedThreads( workflow: entry.workflow, head: entry.head, status: entry.reason === "cancelled" ? "cancelled" : "completed", + currentRole: null, }); } } @@ -938,6 +964,8 @@ async function cmdThreadStepBackground( failStep(plog, `thread not active: ${threadId}`); } + const uwf = await createUwfStore(storageRoot); + // Spawn detached background process const scriptPath = process.argv[1]; if (scriptPath === undefined) { @@ -969,6 +997,7 @@ async function cmdThreadStepBackground( thread: threadId, head: headHash, status: "running", + currentRole: resolveCurrentRole(uwf, headHash, workflowHash), done: false, background: true, }, @@ -1012,6 +1041,7 @@ async function cmdThreadStepOnce( thread: threadId, head: headHash, status: "completed", + currentRole: null, done: true, background: null, }; @@ -1067,12 +1097,14 @@ async function cmdThreadStepOnce( // Determine status based on whether thread is done and running state const status: ThreadStatus = done ? "completed" : "idle"; + const currentRole = done ? null : afterResult.value.role; return { workflow: workflowHash, thread: threadId, head: newHead, status, + currentRole, done, background: null, }; diff --git a/packages/workflow-protocol/src/types.ts b/packages/workflow-protocol/src/types.ts index e26717d..0236266 100644 --- a/packages/workflow-protocol/src/types.ts +++ b/packages/workflow-protocol/src/types.ts @@ -97,6 +97,8 @@ export type StepOutput = { thread: ThreadId; head: CasRef; status: ThreadStatus; + /** The current or next role. Null when completed, cancelled, or next is $END. */ + currentRole: string | null; done: boolean; background: boolean | null; }; -- 2.43.0