// Integration tests for the thread-related CLI commands (run / threads / thread /
// ps / kill / pause / resume). Every test works against a throwaway storage root
// that is exposed through the UNCAGED_WORKFLOW_STORAGE_ROOT environment variable.

import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { spawnSync } from "node:child_process";
import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";

import { cmdAdd } from "../src/cmd-add.js";
import { cmdKill } from "../src/cmd-kill.js";
import { cmdPause } from "../src/cmd-pause.js";
import { cmdPs } from "../src/cmd-ps.js";
import { cmdResume } from "../src/cmd-resume.js";
import { cmdRun } from "../src/cmd-run.js";
import { cmdThreadRemove, cmdThreadShow } from "../src/cmd-thread.js";
import { cmdThreads } from "../src/cmd-threads.js";
import { pathExists } from "../src/fs-utils.js";
import { addCliArgs } from "./bundle-fixture.js";

// ---------------------------------------------------------------------------
// Fixture bundle sources.
//
// Each constant is the text of an ES module that a test writes to disk and
// registers with `cmdAdd`. The strings are assembled from concatenated pieces
// purely for readability; the resulting values are kept byte-identical to the
// single-line originals (including the spaces between pieces).
// ---------------------------------------------------------------------------

/** `descriptor` export shared by every fixture bundle; declares all roles the fixtures yield. */
const threadFixtureDescriptor =
  `export const descriptor = { description: "thread-cli", roles: { ` +
  `planner: { description: "planner", schema: {} }, ` +
  `coder: { description: "coder", schema: {} }, ` +
  `first: { description: "first", schema: {} }, ` +
  `second: { description: "second", schema: {} }, ` +
  `only: { description: "only", schema: {} }, ` +
  `noop: { description: "noop", schema: {} }, ` +
  `}, }; `;

/** Yields planner then coder immediately, with no artificial delay. */
const fastBundleSource =
  `${threadFixtureDescriptor} export const run = async function* (input) { ` +
  `yield { role: "planner", content: "plan", meta: { plan: input.prompt } }; ` +
  `yield { role: "coder", content: "code", meta: { diff: "y" } }; ` +
  `return { returnCode: 0, summary: "done" }; }; `;

/** Waits 400 ms before the planner yield, then yields coder. */
const slowPlannerBundleSource =
  `${threadFixtureDescriptor} export const run = async function* (input) { ` +
  `await new Promise((r) => setTimeout(r, 400)); ` +
  `yield { role: "planner", content: "plan", meta: { plan: input.prompt } }; ` +
  `yield { role: "coder", content: "code", meta: { diff: "y" } }; ` +
  `return { returnCode: 0, summary: "done" }; }; `;

/** Absolute path of the CLI entry point, used by the spawn-based test. */
const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url));

/** Waits 600 ms before the planner yield, leaving a window in which the thread can be killed. */
const abortablePlannerBundleSource =
  // The explicit "\n" reproduces the line break embedded in the original literal.
  `${threadFixtureDescriptor} export const run = async function* (input) \n` +
  `{ await new Promise((r) => setTimeout(r, 600)); ` +
  `yield { role: "planner", content: "plan", meta: { plan: input.prompt } }; ` +
  `yield { role: "coder", content: "code", meta: { diff: "y" } }; ` +
  `return { returnCode: 0, summary: "done" }; }; `;

/** Yields "first", waits 1500 ms, then yields "second". */
const pauseResumeBundleSource =
  `${threadFixtureDescriptor} export const run = async function* (input) { ` +
  `yield { role: "first", content: "f", meta: {} }; ` +
  `await new Promise((r) => setTimeout(r, 1500)); ` +
  `yield { role: "second", content: "s", meta: {} }; ` +
  `return { returnCode: 0, summary: "done" }; }; `;

/** Waits 900 ms before its single "only" yield. */
const delayedFirstYieldBundleSource =
  `${threadFixtureDescriptor} export const run = async function* (input) { ` +
  `await new Promise((r) => setTimeout(r, 900)); ` +
  `yield { role: "only", content: "x", meta: {} }; ` +
  `return { returnCode: 0, summary: "done" }; }; `;

// ---------------------------------------------------------------------------
// Helpers.
// ---------------------------------------------------------------------------

/** Delay between two polls in the `waitUntil*` helpers, in milliseconds. */
const POLL_INTERVAL_MS = 25;

/** Resolves after `ms` milliseconds. */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => setTimeout(resolve, ms));
}

/**
 * Counts the non-empty lines of the file at `dataPath`.
 *
 * @returns The line count, or 0 when the file cannot be read (for example
 *   because it does not exist yet).
 */
