feat: Phase 3 — version history/rollback + pause/resume threads

- CLI: history, rollback, pause, resume commands
- Registry: rollbackWorkflowToHistoryHash
- Engine: awaitAfterEachYield hook for pause gate
- Worker: ThreadPauseGate with Promise-based latch
- TCP IPC: bidirectional response for kill/pause/resume
- 44 tests pass, biome clean

小橘 <xiaoju@shazhou.work>
This commit is contained in:
2026-05-06 05:36:33 +00:00
parent 9943f21f5c
commit 0becafeb44
18 changed files with 992 additions and 104 deletions
@@ -1,11 +1,15 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { mkdir, mkdtemp, readFile, rm, unlink, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { getRegisteredWorkflow, readWorkflowRegistry } from "@uncaged/workflow";
import { cmdAdd } from "../src/cmd-add.js";
import { cmdHistory } from "../src/cmd-history.js";
import { cmdList, formatListLines } from "../src/cmd-list.js";
import { cmdRemove } from "../src/cmd-remove.js";
import { cmdRollback } from "../src/cmd-rollback.js";
import { cmdShow } from "../src/cmd-show.js";
describe("cli workflow commands", () => {
@@ -87,4 +91,159 @@ export default async function* (input) {
const r = await cmdAdd(storageRoot, "solve-issue", bundlePath);
expect(r.ok).toBe(false);
});
test("history lists current + prior versions sorted by time descending", async () => {
const bundleDir = join(storageRoot, "src");
await mkdir(bundleDir, { recursive: true });
const bundlePath = join(bundleDir, "demo.esm.js");
const v1 = `export default async function* (input) {
yield { role: "a", content: "v1", meta: {} };
return { returnCode: 0, summary: "v1" };
}
`;
const v2 = `export default async function* (input) {
yield { role: "a", content: "v2", meta: {} };
return { returnCode: 0, summary: "v2" };
}
`;
await writeFile(bundlePath, v1, "utf8");
const add1 = await cmdAdd(storageRoot, "solve-issue", bundlePath);
expect(add1.ok).toBe(true);
await new Promise((r) => setTimeout(r, 15));
await writeFile(bundlePath, v2, "utf8");
const add2 = await cmdAdd(storageRoot, "solve-issue", bundlePath);
expect(add2.ok).toBe(true);
const hist = await cmdHistory(storageRoot, "solve-issue");
expect(hist.ok).toBe(true);
if (!hist.ok) {
return;
}
expect(hist.value.length).toBe(2);
const dates = hist.value.map((line) => {
const parts = line.split("\t");
return Date.parse(parts[1] ?? "");
});
expect(Number.isFinite(dates[0])).toBe(true);
expect(Number.isFinite(dates[1])).toBe(true);
expect(dates[0] >= dates[1]).toBe(true);
expect(hist.value.some((l) => l.endsWith("(current)"))).toBe(true);
});
test("rollback swaps registry head with a history hash", async () => {
const bundleDir = join(storageRoot, "src");
await mkdir(bundleDir, { recursive: true });
const bundlePath = join(bundleDir, "demo.esm.js");
const v1 = `export default async function* (input) {
yield { role: "a", content: "v1", meta: {} };
return { returnCode: 0, summary: "v1" };
}
`;
const v2 = `export default async function* (input) {
yield { role: "a", content: "v2", meta: {} };
return { returnCode: 0, summary: "v2" };
}
`;
await writeFile(bundlePath, v1, "utf8");
const add1 = await cmdAdd(storageRoot, "solve-issue", bundlePath);
expect(add1.ok).toBe(true);
if (!add1.ok) {
return;
}
const hash1 = add1.value.hash;
await writeFile(bundlePath, v2, "utf8");
const add2 = await cmdAdd(storageRoot, "solve-issue", bundlePath);
expect(add2.ok).toBe(true);
if (!add2.ok) {
return;
}
const hash2 = add2.value.hash;
const rb = await cmdRollback(storageRoot, "solve-issue", null);
expect(rb.ok).toBe(true);
const reg = await readWorkflowRegistry(storageRoot);
expect(reg.ok).toBe(true);
if (!reg.ok) {
return;
}
const entry = getRegisteredWorkflow(reg.value, "solve-issue");
expect(entry).not.toBeNull();
if (entry === null) {
return;
}
expect(entry.hash).toBe(hash1);
expect(entry.history.some((h) => h.hash === hash2)).toBe(true);
});
test("rollback rejects a hash that is not in history", async () => {
const bundleDir = join(storageRoot, "src");
await mkdir(bundleDir, { recursive: true });
const bundlePath = join(bundleDir, "demo.esm.js");
await writeFile(
bundlePath,
`export default async function* (input) {
yield { role: "a", content: "x", meta: {} };
return { returnCode: 0, summary: "x" };
}
`,
"utf8",
);
const add1 = await cmdAdd(storageRoot, "solve-issue", bundlePath);
expect(add1.ok).toBe(true);
await writeFile(
bundlePath,
`export default async function* (input) {
yield { role: "a", content: "y", meta: {} };
return { returnCode: 0, summary: "y" };
}
`,
"utf8",
);
const add2 = await cmdAdd(storageRoot, "solve-issue", bundlePath);
expect(add2.ok).toBe(true);
const bad = await cmdRollback(storageRoot, "solve-issue", "0000000000000");
expect(bad.ok).toBe(false);
});
test("rollback rejects missing bundle file for target hash", async () => {
const bundleDir = join(storageRoot, "src");
await mkdir(bundleDir, { recursive: true });
const bundlePath = join(bundleDir, "demo.esm.js");
await writeFile(
bundlePath,
`export default async function* (input) {
yield { role: "a", content: "x", meta: {} };
return { returnCode: 0, summary: "x" };
}
`,
"utf8",
);
const add1 = await cmdAdd(storageRoot, "solve-issue", bundlePath);
expect(add1.ok).toBe(true);
if (!add1.ok) {
return;
}
const hash1 = add1.value.hash;
await writeFile(
bundlePath,
`export default async function* (input) {
yield { role: "a", content: "y", meta: {} };
return { returnCode: 0, summary: "y" };
}
`,
"utf8",
);
const add2 = await cmdAdd(storageRoot, "solve-issue", bundlePath);
expect(add2.ok).toBe(true);
if (!add2.ok) {
return;
}
await unlink(join(storageRoot, "bundles", `${hash1}.esm.js`));
const rb = await cmdRollback(storageRoot, "solve-issue", hash1);
expect(rb.ok).toBe(false);
});
});
@@ -7,7 +7,9 @@ import { fileURLToPath } from "node:url";
import { cmdAdd } from "../src/cmd-add.js";
import { cmdKill } from "../src/cmd-kill.js";
import { cmdPause } from "../src/cmd-pause.js";
import { cmdPs } from "../src/cmd-ps.js";
import { cmdResume } from "../src/cmd-resume.js";
import { cmdRun } from "../src/cmd-run.js";
import { cmdThreadRemove, cmdThreadShow } from "../src/cmd-thread.js";
import { cmdThreads } from "../src/cmd-threads.js";
@@ -38,6 +40,55 @@ const abortablePlannerBundleSource = `export default async function* (input) {
}
`;
const pauseResumeBundleSource = `export default async function* (input) {
yield { role: "first", content: "f", meta: {} };
await new Promise((r) => setTimeout(r, 1500));
yield { role: "second", content: "s", meta: {} };
return { returnCode: 0, summary: "done" };
}
`;
const delayedFirstYieldBundleSource = `export default async function* (input) {
await new Promise((r) => setTimeout(r, 900));
yield { role: "only", content: "x", meta: {} };
return { returnCode: 0, summary: "done" };
}
`;
async function countDataJsonlLines(dataPath: string): Promise<number> {
try {
const text = await readFile(dataPath, "utf8");
return text
.trim()
.split("\n")
.filter((l) => l !== "").length;
} catch {
return 0;
}
}
async function waitUntilMinDataLines(
dataPath: string,
minLines: number,
maxAttempts: number,
): Promise<void> {
for (let attempt = 0; attempt < maxAttempts; attempt++) {
if ((await countDataJsonlLines(dataPath)) >= minLines) {
return;
}
await new Promise((r) => setTimeout(r, 25));
}
}
async function waitUntilRunningFileAbsent(runningPath: string, maxAttempts: number): Promise<void> {
for (let attempt = 0; attempt < maxAttempts; attempt++) {
if (!(await pathExists(runningPath))) {
return;
}
await new Promise((r) => setTimeout(r, 25));
}
}
describe("cli thread commands", () => {
let prevEnv: string | undefined;
let storageRoot: string;
@@ -186,4 +237,99 @@ describe("cli thread commands", () => {
const runningPath = join(dirname(dataPath), `${threadId}.running`);
expect(await pathExists(runningPath)).toBe(false);
});
test("pause stops between yields and resume completes thread", async () => {
const bundleDir = join(storageRoot, "src");
await mkdir(bundleDir, { recursive: true });
const bundlePath = join(bundleDir, "demo.esm.js");
await writeFile(bundlePath, pauseResumeBundleSource, "utf8");
const added = await cmdAdd(storageRoot, "solve-issue", bundlePath);
expect(added.ok).toBe(true);
if (!added.ok) {
return;
}
const ran = await cmdRun(storageRoot, "solve-issue", "hello", false, 5);
expect(ran.ok).toBe(true);
if (!ran.ok) {
return;
}
const threadId = ran.value.threadId;
const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`);
await waitUntilMinDataLines(dataPath, 2, 80);
expect(await countDataJsonlLines(dataPath)).toBe(2);
const paused = await cmdPause(storageRoot, threadId);
expect(paused.ok).toBe(true);
await new Promise((r) => setTimeout(r, 400));
expect(await countDataJsonlLines(dataPath)).toBe(2);
const resumed = await cmdResume(storageRoot, threadId);
expect(resumed.ok).toBe(true);
await waitUntilMinDataLines(dataPath, 3, 120);
expect(await countDataJsonlLines(dataPath)).toBe(3);
const runningPath = join(dirname(dataPath), `${threadId}.running`);
await waitUntilRunningFileAbsent(runningPath, 100);
expect(await pathExists(runningPath)).toBe(false);
});
test("pause on completed thread errors", async () => {
const bundleDir = join(storageRoot, "src");
await mkdir(bundleDir, { recursive: true });
const bundlePath = join(bundleDir, "demo.esm.js");
await writeFile(bundlePath, fastBundleSource, "utf8");
const added = await cmdAdd(storageRoot, "solve-issue", bundlePath);
expect(added.ok).toBe(true);
if (!added.ok) {
return;
}
const ran = await cmdRun(storageRoot, "solve-issue", "hello", false, 5);
expect(ran.ok).toBe(true);
if (!ran.ok) {
return;
}
const threadId = ran.value.threadId;
const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`);
const runningPath = join(dirname(dataPath), `${threadId}.running`);
await waitUntilRunningFileAbsent(runningPath, 100);
expect(await pathExists(runningPath)).toBe(false);
const paused = await cmdPause(storageRoot, threadId);
expect(paused.ok).toBe(false);
});
test("resume while thread is running but not paused errors", async () => {
const bundleDir = join(storageRoot, "src");
await mkdir(bundleDir, { recursive: true });
const bundlePath = join(bundleDir, "demo.esm.js");
await writeFile(bundlePath, delayedFirstYieldBundleSource, "utf8");
const added = await cmdAdd(storageRoot, "solve-issue", bundlePath);
expect(added.ok).toBe(true);
if (!added.ok) {
return;
}
const ran = await cmdRun(storageRoot, "solve-issue", "hello", false, 5);
expect(ran.ok).toBe(true);
if (!ran.ok) {
return;
}
const threadId = ran.value.threadId;
await new Promise((r) => setTimeout(r, 40));
const resumed = await cmdResume(storageRoot, threadId);
expect(resumed.ok).toBe(false);
});
});
+102 -34
View File
@@ -1,9 +1,13 @@
import { printCliError, printCliLine } from "./cli-output.js";
import { cmdAdd, formatAddSuccess } from "./cmd-add.js";
import { cmdHistory } from "./cmd-history.js";
import { cmdKill } from "./cmd-kill.js";
import { cmdList, formatListLines } from "./cmd-list.js";
import { cmdPause } from "./cmd-pause.js";
import { cmdPs } from "./cmd-ps.js";
import { cmdRemove } from "./cmd-remove.js";
import { cmdResume } from "./cmd-resume.js";
import { cmdRollback } from "./cmd-rollback.js";
import { cmdRun } from "./cmd-run.js";
import { cmdShow, formatShowYaml } from "./cmd-show.js";
import { cmdThreadRemove, cmdThreadShow } from "./cmd-thread.js";
@@ -20,6 +24,10 @@ function usage(): string {
" uncaged-workflow run <name> [--prompt <text>] [--dry-run] [--max-rounds N]",
" uncaged-workflow ps",
" uncaged-workflow kill <thread-id>",
" uncaged-workflow history <name>",
" uncaged-workflow rollback <name> [hash]",
" uncaged-workflow pause <thread-id>",
" uncaged-workflow resume <thread-id>",
" uncaged-workflow threads [name]",
" uncaged-workflow thread <id>",
" uncaged-workflow thread rm <id>",
@@ -137,6 +145,69 @@ async function dispatchKill(storageRoot: string, argv: string[]): Promise<number
return 0;
}
async function dispatchHistory(storageRoot: string, argv: string[]): Promise<number> {
const name = argv[0];
if (name === undefined || argv.length > 1) {
printCliError(`${usage()}\n\nerror: history requires <name>`);
return 1;
}
const result = await cmdHistory(storageRoot, name);
if (!result.ok) {
printCliError(result.error);
return 1;
}
for (const line of result.value) {
printCliLine(line);
}
return 0;
}
async function dispatchRollback(storageRoot: string, argv: string[]): Promise<number> {
const name = argv[0];
if (name === undefined || argv.length > 2) {
printCliError(`${usage()}\n\nerror: rollback requires <name> [hash]`);
return 1;
}
const hashArg = argv[1];
const result = await cmdRollback(storageRoot, name, hashArg === undefined ? null : hashArg);
if (!result.ok) {
printCliError(result.error);
return 1;
}
printCliLine(`rolled back workflow "${name}"`);
return 0;
}
async function dispatchPause(storageRoot: string, argv: string[]): Promise<number> {
const threadId = argv[0];
if (threadId === undefined || argv.length > 1) {
printCliError(`${usage()}\n\nerror: pause requires <thread-id>`);
return 1;
}
const result = await cmdPause(storageRoot, threadId);
if (!result.ok) {
printCliError(result.error);
return 1;
}
printCliLine(`pause sent for thread ${threadId}`);
return 0;
}
async function dispatchResume(storageRoot: string, argv: string[]): Promise<number> {
const threadId = argv[0];
if (threadId === undefined || argv.length > 1) {
printCliError(`${usage()}\n\nerror: resume requires <thread-id>`);
return 1;
}
const result = await cmdResume(storageRoot, threadId);
if (!result.ok) {
printCliError(result.error);
return 1;
}
printCliLine(`resume sent for thread ${threadId}`);
return 0;
}
async function dispatchThreads(storageRoot: string, argv: string[]): Promise<number> {
const result = await cmdThreads(storageRoot, argv);
if (!result.ok) {
@@ -179,6 +250,32 @@ async function dispatchThreadRm(storageRoot: string, argv: string[]): Promise<nu
return 0;
}
async function dispatchThreadBranch(storageRoot: string, rest: string[]): Promise<number> {
const sub = rest[0];
if (sub === "rm") {
return dispatchThreadRm(storageRoot, rest.slice(1));
}
return dispatchThread(storageRoot, rest);
}
type DispatchFn = (storageRoot: string, argv: string[]) => Promise<number>;
const COMMAND_TABLE: Record<string, DispatchFn> = {
add: dispatchAdd,
list: dispatchList,
show: dispatchShow,
remove: dispatchRemove,
run: dispatchRun,
ps: dispatchPs,
kill: dispatchKill,
history: dispatchHistory,
rollback: dispatchRollback,
pause: dispatchPause,
resume: dispatchResume,
threads: dispatchThreads,
thread: dispatchThreadBranch,
};
export async function runCli(storageRoot: string, argv: string[]): Promise<number> {
if (argv.length === 0) {
printCliError(usage());
@@ -190,39 +287,10 @@ export async function runCli(storageRoot: string, argv: string[]): Promise<numbe
return 1;
}
const rest = argv.slice(1);
if (command === "add") {
return dispatchAdd(storageRoot, rest);
const dispatch = COMMAND_TABLE[command];
if (dispatch === undefined) {
printCliError(`${usage()}\n\nerror: unknown command ${command}`);
return 1;
}
if (command === "list") {
return dispatchList(storageRoot, rest);
}
if (command === "show") {
return dispatchShow(storageRoot, rest);
}
if (command === "remove") {
return dispatchRemove(storageRoot, rest);
}
if (command === "run") {
return dispatchRun(storageRoot, rest);
}
if (command === "ps") {
return dispatchPs(storageRoot, rest);
}
if (command === "kill") {
return dispatchKill(storageRoot, rest);
}
if (command === "threads") {
return dispatchThreads(storageRoot, rest);
}
if (command === "thread") {
const sub = rest[0];
if (sub === "rm") {
return dispatchThreadRm(storageRoot, rest.slice(1));
}
return dispatchThread(storageRoot, rest);
}
printCliError(`${usage()}\n\nerror: unknown command ${command}`);
return 1;
return dispatch(storageRoot, rest);
}
+43
View File
@@ -0,0 +1,43 @@
import {
err,
getRegisteredWorkflow,
ok,
type Result,
readWorkflowRegistry,
} from "@uncaged/workflow";
import { validateCliWorkflowName } from "./workflow-name.js";
export async function cmdHistory(
storageRoot: string,
name: string,
): Promise<Result<string[], string>> {
const nameOk = validateCliWorkflowName(name);
if (!nameOk.ok) {
return nameOk;
}
const reg = await readWorkflowRegistry(storageRoot);
if (!reg.ok) {
return err(reg.error.message);
}
const entry = getRegisteredWorkflow(reg.value, name);
if (entry === null) {
return err(`workflow not registered: ${name}`);
}
type Row = { hash: string; timestamp: number; isCurrent: boolean };
const rows: Row[] = [
{ hash: entry.hash, timestamp: entry.timestamp, isCurrent: true },
...entry.history.map((h) => ({ hash: h.hash, timestamp: h.timestamp, isCurrent: false })),
];
rows.sort((a, b) => b.timestamp - a.timestamp);
const lines = rows.map((r) => {
const date = new Date(r.timestamp).toISOString();
const suffix = r.isCurrent ? "\t(current)" : "";
return `${r.hash}\t${date}${suffix}`;
});
return ok(lines);
}
+5 -1
View File
@@ -35,5 +35,9 @@ export async function cmdKill(
return err(`invalid worker control file: ${ctlPath}`);
}
return await sendWorkerTcpCommand(ctl.port, { type: "kill", threadId });
return await sendWorkerTcpCommand(
ctl.port,
{ type: "kill", threadId },
{ awaitResponseLine: true },
);
}
+43
View File
@@ -0,0 +1,43 @@
import { join } from "node:path";
import { err, type Result } from "@uncaged/workflow";
import { readTextFileIfExists } from "./fs-utils.js";
import {
resolveRunningHashForThread,
sendWorkerTcpCommand,
type WorkerCtl,
} from "./worker-spawn.js";
export async function cmdPause(
storageRoot: string,
threadId: string,
): Promise<Result<void, string>> {
const hashResult = await resolveRunningHashForThread(storageRoot, threadId);
if (!hashResult.ok) {
return hashResult;
}
const ctlPath = join(storageRoot, "workers", `${hashResult.value}.json`);
const ctlText = await readTextFileIfExists(ctlPath);
if (ctlText === null) {
return err(`worker control file missing for bundle hash ${hashResult.value}`);
}
let ctl: WorkerCtl;
try {
ctl = JSON.parse(ctlText) as WorkerCtl;
} catch {
return err(`corrupt worker control file: ${ctlPath}`);
}
if (typeof ctl.port !== "number" || ctl.port <= 0) {
return err(`invalid worker control file: ${ctlPath}`);
}
return await sendWorkerTcpCommand(
ctl.port,
{ type: "pause", threadId },
{ awaitResponseLine: true },
);
}
+43
View File
@@ -0,0 +1,43 @@
import { join } from "node:path";
import { err, type Result } from "@uncaged/workflow";
import { readTextFileIfExists } from "./fs-utils.js";
import {
resolveRunningHashForThread,
sendWorkerTcpCommand,
type WorkerCtl,
} from "./worker-spawn.js";
export async function cmdResume(
storageRoot: string,
threadId: string,
): Promise<Result<void, string>> {
const hashResult = await resolveRunningHashForThread(storageRoot, threadId);
if (!hashResult.ok) {
return hashResult;
}
const ctlPath = join(storageRoot, "workers", `${hashResult.value}.json`);
const ctlText = await readTextFileIfExists(ctlPath);
if (ctlText === null) {
return err(`worker control file missing for bundle hash ${hashResult.value}`);
}
let ctl: WorkerCtl;
try {
ctl = JSON.parse(ctlText) as WorkerCtl;
} catch {
return err(`corrupt worker control file: ${ctlPath}`);
}
if (typeof ctl.port !== "number" || ctl.port <= 0) {
return err(`invalid worker control file: ${ctlPath}`);
}
return await sendWorkerTcpCommand(
ctl.port,
{ type: "resume", threadId },
{ awaitResponseLine: true },
);
}
+55
View File
@@ -0,0 +1,55 @@
import { join } from "node:path";
import {
err,
getRegisteredWorkflow,
ok,
type Result,
readWorkflowRegistry,
rollbackWorkflowToHistoryHash,
writeWorkflowRegistry,
} from "@uncaged/workflow";
import { pathExists } from "./fs-utils.js";
import { validateCliWorkflowName } from "./workflow-name.js";
export async function cmdRollback(
storageRoot: string,
name: string,
hash: string | null,
): Promise<Result<void, string>> {
const nameOk = validateCliWorkflowName(name);
if (!nameOk.ok) {
return nameOk;
}
const reg = await readWorkflowRegistry(storageRoot);
if (!reg.ok) {
return err(reg.error.message);
}
const entry = getRegisteredWorkflow(reg.value, name);
if (entry === null) {
return err(`workflow not registered: ${name}`);
}
const rolled = rollbackWorkflowToHistoryHash(entry, hash);
if (!rolled.ok) {
return err(rolled.error.message);
}
const bundlePath = join(storageRoot, "bundles", `${rolled.value.hash}.esm.js`);
if (!(await pathExists(bundlePath))) {
return err(`bundle file not found for hash ${rolled.value.hash}`);
}
const nextRegistry = {
workflows: { ...reg.value.workflows, [name]: rolled.value },
};
const written = await writeWorkflowRegistry(storageRoot, nextRegistry);
if (!written.ok) {
return err(written.error.message);
}
return ok(undefined);
}
+11 -7
View File
@@ -40,13 +40,17 @@ export async function cmdRun(
}
const threadId = generateUlid(Date.now());
const sent = await sendWorkerTcpCommand(worker.value.port, {
type: "run",
threadId,
workflowName: name,
prompt,
options: { isDryRun, maxRounds },
});
const sent = await sendWorkerTcpCommand(
worker.value.port,
{
type: "run",
threadId,
workflowName: name,
prompt,
options: { isDryRun, maxRounds },
},
{ awaitResponseLine: false },
);
if (!sent.ok) {
return sent;
}
+72 -6
View File
@@ -143,30 +143,96 @@ export async function ensureWorkerForHash(
return ok({ port: spawned.value.port });
}
export type SendWorkerTcpOptions = {
awaitResponseLine: boolean;
};
function parseWorkerControlResponseLine(line: string): Result<void, string> {
let parsed: unknown;
try {
parsed = JSON.parse(line.trim()) as unknown;
} catch {
return err("invalid JSON in worker response");
}
if (parsed === null || typeof parsed !== "object") {
return err("invalid worker response shape");
}
const rec = parsed as Record<string, unknown>;
if (rec.ok === true) {
return ok(undefined);
}
if (rec.ok === false) {
const message = rec.error;
if (typeof message === "string") {
return err(message);
}
return err("worker error response missing error string");
}
return err("invalid worker response: missing ok field");
}
export async function sendWorkerTcpCommand(
port: number,
payload: unknown,
options: SendWorkerTcpOptions = { awaitResponseLine: false },
): Promise<Result<void, string>> {
return await new Promise((resolve) => {
let settled = false;
let buf = "";
const socket = createConnection({ host: "127.0.0.1", port }, () => {
socket.write(`${JSON.stringify(payload)}\n`);
socket.end();
if (!options.awaitResponseLine) {
socket.end();
}
});
function finish(result: Result<void, string>): void {
if (settled) {
return;
}
settled = true;
if (options.awaitResponseLine && socket.writable) {
socket.end();
}
resolve(result);
}
function tryFinishFromBuffer(): void {
if (!options.awaitResponseLine) {
return;
}
const nl = buf.indexOf("\n");
if (nl < 0) {
return;
}
finish(parseWorkerControlResponseLine(buf.slice(0, nl)));
}
socket.on("data", (chunk: Buffer | string) => {
if (!options.awaitResponseLine) {
return;
}
buf += typeof chunk === "string" ? chunk : chunk.toString("utf8");
tryFinishFromBuffer();
});
socket.on("error", (e) => {
if (settled) {
return;
}
settled = true;
const message = e instanceof Error ? e.message : String(e);
resolve(err(`failed to send worker command: ${message}`));
finish(err(`failed to send worker command: ${message}`));
});
socket.on("close", () => {
if (settled) {
if (options.awaitResponseLine) {
tryFinishFromBuffer();
if (!settled) {
finish(err("worker closed without control response"));
}
return;
}
settled = true;
resolve(ok(undefined));
finish(ok(undefined));
});
});
}