Files
united-workforce/packages/cli-workflow/__tests__/thread-cli.test.ts
T
xiaoju 3467b772e6 refactor: named exports (run + descriptor), remove build pipeline
- Bundle contract: export const run + export const descriptor (no default export)
- add only accepts .esm.js, extracts descriptor via dynamic import → .yaml
- Removed: build-pipeline, generate-types, json-schema-to-ts
- Worker loads mod.run instead of mod.default
- Biome: no more noDefaultExport overrides for bundles
- 62 tests pass, biome clean

Closes #8
小橘 <xiaoju@shazhou.work>
2026-05-06 06:39:15 +00:00

354 lines
12 KiB
TypeScript

import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { spawnSync } from "node:child_process";
import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import { cmdAdd } from "../src/cmd-add.js";
import { cmdKill } from "../src/cmd-kill.js";
import { cmdPause } from "../src/cmd-pause.js";
import { cmdPs } from "../src/cmd-ps.js";
import { cmdResume } from "../src/cmd-resume.js";
import { cmdRun } from "../src/cmd-run.js";
import { cmdThreadRemove, cmdThreadShow } from "../src/cmd-thread.js";
import { cmdThreads } from "../src/cmd-threads.js";
import { pathExists } from "../src/fs-utils.js";
import { addCliArgs } from "./bundle-fixture.js";
/**
 * JavaScript source text for the `export const descriptor` half of a fixture bundle.
 *
 * It is interpolated at the top of each `*BundleSource` template in this file, so every
 * fixture bundle declares the same role table: `planner`, `coder`, `first`, `second`,
 * `only` and `noop`, each with an empty schema. The text is written to disk verbatim,
 * so it must stay valid ESM on its own.
 */
const threadFixtureDescriptor = `export const descriptor = {
description: "thread-cli",
roles: {
planner: { description: "planner", schema: {} },
coder: { description: "coder", schema: {} },
first: { description: "first", schema: {} },
second: { description: "second", schema: {} },
only: { description: "only", schema: {} },
noop: { description: "noop", schema: {} },
},
};
`;
/**
 * Fixture bundle source whose `run` generator has no artificial delay: it yields a
 * `planner` record (echoing `input.prompt` in `meta.plan`), then a `coder` record, and
 * returns `{ returnCode: 0, summary: "done" }`.
 */
const fastBundleSource = `${threadFixtureDescriptor}
export const run = async function* (input) {
yield { role: "planner", content: "plan", meta: { plan: input.prompt } };
yield { role: "coder", content: "code", meta: { diff: "y" } };
return { returnCode: 0, summary: "done" };
};
`;
/**
 * Fixture bundle source that waits 400 ms before its first yield, then yields `planner`
 * and `coder` records and returns `{ returnCode: 0, summary: "done" }`.
 *
 * The initial delay gives a test a window in which the run has started but has not yet
 * produced any role output.
 */
const slowPlannerBundleSource = `${threadFixtureDescriptor}
export const run = async function* (input) {
await new Promise((r) => setTimeout(r, 400));
yield { role: "planner", content: "plan", meta: { plan: input.prompt } };
yield { role: "coder", content: "code", meta: { diff: "y" } };
return { returnCode: 0, summary: "done" };
};
`;
/**
 * Filesystem path of `../src/cli.ts`, resolved relative to this module's URL and
 * converted from a `file:` URL to a plain path.
 */
const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url));
/**
 * Fixture bundle source that waits 600 ms before its first yield, then yields `planner`
 * and `coder` records and returns `{ returnCode: 0, summary: "done" }`.
 *
 * Apart from the longer delay, the generator body is the same shape as the other
 * planner/coder fixtures in this file.
 */
const abortablePlannerBundleSource = `${threadFixtureDescriptor}
export const run = async function* (input) {
await new Promise((r) => setTimeout(r, 600));
yield { role: "planner", content: "plan", meta: { plan: input.prompt } };
yield { role: "coder", content: "code", meta: { diff: "y" } };
return { returnCode: 0, summary: "done" };
};
`;
/**
 * Fixture bundle source with a long gap between two yields: it yields a `first` record
 * immediately, waits 1500 ms, yields a `second` record, and returns
 * `{ returnCode: 0, summary: "done" }`.
 *
 * The gap leaves time for a test to act on the thread between the two yields.
 */
const pauseResumeBundleSource = `${threadFixtureDescriptor}
export const run = async function* (input) {
yield { role: "first", content: "f", meta: {} };
await new Promise((r) => setTimeout(r, 1500));
yield { role: "second", content: "s", meta: {} };
return { returnCode: 0, summary: "done" };
};
`;
/**
 * Fixture bundle source that waits 900 ms, yields a single `only` record, and returns
 * `{ returnCode: 0, summary: "done" }`.
 *
 * The delay keeps the run in progress, with no role output yet, for most of a second.
 */