async function countDataJsonlLines(dataPath: string): Promise<number> {
  try {
    const text = await readFile(dataPath, "utf8");
    return text
      .trim()
      .split("\n")
      .filter((l) => l !== "").length;
  } catch {
    // Deliberate best effort: an unreadable file counts as "no lines yet".
    return 0;
  }
}

/**
 * Polls `dataPath` until it holds at least `minLines` non-empty lines.
 *
 * Gives up silently after `maxAttempts` polls; callers assert the final state
 * themselves, so a timeout surfaces as a failed expectation.
 */
async function waitUntilMinDataLines(
  dataPath: string,
  minLines: number,
  maxAttempts: number,
): Promise<void> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    if ((await countDataJsonlLines(dataPath)) >= minLines) {
      return;
    }
    await sleep(POLL_INTERVAL_MS);
  }
}

/**
 * Polls until `runningPath` no longer exists.
 *
 * Gives up silently after `maxAttempts` polls; callers assert the final state
 * themselves.
 */
async function waitUntilRunningFileAbsent(runningPath: string, maxAttempts: number): Promise<void> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    if (!(await pathExists(runningPath))) {
      return;
    }
    await sleep(POLL_INTERVAL_MS);
  }
}

/**
 * Writes `bundleSource` to `<storageRoot>/src/demo.esm.js` and registers it
 * under the name "solve-issue" via `cmdAdd`.
 *
 * @returns The unmodified `cmdAdd` result so the caller can assert on it.
 */
async function installBundle(storageRoot: string, bundleSource: string) {
  const bundleDir = join(storageRoot, "src");
  await mkdir(bundleDir, { recursive: true });
  const bundlePath = join(bundleDir, "demo.esm.js");
  await writeFile(bundlePath, bundleSource, "utf8");
  return cmdAdd(storageRoot, addCliArgs("solve-issue", bundlePath));
}

// ---------------------------------------------------------------------------
// Tests.
// ---------------------------------------------------------------------------

