diff --git a/packages/cli-workflow/__tests__/commands.test.ts b/packages/cli-workflow/__tests__/commands.test.ts index cd5b85e..fcdaf40 100644 --- a/packages/cli-workflow/__tests__/commands.test.ts +++ b/packages/cli-workflow/__tests__/commands.test.ts @@ -1,11 +1,15 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test"; -import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; +import { mkdir, mkdtemp, readFile, rm, unlink, writeFile } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; +import { getRegisteredWorkflow, readWorkflowRegistry } from "@uncaged/workflow"; + import { cmdAdd } from "../src/cmd-add.js"; +import { cmdHistory } from "../src/cmd-history.js"; import { cmdList, formatListLines } from "../src/cmd-list.js"; import { cmdRemove } from "../src/cmd-remove.js"; +import { cmdRollback } from "../src/cmd-rollback.js"; import { cmdShow } from "../src/cmd-show.js"; describe("cli workflow commands", () => { @@ -87,4 +91,159 @@ export default async function* (input) { const r = await cmdAdd(storageRoot, "solve-issue", bundlePath); expect(r.ok).toBe(false); }); + + test("history lists current + prior versions sorted by time descending", async () => { + const bundleDir = join(storageRoot, "src"); + await mkdir(bundleDir, { recursive: true }); + const bundlePath = join(bundleDir, "demo.esm.js"); + const v1 = `export default async function* (input) { + yield { role: "a", content: "v1", meta: {} }; + return { returnCode: 0, summary: "v1" }; +} +`; + const v2 = `export default async function* (input) { + yield { role: "a", content: "v2", meta: {} }; + return { returnCode: 0, summary: "v2" }; +} +`; + await writeFile(bundlePath, v1, "utf8"); + const add1 = await cmdAdd(storageRoot, "solve-issue", bundlePath); + expect(add1.ok).toBe(true); + await new Promise((r) => setTimeout(r, 15)); + await writeFile(bundlePath, v2, "utf8"); + const add2 = await cmdAdd(storageRoot, "solve-issue", bundlePath); + expect(add2.ok).toBe(true); + + const hist = await cmdHistory(storageRoot, "solve-issue"); + expect(hist.ok).toBe(true); + if (!hist.ok) { + return; + } + expect(hist.value.length).toBe(2); + const dates = hist.value.map((line) => { + const parts = line.split("\t"); + return Date.parse(parts[1] ?? ""); + }); + expect(Number.isFinite(dates[0])).toBe(true); + expect(Number.isFinite(dates[1])).toBe(true); + expect(dates[0] >= dates[1]).toBe(true); + expect(hist.value.some((l) => l.endsWith("(current)"))).toBe(true); + }); + + test("rollback swaps registry head with a history hash", async () => { + const bundleDir = join(storageRoot, "src"); + await mkdir(bundleDir, { recursive: true }); + const bundlePath = join(bundleDir, "demo.esm.js"); + const v1 = `export default async function* (input) { + yield { role: "a", content: "v1", meta: {} }; + return { returnCode: 0, summary: "v1" }; +} +`; + const v2 = `export default async function* (input) { + yield { role: "a", content: "v2", meta: {} }; + return { returnCode: 0, summary: "v2" }; +} +`; + await writeFile(bundlePath, v1, "utf8"); + const add1 = await cmdAdd(storageRoot, "solve-issue", bundlePath); + expect(add1.ok).toBe(true); + if (!add1.ok) { + return; + } + const hash1 = add1.value.hash; + await writeFile(bundlePath, v2, "utf8"); + const add2 = await cmdAdd(storageRoot, "solve-issue", bundlePath); + expect(add2.ok).toBe(true); + if (!add2.ok) { + return; + } + const hash2 = add2.value.hash; + + const rb = await cmdRollback(storageRoot, "solve-issue", null); + expect(rb.ok).toBe(true); + + const reg = await readWorkflowRegistry(storageRoot); + expect(reg.ok).toBe(true); + if (!reg.ok) { + return; + } + const entry = getRegisteredWorkflow(reg.value, "solve-issue"); + expect(entry).not.toBeNull(); + if (entry === null) { + return; + } + expect(entry.hash).toBe(hash1); + expect(entry.history.some((h) => h.hash === hash2)).toBe(true); + }); + + test("rollback rejects a hash that is not in history", async () => { + const bundleDir = join(storageRoot, "src"); + await mkdir(bundleDir, { recursive: true }); + const bundlePath = join(bundleDir, "demo.esm.js"); + await writeFile( + bundlePath, + `export default async function* (input) { + yield { role: "a", content: "x", meta: {} }; + return { returnCode: 0, summary: "x" }; +} +`, + "utf8", + ); + const add1 = await cmdAdd(storageRoot, "solve-issue", bundlePath); + expect(add1.ok).toBe(true); + await writeFile( + bundlePath, + `export default async function* (input) { + yield { role: "a", content: "y", meta: {} }; + return { returnCode: 0, summary: "y" }; +} +`, + "utf8", + ); + const add2 = await cmdAdd(storageRoot, "solve-issue", bundlePath); + expect(add2.ok).toBe(true); + + const bad = await cmdRollback(storageRoot, "solve-issue", "0000000000000"); + expect(bad.ok).toBe(false); + }); + + test("rollback rejects missing bundle file for target hash", async () => { + const bundleDir = join(storageRoot, "src"); + await mkdir(bundleDir, { recursive: true }); + const bundlePath = join(bundleDir, "demo.esm.js"); + await writeFile( + bundlePath, + `export default async function* (input) { + yield { role: "a", content: "x", meta: {} }; + return { returnCode: 0, summary: "x" }; +} +`, + "utf8", + ); + const add1 = await cmdAdd(storageRoot, "solve-issue", bundlePath); + expect(add1.ok).toBe(true); + if (!add1.ok) { + return; + } + const hash1 = add1.value.hash; + await writeFile( + bundlePath, + `export default async function* (input) { + yield { role: "a", content: "y", meta: {} }; + return { returnCode: 0, summary: "y" }; +} +`, + "utf8", + ); + const add2 = await cmdAdd(storageRoot, "solve-issue", bundlePath); + expect(add2.ok).toBe(true); + if (!add2.ok) { + return; + } + + await unlink(join(storageRoot, "bundles", `${hash1}.esm.js`)); + + const rb = await cmdRollback(storageRoot, "solve-issue", hash1); + expect(rb.ok).toBe(false); + }); }); diff --git a/packages/cli-workflow/__tests__/thread-cli.test.ts b/packages/cli-workflow/__tests__/thread-cli.test.ts index 664fc41..ad9be4f 100644 --- a/packages/cli-workflow/__tests__/thread-cli.test.ts +++ b/packages/cli-workflow/__tests__/thread-cli.test.ts @@ -7,7 +7,9 @@ import { fileURLToPath } from "node:url"; import { cmdAdd } from "../src/cmd-add.js"; import { cmdKill } from "../src/cmd-kill.js"; +import { cmdPause } from "../src/cmd-pause.js"; import { cmdPs } from "../src/cmd-ps.js"; +import { cmdResume } from "../src/cmd-resume.js"; import { cmdRun } from "../src/cmd-run.js"; import { cmdThreadRemove, cmdThreadShow } from "../src/cmd-thread.js"; import { cmdThreads } from "../src/cmd-threads.js"; @@ -38,6 +40,55 @@ const abortablePlannerBundleSource = `export default async function* (input) { } `; +const pauseResumeBundleSource = `export default async function* (input) { + yield { role: "first", content: "f", meta: {} }; + await new Promise((r) => setTimeout(r, 1500)); + yield { role: "second", content: "s", meta: {} }; + return { returnCode: 0, summary: "done" }; +} +`; + +const delayedFirstYieldBundleSource = `export default async function* (input) { + await new Promise((r) => setTimeout(r, 900)); + yield { role: "only", content: "x", meta: {} }; + return { returnCode: 0, summary: "done" }; +} +`; + +async function countDataJsonlLines(dataPath: string): Promise { + try { + const text = await readFile(dataPath, "utf8"); + return text + .trim() + .split("\n") + .filter((l) => l !== "").length; + } catch { + return 0; + } +} + +async function waitUntilMinDataLines( + dataPath: string, + minLines: number, + maxAttempts: number, +): Promise { + for (let attempt = 0; attempt < maxAttempts; attempt++) { + if ((await countDataJsonlLines(dataPath)) >= minLines) { + return; + } + await new Promise((r) => setTimeout(r, 25)); + } +} + +async function waitUntilRunningFileAbsent(runningPath: string, maxAttempts: number): Promise { + for (let attempt = 0; attempt < maxAttempts; attempt++) { + if (!(await pathExists(runningPath))) { + return; + } + await new Promise((r) => setTimeout(r, 25)); + } +} + describe("cli thread commands", () => { let prevEnv: string | undefined; let storageRoot: string; @@ -186,4 +237,99 @@ describe("cli thread commands", () => { const runningPath = join(dirname(dataPath), `${threadId}.running`); expect(await pathExists(runningPath)).toBe(false); }); + + test("pause stops between yields and resume completes thread", async () => { + const bundleDir = join(storageRoot, "src"); + await mkdir(bundleDir, { recursive: true }); + const bundlePath = join(bundleDir, "demo.esm.js"); + await writeFile(bundlePath, pauseResumeBundleSource, "utf8"); + + const added = await cmdAdd(storageRoot, "solve-issue", bundlePath); + expect(added.ok).toBe(true); + if (!added.ok) { + return; + } + + const ran = await cmdRun(storageRoot, "solve-issue", "hello", false, 5); + expect(ran.ok).toBe(true); + if (!ran.ok) { + return; + } + + const threadId = ran.value.threadId; + const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`); + + await waitUntilMinDataLines(dataPath, 2, 80); + expect(await countDataJsonlLines(dataPath)).toBe(2); + + const paused = await cmdPause(storageRoot, threadId); + expect(paused.ok).toBe(true); + + await new Promise((r) => setTimeout(r, 400)); + expect(await countDataJsonlLines(dataPath)).toBe(2); + + const resumed = await cmdResume(storageRoot, threadId); + expect(resumed.ok).toBe(true); + + await waitUntilMinDataLines(dataPath, 3, 120); + expect(await countDataJsonlLines(dataPath)).toBe(3); + + const runningPath = join(dirname(dataPath), `${threadId}.running`); + await waitUntilRunningFileAbsent(runningPath, 100); + expect(await pathExists(runningPath)).toBe(false); + }); + + test("pause on completed thread errors", async () => { + const bundleDir = join(storageRoot, "src"); + await mkdir(bundleDir, { recursive: true }); + const bundlePath = join(bundleDir, "demo.esm.js"); + await writeFile(bundlePath, fastBundleSource, "utf8"); + + const added = await cmdAdd(storageRoot, "solve-issue", bundlePath); + expect(added.ok).toBe(true); + if (!added.ok) { + return; + } + + const ran = await cmdRun(storageRoot, "solve-issue", "hello", false, 5); + expect(ran.ok).toBe(true); + if (!ran.ok) { + return; + } + + const threadId = ran.value.threadId; + const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`); + const runningPath = join(dirname(dataPath), `${threadId}.running`); + + await waitUntilRunningFileAbsent(runningPath, 100); + expect(await pathExists(runningPath)).toBe(false); + + const paused = await cmdPause(storageRoot, threadId); + expect(paused.ok).toBe(false); + }); + + test("resume while thread is running but not paused errors", async () => { + const bundleDir = join(storageRoot, "src"); + await mkdir(bundleDir, { recursive: true }); + const bundlePath = join(bundleDir, "demo.esm.js"); + await writeFile(bundlePath, delayedFirstYieldBundleSource, "utf8"); + + const added = await cmdAdd(storageRoot, "solve-issue", bundlePath); + expect(added.ok).toBe(true); + if (!added.ok) { + return; + } + + const ran = await cmdRun(storageRoot, "solve-issue", "hello", false, 5); + expect(ran.ok).toBe(true); + if (!ran.ok) { + return; + } + + const threadId = ran.value.threadId; + await new Promise((r) => setTimeout(r, 40)); + + const resumed = await cmdResume(storageRoot, threadId); + expect(resumed.ok).toBe(false); + }); }); diff --git a/packages/cli-workflow/src/cli-dispatch.ts b/packages/cli-workflow/src/cli-dispatch.ts index 5671d57..1b41884 100644 --- a/packages/cli-workflow/src/cli-dispatch.ts +++ b/packages/cli-workflow/src/cli-dispatch.ts @@ -1,9 +1,13 @@ import { printCliError, printCliLine } from "./cli-output.js"; import { cmdAdd, formatAddSuccess } from "./cmd-add.js"; +import { cmdHistory } from "./cmd-history.js"; import { cmdKill } from "./cmd-kill.js"; import { cmdList, formatListLines } from "./cmd-list.js"; +import { cmdPause } from "./cmd-pause.js"; import { cmdPs } from "./cmd-ps.js"; import { cmdRemove } from "./cmd-remove.js"; +import { cmdResume } from "./cmd-resume.js"; +import { cmdRollback } from "./cmd-rollback.js"; import { cmdRun } from "./cmd-run.js"; import { cmdShow, formatShowYaml } from "./cmd-show.js"; import { cmdThreadRemove, cmdThreadShow } from "./cmd-thread.js"; @@ -20,6 +24,10 @@ function usage(): string { " uncaged-workflow run [--prompt ] [--dry-run] [--max-rounds N]", " uncaged-workflow ps", " uncaged-workflow kill ", + " uncaged-workflow history ", + " uncaged-workflow rollback [hash]", + " uncaged-workflow pause ", + " uncaged-workflow resume ", " uncaged-workflow threads [name]", " uncaged-workflow thread ", " uncaged-workflow thread rm ", @@ -137,6 +145,69 @@ async function dispatchKill(storageRoot: string, argv: string[]): Promise { + const name = argv[0]; + if (name === undefined || argv.length > 1) { + printCliError(`${usage()}\n\nerror: history requires `); + return 1; + } + const result = await cmdHistory(storageRoot, name); + if (!result.ok) { + printCliError(result.error); + return 1; + } + for (const line of result.value) { + printCliLine(line); + } + return 0; +} + +async function dispatchRollback(storageRoot: string, argv: string[]): Promise { + const name = argv[0]; + if (name === undefined || argv.length > 2) { + printCliError(`${usage()}\n\nerror: rollback requires [hash]`); + return 1; + } + const hashArg = argv[1]; + const result = await cmdRollback(storageRoot, name, hashArg === undefined ? null : hashArg); + if (!result.ok) { + printCliError(result.error); + return 1; + } + printCliLine(`rolled back workflow "${name}"`); + return 0; +} + +async function dispatchPause(storageRoot: string, argv: string[]): Promise { + const threadId = argv[0]; + if (threadId === undefined || argv.length > 1) { + printCliError(`${usage()}\n\nerror: pause requires `); + return 1; + } + const result = await cmdPause(storageRoot, threadId); + if (!result.ok) { + printCliError(result.error); + return 1; + } + printCliLine(`pause sent for thread ${threadId}`); + return 0; +} + +async function dispatchResume(storageRoot: string, argv: string[]): Promise { + const threadId = argv[0]; + if (threadId === undefined || argv.length > 1) { + printCliError(`${usage()}\n\nerror: resume requires `); + return 1; + } + const result = await cmdResume(storageRoot, threadId); + if (!result.ok) { + printCliError(result.error); + return 1; + } + printCliLine(`resume sent for thread ${threadId}`); + return 0; +} + async function dispatchThreads(storageRoot: string, argv: string[]): Promise { const result = await cmdThreads(storageRoot, argv); if (!result.ok) { @@ -179,6 +250,32 @@ async function dispatchThreadRm(storageRoot: string, argv: string[]): Promise { + const sub = rest[0]; + if (sub === "rm") { + return dispatchThreadRm(storageRoot, rest.slice(1)); + } + return dispatchThread(storageRoot, rest); +} + +type DispatchFn = (storageRoot: string, argv: string[]) => Promise; + +const COMMAND_TABLE: Record = { + add: dispatchAdd, + list: dispatchList, + show: dispatchShow, + remove: dispatchRemove, + run: dispatchRun, + ps: dispatchPs, + kill: dispatchKill, + history: dispatchHistory, + rollback: dispatchRollback, + pause: dispatchPause, + resume: dispatchResume, + threads: dispatchThreads, + thread: dispatchThreadBranch, +}; + export async function runCli(storageRoot: string, argv: string[]): Promise { if (argv.length === 0) { printCliError(usage()); @@ -190,39 +287,10 @@ export async function runCli(storageRoot: string, argv: string[]): Promise> { + const nameOk = validateCliWorkflowName(name); + if (!nameOk.ok) { + return nameOk; + } + + const reg = await readWorkflowRegistry(storageRoot); + if (!reg.ok) { + return err(reg.error.message); + } + + const entry = getRegisteredWorkflow(reg.value, name); + if (entry === null) { + return err(`workflow not registered: ${name}`); + } + + type Row = { hash: string; timestamp: number; isCurrent: boolean }; + const rows: Row[] = [ + { hash: entry.hash, timestamp: entry.timestamp, isCurrent: true }, + ...entry.history.map((h) => ({ hash: h.hash, timestamp: h.timestamp, isCurrent: false })), + ]; + rows.sort((a, b) => b.timestamp - a.timestamp); + + const lines = rows.map((r) => { + const date = new Date(r.timestamp).toISOString(); + const suffix = r.isCurrent ? "\t(current)" : ""; + return `${r.hash}\t${date}${suffix}`; + }); + return ok(lines); +} diff --git a/packages/cli-workflow/src/cmd-kill.ts b/packages/cli-workflow/src/cmd-kill.ts index 75f23b9..2bad4eb 100644 --- a/packages/cli-workflow/src/cmd-kill.ts +++ b/packages/cli-workflow/src/cmd-kill.ts @@ -35,5 +35,9 @@ export async function cmdKill( return err(`invalid worker control file: ${ctlPath}`); } - return await sendWorkerTcpCommand(ctl.port, { type: "kill", threadId }); + return await sendWorkerTcpCommand( + ctl.port, + { type: "kill", threadId }, + { awaitResponseLine: true }, + ); } diff --git a/packages/cli-workflow/src/cmd-pause.ts b/packages/cli-workflow/src/cmd-pause.ts new file mode 100644 index 0000000..a54f8aa --- /dev/null +++ b/packages/cli-workflow/src/cmd-pause.ts @@ -0,0 +1,43 @@ +import { join } from "node:path"; + +import { err, type Result } from "@uncaged/workflow"; + +import { readTextFileIfExists } from "./fs-utils.js"; +import { + resolveRunningHashForThread, + sendWorkerTcpCommand, + type WorkerCtl, +} from "./worker-spawn.js"; + +export async function cmdPause( + storageRoot: string, + threadId: string, +): Promise> { + const hashResult = await resolveRunningHashForThread(storageRoot, threadId); + if (!hashResult.ok) { + return hashResult; + } + + const ctlPath = join(storageRoot, "workers", `${hashResult.value}.json`); + const ctlText = await readTextFileIfExists(ctlPath); + if (ctlText === null) { + return err(`worker control file missing for bundle hash ${hashResult.value}`); + } + + let ctl: WorkerCtl; + try { + ctl = JSON.parse(ctlText) as WorkerCtl; + } catch { + return err(`corrupt worker control file: ${ctlPath}`); + } + + if (typeof ctl.port !== "number" || ctl.port <= 0) { + return err(`invalid worker control file: ${ctlPath}`); + } + + return await sendWorkerTcpCommand( + ctl.port, + { type: "pause", threadId }, + { awaitResponseLine: true }, + ); +} diff --git a/packages/cli-workflow/src/cmd-resume.ts b/packages/cli-workflow/src/cmd-resume.ts new file mode 100644 index 0000000..c8264dc --- /dev/null +++ b/packages/cli-workflow/src/cmd-resume.ts @@ -0,0 +1,43 @@ +import { join } from "node:path"; + +import { err, type Result } from "@uncaged/workflow"; + +import { readTextFileIfExists } from "./fs-utils.js"; +import { + resolveRunningHashForThread, + sendWorkerTcpCommand, + type WorkerCtl, +} from "./worker-spawn.js"; + +export async function cmdResume( + storageRoot: string, + threadId: string, +): Promise> { + const hashResult = await resolveRunningHashForThread(storageRoot, threadId); + if (!hashResult.ok) { + return hashResult; + } + + const ctlPath = join(storageRoot, "workers", `${hashResult.value}.json`); + const ctlText = await readTextFileIfExists(ctlPath); + if (ctlText === null) { + return err(`worker control file missing for bundle hash ${hashResult.value}`); + } + + let ctl: WorkerCtl; + try { + ctl = JSON.parse(ctlText) as WorkerCtl; + } catch { + return err(`corrupt worker control file: ${ctlPath}`); + } + + if (typeof ctl.port !== "number" || ctl.port <= 0) { + return err(`invalid worker control file: ${ctlPath}`); + } + + return await sendWorkerTcpCommand( + ctl.port, + { type: "resume", threadId }, + { awaitResponseLine: true }, + ); +} diff --git a/packages/cli-workflow/src/cmd-rollback.ts b/packages/cli-workflow/src/cmd-rollback.ts new file mode 100644 index 0000000..ba034d8 --- /dev/null +++ b/packages/cli-workflow/src/cmd-rollback.ts @@ -0,0 +1,55 @@ +import { join } from "node:path"; + +import { + err, + getRegisteredWorkflow, + ok, + type Result, + readWorkflowRegistry, + rollbackWorkflowToHistoryHash, + writeWorkflowRegistry, +} from "@uncaged/workflow"; + +import { pathExists } from "./fs-utils.js"; +import { validateCliWorkflowName } from "./workflow-name.js"; + +export async function cmdRollback( + storageRoot: string, + name: string, + hash: string | null, +): Promise> { + const nameOk = validateCliWorkflowName(name); + if (!nameOk.ok) { + return nameOk; + } + + const reg = await readWorkflowRegistry(storageRoot); + if (!reg.ok) { + return err(reg.error.message); + } + + const entry = getRegisteredWorkflow(reg.value, name); + if (entry === null) { + return err(`workflow not registered: ${name}`); + } + + const rolled = rollbackWorkflowToHistoryHash(entry, hash); + if (!rolled.ok) { + return err(rolled.error.message); + } + + const bundlePath = join(storageRoot, "bundles", `${rolled.value.hash}.esm.js`); + if (!(await pathExists(bundlePath))) { + return err(`bundle file not found for hash ${rolled.value.hash}`); + } + + const nextRegistry = { + workflows: { ...reg.value.workflows, [name]: rolled.value }, + }; + const written = await writeWorkflowRegistry(storageRoot, nextRegistry); + if (!written.ok) { + return err(written.error.message); + } + + return ok(undefined); +} diff --git a/packages/cli-workflow/src/cmd-run.ts b/packages/cli-workflow/src/cmd-run.ts index 7e25924..82d985f 100644 --- a/packages/cli-workflow/src/cmd-run.ts +++ b/packages/cli-workflow/src/cmd-run.ts @@ -40,13 +40,17 @@ export async function cmdRun( } const threadId = generateUlid(Date.now()); - const sent = await sendWorkerTcpCommand(worker.value.port, { - type: "run", - threadId, - workflowName: name, - prompt, - options: { isDryRun, maxRounds }, - }); + const sent = await sendWorkerTcpCommand( + worker.value.port, + { + type: "run", + threadId, + workflowName: name, + prompt, + options: { isDryRun, maxRounds }, + }, + { awaitResponseLine: false }, + ); if (!sent.ok) { return sent; } diff --git a/packages/cli-workflow/src/worker-spawn.ts b/packages/cli-workflow/src/worker-spawn.ts index 677f85d..82ae41c 100644 --- a/packages/cli-workflow/src/worker-spawn.ts +++ b/packages/cli-workflow/src/worker-spawn.ts @@ -143,30 +143,96 @@ export async function ensureWorkerForHash( return ok({ port: spawned.value.port }); } +export type SendWorkerTcpOptions = { + awaitResponseLine: boolean; +}; + +function parseWorkerControlResponseLine(line: string): Result { + let parsed: unknown; + try { + parsed = JSON.parse(line.trim()) as unknown; + } catch { + return err("invalid JSON in worker response"); + } + if (parsed === null || typeof parsed !== "object") { + return err("invalid worker response shape"); + } + const rec = parsed as Record; + if (rec.ok === true) { + return ok(undefined); + } + if (rec.ok === false) { + const message = rec.error; + if (typeof message === "string") { + return err(message); + } + return err("worker error response missing error string"); + } + return err("invalid worker response: missing ok field"); +} + export async function sendWorkerTcpCommand( port: number, payload: unknown, + options: SendWorkerTcpOptions = { awaitResponseLine: false }, ): Promise> { return await new Promise((resolve) => { let settled = false; + let buf = ""; const socket = createConnection({ host: "127.0.0.1", port }, () => { socket.write(`${JSON.stringify(payload)}\n`); - socket.end(); + if (!options.awaitResponseLine) { + socket.end(); + } }); + + function finish(result: Result): void { + if (settled) { + return; + } + settled = true; + if (options.awaitResponseLine && socket.writable) { + socket.end(); + } + resolve(result); + } + + function tryFinishFromBuffer(): void { + if (!options.awaitResponseLine) { + return; + } + const nl = buf.indexOf("\n"); + if (nl < 0) { + return; + } + finish(parseWorkerControlResponseLine(buf.slice(0, nl))); + } + + socket.on("data", (chunk: Buffer | string) => { + if (!options.awaitResponseLine) { + return; + } + buf += typeof chunk === "string" ? chunk : chunk.toString("utf8"); + tryFinishFromBuffer(); + }); + socket.on("error", (e) => { if (settled) { return; } - settled = true; const message = e instanceof Error ? e.message : String(e); - resolve(err(`failed to send worker command: ${message}`)); + finish(err(`failed to send worker command: ${message}`)); }); + socket.on("close", () => { - if (settled) { + if (options.awaitResponseLine) { + tryFinishFromBuffer(); + if (!settled) { + finish(err("worker closed without control response")); + } return; } - settled = true; - resolve(ok(undefined)); + finish(ok(undefined)); }); }); } diff --git a/packages/workflow/__tests__/engine.test.ts b/packages/workflow/__tests__/engine.test.ts index af2da5d..ce0e7b6 100644 --- a/packages/workflow/__tests__/engine.test.ts +++ b/packages/workflow/__tests__/engine.test.ts @@ -52,7 +52,7 @@ describe("executeThread", () => { demoWorkflow, "demo-flow", { prompt: "Fix the login redirect bug in #3", steps: [] }, - { isDryRun: false, maxRounds: 5, signal: ac.signal }, + { isDryRun: false, maxRounds: 5, signal: ac.signal, awaitAfterEachYield: async () => {} }, { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath }, logger, ); @@ -128,7 +128,7 @@ describe("executeThread", () => { }, ], }, - { isDryRun: false, maxRounds: 5, signal: ac.signal }, + { isDryRun: false, maxRounds: 5, signal: ac.signal, awaitAfterEachYield: async () => {} }, { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath }, logger, ); @@ -166,7 +166,12 @@ describe("executeThread", () => { demoWorkflow, "demo-flow", { prompt: "hello", steps: [] }, - { isDryRun: false, maxRounds: 0, signal: ac.signal }, + { + isDryRun: false, + maxRounds: 0, + signal: ac.signal, + awaitAfterEachYield: async () => {}, + }, { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath }, logger, ); diff --git a/packages/workflow/__tests__/registry.test.ts b/packages/workflow/__tests__/registry.test.ts index c5612d8..5806829 100644 --- a/packages/workflow/__tests__/registry.test.ts +++ b/packages/workflow/__tests__/registry.test.ts @@ -6,6 +6,7 @@ import { join } from "node:path"; import { readWorkflowRegistry, registerWorkflowVersion, + rollbackWorkflowToHistoryHash, unregisterWorkflow, writeWorkflowRegistry, } from "../src/registry.js"; @@ -66,6 +67,38 @@ describe("workflow registry", () => { await rm(dir, { recursive: true, force: true }); }); + test("rollbackWorkflowToHistoryHash swaps head with a prior version", () => { + let reg = registerWorkflowVersion({ workflows: {} }, "solve-issue", "H1", 100); + reg = registerWorkflowVersion(reg, "solve-issue", "H2", 200); + reg = registerWorkflowVersion(reg, "solve-issue", "H3", 300); + const entry = reg.workflows["solve-issue"]; + expect(entry).toBeDefined(); + if (entry === undefined) { + return; + } + expect(entry.hash).toBe("H3"); + expect(entry.history.map((h) => h.hash)).toEqual(["H2", "H1"]); + + const toH2 = rollbackWorkflowToHistoryHash(entry, null); + expect(toH2.ok).toBe(true); + if (!toH2.ok) { + return; + } + expect(toH2.value.hash).toBe("H2"); + expect(toH2.value.history.map((h) => h.hash)).toEqual(["H3", "H1"]); + + const toH1 = rollbackWorkflowToHistoryHash(toH2.value, "H1"); + expect(toH1.ok).toBe(true); + if (!toH1.ok) { + return; + } + expect(toH1.value.hash).toBe("H1"); + expect(toH1.value.history.map((h) => h.hash)).toEqual(["H2", "H3"]); + + const bad = rollbackWorkflowToHistoryHash(toH1.value, "NONE"); + expect(bad.ok).toBe(false); + }); + test("parse errors on invalid shape", async () => { const dir = join(tmpdir(), `wf-reg3-${process.pid}-${Date.now()}`); await mkdir(dir, { recursive: true }); diff --git a/packages/workflow/__tests__/thread-pause-gate.test.ts b/packages/workflow/__tests__/thread-pause-gate.test.ts new file mode 100644 index 0000000..b703fde --- /dev/null +++ b/packages/workflow/__tests__/thread-pause-gate.test.ts @@ -0,0 +1,31 @@ +import { describe, expect, test } from "bun:test"; + +import { createThreadPauseGate } from "../src/thread-pause-gate.js"; + +describe("createThreadPauseGate", () => { + test("pause blocks awaitAfterYield until resume", async () => { + const gate = createThreadPauseGate(); + gate.pause(); + + let progressed = false; + const wait = (async () => { + await gate.awaitAfterYield(); + progressed = true; + })(); + + await new Promise((r) => setTimeout(r, 30)); + expect(progressed).toBe(false); + + gate.resume(); + await wait; + expect(progressed).toBe(true); + }); + + test("duplicate pause and resume are rejected", () => { + const gate = createThreadPauseGate(); + expect(gate.pause().ok).toBe(true); + expect(gate.pause().ok).toBe(false); + expect(gate.resume().ok).toBe(true); + expect(gate.resume().ok).toBe(false); + }); +}); diff --git a/packages/workflow/src/engine.ts b/packages/workflow/src/engine.ts index 0d145f0..037da90 100644 --- a/packages/workflow/src/engine.ts +++ b/packages/workflow/src/engine.ts @@ -15,6 +15,8 @@ export type ExecuteThreadOptions = { isDryRun: boolean; maxRounds: number; signal: AbortSignal; + /** Invoked after each successful yield (and outer-loop checks); used for pause/resume. */ + awaitAfterEachYield: () => Promise; }; async function appendDataLine(path: string, record: unknown): Promise { @@ -104,6 +106,17 @@ export async function executeThread( logger("N7BW4YHQ", `thread ${io.threadId} wrote role ${step.role}`); + await Promise.race([ + options.awaitAfterEachYield(), + new Promise((resolve) => { + if (options.signal.aborted) { + resolve(); + return; + } + options.signal.addEventListener("abort", () => resolve(), { once: true }); + }), + ]); + if (options.signal.aborted) { logger("V8JX4NP4", `thread ${io.threadId} aborted`); return { returnCode: 130, summary: "thread aborted" }; diff --git a/packages/workflow/src/index.ts b/packages/workflow/src/index.ts index 79fc6b5..22edeec 100644 --- a/packages/workflow/src/index.ts +++ b/packages/workflow/src/index.ts @@ -25,6 +25,7 @@ export { parseWorkflowRegistryYaml, readWorkflowRegistry, registerWorkflowVersion, + rollbackWorkflowToHistoryHash, stringifyWorkflowRegistryYaml, unregisterWorkflow, type WorkflowHistoryEntry, @@ -35,6 +36,7 @@ export { } from "./registry.js"; export { err, ok, type Result } from "./result.js"; export { getDefaultWorkflowStorageRoot } from "./storage-root.js"; +export { createThreadPauseGate, type ThreadPauseGate } from "./thread-pause-gate.js"; export { type AgentFn, END, diff --git a/packages/workflow/src/registry.ts b/packages/workflow/src/registry.ts index 5516bc8..9f8267b 100644 --- a/packages/workflow/src/registry.ts +++ b/packages/workflow/src/registry.ts @@ -107,6 +107,41 @@ export function registerWorkflowVersion( }; } +/** + * Roll back `entry` to a hash listed in `entry.history`. + * When `targetHash` is null, uses the most recent history entry (`history[0]`). + * Current head is prepended to history; the selected entry becomes the new head. + */ +export function rollbackWorkflowToHistoryHash( + entry: WorkflowRegistryEntry, + targetHash: string | null, +): Result { + const resolved = + targetHash !== null && targetHash !== "" + ? targetHash + : entry.history[0] !== undefined + ? entry.history[0].hash + : null; + if (resolved === null) { + return err(new Error("no history entry to rollback to")); + } + const idx = entry.history.findIndex((h) => h.hash === resolved); + if (idx < 0) { + return err(new Error(`hash not found in history: ${resolved}`)); + } + const selected = entry.history[idx]; + const newHistory: WorkflowHistoryEntry[] = [ + { hash: entry.hash, timestamp: entry.timestamp }, + ...entry.history.slice(0, idx), + ...entry.history.slice(idx + 1), + ]; + return ok({ + hash: selected.hash, + timestamp: selected.timestamp, + history: newHistory, + }); +} + export function unregisterWorkflow( registry: WorkflowRegistryFile, name: string, diff --git a/packages/workflow/src/thread-pause-gate.ts b/packages/workflow/src/thread-pause-gate.ts new file mode 100644 index 0000000..e2640c9 --- /dev/null +++ b/packages/workflow/src/thread-pause-gate.ts @@ -0,0 +1,54 @@ +import { err, ok, type Result } from "./result.js"; + +export type ThreadPauseGate = { + awaitAfterYield: () => Promise; + pause: () => Result; + resume: () => Result; + isPaused: () => boolean; +}; + +/** + * Pause/resume gate for workflow threads: after each generator yield the engine awaits + * {@link ThreadPauseGate.awaitAfterYield}. Calling {@link ThreadPauseGate.pause} makes the next + * await block until {@link ThreadPauseGate.resume}. + */ +export function createThreadPauseGate(): ThreadPauseGate { + let resumeResolver: (() => void) | null = null; + let chain: Promise = Promise.resolve(); + let paused = false; + + function awaitAfterYield(): Promise { + return chain; + } + + function pause(): Result { + if (paused) { + return err("thread already paused"); + } + paused = true; + chain = new Promise((resolve) => { + resumeResolver = resolve; + }); + return ok(undefined); + } + + function resume(): Result { + if (!paused) { + return err("thread not paused"); + } + paused = false; + const resolveFn = resumeResolver; + resumeResolver = null; + if (resolveFn !== null) { + resolveFn(); + } + chain = Promise.resolve(); + return ok(undefined); + } + + function isPaused(): boolean { + return paused; + } + + return { awaitAfterYield, pause, resume, isPaused }; +} diff --git a/packages/workflow/src/worker.ts b/packages/workflow/src/worker.ts index a0b7520..32de5f3 100644 --- a/packages/workflow/src/worker.ts +++ b/packages/workflow/src/worker.ts @@ -5,6 +5,8 @@ import { pathToFileURL } from "node:url"; import { type ExecuteThreadIo, executeThread } from "./engine.js"; import { createLogger } from "./logger.js"; +import { err, ok, type Result } from "./result.js"; +import { createThreadPauseGate, type ThreadPauseGate } from "./thread-pause-gate.js"; import type { WorkflowFn } from "./types.js"; const bootLog = createLogger({ sink: { kind: "stderr" } }); @@ -22,49 +24,84 @@ type KillCommand = { threadId: string; }; -type ControlCommand = RunCommand | KillCommand; +type PauseCommand = { + type: "pause"; + threadId: string; +}; + +type ResumeCommand = { + type: "resume"; + threadId: string; +}; + +type ControlCommand = RunCommand | KillCommand | PauseCommand | ResumeCommand; + +type ThreadHandle = { + abortController: AbortController; + pauseGate: ThreadPauseGate; +}; + +function parseRunControlPayload(rec: Record): RunCommand | null { + const threadId = rec.threadId; + const workflowName = rec.workflowName; + const prompt = rec.prompt; + const options = rec.options; + if ( + typeof threadId !== "string" || + typeof workflowName !== "string" || + typeof prompt !== "string" + ) { + return null; + } + if (options === null || typeof options !== "object") { + return null; + } + const optRec = options as Record; + const isDryRun = optRec.isDryRun; + const maxRounds = optRec.maxRounds; + if (typeof isDryRun !== "boolean" || typeof maxRounds !== "number") { + return null; + } + return { + type: "run", + threadId, + workflowName, + prompt, + options: { isDryRun, maxRounds }, + }; +} + +function parseLifecycleThreadPayload( + rec: Record, +): KillCommand | PauseCommand | ResumeCommand | null { + const type = rec.type; + const threadId = rec.threadId; + if (typeof threadId !== "string") { + return null; + } + if (type === "kill") { + return { type: "kill", threadId }; + } + if (type === "pause") { + return { type: "pause", threadId }; + } + if (type === "resume") { + return { type: "resume", threadId }; + } + return null; +} function parseControlPayload(payload: unknown): ControlCommand | null { if (payload === null || typeof payload !== "object") { return null; } const rec = payload as Record; - const type = rec.type; - if (type === "kill") { - const threadId = rec.threadId; - if (typeof threadId !== "string") { - return null; - } - return { type: "kill", threadId }; + const lifecycle = parseLifecycleThreadPayload(rec); + if (lifecycle !== null) { + return lifecycle; } - if (type === "run") { - const threadId = rec.threadId; - const workflowName = rec.workflowName; - const prompt = rec.prompt; - const options = rec.options; - if ( - typeof threadId !== "string" || - typeof workflowName !== "string" || - typeof prompt !== "string" - ) { - return null; - } - if (options === null || typeof options !== "object") { - return null; - } - const optRec = options as Record; - const isDryRun = optRec.isDryRun; - const maxRounds = optRec.maxRounds; - if (typeof isDryRun !== "boolean" || typeof maxRounds !== "number") { - return null; - } - return { - type: "run", - threadId, - workflowName, - prompt, - options: { isDryRun, maxRounds }, - }; + if (rec.type === "run") { + return parseRunControlPayload(rec); } return null; } @@ -88,6 +125,53 @@ function isWorkflowFnLike(value: unknown): value is WorkflowFn { return typeof value === "function"; } +function writeTcpResponse(socket: Socket | null, result: Result): void { + if (socket === null) { + return; + } + const body = result.ok ? { ok: true as const } : { ok: false as const, error: result.error }; + socket.end(`${JSON.stringify(body)}\n`); +} + +function dispatchThreadLifecycleCommand( + threads: Map, + socket: Socket | null, + cmd: KillCommand | PauseCommand | ResumeCommand, +): void { + const handle = threads.get(cmd.threadId); + if (handle === undefined) { + writeTcpResponse(socket, err(`thread not found: ${cmd.threadId}`)); + return; + } + switch (cmd.type) { + case "kill": + handle.abortController.abort(); + bootLog("P9XK2WNQ", `kill requested for thread ${cmd.threadId}`); + writeTcpResponse(socket, ok(undefined)); + return; + case "pause": { + const paused = handle.pauseGate.pause(); + if (!paused.ok) { + writeTcpResponse(socket, paused); + return; + } + bootLog("K7WQ2NXP", `pause requested for thread ${cmd.threadId}`); + writeTcpResponse(socket, ok(undefined)); + return; + } + case "resume": { + const resumed = handle.pauseGate.resume(); + if (!resumed.ok) { + writeTcpResponse(socket, resumed); + return; + } + bootLog("M4YT8HKR", `resume requested for thread ${cmd.threadId}`); + writeTcpResponse(socket, ok(undefined)); + return; + } + } +} + async function readLineFromSocket(socket: Socket): Promise { return await new Promise((resolve) => { let buf = ""; @@ -150,7 +234,7 @@ async function main(): Promise { } const workflowFn = defaultExport; - const controllers = new Map(); + const threads = new Map(); let activeThreads = 0; let shutdownTimer: ReturnType | null = null; @@ -185,13 +269,8 @@ async function main(): Promise { } async function dispatchCommand(cmd: ControlCommand, socket: Socket | null): Promise { - if (cmd.type === "kill") { - const ac = controllers.get(cmd.threadId); - if (ac !== undefined) { - ac.abort(); - bootLog("P9XK2WNQ", `kill requested for thread ${cmd.threadId}`); - } - socket?.end(); + if (cmd.type !== "run") { + dispatchThreadLifecycleCommand(threads, socket, cmd); return; } @@ -209,14 +288,15 @@ async function main(): Promise { infoJsonlPath, }; - const existing = controllers.get(threadId); + const existing = threads.get(threadId); if (existing !== undefined) { - existing.abort(); - controllers.delete(threadId); + existing.abortController.abort(); + threads.delete(threadId); } + const pauseGate = createThreadPauseGate(); const ac = new AbortController(); - controllers.set(threadId, ac); + threads.set(threadId, { abortController: ac, pauseGate }); try { await mkdir(dirname(runningPath), { recursive: true }); @@ -229,7 +309,11 @@ async function main(): Promise { workflowFn, cmd.workflowName, { prompt: cmd.prompt, steps: [] }, - { ...cmd.options, signal: ac.signal }, + { + ...cmd.options, + signal: ac.signal, + awaitAfterEachYield: () => pauseGate.awaitAfterYield(), + }, io, logger, ); @@ -237,7 +321,7 @@ async function main(): Promise { const message = e instanceof Error ? e.message : String(e); bootLog("Q3MN8YKW", `thread ${threadId} failed: ${message}`); } finally { - controllers.delete(threadId); + threads.delete(threadId); await unlink(runningPath).catch(() => {}); bumpDone(); socket?.end(); @@ -270,8 +354,8 @@ async function main(): Promise { })(); }); - server.on("error", (err) => { - bootLog("W8YK4NPX", `worker server error: ${err.message}`); + server.on("error", (errObj) => { + bootLog("W8YK4NPX", `worker server error: ${errObj.message}`); process.exit(1); });