const delayedFirstYieldBundleSource = `${threadFixtureDescriptor}
export const run = async function* (input) {
await new Promise((r) => setTimeout(r, 900));
yield { role: "only", content: "x", meta: {} };
return { returnCode: 0, summary: "done" };
};
`;
/**
 * Counts the non-empty lines of the file at `dataPath`.
 *
 * The file is read as UTF-8, surrounding whitespace is trimmed, and the remainder is
 * split on `"\n"`; only segments that are exactly the empty string are ignored, so an
 * interior line containing just spaces still counts.
 *
 * @param dataPath - Path of the file to inspect.
 * @returns The number of non-empty lines, or `0` when the file cannot be read
 *   (for example because it does not exist yet).
 */
async function countDataJsonlLines(dataPath: string): Promise<number> {
  let contents: string;
  try {
    contents = await readFile(dataPath, "utf8");
  } catch {
    // Best-effort: an unreadable or missing file is reported as "no lines".
    return 0;
  }

  let nonEmpty = 0;
  for (const segment of contents.trim().split("\n")) {
    if (segment !== "") {
      nonEmpty += 1;
    }
  }
  return nonEmpty;
}
/**
 * Polls the file at `dataPath` until it holds at least `minLines` non-empty lines.
 *
 * The line count is checked up to `maxAttempts` times with a 25 ms pause after each
 * unsuccessful check. When the attempts run out the function simply resolves; it does
 * not throw, so callers must assert on the line count themselves.
 *
 * @param dataPath - Path of the file being watched.
 * @param minLines - Line count that ends the wait.
 * @param maxAttempts - Maximum number of checks before giving up.
 */
async function waitUntilMinDataLines(
  dataPath: string,
  minLines: number,
  maxAttempts: number,
): Promise<void> {
  let remaining = maxAttempts;
  while (remaining > 0) {
    const lineCount = await countDataJsonlLines(dataPath);
    if (lineCount >= minLines) {
      return;
    }
    await new Promise((resolve) => setTimeout(resolve, 25));
    remaining -= 1;
  }
}
/**
 * Polls until `pathExists(runningPath)` reports that the path no longer exists.
 *
 * The check is made up to `maxAttempts` times with a 25 ms pause after each check that
 * still finds the path. When the attempts run out the function resolves without
 * throwing, so callers must assert on the path's absence themselves.
 *
 * @param runningPath - Path whose disappearance is awaited.
 * @param maxAttempts - Maximum number of checks before giving up.
 */
async function waitUntilRunningFileAbsent(runningPath: string, maxAttempts: number): Promise<void> {
  let remaining = maxAttempts;
  while (remaining > 0) {
    const stillPresent = await pathExists(runningPath);
    if (!stillPresent) {
      return;
    }
    await new Promise((resolve) => setTimeout(resolve, 25));
    remaining -= 1;
  }
}
/**
 * Integration tests for the thread-related CLI commands (`run`, `threads`, `thread`,
 * `thread rm`, `ps`, `kill`, `pause`, `resume`).
 *
 * Every test gets a fresh temporary storage root, which is also exported through
 * `UNCAGED_WORKFLOW_STORAGE_ROOT`; the previous value of that variable is restored and
 * the directory removed after each test.
 */
