diff --git a/packages/workflow-template-develop/__tests__/develop-template.test.ts b/packages/workflow-template-develop/__tests__/develop-template.test.ts index f67adba..02c37ae 100644 --- a/packages/workflow-template-develop/__tests__/develop-template.test.ts +++ b/packages/workflow-template-develop/__tests__/develop-template.test.ts @@ -1,6 +1,6 @@ import { describe, expect, test } from "bun:test"; -import { END, type ModeratorContext, type RoleStep, START } from "@uncaged/workflow-runtime"; import { validateWorkflowDescriptor } from "@uncaged/workflow"; +import { END, type ModeratorContext, type RoleStep, START } from "@uncaged/workflow-runtime"; import { buildDevelopDescriptor } from "../src/descriptor.js"; import { developModerator } from "../src/index.js"; import type { CommitterMeta, PlannerMeta } from "../src/roles/index.js"; diff --git a/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts b/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts index 6fb6e7f..463e7b2 100644 --- a/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts +++ b/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts @@ -2,7 +2,12 @@ import { afterEach, describe, expect, test } from "bun:test"; import { mkdtemp, rm } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; -import { createCasStore, createExtract, createWorkflow, validateWorkflowDescriptor } from "@uncaged/workflow"; +import { + createCasStore, + createExtract, + createWorkflow, + validateWorkflowDescriptor, +} from "@uncaged/workflow"; import { END, type ModeratorContext, type RoleStep, START } from "@uncaged/workflow-runtime"; import { buildSolveIssueDescriptor } from "../src/descriptor.js"; import type { DeveloperMeta } from "../src/developer.js"; diff --git a/packages/workflow/__tests__/engine.test.ts b/packages/workflow/__tests__/engine.test.ts index 9dbaf6c..cb2d92f 100644 --- a/packages/workflow/__tests__/engine.test.ts +++ b/packages/workflow/__tests__/engine.test.ts @@ -101,10 +101,10 @@ async function writeRegistryYaml(storageRoot: string, yaml: string): Promise>; - supervisorContent: string; + supervisorDecision: "continue" | "stop"; onSupervisorCall?: () => void; }): () => void { const origFetch = globalThis.fetch; @@ -114,9 +114,9 @@ function installMockExtractThenSupervisor(params: { init?: RequestInit, ): Promise => { const body = init?.body ? (JSON.parse(String(init.body)) as Record) : {}; - const tools = body.tools; - const hasTools = Array.isArray(tools) && tools.length > 0; - if (hasTools) { + const model = typeof body.model === "string" ? body.model : ""; + const isSupervisor = model.startsWith("supervisor-"); + if (!isSupervisor) { const args = params.extractArgs[extractI] ?? params.extractArgs[params.extractArgs.length - 1]; if (args === undefined) { @@ -133,7 +133,9 @@ function installMockExtractThenSupervisor(params: { params.onSupervisorCall?.(); return new Response( JSON.stringify({ - choices: [{ message: { content: params.supervisorContent } }], + choices: [ + { message: { content: JSON.stringify({ decision: params.supervisorDecision }) } }, + ], }), { status: 200, headers: { "Content-Type": "application/json" } }, ); @@ -674,7 +676,7 @@ describe("executeThread", () => { test("supervisor stops thread when interval elapses and model returns stop", async () => { restoreFetch = installMockExtractThenSupervisor({ extractArgs: [{ plan: "do-it", files: ["a.ts"] }, { diff: "+ok" }], - supervisorContent: "stop", + supervisorDecision: "stop", }); const root = await mkdtemp(join(tmpdir(), "wf-engine-sup-stop-")); @@ -725,7 +727,7 @@ describe("executeThread", () => { let supervisorCalls = 0; restoreFetch = installMockExtractThenSupervisor({ extractArgs: [{ plan: "do-it", files: ["a.ts"] }, { diff: "+ok" }], - supervisorContent: "stop", + supervisorDecision: "stop", onSupervisorCall: () => { supervisorCalls += 1; }, diff --git a/packages/workflow/__tests__/supervisor.test.ts b/packages/workflow/__tests__/supervisor.test.ts index e2a9186..3591ae8 100644 --- a/packages/workflow/__tests__/supervisor.test.ts +++ b/packages/workflow/__tests__/supervisor.test.ts @@ -1,6 +1,6 @@ import { afterEach, describe, expect, test } from "bun:test"; -import { parseSupervisorDecisionText, runSupervisor } from "../src/engine/supervisor.js"; +import { runSupervisor } from "../src/engine/supervisor.js"; import type { WorkflowConfig } from "../src/registry/index.js"; import type { LogFn } from "../src/util/index.js"; @@ -20,28 +20,23 @@ function supervisorOnlyConfig(): WorkflowConfig { }; } -describe("parseSupervisorDecisionText", () => { - test("reads continue and stop case-insensitively", () => { - expect(parseSupervisorDecisionText("continue")).toBe("continue"); - expect(parseSupervisorDecisionText("CONTINUE")).toBe("continue"); - expect(parseSupervisorDecisionText("stop")).toBe("stop"); - expect(parseSupervisorDecisionText("STOP.")).toBe("stop"); +function jsonResponse(body: Record, status = 200): Response { + return new Response(JSON.stringify(body), { + status, + headers: { "Content-Type": "application/json" }, }); +} - test("finds token inside a sentence", () => { - expect(parseSupervisorDecisionText("Answer: continue")).toBe("continue"); - expect(parseSupervisorDecisionText("I recommend stop now")).toBe("stop"); - }); - - test("when both appear, earlier token wins", () => { - expect(parseSupervisorDecisionText("continue then stop")).toBe("continue"); - expect(parseSupervisorDecisionText("stop then continue")).toBe("stop"); - }); - - test("defaults to continue when unclear", () => { - expect(parseSupervisorDecisionText("maybe later")).toBe("continue"); - }); -}); +function installFetchMock(impl: (init?: RequestInit) => Promise): () => void { + const origFetch = globalThis.fetch; + globalThis.fetch = Object.assign( + async (_input: Parameters[0], init?: RequestInit) => impl(init), + { preconnect: origFetch.preconnect.bind(origFetch) }, + ) as typeof fetch; + return () => { + globalThis.fetch = origFetch; + }; +} describe("runSupervisor", () => { let restoreFetch: (() => void) | null = null; @@ -52,16 +47,9 @@ describe("runSupervisor", () => { }); test("returns continue when supervisor model cannot be resolved (no fetch)", async () => { - const origFetch = globalThis.fetch; - restoreFetch = () => { - globalThis.fetch = origFetch; - }; - globalThis.fetch = Object.assign( - async () => { - throw new Error("fetch should not run when supervisor is not configured"); - }, - { preconnect: origFetch.preconnect.bind(origFetch) }, - ) as typeof fetch; + restoreFetch = installFetchMock(async () => { + throw new Error("fetch should not run when supervisor is not configured"); + }); const config: WorkflowConfig = { maxDepth: 1, @@ -87,21 +75,27 @@ describe("runSupervisor", () => { expect(r.value).toBe("continue"); }); - test("returns stop from chat/completions assistant content", async () => { - const origFetch = globalThis.fetch; - restoreFetch = () => { - globalThis.fetch = origFetch; - }; - globalThis.fetch = Object.assign( - async () => - new Response( - JSON.stringify({ - choices: [{ message: { content: "stop" } }], - }), - { status: 200, headers: { "Content-Type": "application/json" } }, - ), - { preconnect: origFetch.preconnect.bind(origFetch) }, - ) as typeof fetch; + test("returns stop from structured tool call", async () => { + restoreFetch = installFetchMock(async () => + jsonResponse({ + choices: [ + { + message: { + tool_calls: [ + { + id: "t1", + type: "function", + function: { + name: "supervisor_decision", + arguments: JSON.stringify({ decision: "stop" }), + }, + }, + ], + }, + }, + ], + }), + ); const r = await runSupervisor({ config: supervisorOnlyConfig(), @@ -116,14 +110,44 @@ describe("runSupervisor", () => { expect(r.value).toBe("stop"); }); - test("returns err on invalid JSON body", async () => { - const origFetch = globalThis.fetch; - restoreFetch = () => { - globalThis.fetch = origFetch; - }; - globalThis.fetch = Object.assign(async () => new Response("not-json", { status: 200 }), { - preconnect: origFetch.preconnect.bind(origFetch), - }) as typeof fetch; + test("returns continue from plain JSON content (reactor short-circuit)", async () => { + restoreFetch = installFetchMock(async () => + jsonResponse({ + choices: [{ message: { content: '{"decision":"continue"}' } }], + }), + ); + + const r = await runSupervisor({ + config: supervisorOnlyConfig(), + prompt: "do Y", + recentSteps: [], + logger: noopLogger, + }); + expect(r.ok).toBe(true); + if (!r.ok) { + return; + } + expect(r.value).toBe("continue"); + }); + + test("returns err when reactor cannot validate the schema within max rounds", async () => { + restoreFetch = installFetchMock(async () => + jsonResponse({ + choices: [{ message: { content: "not-json" } }], + }), + ); + + const r = await runSupervisor({ + config: supervisorOnlyConfig(), + prompt: "p", + recentSteps: [], + logger: noopLogger, + }); + expect(r.ok).toBe(false); + }); + + test("returns err on HTTP failure", async () => { + restoreFetch = installFetchMock(async () => new Response("boom", { status: 500 })); const r = await runSupervisor({ config: supervisorOnlyConfig(), diff --git a/packages/workflow/src/engine/supervisor.ts b/packages/workflow/src/engine/supervisor.ts index 9b151fa..cef8777 100644 --- a/packages/workflow/src/engine/supervisor.ts +++ b/packages/workflow/src/engine/supervisor.ts @@ -1,67 +1,27 @@ +import * as z from "zod/v4"; + import { resolveModel } from "../config/index.js"; +import { extractFunctionToolFromZodSchema } from "../extract/index.js"; +import { createLlmFn, createThreadReactor } from "../reactor/index.js"; import type { WorkflowConfig } from "../registry/index.js"; import { err, type LogFn, ok, type Result } from "../util/index.js"; import type { SupervisorDecision } from "./types.js"; const SUPERVISOR_RECENT_STEP_LIMIT = 12; +const SUPERVISOR_MAX_REACT_ROUNDS = 4; -function chatCompletionsUrl(baseUrl: string): string { - const trimmed = baseUrl.replace(/\/+$/, ""); - return `${trimmed}/chat/completions`; -} +const supervisorDecisionSchema = z + .object({ + decision: z.enum(["continue", "stop"]), + }) + .meta({ + title: "supervisor_decision", + description: + 'Workflow supervisor decision. "continue" when the thread is making progress; "stop" when done, looping, or stuck.', + }); -function isRecord(value: unknown): value is Record { - return typeof value === "object" && value !== null && !Array.isArray(value); -} - -function readAssistantContent(parsed: unknown): string | null { - if (!isRecord(parsed)) { - return null; - } - const choices = parsed.choices; - if (!Array.isArray(choices) || choices.length === 0) { - return null; - } - const first = choices[0]; - if (!isRecord(first)) { - return null; - } - const messageObj = first.message; - if (!isRecord(messageObj)) { - return null; - } - const content = messageObj.content; - if (typeof content !== "string") { - return null; - } - return content; -} - -/** Lenient: accepts STOP/stop/stop. as prose; prefers {@link SupervisorDecision.stop} when both tokens appear. */ -export function parseSupervisorDecisionText(text: string): SupervisorDecision { - const lower = text.toLowerCase(); - const stopWord = /\bstop\b/.test(lower); - const continueWord = /\bcontinue\b/.test(lower); - if (stopWord && continueWord) { - const si = lower.search(/\bstop\b/); - const ci = lower.search(/\bcontinue\b/); - return si <= ci ? "stop" : "continue"; - } - if (stopWord) { - return "stop"; - } - if (continueWord) { - return "continue"; - } - if (lower.includes("stop")) { - return "stop"; - } - if (lower.includes("continue")) { - return "continue"; - } - return "continue"; -} +type SupervisorThreadContext = Record; type RunSupervisorArgs = { config: WorkflowConfig; @@ -70,7 +30,13 @@ type RunSupervisorArgs = { logger: LogFn; }; -/** Calls the `supervisor` scene LLM; opt-out when {@link resolveModel} fails (returns ok(`continue`)). */ +function buildSupervisorInput(args: RunSupervisorArgs): string { + const recent = args.recentSteps.slice(-SUPERVISOR_RECENT_STEP_LIMIT); + const stepsBlock = recent.map((s, index) => `${index + 1}. [${s.role}] ${s.summary}`).join("\n"); + return `Original task:\n${args.prompt}\n\nRecent steps (oldest first):\n${stepsBlock === "" ? "(none)" : stepsBlock}`; +} + +/** Calls the `supervisor` scene via {@link createThreadReactor}; opt-out when {@link resolveModel} fails (returns ok(`continue`)). */ export async function runSupervisor( args: RunSupervisorArgs, ): Promise> { @@ -78,63 +44,42 @@ export async function runSupervisor( if (!resolved.ok) { return ok("continue"); } - const provider = resolved.value; - const recent = args.recentSteps.slice(-SUPERVISOR_RECENT_STEP_LIMIT); - const stepsBlock = recent.map((s, index) => `${index + 1}. [${s.role}] ${s.summary}`).join("\n"); - const body = { - model: provider.model, - messages: [ - { - role: "system" as const, - content: - 'You supervise a multi-step workflow. Decide if the thread should keep running or halt.\n\nReply with exactly one token: either "continue" (progress toward the goal, not obviously stuck) or "stop" (done, looping, or no progress). Do not add explanation.', - }, - { - role: "user" as const, - content: `Original task:\n${args.prompt}\n\nRecent steps (oldest first):\n${stepsBlock === "" ? "(none)" : stepsBlock}`, - }, - ], - }; + const reactor = createThreadReactor({ + llm: createLlmFn(resolved.value), + maxRounds: SUPERVISOR_MAX_REACT_ROUNDS, + staticTools: [], + structuredToolFromSchema: (schema) => { + const t = extractFunctionToolFromZodSchema(schema); + return { + name: t.name, + tool: { + type: "function" as const, + function: { + name: t.name, + description: t.description, + parameters: t.parameters, + }, + }, + }; + }, + systemPromptForStructuredTool: (structuredToolName) => + `You supervise a multi-step workflow. Decide whether the thread should keep running or halt. Reply with "continue" when the thread is making progress toward the task, or "stop" when it is finished, looping, or no longer making progress. Call the ${structuredToolName} tool with JSON arguments matching the schema, or reply with only a JSON object such as {"decision":"stop"}.`, + toolHandler: async (call) => `Unknown tool: ${call.function.name}`, + }); - let response: Response; - try { - response = await fetch(chatCompletionsUrl(provider.baseUrl), { - method: "POST", - headers: { - Authorization: `Bearer ${provider.apiKey}`, - "Content-Type": "application/json", - }, - body: JSON.stringify(body), - }); - } catch (cause) { - const message = cause instanceof Error ? cause.message : String(cause); - args.logger("R9CW4PLM", `supervisor request failed: ${message}`); - return err(`supervisor network error: ${message}`); + const result = await reactor({ + thread: {} as SupervisorThreadContext, + input: buildSupervisorInput(args), + schema: supervisorDecisionSchema, + }); + + if (!result.ok) { + args.logger("R9CW4PLM", `supervisor failed: ${result.error}`); + return err(`supervisor: ${result.error}`); } - const responseText = await response.text(); - if (!response.ok) { - args.logger("T3HN8VKQ", `supervisor HTTP ${response.status}: ${responseText.slice(0, 200)}`); - return err(`supervisor HTTP ${response.status}: ${responseText.slice(0, 500)}`); - } - - let parsed: unknown; - try { - parsed = JSON.parse(responseText) as unknown; - } catch (cause) { - const message = cause instanceof Error ? cause.message : String(cause); - args.logger("W7BQ2NXM", `supervisor response is not JSON: ${message}`); - return err(`supervisor invalid JSON: ${message}`); - } - - const content = readAssistantContent(parsed); - if (content === null || content.trim() === "") { - args.logger("Y4JX9PKW", "supervisor returned empty assistant content"); - return err("supervisor empty assistant content"); - } - - const decision = parseSupervisorDecisionText(content); + const decision: SupervisorDecision = result.value.decision; args.logger("Z8KM5QWT", `supervisor says ${decision}`); return ok(decision); }