feat(cli): add currentRole field to thread show and thread list output (#572)

Co-authored-by: 小橘 <xiaoju@shazhou.work>
Co-committed-by: 小橘 <xiaoju@shazhou.work>
This commit is contained in:
2026-05-28 01:58:23 +00:00
committed by xiaoju
parent abc9dcfc5a
commit 0b20e88317
3 changed files with 477 additions and 1 deletions
@@ -0,0 +1,442 @@
import { mkdir, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { putSchema } from "@uncaged/json-cas";
import type { CasRef, ThreadId } from "@uncaged/workflow-protocol";
import { describe, expect, test } from "vitest";
import { createMarker, deleteMarker } from "../background/index.js";
import { cmdThreadList, cmdThreadShow, cmdThreadStart } from "../commands/thread.js";
import {
appendThreadHistory,
createUwfStore,
loadThreadsIndex,
saveThreadsIndex,
} from "../store.js";
const OUTPUT_SCHEMA = {
type: "object" as const,
properties: {
$status: { type: "string" as const },
},
};
const SIMPLE_WORKFLOW_YAML = `
name: test-current-role
description: Test workflow for currentRole
roles:
roleA:
description: First role
goal: Do A
capabilities: ["coding"]
procedure: Do A
output: |
$status: "ready"
frontmatter:
type: object
required: ["$status"]
properties:
$status: { type: string, enum: ["ready", "not-ready"] }
roleB:
description: Second role
goal: Do B
capabilities: ["coding"]
procedure: Do B
output: |
$status: "done"
frontmatter:
type: object
required: ["$status"]
properties:
$status: { type: string }
graph:
$START:
_:
role: roleA
prompt: "Do A"
location: null
roleA:
ready:
role: roleB
prompt: "Do B"
location: null
not-ready:
role: roleA
prompt: "Try again"
location: null
roleB:
_:
role: $END
prompt: "Done"
location: null
`;
const CONDITIONAL_WORKFLOW_YAML = `
name: test-conditional-role
description: Conditional routing workflow
roles:
roleA:
description: First role
goal: Do A
capabilities: ["coding"]
procedure: Do A
output: |
$status: "pass"
frontmatter:
type: object
required: ["$status"]
properties:
$status: { type: string, enum: ["pass", "fail"] }
roleB:
description: Pass role
goal: Do B
capabilities: ["coding"]
procedure: Do B
output: |
$status: "done"
frontmatter:
type: object
required: ["$status"]
properties:
$status: { type: string }
roleC:
description: Fail role
goal: Do C
capabilities: ["coding"]
procedure: Do C
output: |
$status: "done"
frontmatter:
type: object
required: ["$status"]
properties:
$status: { type: string }
graph:
$START:
_:
role: roleA
prompt: "Do A"
location: null
roleA:
pass:
role: roleB
prompt: "Do B (pass)"
location: null
fail:
role: roleC
prompt: "Do C (fail)"
location: null
roleB:
_:
role: $END
prompt: "Done"
location: null
roleC:
_:
role: $END
prompt: "Done"
location: null
`;
const SINGLE_ROLE_WORKFLOW_YAML = `
name: test-single-role
description: Single role that goes to END
roles:
worker:
description: Worker
goal: Work
capabilities: ["coding"]
procedure: Work
output: |
$status: "done"
frontmatter:
type: object
required: ["$status"]
properties:
$status: { type: string }
graph:
$START:
_:
role: worker
prompt: "Work"
location: null
worker:
_:
role: $END
prompt: "Done"
location: null
`;
/** Helper: insert a completed step node after the current head. */
async function insertStepNode(
storageRoot: string,
threadId: ThreadId,
role: string,
outputPayload: Record<string, unknown>,
): Promise<void> {
const uwf = await createUwfStore(storageRoot);
const index = await loadThreadsIndex(storageRoot);
const head = index[threadId];
if (head === undefined) throw new Error(`thread ${threadId} not in index`);
const outputSchemaHash = await putSchema(uwf.store, OUTPUT_SCHEMA);
const outputHash = await uwf.store.put(outputSchemaHash, outputPayload);
// Use text schema for detail (simple placeholder)
const detailHash = await uwf.store.put(uwf.schemas.text, "detail-placeholder");
// Resolve start hash from head
const headNode = uwf.store.get(head);
if (headNode === null) throw new Error(`head ${head} not found`);
const isStart = headNode.type === uwf.schemas.startNode;
const startHash = isStart ? head : (headNode.payload as { start: CasRef }).start;
const stepHash = (await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: isStart ? null : head,
role,
prompt: `Do ${role}`,
output: outputHash,
detail: detailHash,
})) as CasRef;
index[threadId] = stepHash;
await saveThreadsIndex(storageRoot, index);
}
describe("currentRole field", () => {
let tmpDir: string;
let storageRoot: string;
async function setup() {
tmpDir = join(
tmpdir(),
`uwf-test-current-role-${Date.now()}-${Math.random().toString(36).slice(2)}`,
);
storageRoot = join(tmpDir, "storage");
await mkdir(storageRoot, { recursive: true });
}
async function teardown() {
if (tmpDir) {
await rm(tmpDir, { recursive: true, force: true });
}
}
// T1: idle at start — currentRole = first role from graph
test("thread show — idle at start returns first role as currentRole", async () => {
await setup();
try {
const wf = join(tmpDir, "test-current-role.yaml");
await writeFile(wf, SIMPLE_WORKFLOW_YAML, "utf8");
const { thread } = await cmdThreadStart(storageRoot, wf, "test", tmpDir);
const result = await cmdThreadShow(storageRoot, thread as ThreadId);
expect(result.status).toBe("idle");
expect(result.currentRole).toBe("roleA");
} finally {
await teardown();
}
});
// T2: idle after one step — currentRole = next role
test("thread show — idle after step returns next role as currentRole", async () => {
await setup();
try {
const wf = join(tmpDir, "test-current-role.yaml");
await writeFile(wf, SIMPLE_WORKFLOW_YAML, "utf8");
const { thread } = await cmdThreadStart(storageRoot, wf, "test", tmpDir);
await insertStepNode(storageRoot, thread as ThreadId, "roleA", { $status: "ready" });
const result = await cmdThreadShow(storageRoot, thread as ThreadId);
expect(result.status).toBe("idle");
expect(result.currentRole).toBe("roleB");
} finally {
await teardown();
}
});
// T3: completed → currentRole = null
test("thread show — completed thread returns null currentRole", async () => {
await setup();
try {
const wf = join(tmpDir, "test-current-role.yaml");
await writeFile(wf, SIMPLE_WORKFLOW_YAML, "utf8");
const { thread, workflow } = await cmdThreadStart(storageRoot, wf, "test", tmpDir);
const tid = thread as ThreadId;
const index = await loadThreadsIndex(storageRoot);
const head = index[tid]!;
delete index[tid];
await saveThreadsIndex(storageRoot, index);
await appendThreadHistory(storageRoot, {
thread: tid,
workflow,
head,
completedAt: Date.now(),
reason: "completed",
});
const result = await cmdThreadShow(storageRoot, tid);
expect(result.status).toBe("completed");
expect(result.currentRole).toBe(null);
} finally {
await teardown();
}
});
// T4: cancelled → currentRole = null
test("thread show — cancelled thread returns null currentRole", async () => {
await setup();
try {
const wf = join(tmpDir, "test-current-role.yaml");
await writeFile(wf, SIMPLE_WORKFLOW_YAML, "utf8");
const { thread, workflow } = await cmdThreadStart(storageRoot, wf, "test", tmpDir);
const tid = thread as ThreadId;
const index = await loadThreadsIndex(storageRoot);
const head = index[tid]!;
delete index[tid];
await saveThreadsIndex(storageRoot, index);
await appendThreadHistory(storageRoot, {
thread: tid,
workflow,
head,
completedAt: Date.now(),
reason: "cancelled",
});
const result = await cmdThreadShow(storageRoot, tid);
expect(result.status).toBe("cancelled");
expect(result.currentRole).toBe(null);
} finally {
await teardown();
}
});
// T5: running → currentRole = role being executed
test("thread show — running thread returns current role", async () => {
await setup();
try {
const wf = join(tmpDir, "test-current-role.yaml");
await writeFile(wf, SIMPLE_WORKFLOW_YAML, "utf8");
const { thread, workflow } = await cmdThreadStart(storageRoot, wf, "test", tmpDir);
const tid = thread as ThreadId;
await createMarker(storageRoot, {
thread: tid,
workflow,
pid: process.pid,
startedAt: Date.now(),
});
try {
const result = await cmdThreadShow(storageRoot, tid);
expect(result.status).toBe("running");
expect(result.currentRole).toBe("roleA");
} finally {
await deleteMarker(storageRoot, tid);
}
} finally {
await teardown();
}
});
// T6: thread list — mixed statuses with correct currentRole
test("thread list — returns correct currentRole for each status", async () => {
await setup();
try {
const wf = join(tmpDir, "test-current-role.yaml");
await writeFile(wf, SIMPLE_WORKFLOW_YAML, "utf8");
// idle thread
const idle = await cmdThreadStart(storageRoot, wf, "idle", tmpDir);
const idleId = idle.thread as ThreadId;
// completed thread
const comp = await cmdThreadStart(storageRoot, wf, "completed", tmpDir);
const compId = comp.thread as ThreadId;
const index = await loadThreadsIndex(storageRoot);
const compHead = index[compId]!;
delete index[compId];
await saveThreadsIndex(storageRoot, index);
await appendThreadHistory(storageRoot, {
thread: compId,
workflow: comp.workflow,
head: compHead,
completedAt: Date.now(),
reason: "completed",
});
const list = await cmdThreadList(storageRoot, null, null, null, 0, 100);
const idleItem = list.find((i) => i.thread === idleId);
expect(idleItem).toBeDefined();
expect(idleItem!.currentRole).toBe("roleA");
const compItem = list.find((i) => i.thread === compId);
expect(compItem).toBeDefined();
expect(compItem!.currentRole).toBe(null);
} finally {
await teardown();
}
});
// T7: thread list — idle at start has correct currentRole
test("thread list — idle thread at start has correct currentRole", async () => {
await setup();
try {
const wf = join(tmpDir, "test-current-role.yaml");
await writeFile(wf, SIMPLE_WORKFLOW_YAML, "utf8");
const { thread } = await cmdThreadStart(storageRoot, wf, "test", tmpDir);
const list = await cmdThreadList(storageRoot, null, null, null, 0, 100);
const item = list.find((i) => i.thread === (thread as ThreadId));
expect(item).toBeDefined();
expect(item!.currentRole).toBe("roleA");
} finally {
await teardown();
}
});
// T8: conditional routing — $status=pass vs fail
test("thread show — conditional routing selects correct next role", async () => {
await setup();
try {
const wf = join(tmpDir, "test-conditional-role.yaml");
await writeFile(wf, CONDITIONAL_WORKFLOW_YAML, "utf8");
// pass path
const t1 = await cmdThreadStart(storageRoot, wf, "pass test", tmpDir);
await insertStepNode(storageRoot, t1.thread as ThreadId, "roleA", { $status: "pass" });
const r1 = await cmdThreadShow(storageRoot, t1.thread as ThreadId);
expect(r1.currentRole).toBe("roleB");
// fail path
const t2 = await cmdThreadStart(storageRoot, wf, "fail test", tmpDir);
await insertStepNode(storageRoot, t2.thread as ThreadId, "roleA", { $status: "fail" });
const r2 = await cmdThreadShow(storageRoot, t2.thread as ThreadId);
expect(r2.currentRole).toBe("roleC");
} finally {
await teardown();
}
});
// T9: next role is $END → currentRole = null
test("thread show — when next is $END, currentRole is null", async () => {
await setup();
try {
const wf = join(tmpDir, "test-single-role.yaml");
await writeFile(wf, SINGLE_ROLE_WORKFLOW_YAML, "utf8");
const { thread } = await cmdThreadStart(storageRoot, wf, "test", tmpDir);
// worker → _ maps to $END
await insertStepNode(storageRoot, thread as ThreadId, "worker", {});
const result = await cmdThreadShow(storageRoot, thread as ThreadId);
expect(result.currentRole).toBe(null);
} finally {
await teardown();
}
});
});
+33 -1
View File
@@ -57,6 +57,21 @@ const END_ROLE = "$END";
const START_ROLE = "$START";
export const THREAD_READ_DEFAULT_QUOTA = 4000;
/**
* Derive the current/next role from the workflow graph and chain state.
* Returns null when the next role is $END or evaluation fails.
*/
function resolveCurrentRole(uwf: UwfStore, head: CasRef, workflowRef: CasRef): string | null {
const chain = walkChain(uwf, head);
const { lastRole, lastOutput } = resolveEvaluateArgs(uwf, chain);
const workflow = loadWorkflowPayload(uwf, workflowRef);
const result = evaluate(workflow.graph, lastRole, lastOutput);
if (!result.ok) {
return null;
}
return result.value.role === END_ROLE ? null : result.value.role;
}
const PL_THREAD_START = "7HNQ4B2X";
const PL_MODERATOR = "M3K8V9T1";
const PL_AGENT_SPAWN = "R5J2W8N4";
@@ -321,12 +336,14 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr
// Check if thread is running
const runningMarker = await isThreadRunning(storageRoot, threadId);
const status: ThreadStatus = runningMarker !== null ? "running" : "idle";
const currentRole = resolveCurrentRole(uwf, activeHead, workflow);
return {
workflow,
thread: threadId,
head: activeHead,
status,
currentRole,
done: false,
background: null,
};
@@ -341,6 +358,7 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr
thread: threadId,
head: hist.head,
status,
currentRole: null,
done: true,
background: null,
};
@@ -351,6 +369,7 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr
export type ThreadListItemWithStatus = ThreadListItem & {
status: ThreadStatus;
currentRole: string | null;
};
async function threadListItemFromActive(
@@ -368,7 +387,13 @@ async function threadListItemFromActive(
const runningMarker = await isThreadRunning(storageRoot, threadId);
const status: ThreadStatus = runningMarker !== null ? "running" : "idle";
return { thread: threadId, workflow, head, status };
return {
thread: threadId,
workflow,
head,
status,
currentRole: resolveCurrentRole(uwf, head, workflow),
};
}
async function collectActiveThreads(
@@ -406,6 +431,7 @@ async function collectCompletedThreads(
workflow: entry.workflow,
head: entry.head,
status: entry.reason === "cancelled" ? "cancelled" : "completed",
currentRole: null,
});
}
}
@@ -938,6 +964,8 @@ async function cmdThreadStepBackground(
failStep(plog, `thread not active: ${threadId}`);
}
const uwf = await createUwfStore(storageRoot);
// Spawn detached background process
const scriptPath = process.argv[1];
if (scriptPath === undefined) {
@@ -969,6 +997,7 @@ async function cmdThreadStepBackground(
thread: threadId,
head: headHash,
status: "running",
currentRole: resolveCurrentRole(uwf, headHash, workflowHash),
done: false,
background: true,
},
@@ -1012,6 +1041,7 @@ async function cmdThreadStepOnce(
thread: threadId,
head: headHash,
status: "completed",
currentRole: null,
done: true,
background: null,
};
@@ -1067,12 +1097,14 @@ async function cmdThreadStepOnce(
// Determine status based on whether thread is done and running state
const status: ThreadStatus = done ? "completed" : "idle";
const currentRole = done ? null : afterResult.value.role;
return {
workflow: workflowHash,
thread: threadId,
head: newHead,
status,
currentRole,
done,
background: null,
};
+2
View File
@@ -97,6 +97,8 @@ export type StepOutput = {
thread: ThreadId;
head: CasRef;
status: ThreadStatus;
/** The current or next role. Null when completed, cancelled, or next is $END. */
currentRole: string | null;
done: boolean;
background: boolean | null;
};