docs(rfc-001): WorkflowFn input → ThreadInput for fork/resume support

- First param is now { prompt, steps } instead of bare prompt
- steps: [] for new thread, pre-filled for fork/resume
- createRoleModerator naturally handles resume via moderator routing
- No special replay logic needed

小橘 <xiaoju@shazhou.work>
This commit is contained in:
2026-05-06 05:25:00 +00:00
parent eda00d1c8e
commit 9a4cec2b2d
14 changed files with 279 additions and 168 deletions
@@ -6,7 +6,7 @@ describe("validateWorkflowBundle", () => {
test("accepts minimal valid builtin-only bundle", () => {
const source = `import fs from "node:fs";
export default async function run() {
export default async function* run() {
fs.existsSync(".");
return { returnCode: 0, summary: "ok" };
}
@@ -18,11 +18,22 @@ export default async function run() {
test("rejects wrong filename suffix", () => {
const r = validateWorkflowBundle({
filePath: "/tmp/w.js",
source: "export default async function run() { return { returnCode: 0, summary: '' }; }\n",
source: "export default async function* run() { return { returnCode: 0, summary: '' }; }\n",
});
expect(r.ok).toBe(false);
});
test("rejects default export that is not a callable bundle shape", () => {
const r = validateWorkflowBundle({
filePath: "/tmp/w.esm.js",
source: 'export default { name: "x", roles: {}, moderator() { return "__end__"; } };\n',
});
expect(r.ok).toBe(false);
if (!r.ok) {
expect(r.error).toContain("default export must be a function");
}
});
test("rejects missing default export", () => {
const r = validateWorkflowBundle({
filePath: "/tmp/w.esm.js",
@@ -38,7 +49,7 @@ export default async function run() {
const r = validateWorkflowBundle({
filePath: "/tmp/w.esm.js",
source:
'import x from "some-package";\nexport default async function run() { return { returnCode: 0, summary: "" }; }\n',
'import x from "some-package";\nexport default async function* run() { return { returnCode: 0, summary: "" }; }\n',
});
expect(r.ok).toBe(false);
});
@@ -47,7 +58,7 @@ export default async function run() {
const r = validateWorkflowBundle({
filePath: "/tmp/w.esm.js",
source:
'export default async function run() { await import("fs"); return { returnCode: 0, summary: "" }; }\n',
'export default async function* run() { await import("fs"); return { returnCode: 0, summary: "" }; }\n',
});
expect(r.ok).toBe(false);
if (!r.ok) {
@@ -59,7 +70,7 @@ export default async function run() {
const r = validateWorkflowBundle({
filePath: "/tmp/w.esm.js",
source:
'export default async function run() { require("fs"); return { returnCode: 0, summary: "" }; }\n',
'export default async function* run() { require("fs"); return { returnCode: 0, summary: "" }; }\n',
});
expect(r.ok).toBe(false);
});
+6 -4
View File
@@ -3,17 +3,17 @@ import { mkdir, mkdtemp, readFile, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createRoleModerator } from "../src/create-role-moderator.js";
import { executeThread } from "../src/engine.js";
import { createLogger } from "../src/logger.js";
import { END, type WorkflowDefinition } from "../src/types.js";
import { END } from "../src/types.js";
type DemoMeta = {
planner: Record<string, unknown>;
coder: Record<string, unknown>;
};
const demoWorkflow: WorkflowDefinition<DemoMeta> = {
name: "demo-flow",
const demoWorkflow = createRoleModerator<DemoMeta>({
roles: {
planner: async () => ({
content: "plan-body",
@@ -33,7 +33,7 @@ const demoWorkflow: WorkflowDefinition<DemoMeta> = {
}
return END;
},
};
});
describe("executeThread", () => {
test("writes RFC-001 `.data.jsonl` start + role records and `.info.jsonl` logs", async () => {
@@ -50,6 +50,7 @@ describe("executeThread", () => {
const result = await executeThread(
demoWorkflow,
"demo-flow",
"Fix the login redirect bug in #3",
{ isDryRun: false, maxRounds: 5, signal: ac.signal },
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath },
@@ -116,6 +117,7 @@ describe("executeThread", () => {
const result = await executeThread(
demoWorkflow,
"demo-flow",
"hello",
{ isDryRun: false, maxRounds: 0, signal: ac.signal },
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath },
+1 -1
View File
@@ -17,7 +17,7 @@ describe("hashWorkflowBundleBytes", () => {
test("stable for identical content", () => {
const encoder = new TextEncoder();
const data = encoder.encode(
"export default async function run() { return { returnCode: 0, summary: '' }; }\n",
"export default async function* run() { return { returnCode: 0, summary: '' }; }\n",
);
expect(hashWorkflowBundleBytes(data)).toBe(hashWorkflowBundleBytes(data));
});
+6 -12
View File
@@ -7,18 +7,11 @@ import { join } from "node:path";
import { getWorkerHostScriptPath } from "../src/worker-entry-path.js";
const bundleSource = `export default {
name: "demo-flow",
roles: {
planner: async () => ({ content: "p", meta: { plan: "x" } }),
coder: async () => ({ content: "c", meta: { diff: "y" } }),
},
moderator(ctx) {
if (ctx.steps.length === 0) return "planner";
if (ctx.steps.length === 1) return "coder";
return "__end__";
},
};
const bundleSource = `export default async function* () {
yield { role: "planner", content: "p", meta: { plan: "x" } };
yield { role: "coder", content: "c", meta: { diff: "y" } };
return { returnCode: 0, summary: "completed: moderator returned END" };
}
`;
async function readReadyPort(child: import("node:child_process").ChildProcess): Promise<number> {
@@ -95,6 +88,7 @@ describe("worker process", () => {
await sendJson(port, {
type: "run",
threadId,
workflowName: "demo-flow",
prompt: "hello",
options: { isDryRun: false, maxRounds: 5 },
});
+28
View File
@@ -2,6 +2,7 @@ import { isBuiltin } from "node:module";
import type {
CallExpression,
ExportAllDeclaration,
ExportDefaultDeclaration,
ExportNamedDeclaration,
ImportDeclaration,
Node,
@@ -74,6 +75,27 @@ function programHasDefaultExport(body: readonly Node[]): boolean {
return false;
}
function defaultExportDeclarationIsCallable(program: Program): boolean {
for (const stmt of program.body) {
if (stmt.type !== "ExportDefaultDeclaration") {
continue;
}
const decl = (stmt as ExportDefaultDeclaration).declaration;
if (
decl.type === "FunctionDeclaration" ||
decl.type === "FunctionExpression" ||
decl.type === "ArrowFunctionExpression"
) {
return true;
}
if (decl.type === "CallExpression") {
return true;
}
return false;
}
return false;
}
function stringLiteralModuleSpecifier(src: Node): string | null {
if (src.type !== "Literal" || typeof src.value !== "string") {
return null;
@@ -183,6 +205,12 @@ export function validateWorkflowBundle(input: WorkflowBundleValidationInput): Re
return err("workflow bundle must have a default export");
}
if (!defaultExportDeclarationIsCallable(program)) {
return err(
"workflow bundle default export must be a function (e.g. async function*) or a call expression that returns one",
);
}
let violation: string | null = null;
walkAst(ast, (node) => {
if (violation !== null) {
@@ -0,0 +1,83 @@
import {
END,
type RoleMeta,
type RoleStep,
START,
type ThreadContext,
type WorkflowDefinition,
type WorkflowFn,
type WorkflowFnOptions,
type WorkflowResult,
} from "./types.js";
function isRoleNext<M extends RoleMeta>(
next: (keyof M & string) | typeof END,
): next is keyof M & string {
return next !== END;
}
/**
* Role + Moderator pattern as an optional helper: returns a {@link WorkflowFn} that runs the
* moderator loop and yields each {@link RoleOutput}.
*/
export function createRoleModerator<M extends RoleMeta>(
def: Pick<WorkflowDefinition<M>, "roles" | "moderator">,
): WorkflowFn {
return async function* roleModeratorWorkflow(
prompt: string,
options: WorkflowFnOptions,
): AsyncGenerator<RoleOutput, WorkflowResult> {
const nowMs = Date.now();
const start: ThreadContext<M>["start"] = {
role: START,
content: prompt,
meta: { maxRounds: options.maxRounds, threadId: options.threadId },
timestamp: nowMs,
};
let steps: RoleStep<M>[] = [];
while (true) {
if (steps.length >= options.maxRounds) {
return {
returnCode: 0,
summary: `completed: reached maxRounds (${options.maxRounds})`,
};
}
const ctx: ThreadContext<M> = {
threadId: options.threadId,
start,
steps,
};
const next = def.moderator(ctx);
if (!isRoleNext(next)) {
return { returnCode: 0, summary: "completed: moderator returned END" };
}
const roleFn = def.roles[next];
if (roleFn === undefined) {
return { returnCode: 1, summary: `unknown role: ${next}` };
}
const result = await roleFn(ctx);
const ts = Date.now();
const step = {
role: next,
content: result.content,
meta: result.meta,
timestamp: ts,
} as RoleStep<M>;
yield {
role: step.role,
content: step.content,
meta: step.meta,
};
steps = [...steps, step];
}
};
}
+32 -62
View File
@@ -2,14 +2,7 @@ import { appendFile, mkdir } from "node:fs/promises";
import { dirname } from "node:path";
import type { LogFn } from "./logger.js";
import {
END,
type RoleMeta,
type RoleStep,
START,
type ThreadContext,
type WorkflowDefinition,
} from "./types.js";
import type { WorkflowFn, WorkflowResult } from "./types.js";
export type ExecuteThreadIo = {
threadId: string;
@@ -24,41 +17,29 @@ export type ExecuteThreadOptions = {
signal: AbortSignal;
};
function isRoleNext<M extends RoleMeta>(
next: (keyof M & string) | typeof END,
): next is keyof M & string {
return next !== END;
}
async function appendDataLine(path: string, record: unknown): Promise<void> {
const line = `${JSON.stringify(record)}\n`;
await appendFile(path, line, "utf8");
}
/**
* Execute a workflow thread: moderator loop, role steps, RFC-001 `.data.jsonl` records,
* Execute a workflow thread: drive the bundle's AsyncGenerator, RFC-001 `.data.jsonl` records,
* debug lines via `logger` to `.info.jsonl`.
*/
export async function executeThread<M extends RoleMeta>(
def: WorkflowDefinition<M>,
export async function executeThread(
fn: WorkflowFn,
workflowName: string,
prompt: string,
options: ExecuteThreadOptions,
io: ExecuteThreadIo,
logger: LogFn,
): Promise<{ returnCode: number; summary: string }> {
): Promise<WorkflowResult> {
await mkdir(dirname(io.dataJsonlPath), { recursive: true });
await mkdir(dirname(io.infoJsonlPath), { recursive: true });
const nowMs = Date.now();
const start: ThreadContext<M>["start"] = {
role: START,
content: prompt,
meta: { maxRounds: options.maxRounds, threadId: io.threadId },
timestamp: nowMs,
};
const startRecord = {
name: def.name,
name: workflowName,
hash: io.hash,
threadId: io.threadId,
parameters: {
@@ -73,9 +54,23 @@ export async function executeThread<M extends RoleMeta>(
await appendDataLine(io.dataJsonlPath, startRecord);
let steps: RoleStep<M>[] = [];
logger("T9HQ2KHM", `thread ${io.threadId} started for workflow ${workflowName}`);
logger("T9HQ2KHM", `thread ${io.threadId} started for workflow ${def.name}`);
if (options.maxRounds <= 0) {
logger("R3CW7YBQ", `thread ${io.threadId} stopped at maxRounds=${options.maxRounds}`);
return {
returnCode: 0,
summary: `completed: reached maxRounds (${options.maxRounds})`,
};
}
const gen = fn(prompt, {
isDryRun: options.isDryRun,
maxRounds: options.maxRounds,
threadId: io.threadId,
});
let written = 0;
while (true) {
if (options.signal.aborted) {
@@ -83,7 +78,7 @@ export async function executeThread<M extends RoleMeta>(
return { returnCode: 130, summary: "thread aborted" };
}
if (steps.length >= options.maxRounds) {
if (written >= options.maxRounds) {
logger("R3CW7YBQ", `thread ${io.threadId} stopped at maxRounds=${options.maxRounds}`);
return {
returnCode: 0,
@@ -91,49 +86,24 @@ export async function executeThread<M extends RoleMeta>(
};
}
const ctx: ThreadContext<M> = {
threadId: io.threadId,
start,
steps,
};
const iterResult = await gen.next();
const next = def.moderator(ctx);
if (!isRoleNext(next)) {
logger("M5FZ2K8H", `thread ${io.threadId} moderator returned END`);
return { returnCode: 0, summary: "completed: moderator returned END" };
if (iterResult.done) {
logger("F3HN8QKP", `thread ${io.threadId} generator finished`);
return iterResult.value;
}
const roleFn = def.roles[next];
if (roleFn === undefined) {
logger("K2P8QX9W", `thread ${io.threadId} unknown role ${next}`);
return { returnCode: 1, summary: `unknown role: ${next}` };
}
if (options.signal.aborted) {
logger("V8JX4NP3", `thread ${io.threadId} aborted`);
return { returnCode: 130, summary: "thread aborted" };
}
const result = await roleFn(ctx);
written++;
const step = iterResult.value;
const ts = Date.now();
const step: RoleStep<M> = {
role: next,
content: result.content,
meta: result.meta,
timestamp: ts,
} as RoleStep<M>;
await appendDataLine(io.dataJsonlPath, {
role: step.role,
content: step.content,
meta: step.meta,
timestamp: step.timestamp,
timestamp: ts,
});
steps = [...steps, step];
logger("N7BW4YHQ", `thread ${io.threadId} completed role ${next}`);
logger("N7BW4YHQ", `thread ${io.threadId} wrote role ${step.role}`);
if (options.signal.aborted) {
logger("V8JX4NP4", `thread ${io.threadId} aborted`);
+5
View File
@@ -6,6 +6,7 @@ export {
encodeUint64AsCrockford,
} from "./base32.js";
export { validateWorkflowBundle, type WorkflowBundleValidationInput } from "./bundle-validator.js";
export { createRoleModerator } from "./create-role-moderator.js";
export {
type ExecuteThreadIo,
type ExecuteThreadOptions,
@@ -40,12 +41,16 @@ export {
type Moderator,
type Role,
type RoleMeta,
type RoleOutput,
type RoleResult,
type RoleStep,
START,
type StartStep,
type ThreadContext,
type WorkflowDefinition,
type WorkflowFn,
type WorkflowFnOptions,
type WorkflowResult,
} from "./types.js";
export { generateUlid } from "./ulid.js";
export { getWorkerHostScriptPath } from "./worker-entry-path.js";
+26
View File
@@ -5,6 +5,32 @@ export const END = "__end__" as const;
/** Maps role names → their meta types. Single generic drives all inference. */
export type RoleMeta = Record<string, Record<string, unknown>>;
/** What each generator yield produces — one role's output (engine adds `timestamp` when persisting). */
export type RoleOutput = {
role: string;
content: string;
meta: Record<string, unknown>;
};
/** What the workflow AsyncGenerator returns when done. */
export type WorkflowResult = {
returnCode: number;
summary: string;
};
/** Options passed to a workflow bundle's default-export function (engine-provided). */
export type WorkflowFnOptions = {
isDryRun: boolean;
maxRounds: number;
threadId: string;
};
/** Bundle contract — default export is a function returning an AsyncGenerator. */
export type WorkflowFn = (
prompt: string,
options: WorkflowFnOptions,
) => AsyncGenerator<RoleOutput, WorkflowResult>;
/** Typed output of a Role execution. */
export type RoleResult<Meta extends Record<string, unknown>> = {
content: string;
+22 -21
View File
@@ -5,13 +5,14 @@ import { pathToFileURL } from "node:url";
import { type ExecuteThreadIo, executeThread } from "./engine.js";
import { createLogger } from "./logger.js";
import type { RoleMeta, WorkflowDefinition } from "./types.js";
import type { WorkflowFn } from "./types.js";
const bootLog = createLogger({ sink: { kind: "stderr" } });
type RunCommand = {
type: "run";
threadId: string;
workflowName: string;
prompt: string;
options: { isDryRun: boolean; maxRounds: number };
};
@@ -38,9 +39,14 @@ function parseControlPayload(payload: unknown): ControlCommand | null {
}
if (type === "run") {
const threadId = rec.threadId;
const workflowName = rec.workflowName;
const prompt = rec.prompt;
const options = rec.options;
if (typeof threadId !== "string" || typeof prompt !== "string") {
if (
typeof threadId !== "string" ||
typeof workflowName !== "string" ||
typeof prompt !== "string"
) {
return null;
}
if (options === null || typeof options !== "object") {
@@ -55,6 +61,7 @@ function parseControlPayload(payload: unknown): ControlCommand | null {
return {
type: "run",
threadId,
workflowName,
prompt,
options: { isDryRun, maxRounds },
};
@@ -77,21 +84,8 @@ function parseCommandLine(line: string): ControlCommand | null {
return parseControlPayload(parsed);
}
function isWorkflowDefinitionLike(value: unknown): value is WorkflowDefinition<RoleMeta> {
if (value === null || typeof value !== "object") {
return false;
}
const rec = value as Record<string, unknown>;
if (typeof rec.name !== "string") {
return false;
}
if (rec.roles === null || typeof rec.roles !== "object") {
return false;
}
if (typeof rec.moderator !== "function") {
return false;
}
return true;
function isWorkflowFnLike(value: unknown): value is WorkflowFn {
return typeof value === "function";
}
async function readLineFromSocket(socket: Socket): Promise<string | null> {
@@ -146,15 +140,15 @@ async function main(): Promise<void> {
const modUnknown: unknown = await import(pathToFileURL(bundlePath).href);
const modRec = modUnknown as Record<string, unknown>;
const defaultExport = modRec.default;
if (!isWorkflowDefinitionLike(defaultExport)) {
if (!isWorkflowFnLike(defaultExport)) {
bootLog(
"T4BW9YJX",
"workflow bundle default export must be a WorkflowDefinition { name, roles, moderator }",
"workflow bundle default export must be a function (AsyncGenerator workflow)",
);
process.exit(2);
return;
}
const def = defaultExport;
const workflowFn = defaultExport;
const controllers = new Map<string, AbortController>();
let activeThreads = 0;
@@ -231,7 +225,14 @@ async function main(): Promise<void> {
const logger = createLogger({ sink: { kind: "file", path: infoJsonlPath } });
await executeThread(def, cmd.prompt, { ...cmd.options, signal: ac.signal }, io, logger);
await executeThread(
workflowFn,
cmd.workflowName,
cmd.prompt,
{ ...cmd.options, signal: ac.signal },
io,
logger,
);
} catch (e) {
const message = e instanceof Error ? e.message : String(e);
bootLog("Q3MN8YKW", `thread ${threadId} failed: ${message}`);