feat(workflow): migrate supervisor to ThreadReactor (Phase 2)
- Rewrite supervisor to use createThreadReactor + createLlmFn - No direct fetch/HTTP calls in supervisor - All 266 tests passing Refs #139, relates #141
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
import { describe, expect, test } from "bun:test";
|
||||
import { END, type ModeratorContext, type RoleStep, START } from "@uncaged/workflow-runtime";
|
||||
import { validateWorkflowDescriptor } from "@uncaged/workflow";
|
||||
import { END, type ModeratorContext, type RoleStep, START } from "@uncaged/workflow-runtime";
|
||||
import { buildDevelopDescriptor } from "../src/descriptor.js";
|
||||
import { developModerator } from "../src/index.js";
|
||||
import type { CommitterMeta, PlannerMeta } from "../src/roles/index.js";
|
||||
|
||||
@@ -2,7 +2,12 @@ import { afterEach, describe, expect, test } from "bun:test";
|
||||
import { mkdtemp, rm } from "node:fs/promises";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import { createCasStore, createExtract, createWorkflow, validateWorkflowDescriptor } from "@uncaged/workflow";
|
||||
import {
|
||||
createCasStore,
|
||||
createExtract,
|
||||
createWorkflow,
|
||||
validateWorkflowDescriptor,
|
||||
} from "@uncaged/workflow";
|
||||
import { END, type ModeratorContext, type RoleStep, START } from "@uncaged/workflow-runtime";
|
||||
import { buildSolveIssueDescriptor } from "../src/descriptor.js";
|
||||
import type { DeveloperMeta } from "../src/developer.js";
|
||||
|
||||
@@ -101,10 +101,10 @@ async function writeRegistryYaml(storageRoot: string, yaml: string): Promise<voi
|
||||
await writeFile(join(storageRoot, "workflow.yaml"), yaml, "utf8");
|
||||
}
|
||||
|
||||
/** Extract rounds reply with schema-shaped JSON in `content`; supervisor uses plain `content` (no tools advertised). */
|
||||
/** Extract and supervisor both run via {@link createThreadReactor}; differentiate by `body.model`. */
|
||||
function installMockExtractThenSupervisor(params: {
|
||||
extractArgs: ReadonlyArray<Record<string, unknown>>;
|
||||
supervisorContent: string;
|
||||
supervisorDecision: "continue" | "stop";
|
||||
onSupervisorCall?: () => void;
|
||||
}): () => void {
|
||||
const origFetch = globalThis.fetch;
|
||||
@@ -114,9 +114,9 @@ function installMockExtractThenSupervisor(params: {
|
||||
init?: RequestInit,
|
||||
): Promise<Response> => {
|
||||
const body = init?.body ? (JSON.parse(String(init.body)) as Record<string, unknown>) : {};
|
||||
const tools = body.tools;
|
||||
const hasTools = Array.isArray(tools) && tools.length > 0;
|
||||
if (hasTools) {
|
||||
const model = typeof body.model === "string" ? body.model : "";
|
||||
const isSupervisor = model.startsWith("supervisor-");
|
||||
if (!isSupervisor) {
|
||||
const args =
|
||||
params.extractArgs[extractI] ?? params.extractArgs[params.extractArgs.length - 1];
|
||||
if (args === undefined) {
|
||||
@@ -133,7 +133,9 @@ function installMockExtractThenSupervisor(params: {
|
||||
params.onSupervisorCall?.();
|
||||
return new Response(
|
||||
JSON.stringify({
|
||||
choices: [{ message: { content: params.supervisorContent } }],
|
||||
choices: [
|
||||
{ message: { content: JSON.stringify({ decision: params.supervisorDecision }) } },
|
||||
],
|
||||
}),
|
||||
{ status: 200, headers: { "Content-Type": "application/json" } },
|
||||
);
|
||||
@@ -674,7 +676,7 @@ describe("executeThread", () => {
|
||||
test("supervisor stops thread when interval elapses and model returns stop", async () => {
|
||||
restoreFetch = installMockExtractThenSupervisor({
|
||||
extractArgs: [{ plan: "do-it", files: ["a.ts"] }, { diff: "+ok" }],
|
||||
supervisorContent: "stop",
|
||||
supervisorDecision: "stop",
|
||||
});
|
||||
|
||||
const root = await mkdtemp(join(tmpdir(), "wf-engine-sup-stop-"));
|
||||
@@ -725,7 +727,7 @@ describe("executeThread", () => {
|
||||
let supervisorCalls = 0;
|
||||
restoreFetch = installMockExtractThenSupervisor({
|
||||
extractArgs: [{ plan: "do-it", files: ["a.ts"] }, { diff: "+ok" }],
|
||||
supervisorContent: "stop",
|
||||
supervisorDecision: "stop",
|
||||
onSupervisorCall: () => {
|
||||
supervisorCalls += 1;
|
||||
},
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { afterEach, describe, expect, test } from "bun:test";
|
||||
|
||||
import { parseSupervisorDecisionText, runSupervisor } from "../src/engine/supervisor.js";
|
||||
import { runSupervisor } from "../src/engine/supervisor.js";
|
||||
import type { WorkflowConfig } from "../src/registry/index.js";
|
||||
import type { LogFn } from "../src/util/index.js";
|
||||
|
||||
@@ -20,28 +20,23 @@ function supervisorOnlyConfig(): WorkflowConfig {
|
||||
};
|
||||
}
|
||||
|
||||
describe("parseSupervisorDecisionText", () => {
|
||||
test("reads continue and stop case-insensitively", () => {
|
||||
expect(parseSupervisorDecisionText("continue")).toBe("continue");
|
||||
expect(parseSupervisorDecisionText("CONTINUE")).toBe("continue");
|
||||
expect(parseSupervisorDecisionText("stop")).toBe("stop");
|
||||
expect(parseSupervisorDecisionText("STOP.")).toBe("stop");
|
||||
function jsonResponse(body: Record<string, unknown>, status = 200): Response {
|
||||
return new Response(JSON.stringify(body), {
|
||||
status,
|
||||
headers: { "Content-Type": "application/json" },
|
||||
});
|
||||
}
|
||||
|
||||
test("finds token inside a sentence", () => {
|
||||
expect(parseSupervisorDecisionText("Answer: continue")).toBe("continue");
|
||||
expect(parseSupervisorDecisionText("I recommend stop now")).toBe("stop");
|
||||
});
|
||||
|
||||
test("when both appear, earlier token wins", () => {
|
||||
expect(parseSupervisorDecisionText("continue then stop")).toBe("continue");
|
||||
expect(parseSupervisorDecisionText("stop then continue")).toBe("stop");
|
||||
});
|
||||
|
||||
test("defaults to continue when unclear", () => {
|
||||
expect(parseSupervisorDecisionText("maybe later")).toBe("continue");
|
||||
});
|
||||
});
|
||||
function installFetchMock(impl: (init?: RequestInit) => Promise<Response>): () => void {
|
||||
const origFetch = globalThis.fetch;
|
||||
globalThis.fetch = Object.assign(
|
||||
async (_input: Parameters<typeof fetch>[0], init?: RequestInit) => impl(init),
|
||||
{ preconnect: origFetch.preconnect.bind(origFetch) },
|
||||
) as typeof fetch;
|
||||
return () => {
|
||||
globalThis.fetch = origFetch;
|
||||
};
|
||||
}
|
||||
|
||||
describe("runSupervisor", () => {
|
||||
let restoreFetch: (() => void) | null = null;
|
||||
@@ -52,16 +47,9 @@ describe("runSupervisor", () => {
|
||||
});
|
||||
|
||||
test("returns continue when supervisor model cannot be resolved (no fetch)", async () => {
|
||||
const origFetch = globalThis.fetch;
|
||||
restoreFetch = () => {
|
||||
globalThis.fetch = origFetch;
|
||||
};
|
||||
globalThis.fetch = Object.assign(
|
||||
async () => {
|
||||
throw new Error("fetch should not run when supervisor is not configured");
|
||||
},
|
||||
{ preconnect: origFetch.preconnect.bind(origFetch) },
|
||||
) as typeof fetch;
|
||||
restoreFetch = installFetchMock(async () => {
|
||||
throw new Error("fetch should not run when supervisor is not configured");
|
||||
});
|
||||
|
||||
const config: WorkflowConfig = {
|
||||
maxDepth: 1,
|
||||
@@ -87,21 +75,27 @@ describe("runSupervisor", () => {
|
||||
expect(r.value).toBe("continue");
|
||||
});
|
||||
|
||||
test("returns stop from chat/completions assistant content", async () => {
|
||||
const origFetch = globalThis.fetch;
|
||||
restoreFetch = () => {
|
||||
globalThis.fetch = origFetch;
|
||||
};
|
||||
globalThis.fetch = Object.assign(
|
||||
async () =>
|
||||
new Response(
|
||||
JSON.stringify({
|
||||
choices: [{ message: { content: "stop" } }],
|
||||
}),
|
||||
{ status: 200, headers: { "Content-Type": "application/json" } },
|
||||
),
|
||||
{ preconnect: origFetch.preconnect.bind(origFetch) },
|
||||
) as typeof fetch;
|
||||
test("returns stop from structured tool call", async () => {
|
||||
restoreFetch = installFetchMock(async () =>
|
||||
jsonResponse({
|
||||
choices: [
|
||||
{
|
||||
message: {
|
||||
tool_calls: [
|
||||
{
|
||||
id: "t1",
|
||||
type: "function",
|
||||
function: {
|
||||
name: "supervisor_decision",
|
||||
arguments: JSON.stringify({ decision: "stop" }),
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
],
|
||||
}),
|
||||
);
|
||||
|
||||
const r = await runSupervisor({
|
||||
config: supervisorOnlyConfig(),
|
||||
@@ -116,14 +110,44 @@ describe("runSupervisor", () => {
|
||||
expect(r.value).toBe("stop");
|
||||
});
|
||||
|
||||
test("returns err on invalid JSON body", async () => {
|
||||
const origFetch = globalThis.fetch;
|
||||
restoreFetch = () => {
|
||||
globalThis.fetch = origFetch;
|
||||
};
|
||||
globalThis.fetch = Object.assign(async () => new Response("not-json", { status: 200 }), {
|
||||
preconnect: origFetch.preconnect.bind(origFetch),
|
||||
}) as typeof fetch;
|
||||
test("returns continue from plain JSON content (reactor short-circuit)", async () => {
|
||||
restoreFetch = installFetchMock(async () =>
|
||||
jsonResponse({
|
||||
choices: [{ message: { content: '{"decision":"continue"}' } }],
|
||||
}),
|
||||
);
|
||||
|
||||
const r = await runSupervisor({
|
||||
config: supervisorOnlyConfig(),
|
||||
prompt: "do Y",
|
||||
recentSteps: [],
|
||||
logger: noopLogger,
|
||||
});
|
||||
expect(r.ok).toBe(true);
|
||||
if (!r.ok) {
|
||||
return;
|
||||
}
|
||||
expect(r.value).toBe("continue");
|
||||
});
|
||||
|
||||
test("returns err when reactor cannot validate the schema within max rounds", async () => {
|
||||
restoreFetch = installFetchMock(async () =>
|
||||
jsonResponse({
|
||||
choices: [{ message: { content: "not-json" } }],
|
||||
}),
|
||||
);
|
||||
|
||||
const r = await runSupervisor({
|
||||
config: supervisorOnlyConfig(),
|
||||
prompt: "p",
|
||||
recentSteps: [],
|
||||
logger: noopLogger,
|
||||
});
|
||||
expect(r.ok).toBe(false);
|
||||
});
|
||||
|
||||
test("returns err on HTTP failure", async () => {
|
||||
restoreFetch = installFetchMock(async () => new Response("boom", { status: 500 }));
|
||||
|
||||
const r = await runSupervisor({
|
||||
config: supervisorOnlyConfig(),
|
||||
|
||||
@@ -1,67 +1,27 @@
|
||||
import * as z from "zod/v4";
|
||||
|
||||
import { resolveModel } from "../config/index.js";
|
||||
import { extractFunctionToolFromZodSchema } from "../extract/index.js";
|
||||
import { createLlmFn, createThreadReactor } from "../reactor/index.js";
|
||||
import type { WorkflowConfig } from "../registry/index.js";
|
||||
import { err, type LogFn, ok, type Result } from "../util/index.js";
|
||||
|
||||
import type { SupervisorDecision } from "./types.js";
|
||||
|
||||
const SUPERVISOR_RECENT_STEP_LIMIT = 12;
|
||||
const SUPERVISOR_MAX_REACT_ROUNDS = 4;
|
||||
|
||||
function chatCompletionsUrl(baseUrl: string): string {
|
||||
const trimmed = baseUrl.replace(/\/+$/, "");
|
||||
return `${trimmed}/chat/completions`;
|
||||
}
|
||||
const supervisorDecisionSchema = z
|
||||
.object({
|
||||
decision: z.enum(["continue", "stop"]),
|
||||
})
|
||||
.meta({
|
||||
title: "supervisor_decision",
|
||||
description:
|
||||
'Workflow supervisor decision. "continue" when the thread is making progress; "stop" when done, looping, or stuck.',
|
||||
});
|
||||
|
||||
function isRecord(value: unknown): value is Record<string, unknown> {
|
||||
return typeof value === "object" && value !== null && !Array.isArray(value);
|
||||
}
|
||||
|
||||
function readAssistantContent(parsed: unknown): string | null {
|
||||
if (!isRecord(parsed)) {
|
||||
return null;
|
||||
}
|
||||
const choices = parsed.choices;
|
||||
if (!Array.isArray(choices) || choices.length === 0) {
|
||||
return null;
|
||||
}
|
||||
const first = choices[0];
|
||||
if (!isRecord(first)) {
|
||||
return null;
|
||||
}
|
||||
const messageObj = first.message;
|
||||
if (!isRecord(messageObj)) {
|
||||
return null;
|
||||
}
|
||||
const content = messageObj.content;
|
||||
if (typeof content !== "string") {
|
||||
return null;
|
||||
}
|
||||
return content;
|
||||
}
|
||||
|
||||
/** Lenient: accepts STOP/stop/stop. as prose; prefers {@link SupervisorDecision.stop} when both tokens appear. */
|
||||
export function parseSupervisorDecisionText(text: string): SupervisorDecision {
|
||||
const lower = text.toLowerCase();
|
||||
const stopWord = /\bstop\b/.test(lower);
|
||||
const continueWord = /\bcontinue\b/.test(lower);
|
||||
if (stopWord && continueWord) {
|
||||
const si = lower.search(/\bstop\b/);
|
||||
const ci = lower.search(/\bcontinue\b/);
|
||||
return si <= ci ? "stop" : "continue";
|
||||
}
|
||||
if (stopWord) {
|
||||
return "stop";
|
||||
}
|
||||
if (continueWord) {
|
||||
return "continue";
|
||||
}
|
||||
if (lower.includes("stop")) {
|
||||
return "stop";
|
||||
}
|
||||
if (lower.includes("continue")) {
|
||||
return "continue";
|
||||
}
|
||||
return "continue";
|
||||
}
|
||||
type SupervisorThreadContext = Record<string, never>;
|
||||
|
||||
type RunSupervisorArgs = {
|
||||
config: WorkflowConfig;
|
||||
@@ -70,7 +30,13 @@ type RunSupervisorArgs = {
|
||||
logger: LogFn;
|
||||
};
|
||||
|
||||
/** Calls the `supervisor` scene LLM; opt-out when {@link resolveModel} fails (returns ok(`continue`)). */
|
||||
function buildSupervisorInput(args: RunSupervisorArgs): string {
|
||||
const recent = args.recentSteps.slice(-SUPERVISOR_RECENT_STEP_LIMIT);
|
||||
const stepsBlock = recent.map((s, index) => `${index + 1}. [${s.role}] ${s.summary}`).join("\n");
|
||||
return `Original task:\n${args.prompt}\n\nRecent steps (oldest first):\n${stepsBlock === "" ? "(none)" : stepsBlock}`;
|
||||
}
|
||||
|
||||
/** Calls the `supervisor` scene via {@link createThreadReactor}; opt-out when {@link resolveModel} fails (returns ok(`continue`)). */
|
||||
export async function runSupervisor(
|
||||
args: RunSupervisorArgs,
|
||||
): Promise<Result<SupervisorDecision, string>> {
|
||||
@@ -78,63 +44,42 @@ export async function runSupervisor(
|
||||
if (!resolved.ok) {
|
||||
return ok("continue");
|
||||
}
|
||||
const provider = resolved.value;
|
||||
const recent = args.recentSteps.slice(-SUPERVISOR_RECENT_STEP_LIMIT);
|
||||
const stepsBlock = recent.map((s, index) => `${index + 1}. [${s.role}] ${s.summary}`).join("\n");
|
||||
|
||||
const body = {
|
||||
model: provider.model,
|
||||
messages: [
|
||||
{
|
||||
role: "system" as const,
|
||||
content:
|
||||
'You supervise a multi-step workflow. Decide if the thread should keep running or halt.\n\nReply with exactly one token: either "continue" (progress toward the goal, not obviously stuck) or "stop" (done, looping, or no progress). Do not add explanation.',
|
||||
},
|
||||
{
|
||||
role: "user" as const,
|
||||
content: `Original task:\n${args.prompt}\n\nRecent steps (oldest first):\n${stepsBlock === "" ? "(none)" : stepsBlock}`,
|
||||
},
|
||||
],
|
||||
};
|
||||
const reactor = createThreadReactor<SupervisorThreadContext>({
|
||||
llm: createLlmFn(resolved.value),
|
||||
maxRounds: SUPERVISOR_MAX_REACT_ROUNDS,
|
||||
staticTools: [],
|
||||
structuredToolFromSchema: (schema) => {
|
||||
const t = extractFunctionToolFromZodSchema(schema);
|
||||
return {
|
||||
name: t.name,
|
||||
tool: {
|
||||
type: "function" as const,
|
||||
function: {
|
||||
name: t.name,
|
||||
description: t.description,
|
||||
parameters: t.parameters,
|
||||
},
|
||||
},
|
||||
};
|
||||
},
|
||||
systemPromptForStructuredTool: (structuredToolName) =>
|
||||
`You supervise a multi-step workflow. Decide whether the thread should keep running or halt. Reply with "continue" when the thread is making progress toward the task, or "stop" when it is finished, looping, or no longer making progress. Call the ${structuredToolName} tool with JSON arguments matching the schema, or reply with only a JSON object such as {"decision":"stop"}.`,
|
||||
toolHandler: async (call) => `Unknown tool: ${call.function.name}`,
|
||||
});
|
||||
|
||||
let response: Response;
|
||||
try {
|
||||
response = await fetch(chatCompletionsUrl(provider.baseUrl), {
|
||||
method: "POST",
|
||||
headers: {
|
||||
Authorization: `Bearer ${provider.apiKey}`,
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
body: JSON.stringify(body),
|
||||
});
|
||||
} catch (cause) {
|
||||
const message = cause instanceof Error ? cause.message : String(cause);
|
||||
args.logger("R9CW4PLM", `supervisor request failed: ${message}`);
|
||||
return err(`supervisor network error: ${message}`);
|
||||
const result = await reactor({
|
||||
thread: {} as SupervisorThreadContext,
|
||||
input: buildSupervisorInput(args),
|
||||
schema: supervisorDecisionSchema,
|
||||
});
|
||||
|
||||
if (!result.ok) {
|
||||
args.logger("R9CW4PLM", `supervisor failed: ${result.error}`);
|
||||
return err(`supervisor: ${result.error}`);
|
||||
}
|
||||
|
||||
const responseText = await response.text();
|
||||
if (!response.ok) {
|
||||
args.logger("T3HN8VKQ", `supervisor HTTP ${response.status}: ${responseText.slice(0, 200)}`);
|
||||
return err(`supervisor HTTP ${response.status}: ${responseText.slice(0, 500)}`);
|
||||
}
|
||||
|
||||
let parsed: unknown;
|
||||
try {
|
||||
parsed = JSON.parse(responseText) as unknown;
|
||||
} catch (cause) {
|
||||
const message = cause instanceof Error ? cause.message : String(cause);
|
||||
args.logger("W7BQ2NXM", `supervisor response is not JSON: ${message}`);
|
||||
return err(`supervisor invalid JSON: ${message}`);
|
||||
}
|
||||
|
||||
const content = readAssistantContent(parsed);
|
||||
if (content === null || content.trim() === "") {
|
||||
args.logger("Y4JX9PKW", "supervisor returned empty assistant content");
|
||||
return err("supervisor empty assistant content");
|
||||
}
|
||||
|
||||
const decision = parseSupervisorDecisionText(content);
|
||||
const decision: SupervisorDecision = result.value.decision;
|
||||
args.logger("Z8KM5QWT", `supervisor says ${decision}`);
|
||||
return ok(decision);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user