Merge pull request 'refactor(workflow): simplify extraction + thread runtime contract' (#132) from refactor/thread-context-runtime into main

This commit is contained in:
2026-05-08 09:34:26 +00:00
50 changed files with 373 additions and 628 deletions
+1 -1
View File
@@ -1,7 +1,7 @@
{
"$schema": "https://biomejs.dev/schemas/2.4.14/schema.json",
"files": {
"includes": ["**", "!**/dist", "!**/node_modules"]
"includes": ["**", "!**/dist", "!**/node_modules", "!packages/workflow/workflow"]
},
"assist": { "actions": { "source": { "organizeImports": "on" } } },
"formatter": {
@@ -107,7 +107,7 @@ Init 生成的骨架:\`templates/\` 下放可复用定义,\`workflows/\` 下
2. **编写 RoleDefinition**:为每个角色写 Zod \`schema\`,补齐 \`systemPrompt\` / \`extractPrompt\` / \`description\`
3. **编写 Moderator**:根据 \`ctx.steps\` 与业务状态返回下一个角色名或 \`END\`
4. **组装 WorkflowDefinition**:在模板 \`index\` 中导出 definition(以及必要的角色 / moderator 导出)。
5. **实例化**:在 workflow 包中使用 \`createWorkflow(def, binding)\`(或项目约定的封装)绑定 **AgentFn**;**ExtractFn** 由引擎从 **workflow.yaml** 注入 \`WorkflowFnOptions\`
5. **实例化**:在 workflow 包中使用 \`createWorkflow(def, binding)\`(或项目约定的封装)绑定 **AgentFn**;**ExtractFn** 由引擎从 **workflow.yaml** 注入 \`WorkflowRuntime\`
6. **构建**:打包为单个 **.esm.js** bundle,使用 **uncaged-workflow add** 注册。
## 4. 编码规范
@@ -60,8 +60,9 @@ export function createLiveRoutes(storageRoot: string): Hono {
if (dataPath === null) {
return c.json({ error: `thread not found: ${threadId}` }, 404);
}
const resolvedDataPath = dataPath;
const infoPath = join(dirname(dataPath), `${threadId}.info.jsonl`);
const infoPath = join(dirname(resolvedDataPath), `${threadId}.info.jsonl`);
return streamSSE(c, async (stream) => {
const dataState: PumpState = { contentOffset: 0, carry: "" };
@@ -71,7 +72,7 @@ export function createLiveRoutes(storageRoot: string): Hono {
async function pumpData(): Promise<boolean> {
let text: string;
try {
text = await readFile(dataPath, "utf8");
text = await readFile(resolvedDataPath, "utf8");
} catch {
return false;
}
@@ -131,7 +132,7 @@ export function createLiveRoutes(storageRoot: string): Hono {
const controller = new AbortController();
let completed = false;
const dataWatcher = watch(dataPath, async () => {
const dataWatcher = watch(resolvedDataPath, async () => {
if (completed) return;
const finished = await pumpData();
if (finished) {
-1
View File
@@ -203,7 +203,6 @@ Each role has:
| \`extractPrompt\` | string | Instruction for extracting structured meta |
| \`schema\` | ZodSchema | Validates the extracted meta |
| \`extractRefs\` | fn or null | Extracts CAS hashes from meta for DAG linking |
| \`extractMode\` | "single" | Extraction mode |
## Development Workflow
+6 -2
View File
@@ -7,7 +7,7 @@ async function postJson<T>(path: string, body: unknown): Promise<T> {
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.json().catch(() => ({ error: res.statusText })) as { error: string };
const err = (await res.json().catch(() => ({ error: res.statusText }))) as { error: string };
throw new Error(err.error || `API ${res.status}`);
}
return res.json() as Promise<T>;
@@ -59,7 +59,11 @@ export function getThread(id: string): Promise<{ records: ThreadRecord[] }> {
return fetchJson(`/threads/${id}`);
}
export function runThread(workflow: string, prompt: string, maxRounds: number = 10): Promise<{ threadId: string }> {
export function runThread(
workflow: string,
prompt: string,
maxRounds: number = 10,
): Promise<{ threadId: string }> {
return postJson("/threads", { workflow, prompt, maxRounds });
}
+6 -8
View File
@@ -1,10 +1,10 @@
import { useState } from "react";
import { Sidebar } from "./components/sidebar.tsx";
import { ThreadList } from "./components/thread-list.tsx";
import { ThreadDetail } from "./components/thread-detail.tsx";
import { WorkflowList } from "./components/workflow-list.tsx";
import { StatusBar } from "./components/status-bar.tsx";
import { RunDialog } from "./components/run-dialog.tsx";
import { Sidebar } from "./components/sidebar.tsx";
import { StatusBar } from "./components/status-bar.tsx";
import { ThreadDetail } from "./components/thread-detail.tsx";
import { ThreadList } from "./components/thread-list.tsx";
import { WorkflowList } from "./components/workflow-list.tsx";
type View = "threads" | "workflows";
@@ -19,9 +19,7 @@ export function App() {
<main className="flex-1 overflow-hidden flex flex-col">
<StatusBar onRun={() => setShowRun(true)} />
<div className="flex-1 overflow-auto p-6">
{view === "threads" && !selectedThread && (
<ThreadList onSelect={setSelectedThread} />
)}
{view === "threads" && !selectedThread && <ThreadList onSelect={setSelectedThread} />}
{view === "threads" && selectedThread && (
<ThreadDetail threadId={selectedThread} onBack={() => setSelectedThread(null)} />
)}
@@ -41,10 +41,15 @@ export function RunDialog({ onClose, onCreated }: Props) {
<h3 className="text-lg font-semibold mb-4">Run Thread</h3>
<form onSubmit={handleSubmit} className="space-y-4">
<div>
<label className="text-sm block mb-1" style={{ color: "var(--color-text-muted)" }}>
<label
htmlFor="run-workflow"
className="text-sm block mb-1"
style={{ color: "var(--color-text-muted)" }}
>
Workflow
</label>
<select
id="run-workflow"
value={workflow}
onChange={(e) => setWorkflow(e.target.value)}
className="w-full px-3 py-2 rounded border text-sm"
@@ -64,10 +69,15 @@ export function RunDialog({ onClose, onCreated }: Props) {
</select>
</div>
<div>
<label className="text-sm block mb-1" style={{ color: "var(--color-text-muted)" }}>
<label
htmlFor="run-prompt"
className="text-sm block mb-1"
style={{ color: "var(--color-text-muted)" }}
>
Prompt
</label>
<textarea
id="run-prompt"
value={prompt}
onChange={(e) => setPrompt(e.target.value)}
rows={4}
@@ -81,10 +91,15 @@ export function RunDialog({ onClose, onCreated }: Props) {
/>
</div>
<div>
<label className="text-sm block mb-1" style={{ color: "var(--color-text-muted)" }}>
<label
htmlFor="run-max-rounds"
className="text-sm block mb-1"
style={{ color: "var(--color-text-muted)" }}
>
Max Rounds
</label>
<input
id="run-max-rounds"
type="number"
value={maxRounds}
onChange={(e) => setMaxRounds(Number(e.target.value))}
@@ -98,7 +113,11 @@ export function RunDialog({ onClose, onCreated }: Props) {
}}
/>
</div>
{error && <p className="text-sm" style={{ color: "var(--color-error)" }}>{error}</p>}
{error && (
<p className="text-sm" style={{ color: "var(--color-error)" }}>
{error}
</p>
)}
<div className="flex gap-2 justify-end">
<button
type="button"
@@ -10,16 +10,22 @@ export function Sidebar({ view, onViewChange }: Props) {
];
return (
<aside className="w-56 border-r flex flex-col" style={{ borderColor: "var(--color-border)", background: "var(--color-surface)" }}>
<aside
className="w-56 border-r flex flex-col"
style={{ borderColor: "var(--color-border)", background: "var(--color-surface)" }}
>
<div className="p-4 border-b" style={{ borderColor: "var(--color-border)" }}>
<h1 className="text-lg font-semibold" style={{ color: "var(--color-accent)" }}>
Workflow
</h1>
<p className="text-xs mt-1" style={{ color: "var(--color-text-muted)" }}>Dashboard</p>
<p className="text-xs mt-1" style={{ color: "var(--color-text-muted)" }}>
Dashboard
</p>
</div>
<nav className="flex-1 p-2 space-y-1">
{items.map((item) => (
<button
type="button"
key={item.key}
onClick={() => onViewChange(item.key)}
className="w-full text-left px-3 py-2 rounded text-sm transition-colors"
@@ -16,6 +16,7 @@ export function StatusBar({ onRun }: Props) {
<div className="flex items-center gap-4">
<span style={{ color: "var(--color-text-muted)" }}>Local API: 127.0.0.1:7860</span>
<button
type="button"
onClick={onRun}
className="px-3 py-1 rounded text-xs font-medium"
style={{ background: "var(--color-accent)", color: "#fff" }}
@@ -26,6 +26,7 @@ export function ThreadDetail({ threadId, onBack }: Props) {
<div>
<div className="flex items-center justify-between mb-4">
<button
type="button"
onClick={onBack}
className="text-sm hover:underline"
style={{ color: "var(--color-accent)" }}
@@ -34,6 +35,7 @@ export function ThreadDetail({ threadId, onBack }: Props) {
</button>
<div className="flex gap-2">
<button
type="button"
onClick={() => handleAction("pause")}
className="px-3 py-1 text-xs rounded border"
style={{ borderColor: "var(--color-warning)", color: "var(--color-warning)" }}
@@ -41,6 +43,7 @@ export function ThreadDetail({ threadId, onBack }: Props) {
Pause
</button>
<button
type="button"
onClick={() => handleAction("resume")}
className="px-3 py-1 text-xs rounded border"
style={{ borderColor: "var(--color-success)", color: "var(--color-success)" }}
@@ -48,6 +51,7 @@ export function ThreadDetail({ threadId, onBack }: Props) {
Resume
</button>
<button
type="button"
onClick={() => handleAction("kill")}
className="px-3 py-1 text-xs rounded border"
style={{ borderColor: "var(--color-error)", color: "var(--color-error)" }}
@@ -68,9 +72,9 @@ export function ThreadDetail({ threadId, onBack }: Props) {
{status === "error" && <p style={{ color: "var(--color-error)" }}>Error: {error}</p>}
{status === "ok" && (
<div className="space-y-3">
{data.records.map((r, i) => (
{data.records.map((r) => (
<div
key={i}
key={`${r.type}:${r.role ?? ""}:${r.timestamp ?? 0}:${String(r.content ?? "")}`}
className="p-3 rounded border text-sm"
style={{ background: "var(--color-surface)", borderColor: "var(--color-border)" }}
>
@@ -93,7 +97,10 @@ export function ThreadDetail({ threadId, onBack }: Props) {
)}
</div>
{r.content && (
<pre className="whitespace-pre-wrap text-xs mt-1" style={{ color: "var(--color-text)" }}>
<pre
className="whitespace-pre-wrap text-xs mt-1"
style={{ color: "var(--color-text)" }}
>
{typeof r.content === "string" ? r.content : JSON.stringify(r.content, null, 2)}
</pre>
)}
@@ -8,7 +8,8 @@ type Props = {
export function ThreadList({ onSelect }: Props) {
const { status, data, error } = useFetch(() => listThreads(), []);
if (status === "loading") return <p style={{ color: "var(--color-text-muted)" }}>Loading threads...</p>;
if (status === "loading")
return <p style={{ color: "var(--color-text-muted)" }}>Loading threads...</p>;
if (status === "error") return <p style={{ color: "var(--color-error)" }}>Error: {error}</p>;
const threads = data.threads;
@@ -22,6 +23,7 @@ export function ThreadList({ onSelect }: Props) {
<div className="space-y-2">
{threads.map((t) => (
<button
type="button"
key={t.threadId}
onClick={() => onSelect(t.threadId)}
className="w-full text-left p-4 rounded-lg border transition-colors hover:border-[var(--color-accent-dim)]"
@@ -4,7 +4,8 @@ import { useFetch } from "../hooks.ts";
export function WorkflowList() {
const { status, data, error } = useFetch(() => listWorkflows(), []);
if (status === "loading") return <p style={{ color: "var(--color-text-muted)" }}>Loading workflows...</p>;
if (status === "loading")
return <p style={{ color: "var(--color-text-muted)" }}>Loading workflows...</p>;
if (status === "error") return <p style={{ color: "var(--color-error)" }}>Error: {error}</p>;
const workflows = data.workflows;
@@ -28,7 +29,10 @@ export function WorkflowList() {
{w.versions} version{w.versions !== 1 ? "s" : ""}
</span>
</div>
<code className="text-xs mt-1 block font-mono" style={{ color: "var(--color-accent)" }}>
<code
className="text-xs mt-1 block font-mono"
style={{ color: "var(--color-accent)" }}
>
{w.currentHash}
</code>
</div>
+1
View File
@@ -30,6 +30,7 @@ export function useFetch<T>(fetcher: () => Promise<T>, deps: unknown[] = []): Fe
return () => {
cancelled = true;
};
// biome-ignore lint/correctness/useExhaustiveDependencies: this helper intentionally accepts caller-provided dependency arrays
}, deps);
return state;
+1
View File
@@ -2,6 +2,7 @@ import tailwindcss from "@tailwindcss/vite";
import react from "@vitejs/plugin-react";
import { defineConfig } from "vite";
// biome-ignore lint/style/noDefaultExport: Vite loads config from default export.
export default defineConfig({
plugins: [react(), tailwindcss()],
server: {
@@ -1,16 +1,9 @@
import { describe, expect, test } from "bun:test";
import { mkdtempSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createCasStore } from "@uncaged/workflow";
import { START, type ThreadContext } from "@uncaged/workflow-runtime";
import { type AgentContext, START } from "@uncaged/workflow-runtime";
import { createLlmAdapter } from "../src/create-llm-adapter.js";
const casDir = mkdtempSync(join(tmpdir(), "wf-llm-adapter-cas-"));
const testCas = createCasStore(casDir);
function makeCtx(userContent: string): ThreadContext {
function makeCtx(userContent: string): AgentContext {
return {
start: {
role: START,
@@ -22,7 +15,6 @@ function makeCtx(userContent: string): ThreadContext {
steps: [],
threadId: "01TEST000000000000000000TR",
currentRole: { name: "planner", systemPrompt: "system instructions" },
cas: testCas,
};
}
@@ -1,3 +1,5 @@
import type * as z from "zod/v4";
import type { CasStore } from "../cas/types.js";
import {
type AgentBinding,
@@ -6,17 +8,16 @@ import {
END,
type ExtractContext,
type ModeratorContext,
type ResolveRoleMetaFn,
type RoleDefinition,
type RoleMeta,
type RoleOutput,
type RoleStep,
START,
type ThreadInput,
type ThreadContext,
type WorkflowCompletion,
type WorkflowDefinition,
type WorkflowFn,
type WorkflowFnOptions,
type WorkflowRuntime,
} from "../types.js";
import { mergeRefsWithContentHash } from "../util/index.js";
@@ -55,20 +56,13 @@ type AdvanceOutcome<M extends RoleMeta> =
async function advanceOneRound<M extends RoleMeta>(
def: Pick<WorkflowDefinition<M>, "roles" | "moderator">,
binding: AgentBinding,
resolveRoleMeta: ResolveRoleMetaFn<M>,
params: {
start: ModeratorContext<M>["start"];
steps: RoleStep<M>[];
options: WorkflowFnOptions;
thread: ModeratorContext<M>;
runtime: WorkflowRuntime;
},
): Promise<AdvanceOutcome<M>> {
const { start, steps, options } = params;
const modCtx: ModeratorContext<M> = {
threadId: options.threadId,
depth: options.depth,
start,
steps,
};
const { thread, runtime } = params;
const modCtx: ModeratorContext<M> = thread;
const next = def.moderator(modCtx);
if (!isRoleNext(next)) {
@@ -86,7 +80,6 @@ async function advanceOneRound<M extends RoleMeta>(
const agentCtx: AgentContext<M> = {
...modCtx,
currentRole: { name: next, systemPrompt: roleDef.systemPrompt },
cas: options.cas,
};
const agent = agentForRole(binding, next);
@@ -97,13 +90,13 @@ async function advanceOneRound<M extends RoleMeta>(
agentContent: raw,
};
const meta = await resolveRoleMeta(
roleDef as unknown as RoleDefinition<Record<string, unknown>>,
extractCtx,
options,
const meta = await runtime.extract(
roleDef.schema as z.ZodType<Record<string, unknown>>,
roleDef.extractPrompt,
extractCtx as unknown as ExtractContext,
);
const contentHash = await putContentBlob(options.cas, raw);
const contentHash = await putContentBlob(runtime.cas, raw);
const refs = mergeRefsWithContentHash(
resolveExtractedRefs(roleDef as unknown as RoleDefinition<Record<string, unknown>>, meta),
contentHash,
@@ -131,47 +124,36 @@ async function advanceOneRound<M extends RoleMeta>(
/**
* Binds pure role definitions + moderator to runtime agents.
* Assign with `export const run = createWorkflow(def, binding)` via `@uncaged/workflow-runtime`,
* which supplies {@link ResolveRoleMetaFn}.
* Assign with `export const run = createWorkflow(def, binding)`.
*
* Structured meta extraction is delegated to {@link WorkflowRuntime.extract}, which the
* engine resolves from the workflow registry's `extract` scene.
*/
export function createWorkflow<M extends RoleMeta>(
def: Pick<WorkflowDefinition<M>, "roles" | "moderator">,
binding: AgentBinding,
resolveRoleMeta: ResolveRoleMetaFn<M>,
): WorkflowFn {
return async function* workflowLoop(
input: ThreadInput,
options: WorkflowFnOptions,
thread: ThreadContext,
runtime: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
const nowMs = Date.now();
const start: ModeratorContext<M>["start"] = {
role: START,
content: input.prompt,
meta: { maxRounds: options.maxRounds },
timestamp: nowMs,
};
const baseTs = Date.now();
let steps: RoleStep<M>[] = input.steps.map((out, i) => ({
role: out.role,
contentHash: out.contentHash,
meta: out.meta,
refs: out.refs,
timestamp: baseTs + i,
})) as RoleStep<M>[];
if (thread.start.role !== START) {
throw new Error(`workflow loop expected start role to be ${START}`);
}
const maxRounds = thread.start.meta.maxRounds;
let currentThread = thread as ModeratorContext<M>;
while (true) {
if (steps.length >= options.maxRounds) {
if (currentThread.steps.length >= maxRounds) {
return {
returnCode: 0,
summary: `completed: reached maxRounds (${options.maxRounds})`,
summary: `completed: reached maxRounds (${maxRounds})`,
};
}
const outcome = await advanceOneRound(def, binding, resolveRoleMeta, {
start,
steps,
options,
const outcome = await advanceOneRound(def, binding, {
thread: currentThread,
runtime,
});
if (outcome.kind === "complete") {
@@ -179,7 +161,10 @@ export function createWorkflow<M extends RoleMeta>(
}
yield outcome.output;
steps = [...steps, outcome.step];
currentThread = {
...currentThread,
steps: [...currentThread.steps, outcome.step],
};
}
};
}
+1 -4
View File
@@ -12,23 +12,20 @@ export type {
AgentContext,
AgentFn,
ExtractContext,
ExtractMode,
LlmProvider,
Moderator,
ModeratorContext,
ResolveRoleMetaFn,
RoleDefinition,
RoleMeta,
RoleOutput,
RoleStep,
StartStep,
ThreadContext,
ThreadInput,
WorkflowCompletion,
WorkflowDefinition,
WorkflowFn,
WorkflowFnOptions,
WorkflowResult,
WorkflowRuntime,
} from "./types.js";
export { END, START } from "./types.js";
export type { Result } from "./util/index.js";
+10 -34
View File
@@ -17,9 +17,6 @@ export type LlmProvider = {
model: string;
};
/** How the engine runs meta extraction for a role after the agent phase. */
export type ExtractMode = "single" | "react";
/** What each generator yield produces — one role's output (engine adds `timestamp` when persisting). */
export type RoleOutput = {
role: string;
@@ -41,30 +38,18 @@ export type WorkflowResult = WorkflowCompletion & {
rootHash: string;
};
/** Input to a workflow — prompt plus optional historical steps for fork/resume. */
export type ThreadInput = {
prompt: string;
steps: RoleOutput[];
};
/** Options passed to a workflow bundle's `run` export (engine-provided). */
export type WorkflowFnOptions = {
threadId: string;
maxRounds: number;
/** Nesting depth for workflow-as-agent chains; root threads use `0`. */
depth: number;
/** Runtime dependencies passed to a workflow bundle's `run` export (engine-provided). */
export type WorkflowRuntime = {
/** Global CAS store for Merkle content blobs (role step bodies). */
cas: CasStore;
/** Structured meta extraction; resolved from workflow.yaml `extract` scene by the engine. */
extract: ExtractFn;
/** Provider for `extractMode: "react"` roles; same backing config as `extract`. */
llmProvider: LlmProvider | null;
};
/** Bundle contract — named export `run` is a function returning an AsyncGenerator. */
export type WorkflowFn = (
input: ThreadInput,
options: WorkflowFnOptions,
thread: ThreadContext,
runtime: WorkflowRuntime,
) => AsyncGenerator<RoleOutput, WorkflowCompletion>;
/** Engine start frame: initial prompt + thread identity. */
@@ -86,22 +71,24 @@ export type RoleStep<M extends RoleMeta> = {
};
}[keyof M & string];
/** Phase 1: Moderator decides next role. */
export type ModeratorContext<M extends RoleMeta = RoleMeta> = {
/** Thread runtime context shared by moderator/agent/extractor phases. */
export type ThreadContext<M extends RoleMeta = RoleMeta> = {
threadId: string;
/** Same as `WorkflowFnOptions.depth` for the active thread. */
/** Nesting depth for workflow-as-agent chains; root threads use `0`. */
depth: number;
start: StartStep;
steps: RoleStep<M>[];
};
/** Phase 1: Moderator decides next role. */
export type ModeratorContext<M extends RoleMeta = RoleMeta> = ThreadContext<M>;
/** Phase 2: Agent executes — knows its role and prompt. */
export type AgentContext<M extends RoleMeta = RoleMeta> = ModeratorContext<M> & {
currentRole: {
name: string;
systemPrompt: string;
};
cas: CasStore;
};
/** Phase 3: Extractor runs — has agent output; the extraction instruction is a separate argument to the extract function. */
@@ -109,9 +96,6 @@ export type ExtractContext<M extends RoleMeta = RoleMeta> = AgentContext<M> & {
agentContent: string;
};
/** Alias — most external consumers see the agent-phase context. */
export type ThreadContext<M extends RoleMeta = RoleMeta> = AgentContext<M>;
/** Raw string output from an LLM/CLI adapter; meta is extracted by the engine. */
export type AgentFn = (ctx: AgentContext) => Promise<string>;
@@ -129,7 +113,6 @@ export type RoleDefinition<Meta extends Record<string, unknown>> = {
schema: z.ZodType<Meta>;
/** When non-null, produces CAS hashes to persist on this role's steps (see `RoleOutput.refs`). */
extractRefs: ((meta: Meta) => string[]) | null;
extractMode: ExtractMode;
};
/**
@@ -148,10 +131,3 @@ export type WorkflowDefinition<M extends RoleMeta> = {
roles: { [K in keyof M & string]: RoleDefinition<M[K]> };
moderator: Moderator<M>;
};
/** Engine-injected meta extraction for workflow loops (single + react modes). */
export type ResolveRoleMetaFn<M extends RoleMeta = RoleMeta> = (
roleDef: RoleDefinition<Record<string, unknown>>,
extractCtx: ExtractContext<M>,
options: WorkflowFnOptions,
) => Promise<Record<string, unknown>>;
+4 -5
View File
@@ -2,7 +2,7 @@
Reference **develop** workflow template: plan phases, implement in a loop, review, test, then commit.
Export a `WorkflowDefinition` and `createDevelopRun` so a host can bind agents/LLM and run the same graph the bundled `.esm.js` would use. Use `buildDevelopDescriptor()` when assembling `descriptor` metadata for a bundle.
Export a pure `WorkflowDefinition` (`developWorkflowDefinition`) and role/moderator pieces. Workflow instantiation (`createWorkflow(definition, binding)`) happens in the workflow instance layer, not in this template package.
## Install
@@ -15,10 +15,10 @@ In this monorepo: `workspace:*` for `@uncaged/workflow-template-develop` and `@u
## Usage
```typescript
import { createDevelopRun, developWorkflowDefinition } from "@uncaged/workflow-template-develop";
import { createWorkflow } from "@uncaged/workflow";
import { developWorkflowDefinition } from "@uncaged/workflow-template-develop";
const run = createDevelopRun(binding, extract, llmProvider);
// run(...) executes the develop moderator graph with your AgentBinding
const run = createWorkflow(developWorkflowDefinition, binding);
```
## Roles
@@ -46,7 +46,6 @@ Also exported: role factories/meta schemas (`plannerRole`, `coderRole`, …), `D
| Export | Description |
|--------|-------------|
| `createDevelopRun` | `createWorkflow(developWorkflowDefinition, …)` factory |
| `developWorkflowDefinition` | `description`, `roles`, `developModerator` |
| `developModerator` | `Moderator<DevelopMeta>` |
| `buildDevelopDescriptor` | `buildDescriptor({ … })` for bundle metadata |
@@ -1,5 +1,4 @@
import { createWorkflow } from "@uncaged/workflow";
import type { AgentBinding, WorkflowDefinition, WorkflowFn } from "@uncaged/workflow-runtime";
import type { WorkflowDefinition } from "@uncaged/workflow-runtime";
import { developModerator } from "./moderator.js";
import { DEVELOP_WORKFLOW_DESCRIPTION, type DevelopMeta, developRoles } from "./roles.js";
@@ -36,7 +35,3 @@ export const developWorkflowDefinition: WorkflowDefinition<DevelopMeta> = {
roles: developRoles,
moderator: developModerator,
};
export function createDevelopRun(binding: AgentBinding): WorkflowFn {
return createWorkflow(developWorkflowDefinition, binding);
}
@@ -31,5 +31,4 @@ export const coderRole: RoleDefinition<CoderMeta> = {
"Extract completedPhase: the planner phase hash finished this round (exact hash string from the plan). If multiple phases were finished in one round, use the last finished phase hash. Extract filesChanged and a summary of the work.",
schema: coderMetaSchema,
extractRefs: (meta) => [meta.completedPhase],
extractMode: "single",
};
@@ -32,5 +32,4 @@ export const committerRole: RoleDefinition<CommitterMeta> = {
"Extract the commit result: committed (with branch and SHA), recoverable failure, or unrecoverable failure. Include error details and log references if applicable.",
schema: committerMetaSchema,
extractRefs: null,
extractMode: "single",
};
@@ -48,5 +48,4 @@ export const plannerRole: RoleDefinition<PlannerMeta> = {
"Extract the implementation phases from the agent's output. Each phase has a hash (the CAS content-hash returned by the cas put command) and a title (one-line summary).",
schema: plannerMetaSchema,
extractRefs: (meta) => meta.phases.map((p) => p.hash),
extractMode: "single",
};
@@ -41,5 +41,4 @@ export const reviewerRole: RoleDefinition<ReviewerMeta> = {
"Extract the review verdict: approved or rejected. If rejected, list the blocking issues.",
schema: reviewerMetaSchema,
extractRefs: null,
extractMode: "single",
};
@@ -23,5 +23,4 @@ export const testerRole: RoleDefinition<TesterMeta> = {
"Extract the verification result: passed with summary details, or failed with details of what broke.",
schema: testerMetaSchema,
extractRefs: null,
extractMode: "single",
};
@@ -2,7 +2,7 @@
Reference **solve-issue** workflow template: prepare a repo, delegate implementation to the **develop** workflow, then submit (e.g. open a PR).
`createSolveIssueRun` wires the `developer` role to `workflowAsAgent("develop")` by default; `binding.overrides.developer` wins if you pass one (for tests or custom hosts).
This package exports a pure `WorkflowDefinition` (`solveIssueWorkflowDefinition`). Workflow instantiation (`createWorkflow(definition, binding)`) and any role-specific agent wiring (for example delegating `developer` to `workflowAsAgent("develop")`) are done in the workflow instance layer.
## Install
@@ -15,9 +15,10 @@ In this monorepo: `workspace:*` for this package and `@uncaged/workflow`.
## Usage
```typescript
import { createSolveIssueRun, solveIssueWorkflowDefinition } from "@uncaged/workflow-template-solve-issue";
import { createWorkflow } from "@uncaged/workflow";
import { solveIssueWorkflowDefinition } from "@uncaged/workflow-template-solve-issue";
const run = createSolveIssueRun(binding, extract, llmProvider);
const run = createWorkflow(solveIssueWorkflowDefinition, binding);
```
## Roles
@@ -41,7 +42,6 @@ Also exported: `preparerRole`, `developerRole`, `submitterRole` and their Zod me
| Export | Description |
|--------|-------------|
| `createSolveIssueRun` | Merges `developer` override with `workflowAsAgent("develop")`, then `createWorkflow` |
| `solveIssueWorkflowDefinition` | `description`, `roles`, `solveIssueModerator` |
| `solveIssueModerator` | Linear `Moderator<SolveIssueMeta>` |
| `buildSolveIssueDescriptor` | Descriptor helper for bundles |
@@ -2,7 +2,7 @@ import { afterEach, describe, expect, test } from "bun:test";
import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createCasStore, createExtract } from "@uncaged/workflow";
import { createCasStore, createExtract, createWorkflow } from "@uncaged/workflow";
import {
END,
type ModeratorContext,
@@ -12,7 +12,7 @@ import {
} from "@uncaged/workflow-runtime";
import { buildSolveIssueDescriptor } from "../src/descriptor.js";
import type { DeveloperMeta } from "../src/developer.js";
import { createSolveIssueRun, solveIssueModerator } from "../src/index.js";
import { solveIssueModerator, solveIssueWorkflowDefinition } from "../src/index.js";
import type { PreparerMeta, SubmitterMeta } from "../src/roles/index.js";
import type { SolveIssueMeta } from "../src/roles.js";
@@ -23,46 +23,7 @@ function jsonResponse(payload: Record<string, unknown>): Response {
});
}
function readToolListFromBody(init: RequestInit | undefined): readonly Record<string, unknown>[] {
if (init === undefined || init.body === undefined || init.body === null) {
return [];
}
const body = JSON.parse(String(init.body)) as Record<string, unknown>;
const tools = body.tools;
if (!Array.isArray(tools)) {
return [];
}
return tools.filter((t): t is Record<string, unknown> => t !== null && typeof t === "object");
}
function singleToolName(tools: readonly Record<string, unknown>[]): string {
if (tools.length === 0) {
return "extract";
}
const fn = tools[0].function as Record<string, unknown> | undefined;
return typeof fn?.name === "string" ? fn.name : "extract";
}
function buildSingleModeResponse(args: Record<string, unknown>, toolName: string): Response {
return jsonResponse({
choices: [
{
message: {
tool_calls: [
{
type: "function",
function: { name: toolName, arguments: JSON.stringify(args) },
},
],
},
},
],
});
}
function buildReactModeResponse(args: Record<string, unknown>): Response {
// reactExtract accepts a plain-JSON assistant message and validates it
// directly against the schema, so we skip the cas_get / extract tool dance.
function buildPlainJsonResponse(args: Record<string, unknown>): Response {
return jsonResponse({
choices: [{ message: { content: JSON.stringify(args) } }],
});
@@ -73,18 +34,59 @@ function installMockChatCompletions(sequence: ReadonlyArray<Record<string, unkno
let i = 0;
const mockFetch = async (
_input: Parameters<typeof fetch>[0],
init?: RequestInit,
_init?: RequestInit,
): Promise<Response> => {
const args = sequence[i] ?? sequence[sequence.length - 1];
if (args === undefined) {
throw new Error("installMockChatCompletions: empty sequence");
}
i += 1;
const tools = readToolListFromBody(init);
if (tools.length > 1) {
return buildReactModeResponse(args);
return buildPlainJsonResponse(args);
};
globalThis.fetch = Object.assign(mockFetch, {
preconnect: origFetch.preconnect.bind(origFetch),
}) as typeof fetch;
return () => {
globalThis.fetch = origFetch;
};
}
function buildToolCallResponse(args: Record<string, unknown>): Response {
return jsonResponse({
choices: [
{
message: {
tool_calls: [
{
id: "tc_extract_1",
type: "function",
function: {
name: "extract",
arguments: JSON.stringify(args),
},
},
],
},
},
],
});
}
function installMockToolCallCompletions(
sequence: ReadonlyArray<Record<string, unknown>>,
): () => void {
const origFetch = globalThis.fetch;
let i = 0;
const mockFetch = async (
_input: Parameters<typeof fetch>[0],
_init?: RequestInit,
): Promise<Response> => {
const args = sequence[i] ?? sequence[sequence.length - 1];
if (args === undefined) {
throw new Error("installMockToolCallCompletions: empty sequence");
}
return buildSingleModeResponse(args, singleToolName(tools));
i += 1;
return buildToolCallResponse(args);
};
globalThis.fetch = Object.assign(mockFetch, {
preconnect: origFetch.preconnect.bind(origFetch),
@@ -160,17 +162,30 @@ function submitterStep(meta: SubmitterMeta): RoleStep<SolveIssueMeta> {
};
}
const stubExtract = createExtract({
baseUrl: "http://127.0.0.1:9",
apiKey: "",
model: "test",
});
function createStubExtract(casDir: string) {
return createExtract(
{
baseUrl: "http://127.0.0.1:9",
apiKey: "",
model: "test",
},
{ cas: createCasStore(casDir) },
);
}
const stubLlmProvider = {
baseUrl: "http://127.0.0.1:9",
apiKey: "",
model: "test",
};
function makeThread(prompt: string) {
return {
threadId: "01TEST000000000000000000TR",
depth: 0,
start: {
role: START,
content: prompt,
meta: { maxRounds: 20 },
timestamp: Date.now(),
},
steps: [],
};
}
describe("solveIssueModerator", () => {
test("routes initial → preparer → developer → submitter → END", () => {
@@ -218,7 +233,7 @@ describe("solveIssueModerator", () => {
});
});
describe("createSolveIssueRun", () => {
describe("solveIssueWorkflowDefinition + createWorkflow", () => {
let restoreFetch: (() => void) | null = null;
let casDir: string | undefined;
@@ -248,22 +263,48 @@ describe("createSolveIssueRun", () => {
casDir = await mkdtemp(join(tmpdir(), "solve-issue-cas-"));
const cas = createCasStore(casDir);
// Override developer so the test does not spin up a child workflow.
const run = createSolveIssueRun({
const run = createWorkflow(solveIssueWorkflowDefinition, {
agent: async () => "",
overrides: { developer: async () => "stub-root-hash" },
});
const gen = run(
{ prompt: "task", steps: [] },
{
threadId: "01TEST000000000000000000TR",
maxRounds: 20,
depth: 0,
cas,
extract: stubExtract,
llmProvider: stubLlmProvider,
const gen = run(makeThread("task"), {
cas,
extract: createStubExtract(casDir),
});
const first = await gen.next();
expect(first.done).toBe(false);
if (first.done) {
throw new Error("expected yield");
}
expect(first.value.role).toBe("preparer");
expect(first.value.meta).toEqual(EXPECT_PREPARER_META);
});
test("structured extraction also accepts tool_calls extraction path", async () => {
const EXPECT_PREPARER_META: PreparerMeta = {
repoPath: "/home/user/repos/tool-call",
defaultBranch: "main",
conventions: null,
toolchain: {
packageManager: "bun",
testCommand: "bun test",
lintCommand: null,
buildCommand: "bun run build",
},
);
};
restoreFetch = installMockToolCallCompletions([EXPECT_PREPARER_META]);
casDir = await mkdtemp(join(tmpdir(), "solve-issue-cas-"));
const cas = createCasStore(casDir);
const run = createWorkflow(solveIssueWorkflowDefinition, {
agent: async () => "",
overrides: { developer: async () => "stub-root-hash" },
});
const gen = run(makeThread("task"), {
cas,
extract: createStubExtract(casDir),
});
const first = await gen.next();
expect(first.done).toBe(false);
if (first.done) {
@@ -296,7 +337,7 @@ describe("createSolveIssueRun", () => {
const cas = createCasStore(casDir);
const calls: string[] = [];
const run = createSolveIssueRun({
const run = createWorkflow(solveIssueWorkflowDefinition, {
agent: async () => {
calls.push("default");
return "";
@@ -316,17 +357,10 @@ describe("createSolveIssueRun", () => {
},
},
});
const gen = run(
{ prompt: "task", steps: [] },
{
threadId: "01TEST000000000000000000TR",
maxRounds: 20,
depth: 0,
cas,
extract: stubExtract,
llmProvider: stubLlmProvider,
},
);
const gen = run(makeThread("task"), {
cas,
extract: createStubExtract(casDir),
});
await gen.next();
expect(calls).toEqual(["preparer"]);
@@ -338,58 +372,6 @@ describe("createSolveIssueRun", () => {
await gen.next();
expect(calls).toEqual(["submitter"]);
});
test("developer defaults to workflowAsAgent override (caller override still wins)", async () => {
const PREPARER_META: PreparerMeta = {
repoPath: "/tmp/r",
defaultBranch: "main",
conventions: null,
toolchain: { packageManager: null, testCommand: null, lintCommand: null, buildCommand: null },
};
const DEVELOPER_META: DeveloperMeta = {
branch: "feat/y",
commitSha: "def5678",
filesChanged: ["b.ts"],
summary: "more work",
};
restoreFetch = installMockChatCompletions([PREPARER_META, DEVELOPER_META]);
casDir = await mkdtemp(join(tmpdir(), "solve-issue-cas-"));
const cas = createCasStore(casDir);
let developerInvocations = 0;
const run = createSolveIssueRun({
agent: async () => "",
overrides: {
developer: async () => {
developerInvocations += 1;
return "stub-root-hash";
},
},
});
const gen = run(
{ prompt: "task", steps: [] },
{
threadId: "01TEST000000000000000000TR",
maxRounds: 20,
depth: 0,
cas,
extract: stubExtract,
llmProvider: stubLlmProvider,
},
);
// preparer
await gen.next();
// developer (caller override should be invoked, NOT workflowAsAgent default)
const devYield = await gen.next();
expect(devYield.done).toBe(false);
if (devYield.done) {
throw new Error("expected yield");
}
expect(devYield.value.role).toBe("developer");
expect(devYield.value.meta).toEqual(DEVELOPER_META);
expect(developerInvocations).toBe(1);
});
});
describe("buildSolveIssueDescriptor", () => {
@@ -32,8 +32,7 @@ describe("submitterRole", () => {
expect(submitterRole.systemPrompt).toContain("pull request");
});
test("uses single extract mode without refs", () => {
expect(submitterRole.extractMode).toBe("single");
test("has no refs extractor", () => {
expect(submitterRole.extractRefs).toBeNull();
});
});
@@ -33,5 +33,4 @@ export const developerRole: RoleDefinition<DeveloperMeta> = {
extractPrompt: DEVELOPER_EXTRACT_PROMPT,
schema: developerMetaSchema,
extractRefs: () => [],
extractMode: "react",
};
@@ -1,5 +1,4 @@
import { createWorkflow, workflowAsAgent } from "@uncaged/workflow";
import type { AgentBinding, WorkflowDefinition, WorkflowFn } from "@uncaged/workflow-runtime";
import type { WorkflowDefinition } from "@uncaged/workflow-runtime";
import { solveIssueModerator } from "./moderator.js";
import { SOLVE_ISSUE_WORKFLOW_DESCRIPTION, type SolveIssueMeta, solveIssueRoles } from "./roles.js";
@@ -31,22 +30,3 @@ export const solveIssueWorkflowDefinition: WorkflowDefinition<SolveIssueMeta> =
roles: solveIssueRoles,
moderator: solveIssueModerator,
};
/**
* Build the solve-issue {@link WorkflowFn}.
*
* The `developer` role always delegates to the registered `develop` workflow via
* {@link workflowAsAgent}; if the caller supplies their own `developer` override in
* `binding.overrides`, it takes precedence so tests and custom hosts can stub it.
*/
export function createSolveIssueRun(binding: AgentBinding): WorkflowFn {
const developerOverride = binding.overrides?.developer ?? workflowAsAgent("develop");
const mergedBinding: AgentBinding = {
agent: binding.agent,
overrides: {
...(binding.overrides ?? {}),
developer: developerOverride,
},
};
return createWorkflow(solveIssueWorkflowDefinition, mergedBinding);
}
@@ -48,5 +48,4 @@ export const preparerRole: RoleDefinition<PreparerMeta> = {
"Extract repoPath (absolute path), defaultBranch, conventions (summary string or null), and toolchain (packageManager, testCommand, lintCommand, buildCommand — each string or null).",
schema: preparerMetaSchema,
extractRefs: null,
extractMode: "single",
};
@@ -40,5 +40,4 @@ export const submitterRole: RoleDefinition<SubmitterMeta> = {
extractPrompt: SUBMITTER_EXTRACT_PROMPT,
schema: submitterMetaSchema,
extractRefs: null,
extractMode: "single",
};
@@ -1,13 +1,9 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createCasStore, putContentMerkleNode } from "@uncaged/workflow";
import { START, type ThreadContext } from "@uncaged/workflow-runtime";
import { describe, expect, test } from "bun:test";
import { type AgentContext, START } from "@uncaged/workflow-runtime";
import { buildAgentPrompt } from "../src/index.js";
function startTask(content: string): ThreadContext["start"] {
function startTask(content: string): AgentContext["start"] {
return {
role: START,
content,
@@ -17,25 +13,13 @@ function startTask(content: string): ThreadContext["start"] {
}
describe("buildAgentPrompt", () => {
let casRoot: string;
beforeEach(async () => {
casRoot = await mkdtemp(join(tmpdir(), "wf-build-prompt-cas-"));
});
afterEach(async () => {
await rm(casRoot, { recursive: true, force: true });
});
test("includes system prompt and full task; omits tools when there are no steps", async () => {
const cas = createCasStore(casRoot);
const ctx: ThreadContext = {
const ctx: AgentContext = {
start: startTask("fix the bug"),
depth: 0,
steps: [],
threadId: "01TEST000000000000000000TR",
currentRole: { name: START, systemPrompt: "You are an agent." },
cas,
};
const text = await buildAgentPrompt(ctx);
expect(text).toContain("You are an agent.");
@@ -44,15 +28,13 @@ describe("buildAgentPrompt", () => {
expect(text).not.toContain("## Tools");
});
test("single step shows full content and meta, and includes tools", async () => {
const cas = createCasStore(casRoot);
const onlyHash = await putContentMerkleNode(cas, "only step full body");
const ctx: ThreadContext = {
test("single step shows hash and meta, and includes tools", async () => {
const onlyHash = "01HASHSINGLESTEP0000000001";
const ctx: AgentContext = {
start: startTask("user task"),
depth: 0,
threadId: "01TEST000000000000000000TR",
currentRole: { name: "coder", systemPrompt: "Be helpful." },
cas,
steps: [
{
role: "coder",
@@ -67,22 +49,20 @@ describe("buildAgentPrompt", () => {
expect(text).toContain("## Task");
expect(text).toContain("user task");
expect(text).toContain("## Step: coder");
expect(text).toContain("only step full body");
expect(text).toContain(`ContentHash: ${onlyHash}`);
expect(text).toContain('Meta: {"files":["a.ts"]}');
expect(text).toContain("## Tools");
expect(text).toContain("uncaged-workflow thread 01TEST000000000000000000TR");
});
test("two or more steps: previous steps are meta-only; latest step is full", async () => {
const cas = createCasStore(casRoot);
const plannerHash = await putContentMerkleNode(cas, "PLANNER_SECRET_FULL_TEXT");
const coderHash = await putContentMerkleNode(cas, "last step full content");
const ctx: ThreadContext = {
test("two or more steps: previous steps are meta-only; latest step includes hash", async () => {
const plannerHash = "01HASHPLANNER0000000000001";
const coderHash = "01HASHCODER0000000000000001";
const ctx: AgentContext = {
start: startTask("first message full: task content here"),
depth: 0,
threadId: "01TEST000000000000000000TR",
currentRole: { name: "coder", systemPrompt: "System." },
cas,
steps: [
{
role: "planner",
@@ -105,25 +85,22 @@ describe("buildAgentPrompt", () => {
expect(text).toContain("## Previous Steps");
expect(text).toContain("### Step 1: planner");
expect(text).toContain('Summary: {"plan":"short"}');
expect(text).not.toContain("PLANNER_SECRET_FULL_TEXT");
expect(text).toContain("## Latest Step: coder");
expect(text).toContain("last step full content");
expect(text).toContain(`ContentHash: ${coderHash}`);
expect(text).toContain('Meta: {"done":true}');
expect(text).toContain("## Tools");
expect(text).toContain("uncaged-workflow thread 01TEST000000000000000000TR");
});
test("middle steps show meta summary only, not full content", async () => {
const cas = createCasStore(casRoot);
const ha = await putContentMerkleNode(cas, "HIDDEN_A");
const hb = await putContentMerkleNode(cas, "HIDDEN_B_MIDDLE");
const hc = await putContentMerkleNode(cas, "VISIBLE_LAST");
const ctx: ThreadContext = {
test("middle steps show meta summary only and latest shows hash", async () => {
const ha = "01HASHA00000000000000000001";
const hb = "01HASHB00000000000000000001";
const hc = "01HASHC00000000000000000001";
const ctx: AgentContext = {
start: startTask("start"),
depth: 0,
threadId: "01TEST000000000000000000TR",
currentRole: { name: "c", systemPrompt: "S" },
cas,
steps: [
{
role: "a",
@@ -149,11 +126,9 @@ describe("buildAgentPrompt", () => {
],
};
const text = await buildAgentPrompt(ctx);
expect(text).not.toContain("HIDDEN_A");
expect(text).not.toContain("HIDDEN_B_MIDDLE");
expect(text).toContain('Summary: {"n":1}');
expect(text).toContain('Summary: {"n":2}');
expect(text).toContain("VISIBLE_LAST");
expect(text).toContain(`ContentHash: ${hc}`);
expect(text).toContain("## Latest Step: c");
});
});
@@ -1,14 +1,5 @@
import { getContentMerklePayload } from "@uncaged/workflow";
import type { AgentContext } from "@uncaged/workflow-runtime";
async function resolveStepText(ctx: AgentContext, contentHash: string): Promise<string> {
const text = await getContentMerklePayload(ctx.cas, contentHash);
if (text === null) {
throw new Error(`buildAgentPrompt: missing CAS blob for ${contentHash}`);
}
return text;
}
/** Builds the full agent prompt: system instructions plus summarized thread history. */
export async function buildAgentPrompt(ctx: AgentContext): Promise<string> {
const lines: string[] = [];
@@ -24,12 +15,10 @@ export async function buildAgentPrompt(ctx: AgentContext): Promise<string> {
if (steps.length === 1) {
const s = steps[0];
const body = await resolveStepText(ctx, s.contentHash);
lines.push("");
lines.push(`## Step: ${s.role}`);
lines.push("");
lines.push(body);
lines.push("");
lines.push(`ContentHash: ${s.contentHash}`);
lines.push(`Meta: ${JSON.stringify(s.meta)}`);
} else {
lines.push("");
@@ -41,12 +30,10 @@ export async function buildAgentPrompt(ctx: AgentContext): Promise<string> {
lines.push(`Summary: ${JSON.stringify(s.meta)}`);
}
const last = steps[steps.length - 1];
const lastBody = await resolveStepText(ctx, last.contentHash);
lines.push("");
lines.push(`## Latest Step: ${last.role}`);
lines.push("");
lines.push(lastBody);
lines.push("");
lines.push(`ContentHash: ${last.contentHash}`);
lines.push(`Meta: ${JSON.stringify(last.meta)}`);
}
@@ -22,7 +22,6 @@ describe("buildDescriptor", () => {
extractPrompt: "Extract title and count from the analysis.",
schema,
extractRefs: null,
extractMode: "single",
},
},
moderator: () => END,
+7 -51
View File
@@ -33,41 +33,17 @@ function installMockChatCompletions(sequence: ReadonlyArray<Record<string, unkno
const origFetch = globalThis.fetch;
let i = 0;
const mockFetch = async (
input: Parameters<typeof fetch>[0],
init?: RequestInit,
_input: Parameters<typeof fetch>[0],
_init?: RequestInit,
): Promise<Response> => {
const args = sequence[i] ?? sequence[sequence.length - 1];
if (args === undefined) {
throw new Error("installMockChatCompletions: empty sequence");
}
i += 1;
void input;
const body = init?.body ? (JSON.parse(String(init.body)) as Record<string, unknown>) : {};
const tools = body.tools;
const firstTool =
Array.isArray(tools) && tools.length > 0 && tools[0] !== null && typeof tools[0] === "object"
? (tools[0] as Record<string, unknown>)
: null;
const fn =
firstTool !== null ? (firstTool.function as Record<string, unknown> | undefined) : undefined;
const toolName = typeof fn?.name === "string" ? fn.name : "extract";
return new Response(
JSON.stringify({
choices: [
{
message: {
tool_calls: [
{
type: "function",
function: {
name: toolName,
arguments: JSON.stringify(args),
},
},
],
},
},
],
choices: [{ message: { content: JSON.stringify(args) } }],
}),
{ status: 200, headers: { "Content-Type": "application/json" } },
);
@@ -125,7 +101,7 @@ async function writeRegistryYaml(storageRoot: string, yaml: string): Promise<voi
await writeFile(join(storageRoot, "workflow.yaml"), yaml, "utf8");
}
/** Extract rounds use tool_calls; supervisor uses plain `content` (no tools). */
/** Extract rounds reply with schema-shaped JSON in `content`; supervisor uses plain `content` (no tools advertised). */
function installMockExtractThenSupervisor(params: {
extractArgs: ReadonlyArray<Record<string, unknown>>;
supervisorContent: string;
@@ -147,26 +123,9 @@ function installMockExtractThenSupervisor(params: {
throw new Error("installMockExtractThenSupervisor: empty extractArgs");
}
extractI += 1;
const firstTool = tools[0] as Record<string, unknown>;
const fn = firstTool.function as Record<string, unknown> | undefined;
const toolName = typeof fn?.name === "string" ? fn.name : "extract";
return new Response(
JSON.stringify({
choices: [
{
message: {
tool_calls: [
{
type: "function",
function: {
name: toolName,
arguments: JSON.stringify(args),
},
},
],
},
},
],
choices: [{ message: { content: JSON.stringify(args) } }],
}),
{ status: 200, headers: { "Content-Type": "application/json" } },
);
@@ -196,7 +155,6 @@ const demoWorkflow = createWorkflow<DemoMeta>(
extractPrompt: "Extract plan text and affected files list.",
schema: plannerMetaSchema,
extractRefs: null,
extractMode: "single",
},
coder: {
description: "Demo coder",
@@ -204,7 +162,6 @@ const demoWorkflow = createWorkflow<DemoMeta>(
extractPrompt: "Extract the code diff summary.",
schema: coderMetaSchema,
extractRefs: null,
extractMode: "single",
},
},
moderator: (ctx) => {
@@ -347,7 +304,7 @@ describe("executeThread", () => {
}
});
test("pre-filled ThreadInput.steps skips roles already present", async () => {
test("pre-filled input.steps skips roles already present", async () => {
restoreFetch = installMockChatCompletions([{ diff: "+ok" }]);
const root = await mkdtemp(join(tmpdir(), "wf-engine-fork-"));
@@ -553,7 +510,7 @@ describe("executeThread", () => {
}
});
test("extractMode react traverses CAS DAG via cas_get during extraction", async () => {
test("extract traverses CAS DAG via cas_get during extraction", async () => {
const dagMetaSchema = z.object({ leafPayload: z.string() });
type DagDemoMeta = { walker: z.infer<typeof dagMetaSchema> };
@@ -663,7 +620,6 @@ describe("executeThread", () => {
"Set leafPayload to the string payload of the content Merkle node under the root.",
schema: dagMetaSchema,
extractRefs: null,
extractMode: "react",
},
},
moderator: (ctx) => (ctx.steps.length === 0 ? "walker" : END),
@@ -27,41 +27,17 @@ function installMockChatCompletions(sequence: ReadonlyArray<Record<string, unkno
const origFetch = globalThis.fetch;
let i = 0;
const mockFetch = async (
input: Parameters<typeof fetch>[0],
init?: RequestInit,
_input: Parameters<typeof fetch>[0],
_init?: RequestInit,
): Promise<Response> => {
const args = sequence[i] ?? sequence[sequence.length - 1];
if (args === undefined) {
throw new Error("installMockChatCompletions: empty sequence");
}
i += 1;
void input;
const body = init?.body ? (JSON.parse(String(init.body)) as Record<string, unknown>) : {};
const tools = body.tools;
const firstTool =
Array.isArray(tools) && tools.length > 0 && tools[0] !== null && typeof tools[0] === "object"
? (tools[0] as Record<string, unknown>)
: null;
const fn =
firstTool !== null ? (firstTool.function as Record<string, unknown> | undefined) : undefined;
const toolName = typeof fn?.name === "string" ? fn.name : "extract";
return new Response(
JSON.stringify({
choices: [
{
message: {
tool_calls: [
{
type: "function",
function: {
name: toolName,
arguments: JSON.stringify(args),
},
},
],
},
},
],
choices: [{ message: { content: JSON.stringify(args) } }],
}),
{ status: 200, headers: { "Content-Type": "application/json" } },
);
@@ -94,7 +70,6 @@ const refsDemoWorkflow = createWorkflow<RefsDemoMeta>(
extractPrompt: "Extract phases with CAS hashes.",
schema: plannerMetaSchema,
extractRefs: (meta) => meta.phases.map((p) => p.hash),
extractMode: "single",
},
},
moderator: (ctx) => (ctx.steps.length === 0 ? "planner" : END),
@@ -27,41 +27,17 @@ function installMockChatCompletions(sequence: ReadonlyArray<Record<string, unkno
const origFetch = globalThis.fetch;
let i = 0;
const mockFetch = async (
input: Parameters<typeof fetch>[0],
init?: RequestInit,
_input: Parameters<typeof fetch>[0],
_init?: RequestInit,
): Promise<Response> => {
const args = sequence[i] ?? sequence[sequence.length - 1];
if (args === undefined) {
throw new Error("installMockChatCompletions: empty sequence");
}
i += 1;
void input;
const body = init?.body ? (JSON.parse(String(init.body)) as Record<string, unknown>) : {};
const tools = body.tools;
const firstTool =
Array.isArray(tools) && tools.length > 0 && tools[0] !== null && typeof tools[0] === "object"
? (tools[0] as Record<string, unknown>)
: null;
const fn =
firstTool !== null ? (firstTool.function as Record<string, unknown> | undefined) : undefined;
const toolName = typeof fn?.name === "string" ? fn.name : "extract";
return new Response(
JSON.stringify({
choices: [
{
message: {
tool_calls: [
{
type: "function",
function: {
name: toolName,
arguments: JSON.stringify(args),
},
},
],
},
},
],
choices: [{ message: { content: JSON.stringify(args) } }],
}),
{ status: 200, headers: { "Content-Type": "application/json" } },
);
@@ -96,11 +72,11 @@ export const descriptor = {
},
},
};
export async function* run(input, options) {
const cas = options.cas;
export async function* run(thread, runtime) {
const cas = runtime.cas;
const h = await putContentMerkleNode(cas, "child-body");
yield { role: "agent", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "child-done:" + input.prompt };
return { returnCode: 0, summary: "child-done:" + thread.start.content };
}
`;
@@ -147,7 +123,6 @@ describe("workflowAsAgent integration", () => {
extractPrompt: "extract done flag",
schema: callerMetaSchema,
extractRefs: null,
extractMode: "single",
},
},
moderator: (ctx) => (ctx.steps.length === 0 ? "caller" : END),
@@ -34,7 +34,6 @@ function makeAgentCtx(params: {
name: "caller",
systemPrompt: "caller",
},
cas: createCasStore(join(params.storageRoot, "agent-ctx-cas")),
};
}
@@ -49,11 +48,11 @@ export const descriptor = {
},
},
};
export async function* run(input, options) {
const cas = options.cas;
export async function* run(thread, runtime) {
const cas = runtime.cas;
const h = await putContentMerkleNode(cas, "child-body");
yield { role: "agent", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "child-done:" + input.prompt };
return { returnCode: 0, summary: "child-done:" + thread.start.content };
}
`;
@@ -1,21 +1,8 @@
import type {
AgentBinding,
RoleMeta,
WorkflowDefinition,
WorkflowFn,
} from "@uncaged/workflow-runtime";
import { createWorkflow as createWorkflowRuntime } from "@uncaged/workflow-runtime";
import { resolveRoleMeta } from "./resolve-role-meta.js";
/**
* Binds pure role definitions + moderator to runtime agents.
* Assign with `export const run = createWorkflow(def, binding)`.
* The engine supplies {@link WorkflowFnOptions.extract} and {@link WorkflowFnOptions.llmProvider} from workflow.yaml.
* Re-export of {@link createWorkflow} from `@uncaged/workflow-runtime`.
*
* The runtime's `createWorkflow` already binds role definitions + agents to a workflow loop
* and delegates structured meta extraction to `WorkflowRuntime.extract`, which the engine
* supplies (resolved from the `extract` scene in workflow.yaml).
*/
export function createWorkflow<M extends RoleMeta>(
def: Pick<WorkflowDefinition<M>, "roles" | "moderator">,
binding: AgentBinding,
): WorkflowFn {
return createWorkflowRuntime(def, binding, resolveRoleMeta);
}
export { createWorkflow } from "@uncaged/workflow-runtime";
+39 -21
View File
@@ -2,12 +2,14 @@ import { appendFile, mkdir } from "node:fs/promises";
import { dirname } from "node:path";
import type {
LlmProvider,
ThreadInput,
RoleOutput,
ThreadContext,
WorkflowCompletion,
WorkflowFn,
WorkflowFnOptions,
WorkflowResult,
WorkflowRuntime,
} from "@uncaged/workflow-runtime";
import { START } from "@uncaged/workflow-runtime";
import {
type CasStore,
getContentMerklePayload,
@@ -22,11 +24,13 @@ import { err, type LogFn, normalizeRefsField, ok, type Result } from "../util/in
import { runSupervisor } from "./supervisor.js";
import type { ExecuteThreadIo, ExecuteThreadOptions } from "./types.js";
async function resolveEngineRegistryRuntime(storageRoot: string): Promise<
async function resolveEngineRegistryRuntime(
storageRoot: string,
cas: CasStore,
): Promise<
Result<
{
extract: ReturnType<typeof createExtract>;
llmProvider: LlmProvider;
workflowConfig: WorkflowConfig;
},
string
@@ -50,7 +54,7 @@ async function resolveEngineRegistryRuntime(storageRoot: string): Promise<
apiKey: ex.apiKey,
model: ex.model,
};
return ok({ extract: createExtract(llmProvider), llmProvider, workflowConfig: cfg });
return ok({ extract: createExtract(llmProvider, { cas }), workflowConfig: cfg });
}
async function appendDataLine(path: string, record: unknown): Promise<void> {
@@ -104,7 +108,7 @@ async function finalizeAbortedThread(params: {
async function maybeSupervisorHaltsThread(params: {
workflowConfig: WorkflowConfig;
input: ThreadInput;
thread: ThreadContext;
written: number;
recentSupervisorSteps: readonly { role: string; summary: string }[];
logger: LogFn;
@@ -119,7 +123,7 @@ async function maybeSupervisorHaltsThread(params: {
}
const sup = await runSupervisor({
config: params.workflowConfig,
prompt: params.input.prompt,
prompt: params.thread.start.content,
recentSteps: params.recentSupervisorSteps,
logger: params.logger,
});
@@ -144,8 +148,8 @@ async function driveWorkflowGenerator(params: {
fn: WorkflowFn;
workflowName: string;
workflowConfig: WorkflowConfig;
input: ThreadInput;
bundleOptions: WorkflowFnOptions;
thread: ThreadContext;
runtime: WorkflowRuntime;
executeOptions: ExecuteThreadOptions;
dataJsonlPath: string;
threadId: string;
@@ -157,8 +161,8 @@ async function driveWorkflowGenerator(params: {
fn,
workflowName,
workflowConfig,
input,
bundleOptions,
thread,
runtime,
executeOptions,
dataJsonlPath,
threadId,
@@ -166,9 +170,9 @@ async function driveWorkflowGenerator(params: {
cas,
stepMerkleHashes,
} = params;
const gen = fn(input, bundleOptions);
const gen = fn(thread, runtime);
let written = 0;
const recentSupervisorSteps: { role: string; summary: string }[] = input.steps.map((s) => ({
const recentSupervisorSteps: { role: string; summary: string }[] = thread.steps.map((s) => ({
role: s.role,
summary: JSON.stringify(s.meta),
}));
@@ -268,7 +272,7 @@ async function driveWorkflowGenerator(params: {
const supervised = await maybeSupervisorHaltsThread({
workflowConfig,
input,
thread,
written,
recentSupervisorSteps,
logger,
@@ -290,7 +294,7 @@ async function driveWorkflowGenerator(params: {
export async function executeThread(
fn: WorkflowFn,
workflowName: string,
input: ThreadInput,
input: { prompt: string; steps: RoleOutput[] },
options: ExecuteThreadOptions,
io: ExecuteThreadIo,
logger: LogFn,
@@ -367,26 +371,40 @@ export async function executeThread(
});
}
const registryRuntime = await resolveEngineRegistryRuntime(options.storageRoot);
const registryRuntime = await resolveEngineRegistryRuntime(options.storageRoot, io.cas);
if (!registryRuntime.ok) {
throw new Error(registryRuntime.error);
}
const bundleOptions: WorkflowFnOptions = {
const thread: ThreadContext = {
threadId: io.threadId,
maxRounds: options.maxRounds,
depth: options.depth,
start: {
role: START,
content: input.prompt,
meta: { maxRounds: options.maxRounds },
timestamp: nowMs,
},
steps: input.steps.map((out, i) => ({
role: out.role,
contentHash: out.contentHash,
meta: out.meta,
refs: out.refs,
timestamp: prefilled?.[i]?.timestamp ?? nowMs + i,
})),
};
const runtime: WorkflowRuntime = {
cas: io.cas,
extract: registryRuntime.value.extract,
llmProvider: registryRuntime.value.llmProvider,
};
return await driveWorkflowGenerator({
fn,
workflowName,
workflowConfig: registryRuntime.value.workflowConfig,
input,
bundleOptions,
thread,
runtime,
executeOptions: options,
dataJsonlPath: io.dataJsonlPath,
threadId: io.threadId,
@@ -1,42 +0,0 @@
import type {
ExtractContext,
RoleDefinition,
RoleMeta,
WorkflowFnOptions,
} from "@uncaged/workflow-runtime";
import { buildExtractUserContent } from "../extract/extract-fn.js";
import { reactExtract } from "../extract/react-extract.js";
export async function resolveRoleMeta<M extends RoleMeta>(
roleDef: RoleDefinition<Record<string, unknown>>,
extractCtx: ExtractContext<M>,
options: WorkflowFnOptions,
): Promise<Record<string, unknown>> {
if (roleDef.extractMode === "react") {
if (options.llmProvider === null) {
throw new Error(
'createWorkflow: WorkflowFnOptions.llmProvider is required when a role uses extractMode "react"',
);
}
const text = await buildExtractUserContent(
extractCtx as unknown as ExtractContext,
roleDef.extractPrompt,
);
const reactResult = await reactExtract({
text,
schema: roleDef.schema,
provider: options.llmProvider,
cas: options.cas,
});
if (!reactResult.ok) {
throw new Error(`react extract failed: ${reactResult.error}`);
}
return reactResult.value as Record<string, unknown>;
}
return (await options.extract(
roleDef.schema,
roleDef.extractPrompt,
extractCtx as unknown as ExtractContext,
)) as Record<string, unknown>;
}
+1 -1
View File
@@ -23,7 +23,7 @@ export type PrefilledDiskStep = {
export type ExecuteThreadOptions = {
maxRounds: number;
/** Passed to the bundle as `WorkflowFnOptions.depth`. */
/** Passed to the bundle thread context as `ThreadContext.depth`. */
depth: number;
signal: AbortSignal;
/** Invoked after each successful yield (and outer-loop checks); used for pause/resume. */
+16 -8
View File
@@ -1,12 +1,17 @@
import type { ExtractContext, ExtractFn, LlmProvider } from "@uncaged/workflow-runtime";
import type * as z from "zod/v4";
import { getContentMerklePayload } from "../cas/index.js";
import { llmExtractWithRetry } from "./llm-extract.js";
import { type CasStore, getContentMerklePayload } from "../cas/index.js";
import { reactExtract } from "./react-extract.js";
export type ExtractDeps = {
cas: CasStore;
};
/** Builds the user-side extraction prompt (thread + agent output + instruction). */
export async function buildExtractUserContent(
ctx: ExtractContext,
prompt: string,
deps: ExtractDeps,
): Promise<string> {
const lines: string[] = [];
lines.push(`## Role: ${ctx.currentRole.name}`);
@@ -18,7 +23,7 @@ export async function buildExtractUserContent(
if (ctx.steps.length > 0) {
lines.push("## Thread History");
for (const step of ctx.steps) {
const body = await getContentMerklePayload(ctx.cas, step.contentHash);
const body = await getContentMerklePayload(deps.cas, step.contentHash);
if (body === null) {
throw new Error(`extract: missing CAS blob for step ${step.role}: ${step.contentHash}`);
}
@@ -39,18 +44,21 @@ export async function buildExtractUserContent(
/**
* Create an ExtractFn backed by an LLM provider.
* Builds prompt text from {@link ExtractContext} plus `prompt` and calls structured extraction.
*
* Internally runs a multi-turn ReAct loop with two tools (`cas_get` for traversing the
* Merkle DAG and a schema-shaped `extract` tool); the loop also accepts a plain-JSON
* assistant reply as a short-circuit, which covers the legacy "single" extraction path.
*/
export function createExtract(provider: LlmProvider): ExtractFn {
export function createExtract(provider: LlmProvider, deps: ExtractDeps): ExtractFn {
return async <T extends Record<string, unknown>>(
schema: z.ZodType<T>,
prompt: string,
ctx: ExtractContext,
): Promise<T> => {
const text = await buildExtractUserContent(ctx, prompt);
const result = await llmExtractWithRetry({ text, schema, provider });
const text = await buildExtractUserContent(ctx, prompt, deps);
const result = await reactExtract({ text, schema, provider, cas: deps.cas });
if (!result.ok) {
throw new Error(`extract failed: ${JSON.stringify(result.error)}`);
throw new Error(`extract failed: ${result.error}`);
}
return result.value;
};
-1
View File
@@ -6,7 +6,6 @@ export {
extractFunctionToolFromZodSchema,
llmErrorToCause,
llmExtract,
llmExtractWithRetry,
} from "./llm-extract.js";
export { reactExtract } from "./react-extract.js";
export type {
@@ -92,20 +92,6 @@ function readToolArgumentsJson(parsed: unknown, previewSource: string): Result<s
return ok(argsRaw);
}
function isRetryableExtractError(error: LlmError): boolean {
return error.kind === "schema_validation_failed" || error.kind === "tool_arguments_invalid_json";
}
function describeRetryHint(error: LlmError): string {
if (error.kind === "schema_validation_failed") {
return `Schema validation failed: ${error.message}`;
}
if (error.kind === "tool_arguments_invalid_json") {
return `Tool arguments were not valid JSON: ${error.message}`;
}
return JSON.stringify(error);
}
export function llmErrorToCause(error: LlmError): Error {
switch (error.kind) {
case "http_error":
@@ -206,40 +192,3 @@ async function performLlmExtract<T>(
export async function llmExtract<T>(options: LlmExtractArgs<T>): Promise<Result<T, LlmError>> {
return performLlmExtract({ ...options, userContent: options.text });
}
/**
* Runs extract up to two times: on the first schema/tool-args parse failure, resends the agent
* output plus the error so the model can correct the tool call.
*/
export async function llmExtractWithRetry<T>(
options: LlmExtractArgs<T>,
): Promise<Result<T, LlmError>> {
const first = await performLlmExtract({
...options,
userContent: options.text,
});
if (first.ok) {
return first;
}
if (!isRetryableExtractError(first.error)) {
return first;
}
const hint = describeRetryHint(first.error);
const correction = `The previous extraction attempt failed.
${hint}
Respond again with a single tool call whose \`arguments\` JSON strictly matches the schema.`;
const secondContent = `${options.text}
---
${correction}`;
return performLlmExtract({
...options,
userContent: secondContent,
});
}
+23 -3
View File
@@ -57,6 +57,7 @@ type ChatMessage =
content: string | null;
tool_calls: ToolCall[];
}
| { role: "assistant"; content: string }
| { role: "tool"; tool_call_id: string; content: string };
type AssistantTurn<T> =
@@ -111,10 +112,14 @@ function normalizeToolCalls(toolCallsRaw: unknown[]): Result<ToolCall[], string>
return ok(toolCalls);
}
type AssistantTurnOrCorrection<T extends Record<string, unknown>> =
| AssistantTurn<T>
| { kind: "plain_json_invalid"; rawContent: string; correction: string };
function classifyAssistantTurn<T extends Record<string, unknown>>(
messageObj: Record<string, unknown>,
schema: z.ZodType<T>,
): Result<AssistantTurn<T>, string> {
): Result<AssistantTurnOrCorrection<T>, string> {
const toolCallsRaw = messageObj.tool_calls;
if (!Array.isArray(toolCallsRaw) || toolCallsRaw.length === 0) {
const content = messageObj.content;
@@ -123,11 +128,20 @@ function classifyAssistantTurn<T extends Record<string, unknown>>(
}
const jsonParsed = tryParseJsonContent(content);
if (jsonParsed === null) {
return err("no_tool_calls_and_content_not_json");
return ok({
kind: "plain_json_invalid",
rawContent: content,
correction:
"Your previous reply was not valid JSON and contained no tool calls. Reply with a single JSON object that matches the schema, or call the extract tool with the structured arguments.",
});
}
const validated = schema.safeParse(jsonParsed);
if (!validated.success) {
return err(`schema_validation_failed:${validated.error.message}`);
return ok({
kind: "plain_json_invalid",
rawContent: content,
correction: `Your previous JSON reply did not satisfy the schema: ${validated.error.message}. Reply again with a JSON object that matches the schema, or call the extract tool with the structured arguments.`,
});
}
return ok({ kind: "plain_json", value: validated.data });
}
@@ -298,6 +312,12 @@ export async function reactExtract<T extends Record<string, unknown>>(
return ok(turn.value);
}
if (turn.kind === "plain_json_invalid") {
messages.push({ role: "assistant", content: turn.rawContent });
messages.push({ role: "user", content: turn.correction });
continue;
}
messages.push({
role: "assistant",
content: turn.assistantContent,
-1
View File
@@ -59,7 +59,6 @@ export {
type LlmError,
llmErrorToCause,
llmExtract,
llmExtractWithRetry,
type ReactExtractArgs,
reactExtract,
} from "./extract/index.js";
+3 -3
View File
@@ -1,5 +1,5 @@
import { join } from "node:path";
import type { AgentContext, AgentFn, ThreadInput } from "@uncaged/workflow-runtime";
import type { AgentContext, AgentFn } from "@uncaged/workflow-runtime";
import { extractBundleExports } from "./bundle/index.js";
import { createCasStore } from "./cas/index.js";
import type { ExecuteThreadIo } from "./engine/index.js";
@@ -36,7 +36,7 @@ function resolveWorkflowAsAgentStorageRoot(options: WorkflowAsAgentOptions | nul
/**
* Returns an {@link AgentFn} that runs another registered workflow in a new thread,
* using the parent thread's initial prompt (`ctx.start.content`) as the child {@link ThreadInput.prompt}.
* using the parent thread's initial prompt (`ctx.start.content`) as the child prompt.
*/
export function workflowAsAgent(
workflowName: string,
@@ -68,7 +68,7 @@ export function workflowAsAgent(
return `ERROR: ${bundleExportsResult.error}`;
}
const input: ThreadInput = {
const input = {
prompt: ctx.start.content,
steps: [],
};
+1
View File
@@ -1,4 +1,5 @@
{
"files": [],
"compilerOptions": {
"target": "ES2022",
"lib": ["ES2022"],