From dfbba0f58ccb507f3170e921dd0ed357d1711b57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Wed, 6 May 2026 05:45:01 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20Phase=204=20=E2=80=94=20fork=20threads?= =?UTF-8?q?=20+=20bun=20publish=20verified?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - fork-thread.ts: parse .data.jsonl, trim steps by role - cmd-fork.ts: --from-role or retry last step - engine: forkFrom lineage tracking, prefilled step replay - worker: accept steps in run IPC command - bun publish --dry-run: both packages pass - 53 tests pass, biome clean Closes #5 小橘 --- .../cli-workflow/__tests__/fork-cli.test.ts | 212 ++++++++++++++++ packages/cli-workflow/src/cli-dispatch.ts | 18 ++ packages/cli-workflow/src/cmd-fork.ts | 91 +++++++ packages/workflow/__tests__/engine.test.ts | 39 ++- .../workflow/__tests__/fork-thread.test.ts | 86 +++++++ packages/workflow/__tests__/worker.test.ts | 72 +++++- packages/workflow/src/engine.ts | 168 ++++++++----- packages/workflow/src/fork-thread.ts | 228 ++++++++++++++++++ packages/workflow/src/index.ts | 9 + packages/workflow/src/worker.ts | 94 +++++++- 10 files changed, 953 insertions(+), 64 deletions(-) create mode 100644 packages/cli-workflow/__tests__/fork-cli.test.ts create mode 100644 packages/cli-workflow/src/cmd-fork.ts create mode 100644 packages/workflow/__tests__/fork-thread.test.ts create mode 100644 packages/workflow/src/fork-thread.ts diff --git a/packages/cli-workflow/__tests__/fork-cli.test.ts b/packages/cli-workflow/__tests__/fork-cli.test.ts new file mode 100644 index 0000000..c0e4b3a --- /dev/null +++ b/packages/cli-workflow/__tests__/fork-cli.test.ts @@ -0,0 +1,212 @@ +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; + +import { cmdAdd } from "../src/cmd-add.js"; +import { 
cmdFork } from "../src/cmd-fork.js"; +import { cmdRun } from "../src/cmd-run.js"; +import { pathExists } from "../src/fs-utils.js"; + +/** Three-role workflow that respects `input.steps` for fork/resume. */ +const threeRoleBundleSource = `export default async function* (input) { + const has = (r) => input.steps.some((s) => s.role === r); + if (!has("planner")) { + yield { role: "planner", content: "p1", meta: { k: "planner" } }; + } + if (!has("coder")) { + yield { role: "coder", content: "c1", meta: { k: "coder" } }; + } + if (!has("reviewer")) { + yield { + role: "reviewer", + content: "rev-" + String(input.steps.length), + meta: { k: "reviewer" }, + }; + } + return { returnCode: 0, summary: "done" }; +} +`; + +async function countDataJsonlLines(dataPath: string): Promise { + try { + const text = await readFile(dataPath, "utf8"); + return text + .trim() + .split("\n") + .filter((l) => l !== "").length; + } catch { + return 0; + } +} + +async function waitUntilMinDataLines(dataPath: string, minLines: number): Promise { + for (let attempt = 0; attempt < 120; attempt++) { + if ((await countDataJsonlLines(dataPath)) >= minLines) { + return; + } + await new Promise((r) => setTimeout(r, 25)); + } +} + +async function waitUntilRunningAbsent(runningPath: string): Promise { + for (let attempt = 0; attempt < 120; attempt++) { + if (!(await pathExists(runningPath))) { + return; + } + await new Promise((r) => setTimeout(r, 25)); + } +} + +describe("cli fork", () => { + let prevEnv: string | undefined; + let storageRoot: string; + + beforeEach(async () => { + prevEnv = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT; + storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-fork-")); + process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = storageRoot; + }); + + afterEach(async () => { + if (prevEnv === undefined) { + delete process.env.UNCAGED_WORKFLOW_STORAGE_ROOT; + } else { + process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = prevEnv; + } + await rm(storageRoot, { recursive: true, force: true }); 
+ }); + + test("fork --from-role planner continues with coder then reviewer", async () => { + const bundleDir = join(storageRoot, "src"); + await mkdir(bundleDir, { recursive: true }); + const bundlePath = join(bundleDir, "demo.esm.js"); + await writeFile(bundlePath, threeRoleBundleSource, "utf8"); + + const added = await cmdAdd(storageRoot, "solve-issue", bundlePath); + expect(added.ok).toBe(true); + if (!added.ok) { + return; + } + const hash = added.value.hash; + + const ran = await cmdRun(storageRoot, "solve-issue", "hello", false, 5); + expect(ran.ok).toBe(true); + if (!ran.ok) { + return; + } + const sourceId = ran.value.threadId; + const sourceData = join(storageRoot, "logs", hash, `${sourceId}.data.jsonl`); + const sourceRunning = join(storageRoot, "logs", hash, `${sourceId}.running`); + await waitUntilRunningAbsent(sourceRunning); + await waitUntilMinDataLines(sourceData, 4); + + const forked = await cmdFork(storageRoot, sourceId, "planner"); + expect(forked.ok).toBe(true); + if (!forked.ok) { + return; + } + const newId = forked.value.threadId; + const newData = join(storageRoot, "logs", hash, `${newId}.data.jsonl`); + const newRunning = join(storageRoot, "logs", hash, `${newId}.running`); + await waitUntilRunningAbsent(newRunning); + await waitUntilMinDataLines(newData, 4); + + const text = await readFile(newData, "utf8"); + const lines = text + .trim() + .split("\n") + .filter((l) => l !== ""); + expect(lines.length).toBe(4); + const start = JSON.parse(lines[0] ?? "{}") as Record; + expect(start.threadId).toBe(newId); + expect(start.forkFrom).toEqual({ threadId: sourceId }); + + const last = JSON.parse(lines[lines.length - 1] ?? 
"{}") as Record; + expect(last.role).toBe("reviewer"); + expect(last.content).toBe("rev-1"); + }); + + test("fork without --from-role retries last role", async () => { + const bundleDir = join(storageRoot, "src"); + await mkdir(bundleDir, { recursive: true }); + const bundlePath = join(bundleDir, "demo.esm.js"); + await writeFile(bundlePath, threeRoleBundleSource, "utf8"); + + const added = await cmdAdd(storageRoot, "solve-issue", bundlePath); + expect(added.ok).toBe(true); + if (!added.ok) { + return; + } + const hash = added.value.hash; + + const ran = await cmdRun(storageRoot, "solve-issue", "hello", false, 5); + expect(ran.ok).toBe(true); + if (!ran.ok) { + return; + } + const sourceId = ran.value.threadId; + const sourceData = join(storageRoot, "logs", hash, `${sourceId}.data.jsonl`); + const sourceRunning = join(storageRoot, "logs", hash, `${sourceId}.running`); + await waitUntilRunningAbsent(sourceRunning); + await waitUntilMinDataLines(sourceData, 4); + + const forked = await cmdFork(storageRoot, sourceId, null); + expect(forked.ok).toBe(true); + if (!forked.ok) { + return; + } + const newId = forked.value.threadId; + const newData = join(storageRoot, "logs", hash, `${newId}.data.jsonl`); + const newRunning = join(storageRoot, "logs", hash, `${newId}.running`); + await waitUntilRunningAbsent(newRunning); + await waitUntilMinDataLines(newData, 4); + + const text = await readFile(newData, "utf8"); + const lines = text + .trim() + .split("\n") + .filter((l) => l !== ""); + expect(lines.length).toBe(4); + + const replayCoder = JSON.parse(lines[2] ?? "{}") as Record; + expect(replayCoder.role).toBe("coder"); + expect(replayCoder.content).toBe("c1"); + + const last = JSON.parse(lines[lines.length - 1] ?? 
"{}") as Record; + expect(last.role).toBe("reviewer"); + expect(last.content).toBe("rev-2"); + }); + + test("fork rejects unknown role with available names", async () => { + const bundleDir = join(storageRoot, "src"); + await mkdir(bundleDir, { recursive: true }); + const bundlePath = join(bundleDir, "demo.esm.js"); + await writeFile(bundlePath, threeRoleBundleSource, "utf8"); + + const added = await cmdAdd(storageRoot, "solve-issue", bundlePath); + expect(added.ok).toBe(true); + if (!added.ok) { + return; + } + + const ran = await cmdRun(storageRoot, "solve-issue", "hello", false, 5); + expect(ran.ok).toBe(true); + if (!ran.ok) { + return; + } + const sourceId = ran.value.threadId; + const sourceData = join(storageRoot, "logs", added.value.hash, `${sourceId}.data.jsonl`); + const sourceRunning = join(storageRoot, "logs", added.value.hash, `${sourceId}.running`); + await waitUntilRunningAbsent(sourceRunning); + await waitUntilMinDataLines(sourceData, 4); + + const bad = await cmdFork(storageRoot, sourceId, "ghost-role"); + expect(bad.ok).toBe(false); + if (bad.ok) { + return; + } + expect(bad.error).toContain("ghost-role"); + expect(bad.error).toContain("planner"); + }); +}); diff --git a/packages/cli-workflow/src/cli-dispatch.ts b/packages/cli-workflow/src/cli-dispatch.ts index 1b41884..b35d004 100644 --- a/packages/cli-workflow/src/cli-dispatch.ts +++ b/packages/cli-workflow/src/cli-dispatch.ts @@ -1,5 +1,6 @@ import { printCliError, printCliLine } from "./cli-output.js"; import { cmdAdd, formatAddSuccess } from "./cmd-add.js"; +import { cmdFork, parseForkArgv } from "./cmd-fork.js"; import { cmdHistory } from "./cmd-history.js"; import { cmdKill } from "./cmd-kill.js"; import { cmdList, formatListLines } from "./cmd-list.js"; @@ -31,6 +32,7 @@ function usage(): string { " uncaged-workflow threads [name]", " uncaged-workflow thread ", " uncaged-workflow thread rm ", + " uncaged-workflow fork [--from-role ]", ].join("\n"); } @@ -258,6 +260,21 @@ async function 
dispatchThreadBranch(storageRoot: string, rest: string[]): Promis return dispatchThread(storageRoot, rest); } +async function dispatchFork(storageRoot: string, argv: string[]): Promise { + const parsed = parseForkArgv(argv); + if (!parsed.ok) { + printCliError(`${usage()}\n\nerror: ${parsed.error}`); + return 1; + } + const result = await cmdFork(storageRoot, parsed.value.threadId, parsed.value.fromRole); + if (!result.ok) { + printCliError(result.error); + return 1; + } + printCliLine(result.value.threadId); + return 0; +} + type DispatchFn = (storageRoot: string, argv: string[]) => Promise; const COMMAND_TABLE: Record = { @@ -274,6 +291,7 @@ const COMMAND_TABLE: Record = { resume: dispatchResume, threads: dispatchThreads, thread: dispatchThreadBranch, + fork: dispatchFork, }; export async function runCli(storageRoot: string, argv: string[]): Promise { diff --git a/packages/cli-workflow/src/cmd-fork.ts b/packages/cli-workflow/src/cmd-fork.ts new file mode 100644 index 0000000..b16db17 --- /dev/null +++ b/packages/cli-workflow/src/cmd-fork.ts @@ -0,0 +1,91 @@ +import { join } from "node:path"; + +import { buildForkPlan, err, generateUlid, ok, type Result } from "@uncaged/workflow"; + +import { pathExists, readTextFileIfExists } from "./fs-utils.js"; +import { resolveThreadDataPath } from "./thread-scan.js"; +import { ensureWorkerForHash, sendWorkerTcpCommand } from "./worker-spawn.js"; + +export function parseForkArgv( + argv: string[], +): Result<{ threadId: string; fromRole: string | null }, string> { + if (argv.length === 0) { + return err("fork requires "); + } + const threadId = argv[0]; + if (threadId === undefined || threadId === "") { + return err("fork requires "); + } + let fromRole: string | null = null; + for (let i = 1; i < argv.length; i++) { + const a = argv[i]; + if (a === "--from-role") { + const r = argv[i + 1]; + if (r === undefined || r === "") { + return err("--from-role requires a role name"); + } + fromRole = r; + i++; + continue; + } + 
return err(`unexpected argument: ${a}`); + } + return ok({ threadId, fromRole }); +} + +export async function cmdFork( + storageRoot: string, + threadId: string, + fromRole: string | null, +): Promise> { + const dataPath = await resolveThreadDataPath(storageRoot, threadId); + if (dataPath === null) { + return err(`thread not found: ${threadId}`); + } + const text = await readTextFileIfExists(dataPath); + if (text === null) { + return err(`thread data missing: ${threadId}`); + } + + const plan = buildForkPlan(text, fromRole); + if (!plan.ok) { + return plan; + } + + const bundlePath = join(storageRoot, "bundles", `${plan.value.hash}.esm.js`); + if (!(await pathExists(bundlePath))) { + return err(`bundle file missing for thread hash ${plan.value.hash}`); + } + + const worker = await ensureWorkerForHash(storageRoot, plan.value.hash, bundlePath); + if (!worker.ok) { + return worker; + } + + const newThreadId = generateUlid(Date.now()); + const stepsOnWire = plan.value.historicalSteps.map((s) => ({ + role: s.role, + content: s.content, + meta: s.meta, + timestamp: s.timestamp, + })); + + const sent = await sendWorkerTcpCommand( + worker.value.port, + { + type: "run", + threadId: newThreadId, + workflowName: plan.value.workflowName, + prompt: plan.value.prompt, + options: plan.value.runOptions, + steps: stepsOnWire, + forkSourceThreadId: plan.value.sourceThreadId, + }, + { awaitResponseLine: false }, + ); + if (!sent.ok) { + return sent; + } + + return ok({ threadId: newThreadId }); +} diff --git a/packages/workflow/__tests__/engine.test.ts b/packages/workflow/__tests__/engine.test.ts index ce0e7b6..f08cb96 100644 --- a/packages/workflow/__tests__/engine.test.ts +++ b/packages/workflow/__tests__/engine.test.ts @@ -52,7 +52,14 @@ describe("executeThread", () => { demoWorkflow, "demo-flow", { prompt: "Fix the login redirect bug in #3", steps: [] }, - { isDryRun: false, maxRounds: 5, signal: ac.signal, awaitAfterEachYield: async () => {} }, + { + isDryRun: false, + 
maxRounds: 5, + signal: ac.signal, + awaitAfterEachYield: async () => {}, + forkSourceThreadId: null, + prefilledDiskSteps: null, + }, { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath }, logger, ); @@ -115,6 +122,7 @@ describe("executeThread", () => { const logger = createLogger({ sink: { kind: "file", path: infoPath } }); const ac = new AbortController(); + const histTs = 9_000_000; const result = await executeThread( demoWorkflow, "demo-flow", @@ -128,7 +136,21 @@ describe("executeThread", () => { }, ], }, - { isDryRun: false, maxRounds: 5, signal: ac.signal, awaitAfterEachYield: async () => {} }, + { + isDryRun: false, + maxRounds: 5, + signal: ac.signal, + awaitAfterEachYield: async () => {}, + forkSourceThreadId: "01SRC1111111111111111111", + prefilledDiskSteps: [ + { + role: "planner", + content: "plan-body", + meta: { plan: "do-it", files: ["a.ts"] }, + timestamp: histTs, + }, + ], + }, { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath }, logger, ); @@ -140,9 +162,16 @@ describe("executeThread", () => { .trim() .split("\n") .filter((l) => l !== ""); - expect(lines.length).toBe(2); + expect(lines.length).toBe(3); - const role1 = JSON.parse(lines[1] ?? "{}") as Record; + const start = JSON.parse(lines[0] ?? "{}") as Record; + expect(start.forkFrom).toEqual({ threadId: "01SRC1111111111111111111" }); + + const role0 = JSON.parse(lines[1] ?? "{}") as Record; + expect(role0.role).toBe("planner"); + expect(role0.timestamp).toBe(histTs); + + const role1 = JSON.parse(lines[2] ?? 
"{}") as Record; expect(role1.role).toBe("coder"); expect(role1.content).toBe("code-body"); } finally { @@ -171,6 +200,8 @@ describe("executeThread", () => { maxRounds: 0, signal: ac.signal, awaitAfterEachYield: async () => {}, + forkSourceThreadId: null, + prefilledDiskSteps: null, }, { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath }, logger, diff --git a/packages/workflow/__tests__/fork-thread.test.ts b/packages/workflow/__tests__/fork-thread.test.ts new file mode 100644 index 0000000..25a140b --- /dev/null +++ b/packages/workflow/__tests__/fork-thread.test.ts @@ -0,0 +1,86 @@ +import { describe, expect, test } from "bun:test"; + +import { + buildForkPlan, + parseThreadDataJsonl, + selectForkHistoricalSteps, +} from "../src/fork-thread.js"; + +const sampleDataJsonl = `{"name":"demo","hash":"C9NMV6V2TQT81","threadId":"01AAA1111111111111111111","parameters":{"prompt":"hi","options":{"isDryRun":false,"maxRounds":5}},"timestamp":100} +{"role":"planner","content":"p","meta":{},"timestamp":101} +{"role":"coder","content":"c","meta":{},"timestamp":102} +{"role":"reviewer","content":"r","meta":{},"timestamp":103} +`; + +describe("fork-thread", () => { + test("parseThreadDataJsonl reads start + role steps", () => { + const r = parseThreadDataJsonl(sampleDataJsonl); + expect(r.ok).toBe(true); + if (!r.ok) { + return; + } + expect(r.value.start.workflowName).toBe("demo"); + expect(r.value.start.hash).toBe("C9NMV6V2TQT81"); + expect(r.value.start.threadId).toBe("01AAA1111111111111111111"); + expect(r.value.start.prompt).toBe("hi"); + expect(r.value.roleSteps.length).toBe(3); + expect(r.value.roleSteps[0]?.role).toBe("planner"); + }); + + test("selectForkHistoricalSteps: --from-role keeps through first matching role", () => { + const parsed = parseThreadDataJsonl(sampleDataJsonl); + expect(parsed.ok).toBe(true); + if (!parsed.ok) { + return; + } + const sel = selectForkHistoricalSteps(parsed.value.roleSteps, "planner"); + expect(sel.ok).toBe(true); + if 
(!sel.ok) { + return; + } + expect(sel.value.length).toBe(1); + expect(sel.value[0]?.role).toBe("planner"); + }); + + test("selectForkHistoricalSteps: retry last drops final step", () => { + const parsed = parseThreadDataJsonl(sampleDataJsonl); + expect(parsed.ok).toBe(true); + if (!parsed.ok) { + return; + } + const sel = selectForkHistoricalSteps(parsed.value.roleSteps, null); + expect(sel.ok).toBe(true); + if (!sel.ok) { + return; + } + expect(sel.value.map((s) => s.role)).toEqual(["planner", "coder"]); + }); + + test("selectForkHistoricalSteps: unknown role lists available names", () => { + const parsed = parseThreadDataJsonl(sampleDataJsonl); + expect(parsed.ok).toBe(true); + if (!parsed.ok) { + return; + } + const sel = selectForkHistoricalSteps(parsed.value.roleSteps, "nope"); + expect(sel.ok).toBe(false); + if (sel.ok) { + return; + } + expect(sel.error).toContain("planner"); + expect(sel.error).toContain("coder"); + expect(sel.error).toContain("reviewer"); + }); + + test("buildForkPlan composes worker payload", () => { + const r = buildForkPlan(sampleDataJsonl, "planner"); + expect(r.ok).toBe(true); + if (!r.ok) { + return; + } + expect(r.value.sourceThreadId).toBe("01AAA1111111111111111111"); + expect(r.value.workflowName).toBe("demo"); + expect(r.value.historicalSteps.length).toBe(1); + expect(r.value.historicalSteps[0]?.timestamp).toBe(101); + }); +}); diff --git a/packages/workflow/__tests__/worker.test.ts b/packages/workflow/__tests__/worker.test.ts index 74e5c9b..c4200ca 100644 --- a/packages/workflow/__tests__/worker.test.ts +++ b/packages/workflow/__tests__/worker.test.ts @@ -8,8 +8,13 @@ import { join } from "node:path"; import { getWorkerHostScriptPath } from "../src/worker-entry-path.js"; const bundleSource = `export default async function* (input) { - yield { role: "planner", content: "p", meta: { plan: input.prompt } }; - yield { role: "coder", content: "c", meta: { diff: "y" } }; + const has = (r) => input.steps.some((s) => s.role === r); + 
if (!has("planner")) { + yield { role: "planner", content: "p", meta: { plan: input.prompt } }; + } + if (!has("coder")) { + yield { role: "coder", content: "c", meta: { diff: "y" } }; + } return { returnCode: 0, summary: "completed: moderator returned END" }; } `; @@ -111,4 +116,67 @@ describe("worker process", () => { await rm(root, { recursive: true, force: true }); } }, 15_000); + + test("run with historical steps + forkSourceThreadId replays then continues", async () => { + const root = await mkdtemp(join(tmpdir(), "wf-worker-fork-")); + try { + const hash = "C9NMV6V2TQT81"; + await mkdir(join(root, "bundles"), { recursive: true }); + const bundlePath = join(root, "bundles", `${hash}.esm.js`); + await writeFile(bundlePath, bundleSource, "utf8"); + + const scriptPath = getWorkerHostScriptPath(); + const child = spawn(process.execPath, [scriptPath, bundlePath, root, hash], { + stdio: ["ignore", "pipe", "inherit"], + }); + + if (child.stdout === null) { + throw new Error("missing stdout"); + } + + const port = await readReadyPort(child); + + const threadId = "01KQXKW18CT8G75T53R8F4G7YG"; + const srcId = "01SRCMMMMMMMMMMMMMMMMMMMM"; + await sendJson(port, { + type: "run", + threadId, + workflowName: "demo-flow", + prompt: "hello", + options: { isDryRun: false, maxRounds: 5 }, + steps: [ + { + role: "planner", + content: "p-old", + meta: { plan: "z" }, + timestamp: 555, + }, + ], + forkSourceThreadId: srcId, + }); + + const exitCode: number = await new Promise((resolve) => { + child.on("exit", (code) => resolve(code ?? 1)); + }); + + expect(exitCode).toBe(0); + + const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`); + const text = await readFile(dataPath, "utf8"); + const lines = text + .trim() + .split("\n") + .filter((l) => l !== ""); + expect(lines.length).toBe(3); + const start = JSON.parse(lines[0] ?? "{}") as Record; + expect(start.forkFrom).toEqual({ threadId: srcId }); + const replay = JSON.parse(lines[1] ?? 
"{}") as Record; + expect(replay.role).toBe("planner"); + expect(replay.timestamp).toBe(555); + const coder = JSON.parse(lines[2] ?? "{}") as Record; + expect(coder.role).toBe("coder"); + } finally { + await rm(root, { recursive: true, force: true }); + } + }, 15_000); }); diff --git a/packages/workflow/src/engine.ts b/packages/workflow/src/engine.ts index 037da90..e3277bb 100644 --- a/packages/workflow/src/engine.ts +++ b/packages/workflow/src/engine.ts @@ -2,7 +2,7 @@ import { appendFile, mkdir } from "node:fs/promises"; import { dirname } from "node:path"; import type { LogFn } from "./logger.js"; -import type { ThreadInput, WorkflowFn, WorkflowResult } from "./types.js"; +import type { ThreadInput, WorkflowFn, WorkflowFnOptions, WorkflowResult } from "./types.js"; export type ExecuteThreadIo = { threadId: string; @@ -11,12 +11,27 @@ export type ExecuteThreadIo = { infoJsonlPath: string; }; +/** One persisted role line in `.data.jsonl` (engine adds these for fork replay before running the generator). */ +export type PrefilledDiskStep = { + role: string; + content: string; + meta: Record; + timestamp: number; +}; + export type ExecuteThreadOptions = { isDryRun: boolean; maxRounds: number; signal: AbortSignal; /** Invoked after each successful yield (and outer-loop checks); used for pause/resume. */ awaitAfterEachYield: () => Promise; + /** When non-null, written into the start record so tooling can trace lineage. */ + forkSourceThreadId: string | null; + /** + * Written to `.data.jsonl` immediately after the start record, before the generator runs. + * Must match `input.steps` length and order when present. 
+ */ + prefilledDiskSteps: PrefilledDiskStep[] | null; }; async function appendDataLine(path: string, record: unknown): Promise { @@ -24,6 +39,70 @@ async function appendDataLine(path: string, record: unknown): Promise { await appendFile(path, line, "utf8"); } +async function driveWorkflowGenerator(params: { + fn: WorkflowFn; + input: ThreadInput; + bundleOptions: WorkflowFnOptions; + executeOptions: ExecuteThreadOptions; + dataJsonlPath: string; + threadId: string; + logger: LogFn; +}): Promise { + const { fn, input, bundleOptions, executeOptions, dataJsonlPath, threadId, logger } = params; + const gen = fn(input, bundleOptions); + let written = 0; + + while (true) { + if (executeOptions.signal.aborted) { + logger("V8JX4NP2", `thread ${threadId} aborted`); + return { returnCode: 130, summary: "thread aborted" }; + } + + if (written >= executeOptions.maxRounds) { + logger("R3CW7YBQ", `thread ${threadId} stopped at maxRounds=${executeOptions.maxRounds}`); + return { + returnCode: 0, + summary: `completed: reached maxRounds (${executeOptions.maxRounds})`, + }; + } + + const iterResult = await gen.next(); + + if (iterResult.done) { + logger("F3HN8QKP", `thread ${threadId} generator finished`); + return iterResult.value; + } + + written++; + const step = iterResult.value; + const ts = Date.now(); + await appendDataLine(dataJsonlPath, { + role: step.role, + content: step.content, + meta: step.meta, + timestamp: ts, + }); + + logger("N7BW4YHQ", `thread ${threadId} wrote role ${step.role}`); + + await Promise.race([ + executeOptions.awaitAfterEachYield(), + new Promise((resolve) => { + if (executeOptions.signal.aborted) { + resolve(); + return; + } + executeOptions.signal.addEventListener("abort", () => resolve(), { once: true }); + }), + ]); + + if (executeOptions.signal.aborted) { + logger("V8JX4NP4", `thread ${threadId} aborted`); + return { returnCode: 130, summary: "thread aborted" }; + } + } +} + /** * Execute a workflow thread: drive the bundle's AsyncGenerator, 
RFC-001 `.data.jsonl` records, * debug lines via `logger` to `.info.jsonl`. @@ -39,8 +118,15 @@ export async function executeThread( await mkdir(dirname(io.dataJsonlPath), { recursive: true }); await mkdir(dirname(io.infoJsonlPath), { recursive: true }); + const prefilled = options.prefilledDiskSteps; + if (prefilled !== null && prefilled.length !== input.steps.length) { + throw new Error( + `prefilledDiskSteps length (${prefilled.length}) must match input.steps length (${input.steps.length})`, + ); + } + const nowMs = Date.now(); - const startRecord = { + const startRecord: Record = { name: workflowName, hash: io.hash, threadId: io.threadId, @@ -53,11 +139,25 @@ export async function executeThread( }, timestamp: nowMs, }; + if (options.forkSourceThreadId !== null) { + startRecord.forkFrom = { threadId: options.forkSourceThreadId }; + } await appendDataLine(io.dataJsonlPath, startRecord); logger("T9HQ2KHM", `thread ${io.threadId} started for workflow ${workflowName}`); + if (prefilled !== null) { + for (const row of prefilled) { + await appendDataLine(io.dataJsonlPath, { + role: row.role, + content: row.content, + meta: row.meta, + timestamp: row.timestamp, + }); + } + } + if (options.maxRounds <= 0) { logger("R3CW7YBQ", `thread ${io.threadId} stopped at maxRounds=${options.maxRounds}`); return { @@ -66,60 +166,18 @@ export async function executeThread( }; } - const gen = fn(input, { + const bundleOptions: WorkflowFnOptions = { isDryRun: options.isDryRun, maxRounds: options.maxRounds, + }; + + return await driveWorkflowGenerator({ + fn, + input, + bundleOptions, + executeOptions: options, + dataJsonlPath: io.dataJsonlPath, + threadId: io.threadId, + logger, }); - - let written = 0; - - while (true) { - if (options.signal.aborted) { - logger("V8JX4NP2", `thread ${io.threadId} aborted`); - return { returnCode: 130, summary: "thread aborted" }; - } - - if (written >= options.maxRounds) { - logger("R3CW7YBQ", `thread ${io.threadId} stopped at 
maxRounds=${options.maxRounds}`); - return { - returnCode: 0, - summary: `completed: reached maxRounds (${options.maxRounds})`, - }; - } - - const iterResult = await gen.next(); - - if (iterResult.done) { - logger("F3HN8QKP", `thread ${io.threadId} generator finished`); - return iterResult.value; - } - - written++; - const step = iterResult.value; - const ts = Date.now(); - await appendDataLine(io.dataJsonlPath, { - role: step.role, - content: step.content, - meta: step.meta, - timestamp: ts, - }); - - logger("N7BW4YHQ", `thread ${io.threadId} wrote role ${step.role}`); - - await Promise.race([ - options.awaitAfterEachYield(), - new Promise((resolve) => { - if (options.signal.aborted) { - resolve(); - return; - } - options.signal.addEventListener("abort", () => resolve(), { once: true }); - }), - ]); - - if (options.signal.aborted) { - logger("V8JX4NP4", `thread ${io.threadId} aborted`); - return { returnCode: 130, summary: "thread aborted" }; - } - } } diff --git a/packages/workflow/src/fork-thread.ts b/packages/workflow/src/fork-thread.ts new file mode 100644 index 0000000..4447eae --- /dev/null +++ b/packages/workflow/src/fork-thread.ts @@ -0,0 +1,228 @@ +import { err, ok, type Result } from "./result.js"; +import type { RoleOutput } from "./types.js"; + +/** Role steps replayed from `.data.jsonl`, including persisted timestamps. 
*/ +export type ForkHistoricalStep = RoleOutput & { timestamp: number }; + +export type ParsedThreadStartRecord = { + workflowName: string; + hash: string; + threadId: string; + prompt: string; + isDryRun: boolean; + maxRounds: number; +}; + +function parseRoleLine( + obj: Record, + lineIndex: number, +): Result { + const role = obj.role; + const content = obj.content; + const meta = obj.meta; + const timestamp = obj.timestamp; + if (typeof role !== "string") { + return err(`invalid role record at line ${lineIndex}: missing role`); + } + if (typeof content !== "string") { + return err(`invalid role record at line ${lineIndex}: missing content`); + } + if (meta === null || typeof meta !== "object") { + return err(`invalid role record at line ${lineIndex}: missing meta`); + } + if (typeof timestamp !== "number") { + return err(`invalid role record at line ${lineIndex}: missing timestamp`); + } + return ok({ + role, + content, + meta: meta as Record, + timestamp, + }); +} + +function parseStartRecordLine(firstLine: string): Result { + let startParsed: unknown; + try { + startParsed = JSON.parse(firstLine) as unknown; + } catch { + return err("invalid JSON on line 1 (start record)"); + } + if (startParsed === null || typeof startParsed !== "object") { + return err("invalid start record shape"); + } + const startRec = startParsed as Record; + const name = startRec.name; + const hash = startRec.hash; + const threadId = startRec.threadId; + const parameters = startRec.parameters; + if (typeof name !== "string" || typeof hash !== "string" || typeof threadId !== "string") { + return err("start record missing name, hash, or threadId"); + } + if (parameters === null || typeof parameters !== "object") { + return err("start record missing parameters"); + } + const paramsRec = parameters as Record; + const prompt = paramsRec.prompt; + const options = paramsRec.options; + if (typeof prompt !== "string") { + return err("start record missing parameters.prompt"); + } + if (options 
=== null || typeof options !== "object") { + return err("start record missing parameters.options"); + } + const optRec = options as Record; + const isDryRun = optRec.isDryRun; + const maxRounds = optRec.maxRounds; + if (typeof isDryRun !== "boolean" || typeof maxRounds !== "number") { + return err("start record missing parameters.options.isDryRun or maxRounds"); + } + + return ok({ + workflowName: name, + hash, + threadId, + prompt, + isDryRun, + maxRounds, + }); +} + +function parseFollowingRoleLines(lines: string[]): Result { + const roleSteps: ForkHistoricalStep[] = []; + for (let i = 1; i < lines.length; i++) { + const line = lines[i]; + if (line === undefined) { + break; + } + let rec: unknown; + try { + rec = JSON.parse(line) as unknown; + } catch { + return err(`invalid JSON at line ${i + 1}`); + } + if (rec === null || typeof rec !== "object") { + return err(`invalid record at line ${i + 1}`); + } + const parsed = parseRoleLine(rec as Record, i + 1); + if (!parsed.ok) { + return parsed; + } + roleSteps.push(parsed.value); + } + return ok(roleSteps); +} + +/** + * Parse RFC-001 `.data.jsonl`: line 1 start record, line 2+ role outputs. 
+ */ +export function parseThreadDataJsonl(text: string): Result< + { + start: ParsedThreadStartRecord; + roleSteps: ForkHistoricalStep[]; + }, + string +> { + const lines = text + .split("\n") + .map((l) => l.trim()) + .filter((l) => l !== ""); + if (lines.length === 0) { + return err("thread data is empty"); + } + + const firstLine = lines[0]; + if (firstLine === undefined) { + return err("thread data is empty"); + } + + const start = parseStartRecordLine(firstLine); + if (!start.ok) { + return start; + } + + const roleSteps = parseFollowingRoleLines(lines); + if (!roleSteps.ok) { + return roleSteps; + } + + return ok({ + start: start.value, + roleSteps: roleSteps.value, + }); +} + +function orderedUniqueRoles(roleSteps: ForkHistoricalStep[]): string[] { + const seen = new Set(); + const out: string[] = []; + for (const s of roleSteps) { + if (!seen.has(s.role)) { + seen.add(s.role); + out.push(s.role); + } + } + return out; +} + +/** + * Select historical steps for a fork: + * - `fromRole === null`: drop the last step (retry the last role). + * - `fromRole !== null`: keep steps through the first occurrence of that role (inclusive). 
/**
 * Select historical steps for a fork:
 * - `fromRole === null`: drop the last step (retry the last role).
 * - `fromRole !== null`: keep steps through the first occurrence of that role (inclusive).
 *
 * @param roleSteps - Completed steps of the source thread, oldest first.
 * @param fromRole - Role to keep through, or `null` to retry the last step.
 * @returns The steps to carry into the fork, or an error when `roleSteps` is
 *   empty or `fromRole` never appears in it.
 */
export function selectForkHistoricalSteps(
  roleSteps: ForkHistoricalStep[],
  fromRole: string | null,
): Result<ForkHistoricalStep[], string> {
  if (roleSteps.length === 0) {
    return err("thread has no completed role steps to fork from");
  }

  if (fromRole === null) {
    // slice(0, -1) yields a fresh empty array for a single-step thread.
    return ok(roleSteps.slice(0, -1));
  }

  const firstMatch = roleSteps.findIndex((step) => step.role === fromRole);
  if (firstMatch >= 0) {
    return ok(roleSteps.slice(0, firstMatch + 1));
  }

  const known = orderedUniqueRoles(roleSteps).join(", ");
  return err(`role not found in thread: ${fromRole} (available: ${known})`);
}

/** Everything `buildForkPlan` derives from a source thread's `.data.jsonl`. */
export type ForkPlan = {
  workflowName: string;
  hash: string;
  /** `threadId` of the start record the fork was derived from. */
  sourceThreadId: string;
  prompt: string;
  runOptions: { isDryRun: boolean; maxRounds: number };
  /** Steps chosen by `selectForkHistoricalSteps`. */
  historicalSteps: ForkHistoricalStep[];
};

/**
 * Read `.data.jsonl` text and compute fork payload for the worker `run` command.
 *
 * @param dataJsonlText - Contents of the source thread's `.data.jsonl`.
 * @param fromRole - Forwarded unchanged to `selectForkHistoricalSteps`.
 * @returns The fork plan, or the first parse or selection error.
 */
export function buildForkPlan(
  dataJsonlText: string,
  fromRole: string | null,
): Result<ForkPlan, string> {
  const thread = parseThreadDataJsonl(dataJsonlText);
  if (!thread.ok) {
    return thread;
  }

  const { start, roleSteps } = thread.value;
  const kept = selectForkHistoricalSteps(roleSteps, fromRole);
  if (!kept.ok) {
    return kept;
  }

  const { workflowName, hash, threadId, prompt, isDryRun, maxRounds } = start;
  return ok({
    workflowName,
    hash,
    sourceThreadId: threadId,
    prompt,
    runOptions: { isDryRun, maxRounds },
    historicalSteps: kept.value,
  });
}
selectForkHistoricalSteps, +} from "./fork-thread.js"; export { hashWorkflowBundleBytes } from "./hash.js"; export { type CreateLoggerOptions, diff --git a/packages/workflow/src/worker.ts b/packages/workflow/src/worker.ts index 32de5f3..3dc4d6c 100644 --- a/packages/workflow/src/worker.ts +++ b/packages/workflow/src/worker.ts @@ -2,12 +2,12 @@ import { mkdir, unlink, writeFile } from "node:fs/promises"; import { createServer, type Socket } from "node:net"; import { dirname, join } from "node:path"; import { pathToFileURL } from "node:url"; - +import type { PrefilledDiskStep } from "./engine.js"; import { type ExecuteThreadIo, executeThread } from "./engine.js"; import { createLogger } from "./logger.js"; import { err, ok, type Result } from "./result.js"; import { createThreadPauseGate, type ThreadPauseGate } from "./thread-pause-gate.js"; -import type { WorkflowFn } from "./types.js"; +import type { RoleOutput, WorkflowFn } from "./types.js"; const bootLog = createLogger({ sink: { kind: "stderr" } }); @@ -17,6 +17,10 @@ type RunCommand = { workflowName: string; prompt: string; options: { isDryRun: boolean; maxRounds: number }; + steps: RoleOutput[]; + /** Timestamps aligned with `steps` for `.data.jsonl` replay; length must match `steps` when non-null. 
/**
 * Validate one element of a `run` command's `steps` array as a role output.
 *
 * @param obj - Untrusted record decoded from the control payload.
 * @returns `{ role, content, meta }` when `role` and `content` are strings and
 *   `meta` is a non-null, non-array object; otherwise `null`. Extra keys on
 *   `obj` are not copied.
 */
function parseRoleOutputRecord(obj: Record<string, unknown>): RoleOutput | null {
  const { role, content, meta } = obj;
  if (typeof role !== "string" || typeof content !== "string") {
    return null;
  }
  // `typeof [] === "object"`, so arrays need an explicit check; otherwise an
  // array would be cast to a string-keyed record below.
  if (meta === null || typeof meta !== "object" || Array.isArray(meta)) {
    return null;
  }
  return { role, content, meta: meta as Record<string, unknown> };
}

/**
 * Extract the optional `steps` array (and per-step timestamps) from a `run`
 * control payload.
 *
 * @param rec - Untrusted control payload record.
 * @returns
 * - `{ steps: [], stepTimestamps: null }` when `steps` is absent or `null`;
 * - `null` when `steps` is not an array, an element is not a valid role
 *   output, or a `timestamp` is present but not a finite number;
 * - otherwise the parsed steps. `stepTimestamps` is `null` when no element has
 *   a timestamp; else it has one entry per step, with `0` standing in for
 *   elements that have none.
 */
function parseRunStepsPayload(rec: Record<string, unknown>): {
  steps: RoleOutput[];
  stepTimestamps: number[] | null;
} | null {
  const raw = rec.steps;
  if (raw === undefined || raw === null) {
    return { steps: [], stepTimestamps: null };
  }
  if (!Array.isArray(raw)) {
    return null;
  }

  const steps: RoleOutput[] = [];
  const timestamps: number[] = [];
  let anyTimestamp = false;

  for (const item of raw as unknown[]) {
    if (item === null || typeof item !== "object") {
      return null;
    }
    const fields = item as Record<string, unknown>;
    const output = parseRoleOutputRecord(fields);
    if (output === null) {
      return null;
    }
    steps.push(output);

    const ts = fields.timestamp;
    if (ts === undefined) {
      // Placeholder keeps `timestamps` index-aligned with `steps`.
      timestamps.push(0);
    } else if (typeof ts === "number" && Number.isFinite(ts)) {
      timestamps.push(ts);
      anyTimestamp = true;
    } else {
      // Non-numbers and NaN/±Infinity (e.g. JSON `1e999`) are rejected.
      return null;
    }
  }

  return {
    steps,
    stepTimestamps: anyTimestamp ? timestamps : null,
  };
}