refactor: WorkflowFn input → ThreadInput, remove threadId from bundle contract

- WorkflowFn first param is now ThreadInput { prompt, steps }
- threadId removed from WorkflowFnOptions and ThreadContext (engine-only)
- createRoleModerator seeds context from input.steps (fork/resume ready)
- New test: pre-filled steps skip already-completed roles

Closes #6
小橘 <xiaoju@shazhou.work>
This commit is contained in:
2026-05-06 05:27:14 +00:00
parent 9a4cec2b2d
commit 9943f21f5c
11 changed files with 94 additions and 35 deletions
@@ -35,9 +35,9 @@ describe("cli workflow commands", () => {
bundlePath, bundlePath,
`import fs from "node:fs"; `import fs from "node:fs";
export default async function* () { export default async function* (input) {
fs.existsSync("."); fs.existsSync(".");
yield { role: "noop", content: "ok", meta: { done: true } }; yield { role: "noop", content: input.prompt, meta: { done: true } };
return { returnCode: 0, summary: "done" }; return { returnCode: 0, summary: "done" };
} }
`, `,
@@ -81,7 +81,7 @@ export default async function* () {
const bundlePath = join(storageRoot, "bad.esm.js"); const bundlePath = join(storageRoot, "bad.esm.js");
await writeFile( await writeFile(
bundlePath, bundlePath,
'import x from "./local";\nexport default async function* run() { return { returnCode: 0, summary: "" }; }\n', 'import x from "./local";\nexport default async function* (input) { return { returnCode: 0, summary: input.prompt }; }\n',
"utf8", "utf8",
); );
const r = await cmdAdd(storageRoot, "solve-issue", bundlePath); const r = await cmdAdd(storageRoot, "solve-issue", bundlePath);
@@ -13,16 +13,16 @@ import { cmdThreadRemove, cmdThreadShow } from "../src/cmd-thread.js";
import { cmdThreads } from "../src/cmd-threads.js"; import { cmdThreads } from "../src/cmd-threads.js";
import { pathExists } from "../src/fs-utils.js"; import { pathExists } from "../src/fs-utils.js";
const fastBundleSource = `export default async function* () { const fastBundleSource = `export default async function* (input) {
yield { role: "planner", content: "plan", meta: { plan: "x" } }; yield { role: "planner", content: "plan", meta: { plan: input.prompt } };
yield { role: "coder", content: "code", meta: { diff: "y" } }; yield { role: "coder", content: "code", meta: { diff: "y" } };
return { returnCode: 0, summary: "done" }; return { returnCode: 0, summary: "done" };
} }
`; `;
const slowPlannerBundleSource = `export default async function* () { const slowPlannerBundleSource = `export default async function* (input) {
await new Promise((r) => setTimeout(r, 400)); await new Promise((r) => setTimeout(r, 400));
yield { role: "planner", content: "plan", meta: { plan: "x" } }; yield { role: "planner", content: "plan", meta: { plan: input.prompt } };
yield { role: "coder", content: "code", meta: { diff: "y" } }; yield { role: "coder", content: "code", meta: { diff: "y" } };
return { returnCode: 0, summary: "done" }; return { returnCode: 0, summary: "done" };
} }
@@ -30,9 +30,9 @@ const slowPlannerBundleSource = `export default async function* () {
const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url)); const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url));
const abortablePlannerBundleSource = `export default async function* () { const abortablePlannerBundleSource = `export default async function* (input) {
await new Promise((r) => setTimeout(r, 600)); await new Promise((r) => setTimeout(r, 600));
yield { role: "planner", content: "plan", meta: { plan: "x" } }; yield { role: "planner", content: "plan", meta: { plan: input.prompt } };
yield { role: "coder", content: "code", meta: { diff: "y" } }; yield { role: "coder", content: "code", meta: { diff: "y" } };
return { returnCode: 0, summary: "done" }; return { returnCode: 0, summary: "done" };
} }
@@ -6,9 +6,9 @@ describe("validateWorkflowBundle", () => {
test("accepts minimal valid builtin-only bundle", () => { test("accepts minimal valid builtin-only bundle", () => {
const source = `import fs from "node:fs"; const source = `import fs from "node:fs";
export default async function* run() { export default async function* (input) {
fs.existsSync("."); fs.existsSync(".");
return { returnCode: 0, summary: "ok" }; return { returnCode: 0, summary: input.prompt };
} }
`; `;
const r = validateWorkflowBundle({ filePath: "/tmp/w.esm.js", source }); const r = validateWorkflowBundle({ filePath: "/tmp/w.esm.js", source });
@@ -18,7 +18,8 @@ export default async function* run() {
test("rejects wrong filename suffix", () => { test("rejects wrong filename suffix", () => {
const r = validateWorkflowBundle({ const r = validateWorkflowBundle({
filePath: "/tmp/w.js", filePath: "/tmp/w.js",
source: "export default async function* run() { return { returnCode: 0, summary: '' }; }\n", source:
"export default async function* (input) { return { returnCode: 0, summary: input.prompt }; }\n",
}); });
expect(r.ok).toBe(false); expect(r.ok).toBe(false);
}); });
@@ -49,7 +50,7 @@ export default async function* run() {
const r = validateWorkflowBundle({ const r = validateWorkflowBundle({
filePath: "/tmp/w.esm.js", filePath: "/tmp/w.esm.js",
source: source:
'import x from "some-package";\nexport default async function* run() { return { returnCode: 0, summary: "" }; }\n', 'import x from "some-package";\nexport default async function* (input) { return { returnCode: 0, summary: input.prompt }; }\n',
}); });
expect(r.ok).toBe(false); expect(r.ok).toBe(false);
}); });
@@ -58,7 +59,7 @@ export default async function* run() {
const r = validateWorkflowBundle({ const r = validateWorkflowBundle({
filePath: "/tmp/w.esm.js", filePath: "/tmp/w.esm.js",
source: source:
'export default async function* run() { await import("fs"); return { returnCode: 0, summary: "" }; }\n', 'export default async function* (input) { await import("fs"); return { returnCode: 0, summary: input.prompt }; }\n',
}); });
expect(r.ok).toBe(false); expect(r.ok).toBe(false);
if (!r.ok) { if (!r.ok) {
@@ -70,7 +71,7 @@ export default async function* run() {
const r = validateWorkflowBundle({ const r = validateWorkflowBundle({
filePath: "/tmp/w.esm.js", filePath: "/tmp/w.esm.js",
source: source:
'export default async function* run() { require("fs"); return { returnCode: 0, summary: "" }; }\n', 'export default async function* (input) { require("fs"); return { returnCode: 0, summary: input.prompt }; }\n',
}); });
expect(r.ok).toBe(false); expect(r.ok).toBe(false);
}); });
+49 -2
View File
@@ -51,7 +51,7 @@ describe("executeThread", () => {
const result = await executeThread( const result = await executeThread(
demoWorkflow, demoWorkflow,
"demo-flow", "demo-flow",
"Fix the login redirect bug in #3", { prompt: "Fix the login redirect bug in #3", steps: [] },
{ isDryRun: false, maxRounds: 5, signal: ac.signal }, { isDryRun: false, maxRounds: 5, signal: ac.signal },
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath }, { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath },
logger, logger,
@@ -103,6 +103,53 @@ describe("executeThread", () => {
} }
}); });
test("pre-filled ThreadInput.steps skips roles already present", async () => {
const root = await mkdtemp(join(tmpdir(), "wf-engine-fork-"));
try {
const threadId = "01KQXKW18CT8G75T53R8F4G7YG";
const hash = "C9NMV6V2TQT81";
const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`);
const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`);
await mkdir(join(root, "logs", hash), { recursive: true });
const logger = createLogger({ sink: { kind: "file", path: infoPath } });
const ac = new AbortController();
const result = await executeThread(
demoWorkflow,
"demo-flow",
{
prompt: "continue from planner",
steps: [
{
role: "planner",
content: "plan-body",
meta: { plan: "do-it", files: ["a.ts"] },
},
],
},
{ isDryRun: false, maxRounds: 5, signal: ac.signal },
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath },
logger,
);
expect(result.returnCode).toBe(0);
const dataText = await readFile(dataPath, "utf8");
const lines = dataText
.trim()
.split("\n")
.filter((l) => l !== "");
expect(lines.length).toBe(2);
const role1 = JSON.parse(lines[1] ?? "{}") as Record<string, unknown>;
expect(role1.role).toBe("coder");
expect(role1.content).toBe("code-body");
} finally {
await rm(root, { recursive: true, force: true });
}
});
test("respects maxRounds=0 (start record only)", async () => { test("respects maxRounds=0 (start record only)", async () => {
const root = await mkdtemp(join(tmpdir(), "wf-engine-max0-")); const root = await mkdtemp(join(tmpdir(), "wf-engine-max0-"));
try { try {
@@ -118,7 +165,7 @@ describe("executeThread", () => {
const result = await executeThread( const result = await executeThread(
demoWorkflow, demoWorkflow,
"demo-flow", "demo-flow",
"hello", { prompt: "hello", steps: [] },
{ isDryRun: false, maxRounds: 0, signal: ac.signal }, { isDryRun: false, maxRounds: 0, signal: ac.signal },
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath }, { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath },
logger, logger,
+1 -1
View File
@@ -17,7 +17,7 @@ describe("hashWorkflowBundleBytes", () => {
test("stable for identical content", () => { test("stable for identical content", () => {
const encoder = new TextEncoder(); const encoder = new TextEncoder();
const data = encoder.encode( const data = encoder.encode(
"export default async function* run() { return { returnCode: 0, summary: '' }; }\n", "export default async function* (input) { return { returnCode: 0, summary: input.prompt }; }\n",
); );
expect(hashWorkflowBundleBytes(data)).toBe(hashWorkflowBundleBytes(data)); expect(hashWorkflowBundleBytes(data)).toBe(hashWorkflowBundleBytes(data));
}); });
+2 -2
View File
@@ -7,8 +7,8 @@ import { join } from "node:path";
import { getWorkerHostScriptPath } from "../src/worker-entry-path.js"; import { getWorkerHostScriptPath } from "../src/worker-entry-path.js";
const bundleSource = `export default async function* () { const bundleSource = `export default async function* (input) {
yield { role: "planner", content: "p", meta: { plan: "x" } }; yield { role: "planner", content: "p", meta: { plan: input.prompt } };
yield { role: "coder", content: "c", meta: { diff: "y" } }; yield { role: "coder", content: "c", meta: { diff: "y" } };
return { returnCode: 0, summary: "completed: moderator returned END" }; return { returnCode: 0, summary: "completed: moderator returned END" };
} }
+12 -5
View File
@@ -1,9 +1,11 @@
import { import {
END, END,
type RoleMeta, type RoleMeta,
type RoleOutput,
type RoleStep, type RoleStep,
START, START,
type ThreadContext, type ThreadContext,
type ThreadInput,
type WorkflowDefinition, type WorkflowDefinition,
type WorkflowFn, type WorkflowFn,
type WorkflowFnOptions, type WorkflowFnOptions,
@@ -24,18 +26,24 @@ export function createRoleModerator<M extends RoleMeta>(
def: Pick<WorkflowDefinition<M>, "roles" | "moderator">, def: Pick<WorkflowDefinition<M>, "roles" | "moderator">,
): WorkflowFn { ): WorkflowFn {
return async function* roleModeratorWorkflow( return async function* roleModeratorWorkflow(
prompt: string, input: ThreadInput,
options: WorkflowFnOptions, options: WorkflowFnOptions,
): AsyncGenerator<RoleOutput, WorkflowResult> { ): AsyncGenerator<RoleOutput, WorkflowResult> {
const nowMs = Date.now(); const nowMs = Date.now();
const start: ThreadContext<M>["start"] = { const start: ThreadContext<M>["start"] = {
role: START, role: START,
content: prompt, content: input.prompt,
meta: { maxRounds: options.maxRounds, threadId: options.threadId }, meta: { maxRounds: options.maxRounds },
timestamp: nowMs, timestamp: nowMs,
}; };
let steps: RoleStep<M>[] = []; const baseTs = Date.now();
let steps: RoleStep<M>[] = input.steps.map((out, i) => ({
role: out.role,
content: out.content,
meta: out.meta,
timestamp: baseTs + i,
})) as RoleStep<M>[];
while (true) { while (true) {
if (steps.length >= options.maxRounds) { if (steps.length >= options.maxRounds) {
@@ -46,7 +54,6 @@ export function createRoleModerator<M extends RoleMeta>(
} }
const ctx: ThreadContext<M> = { const ctx: ThreadContext<M> = {
threadId: options.threadId,
start, start,
steps, steps,
}; };
+4 -5
View File
@@ -2,7 +2,7 @@ import { appendFile, mkdir } from "node:fs/promises";
import { dirname } from "node:path"; import { dirname } from "node:path";
import type { LogFn } from "./logger.js"; import type { LogFn } from "./logger.js";
import type { WorkflowFn, WorkflowResult } from "./types.js"; import type { ThreadInput, WorkflowFn, WorkflowResult } from "./types.js";
export type ExecuteThreadIo = { export type ExecuteThreadIo = {
threadId: string; threadId: string;
@@ -29,7 +29,7 @@ async function appendDataLine(path: string, record: unknown): Promise<void> {
export async function executeThread( export async function executeThread(
fn: WorkflowFn, fn: WorkflowFn,
workflowName: string, workflowName: string,
prompt: string, input: ThreadInput,
options: ExecuteThreadOptions, options: ExecuteThreadOptions,
io: ExecuteThreadIo, io: ExecuteThreadIo,
logger: LogFn, logger: LogFn,
@@ -43,7 +43,7 @@ export async function executeThread(
hash: io.hash, hash: io.hash,
threadId: io.threadId, threadId: io.threadId,
parameters: { parameters: {
prompt, prompt: input.prompt,
options: { options: {
isDryRun: options.isDryRun, isDryRun: options.isDryRun,
maxRounds: options.maxRounds, maxRounds: options.maxRounds,
@@ -64,10 +64,9 @@ export async function executeThread(
}; };
} }
const gen = fn(prompt, { const gen = fn(input, {
isDryRun: options.isDryRun, isDryRun: options.isDryRun,
maxRounds: options.maxRounds, maxRounds: options.maxRounds,
threadId: io.threadId,
}); });
let written = 0; let written = 0;
+1
View File
@@ -47,6 +47,7 @@ export {
START, START,
type StartStep, type StartStep,
type ThreadContext, type ThreadContext,
type ThreadInput,
type WorkflowDefinition, type WorkflowDefinition,
type WorkflowFn, type WorkflowFn,
type WorkflowFnOptions, type WorkflowFnOptions,
+8 -4
View File
@@ -18,16 +18,21 @@ export type WorkflowResult = {
summary: string; summary: string;
}; };
/** Input to a workflow — prompt plus optional historical steps for fork/resume. */
export type ThreadInput = {
prompt: string;
steps: RoleOutput[];
};
/** Options passed to a workflow bundle's default-export function (engine-provided). */ /** Options passed to a workflow bundle's default-export function (engine-provided). */
export type WorkflowFnOptions = { export type WorkflowFnOptions = {
isDryRun: boolean; isDryRun: boolean;
maxRounds: number; maxRounds: number;
threadId: string;
}; };
/** Bundle contract — default export is a function returning an AsyncGenerator. */ /** Bundle contract — default export is a function returning an AsyncGenerator. */
export type WorkflowFn = ( export type WorkflowFn = (
prompt: string, input: ThreadInput,
options: WorkflowFnOptions, options: WorkflowFnOptions,
) => AsyncGenerator<RoleOutput, WorkflowResult>; ) => AsyncGenerator<RoleOutput, WorkflowResult>;
@@ -41,7 +46,7 @@ export type RoleResult<Meta extends Record<string, unknown>> = {
export type StartStep = { export type StartStep = {
role: typeof START; role: typeof START;
content: string; content: string;
meta: { maxRounds: number; threadId: string }; meta: { maxRounds: number };
timestamp: number; timestamp: number;
}; };
@@ -52,7 +57,6 @@ export type RoleStep<M extends RoleMeta> = {
/** Thread-scoped context passed to roles and moderator. */ /** Thread-scoped context passed to roles and moderator. */
export type ThreadContext<M extends RoleMeta = RoleMeta> = { export type ThreadContext<M extends RoleMeta = RoleMeta> = {
threadId: string;
start: StartStep; start: StartStep;
steps: RoleStep<M>[]; steps: RoleStep<M>[];
}; };
+1 -1
View File
@@ -228,7 +228,7 @@ async function main(): Promise<void> {
await executeThread( await executeThread(
workflowFn, workflowFn,
cmd.workflowName, cmd.workflowName,
cmd.prompt, { prompt: cmd.prompt, steps: [] },
{ ...cmd.options, signal: ac.signal }, { ...cmd.options, signal: ac.signal },
io, io,
logger, logger,