describe("cli thread commands", () => {
  /** Paths and id of a thread started by {@link startThread}. */
  type StartedThread = {
    threadId: string;
    /** `<storageRoot>/logs/<bundle hash>/<threadId>.data.jsonl` */
    dataPath: string;
    /** `<threadId>.running`, in the same directory as `dataPath`. */
    runningPath: string;
  };

  let prevEnv: string | undefined;
  let storageRoot: string;

  /** Resolves after `ms` milliseconds. */
  const sleep = (ms: number): Promise<unknown> => new Promise((resolve) => setTimeout(resolve, ms));

  /**
   * Shared arrange step: writes `bundleSource` to `<storageRoot>/src/demo.esm.js`,
   * registers it as the `solve-issue` workflow via `cmdAdd`, and starts a thread with
   * `cmdRun`, using the same arguments in every test.
   *
   * Both command results are asserted to be ok. The `throw`s after the assertions exist
   * only to narrow the result types; a failed `expect` is expected to abort the test
   * before they are reached.
   *
   * @param bundleSource - ESM source text of the fixture bundle to install.
   * @returns The new thread's id plus the data and running-marker paths derived from it.
   */
  async function startThread(bundleSource: string): Promise<StartedThread> {
    const bundleDir = join(storageRoot, "src");
    await mkdir(bundleDir, { recursive: true });
    const bundlePath = join(bundleDir, "demo.esm.js");
    await writeFile(bundlePath, bundleSource, "utf8");

    const added = await cmdAdd(storageRoot, addCliArgs("solve-issue", bundlePath));
    expect(added.ok).toBe(true);
    if (!added.ok) {
      throw new Error("cmdAdd did not succeed");
    }

    const ran = await cmdRun(storageRoot, "solve-issue", "hello", false, 5);
    expect(ran.ok).toBe(true);
    if (!ran.ok) {
      throw new Error("cmdRun did not succeed");
    }

    const threadId = ran.value.threadId;
    const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`);
    const runningPath = join(dirname(dataPath), `${threadId}.running`);
    return { threadId, dataPath, runningPath };
  }

  beforeEach(async () => {
    prevEnv = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
    storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-thread-"));
    process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = storageRoot;
  });

  afterEach(async () => {
    // Restore the variable exactly: unset it again if it was unset before the test.
    if (prevEnv === undefined) {
      delete process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
    } else {
      process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = prevEnv;
    }
    await rm(storageRoot, { recursive: true, force: true });
  });

  test("run / threads / thread / thread rm", async () => {
    const { threadId, dataPath } = await startThread(fastBundleSource);

    // The thread may not be listed immediately; re-query for up to 50 x 20 ms.
    let threads = await cmdThreads(storageRoot, []);
    for (
      let attempt = 0;
      attempt < 50 && threads.ok && !threads.value.some((l) => l.includes(threadId));
      attempt++
    ) {
      await sleep(20);
      threads = await cmdThreads(storageRoot, []);
    }
    expect(threads.ok).toBe(true);
    if (!threads.ok) {
      return; // type narrowing only; the expect above has already failed the test
    }
    expect(threads.value.some((l) => l.includes(threadId))).toBe(true);

    const shown = await cmdThreadShow(storageRoot, threadId);
    expect(shown.ok).toBe(true);
    if (!shown.ok) {
      return; // type narrowing only
    }
    expect(shown.value.includes('"threadId"')).toBe(true);

    const removed = await cmdThreadRemove(storageRoot, threadId);
    expect(removed.ok).toBe(true);
    expect(await pathExists(dataPath)).toBe(false);
  });

  test("cli entrypoint dispatches threads / ps (spawn)", () => {
    // Run the real CLI entry file in a child process against the empty storage root.
    const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
    const threads = spawnSync(process.execPath, [cliEntryPath, "threads"], {
      env,
      encoding: "utf8",
    });
    expect(threads.status).toBe(0);
    const ps = spawnSync(process.execPath, [cliEntryPath, "ps"], { env, encoding: "utf8" });
    expect(ps.status).toBe(0);
  });

  test("ps lists running threads while planner role is in-flight", async () => {
    const { threadId } = await startThread(slowPlannerBundleSource);

    // 50 ms in, the bundle is still inside its 400 ms initial delay.
    await sleep(50);
    const psEarly = await cmdPs(storageRoot);
    expect(psEarly.some((l) => l.includes(threadId))).toBe(true);

    // 900 ms later the bundle has had time to finish.
    await sleep(900);
    const psLate = await cmdPs(storageRoot);
    expect(psLate).toEqual(["(no running threads)"]);
  });

  test("kill stops thread after the in-flight role (before subsequent roles)", async () => {
    const { threadId, dataPath, runningPath } = await startThread(abortablePlannerBundleSource);

    // Kill while the bundle is still inside its 600 ms initial delay.
    await sleep(50);
    const killed = await cmdKill(storageRoot, threadId);
    expect(killed.ok).toBe(true);

    // Wait past the point where the bundle would have yielded both roles.
    await sleep(900);
    // NOTE(review): 2 lines is presumably one initial record plus the planner record,
    // i.e. the coder record was never written. Confirm against the data file format.
    expect(await countDataJsonlLines(dataPath)).toBe(2);
    expect(await pathExists(runningPath)).toBe(false);
  });

  test("pause stops between yields and resume completes thread", async () => {
    const { threadId, dataPath, runningPath } = await startThread(pauseResumeBundleSource);

    // Wait for the output of the first yield, then pause inside the 1500 ms gap.
    await waitUntilMinDataLines(dataPath, 2, 80);
    expect(await countDataJsonlLines(dataPath)).toBe(2);
    const paused = await cmdPause(storageRoot, threadId);
    expect(paused.ok).toBe(true);

    // While paused, nothing new is appended.
    await sleep(400);
    expect(await countDataJsonlLines(dataPath)).toBe(2);

    const resumed = await cmdResume(storageRoot, threadId);
    expect(resumed.ok).toBe(true);
    await waitUntilMinDataLines(dataPath, 3, 120);
    expect(await countDataJsonlLines(dataPath)).toBe(3);

    await waitUntilRunningFileAbsent(runningPath, 100);
    expect(await pathExists(runningPath)).toBe(false);
  });

  test("pause on completed thread errors", async () => {
    const { threadId, runningPath } = await startThread(fastBundleSource);

    // Let the thread finish before trying to pause it.
    await waitUntilRunningFileAbsent(runningPath, 100);
    expect(await pathExists(runningPath)).toBe(false);

    const paused = await cmdPause(storageRoot, threadId);
    expect(paused.ok).toBe(false);
  });

  test("resume while thread is running but not paused errors", async () => {
    const { threadId } = await startThread(delayedFirstYieldBundleSource);

    // 40 ms in, the bundle is still inside its 900 ms initial delay and was never paused.
    await sleep(40);
    const resumed = await cmdResume(storageRoot, threadId);
    expect(resumed.ok).toBe(false);
  });
});