diff --git a/packages/cli-workflow/__tests__/commands.test.ts b/packages/cli-workflow/__tests__/commands.test.ts index 6ce819d..cd5b85e 100644 --- a/packages/cli-workflow/__tests__/commands.test.ts +++ b/packages/cli-workflow/__tests__/commands.test.ts @@ -35,9 +35,9 @@ describe("cli workflow commands", () => { bundlePath, `import fs from "node:fs"; -export default async function* () { +export default async function* (input) { fs.existsSync("."); - yield { role: "noop", content: "ok", meta: { done: true } }; + yield { role: "noop", content: input.prompt, meta: { done: true } }; return { returnCode: 0, summary: "done" }; } `, @@ -81,7 +81,7 @@ export default async function* () { const bundlePath = join(storageRoot, "bad.esm.js"); await writeFile( bundlePath, - 'import x from "./local";\nexport default async function* run() { return { returnCode: 0, summary: "" }; }\n', + 'import x from "./local";\nexport default async function* (input) { return { returnCode: 0, summary: input.prompt }; }\n', "utf8", ); const r = await cmdAdd(storageRoot, "solve-issue", bundlePath); diff --git a/packages/cli-workflow/__tests__/thread-cli.test.ts b/packages/cli-workflow/__tests__/thread-cli.test.ts index 3fa147e..664fc41 100644 --- a/packages/cli-workflow/__tests__/thread-cli.test.ts +++ b/packages/cli-workflow/__tests__/thread-cli.test.ts @@ -13,16 +13,16 @@ import { cmdThreadRemove, cmdThreadShow } from "../src/cmd-thread.js"; import { cmdThreads } from "../src/cmd-threads.js"; import { pathExists } from "../src/fs-utils.js"; -const fastBundleSource = `export default async function* () { - yield { role: "planner", content: "plan", meta: { plan: "x" } }; +const fastBundleSource = `export default async function* (input) { + yield { role: "planner", content: "plan", meta: { plan: input.prompt } }; yield { role: "coder", content: "code", meta: { diff: "y" } }; return { returnCode: 0, summary: "done" }; } `; -const slowPlannerBundleSource = `export default async function* () { +const slowPlannerBundleSource = `export default async function* (input) { await new Promise((r) => setTimeout(r, 400)); - yield { role: "planner", content: "plan", meta: { plan: "x" } }; + yield { role: "planner", content: "plan", meta: { plan: input.prompt } }; yield { role: "coder", content: "code", meta: { diff: "y" } }; return { returnCode: 0, summary: "done" }; } @@ -30,9 +30,9 @@ const slowPlannerBundleSource = `export default async function* () { const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url)); -const abortablePlannerBundleSource = `export default async function* () { +const abortablePlannerBundleSource = `export default async function* (input) { await new Promise((r) => setTimeout(r, 600)); - yield { role: "planner", content: "plan", meta: { plan: "x" } }; + yield { role: "planner", content: "plan", meta: { plan: input.prompt } }; yield { role: "coder", content: "code", meta: { diff: "y" } }; return { returnCode: 0, summary: "done" }; } diff --git a/packages/workflow/__tests__/bundle-validator.test.ts b/packages/workflow/__tests__/bundle-validator.test.ts index f9b6afa..9d9c6bc 100644 --- a/packages/workflow/__tests__/bundle-validator.test.ts +++ b/packages/workflow/__tests__/bundle-validator.test.ts @@ -6,9 +6,9 @@ describe("validateWorkflowBundle", () => { test("accepts minimal valid builtin-only bundle", () => { const source = `import fs from "node:fs"; -export default async function* run() { +export default async function* (input) { fs.existsSync("."); - return { returnCode: 0, summary: "ok" }; + return { returnCode: 0, summary: input.prompt }; } `; const r = validateWorkflowBundle({ filePath: "/tmp/w.esm.js", source }); @@ -18,7 +18,8 @@ export default async function* run() { test("rejects wrong filename suffix", () => { const r = validateWorkflowBundle({ filePath: "/tmp/w.js", - source: "export default async function* run() { return { returnCode: 0, summary: '' }; }\n", + source: + "export default async function* (input) { return { returnCode: 0, summary: input.prompt }; }\n", }); expect(r.ok).toBe(false); }); @@ -49,7 +50,7 @@ export default async function* run() { const r = validateWorkflowBundle({ filePath: "/tmp/w.esm.js", source: - 'import x from "some-package";\nexport default async function* run() { return { returnCode: 0, summary: "" }; }\n', + 'import x from "some-package";\nexport default async function* (input) { return { returnCode: 0, summary: input.prompt }; }\n', }); expect(r.ok).toBe(false); }); @@ -58,7 +59,7 @@ export default async function* run() { const r = validateWorkflowBundle({ filePath: "/tmp/w.esm.js", source: - 'export default async function* run() { await import("fs"); return { returnCode: 0, summary: "" }; }\n', + 'export default async function* (input) { await import("fs"); return { returnCode: 0, summary: input.prompt }; }\n', }); expect(r.ok).toBe(false); if (!r.ok) { @@ -70,7 +71,7 @@ export default async function* run() { const r = validateWorkflowBundle({ filePath: "/tmp/w.esm.js", source: - 'export default async function* run() { require("fs"); return { returnCode: 0, summary: "" }; }\n', + 'export default async function* (input) { require("fs"); return { returnCode: 0, summary: input.prompt }; }\n', }); expect(r.ok).toBe(false); }); diff --git a/packages/workflow/__tests__/engine.test.ts b/packages/workflow/__tests__/engine.test.ts index 6ec61fa..af2da5d 100644 --- a/packages/workflow/__tests__/engine.test.ts +++ b/packages/workflow/__tests__/engine.test.ts @@ -51,7 +51,7 @@ describe("executeThread", () => { const result = await executeThread( demoWorkflow, "demo-flow", - "Fix the login redirect bug in #3", + { prompt: "Fix the login redirect bug in #3", steps: [] }, { isDryRun: false, maxRounds: 5, signal: ac.signal }, { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath }, logger, @@ -103,6 +103,53 @@ describe("executeThread", () => { } }); + test("pre-filled ThreadInput.steps skips roles already present", async () => { + const root = await mkdtemp(join(tmpdir(), "wf-engine-fork-")); + try { + const threadId = "01KQXKW18CT8G75T53R8F4G7YG"; + const hash = "C9NMV6V2TQT81"; + const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`); + const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`); + await mkdir(join(root, "logs", hash), { recursive: true }); + + const logger = createLogger({ sink: { kind: "file", path: infoPath } }); + const ac = new AbortController(); + + const result = await executeThread( + demoWorkflow, + "demo-flow", + { + prompt: "continue from planner", + steps: [ + { + role: "planner", + content: "plan-body", + meta: { plan: "do-it", files: ["a.ts"] }, + }, + ], + }, + { isDryRun: false, maxRounds: 5, signal: ac.signal }, + { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath }, + logger, + ); + + expect(result.returnCode).toBe(0); + + const dataText = await readFile(dataPath, "utf8"); + const lines = dataText + .trim() + .split("\n") + .filter((l) => l !== ""); + expect(lines.length).toBe(2); + + const role1 = JSON.parse(lines[1] ?? "{}") as Record; + expect(role1.role).toBe("coder"); + expect(role1.content).toBe("code-body"); + } finally { + await rm(root, { recursive: true, force: true }); + } + }); + test("respects maxRounds=0 (start record only)", async () => { const root = await mkdtemp(join(tmpdir(), "wf-engine-max0-")); try { @@ -118,7 +165,7 @@ describe("executeThread", () => { const result = await executeThread( demoWorkflow, "demo-flow", - "hello", + { prompt: "hello", steps: [] }, { isDryRun: false, maxRounds: 0, signal: ac.signal }, { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath }, logger, diff --git a/packages/workflow/__tests__/hash.test.ts b/packages/workflow/__tests__/hash.test.ts index dca18e8..23cbdd4 100644 --- a/packages/workflow/__tests__/hash.test.ts +++ b/packages/workflow/__tests__/hash.test.ts @@ -17,7 +17,7 @@ describe("hashWorkflowBundleBytes", () => { test("stable for identical content", () => { const encoder = new TextEncoder(); const data = encoder.encode( - "export default async function* run() { return { returnCode: 0, summary: '' }; }\n", + "export default async function* (input) { return { returnCode: 0, summary: input.prompt }; }\n", ); expect(hashWorkflowBundleBytes(data)).toBe(hashWorkflowBundleBytes(data)); }); diff --git a/packages/workflow/__tests__/worker.test.ts b/packages/workflow/__tests__/worker.test.ts index 599b97c..74e5c9b 100644 --- a/packages/workflow/__tests__/worker.test.ts +++ b/packages/workflow/__tests__/worker.test.ts @@ -7,8 +7,8 @@ import { join } from "node:path"; import { getWorkerHostScriptPath } from "../src/worker-entry-path.js"; -const bundleSource = `export default async function* () { - yield { role: "planner", content: "p", meta: { plan: "x" } }; +const bundleSource = `export default async function* (input) { + yield { role: "planner", content: "p", meta: { plan: input.prompt } }; yield { role: "coder", content: "c", meta: { diff: "y" } }; return { returnCode: 0, summary: "completed: moderator returned END" }; } diff --git a/packages/workflow/src/create-role-moderator.ts b/packages/workflow/src/create-role-moderator.ts index b36db96..abb2f11 100644 --- a/packages/workflow/src/create-role-moderator.ts +++ b/packages/workflow/src/create-role-moderator.ts @@ -1,9 +1,11 @@ import { END, type RoleMeta, + type RoleOutput, type RoleStep, START, type ThreadContext, + type ThreadInput, type WorkflowDefinition, type WorkflowFn, type WorkflowFnOptions, @@ -24,18 +26,24 @@ export function createRoleModerator( def: Pick, "roles" | "moderator">, ): WorkflowFn { return async function* roleModeratorWorkflow( - prompt: string, + input: ThreadInput, options: WorkflowFnOptions, ): AsyncGenerator { const nowMs = Date.now(); const start: ThreadContext["start"] = { role: START, - content: prompt, - meta: { maxRounds: options.maxRounds, threadId: options.threadId }, + content: input.prompt, + meta: { maxRounds: options.maxRounds }, timestamp: nowMs, }; - let steps: RoleStep[] = []; + const baseTs = Date.now(); + let steps: RoleStep[] = input.steps.map((out, i) => ({ + role: out.role, + content: out.content, + meta: out.meta, + timestamp: baseTs + i, + })) as RoleStep[]; while (true) { if (steps.length >= options.maxRounds) { @@ -46,7 +54,6 @@ export function createRoleModerator( } const ctx: ThreadContext = { - threadId: options.threadId, start, steps, }; diff --git a/packages/workflow/src/engine.ts b/packages/workflow/src/engine.ts index 68de1f3..0d145f0 100644 --- a/packages/workflow/src/engine.ts +++ b/packages/workflow/src/engine.ts @@ -2,7 +2,7 @@ import { appendFile, mkdir } from "node:fs/promises"; import { dirname } from "node:path"; import type { LogFn } from "./logger.js"; -import type { WorkflowFn, WorkflowResult } from "./types.js"; +import type { ThreadInput, WorkflowFn, WorkflowResult } from "./types.js"; export type ExecuteThreadIo = { threadId: string; @@ -29,7 +29,7 @@ async function appendDataLine(path: string, record: unknown): Promise { export async function executeThread( fn: WorkflowFn, workflowName: string, - prompt: string, + input: ThreadInput, options: ExecuteThreadOptions, io: ExecuteThreadIo, logger: LogFn, @@ -43,7 +43,7 @@ export async function executeThread( hash: io.hash, threadId: io.threadId, parameters: { - prompt, + prompt: input.prompt, options: { isDryRun: options.isDryRun, maxRounds: options.maxRounds, @@ -64,10 +64,9 @@ export async function executeThread( }; } - const gen = fn(prompt, { + const gen = fn(input, { isDryRun: options.isDryRun, maxRounds: options.maxRounds, - threadId: io.threadId, }); let written = 0; diff --git a/packages/workflow/src/index.ts b/packages/workflow/src/index.ts index a7c33cf..79fc6b5 100644 --- a/packages/workflow/src/index.ts +++ b/packages/workflow/src/index.ts @@ -47,6 +47,7 @@ export { START, type StartStep, type ThreadContext, + type ThreadInput, type WorkflowDefinition, type WorkflowFn, type WorkflowFnOptions, diff --git a/packages/workflow/src/types.ts b/packages/workflow/src/types.ts index 37e1b5f..f4dfd9b 100644 --- a/packages/workflow/src/types.ts +++ b/packages/workflow/src/types.ts @@ -18,16 +18,21 @@ export type WorkflowResult = { summary: string; }; +/** Input to a workflow — prompt plus optional historical steps for fork/resume. */ +export type ThreadInput = { + prompt: string; + steps: RoleOutput[]; +}; + /** Options passed to a workflow bundle's default-export function (engine-provided). */ export type WorkflowFnOptions = { isDryRun: boolean; maxRounds: number; - threadId: string; }; /** Bundle contract — default export is a function returning an AsyncGenerator. */ export type WorkflowFn = ( - prompt: string, + input: ThreadInput, options: WorkflowFnOptions, ) => AsyncGenerator; @@ -41,7 +46,7 @@ export type RoleResult> = { export type StartStep = { role: typeof START; content: string; - meta: { maxRounds: number; threadId: string }; + meta: { maxRounds: number }; timestamp: number; }; @@ -52,7 +57,6 @@ export type RoleStep = { /** Thread-scoped context passed to roles and moderator. */ export type ThreadContext = { - threadId: string; start: StartStep; steps: RoleStep[]; }; diff --git a/packages/workflow/src/worker.ts b/packages/workflow/src/worker.ts index 1e7f60b..a0b7520 100644 --- a/packages/workflow/src/worker.ts +++ b/packages/workflow/src/worker.ts @@ -228,7 +228,7 @@ async function main(): Promise { await executeThread( workflowFn, cmd.workflowName, - cmd.prompt, + { prompt: cmd.prompt, steps: [] }, { ...cmd.options, signal: ac.signal }, io, logger,