diff --git a/packages/cli-workflow/__tests__/thread-cli.test.ts b/packages/cli-workflow/__tests__/thread-cli.test.ts index 7163150..f6c7d3d 100644 --- a/packages/cli-workflow/__tests__/thread-cli.test.ts +++ b/packages/cli-workflow/__tests__/thread-cli.test.ts @@ -6,11 +6,9 @@ import { dirname, join } from "node:path"; import { fileURLToPath } from "node:url"; import { getGlobalCasDir } from "@uncaged/workflow"; import { cmdCasPut } from "../src/commands/cas/put.js"; -import { cmdKill } from "../src/commands/thread/kill.js"; +import { cmdKill, cmdPause, cmdResume } from "../src/commands/thread/control.js"; import { cmdThreads } from "../src/commands/thread/list.js"; -import { cmdPause } from "../src/commands/thread/pause.js"; import { cmdPs } from "../src/commands/thread/ps.js"; -import { cmdResume } from "../src/commands/thread/resume.js"; import { cmdThreadRemove } from "../src/commands/thread/rm.js"; import { cmdRun } from "../src/commands/thread/run.js"; import { cmdThreadShow } from "../src/commands/thread/show.js"; diff --git a/packages/cli-workflow/src/cli-dispatch.ts b/packages/cli-workflow/src/cli-dispatch.ts index 2d98217..17e5a3f 100644 --- a/packages/cli-workflow/src/cli-dispatch.ts +++ b/packages/cli-workflow/src/cli-dispatch.ts @@ -6,13 +6,11 @@ import { cmdCasPut } from "./commands/cas/put.js"; import { cmdCasRm } from "./commands/cas/rm.js"; import { cmdInitTemplate } from "./commands/init/template.js"; import { cmdInitWorkspace } from "./commands/init/workspace.js"; +import { cmdKill, cmdPause, cmdResume } from "./commands/thread/control.js"; import { cmdFork, parseForkArgv } from "./commands/thread/fork.js"; -import { cmdKill } from "./commands/thread/kill.js"; import { cmdThreads } from "./commands/thread/list.js"; import { cmdLive } from "./commands/thread/live.js"; -import { cmdPause } from "./commands/thread/pause.js"; import { cmdPs } from "./commands/thread/ps.js"; -import { cmdResume } from "./commands/thread/resume.js"; import { cmdThreadRemove } from "./commands/thread/rm.js"; import { cmdRun } from "./commands/thread/run.js"; import { cmdThreadShow } from "./commands/thread/show.js"; diff --git a/packages/cli-workflow/src/commands/thread/control.ts b/packages/cli-workflow/src/commands/thread/control.ts new file mode 100644 index 0000000..3aa85b8 --- /dev/null +++ b/packages/cli-workflow/src/commands/thread/control.ts @@ -0,0 +1,52 @@ +import type { Result } from "@uncaged/workflow"; + +import { + readWorkerCtl, + resolveRunningHashForThread, + sendWorkerTcpCommand, +} from "../../worker-spawn.js"; + +type ThreadControlAction = "kill" | "pause" | "resume"; + +async function cmdThreadControl( + storageRoot: string, + threadId: string, + action: ThreadControlAction, +): Promise> { + const hashResult = await resolveRunningHashForThread(storageRoot, threadId); + if (!hashResult.ok) { + return hashResult; + } + + const ctlResult = await readWorkerCtl(storageRoot, hashResult.value); + if (!ctlResult.ok) { + return ctlResult; + } + + return await sendWorkerTcpCommand( + ctlResult.value.port, + { type: action, threadId }, + { awaitResponseLine: true }, + ); +} + +export async function cmdKill( + storageRoot: string, + threadId: string, +): Promise> { + return cmdThreadControl(storageRoot, threadId, "kill"); +} + +export async function cmdPause( + storageRoot: string, + threadId: string, +): Promise> { + return cmdThreadControl(storageRoot, threadId, "pause"); +} + +export async function cmdResume( + storageRoot: string, + threadId: string, +): Promise> { + return cmdThreadControl(storageRoot, threadId, "resume"); +} diff --git a/packages/cli-workflow/src/commands/thread/index.ts b/packages/cli-workflow/src/commands/thread/index.ts index e9a3a92..cd0c5d9 100644 --- a/packages/cli-workflow/src/commands/thread/index.ts +++ b/packages/cli-workflow/src/commands/thread/index.ts @@ -1,5 +1,5 @@ +export { cmdKill, cmdPause, cmdResume } from "./control.js"; export { cmdFork, parseForkArgv } from "./fork.js"; -export { cmdKill } from "./kill.js"; export { cmdThreads } from "./list.js"; export type { LiveRoleRow } from "./live.js"; export { @@ -9,9 +9,7 @@ export { LIVE_CONTENT_MAX_LINES, renderLiveRoleStepLines, } from "./live.js"; -export { cmdPause } from "./pause.js"; export { cmdPs } from "./ps.js"; -export { cmdResume } from "./resume.js"; export { cmdThreadRemove } from "./rm.js"; export { cmdRun } from "./run.js"; export { cmdThreadShow } from "./show.js"; diff --git a/packages/cli-workflow/src/commands/thread/kill.ts b/packages/cli-workflow/src/commands/thread/kill.ts deleted file mode 100644 index 3ccb2fa..0000000 --- a/packages/cli-workflow/src/commands/thread/kill.ts +++ /dev/null @@ -1,43 +0,0 @@ -import { join } from "node:path"; - -import { err, type Result } from "@uncaged/workflow"; - -import { readTextFileIfExists } from "../../fs-utils.js"; -import { - resolveRunningHashForThread, - sendWorkerTcpCommand, - type WorkerCtl, -} from "../../worker-spawn.js"; - -export async function cmdKill( - storageRoot: string, - threadId: string, -): Promise> { - const hashResult = await resolveRunningHashForThread(storageRoot, threadId); - if (!hashResult.ok) { - return hashResult; - } - - const ctlPath = join(storageRoot, "workers", `${hashResult.value}.json`); - const ctlText = await readTextFileIfExists(ctlPath); - if (ctlText === null) { - return err(`worker control file missing for bundle hash ${hashResult.value}`); - } - - let ctl: WorkerCtl; - try { - ctl = JSON.parse(ctlText) as WorkerCtl; - } catch { - return err(`corrupt worker control file: ${ctlPath}`); - } - - if (typeof ctl.port !== "number" || ctl.port <= 0) { - return err(`invalid worker control file: ${ctlPath}`); - } - - return await sendWorkerTcpCommand( - ctl.port, - { type: "kill", threadId }, - { awaitResponseLine: true }, - ); -} diff --git a/packages/cli-workflow/src/commands/thread/pause.ts b/packages/cli-workflow/src/commands/thread/pause.ts deleted file mode 100644 index a8a414d..0000000 --- a/packages/cli-workflow/src/commands/thread/pause.ts +++ /dev/null @@ -1,43 +0,0 @@ -import { join } from "node:path"; - -import { err, type Result } from "@uncaged/workflow"; - -import { readTextFileIfExists } from "../../fs-utils.js"; -import { - resolveRunningHashForThread, - sendWorkerTcpCommand, - type WorkerCtl, -} from "../../worker-spawn.js"; - -export async function cmdPause( - storageRoot: string, - threadId: string, -): Promise> { - const hashResult = await resolveRunningHashForThread(storageRoot, threadId); - if (!hashResult.ok) { - return hashResult; - } - - const ctlPath = join(storageRoot, "workers", `${hashResult.value}.json`); - const ctlText = await readTextFileIfExists(ctlPath); - if (ctlText === null) { - return err(`worker control file missing for bundle hash ${hashResult.value}`); - } - - let ctl: WorkerCtl; - try { - ctl = JSON.parse(ctlText) as WorkerCtl; - } catch { - return err(`corrupt worker control file: ${ctlPath}`); - } - - if (typeof ctl.port !== "number" || ctl.port <= 0) { - return err(`invalid worker control file: ${ctlPath}`); - } - - return await sendWorkerTcpCommand( - ctl.port, - { type: "pause", threadId }, - { awaitResponseLine: true }, - ); -} diff --git a/packages/cli-workflow/src/commands/thread/resume.ts b/packages/cli-workflow/src/commands/thread/resume.ts deleted file mode 100644 index 890dbb8..0000000 --- a/packages/cli-workflow/src/commands/thread/resume.ts +++ /dev/null @@ -1,43 +0,0 @@ -import { join } from "node:path"; - -import { err, type Result } from "@uncaged/workflow"; - -import { readTextFileIfExists } from "../../fs-utils.js"; -import { - resolveRunningHashForThread, - sendWorkerTcpCommand, - type WorkerCtl, -} from "../../worker-spawn.js"; - -export async function cmdResume( - storageRoot: string, - threadId: string, -): Promise> { - const hashResult = await resolveRunningHashForThread(storageRoot, threadId); - if (!hashResult.ok) { - return hashResult; - } - - const ctlPath = join(storageRoot, "workers", `${hashResult.value}.json`); - const ctlText = await readTextFileIfExists(ctlPath); - if (ctlText === null) { - return err(`worker control file missing for bundle hash ${hashResult.value}`); - } - - let ctl: WorkerCtl; - try { - ctl = JSON.parse(ctlText) as WorkerCtl; - } catch { - return err(`corrupt worker control file: ${ctlPath}`); - } - - if (typeof ctl.port !== "number" || ctl.port <= 0) { - return err(`invalid worker control file: ${ctlPath}`); - } - - return await sendWorkerTcpCommand( - ctl.port, - { type: "resume", threadId }, - { awaitResponseLine: true }, - ); -} diff --git a/packages/cli-workflow/src/worker-spawn.ts b/packages/cli-workflow/src/worker-spawn.ts index 57a175a..35456e0 100644 --- a/packages/cli-workflow/src/worker-spawn.ts +++ b/packages/cli-workflow/src/worker-spawn.ts @@ -237,6 +237,30 @@ export async function sendWorkerTcpCommand( }); } +export async function readWorkerCtl( + storageRoot: string, + hash: string, +): Promise> { + const ctlPath = join(storageRoot, "workers", `${hash}.json`); + const ctlText = await readTextFileIfExists(ctlPath); + if (ctlText === null) { + return err(`worker control file missing for bundle hash ${hash}`); + } + + let ctl: WorkerCtl; + try { + ctl = JSON.parse(ctlText) as WorkerCtl; + } catch { + return err(`corrupt worker control file: ${ctlPath}`); + } + + if (typeof ctl.port !== "number" || ctl.port <= 0) { + return err(`invalid worker control file: ${ctlPath}`); + } + + return ok(ctl); +} + export async function resolveRunningHashForThread( storageRoot: string, threadId: string,