Compare commits

..

12 Commits

Author SHA1 Message Date
xiaoju b8f9ffcb59 feat(workflow): migrate supervisor to ThreadReactor (Phase 2)
- Rewrite supervisor to use createThreadReactor + createLlmFn
- No direct fetch/HTTP calls in supervisor
- All 266 tests passing

Refs #139, relates #141
2026-05-09 02:26:39 +00:00
xiaoju a7171f05f6 feat(workflow): add ThreadReactor generic ReAct loop + migrate extract (Phase 1)
- New src/reactor/ module: createThreadReactor, createLlmFn, types
- Two-stage API: config (llm, systemPrompt, tools, toolHandler) + per-call (thread, input, schema)
- All tool failures are recoverable (returned to LLM as error message)
- Rewrite createExtract to use createThreadReactor
- Delete reactExtract old implementation
- Fix template test imports (START/END from runtime, validateWorkflowDescriptor from engine)

268 tests passing.

Refs #139, relates #140
2026-05-09 02:15:38 +00:00
xiaoju b53667a2aa Merge pull request 'refactor(workflow): move descriptor validation out of runtime' (#135) from refactor/runtime-descriptor-boundary into main 2026-05-08 15:05:24 +00:00
Scott Wei 5b60fa6454 refactor(workflow-runtime): flatten package layout and centralize types
Collapse bundle/cas/extract/util stubs into types.ts; move createWorkflow and Result helpers to src root.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-08 23:03:53 +08:00
xiaomo 2c0e744ebf Merge pull request 'perf(serve): SSE live pump reads incrementally' (#137) from fix/130-sse-incremental into main 2026-05-08 15:00:44 +00:00
xiaomo ae16f09688 Merge pull request 'feat(dashboard): hash routing + health check polling' (#138) from fix/128-dashboard-enhancements into main 2026-05-08 15:00:34 +00:00
xiaomo 73a3638ad9 Merge pull request 'fix(serve): error handling, CORS, body limit, CAS store reuse' (#136) from fix/120-serve-hardening into main 2026-05-08 15:00:32 +00:00
xingyue 7b0260cedd feat(dashboard): hash routing + health check polling
- Hash-based URL routing (#threads, #threads/{id}, #workflows)
  for bookmarkable/shareable thread links
- Health check polls every 10s with reconnecting state
- useHashRoute hook for clean route management

Closes #128
2026-05-08 18:16:09 +08:00
xingyue 61fc1cfe1b perf(serve): SSE live pump reads incrementally instead of full file
Use Bun.file().slice() to read only new bytes from the last known
offset instead of re-reading the entire JSONL file on every fs watch.

- readNewBytes() helper with byte-offset tracking
- Handles file truncation (resets offset)
- Early return when no new data

Closes #130
2026-05-08 18:14:04 +08:00
xingyue 6b1e728700 fix(serve): error handling, CORS, body limit, CAS store reuse
- Global error handler (app.onError → 500 JSON)
- JSON parse validation on POST routes (400)
- CORS restricted to localhost origins
- 1MB body size limit on POST (413)
- CAS store created once per route group, not per-request
- 6 new tests covering all changes

Closes #120
2026-05-08 18:11:59 +08:00
xiaomo dedab62c49 Merge pull request 'feat(dashboard): connect thread detail to SSE live stream' (#134) from feat/131-dashboard-sse into main 2026-05-08 09:53:10 +00:00
Scott Wei 8ff6f7e778 refactor(workflow): move descriptor validation out of runtime
Keep @uncaged/workflow-runtime focused on bundle runtime capabilities by relocating descriptor validation implementation to @uncaged/workflow.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-08 17:45:15 +08:00
40 changed files with 1096 additions and 719 deletions
@@ -77,6 +77,83 @@ describe("serve /api/cas", () => {
});
});
describe("serve error handling", () => {
test("POST /api/threads with invalid JSON body → 400", async () => {
const { fetch } = buildApp("/tmp/uncaged-serve-test-nonexistent");
const res = await fetch("/api/threads", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: "not json",
});
expect(res.status).toBe(400);
const body = (await res.json()) as { error: string };
expect(body.error).toBe("invalid JSON body");
});
test("POST /api/cas with invalid JSON body → 400", async () => {
const { fetch } = buildApp("/tmp/uncaged-serve-test-nonexistent");
const res = await fetch("/api/cas", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: "not json",
});
expect(res.status).toBe(400);
const body = (await res.json()) as { error: string };
expect(body.error).toBe("invalid JSON body");
});
test("POST /api/threads with missing required fields → 400", async () => {
const { fetch } = buildApp("/tmp/uncaged-serve-test-nonexistent");
const res = await fetch("/api/threads", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ foo: "bar" }),
});
expect(res.status).toBe(400);
const body = (await res.json()) as { error: string };
expect(body.error).toContain("required");
});
test("global error handler returns 500 with JSON", async () => {
const app = createApp("/tmp/uncaged-serve-test-nonexistent");
app.get("/test-error", () => {
throw new Error("boom");
});
const res = await app.fetch(new Request("http://localhost/test-error"));
expect(res.status).toBe(500);
const body = (await res.json()) as { error: string };
expect(body.error).toBe("Internal server error");
});
});
describe("serve security", () => {
test("CORS headers present on responses", async () => {
const app = createApp("/tmp/uncaged-serve-test-nonexistent");
const res2 = await app.fetch(
new Request("http://localhost/healthz", {
headers: { Origin: "http://localhost:5173" },
}),
);
expect(res2.headers.get("Access-Control-Allow-Origin")).toBe("http://localhost:5173");
});
test("POST with body > 1MB → 413", async () => {
const { fetch } = buildApp("/tmp/uncaged-serve-test-nonexistent");
const largeBody = "x".repeat(1_048_577);
const res = await fetch("/api/cas", {
method: "POST",
headers: {
"Content-Type": "application/json",
"Content-Length": String(largeBody.length),
},
body: largeBody,
});
expect(res.status).toBe(413);
const body = (await res.json()) as { error: string };
expect(body.error).toBe("Payload too large");
});
});
describe("serve CAS round-trip", () => {
const tmpDir = `/tmp/uncaged-serve-cas-test-${Date.now()}`;
@@ -6,10 +6,36 @@ import { createLiveRoutes } from "./routes-live.js";
import { createThreadRoutes } from "./routes-thread.js";
import { createWorkflowRoutes } from "./routes-workflow.js";
const MAX_BODY_SIZE = 1_048_576; // 1 MB
export function createApp(storageRoot: string): Hono {
const app = new Hono();
app.use("*", cors());
app.onError((_err, c) => {
return c.json({ error: "Internal server error" }, 500);
});
app.use(
"*",
cors({
origin: [
"http://localhost:5173",
"http://127.0.0.1:5173",
"http://localhost:7860",
"http://127.0.0.1:7860",
],
}),
);
app.use("*", async (c, next) => {
if (c.req.method === "POST") {
const contentLength = c.req.header("content-length");
if (contentLength !== undefined && Number(contentLength) > MAX_BODY_SIZE) {
return c.json({ error: "Payload too large" }, 413);
}
}
await next();
});
app.get("/healthz", (c) => c.json({ ok: true }));
@@ -3,17 +3,15 @@ import { Hono } from "hono";
export function createCasRoutes(storageRoot: string): Hono {
const app = new Hono();
const casDir = getGlobalCasDir(storageRoot);
const cas = createCasStore(casDir);
app.get("/", async (c) => {
const casDir = getGlobalCasDir(storageRoot);
const cas = createCasStore(casDir);
const hashes = await cas.list();
return c.json({ hashes });
});
app.get("/:hash", async (c) => {
const casDir = getGlobalCasDir(storageRoot);
const cas = createCasStore(casDir);
const content = await cas.get(c.req.param("hash"));
if (content === null) {
return c.json({ error: "not found" }, 404);
@@ -22,19 +20,20 @@ export function createCasRoutes(storageRoot: string): Hono {
});
app.post("/", async (c) => {
const body = await c.req.json<{ content: string }>();
let body: { content: string };
try {
body = (await c.req.json()) as { content: string };
} catch {
return c.json({ error: "invalid JSON body" }, 400);
}
if (typeof body.content !== "string") {
return c.json({ error: "content field required" }, 400);
}
const casDir = getGlobalCasDir(storageRoot);
const cas = createCasStore(casDir);
const hash = await cas.put(body.content);
return c.json({ hash }, 201);
});
app.delete("/:hash", async (c) => {
const casDir = getGlobalCasDir(storageRoot);
const cas = createCasStore(casDir);
const hash = c.req.param("hash");
const content = await cas.get(hash);
if (content === null) {
@@ -1,5 +1,4 @@
import { watch } from "node:fs";
import { readFile } from "node:fs/promises";
import { statSync, watch } from "node:fs";
import { dirname, join } from "node:path";
import { Hono } from "hono";
import { streamSSE } from "hono/streaming";
@@ -11,6 +10,30 @@ type PumpState = {
carry: string;
};
function fileSize(path: string): number {
try {
return statSync(path).size;
} catch {
return 0;
}
}
async function readNewBytes(path: string, state: PumpState): Promise<string | null> {
const size = fileSize(path);
if (size < state.contentOffset) {
// File was truncated — reset
state.contentOffset = 0;
state.carry = "";
}
if (size <= state.contentOffset) {
return null;
}
const blob = Bun.file(path).slice(state.contentOffset, size);
const chunk = await blob.text();
state.contentOffset = size;
return chunk;
}
function parseJsonLine(line: string): unknown {
try {
return JSON.parse(line) as unknown;
@@ -28,14 +51,7 @@ function isWorkflowResult(record: unknown): boolean {
);
}
function parseNewLines(text: string, state: PumpState): string[] {
if (text.length < state.contentOffset) {
state.contentOffset = 0;
state.carry = "";
}
const chunk = text.slice(state.contentOffset);
state.contentOffset = text.length;
function parseNewLines(chunk: string, state: PumpState): string[] {
state.carry += chunk;
const parts = state.carry.split("\n");
@@ -70,14 +86,17 @@ export function createLiveRoutes(storageRoot: string): Hono {
let eventId = 0;
async function pumpData(): Promise<boolean> {
let text: string;
let chunk: string | null;
try {
text = await readFile(resolvedDataPath, "utf8");
chunk = await readNewBytes(resolvedDataPath, dataState);
} catch {
return false;
}
if (chunk === null) {
return false;
}
const lines = parseNewLines(text, dataState);
const lines = parseNewLines(chunk, dataState);
for (const line of lines) {
const record = parseJsonLine(line);
eventId++;
@@ -95,14 +114,17 @@ export function createLiveRoutes(storageRoot: string): Hono {
}
async function pumpInfo(): Promise<void> {
let text: string;
let chunk: string | null;
try {
text = await readFile(infoPath, "utf8");
chunk = await readNewBytes(infoPath, infoState);
} catch {
return;
}
if (chunk === null) {
return;
}
const lines = parseNewLines(text, infoState);
const lines = parseNewLines(chunk, infoState);
for (const line of lines) {
const record = parseJsonLine(line);
if (
+6 -9
View File
@@ -5,12 +5,10 @@ import { StatusBar } from "./components/status-bar.tsx";
import { ThreadDetail } from "./components/thread-detail.tsx";
import { ThreadList } from "./components/thread-list.tsx";
import { WorkflowList } from "./components/workflow-list.tsx";
type View = "threads" | "workflows";
import { useHashRoute } from "./use-hash-route.ts";
export function App() {
const [view, setView] = useState<View>("threads");
const [selectedThread, setSelectedThread] = useState<string | null>(null);
const { view, threadId, setView, setThreadId } = useHashRoute();
const [showRun, setShowRun] = useState(false);
return (
@@ -19,9 +17,9 @@ export function App() {
<main className="flex-1 overflow-hidden flex flex-col">
<StatusBar onRun={() => setShowRun(true)} />
<div className="flex-1 overflow-auto p-6">
{view === "threads" && !selectedThread && <ThreadList onSelect={setSelectedThread} />}
{view === "threads" && selectedThread && (
<ThreadDetail threadId={selectedThread} onBack={() => setSelectedThread(null)} />
{view === "threads" && threadId === null && <ThreadList onSelect={setThreadId} />}
{view === "threads" && threadId !== null && (
<ThreadDetail threadId={threadId} onBack={() => setThreadId(null)} />
)}
{view === "workflows" && <WorkflowList />}
</div>
@@ -31,8 +29,7 @@ export function App() {
onClose={() => setShowRun(false)}
onCreated={(id) => {
setShowRun(false);
setView("threads");
setSelectedThread(id);
setThreadId(id);
}}
/>
)}
@@ -1,12 +1,47 @@
import { useCallback, useEffect, useRef, useState } from "react";
import { getHealth } from "../api.ts";
import { useFetch } from "../hooks.ts";
type HealthStatus = "connected" | "disconnected" | "reconnecting";
type Props = {
onRun: () => void;
};
function statusLabel(status: HealthStatus): { text: string; color: string } {
if (status === "connected") {
return { text: "● Connected", color: "var(--color-success)" };
}
if (status === "reconnecting") {
return { text: "● Reconnecting...", color: "var(--color-warning, #f59e0b)" };
}
return { text: "● Offline", color: "var(--color-error)" };
}
export function StatusBar({ onRun }: Props) {
const health = useFetch(() => getHealth(), []);
const [status, setStatus] = useState<HealthStatus>("disconnected");
const wasConnectedRef = useRef(false);
const checkHealth = useCallback(async () => {
try {
await getHealth();
wasConnectedRef.current = true;
setStatus("connected");
} catch {
if (wasConnectedRef.current) {
setStatus("reconnecting");
} else {
setStatus("disconnected");
}
}
}, []);
useEffect(() => {
checkHealth();
const interval = setInterval(checkHealth, 10_000);
return () => clearInterval(interval);
}, [checkHealth]);
const label = statusLabel(status);
return (
<div
@@ -24,15 +59,7 @@ export function StatusBar({ onRun }: Props) {
Run Thread
</button>
</div>
<span>
{health.status === "loading" && "⏳ Connecting..."}
{health.status === "ok" && (
<span style={{ color: "var(--color-success)" }}> Connected</span>
)}
{health.status === "error" && (
<span style={{ color: "var(--color-error)" }}> Offline</span>
)}
</span>
<span style={{ color: label.color }}>{label.text}</span>
</div>
);
}
@@ -101,9 +101,9 @@ export function ThreadDetail({ threadId, onBack }: Props) {
)}
{(status === "ok" || liveActive || records.length > 0) && (
<div className="space-y-3">
{records.map((r, i) => (
{records.map((r) => (
<div
key={i}
key={`${threadId}-${r.type}-${String(r.timestamp)}-${r.role ?? ""}-${r.content ?? ""}`}
className="p-3 rounded border text-sm"
style={{ background: "var(--color-surface)", borderColor: "var(--color-border)" }}
>
+64
View File
@@ -0,0 +1,64 @@
import { useCallback, useEffect, useState } from "react";
type View = "threads" | "workflows";
type HashRoute = {
view: View;
threadId: string | null;
};
function parseHash(hash: string): HashRoute {
const raw = hash.replace(/^#\/?/, "");
if (raw.startsWith("threads/")) {
const id = raw.slice("threads/".length);
if (id.length > 0) {
return { view: "threads", threadId: id };
}
}
if (raw === "workflows") {
return { view: "workflows", threadId: null };
}
return { view: "threads", threadId: null };
}
function buildHash(route: HashRoute): string {
if (route.view === "workflows") {
return "#workflows";
}
if (route.threadId !== null) {
return `#threads/${route.threadId}`;
}
return "#threads";
}
export function useHashRoute(): {
view: View;
threadId: string | null;
setView: (v: View) => void;
setThreadId: (id: string | null) => void;
} {
const [route, setRoute] = useState<HashRoute>(() => parseHash(window.location.hash));
useEffect(() => {
function onHashChange(): void {
setRoute(parseHash(window.location.hash));
}
window.addEventListener("hashchange", onHashChange);
return () => window.removeEventListener("hashchange", onHashChange);
}, []);
const navigate = useCallback((next: HashRoute) => {
const hash = buildHash(next);
window.location.hash = hash;
setRoute(next);
}, []);
const setView = useCallback((v: View) => navigate({ view: v, threadId: null }), [navigate]);
const setThreadId = useCallback(
(id: string | null) => navigate({ view: "threads", threadId: id }),
[navigate],
);
return { view: route.view, threadId: route.threadId, setView, setThreadId };
}
@@ -1,2 +0,0 @@
export type { WorkflowDescriptor, WorkflowRoleDescriptor, WorkflowRoleSchema } from "./types.js";
export { validateWorkflowDescriptor } from "./workflow-descriptor.js";
@@ -1,13 +0,0 @@
/** JSON Schema fragment describing one role's `meta` shape (subset supported by code generation). */
export type WorkflowRoleSchema = Record<string, unknown>;
export type WorkflowRoleDescriptor = {
description: string;
schema: WorkflowRoleSchema;
};
/** Workflow metadata exported as `export const descriptor` from `.esm.js` bundles. */
export type WorkflowDescriptor = {
description: string;
roles: Record<string, WorkflowRoleDescriptor>;
};
@@ -1,40 +0,0 @@
import { err, ok, type Result } from "../util/index.js";
import type { WorkflowDescriptor, WorkflowRoleDescriptor, WorkflowRoleSchema } from "./types.js";
export function validateWorkflowDescriptor(value: unknown): Result<WorkflowDescriptor, string> {
if (value === null || typeof value !== "object" || Array.isArray(value)) {
return err("descriptor must be a non-array object");
}
const root = value as Record<string, unknown>;
const description = root.description;
if (typeof description !== "string") {
return err("descriptor.description must be a string");
}
const rolesRaw = root.roles;
if (rolesRaw === null || typeof rolesRaw !== "object" || Array.isArray(rolesRaw)) {
return err("descriptor.roles must be a non-array object");
}
const roles: Record<string, WorkflowRoleDescriptor> = {};
for (const [roleName, specUnknown] of Object.entries(rolesRaw)) {
if (specUnknown === null || typeof specUnknown !== "object" || Array.isArray(specUnknown)) {
return err(`descriptor.roles.${roleName} must be a non-array object`);
}
const spec = specUnknown as Record<string, unknown>;
const roleDesc = spec.description;
if (typeof roleDesc !== "string") {
return err(`descriptor.roles.${roleName}.description must be a string`);
}
const schema = spec.schema;
if (schema === null || typeof schema !== "object" || Array.isArray(schema)) {
return err(`descriptor.roles.${roleName}.schema must be a non-array object`);
}
roles[roleName] = {
description: roleDesc,
schema: schema as WorkflowRoleSchema,
};
}
return ok({ description, roles });
}
@@ -1 +0,0 @@
export type { CasStore } from "./types.js";
@@ -1,6 +0,0 @@
export type CasStore = {
put(content: string): Promise<string>;
get(hash: string): Promise<string | null>;
delete(hash: string): Promise<void>;
list(): Promise<string[]>;
};
@@ -1,10 +1,10 @@
import type * as z from "zod/v4";
import type { CasStore } from "../cas/types.js";
import {
type AgentBinding,
type AgentContext,
type AgentFn,
type CasStore,
END,
type ExtractContext,
type ModeratorContext,
@@ -18,8 +18,7 @@ import {
type WorkflowDefinition,
type WorkflowFn,
type WorkflowRuntime,
} from "../types.js";
import { mergeRefsWithContentHash } from "../util/index.js";
} from "./types.js";
function isRoleNext<M extends RoleMeta>(
next: (keyof M & string) | typeof END,
@@ -97,10 +96,11 @@ async function advanceOneRound<M extends RoleMeta>(
);
const contentHash = await putContentBlob(runtime.cas, raw);
const refs = mergeRefsWithContentHash(
resolveExtractedRefs(roleDef as unknown as RoleDefinition<Record<string, unknown>>, meta),
contentHash,
const refsFromMeta = resolveExtractedRefs(
roleDef as unknown as RoleDefinition<Record<string, unknown>>,
meta,
);
const refs = refsFromMeta.includes(contentHash) ? refsFromMeta : [...refsFromMeta, contentHash];
const step = {
role: next,
@@ -1 +0,0 @@
export { createWorkflow } from "./create-workflow.js";
@@ -1 +0,0 @@
export type { ExtractFn } from "./types.js";
@@ -1,9 +0,0 @@
import type * as z from "zod/v4";
import type { ExtractContext } from "../types.js";
export type ExtractFn = <T extends Record<string, unknown>>(
schema: z.ZodType<T>,
prompt: string,
ctx: ExtractContext,
) => Promise<T>;
+8 -11
View File
@@ -1,20 +1,16 @@
export type {
WorkflowDescriptor,
WorkflowRoleDescriptor,
WorkflowRoleSchema,
} from "./bundle/types.js";
export { validateWorkflowDescriptor } from "./bundle/workflow-descriptor.js";
export type { CasStore } from "./cas/index.js";
export { createWorkflow } from "./engine/index.js";
export type { ExtractFn } from "./extract/index.js";
export { createWorkflow } from "./create-workflow.js";
export { err, ok } from "./result.js";
export type {
AgentBinding,
AgentContext,
AgentFn,
CasStore,
ExtractContext,
ExtractFn,
LlmProvider,
Moderator,
ModeratorContext,
Result,
RoleDefinition,
RoleMeta,
RoleOutput,
@@ -23,10 +19,11 @@ export type {
ThreadContext,
WorkflowCompletion,
WorkflowDefinition,
WorkflowDescriptor,
WorkflowFn,
WorkflowResult,
WorkflowRoleDescriptor,
WorkflowRoleSchema,
WorkflowRuntime,
} from "./types.js";
export { END, START } from "./types.js";
export type { Result } from "./util/index.js";
export { err, ok } from "./util/index.js";
+35 -3
View File
@@ -1,12 +1,33 @@
import type * as z from "zod/v4";
import type { CasStore } from "./cas/index.js";
import type { ExtractFn } from "./extract/types.js";
/** Sentinel values for automaton control flow. */
export const START = "__start__" as const;
export const END = "__end__" as const;
export type CasStore = {
put(content: string): Promise<string>;
get(hash: string): Promise<string | null>;
delete(hash: string): Promise<void>;
list(): Promise<string[]>;
};
/** JSON Schema fragment describing one role's `meta` shape (subset supported by code generation). */
export type WorkflowRoleSchema = Record<string, unknown>;
export type WorkflowRoleDescriptor = {
description: string;
schema: WorkflowRoleSchema;
};
/** Workflow metadata exported as `export const descriptor` from `.esm.js` bundles. */
export type WorkflowDescriptor = {
description: string;
roles: Record<string, WorkflowRoleDescriptor>;
};
/** Expected success/failure outcome without throwing for recoverable errors. */
export type Result<T, E = Error> = { ok: true; value: T } | { ok: false; error: E };
/** Maps role names → their meta types. Single generic drives all inference. */
export type RoleMeta = Record<string, Record<string, unknown>>;
@@ -96,6 +117,12 @@ export type ExtractContext<M extends RoleMeta = RoleMeta> = AgentContext<M> & {
agentContent: string;
};
export type ExtractFn = <T extends Record<string, unknown>>(
schema: z.ZodType<T>,
prompt: string,
ctx: ExtractContext,
) => Promise<T>;
/** Raw string output from an LLM/CLI adapter; meta is extracted by the engine. */
export type AgentFn = (ctx: AgentContext) => Promise<string>;
@@ -131,3 +158,8 @@ export type WorkflowDefinition<M extends RoleMeta> = {
roles: { [K in keyof M & string]: RoleDefinition<M[K]> };
moderator: Moderator<M>;
};
/** Internal outcome of advancing one moderator round inside {@link createWorkflow}. */
export type AdvanceOutcome<M extends RoleMeta> =
| { kind: "complete"; completion: WorkflowCompletion }
| { kind: "yield"; output: RoleOutput; step: RoleStep<M> };
@@ -1,3 +0,0 @@
export { mergeRefsWithContentHash } from "./refs-field.js";
export { err, ok } from "./result.js";
export type { Result } from "./types.js";
@@ -1,8 +0,0 @@
/** Append `contentHash` to `refs` when not already present (dedupe by first occurrence order). */
export function mergeRefsWithContentHash(refs: string[], contentHash: string): string[] {
const out = [...refs];
if (!out.includes(contentHash)) {
out.push(contentHash);
}
return out;
}
@@ -1 +0,0 @@
export type Result<T, E = Error> = { ok: true; value: T } | { ok: false; error: E };
@@ -1,11 +1,6 @@
import { describe, expect, test } from "bun:test";
import {
END,
type ModeratorContext,
type RoleStep,
START,
validateWorkflowDescriptor,
} from "@uncaged/workflow-runtime";
import { validateWorkflowDescriptor } from "@uncaged/workflow";
import { END, type ModeratorContext, type RoleStep, START } from "@uncaged/workflow-runtime";
import { buildDevelopDescriptor } from "../src/descriptor.js";
import { developModerator } from "../src/index.js";
import type { CommitterMeta, PlannerMeta } from "../src/roles/index.js";
@@ -2,14 +2,13 @@ import { afterEach, describe, expect, test } from "bun:test";
import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createCasStore, createExtract, createWorkflow } from "@uncaged/workflow";
import {
END,
type ModeratorContext,
type RoleStep,
START,
createCasStore,
createExtract,
createWorkflow,
validateWorkflowDescriptor,
} from "@uncaged/workflow-runtime";
} from "@uncaged/workflow";
import { END, type ModeratorContext, type RoleStep, START } from "@uncaged/workflow-runtime";
import { buildSolveIssueDescriptor } from "../src/descriptor.js";
import type { DeveloperMeta } from "../src/developer.js";
import { solveIssueModerator, solveIssueWorkflowDefinition } from "../src/index.js";
+1 -1
View File
@@ -29,7 +29,7 @@ import { createWorkflow, readWorkflowRegistry, executeThread } from "@uncaged/wo
| **Registry** | `readWorkflowRegistry`, `writeWorkflowRegistry`, `registerWorkflowVersion`, `workflowRegistryPath`, YAML helpers |
| **CAS** | `createCasStore`, Merkle helpers (`putStepMerkleNode`, `getContentMerklePayload`, …), `hashWorkflowBundleBytes` |
| **Engine** | `createWorkflow`, `executeThread`, `parseThreadDataJsonl`, fork helpers, `garbageCollectCas` |
| **Extract / LLM tools** | `llmExtract`, `reactExtract`, `createExtract`, `getExtractProvider` |
| **Extract / LLM tools** | `llmExtract`, `createExtract`, `createThreadReactor`, `createLlmFn`, `getExtractProvider` |
| **Agent bridge** | `workflowAsAgent` — expose a registered workflow as an agent-backed role |
| **Utilities** | `createLogger`, ULID / Crockford Base32 codecs, `getDefaultWorkflowStorageRoot`, paths |
+10 -8
View File
@@ -101,10 +101,10 @@ async function writeRegistryYaml(storageRoot: string, yaml: string): Promise<voi
await writeFile(join(storageRoot, "workflow.yaml"), yaml, "utf8");
}
/** Extract rounds reply with schema-shaped JSON in `content`; supervisor uses plain `content` (no tools advertised). */
/** Extract and supervisor both run via {@link createThreadReactor}; differentiate by `body.model`. */
function installMockExtractThenSupervisor(params: {
extractArgs: ReadonlyArray<Record<string, unknown>>;
supervisorContent: string;
supervisorDecision: "continue" | "stop";
onSupervisorCall?: () => void;
}): () => void {
const origFetch = globalThis.fetch;
@@ -114,9 +114,9 @@ function installMockExtractThenSupervisor(params: {
init?: RequestInit,
): Promise<Response> => {
const body = init?.body ? (JSON.parse(String(init.body)) as Record<string, unknown>) : {};
const tools = body.tools;
const hasTools = Array.isArray(tools) && tools.length > 0;
if (hasTools) {
const model = typeof body.model === "string" ? body.model : "";
const isSupervisor = model.startsWith("supervisor-");
if (!isSupervisor) {
const args =
params.extractArgs[extractI] ?? params.extractArgs[params.extractArgs.length - 1];
if (args === undefined) {
@@ -133,7 +133,9 @@ function installMockExtractThenSupervisor(params: {
params.onSupervisorCall?.();
return new Response(
JSON.stringify({
choices: [{ message: { content: params.supervisorContent } }],
choices: [
{ message: { content: JSON.stringify({ decision: params.supervisorDecision }) } },
],
}),
{ status: 200, headers: { "Content-Type": "application/json" } },
);
@@ -674,7 +676,7 @@ describe("executeThread", () => {
test("supervisor stops thread when interval elapses and model returns stop", async () => {
restoreFetch = installMockExtractThenSupervisor({
extractArgs: [{ plan: "do-it", files: ["a.ts"] }, { diff: "+ok" }],
supervisorContent: "stop",
supervisorDecision: "stop",
});
const root = await mkdtemp(join(tmpdir(), "wf-engine-sup-stop-"));
@@ -725,7 +727,7 @@ describe("executeThread", () => {
let supervisorCalls = 0;
restoreFetch = installMockExtractThenSupervisor({
extractArgs: [{ plan: "do-it", files: ["a.ts"] }, { diff: "+ok" }],
supervisorContent: "stop",
supervisorDecision: "stop",
onSupervisorCall: () => {
supervisorCalls += 1;
},
+78 -54
View File
@@ -1,6 +1,6 @@
import { afterEach, describe, expect, test } from "bun:test";
import { parseSupervisorDecisionText, runSupervisor } from "../src/engine/supervisor.js";
import { runSupervisor } from "../src/engine/supervisor.js";
import type { WorkflowConfig } from "../src/registry/index.js";
import type { LogFn } from "../src/util/index.js";
@@ -20,28 +20,23 @@ function supervisorOnlyConfig(): WorkflowConfig {
};
}
describe("parseSupervisorDecisionText", () => {
test("reads continue and stop case-insensitively", () => {
expect(parseSupervisorDecisionText("continue")).toBe("continue");
expect(parseSupervisorDecisionText("CONTINUE")).toBe("continue");
expect(parseSupervisorDecisionText("stop")).toBe("stop");
expect(parseSupervisorDecisionText("STOP.")).toBe("stop");
function jsonResponse(body: Record<string, unknown>, status = 200): Response {
return new Response(JSON.stringify(body), {
status,
headers: { "Content-Type": "application/json" },
});
}
test("finds token inside a sentence", () => {
expect(parseSupervisorDecisionText("Answer: continue")).toBe("continue");
expect(parseSupervisorDecisionText("I recommend stop now")).toBe("stop");
});
test("when both appear, earlier token wins", () => {
expect(parseSupervisorDecisionText("continue then stop")).toBe("continue");
expect(parseSupervisorDecisionText("stop then continue")).toBe("stop");
});
test("defaults to continue when unclear", () => {
expect(parseSupervisorDecisionText("maybe later")).toBe("continue");
});
});
function installFetchMock(impl: (init?: RequestInit) => Promise<Response>): () => void {
const origFetch = globalThis.fetch;
globalThis.fetch = Object.assign(
async (_input: Parameters<typeof fetch>[0], init?: RequestInit) => impl(init),
{ preconnect: origFetch.preconnect.bind(origFetch) },
) as typeof fetch;
return () => {
globalThis.fetch = origFetch;
};
}
describe("runSupervisor", () => {
let restoreFetch: (() => void) | null = null;
@@ -52,16 +47,9 @@ describe("runSupervisor", () => {
});
test("returns continue when supervisor model cannot be resolved (no fetch)", async () => {
const origFetch = globalThis.fetch;
restoreFetch = () => {
globalThis.fetch = origFetch;
};
globalThis.fetch = Object.assign(
async () => {
throw new Error("fetch should not run when supervisor is not configured");
},
{ preconnect: origFetch.preconnect.bind(origFetch) },
) as typeof fetch;
restoreFetch = installFetchMock(async () => {
throw new Error("fetch should not run when supervisor is not configured");
});
const config: WorkflowConfig = {
maxDepth: 1,
@@ -87,21 +75,27 @@ describe("runSupervisor", () => {
expect(r.value).toBe("continue");
});
test("returns stop from chat/completions assistant content", async () => {
const origFetch = globalThis.fetch;
restoreFetch = () => {
globalThis.fetch = origFetch;
};
globalThis.fetch = Object.assign(
async () =>
new Response(
JSON.stringify({
choices: [{ message: { content: "stop" } }],
}),
{ status: 200, headers: { "Content-Type": "application/json" } },
),
{ preconnect: origFetch.preconnect.bind(origFetch) },
) as typeof fetch;
test("returns stop from structured tool call", async () => {
restoreFetch = installFetchMock(async () =>
jsonResponse({
choices: [
{
message: {
tool_calls: [
{
id: "t1",
type: "function",
function: {
name: "supervisor_decision",
arguments: JSON.stringify({ decision: "stop" }),
},
},
],
},
},
],
}),
);
const r = await runSupervisor({
config: supervisorOnlyConfig(),
@@ -116,14 +110,44 @@ describe("runSupervisor", () => {
expect(r.value).toBe("stop");
});
test("returns err on invalid JSON body", async () => {
const origFetch = globalThis.fetch;
restoreFetch = () => {
globalThis.fetch = origFetch;
};
globalThis.fetch = Object.assign(async () => new Response("not-json", { status: 200 }), {
preconnect: origFetch.preconnect.bind(origFetch),
}) as typeof fetch;
test("returns continue from plain JSON content (reactor short-circuit)", async () => {
restoreFetch = installFetchMock(async () =>
jsonResponse({
choices: [{ message: { content: '{"decision":"continue"}' } }],
}),
);
const r = await runSupervisor({
config: supervisorOnlyConfig(),
prompt: "do Y",
recentSteps: [],
logger: noopLogger,
});
expect(r.ok).toBe(true);
if (!r.ok) {
return;
}
expect(r.value).toBe("continue");
});
test("returns err when reactor cannot validate the schema within max rounds", async () => {
restoreFetch = installFetchMock(async () =>
jsonResponse({
choices: [{ message: { content: "not-json" } }],
}),
);
const r = await runSupervisor({
config: supervisorOnlyConfig(),
prompt: "p",
recentSteps: [],
logger: noopLogger,
});
expect(r.ok).toBe(false);
});
test("returns err on HTTP failure", async () => {
restoreFetch = installFetchMock(async () => new Response("boom", { status: 500 }));
const r = await runSupervisor({
config: supervisorOnlyConfig(),
@@ -6,7 +6,8 @@ import type { LlmProvider } from "@uncaged/workflow-runtime";
import * as z from "zod/v4";
import { createCasStore } from "../src/cas/cas.js";
import { createContentMerkleNode, serializeMerkleNode } from "../src/cas/merkle.js";
import { reactExtract } from "../src/extract/react-extract.js";
import { extractFunctionToolFromZodSchema } from "../src/extract/llm-extract.js";
import { createLlmFn, createThreadReactor } from "../src/reactor/index.js";
const metaSchema = z.object({ seen: z.string() });
@@ -16,7 +17,57 @@ const provider: LlmProvider = {
model: "test",
};
describe("reactExtract", () => {
const CAS_GET_TOOL_DEFINITION = {
type: "function" as const,
function: {
name: "cas_get",
description: "Read CAS node",
parameters: {
type: "object",
properties: {
hash: { type: "string", description: "hash" },
},
required: ["hash"],
},
},
};
type ThreadCtx = { cas: ReturnType<typeof createCasStore> };
function createTestReactor() {
const llm = createLlmFn(provider);
return createThreadReactor<ThreadCtx>({
llm,
maxRounds: 10,
staticTools: [CAS_GET_TOOL_DEFINITION],
structuredToolFromSchema: (schema) => {
const t = extractFunctionToolFromZodSchema(schema);
return {
name: t.name,
tool: {
type: "function" as const,
function: {
name: t.name,
description: t.description,
parameters: t.parameters,
},
},
};
},
systemPromptForStructuredTool: (structuredToolName) =>
`Extract metadata. Use cas_get when needed. Call ${structuredToolName} with JSON args matching the schema, or reply with plain JSON.`,
toolHandler: async (call, thread) => {
if (call.function.name !== "cas_get") {
return `unexpected tool ${call.function.name}`;
}
const ta = JSON.parse(call.function.arguments) as { hash: string };
const blob = await thread.cas.get(ta.hash);
return blob === null ? "null" : blob;
},
});
}
describe("createThreadReactor (extract-shaped)", () => {
let restoreFetch: (() => void) | null = null;
afterEach(() => {
@@ -25,7 +76,7 @@ describe("reactExtract", () => {
});
test("cas_get rounds then extract tool yields validated meta", async () => {
const casDir = await mkdtemp(join(tmpdir(), "react-extract-"));
const casDir = await mkdtemp(join(tmpdir(), "thread-reactor-"));
const cas = createCasStore(casDir);
try {
const blob = serializeMerkleNode(createContentMerkleNode("needle"));
@@ -87,12 +138,12 @@ describe("reactExtract", () => {
{ preconnect: origFetch.preconnect.bind(origFetch) },
) as typeof fetch;
const reactor = createTestReactor();
const text = `## Agent Output\n${h}\n## Extraction Instruction\nExtract seen from CAS.`;
const result = await reactExtract({
text,
const result = await reactor({
thread: { cas },
input: text,
schema: metaSchema,
provider,
cas,
});
expect(result.ok).toBe(true);
@@ -107,7 +158,7 @@ describe("reactExtract", () => {
});
test("stops after max tool rounds when model keeps calling cas_get", async () => {
const casDir = await mkdtemp(join(tmpdir(), "react-extract-max-"));
const casDir = await mkdtemp(join(tmpdir(), "thread-reactor-max-"));
const cas = createCasStore(casDir);
try {
const blob = serializeMerkleNode(createContentMerkleNode("x"));
@@ -146,11 +197,11 @@ describe("reactExtract", () => {
{ preconnect: origFetch.preconnect.bind(origFetch) },
) as typeof fetch;
const result = await reactExtract({
text: "## Agent Output\nnoop\n## Extraction Instruction\nExtract seen.",
const reactor = createTestReactor();
const result = await reactor({
thread: { cas },
input: "## Agent Output\nnoop\n## Extraction Instruction\nExtract seen.",
schema: metaSchema,
provider,
cas,
});
expect(result.ok).toBe(false);
@@ -165,7 +216,7 @@ describe("reactExtract", () => {
});
test("passthrough JSON assistant message without tool calls", async () => {
const casDir = await mkdtemp(join(tmpdir(), "react-extract-pass-"));
const casDir = await mkdtemp(join(tmpdir(), "thread-reactor-pass-"));
const cas = createCasStore(casDir);
try {
const origFetch = globalThis.fetch;
@@ -189,11 +240,11 @@ describe("reactExtract", () => {
{ preconnect: origFetch.preconnect.bind(origFetch) },
) as typeof fetch;
const result = await reactExtract({
text: "## Agent Output\nok\n## Extraction Instruction\nExtract.",
const reactor = createTestReactor();
const result = await reactor({
thread: { cas },
input: "## Agent Output\nok\n## Extraction Instruction\nExtract.",
schema: metaSchema,
provider,
cas,
});
expect(result.ok).toBe(true);
@@ -1 +1,40 @@
export { validateWorkflowDescriptor } from "@uncaged/workflow-runtime";
import { err, ok, type Result } from "../util/index.js";
import type { WorkflowDescriptor, WorkflowRoleDescriptor, WorkflowRoleSchema } from "./types.js";
export function validateWorkflowDescriptor(value: unknown): Result<WorkflowDescriptor, string> {
if (value === null || typeof value !== "object" || Array.isArray(value)) {
return err("descriptor must be a non-array object");
}
const root = value as Record<string, unknown>;
const description = root.description;
if (typeof description !== "string") {
return err("descriptor.description must be a string");
}
const rolesRaw = root.roles;
if (rolesRaw === null || typeof rolesRaw !== "object" || Array.isArray(rolesRaw)) {
return err("descriptor.roles must be a non-array object");
}
const roles: Record<string, WorkflowRoleDescriptor> = {};
for (const [roleName, specUnknown] of Object.entries(rolesRaw)) {
if (specUnknown === null || typeof specUnknown !== "object" || Array.isArray(specUnknown)) {
return err(`descriptor.roles.${roleName} must be a non-array object`);
}
const spec = specUnknown as Record<string, unknown>;
const roleDesc = spec.description;
if (typeof roleDesc !== "string") {
return err(`descriptor.roles.${roleName}.description must be a string`);
}
const schema = spec.schema;
if (schema === null || typeof schema !== "object" || Array.isArray(schema)) {
return err(`descriptor.roles.${roleName}.schema must be a non-array object`);
}
roles[roleName] = {
description: roleDesc,
schema: schema as WorkflowRoleSchema,
};
}
return ok({ description, roles });
}
+54 -109
View File
@@ -1,67 +1,27 @@
import * as z from "zod/v4";
import { resolveModel } from "../config/index.js";
import { extractFunctionToolFromZodSchema } from "../extract/index.js";
import { createLlmFn, createThreadReactor } from "../reactor/index.js";
import type { WorkflowConfig } from "../registry/index.js";
import { err, type LogFn, ok, type Result } from "../util/index.js";
import type { SupervisorDecision } from "./types.js";
const SUPERVISOR_RECENT_STEP_LIMIT = 12;
const SUPERVISOR_MAX_REACT_ROUNDS = 4;
function chatCompletionsUrl(baseUrl: string): string {
const trimmed = baseUrl.replace(/\/+$/, "");
return `${trimmed}/chat/completions`;
}
const supervisorDecisionSchema = z
.object({
decision: z.enum(["continue", "stop"]),
})
.meta({
title: "supervisor_decision",
description:
'Workflow supervisor decision. "continue" when the thread is making progress; "stop" when done, looping, or stuck.',
});
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
function readAssistantContent(parsed: unknown): string | null {
if (!isRecord(parsed)) {
return null;
}
const choices = parsed.choices;
if (!Array.isArray(choices) || choices.length === 0) {
return null;
}
const first = choices[0];
if (!isRecord(first)) {
return null;
}
const messageObj = first.message;
if (!isRecord(messageObj)) {
return null;
}
const content = messageObj.content;
if (typeof content !== "string") {
return null;
}
return content;
}
/** Lenient: accepts STOP/stop/stop. as prose; prefers {@link SupervisorDecision.stop} when both tokens appear. */
export function parseSupervisorDecisionText(text: string): SupervisorDecision {
const lower = text.toLowerCase();
const stopWord = /\bstop\b/.test(lower);
const continueWord = /\bcontinue\b/.test(lower);
if (stopWord && continueWord) {
const si = lower.search(/\bstop\b/);
const ci = lower.search(/\bcontinue\b/);
return si <= ci ? "stop" : "continue";
}
if (stopWord) {
return "stop";
}
if (continueWord) {
return "continue";
}
if (lower.includes("stop")) {
return "stop";
}
if (lower.includes("continue")) {
return "continue";
}
return "continue";
}
type SupervisorThreadContext = Record<string, never>;
type RunSupervisorArgs = {
config: WorkflowConfig;
@@ -70,7 +30,13 @@ type RunSupervisorArgs = {
logger: LogFn;
};
/** Calls the `supervisor` scene LLM; opt-out when {@link resolveModel} fails (returns ok(`continue`)). */
function buildSupervisorInput(args: RunSupervisorArgs): string {
const recent = args.recentSteps.slice(-SUPERVISOR_RECENT_STEP_LIMIT);
const stepsBlock = recent.map((s, index) => `${index + 1}. [${s.role}] ${s.summary}`).join("\n");
return `Original task:\n${args.prompt}\n\nRecent steps (oldest first):\n${stepsBlock === "" ? "(none)" : stepsBlock}`;
}
/** Calls the `supervisor` scene via {@link createThreadReactor}; opt-out when {@link resolveModel} fails (returns ok(`continue`)). */
export async function runSupervisor(
args: RunSupervisorArgs,
): Promise<Result<SupervisorDecision, string>> {
@@ -78,63 +44,42 @@ export async function runSupervisor(
if (!resolved.ok) {
return ok("continue");
}
const provider = resolved.value;
const recent = args.recentSteps.slice(-SUPERVISOR_RECENT_STEP_LIMIT);
const stepsBlock = recent.map((s, index) => `${index + 1}. [${s.role}] ${s.summary}`).join("\n");
const body = {
model: provider.model,
messages: [
{
role: "system" as const,
content:
'You supervise a multi-step workflow. Decide if the thread should keep running or halt.\n\nReply with exactly one token: either "continue" (progress toward the goal, not obviously stuck) or "stop" (done, looping, or no progress). Do not add explanation.',
},
{
role: "user" as const,
content: `Original task:\n${args.prompt}\n\nRecent steps (oldest first):\n${stepsBlock === "" ? "(none)" : stepsBlock}`,
},
],
};
const reactor = createThreadReactor<SupervisorThreadContext>({
llm: createLlmFn(resolved.value),
maxRounds: SUPERVISOR_MAX_REACT_ROUNDS,
staticTools: [],
structuredToolFromSchema: (schema) => {
const t = extractFunctionToolFromZodSchema(schema);
return {
name: t.name,
tool: {
type: "function" as const,
function: {
name: t.name,
description: t.description,
parameters: t.parameters,
},
},
};
},
systemPromptForStructuredTool: (structuredToolName) =>
`You supervise a multi-step workflow. Decide whether the thread should keep running or halt. Reply with "continue" when the thread is making progress toward the task, or "stop" when it is finished, looping, or no longer making progress. Call the ${structuredToolName} tool with JSON arguments matching the schema, or reply with only a JSON object such as {"decision":"stop"}.`,
toolHandler: async (call) => `Unknown tool: ${call.function.name}`,
});
let response: Response;
try {
response = await fetch(chatCompletionsUrl(provider.baseUrl), {
method: "POST",
headers: {
Authorization: `Bearer ${provider.apiKey}`,
"Content-Type": "application/json",
},
body: JSON.stringify(body),
});
} catch (cause) {
const message = cause instanceof Error ? cause.message : String(cause);
args.logger("R9CW4PLM", `supervisor request failed: ${message}`);
return err(`supervisor network error: ${message}`);
const result = await reactor({
thread: {} as SupervisorThreadContext,
input: buildSupervisorInput(args),
schema: supervisorDecisionSchema,
});
if (!result.ok) {
args.logger("R9CW4PLM", `supervisor failed: ${result.error}`);
return err(`supervisor: ${result.error}`);
}
const responseText = await response.text();
if (!response.ok) {
args.logger("T3HN8VKQ", `supervisor HTTP ${response.status}: ${responseText.slice(0, 200)}`);
return err(`supervisor HTTP ${response.status}: ${responseText.slice(0, 500)}`);
}
let parsed: unknown;
try {
parsed = JSON.parse(responseText) as unknown;
} catch (cause) {
const message = cause instanceof Error ? cause.message : String(cause);
args.logger("W7BQ2NXM", `supervisor response is not JSON: ${message}`);
return err(`supervisor invalid JSON: ${message}`);
}
const content = readAssistantContent(parsed);
if (content === null || content.trim() === "") {
args.logger("Y4JX9PKW", "supervisor returned empty assistant content");
return err("supervisor empty assistant content");
}
const decision = parseSupervisorDecisionText(content);
const decision: SupervisorDecision = result.value.decision;
args.logger("Z8KM5QWT", `supervisor says ${decision}`);
return ok(decision);
}
+74 -3
View File
@@ -1,12 +1,39 @@
import type { ExtractContext, ExtractFn, LlmProvider } from "@uncaged/workflow-runtime";
import type * as z from "zod/v4";
import { type CasStore, getContentMerklePayload } from "../cas/index.js";
import { reactExtract } from "./react-extract.js";
import { createLlmFn, createThreadReactor } from "../reactor/index.js";
import { extractFunctionToolFromZodSchema } from "./llm-extract.js";
export type ExtractDeps = {
cas: CasStore;
};
const MAX_REACT_ROUNDS = 10;
const CAS_GET_TOOL_DEFINITION = {
type: "function" as const,
function: {
name: "cas_get",
description:
"Read a Merkle DAG node from content-addressed storage by its hash. Returns YAML-formatted node with type, payload, and children fields.",
parameters: {
type: "object",
properties: {
hash: { type: "string", description: "The CAS hash to retrieve" },
},
required: ["hash"],
},
},
};
export type ExtractThreadContext = {
cas: CasStore;
};
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
/** Builds the user-side extraction prompt (thread + agent output + instruction). */
export async function buildExtractUserContent(
ctx: ExtractContext,
@@ -46,17 +73,61 @@ export async function buildExtractUserContent(
* Create an ExtractFn backed by an LLM provider.
*
* Internally runs a multi-turn ReAct loop with two tools (`cas_get` for traversing the
* Merkle DAG and a schema-shaped `extract` tool); the loop also accepts a plain-JSON
* Merkle DAG and a schema-shaped extract tool); the loop also accepts a plain-JSON
* assistant reply as a short-circuit, which covers the legacy "single" extraction path.
*/
export function createExtract(provider: LlmProvider, deps: ExtractDeps): ExtractFn {
const llm = createLlmFn(provider);
const reactor = createThreadReactor<ExtractThreadContext>({
llm,
maxRounds: MAX_REACT_ROUNDS,
staticTools: [CAS_GET_TOOL_DEFINITION],
structuredToolFromSchema: (schema) => {
const t = extractFunctionToolFromZodSchema(schema);
return {
name: t.name,
tool: {
type: "function" as const,
function: {
name: t.name,
description: t.description,
parameters: t.parameters,
},
},
};
},
systemPromptForStructuredTool: (structuredToolName) =>
`You extract structured metadata from the agent output below. Use cas_get to read Merkle DAG nodes from CAS (YAML: type, payload, children) when the agent output references hashes you must traverse. When you have the complete structured object, call the ${structuredToolName} tool with JSON arguments matching the schema. You may instead reply with only a JSON object (no prose) when no tools are needed.`,
toolHandler: async (call, thread) => {
if (call.function.name !== "cas_get") {
return `Unexpected tool routed to handler: ${call.function.name}`;
}
let hash: string;
try {
const ta = JSON.parse(call.function.arguments) as unknown;
if (!isRecord(ta) || typeof ta.hash !== "string") {
return 'cas_get requires a JSON object with a string "hash" field.';
}
hash = ta.hash;
} catch {
return 'cas_get arguments were not valid JSON. Provide {"hash": "<cas-hash>"}.';
}
const blob = await thread.cas.get(hash);
return blob === null ? "null" : blob;
},
});
return async <T extends Record<string, unknown>>(
schema: z.ZodType<T>,
prompt: string,
ctx: ExtractContext,
): Promise<T> => {
const text = await buildExtractUserContent(ctx, prompt, deps);
const result = await reactExtract({ text, schema, provider, cas: deps.cas });
const result = await reactor({
thread: { cas: deps.cas },
input: text,
schema,
});
if (!result.ok) {
throw new Error(`extract failed: ${result.error}`);
}
+2 -7
View File
@@ -1,16 +1,11 @@
export {
buildExtractUserContent,
createExtract,
type ExtractThreadContext,
} from "./extract-fn.js";
export {
extractFunctionToolFromZodSchema,
llmErrorToCause,
llmExtract,
} from "./llm-extract.js";
export { reactExtract } from "./react-extract.js";
export type {
ExtractFn,
LlmError,
LlmExtractArgs,
ReactExtractArgs,
} from "./types.js";
export type { ExtractFn, LlmError, LlmExtractArgs } from "./types.js";
@@ -1,343 +0,0 @@
import type { CasStore, LlmProvider } from "@uncaged/workflow-runtime";
import type * as z from "zod/v4";
import { err, ok, type Result } from "../util/index.js";
import { extractFunctionToolFromZodSchema } from "./llm-extract.js";
import type { ReactExtractArgs } from "./types.js";
const MAX_REACT_ROUNDS = 10;
const CAS_GET_TOOL_DEFINITION = {
type: "function" as const,
function: {
name: "cas_get",
description:
"Read a Merkle DAG node from content-addressed storage by its hash. Returns YAML-formatted node with type, payload, and children fields.",
parameters: {
type: "object",
properties: {
hash: { type: "string", description: "The CAS hash to retrieve" },
},
required: ["hash"],
},
},
};
function chatCompletionsUrl(baseUrl: string): string {
const trimmed = baseUrl.replace(/\/+$/, "");
return `${trimmed}/chat/completions`;
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
function tryParseJsonContent(content: string): unknown | null {
const trimmed = content.trim();
const fenceMatch = /^```(?:json)?\s*([\s\S]*?)```$/m.exec(trimmed);
const payload = fenceMatch !== null ? fenceMatch[1].trim() : trimmed;
try {
return JSON.parse(payload) as unknown;
} catch {
return null;
}
}
type ToolCall = {
id: string;
type: "function";
function: { name: string; arguments: string };
};
type ChatMessage =
| { role: "system"; content: string }
| { role: "user"; content: string }
| {
role: "assistant";
content: string | null;
tool_calls: ToolCall[];
}
| { role: "assistant"; content: string }
| { role: "tool"; tool_call_id: string; content: string };
type AssistantTurn<T> =
| { kind: "plain_json"; value: T }
| { kind: "tool_calls"; calls: ToolCall[]; assistantContent: string | null };
function firstAssistantMessage(responseText: string): Result<Record<string, unknown>, string> {
let parsed: unknown;
try {
parsed = JSON.parse(responseText) as unknown;
} catch (cause) {
const message = cause instanceof Error ? cause.message : String(cause);
return err(`invalid_response_json:${message}`);
}
if (!isRecord(parsed)) {
return err("invalid_response_top_level");
}
const choices = parsed.choices;
if (!Array.isArray(choices) || choices.length === 0) {
return err("no_choices_in_response");
}
const firstChoice = choices[0];
if (!isRecord(firstChoice)) {
return err("invalid_choice");
}
const messageObj = firstChoice.message;
if (!isRecord(messageObj)) {
return err("invalid_message");
}
return ok(messageObj);
}
function normalizeToolCalls(toolCallsRaw: unknown[]): Result<ToolCall[], string> {
const toolCalls: ToolCall[] = [];
for (const tc of toolCallsRaw) {
if (!isRecord(tc)) {
return err("invalid_tool_call");
}
const id = tc.id;
const tcType = tc.type;
const fn = tc.function;
if (typeof id !== "string" || tcType !== "function" || !isRecord(fn)) {
return err("invalid_tool_call_shape");
}
const name = fn.name;
const argumentsStr = fn.arguments;
if (typeof name !== "string" || typeof argumentsStr !== "string") {
return err("invalid_tool_call_function");
}
toolCalls.push({ id, type: "function", function: { name, arguments: argumentsStr } });
}
return ok(toolCalls);
}
type AssistantTurnOrCorrection<T extends Record<string, unknown>> =
| AssistantTurn<T>
| { kind: "plain_json_invalid"; rawContent: string; correction: string };
function classifyAssistantTurn<T extends Record<string, unknown>>(
messageObj: Record<string, unknown>,
schema: z.ZodType<T>,
): Result<AssistantTurnOrCorrection<T>, string> {
const toolCallsRaw = messageObj.tool_calls;
if (!Array.isArray(toolCallsRaw) || toolCallsRaw.length === 0) {
const content = messageObj.content;
if (typeof content !== "string") {
return err("no_tool_calls_and_no_string_content");
}
const jsonParsed = tryParseJsonContent(content);
if (jsonParsed === null) {
return ok({
kind: "plain_json_invalid",
rawContent: content,
correction:
"Your previous reply was not valid JSON and contained no tool calls. Reply with a single JSON object that matches the schema, or call the extract tool with the structured arguments.",
});
}
const validated = schema.safeParse(jsonParsed);
if (!validated.success) {
return ok({
kind: "plain_json_invalid",
rawContent: content,
correction: `Your previous JSON reply did not satisfy the schema: ${validated.error.message}. Reply again with a JSON object that matches the schema, or call the extract tool with the structured arguments.`,
});
}
return ok({ kind: "plain_json", value: validated.data });
}
const callsResult = normalizeToolCalls(toolCallsRaw);
if (!callsResult.ok) {
return err(callsResult.error);
}
const assistantContent = messageObj.content;
return ok({
kind: "tool_calls",
calls: callsResult.value,
assistantContent: typeof assistantContent === "string" ? assistantContent : null,
});
}
async function appendCasGetToolResult(
tc: ToolCall,
cas: CasStore,
messages: ChatMessage[],
): Promise<Result<null, string>> {
let hash: string;
try {
const ta = JSON.parse(tc.function.arguments) as unknown;
if (!isRecord(ta) || typeof ta.hash !== "string") {
return err("cas_get_invalid_arguments");
}
hash = ta.hash;
} catch {
return err("cas_get_arguments_not_json");
}
const blob = await cas.get(hash);
const toolContent = blob === null ? "null" : blob;
messages.push({
role: "tool",
tool_call_id: tc.id,
content: toolContent,
});
return ok(null);
}
async function appendExtractToolResult<T extends Record<string, unknown>>(
tc: ToolCall,
schema: z.ZodType<T>,
messages: ChatMessage[],
): Promise<Result<T, string>> {
let parsedArgs: unknown;
try {
parsedArgs = JSON.parse(tc.function.arguments) as unknown;
} catch {
return err("extract_tool_arguments_not_json");
}
const validated = schema.safeParse(parsedArgs);
if (!validated.success) {
return err(`schema_validation_failed:${validated.error.message}`);
}
messages.push({
role: "tool",
tool_call_id: tc.id,
content: '{"ok":true}',
});
return ok(validated.data);
}
async function appendToolResults<T extends Record<string, unknown>>(
toolCalls: ToolCall[],
extractToolName: string,
schema: z.ZodType<T>,
cas: CasStore,
messages: ChatMessage[],
): Promise<Result<T | null, string>> {
let extracted: T | null = null;
for (const tc of toolCalls) {
if (tc.function.name === "cas_get") {
const casRes = await appendCasGetToolResult(tc, cas, messages);
if (!casRes.ok) {
return casRes;
}
continue;
}
if (tc.function.name === extractToolName) {
const exRes = await appendExtractToolResult(tc, schema, messages);
if (!exRes.ok) {
return exRes;
}
extracted = exRes.value;
continue;
}
return err(`unknown_tool:${tc.function.name}`);
}
return ok(extracted);
}
async function postChatCompletion(
provider: LlmProvider,
messages: ChatMessage[],
tools: readonly Record<string, unknown>[],
): Promise<Result<string, string>> {
try {
const response = await fetch(chatCompletionsUrl(provider.baseUrl), {
method: "POST",
headers: {
Authorization: `Bearer ${provider.apiKey}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
model: provider.model,
messages,
tools,
tool_choice: "auto",
}),
});
const responseText = await response.text();
if (!response.ok) {
return err(`http_error:${String(response.status)}:${responseText.slice(0, 4000)}`);
}
return ok(responseText);
} catch (cause) {
const message = cause instanceof Error ? cause.message : String(cause);
return err(`network_error:${message}`);
}
}
/**
* Multi-turn ReAct extraction with `cas_get` plus a schema-shaped extract tool (OpenAI-compatible).
* Final meta comes from a successful extract tool call or from plain JSON in the assistant message.
*/
export async function reactExtract<T extends Record<string, unknown>>(
args: ReactExtractArgs<T>,
): Promise<Result<T, string>> {
const extractTool = extractFunctionToolFromZodSchema(args.schema);
const tools = [
CAS_GET_TOOL_DEFINITION,
{
type: "function" as const,
function: {
name: extractTool.name,
description: extractTool.description,
parameters: extractTool.parameters,
},
},
];
const systemContent = `You extract structured metadata from the agent output below. Use cas_get to read Merkle DAG nodes from CAS (YAML: type, payload, children) when the agent output references hashes you must traverse. When you have the complete structured object, call the ${extractTool.name} tool with JSON arguments matching the schema. You may instead reply with only a JSON object (no prose) when no tools are needed.`;
const messages: ChatMessage[] = [
{ role: "system", content: systemContent },
{ role: "user", content: args.text },
];
for (let round = 0; round < MAX_REACT_ROUNDS; round++) {
const bodyResult = await postChatCompletion(args.provider, messages, tools);
if (!bodyResult.ok) {
return bodyResult;
}
const msgResult = firstAssistantMessage(bodyResult.value);
if (!msgResult.ok) {
return msgResult;
}
const classified = classifyAssistantTurn(msgResult.value, args.schema);
if (!classified.ok) {
return classified;
}
const turn = classified.value;
if (turn.kind === "plain_json") {
return ok(turn.value);
}
if (turn.kind === "plain_json_invalid") {
messages.push({ role: "assistant", content: turn.rawContent });
messages.push({ role: "user", content: turn.correction });
continue;
}
messages.push({
role: "assistant",
content: turn.assistantContent,
tool_calls: turn.calls,
});
const toolsRound = await appendToolResults(
turn.calls,
extractTool.name,
args.schema,
args.cas,
messages,
);
if (!toolsRound.ok) {
return toolsRound;
}
if (toolsRound.value !== null) {
return ok(toolsRound.value);
}
}
return err("max_react_rounds_exceeded");
}
+1 -8
View File
@@ -1,15 +1,8 @@
import type { CasStore, LlmProvider } from "@uncaged/workflow-runtime";
import type { LlmProvider } from "@uncaged/workflow-runtime";
import type * as z from "zod/v4";
export type { ExtractFn } from "@uncaged/workflow-runtime";
export type ReactExtractArgs<T extends Record<string, unknown>> = {
text: string;
schema: z.ZodType<T>;
provider: LlmProvider;
cas: CasStore;
};
export type LlmExtractArgs<T> = {
text: string;
schema: z.ZodType<T>;
+13 -2
View File
@@ -56,12 +56,23 @@ export {
export {
createExtract,
type ExtractFn,
type ExtractThreadContext,
type LlmError,
llmErrorToCause,
llmExtract,
type ReactExtractArgs,
reactExtract,
} from "./extract/index.js";
export {
type ChatMessage,
createLlmFn,
createThreadReactor,
type LlmFn,
type StructuredToolSpec,
type ThreadReactorConfig,
type ThreadReactorFn,
type ThreadReactorInvokeArgs,
type ToolCall,
type ToolDefinition,
} from "./reactor/index.js";
export {
getRegisteredWorkflow,
listRegisteredWorkflowNames,
+12
View File
@@ -0,0 +1,12 @@
export { createLlmFn } from "./llm-fn.js";
export { createThreadReactor } from "./thread-reactor.js";
export type {
ChatMessage,
LlmFn,
StructuredToolSpec,
ThreadReactorConfig,
ThreadReactorFn,
ThreadReactorInvokeArgs,
ToolCall,
ToolDefinition,
} from "./types.js";
+48
View File
@@ -0,0 +1,48 @@
import type { LlmProvider } from "@uncaged/workflow-runtime";
import { err, ok } from "../util/index.js";
import type { ChatMessage, LlmFn, ToolDefinition } from "./types.js";
function chatCompletionsUrl(baseUrl: string): string {
const trimmed = baseUrl.replace(/\/+$/, "");
return `${trimmed}/chat/completions`;
}
/**
* Wraps provider credentials into an {@link LlmFn}: single POST to chat/completions,
* returns raw JSON body text or a {@link Result} error. Callers parse assistant messages.
*/
export function createLlmFn(provider: LlmProvider): LlmFn {
return async ({
messages,
tools,
}: {
messages: ChatMessage[];
tools: readonly ToolDefinition[];
}) => {
try {
const response = await fetch(chatCompletionsUrl(provider.baseUrl), {
method: "POST",
headers: {
Authorization: `Bearer ${provider.apiKey}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
model: provider.model,
messages,
tools,
tool_choice: "auto",
}),
});
const responseText = await response.text();
if (!response.ok) {
return err(`http_error:${String(response.status)}:${responseText.slice(0, 4000)}`);
}
return ok(responseText);
} catch (cause) {
const message = cause instanceof Error ? cause.message : String(cause);
return err(`network_error:${message}`);
}
};
}
@@ -0,0 +1,317 @@
import type * as z from "zod/v4";
import { err, ok, type Result } from "../util/index.js";
import type {
ChatMessage,
StructuredToolSpec,
ThreadReactorConfig,
ThreadReactorFn,
ToolCall,
ToolDefinition,
} from "./types.js";
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
function tryParseJsonContent(content: string): unknown | null {
const trimmed = content.trim();
const fenceMatch = /^```(?:json)?\s*([\s\S]*?)```$/m.exec(trimmed);
const payload = fenceMatch !== null ? fenceMatch[1].trim() : trimmed;
try {
return JSON.parse(payload) as unknown;
} catch {
return null;
}
}
function firstAssistantMessage(responseText: string): Result<Record<string, unknown>, string> {
let parsed: unknown;
try {
parsed = JSON.parse(responseText) as unknown;
} catch (cause) {
const message = cause instanceof Error ? cause.message : String(cause);
return err(`invalid_response_json:${message}`);
}
if (!isRecord(parsed)) {
return err("invalid_response_top_level");
}
const choices = parsed.choices;
if (!Array.isArray(choices) || choices.length === 0) {
return err("no_choices_in_response");
}
const firstChoice = choices[0];
if (!isRecord(firstChoice)) {
return err("invalid_choice");
}
const messageObj = firstChoice.message;
if (!isRecord(messageObj)) {
return err("invalid_message");
}
return ok(messageObj);
}
function normalizeToolCalls(toolCallsRaw: unknown[]): Result<ToolCall[], string> {
const toolCalls: ToolCall[] = [];
for (const tc of toolCallsRaw) {
if (!isRecord(tc)) {
return err("invalid_tool_call");
}
const id = tc.id;
const tcType = tc.type;
const fn = tc.function;
if (typeof id !== "string" || tcType !== "function" || !isRecord(fn)) {
return err("invalid_tool_call_shape");
}
const name = fn.name;
const argumentsStr = fn.arguments;
if (typeof name !== "string" || typeof argumentsStr !== "string") {
return err("invalid_tool_call_function");
}
toolCalls.push({ id, type: "function", function: { name, arguments: argumentsStr } });
}
return ok(toolCalls);
}
type AssistantTurn<T> =
| { kind: "plain_json"; value: T }
| { kind: "tool_calls"; calls: ToolCall[]; assistantContent: string | null };
type AssistantTurnOrCorrection<T> =
| AssistantTurn<T>
| { kind: "plain_json_invalid"; rawContent: string; correction: string };
function classifyAssistantTurn<T>(
messageObj: Record<string, unknown>,
schema: z.ZodType<T>,
structuredToolName: string,
): Result<AssistantTurnOrCorrection<T>, string> {
const toolCallsRaw = messageObj.tool_calls;
if (!Array.isArray(toolCallsRaw) || toolCallsRaw.length === 0) {
const content = messageObj.content;
if (typeof content !== "string") {
return err("no_tool_calls_and_no_string_content");
}
const jsonParsed = tryParseJsonContent(content);
if (jsonParsed === null) {
return ok({
kind: "plain_json_invalid",
rawContent: content,
correction: `Your previous reply was not valid JSON and contained no tool calls. Reply with a single JSON object that matches the schema, or call the ${structuredToolName} tool with the structured arguments.`,
});
}
const validated = schema.safeParse(jsonParsed);
if (!validated.success) {
return ok({
kind: "plain_json_invalid",
rawContent: content,
correction: `Your previous JSON reply did not satisfy the schema: ${validated.error.message}. Reply again with a JSON object that matches the schema, or call the ${structuredToolName} tool with the structured arguments.`,
});
}
return ok({ kind: "plain_json", value: validated.data });
}
const callsResult = normalizeToolCalls(toolCallsRaw);
if (!callsResult.ok) {
return err(callsResult.error);
}
const assistantContent = messageObj.content;
return ok({
kind: "tool_calls",
calls: callsResult.value,
assistantContent: typeof assistantContent === "string" ? assistantContent : null,
});
}
function toolNamesFromDefinitions(tools: readonly { function: { name: string } }[]): Set<string> {
return new Set(tools.map((t) => t.function.name));
}
function appendStructuredToolResult<T>(
tc: ToolCall,
schema: z.ZodType<T>,
messages: ChatMessage[],
): T | null {
let parsedArgs: unknown;
try {
parsedArgs = JSON.parse(tc.function.arguments) as unknown;
} catch {
messages.push({
role: "tool",
tool_call_id: tc.id,
content:
"Tool arguments were not valid JSON. Provide valid JSON object arguments matching the schema.",
});
return null;
}
const validated = schema.safeParse(parsedArgs);
if (!validated.success) {
messages.push({
role: "tool",
tool_call_id: tc.id,
content: `Schema validation failed: ${validated.error.message}. Fix the arguments and call the tool again with a JSON object that matches the schema.`,
});
return null;
}
messages.push({
role: "tool",
tool_call_id: tc.id,
content: '{"ok":true}',
});
return validated.data;
}
async function dispatchToolCall<T, TThread>(
tc: ToolCall,
spec: StructuredToolSpec,
knownNames: Set<string>,
schema: z.ZodType<T>,
thread: TThread,
toolHandler: ThreadReactorConfig<TThread>["toolHandler"],
messages: ChatMessage[],
): Promise<T | null> {
if (!knownNames.has(tc.function.name)) {
messages.push({
role: "tool",
tool_call_id: tc.id,
content: `Unknown tool: ${tc.function.name}. Use one of the declared tools only.`,
});
return null;
}
if (tc.function.name === spec.name) {
return appendStructuredToolResult(tc, schema, messages);
}
let toolContent: string;
try {
toolContent = await toolHandler(tc, thread);
} catch (cause) {
const message = cause instanceof Error ? cause.message : String(cause);
toolContent = `Tool execution failed: ${message}`;
}
messages.push({
role: "tool",
tool_call_id: tc.id,
content: toolContent,
});
return null;
}
async function resolveToolCallRound<T, TThread>(
turn: Extract<AssistantTurn<T>, { kind: "tool_calls" }>,
spec: StructuredToolSpec,
knownNames: Set<string>,
schema: z.ZodType<T>,
thread: TThread,
toolHandler: ThreadReactorConfig<TThread>["toolHandler"],
messages: ChatMessage[],
): Promise<Result<T, string> | null> {
messages.push({
role: "assistant",
content: turn.assistantContent,
tool_calls: turn.calls,
});
let extractedRound: T | null = null;
for (const tc of turn.calls) {
const extracted = await dispatchToolCall(
tc,
spec,
knownNames,
schema,
thread,
toolHandler,
messages,
);
if (extracted !== null) {
extractedRound = extracted;
}
}
if (extractedRound !== null) {
return ok(extractedRound);
}
return null;
}
async function runOneReactRound<T, TThread>(
config: ThreadReactorConfig<TThread>,
args: { thread: TThread; schema: z.ZodType<T> },
tools: readonly ToolDefinition[],
knownNames: Set<string>,
spec: StructuredToolSpec,
messages: ChatMessage[],
): Promise<Result<T, string> | null> {
const bodyResult = await config.llm({ messages, tools });
if (!bodyResult.ok) {
return bodyResult;
}
const msgResult = firstAssistantMessage(bodyResult.value);
if (!msgResult.ok) {
return msgResult;
}
const classified = classifyAssistantTurn(msgResult.value, args.schema, spec.name);
if (!classified.ok) {
return classified;
}
const turn = classified.value;
if (turn.kind === "plain_json") {
return ok(turn.value);
}
if (turn.kind === "plain_json_invalid") {
messages.push({ role: "assistant", content: turn.rawContent });
messages.push({ role: "user", content: turn.correction });
return null;
}
return resolveToolCallRound(
turn,
spec,
knownNames,
args.schema,
args.thread,
config.toolHandler,
messages,
);
}
/**
* Generic ReAct loop: LLM round-trips with tools until structured output validates,
* plain JSON matches schema, or {@link ThreadReactorConfig.maxRounds} is exceeded.
*/
export function createThreadReactor<TThread>(
config: ThreadReactorConfig<TThread>,
): ThreadReactorFn<TThread> {
return async <T>(args: {
thread: TThread;
input: string;
schema: z.ZodType<T>;
}): Promise<Result<T, string>> => {
const spec = config.structuredToolFromSchema(args.schema);
const tools = [...config.staticTools, spec.tool];
const knownNames = toolNamesFromDefinitions(tools);
const systemPrompt = config.systemPromptForStructuredTool(spec.name);
const messages: ChatMessage[] = [
{ role: "system", content: systemPrompt },
{ role: "user", content: args.input },
];
for (let round = 0; round < config.maxRounds; round++) {
const step = await runOneReactRound(
config,
{ thread: args.thread, schema: args.schema },
tools,
knownNames,
spec,
messages,
);
if (step !== null) {
return step;
}
}
return err("max_react_rounds_exceeded");
};
}
+62
View File
@@ -0,0 +1,62 @@
import type * as z from "zod/v4";
import type { Result } from "../util/index.js";
export type ToolCall = {
id: string;
type: "function";
function: { name: string; arguments: string };
};
export type ToolDefinition = {
type: "function";
function: {
name: string;
description: string;
parameters: Record<string, unknown>;
};
};
export type ChatMessage =
| { role: "system"; content: string }
| { role: "user"; content: string }
| {
role: "assistant";
content: string | null;
tool_calls: ToolCall[];
}
| { role: "assistant"; content: string }
| { role: "tool"; tool_call_id: string; content: string };
export type LlmFn = (input: {
messages: ChatMessage[];
tools: readonly ToolDefinition[];
}) => Promise<Result<string, string>>;
/** Structured tool derived from the per-invocation Zod schema (e.g. extract tool). */
export type StructuredToolSpec = {
name: string;
tool: ToolDefinition;
};
export type ThreadReactorConfig<TThread> = {
llm: LlmFn;
/** Static tools (e.g. cas_get); structured tool is appended per invocation. */
staticTools: readonly ToolDefinition[];
/** Builds the schema-shaped tool and its OpenAI name for this invocation. */
structuredToolFromSchema: (schema: z.ZodType<unknown>) => StructuredToolSpec;
/** System prompt for this run; include the structured tool name for cache stability per schema. */
systemPromptForStructuredTool: (structuredToolName: string) => string;
toolHandler: (call: ToolCall, thread: TThread) => Promise<string>;
maxRounds: number;
};
export type ThreadReactorInvokeArgs<TThread, T> = {
thread: TThread;
input: string;
schema: z.ZodType<T>;
};
export type ThreadReactorFn<TThread> = <T>(
args: ThreadReactorInvokeArgs<TThread, T>,
) => Promise<Result<T, string>>;