describe("cli thread commands", () => {
  let prevEnv: string | undefined;
  let storageRoot: string;

  beforeEach(async () => {
    // Remember the caller's setting so afterEach can restore it exactly.
    prevEnv = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
    storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-thread-"));
    process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = storageRoot;
  });

  afterEach(async () => {
    if (prevEnv === undefined) {
      delete process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
    } else {
      process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = prevEnv;
    }
    await rm(storageRoot, { recursive: true, force: true });
  });

  test("run / threads / thread / thread rm", async () => {
    const added = await installBundle(storageRoot, fastBundleSource);
    expect(added.ok).toBe(true);
    if (!added.ok) {
      return;
    }

    const ran = await cmdRun(storageRoot, "solve-issue", "hello", false, 5);
    expect(ran.ok).toBe(true);
    if (!ran.ok) {
      return;
    }
    const threadId = ran.value.threadId;

    // Re-query the listing (up to 50 times, 20 ms apart) until the new thread shows up.
    let threads = await cmdThreads(storageRoot, []);
    for (
      let attempt = 0;
      attempt < 50 && threads.ok && !threads.value.some((l) => l.includes(threadId));
      attempt++
    ) {
      await sleep(20);
      threads = await cmdThreads(storageRoot, []);
    }
    expect(threads.ok).toBe(true);
    if (!threads.ok) {
      return;
    }
    expect(threads.value.some((l) => l.includes(threadId))).toBe(true);

    const shown = await cmdThreadShow(storageRoot, threadId);
    expect(shown.ok).toBe(true);
    if (!shown.ok) {
      return;
    }
    expect(shown.value.includes('"threadId"')).toBe(true);

    const removed = await cmdThreadRemove(storageRoot, threadId);
    expect(removed.ok).toBe(true);
    const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`);
    expect(await pathExists(dataPath)).toBe(false);
  });

  test("cli entrypoint dispatches threads / ps (spawn)", () => {
    const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };

    const threads = spawnSync(process.execPath, [cliEntryPath, "threads"], {
      env,
      encoding: "utf8",
    });
    expect(threads.status).toBe(0);

    const ps = spawnSync(process.execPath, [cliEntryPath, "ps"], { env, encoding: "utf8" });
    expect(ps.status).toBe(0);
  });

  test("ps lists running threads while planner role is in-flight", async () => {
    const added = await installBundle(storageRoot, slowPlannerBundleSource);
    expect(added.ok).toBe(true);
    if (!added.ok) {
      return;
    }

    const ran = await cmdRun(storageRoot, "solve-issue", "hello", false, 5);
    expect(ran.ok).toBe(true);
    if (!ran.ok) {
      return;
    }
    const threadId = ran.value.threadId;

    // 50 ms is well inside the fixture's 400 ms delay before its first yield.
    await sleep(50);
    const psEarly = await cmdPs(storageRoot);
    expect(psEarly.some((l) => l.includes(threadId))).toBe(true);

    // After a further 900 ms the thread is expected to be gone from the listing.
    await sleep(900);
    const psLate = await cmdPs(storageRoot);
    expect(psLate).toEqual(["(no running threads)"]);
  });

  test("kill stops thread after the in-flight role (before subsequent roles)", async () => {
    const added = await installBundle(storageRoot, abortablePlannerBundleSource);
    expect(added.ok).toBe(true);
    if (!added.ok) {
      return;
    }

    const ran = await cmdRun(storageRoot, "solve-issue", "hello", false, 5);
    expect(ran.ok).toBe(true);
    if (!ran.ok) {
      return;
    }
    const threadId = ran.value.threadId;

    // Kill while the fixture is still inside its 600 ms delay before the first yield.
    await sleep(50);
    const killed = await cmdKill(storageRoot, threadId);
    expect(killed.ok).toBe(true);

    await sleep(900);
    const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`);
    // Read directly (not via countDataJsonlLines) so a missing log fails loudly.
    const text = await readFile(dataPath, "utf8");
    const lines = text
      .trim()
      .split("\n")
      .filter((l) => l !== "");
    // NOTE(review): 2 lines presumably means one leading record plus the planner
    // entry, i.e. the coder role never ran; confirm against the log writer.
    expect(lines.length).toBe(2);

    const runningPath = join(dirname(dataPath), `${threadId}.running`);
    expect(await pathExists(runningPath)).toBe(false);
  });

  test("pause stops between yields and resume completes thread", async () => {
    const added = await installBundle(storageRoot, pauseResumeBundleSource);
    expect(added.ok).toBe(true);
    if (!added.ok) {
      return;
    }

    const ran = await cmdRun(storageRoot, "solve-issue", "hello", false, 5);
    expect(ran.ok).toBe(true);
    if (!ran.ok) {
      return;
    }
    const threadId = ran.value.threadId;
    const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`);

    // Wait for the log to reach 2 lines before pausing.
    // NOTE(review): presumably that is the state right after the "first" yield; confirm.
    await waitUntilMinDataLines(dataPath, 2, 80);
    expect(await countDataJsonlLines(dataPath)).toBe(2);

    const paused = await cmdPause(storageRoot, threadId);
    expect(paused.ok).toBe(true);

    // While paused, the log must not grow.
    await sleep(400);
    expect(await countDataJsonlLines(dataPath)).toBe(2);

    const resumed = await cmdResume(storageRoot, threadId);
    expect(resumed.ok).toBe(true);

    await waitUntilMinDataLines(dataPath, 3, 120);
    expect(await countDataJsonlLines(dataPath)).toBe(3);

    const runningPath = join(dirname(dataPath), `${threadId}.running`);
    await waitUntilRunningFileAbsent(runningPath, 100);
    expect(await pathExists(runningPath)).toBe(false);
  });

  test("pause on completed thread errors", async () => {
    const added = await installBundle(storageRoot, fastBundleSource);
    expect(added.ok).toBe(true);
    if (!added.ok) {
      return;
    }

    const ran = await cmdRun(storageRoot, "solve-issue", "hello", false, 5);
    expect(ran.ok).toBe(true);
    if (!ran.ok) {
      return;
    }
    const threadId = ran.value.threadId;
    const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`);
    const runningPath = join(dirname(dataPath), `${threadId}.running`);

    // Wait until the `.running` file is gone before attempting to pause.
    await waitUntilRunningFileAbsent(runningPath, 100);
    expect(await pathExists(runningPath)).toBe(false);

    const paused = await cmdPause(storageRoot, threadId);
    expect(paused.ok).toBe(false);
  });

  test("resume while thread is running but not paused errors", async () => {
    const added = await installBundle(storageRoot, delayedFirstYieldBundleSource);
    expect(added.ok).toBe(true);
    if (!added.ok) {
      return;
    }

    const ran = await cmdRun(storageRoot, "solve-issue", "hello", false, 5);
    expect(ran.ok).toBe(true);
    if (!ran.ok) {
      return;
    }
    const threadId = ran.value.threadId;

    // 40 ms is well inside the fixture's 900 ms delay, and no pause was requested.
    await sleep(40);
    const resumed = await cmdResume(storageRoot, threadId);
    expect(resumed.ok).toBe(false);
  });
});