From a8c1c158d63e47da1bb8cdbada518ec9ce71e0e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Fri, 8 May 2026 02:21:45 +0000 Subject: [PATCH] feat: engine injects extract provider at runtime (Phase 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - createWorkflow(def, binding) — no more extract/llmProvider params - Engine resolves extract provider from workflow.yaml via resolveModel - WorkflowFnOptions now carries extract + llmProvider (engine-injected) - Delete extract-provider.ts, inline maxDepth helper - Template packages simplified: only take agent binding - Breaking change: bundles no longer carry provider config Refs #110 --- examples/hello-world.ts | 10 +- .../cli-workflow/__tests__/fork-cli.test.ts | 2 + .../cli-workflow/__tests__/thread-cli.test.ts | 2 + .../__tests__/workflow-registry-fixture.ts | 18 ++++ .../src/commands/init/workspace.ts | 2 +- .../workflow-template-develop/src/index.ts | 10 +- .../__tests__/solve-issue-template.test.ts | 93 ++++++++++--------- .../src/index.ts | 10 +- packages/workflow/__tests__/engine.test.ts | 40 +++++--- .../__tests__/extract-provider.test.ts | 93 ------------------- .../workflow/__tests__/refs-tracking.test.ts | 22 +++-- packages/workflow/__tests__/worker.test.ts | 13 +++ .../workflow-as-agent-integration.test.ts | 21 +++-- .../__tests__/workflow-as-agent.test.ts | 15 +++ .../workflow/src/engine/create-workflow.ts | 30 +++--- packages/workflow/src/engine/engine.ts | 39 +++++++- packages/workflow/src/engine/types.ts | 2 + packages/workflow/src/engine/worker.ts | 1 + packages/workflow/src/extract-provider.ts | 39 -------- packages/workflow/src/index.ts | 1 - packages/workflow/src/types.ts | 5 + packages/workflow/src/workflow-as-agent.ts | 14 ++- 22 files changed, 227 insertions(+), 255 deletions(-) create mode 100644 packages/cli-workflow/__tests__/workflow-registry-fixture.ts delete mode 100644 packages/workflow/__tests__/extract-provider.test.ts delete mode 100644 packages/workflow/src/extract-provider.ts diff --git a/examples/hello-world.ts b/examples/hello-world.ts index 5991056..dccabea 100644 --- a/examples/hello-world.ts +++ b/examples/hello-world.ts @@ -1,4 +1,4 @@ -import { createExtract, createWorkflow, END, type RoleDefinition } from "@uncaged/workflow"; +import { createWorkflow, END, type RoleDefinition } from "@uncaged/workflow"; import * as z from "zod/v4"; type Roles = { @@ -32,12 +32,6 @@ const greeter: RoleDefinition = { extractMode: "single", }; -const extract = createExtract({ - baseUrl: "http://127.0.0.1:9", - apiKey: "", - model: "", -}); - export const run = createWorkflow( { roles: { greeter }, @@ -48,6 +42,4 @@ export const run = createWorkflow( { agent: async (ctx) => `Hello, ${ctx.start.content}`, }, - extract, - null, ); diff --git a/packages/cli-workflow/__tests__/fork-cli.test.ts b/packages/cli-workflow/__tests__/fork-cli.test.ts index 7b340ac..9a0f385 100644 --- a/packages/cli-workflow/__tests__/fork-cli.test.ts +++ b/packages/cli-workflow/__tests__/fork-cli.test.ts @@ -7,6 +7,7 @@ import { cmdFork, cmdRun } from "../src/commands/thread/index.js"; import { cmdAdd } from "../src/commands/workflow/index.js"; import { pathExists } from "../src/fs-utils.js"; import { addCliArgs } from "./bundle-fixture.js"; +import { ensureTestWorkflowRegistryConfig } from "./workflow-registry-fixture.js"; /** Three-role workflow that respects `input.steps` for fork/resume. */ const threeRoleBundleSource = `import { putContentMerkleNode } from "@uncaged/workflow"; @@ -77,6 +78,7 @@ describe("cli fork", () => { prevEnv = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT; storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-fork-")); process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = storageRoot; + await ensureTestWorkflowRegistryConfig(storageRoot); }); afterEach(async () => { diff --git a/packages/cli-workflow/__tests__/thread-cli.test.ts b/packages/cli-workflow/__tests__/thread-cli.test.ts index dc24c43..715508b 100644 --- a/packages/cli-workflow/__tests__/thread-cli.test.ts +++ b/packages/cli-workflow/__tests__/thread-cli.test.ts @@ -19,6 +19,7 @@ import { import { cmdAdd } from "../src/commands/workflow/index.js"; import { pathExists, readTextFileIfExists } from "../src/fs-utils.js"; import { addCliArgs } from "./bundle-fixture.js"; +import { ensureTestWorkflowRegistryConfig } from "./workflow-registry-fixture.js"; const wfPutImport = `import { putContentMerkleNode } from "@uncaged/workflow"; `; @@ -142,6 +143,7 @@ describe("cli thread commands", () => { prevEnv = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT; storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-thread-")); process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = storageRoot; + await ensureTestWorkflowRegistryConfig(storageRoot); }); afterEach(async () => { diff --git a/packages/cli-workflow/__tests__/workflow-registry-fixture.ts b/packages/cli-workflow/__tests__/workflow-registry-fixture.ts new file mode 100644 index 0000000..6f2878a --- /dev/null +++ b/packages/cli-workflow/__tests__/workflow-registry-fixture.ts @@ -0,0 +1,18 @@ +import { writeFile } from "node:fs/promises"; +import { join } from "node:path"; + +/** Minimal valid global config so {@link executeThread} can resolve the extract scene (CLI integration tests). */ +export const TEST_WORKFLOW_REGISTRY_YAML = `config: + maxDepth: 3 + providers: + stub: + baseUrl: http://127.0.0.1:9 + apiKey: test + models: + default: stub/m +workflows: {} +`; + +export async function ensureTestWorkflowRegistryConfig(storageRoot: string): Promise { + await writeFile(join(storageRoot, "workflow.yaml"), TEST_WORKFLOW_REGISTRY_YAML, "utf8"); +} diff --git a/packages/cli-workflow/src/commands/init/workspace.ts b/packages/cli-workflow/src/commands/init/workspace.ts index 26dba34..16c3f87 100644 --- a/packages/cli-workflow/src/commands/init/workspace.ts +++ b/packages/cli-workflow/src/commands/init/workspace.ts @@ -107,7 +107,7 @@ Init 生成的骨架:\`templates/\` 下放可复用定义,\`workflows/\` 下 2. **编写 RoleDefinition**:为每个角色写 Zod \`schema\`,补齐 \`systemPrompt\` / \`extractPrompt\` / \`description\`。 3. **编写 Moderator**:根据 \`ctx.steps\` 与业务状态返回下一个角色名或 \`END\`。 4. **组装 WorkflowDefinition**:在模板 \`index\` 中导出 definition(以及必要的角色 / moderator 导出)。 -5. **实例化**:在 workflow 包中使用 \`createWorkflow(def, binding, extract)\`(或项目约定的封装)绑定 **AgentFn** / **ExtractFn**。 +5. **实例化**:在 workflow 包中使用 \`createWorkflow(def, binding)\`(或项目约定的封装)绑定 **AgentFn**;**ExtractFn** 由引擎从 **workflow.yaml** 注入 \`WorkflowFnOptions\`。 6. **构建**:打包为单个 **.esm.js** bundle,使用 **uncaged-workflow add** 注册。 ## 4. 编码规范 diff --git a/packages/workflow-template-develop/src/index.ts b/packages/workflow-template-develop/src/index.ts index 1bb54f2..b7a91ee 100644 --- a/packages/workflow-template-develop/src/index.ts +++ b/packages/workflow-template-develop/src/index.ts @@ -1,8 +1,6 @@ import { type AgentBinding, createWorkflow, - type ExtractFn, - type LlmProvider, type WorkflowDefinition, type WorkflowFn, } from "@uncaged/workflow"; @@ -43,10 +41,6 @@ export const developWorkflowDefinition: WorkflowDefinition = { moderator: developModerator, }; -export function createDevelopRun( - binding: AgentBinding, - extract: ExtractFn, - llmProvider: LlmProvider | null, -): WorkflowFn { - return createWorkflow(developWorkflowDefinition, binding, extract, llmProvider); +export function createDevelopRun(binding: AgentBinding): WorkflowFn { + return createWorkflow(developWorkflowDefinition, binding); } diff --git a/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts b/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts index ce01557..c194609 100644 --- a/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts +++ b/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts @@ -250,17 +250,20 @@ describe("createSolveIssueRun", () => { const cas = createCasStore(casDir); // Override developer so the test does not spin up a child workflow. - const run = createSolveIssueRun( - { - agent: async () => "", - overrides: { developer: async () => "stub-root-hash" }, - }, - stubExtract, - stubLlmProvider, - ); + const run = createSolveIssueRun({ + agent: async () => "", + overrides: { developer: async () => "stub-root-hash" }, + }); const gen = run( { prompt: "task", steps: [] }, - { threadId: "01TEST000000000000000000TR", maxRounds: 20, depth: 0, cas }, + { + threadId: "01TEST000000000000000000TR", + maxRounds: 20, + depth: 0, + cas, + extract: stubExtract, + llmProvider: stubLlmProvider, + }, ); const first = await gen.next(); expect(first.done).toBe(false); @@ -294,33 +297,36 @@ describe("createSolveIssueRun", () => { const cas = createCasStore(casDir); const calls: string[] = []; - const run = createSolveIssueRun( - { - agent: async () => { - calls.push("default"); + const run = createSolveIssueRun({ + agent: async () => { + calls.push("default"); + return ""; + }, + overrides: { + preparer: async () => { + calls.push("preparer"); return ""; }, - overrides: { - preparer: async () => { - calls.push("preparer"); - return ""; - }, - developer: async () => { - calls.push("developer"); - return "stub-root-hash"; - }, - submitter: async () => { - calls.push("submitter"); - return ""; - }, + developer: async () => { + calls.push("developer"); + return "stub-root-hash"; + }, + submitter: async () => { + calls.push("submitter"); + return ""; }, }, - stubExtract, - stubLlmProvider, - ); + }); const gen = run( { prompt: "task", steps: [] }, - { threadId: "01TEST000000000000000000TR", maxRounds: 20, depth: 0, cas }, + { + threadId: "01TEST000000000000000000TR", + maxRounds: 20, + depth: 0, + cas, + extract: stubExtract, + llmProvider: stubLlmProvider, + }, ); await gen.next(); expect(calls).toEqual(["preparer"]); @@ -353,22 +359,25 @@ describe("createSolveIssueRun", () => { const cas = createCasStore(casDir); let developerInvocations = 0; - const run = createSolveIssueRun( - { - agent: async () => "", - overrides: { - developer: async () => { - developerInvocations += 1; - return "stub-root-hash"; - }, + const run = createSolveIssueRun({ + agent: async () => "", + overrides: { + developer: async () => { + developerInvocations += 1; + return "stub-root-hash"; }, }, - stubExtract, - stubLlmProvider, - ); + }); const gen = run( { prompt: "task", steps: [] }, - { threadId: "01TEST000000000000000000TR", maxRounds: 20, depth: 0, cas }, + { + threadId: "01TEST000000000000000000TR", + maxRounds: 20, + depth: 0, + cas, + extract: stubExtract, + llmProvider: stubLlmProvider, + }, ); // preparer await gen.next(); diff --git a/packages/workflow-template-solve-issue/src/index.ts b/packages/workflow-template-solve-issue/src/index.ts index 42c4d47..d345fb6 100644 --- a/packages/workflow-template-solve-issue/src/index.ts +++ b/packages/workflow-template-solve-issue/src/index.ts @@ -1,8 +1,6 @@ import { type AgentBinding, createWorkflow, - type ExtractFn, - type LlmProvider, type WorkflowDefinition, type WorkflowFn, workflowAsAgent, @@ -46,11 +44,7 @@ export const solveIssueWorkflowDefinition: WorkflowDefinition = * {@link workflowAsAgent}; if the caller supplies their own `developer` override in * `binding.overrides`, it takes precedence so tests and custom hosts can stub it. */ -export function createSolveIssueRun( - binding: AgentBinding, - extract: ExtractFn, - llmProvider: LlmProvider | null, -): WorkflowFn { +export function createSolveIssueRun(binding: AgentBinding): WorkflowFn { const developerOverride = binding.overrides?.developer ?? workflowAsAgent("develop"); const mergedBinding: AgentBinding = { agent: binding.agent, @@ -59,5 +53,5 @@ export function createSolveIssueRun( developer: developerOverride, }, }; - return createWorkflow(solveIssueWorkflowDefinition, mergedBinding, extract, llmProvider); + return createWorkflow(solveIssueWorkflowDefinition, mergedBinding); } diff --git a/packages/workflow/__tests__/engine.test.ts b/packages/workflow/__tests__/engine.test.ts index ae05a7c..78f04b1 100644 --- a/packages/workflow/__tests__/engine.test.ts +++ b/packages/workflow/__tests__/engine.test.ts @@ -1,5 +1,5 @@ import { afterEach, describe, expect, test } from "bun:test"; -import { mkdir, mkdtemp, readFile, rm } from "node:fs/promises"; +import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; import * as z from "zod/v4"; @@ -13,8 +13,7 @@ import { } from "../src/cas/merkle.js"; import { createWorkflow } from "../src/engine/create-workflow.js"; import { executeThread } from "../src/engine/engine.js"; -import { createExtract } from "../src/extract/extract-fn.js"; -import { END, type LlmProvider } from "../src/types.js"; +import { END } from "../src/types.js"; import { createLogger } from "../src/util/logger.js"; const plannerMetaSchema = z.object({ @@ -82,11 +81,20 @@ function installMockChatCompletions(sequence: ReadonlyArray { + await writeFile(join(storageRoot, "workflow.yaml"), EXTRACT_REGISTRY_YAML, "utf8"); +} const demoWorkflow = createWorkflow( { @@ -125,8 +133,6 @@ const demoWorkflow = createWorkflow( coder: async () => "code-body", }, }, - demoExtract, - null, ); describe("executeThread", () => { @@ -150,6 +156,7 @@ describe("executeThread", () => { const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`); const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`); await mkdir(join(root, "logs", hash), { recursive: true }); + await writeExtractRegistryConfig(root); const cas = createCasStore(join(root, "cas")); const logger = createLogger({ sink: { kind: "file", path: infoPath } }); @@ -166,6 +173,7 @@ describe("executeThread", () => { awaitAfterEachYield: async () => {}, forkSourceThreadId: null, prefilledDiskSteps: null, + storageRoot: root, }, { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath, cas }, logger, @@ -258,6 +266,7 @@ describe("executeThread", () => { const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`); const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`); await mkdir(join(root, "logs", hash), { recursive: true }); + await writeExtractRegistryConfig(root); const cas = createCasStore(join(root, "cas")); const plannerHash = await cas.put(serializeMerkleNode(createContentMerkleNode("plan-body"))); @@ -295,6 +304,7 @@ describe("executeThread", () => { timestamp: histTs, }, ], + storageRoot: root, }, { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath, cas }, logger, @@ -354,6 +364,7 @@ describe("executeThread", () => { awaitAfterEachYield: async () => {}, forkSourceThreadId: null, prefilledDiskSteps: null, + storageRoot: root, }, { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath, cas }, logger, @@ -391,6 +402,7 @@ describe("executeThread", () => { const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`); const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`); await mkdir(join(root, "logs", hash), { recursive: true }); + await writeExtractRegistryConfig(root); const cas = createCasStore(join(root, "cas")); const logger = createLogger({ sink: { kind: "file", path: infoPath } }); @@ -407,6 +419,7 @@ describe("executeThread", () => { awaitAfterEachYield: async () => {}, forkSourceThreadId: null, prefilledDiskSteps: null, + storageRoot: root, }, { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath, cas }, logger, @@ -549,9 +562,6 @@ describe("executeThread", () => { { preconnect: origFetch.preconnect.bind(origFetch) }, ) as typeof fetch; - const llm: LlmProvider = { baseUrl: "http://127.0.0.1:9", apiKey: "test", model: "test" }; - const extractFn = createExtract(llm); - const dagWorkflow = createWorkflow( { roles: { @@ -568,8 +578,6 @@ describe("executeThread", () => { moderator: (ctx) => (ctx.steps.length === 0 ? "walker" : END), }, { agent: async () => dagRootHash }, - extractFn, - llm, ); const threadId = "01KQXKW18CT8G75T53R8F4G7YG"; @@ -577,6 +585,7 @@ describe("executeThread", () => { const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`); const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`); await mkdir(join(root, "logs", hash), { recursive: true }); + await writeExtractRegistryConfig(root); const logger = createLogger({ sink: { kind: "file", path: infoPath } }); const ac = new AbortController(); @@ -592,6 +601,7 @@ describe("executeThread", () => { awaitAfterEachYield: async () => {}, forkSourceThreadId: null, prefilledDiskSteps: null, + storageRoot: root, }, { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath, cas }, logger, diff --git a/packages/workflow/__tests__/extract-provider.test.ts b/packages/workflow/__tests__/extract-provider.test.ts deleted file mode 100644 index 991e2a7..0000000 --- a/packages/workflow/__tests__/extract-provider.test.ts +++ /dev/null @@ -1,93 +0,0 @@ -import { describe, expect, test } from "bun:test"; -import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises"; -import { tmpdir } from "node:os"; -import { join } from "node:path"; - -import { getExtractProvider } from "../src/extract-provider.js"; - -describe("getExtractProvider", () => { - test("returns provider when config.models.extract is present", async () => { - const root = await mkdtemp(join(tmpdir(), "wf-ext-prov-ok-")); - try { - await mkdir(root, { recursive: true }); - await writeFile( - join(root, "workflow.yaml"), - `config: - maxDepth: 3 - providers: - dashscope: - baseUrl: https://dashscope.aliyuncs.com/compatible-mode/v1 - apiKey: literal-key - models: - default: dashscope/qwen-turbo - extract: dashscope/qwen-plus -workflows: {} -`, - "utf8", - ); - const r = await getExtractProvider(root); - expect(r.ok).toBe(true); - if (!r.ok) { - return; - } - expect(r.value.baseUrl).toBe("https://dashscope.aliyuncs.com/compatible-mode/v1"); - expect(r.value.model).toBe("qwen-plus"); - expect(r.value.apiKey).toBe("literal-key"); - } finally { - await rm(root, { recursive: true, force: true }); - } - }); - - test("errs when registry has no config section", async () => { - const root = await mkdtemp(join(tmpdir(), "wf-ext-prov-missing-")); - try { - await mkdir(root, { recursive: true }); - await writeFile(join(root, "workflow.yaml"), "workflows: {}\n", "utf8"); - const r = await getExtractProvider(root); - expect(r.ok).toBe(false); - if (r.ok) { - return; - } - expect(r.error).toContain("no global config"); - } finally { - await rm(root, { recursive: true, force: true }); - } - }); - - test("resolves apiKey from env at registry read time", async () => { - const root = await mkdtemp(join(tmpdir(), "wf-ext-prov-env-")); - const prev = process.env.WF_GET_EXTRACT_PROVIDER_KEY; - process.env.WF_GET_EXTRACT_PROVIDER_KEY = "resolved-secret"; - try { - await mkdir(root, { recursive: true }); - await writeFile( - join(root, "workflow.yaml"), - `config: - maxDepth: 1 - providers: - p: - baseUrl: https://example.com - apiKey: env:WF_GET_EXTRACT_PROVIDER_KEY - models: - default: p/other-model - extract: p/m -workflows: {} -`, - "utf8", - ); - const r = await getExtractProvider(root); - expect(r.ok).toBe(true); - if (!r.ok) { - return; - } - expect(r.value.apiKey).toBe("resolved-secret"); - } finally { - if (prev === undefined) { - delete process.env.WF_GET_EXTRACT_PROVIDER_KEY; - } else { - process.env.WF_GET_EXTRACT_PROVIDER_KEY = prev; - } - await rm(root, { recursive: true, force: true }); - } - }); -}); diff --git a/packages/workflow/__tests__/refs-tracking.test.ts b/packages/workflow/__tests__/refs-tracking.test.ts index d862940..acaef7d 100644 --- a/packages/workflow/__tests__/refs-tracking.test.ts +++ b/packages/workflow/__tests__/refs-tracking.test.ts @@ -1,5 +1,5 @@ import { afterEach, describe, expect, test } from "bun:test"; -import { mkdir, mkdtemp, readFile, rm } from "node:fs/promises"; +import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; import * as z from "zod/v4"; @@ -8,7 +8,6 @@ import { createCasStore } from "../src/cas/cas.js"; import { createWorkflow } from "../src/engine/create-workflow.js"; import { executeThread } from "../src/engine/engine.js"; import { buildForkPlan, parseThreadDataJsonl } from "../src/engine/fork-thread.js"; -import { createExtract } from "../src/extract/extract-fn.js"; import { END } from "../src/types.js"; import { createLogger } from "../src/util/logger.js"; @@ -76,11 +75,16 @@ function installMockChatCompletions(sequence: ReadonlyArray( { @@ -99,8 +103,6 @@ const refsDemoWorkflow = createWorkflow( { agent: async () => "plan-output", }, - refsDemoExtract, - null, ); describe("RoleStep refs tracking", () => { @@ -142,6 +144,7 @@ describe("RoleStep refs tracking", () => { const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`); const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`); await mkdir(join(root, "logs", hash), { recursive: true }); + await writeFile(join(root, "workflow.yaml"), EXTRACT_REGISTRY_YAML, "utf8"); const cas = createCasStore(join(root, "cas")); const logger = createLogger({ sink: { kind: "file", path: infoPath } }); @@ -158,6 +161,7 @@ describe("RoleStep refs tracking", () => { awaitAfterEachYield: async () => {}, forkSourceThreadId: null, prefilledDiskSteps: null, + storageRoot: root, }, { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath, cas }, logger, diff --git a/packages/workflow/__tests__/worker.test.ts b/packages/workflow/__tests__/worker.test.ts index 943a71a..07b353b 100644 --- a/packages/workflow/__tests__/worker.test.ts +++ b/packages/workflow/__tests__/worker.test.ts @@ -9,6 +9,17 @@ import { createCasStore } from "../src/cas/cas.js"; import { createContentMerkleNode, serializeMerkleNode } from "../src/cas/merkle.js"; import { getWorkerHostScriptPath } from "../src/engine/worker-entry-path.js"; +const WORKER_REGISTRY_YAML = `config: + maxDepth: 3 + providers: + stub: + baseUrl: http://127.0.0.1:9 + apiKey: test + models: + default: stub/model +workflows: {} +`; + const bundleSource = `import { putContentMerkleNode } from "@uncaged/workflow"; export const descriptor = { @@ -89,6 +100,7 @@ describe("worker process", () => { try { const hash = "C9NMV6V2TQT81"; await mkdir(join(root, "bundles"), { recursive: true }); + await writeFile(join(root, "workflow.yaml"), WORKER_REGISTRY_YAML, "utf8"); const bundlePath = join(root, "bundles", `${hash}.esm.js`); await writeFile(bundlePath, bundleSource, "utf8"); @@ -136,6 +148,7 @@ describe("worker process", () => { try { const hash = "C9NMV6V2TQT81"; await mkdir(join(root, "bundles"), { recursive: true }); + await writeFile(join(root, "workflow.yaml"), WORKER_REGISTRY_YAML, "utf8"); const bundlePath = join(root, "bundles", `${hash}.esm.js`); await writeFile(bundlePath, bundleSource, "utf8"); diff --git a/packages/workflow/__tests__/workflow-as-agent-integration.test.ts b/packages/workflow/__tests__/workflow-as-agent-integration.test.ts index 2761e79..93008c7 100644 --- a/packages/workflow/__tests__/workflow-as-agent-integration.test.ts +++ b/packages/workflow/__tests__/workflow-as-agent-integration.test.ts @@ -9,7 +9,6 @@ import { hashWorkflowBundleBytes } from "../src/cas/hash.js"; import { getContentMerklePayload, parseMerkleNode } from "../src/cas/merkle.js"; import { createWorkflow } from "../src/engine/create-workflow.js"; import { executeThread } from "../src/engine/engine.js"; -import { createExtract } from "../src/extract/extract-fn.js"; import { readWorkflowRegistry, registerWorkflowVersion, @@ -76,11 +75,16 @@ function installMockChatCompletions(sequence: ReadonlyArray { const root = await mkdtemp(join(tmpdir(), "wf-waa-int-")); try { + await mkdir(root, { recursive: true }); + await writeFile(join(root, "workflow.yaml"), PARENT_REGISTRY_WITH_CONFIG, "utf8"); const { hash: childHash } = await installChildWorkflow(root); const parentWorkflow = createWorkflow( @@ -148,8 +154,6 @@ describe("workflowAsAgent integration", () => { moderator: (ctx) => (ctx.steps.length === 0 ? "caller" : END), }, { agent: workflowAsAgent("child-wf", { storageRoot: root }) }, - parentExtract, - null, ); const threadId = "01KQXKW18CT8G75T53R8F4G7YG"; @@ -173,6 +177,7 @@ describe("workflowAsAgent integration", () => { awaitAfterEachYield: async () => {}, forkSourceThreadId: null, prefilledDiskSteps: null, + storageRoot: root, }, { threadId, hash: parentHash, dataJsonlPath: dataPath, infoJsonlPath: infoPath, cas }, logger, diff --git a/packages/workflow/__tests__/workflow-as-agent.test.ts b/packages/workflow/__tests__/workflow-as-agent.test.ts index 64bef83..7e3d1d0 100644 --- a/packages/workflow/__tests__/workflow-as-agent.test.ts +++ b/packages/workflow/__tests__/workflow-as-agent.test.ts @@ -93,6 +93,21 @@ describe("workflowAsAgent", () => { test("runs registered workflow and returns child thread root CAS hash", async () => { const root = await mkdtemp(join(tmpdir(), "wf-waa-ok-")); try { + await mkdir(root, { recursive: true }); + await writeFile( + join(root, "workflow.yaml"), + `config: + maxDepth: 3 + providers: + stub: + baseUrl: http://127.0.0.1:9 + apiKey: test + models: + default: stub/m +workflows: {} +`, + "utf8", + ); await installChildWorkflow(root); const agent = workflowAsAgent("child-wf", { storageRoot: root }); const out = await agent( diff --git a/packages/workflow/src/engine/create-workflow.ts b/packages/workflow/src/engine/create-workflow.ts index 133b7a1..fc2f95b 100644 --- a/packages/workflow/src/engine/create-workflow.ts +++ b/packages/workflow/src/engine/create-workflow.ts @@ -1,12 +1,10 @@ -import type { CasStore } from "../cas/index.js"; import { putContentMerkleNode } from "../cas/index.js"; -import { buildExtractUserContent, type ExtractFn, reactExtract } from "../extract/index.js"; +import { buildExtractUserContent, reactExtract } from "../extract/index.js"; import { type AgentBinding, type AgentContext, END, type ExtractContext, - type LlmProvider, type ModeratorContext, type RoleDefinition, type RoleMeta, @@ -41,14 +39,12 @@ function resolveExtractedRefs( async function resolveRoleMeta( roleDef: RoleDefinition>, extractCtx: ExtractContext, - extract: ExtractFn, - llmProvider: LlmProvider | null, - cas: CasStore, + options: WorkflowFnOptions, ): Promise> { if (roleDef.extractMode === "react") { - if (llmProvider === null) { + if (options.llmProvider === null) { throw new Error( - 'createWorkflow: llmProvider is required when a role uses extractMode "react"', + 'createWorkflow: WorkflowFnOptions.llmProvider is required when a role uses extractMode "react"', ); } const text = await buildExtractUserContent( @@ -58,15 +54,15 @@ async function resolveRoleMeta( const reactResult = await reactExtract({ text, schema: roleDef.schema, - provider: llmProvider, - cas, + provider: options.llmProvider, + cas: options.cas, }); if (!reactResult.ok) { throw new Error(`react extract failed: ${reactResult.error}`); } return reactResult.value as Record; } - return (await extract( + return (await options.extract( roleDef.schema, roleDef.extractPrompt, extractCtx as unknown as ExtractContext, @@ -74,15 +70,13 @@ async function resolveRoleMeta( } /** - * Binds pure role definitions + moderator to runtime agents and structured extraction. - * Assign with `export const run = createWorkflow(def, binding, extract, llmProvider)`. - * Pass the same {@link LlmProvider} as {@link createExtract} when any role uses `extractMode: "react"`. + * Binds pure role definitions + moderator to runtime agents. + * Assign with `export const run = createWorkflow(def, binding)`. + * The engine supplies {@link WorkflowFnOptions.extract} and {@link WorkflowFnOptions.llmProvider} from workflow.yaml. */ export function createWorkflow( def: Pick, "roles" | "moderator">, binding: AgentBinding, - extract: ExtractFn, - llmProvider: LlmProvider | null, ): WorkflowFn { return async function* workflowLoop( input: ThreadInput, @@ -149,9 +143,7 @@ export function createWorkflow( const meta = await resolveRoleMeta( roleDef as unknown as RoleDefinition>, extractCtx, - extract, - llmProvider, - options.cas, + options, ); const contentHash = await putContentMerkleNode(options.cas, raw); diff --git a/packages/workflow/src/engine/engine.ts b/packages/workflow/src/engine/engine.ts index 3f76956..f603fb0 100644 --- a/packages/workflow/src/engine/engine.ts +++ b/packages/workflow/src/engine/engine.ts @@ -7,17 +7,47 @@ import { putStepMerkleNode, putThreadMerkleNode, } from "../cas/index.js"; +import { resolveModel } from "../config/index.js"; +import { createExtract } from "../extract/index.js"; +import { readWorkflowRegistry } from "../registry/index.js"; import type { + LlmProvider, ThreadInput, WorkflowCompletion, WorkflowFn, WorkflowFnOptions, WorkflowResult, } from "../types.js"; -import { type LogFn, normalizeRefsField } from "../util/index.js"; +import { err, type LogFn, normalizeRefsField, ok, type Result } from "../util/index.js"; import type { ExecuteThreadIo, ExecuteThreadOptions } from "./types.js"; +async function resolveExtractRuntime( + storageRoot: string, +): Promise< + Result<{ extract: ReturnType; llmProvider: LlmProvider }, string> +> { + const reg = await readWorkflowRegistry(storageRoot); + if (!reg.ok) { + return err(reg.error.message); + } + const cfg = reg.value.config; + if (cfg === null) { + return err("workflow registry has no global config section"); + } + const resolved = resolveModel(cfg, "extract"); + if (!resolved.ok) { + return resolved; + } + const ex = resolved.value; + const llmProvider: LlmProvider = { + baseUrl: ex.baseUrl, + apiKey: ex.apiKey, + model: ex.model, + }; + return ok({ extract: createExtract(llmProvider), llmProvider }); +} + async function appendDataLine(path: string, record: unknown): Promise { const line = `${JSON.stringify(record)}\n`; await appendFile(path, line, "utf8"); @@ -250,11 +280,18 @@ export async function executeThread( }); } + const extractRuntime = await resolveExtractRuntime(options.storageRoot); + if (!extractRuntime.ok) { + throw new Error(extractRuntime.error); + } + const bundleOptions: WorkflowFnOptions = { threadId: io.threadId, maxRounds: options.maxRounds, depth: options.depth, cas: io.cas, + extract: extractRuntime.value.extract, + llmProvider: extractRuntime.value.llmProvider, }; return await driveWorkflowGenerator({ diff --git a/packages/workflow/src/engine/types.ts b/packages/workflow/src/engine/types.ts index a99ac01..d0c1391 100644 --- a/packages/workflow/src/engine/types.ts +++ b/packages/workflow/src/engine/types.ts @@ -33,6 +33,8 @@ export type ExecuteThreadOptions = { * Must match `input.steps` length and order when present. */ prefilledDiskSteps: PrefilledDiskStep[] | null; + /** Workspace root containing `workflow.yaml`; used to resolve the `extract` scene for meta extraction. */ + storageRoot: string; }; /** Role steps replayed from `.data.jsonl`, including persisted timestamps. */ diff --git a/packages/workflow/src/engine/worker.ts b/packages/workflow/src/engine/worker.ts index e98ee80..3ab969b 100644 --- a/packages/workflow/src/engine/worker.ts +++ b/packages/workflow/src/engine/worker.ts @@ -417,6 +417,7 @@ async function main(): Promise { awaitAfterEachYield: () => pauseGate.awaitAfterYield(), forkSourceThreadId: cmd.forkSourceThreadId, prefilledDiskSteps, + storageRoot, }, io, logger, diff --git a/packages/workflow/src/extract-provider.ts b/packages/workflow/src/extract-provider.ts deleted file mode 100644 index f8e549d..0000000 --- a/packages/workflow/src/extract-provider.ts +++ /dev/null @@ -1,39 +0,0 @@ -import { resolveModel } from "./config/index.js"; -import type { WorkflowConfig } from "./registry/index.js"; -import { readWorkflowRegistry } from "./registry/index.js"; -import type { LlmProvider } from "./types.js"; -import { err, getDefaultWorkflowStorageRoot, ok, type Result } from "./util/index.js"; - -const DEFAULT_WORKFLOW_AS_AGENT_MAX_DEPTH = 3; - -export function getWorkflowAsAgentMaxDepth(config: WorkflowConfig | null): number { - if (config === null) { - return DEFAULT_WORKFLOW_AS_AGENT_MAX_DEPTH; - } - return config.maxDepth; -} - -/** Loads the LLM provider for scene `extract` from workflow.yaml (`config.models` + `config.providers`; apiKey resolved at registry parse time). */ -export async function getExtractProvider( - storageRoot: string | undefined, -): Promise> { - const root = storageRoot ?? getDefaultWorkflowStorageRoot(); - const regResult = await readWorkflowRegistry(root); - if (!regResult.ok) { - return err(regResult.error.message); - } - const cfg = regResult.value.config; - if (cfg === null) { - return err("workflow registry has no global config section"); - } - const resolved = resolveModel(cfg, "extract"); - if (!resolved.ok) { - return resolved; - } - const ex = resolved.value; - return ok({ - baseUrl: ex.baseUrl, - apiKey: ex.apiKey, - model: ex.model, - }); -} diff --git a/packages/workflow/src/index.ts b/packages/workflow/src/index.ts index fe7642e..617ba9e 100644 --- a/packages/workflow/src/index.ts +++ b/packages/workflow/src/index.ts @@ -63,7 +63,6 @@ export { type ReactExtractArgs, reactExtract, } from "./extract/index.js"; -export { getExtractProvider } from "./extract-provider.js"; export { getRegisteredWorkflow, listRegisteredWorkflowNames, diff --git a/packages/workflow/src/types.ts b/packages/workflow/src/types.ts index 0536c7f..60a600c 100644 --- a/packages/workflow/src/types.ts +++ b/packages/workflow/src/types.ts @@ -1,6 +1,7 @@ import type * as z from "zod/v4"; import type { CasStore } from "./cas/index.js"; +import type { ExtractFn } from "./extract/types.js"; /** Sentinel values for automaton control flow. */ export const START = "__start__" as const; @@ -54,6 +55,10 @@ export type WorkflowFnOptions = { depth: number; /** Global CAS store for Merkle content blobs (role step bodies). */ cas: CasStore; + /** Structured meta extraction; resolved from workflow.yaml `extract` scene by the engine. */ + extract: ExtractFn; + /** Provider for `extractMode: "react"` roles; same backing config as `extract`. */ + llmProvider: LlmProvider | null; }; /** Bundle contract — named export `run` is a function returning an AsyncGenerator. */ diff --git a/packages/workflow/src/workflow-as-agent.ts b/packages/workflow/src/workflow-as-agent.ts index 48a3dac..bd15122 100644 --- a/packages/workflow/src/workflow-as-agent.ts +++ b/packages/workflow/src/workflow-as-agent.ts @@ -4,7 +4,7 @@ import { extractBundleExports } from "./bundle/index.js"; import { createCasStore } from "./cas/index.js"; import type { ExecuteThreadIo } from "./engine/index.js"; import { executeThread } from "./engine/index.js"; -import { getWorkflowAsAgentMaxDepth } from "./extract-provider.js"; +import type { WorkflowConfig } from "./registry/index.js"; import { getRegisteredWorkflow, readWorkflowRegistry } from "./registry/index.js"; import type { AgentContext, AgentFn, ThreadInput } from "./types.js"; import { @@ -14,6 +14,15 @@ import { getGlobalCasDir, } from "./util/index.js"; +const DEFAULT_WORKFLOW_AS_AGENT_MAX_DEPTH = 3; + +function workflowAsAgentMaxDepth(config: WorkflowConfig | null): number { + if (config === null) { + return DEFAULT_WORKFLOW_AS_AGENT_MAX_DEPTH; + } + return config.maxDepth; +} + export type WorkflowAsAgentOptions = { /** When `null`, uses `getDefaultWorkflowStorageRoot()`. */ storageRoot: string | null; @@ -44,7 +53,7 @@ export function workflowAsAgent( return `ERROR: failed to read workflow registry: ${registryResult.error.message}`; } - const maxDepth = getWorkflowAsAgentMaxDepth(registryResult.value.config); + const maxDepth = workflowAsAgentMaxDepth(registryResult.value.config); if (nextDepth > maxDepth) { return `ERROR: workflow-as-agent depth limit exceeded (max ${maxDepth})`; } @@ -92,6 +101,7 @@ export function workflowAsAgent( awaitAfterEachYield: async () => {}, forkSourceThreadId: ctx.threadId, prefilledDiskSteps: null, + storageRoot, }, io, logger,