From b0ef9c55a9792b68b80d804b4a9a47dfb39cc148 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E5=A2=A8?= Date: Tue, 2 Jun 2026 04:39:29 +0000 Subject: [PATCH] feat: moderator recognizes $SUSPEND as pseudo-role target - Add GraphPseudoRole type ($END | $SUSPEND) to workflow-protocol - Add 'suspended' to ThreadStatus - evaluate() returns EvaluateSuspendResult for $SUSPEND targets - Thread show/list derive suspended status from moderator evaluation - validate-semantic treats $SUSPEND like $END (valid target, no outgoing edges) - Tests: routing to $SUSPEND, mustache rendering, thread status display Closes #588 --- .../src/__tests__/moderator-evaluate.test.ts | 43 ++++++ .../src/__tests__/thread-show-status.test.ts | 124 +++++++++++++++++- packages/cli-workflow/src/cli.ts | 4 +- packages/cli-workflow/src/commands/thread.ts | 100 ++++++++++++-- .../src/moderator/__tests__/evaluate.test.ts | 11 +- .../cli-workflow/src/moderator/evaluate.ts | 12 ++ packages/cli-workflow/src/moderator/index.ts | 7 +- packages/cli-workflow/src/moderator/types.ts | 19 ++- .../cli-workflow/src/validate-semantic.ts | 13 +- packages/workflow-protocol/src/index.ts | 1 + packages/workflow-protocol/src/schemas.ts | 2 +- packages/workflow-protocol/src/types.ts | 12 +- 12 files changed, 316 insertions(+), 32 deletions(-) diff --git a/packages/cli-workflow/src/__tests__/moderator-evaluate.test.ts b/packages/cli-workflow/src/__tests__/moderator-evaluate.test.ts index eab29fc..97e414e 100644 --- a/packages/cli-workflow/src/__tests__/moderator-evaluate.test.ts +++ b/packages/cli-workflow/src/__tests__/moderator-evaluate.test.ts @@ -51,6 +51,49 @@ describe("evaluate", () => { }); }); + test("status-based routing (needs input → $SUSPEND)", () => { + const graph: Record> = { + ...solveIssueGraph, + reviewer: { + ...solveIssueGraph.reviewer, + needs_input: { role: "$SUSPEND", prompt: "Waiting for user input.", location: null }, + }, + }; + const result = evaluate(graph, "reviewer", { $status: "needs_input" }); + expect(result).toEqual({ + ok: true, + value: { + action: "suspend", + suspendedRole: "reviewer", + prompt: "Waiting for user input.", + }, + }); + }); + + test("$SUSPEND prompt template renders mustache variables", () => { + const graph: Record> = { + reviewer: { + needs_input: { + role: "$SUSPEND", + prompt: "Please clarify: {{{question}}}", + location: null, + }, + }, + }; + const result = evaluate(graph, "reviewer", { + $status: "needs_input", + question: "Which API endpoint?", + }); + expect(result).toEqual({ + ok: true, + value: { + action: "suspend", + suspendedRole: "reviewer", + prompt: "Please clarify: Which API endpoint?", + }, + }); + }); + test("missing role in graph → error", () => { const result = evaluate(solveIssueGraph, "unknown-role", { $status: "_" }); expect(result.ok).toBe(false); diff --git a/packages/cli-workflow/src/__tests__/thread-show-status.test.ts b/packages/cli-workflow/src/__tests__/thread-show-status.test.ts index 7f8360c..80137f7 100644 --- a/packages/cli-workflow/src/__tests__/thread-show-status.test.ts +++ b/packages/cli-workflow/src/__tests__/thread-show-status.test.ts @@ -1,11 +1,25 @@ import { mkdir, rm, writeFile } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; -import type { ThreadId } from "@uncaged/workflow-protocol"; +import { putSchema } from "@ocas/core"; +import type { CasRef, ThreadId } from "@uncaged/workflow-protocol"; import { describe, expect, test } from "vitest"; import { createMarker, deleteMarker } from "../background/index.js"; import { cmdThreadShow, cmdThreadStart } from "../commands/thread.js"; -import { appendThreadHistory, loadThreadsIndex } from "../store.js"; +import { + appendThreadHistory, + createUwfStore, + loadThreadsIndex, + saveThreadsIndex, +} from "../store.js"; + +const OUTPUT_SCHEMA = { + type: "object" as const, + properties: { + $status: { type: "string" as const }, + question: { type: "string" as const }, + }, +}; const TEST_WORKFLOW_YAML = ` name: test-status @@ -36,6 +50,76 @@ graph: location: null `; +const SUSPEND_WORKFLOW_YAML = ` +name: test-suspend-status +description: Test workflow for suspended status +roles: + worker: + description: Worker role + goal: Work + capabilities: ["coding"] + procedure: Work + output: | + $status: "needs_input" + question: "Which API?" + frontmatter: + oneOf: + - type: object + required: ["$status", "question"] + properties: + $status: { const: "needs_input" } + question: { type: string } +graph: + $START: + _: + role: worker + prompt: "Start work" + location: null + worker: + needs_input: + role: $SUSPEND + prompt: "Please clarify: {{{question}}}" + location: null +`; + +async function insertStepNode( + storageRoot: string, + threadId: ThreadId, + role: string, + outputPayload: Record, +): Promise { + const uwf = await createUwfStore(storageRoot); + const index = await loadThreadsIndex(storageRoot); + const head = index[threadId]; + if (head === undefined) throw new Error(`thread ${threadId} not in index`); + + const outputSchemaHash = await putSchema(uwf.store, OUTPUT_SCHEMA); + const outputHash = await uwf.store.put(outputSchemaHash, outputPayload); + const detailHash = await uwf.store.put(uwf.schemas.text, "detail-placeholder"); + + const headNode = uwf.store.get(head); + if (headNode === null) throw new Error(`head ${head} not found`); + const isStart = headNode.type === uwf.schemas.startNode; + const startHash = isStart ? head : (headNode.payload as { start: CasRef }).start; + + const stepHash = (await uwf.store.put(uwf.schemas.stepNode, { + start: startHash, + prev: isStart ? null : head, + role, + output: outputHash, + detail: detailHash, + agent: "uwf-test", + edgePrompt: "edge", + startedAtMs: Date.now(), + completedAtMs: Date.now() + 1, + cwd: "/tmp", + assembledPrompt: null, + })) as CasRef; + + index[threadId] = stepHash; + await saveThreadsIndex(storageRoot, index); +} + describe("thread show status field", () => { let tmpDir: string; let storageRoot: string; @@ -224,4 +308,40 @@ describe("thread show status field", () => { await teardown(); }); + + test("active suspended thread shows status 'suspended'", async () => { + await setupTestEnv(); + const casDir = join(tmpDir, "cas"); + await mkdir(casDir, { recursive: true }); + const originalCasDir = process.env.UNCAGED_CAS_DIR; + process.env.UNCAGED_CAS_DIR = casDir; + + try { + const workflowPath = join(tmpDir, "test-suspend-status.yaml"); + await writeFile(workflowPath, SUSPEND_WORKFLOW_YAML, "utf8"); + + const startResult = await cmdThreadStart(storageRoot, workflowPath, "test prompt", tmpDir); + const threadId = startResult.thread as ThreadId; + + await insertStepNode(storageRoot, threadId, "worker", { + $status: "needs_input", + question: "Which API?", + }); + + const result = await cmdThreadShow(storageRoot, threadId); + + expect(result.status).toBe("suspended"); + expect(result.done).toBe(false); + expect(result.currentRole).toBe(null); + expect(result.background).toBe(null); + expect(result.thread).toBe(threadId); + } finally { + if (originalCasDir === undefined) { + delete process.env.UNCAGED_CAS_DIR; + } else { + process.env.UNCAGED_CAS_DIR = originalCasDir; + } + await teardown(); + } + }); }); diff --git a/packages/cli-workflow/src/cli.ts b/packages/cli-workflow/src/cli.ts index 81e1ff0..26dc0d8 100755 --- a/packages/cli-workflow/src/cli.ts +++ b/packages/cli-workflow/src/cli.ts @@ -189,11 +189,11 @@ function parseStatusFilter(status: string | undefined): ThreadStatus[] | null { if (raw === "active") return ["idle", "running"]; const parts = raw.split(",").map((s) => s.trim()); - const validStatuses: ThreadStatus[] = ["idle", "running", "completed", "cancelled"]; + const validStatuses: ThreadStatus[] = ["idle", "running", "suspended", "completed", "cancelled"]; for (const part of parts) { if (!validStatuses.includes(part as ThreadStatus)) { process.stderr.write( - `Invalid status: ${part}. Must be one of: idle, running, completed, cancelled, active\n`, + `Invalid status: ${part}. Must be one of: idle, running, suspended, completed, cancelled, active\n`, ); process.exit(1); } diff --git a/packages/cli-workflow/src/commands/thread.ts b/packages/cli-workflow/src/commands/thread.ts index f738322..92e4c43 100644 --- a/packages/cli-workflow/src/commands/thread.ts +++ b/packages/cli-workflow/src/commands/thread.ts @@ -29,7 +29,7 @@ import { config as loadDotenv } from "dotenv"; import { parse } from "yaml"; import { createMarker, deleteMarker, isThreadRunning } from "../background/index.js"; import { createIncludeTag } from "../include.js"; -import { evaluate } from "../moderator/index.js"; +import { evaluate, isSuspendResult } from "../moderator/index.js"; import { appendThreadHistory, createUwfStore, @@ -58,9 +58,56 @@ const END_ROLE = "$END"; const START_ROLE = "$START"; export const THREAD_READ_DEFAULT_QUOTA = 4000; +function buildStepOutputFromEvaluation( + workflowHash: CasRef, + threadId: ThreadId, + head: CasRef, + status: ThreadStatus, + evaluation: ReturnType, + background: boolean | null, +): StepOutput { + const done = status === "completed"; + let currentRole: string | null = null; + if (evaluation.ok && !isSuspendResult(evaluation.value) && evaluation.value.role !== END_ROLE) { + currentRole = evaluation.value.role; + } + return { + workflow: workflowHash, + thread: threadId, + head, + status, + currentRole, + done, + background, + }; +} + +async function resolveActiveThreadStatus( + storageRoot: string, + threadId: ThreadId, + uwf: UwfStore, + head: CasRef, + workflowRef: CasRef, +): Promise { + const runningMarker = await isThreadRunning(storageRoot, threadId); + if (runningMarker !== null) { + return "running"; + } + + const chain = walkChain(uwf, head); + const { lastRole, lastOutput } = resolveEvaluateArgs(uwf, chain); + const workflow = loadWorkflowPayload(uwf, workflowRef); + const result = evaluate(workflow.graph, lastRole, lastOutput); + if (result.ok && isSuspendResult(result.value)) { + return "suspended"; + } + + return "idle"; +} + /** * Derive the current/next role from the workflow graph and chain state. - * Returns null when the next role is $END or evaluation fails. + * Returns null when the next role is $END, thread is suspended, or evaluation fails. */ function resolveCurrentRole(uwf: UwfStore, head: CasRef, workflowRef: CasRef): string | null { const chain = walkChain(uwf, head); @@ -70,7 +117,10 @@ function resolveCurrentRole(uwf: UwfStore, head: CasRef, workflowRef: CasRef): s if (!result.ok) { return null; } - return result.value.role === END_ROLE ? null : result.value.role; + if (isSuspendResult(result.value) || result.value.role === END_ROLE) { + return null; + } + return result.value.role; } const PL_THREAD_START = "7HNQ4B2X"; @@ -352,9 +402,13 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr fail(`failed to resolve workflow from head: ${activeHead}`); } - // Check if thread is running - const runningMarker = await isThreadRunning(storageRoot, threadId); - const status: ThreadStatus = runningMarker !== null ? "running" : "idle"; + const status = await resolveActiveThreadStatus( + storageRoot, + threadId, + uwf, + activeHead, + workflow, + ); const currentRole = resolveCurrentRole(uwf, activeHead, workflow); return { @@ -402,9 +456,7 @@ async function threadListItemFromActive( return null; } - // Check if thread is currently running in background - const runningMarker = await isThreadRunning(storageRoot, threadId); - const status: ThreadStatus = runningMarker !== null ? "running" : "idle"; + const status = await resolveActiveThreadStatus(storageRoot, threadId, uwf, head, workflow); return { thread: threadId, @@ -941,7 +993,7 @@ export async function cmdThreadExec( for (let i = 0; i < count; i++) { const result = await cmdThreadStepOnce(storageRoot, threadId, agentOverride, plog); results.push(result); - if (result.done) { + if (result.done || result.status === "suspended") { break; } } @@ -1048,10 +1100,25 @@ async function cmdThreadStepOnce( plog.log( PL_MODERATOR, - `moderator role=${nextResult.value.role} prompt=${nextResult.value.prompt}`, + `moderator ${ + isSuspendResult(nextResult.value) + ? `action=suspend suspendedRole=${nextResult.value.suspendedRole}` + : `role=${nextResult.value.role}` + } prompt=${nextResult.value.prompt}`, null, ); + if (isSuspendResult(nextResult.value)) { + return buildStepOutputFromEvaluation( + workflowHash, + threadId, + headHash, + "suspended", + nextResult, + null, + ); + } + if (nextResult.value.role === END_ROLE) { plog.log(PL_THREAD_ARCHIVED, `thread archived head=${headHash}`, null); await archiveThread(storageRoot, threadId, workflowHash, headHash); @@ -1108,6 +1175,17 @@ async function cmdThreadStepOnce( failStep(plog, `post-step moderator evaluate failed: ${afterResult.error.message}`); } + if (isSuspendResult(afterResult.value)) { + return buildStepOutputFromEvaluation( + workflowHash, + threadId, + newHead, + "suspended", + afterResult, + null, + ); + } + const done = afterResult.value.role === END_ROLE; if (done) { plog.log(PL_THREAD_ARCHIVED, `thread archived head=${newHead}`, null); diff --git a/packages/cli-workflow/src/moderator/__tests__/evaluate.test.ts b/packages/cli-workflow/src/moderator/__tests__/evaluate.test.ts index 8cc0a1f..8290bbb 100644 --- a/packages/cli-workflow/src/moderator/__tests__/evaluate.test.ts +++ b/packages/cli-workflow/src/moderator/__tests__/evaluate.test.ts @@ -1,5 +1,6 @@ import { describe, expect, test } from "vitest"; import { evaluate } from "../evaluate.js"; +import { isSuspendResult } from "../types.js"; describe("Edge prompt template variable resolution", () => { test("returns error when rendered prompt is empty string", () => { @@ -107,7 +108,7 @@ describe("Moderator location resolution", () => { const result = evaluate(graph, "planner", { $status: "ready" }); expect(result.ok).toBe(true); - if (result.ok) { + if (result.ok && !isSuspendResult(result.value)) { expect(result.value.location).toBe(null); } }); @@ -126,7 +127,7 @@ describe("Moderator location resolution", () => { const result = evaluate(graph, "planner", { $status: "ready" }); expect(result.ok).toBe(true); - if (result.ok) { + if (result.ok && !isSuspendResult(result.value)) { expect(result.value.location).toBe("/static/path"); } }); @@ -148,7 +149,7 @@ describe("Moderator location resolution", () => { }); expect(result.ok).toBe(true); - if (result.ok) { + if (result.ok && !isSuspendResult(result.value)) { expect(result.value.location).toBe("/home/user/repo"); } }); @@ -171,7 +172,7 @@ describe("Moderator location resolution", () => { }); expect(result.ok).toBe(true); - if (result.ok) { + if (result.ok && !isSuspendResult(result.value)) { expect(result.value.location).toBe("/home/user/myproject"); } }); @@ -190,7 +191,7 @@ describe("Moderator location resolution", () => { const result = evaluate(graph, "planner", { $status: "ready" }); expect(result.ok).toBe(true); - if (result.ok) { + if (result.ok && !isSuspendResult(result.value)) { // Mustache renders missing variables as empty string expect(result.value.location).toBe(""); } diff --git a/packages/cli-workflow/src/moderator/evaluate.ts b/packages/cli-workflow/src/moderator/evaluate.ts index 1f9bad4..3ca7621 100644 --- a/packages/cli-workflow/src/moderator/evaluate.ts +++ b/packages/cli-workflow/src/moderator/evaluate.ts @@ -7,6 +7,7 @@ import type { EvaluateResult, Result } from "./types.js"; mustache.escape = (text: string) => text; const START_ROLE = "$START"; +const SUSPEND_ROLE = "$SUSPEND"; const UNIT_STATUS = "_"; type LastOutput = Record; @@ -51,6 +52,17 @@ export function evaluate( ), }; } + if (target.role === SUSPEND_ROLE) { + return { + ok: true, + value: { + action: "suspend", + suspendedRole: lastRole, + prompt, + }, + }; + } + const location = target.location !== null ? mustache.render(target.location, lastOutput) : null; return { ok: true, value: { role: target.role, prompt, location } }; } catch (error) { diff --git a/packages/cli-workflow/src/moderator/index.ts b/packages/cli-workflow/src/moderator/index.ts index 91dabbc..5bf3f82 100644 --- a/packages/cli-workflow/src/moderator/index.ts +++ b/packages/cli-workflow/src/moderator/index.ts @@ -1,2 +1,7 @@ export { evaluate } from "./evaluate.js"; -export type { EvaluateResult } from "./types.js"; +export type { + EvaluateResult, + EvaluateRouteResult, + EvaluateSuspendResult, +} from "./types.js"; +export { isSuspendResult } from "./types.js"; diff --git a/packages/cli-workflow/src/moderator/types.ts b/packages/cli-workflow/src/moderator/types.ts index b9c533a..248334a 100644 --- a/packages/cli-workflow/src/moderator/types.ts +++ b/packages/cli-workflow/src/moderator/types.ts @@ -1,9 +1,24 @@ export type Result = { ok: true; value: T } | { ok: false; error: E }; -/** The result of moderator evaluation — which role to go to, and the edge prompt. */ -export type EvaluateResult = { +/** Moderator routes the thread to a real role (or `$END`). */ +export type EvaluateRouteResult = { role: string; prompt: string; /** Resolved working directory from edge location field (null = inherit thread cwd). */ location: string | null; }; + +/** Moderator routes the thread to `$SUSPEND` — waiting for external input. */ +export type EvaluateSuspendResult = { + action: "suspend"; + /** Role whose output triggered the suspend transition. */ + suspendedRole: string; + prompt: string; +}; + +/** The result of moderator evaluation. */ +export type EvaluateResult = EvaluateRouteResult | EvaluateSuspendResult; + +export function isSuspendResult(result: EvaluateResult): result is EvaluateSuspendResult { + return "action" in result && result.action === "suspend"; +} diff --git a/packages/cli-workflow/src/validate-semantic.ts b/packages/cli-workflow/src/validate-semantic.ts index 5c38393..b6e7a57 100644 --- a/packages/cli-workflow/src/validate-semantic.ts +++ b/packages/cli-workflow/src/validate-semantic.ts @@ -2,7 +2,8 @@ import type { WorkflowPayload } from "@uncaged/workflow-protocol"; type SchemaObj = Record; -const RESERVED_NAMES = new Set(["$START", "$END"]); +const RESERVED_NAMES = new Set(["$START", "$END", "$SUSPEND"]); +const PSEUDO_TARGETS = new Set(["$END", "$SUSPEND"]); /** Extract mustache variable names from a prompt string. */ function extractMustacheVars(prompt: string): string[] { @@ -110,9 +111,13 @@ function checkGraphStructure(payload: WorkflowPayload, errors: string[]): void { errors.push("$END must not have outgoing edges"); } + if (graphNodes.has("$SUSPEND")) { + errors.push("$SUSPEND must not have outgoing edges"); + } + for (const [node, statusMap] of Object.entries(payload.graph)) { for (const [status, target] of Object.entries(statusMap)) { - if (target.role !== "$END" && !roleNames.has(target.role)) { + if (!PSEUDO_TARGETS.has(target.role) && !roleNames.has(target.role)) { errors.push(`edge ${node}→${status}: unknown target role "${target.role}"`); } } @@ -129,7 +134,7 @@ function collectReachableRoles(graph: WorkflowPayload["graph"]): Set { const queue: string[] = []; for (const target of Object.values(startEdges)) { - if (target.role !== "$END" && !reachable.has(target.role)) { + if (!PSEUDO_TARGETS.has(target.role) && !reachable.has(target.role)) { reachable.add(target.role); queue.push(target.role); } @@ -140,7 +145,7 @@ function collectReachableRoles(graph: WorkflowPayload["graph"]): Set { const edges = graph[current]; if (!edges) continue; for (const target of Object.values(edges)) { - if (target.role !== "$END" && !reachable.has(target.role)) { + if (!PSEUDO_TARGETS.has(target.role) && !reachable.has(target.role)) { reachable.add(target.role); queue.push(target.role); } diff --git a/packages/workflow-protocol/src/index.ts b/packages/workflow-protocol/src/index.ts index 30c5829..bd9a358 100644 --- a/packages/workflow-protocol/src/index.ts +++ b/packages/workflow-protocol/src/index.ts @@ -7,6 +7,7 @@ export type { AgentAlias, AgentConfig, CasRef, + GraphPseudoRole, ModelAlias, ModelConfig, ModeratorContext, diff --git a/packages/workflow-protocol/src/schemas.ts b/packages/workflow-protocol/src/schemas.ts index 81d062b..13c6e09 100644 --- a/packages/workflow-protocol/src/schemas.ts +++ b/packages/workflow-protocol/src/schemas.ts @@ -18,7 +18,7 @@ const TARGET: JSONSchema = { type: "object", required: ["role", "prompt"], properties: { - role: { type: "string" }, + role: { type: "string", description: "Role name or pseudo-role ($END, $SUSPEND)" }, prompt: { type: "string" }, location: { anyOf: [{ type: "string" }, { type: "null" }], diff --git a/packages/workflow-protocol/src/types.ts b/packages/workflow-protocol/src/types.ts index b0e52ed..87259e1 100644 --- a/packages/workflow-protocol/src/types.ts +++ b/packages/workflow-protocol/src/types.ts @@ -35,8 +35,12 @@ export type RoleDefinition = { frontmatter: CasRef; }; +/** Pseudo-role targets in workflow graph edges (not real roles). */ +export type GraphPseudoRole = "$END" | "$SUSPEND"; + export type Target = { - role: string; + /** Next role name, or a graph pseudo-role such as `$END` or `$SUSPEND`. */ + role: string | GraphPseudoRole; prompt: string; /** Optional working directory override via mustache template. */ location: string | null; @@ -79,7 +83,7 @@ export type ModeratorContext = { // ── 4.5 CLI 输出 ──────────────────────────────────────────────────── /** Thread status — unified status representation */ -export type ThreadStatus = "idle" | "running" | "completed" | "cancelled"; +export type ThreadStatus = "idle" | "running" | "suspended" | "completed" | "cancelled"; /** uwf thread start */ export type StartOutput = { @@ -90,7 +94,7 @@ export type StartOutput = { /** * Output from thread show and thread exec commands. * - * @property status - Current thread status (idle/running/completed/cancelled) + * @property status - Current thread status (idle/running/suspended/completed/cancelled) * @property done - @deprecated Use status field instead. True if thread is completed or cancelled. * @property background - @deprecated Use status field instead. Always null in current implementation. */ @@ -99,7 +103,7 @@ export type StepOutput = { thread: ThreadId; head: CasRef; status: ThreadStatus; - /** The current or next role. Null when completed, cancelled, or next is $END. */ + /** The current or next role. Null when completed, cancelled, suspended, or next is $END. */ currentRole: string | null; done: boolean; background: boolean | null;