From 521d9087197b630d30eeebf416454052469c6a5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Sun, 24 May 2026 04:51:26 +0000 Subject: [PATCH] feat(cli): add background thread execution and running threads query This commit implements issue #456, adding two related capabilities to the uwf CLI: 1. **Background execution mode** for `uwf thread step` (via `--background` flag) - Spawns agent execution in a detached background process - Returns immediately with thread ID and background status - Maintains marker files to track running processes - Supports `--count` option to run multiple steps in background - Prevents concurrent execution of the same thread 2. **Running threads query** command (`uwf thread running`) - Lists all threads currently executing in background - Returns thread ID, workflow, current role, PID, and start time - Automatically filters out stale markers (dead processes) - Empty list when no threads are running **Key changes:** - **workflow-protocol**: Added `RunningThreadItem`, `RunningThreadsOutput` types Updated `StepOutput` to include `background: boolean | null` field - **cli-workflow/background**: New module for process management - Marker file creation/deletion (atomic operations) - PID liveness checking - Stale marker cleanup - Running threads query - **cli-workflow/commands/thread**: - Updated `cmdThreadStep` to support `--background` and `--_background-worker` flags - Added `cmdThreadStepBackground` for spawning detached processes - Added `cmdThreadRunning` to list running threads - Updated `cmdThreadKill` to terminate background processes - **cli-workflow/cli**: Added CLI routing for new commands and flags **Integration:** - `uwf thread kill` now terminates background processes before archiving - Foreground execution checks for existing background process and fails if found - Background worker creates/cleans up marker files automatically - Marker files stored in `~/.uncaged/workflow/running/*.json` Co-Authored-By: Claude Opus 4.6 --- .../cli-workflow/src/background/background.ts | 147 ++++++++++++++++++ packages/cli-workflow/src/background/index.ts | 11 ++ packages/cli-workflow/src/background/types.ts | 9 ++ packages/cli-workflow/src/cli.ts | 60 +++++-- packages/cli-workflow/src/commands/thread.ts | 130 ++++++++++++++-- packages/workflow-protocol/src/index.ts | 2 + packages/workflow-protocol/src/types.ts | 14 ++ 7 files changed, 351 insertions(+), 22 deletions(-) create mode 100644 packages/cli-workflow/src/background/background.ts create mode 100644 packages/cli-workflow/src/background/index.ts create mode 100644 packages/cli-workflow/src/background/types.ts diff --git a/packages/cli-workflow/src/background/background.ts b/packages/cli-workflow/src/background/background.ts new file mode 100644 index 0000000..ab41ba8 --- /dev/null +++ b/packages/cli-workflow/src/background/background.ts @@ -0,0 +1,147 @@ +import { mkdir, readdir, readFile, rename, rm, writeFile } from "node:fs/promises"; +import { join } from "node:path"; +import type { RunningThreadItem, ThreadId } from "@uncaged/workflow-protocol"; + +import type { RunningMarker } from "./types.js"; + +/** + * Get the path to the running markers directory. + */ +export function getRunningDir(storageRoot: string): string { + return join(storageRoot, "running"); +} + +/** + * Get the path to a specific thread's marker file. + */ +export function getMarkerPath(storageRoot: string, threadId: ThreadId): string { + return join(getRunningDir(storageRoot), `${threadId}.json`); +} + +/** + * Check if a PID is still running. + * Returns true if the process exists, false otherwise. + */ +export function isPidAlive(pid: number): boolean { + try { + // process.kill with signal 0 checks existence without killing + process.kill(pid, 0); + return true; + } catch { + // ESRCH means process doesn't exist + return false; + } +} + +/** + * Create a marker file for a running thread. + * Writes to a temp file in the same directory, then atomically renames. + */ +export async function createMarker(storageRoot: string, marker: RunningMarker): Promise { + const runningDir = getRunningDir(storageRoot); + await mkdir(runningDir, { recursive: true }); + + const markerPath = getMarkerPath(storageRoot, marker.thread); + const tempPath = join(runningDir, `.${marker.thread}-${process.pid}.tmp`); + + const content = JSON.stringify(marker, null, 2); + await writeFile(tempPath, content, "utf8"); + await rename(tempPath, markerPath); +} + +/** + * Delete a marker file for a thread. + */ +export async function deleteMarker(storageRoot: string, threadId: ThreadId): Promise { + const markerPath = getMarkerPath(storageRoot, threadId); + try { + await rm(markerPath); + } catch { + // Ignore errors if file doesn't exist + } +} + +/** + * Read a marker file. Returns null if file doesn't exist or is invalid. + */ +export async function readMarker( + storageRoot: string, + threadId: ThreadId, +): Promise { + const markerPath = getMarkerPath(storageRoot, threadId); + try { + const content = await readFile(markerPath, "utf8"); + const marker = JSON.parse(content) as RunningMarker; + return marker; + } catch { + return null; + } +} + +/** + * List all running threads, filtering out stale markers. + */ +export async function listRunningThreads(storageRoot: string): Promise { + const runningDir = getRunningDir(storageRoot); + + let files: string[]; + try { + files = await readdir(runningDir); + } catch { + // Directory doesn't exist or can't be read + return []; + } + + const results: RunningThreadItem[] = []; + + for (const filename of files) { + if (!filename.endsWith(".json")) { + continue; + } + + const threadId = filename.slice(0, -5) as ThreadId; + const marker = await readMarker(storageRoot, threadId); + + if (marker === null) { + // Invalid marker file + continue; + } + + if (!isPidAlive(marker.pid)) { + // Stale marker - process no longer exists + await deleteMarker(storageRoot, threadId); + continue; + } + + results.push({ + thread: marker.thread, + workflow: marker.workflow, + pid: marker.pid, + startedAt: marker.startedAt, + }); + } + + return results; +} + +/** + * Check if a thread is currently executing in the background. + * Returns the marker if running, null otherwise. + */ +export async function isThreadRunning( + storageRoot: string, + threadId: ThreadId, +): Promise { + const marker = await readMarker(storageRoot, threadId); + if (marker === null) { + return null; + } + + if (!isPidAlive(marker.pid)) { + // Stale marker + await deleteMarker(storageRoot, threadId); + return null; + } + + return marker; +} diff --git a/packages/cli-workflow/src/background/index.ts b/packages/cli-workflow/src/background/index.ts new file mode 100644 index 0000000..daf3a32 --- /dev/null +++ b/packages/cli-workflow/src/background/index.ts @@ -0,0 +1,11 @@ +export { + createMarker, + deleteMarker, + getMarkerPath, + getRunningDir, + isPidAlive, + isThreadRunning, + listRunningThreads, + readMarker, +} from "./background.js"; +export type { RunningMarker } from "./types.js"; diff --git a/packages/cli-workflow/src/background/types.ts b/packages/cli-workflow/src/background/types.ts new file mode 100644 index 0000000..1c1e777 --- /dev/null +++ b/packages/cli-workflow/src/background/types.ts @@ -0,0 +1,9 @@ +import type { CasRef, ThreadId } from "@uncaged/workflow-protocol"; + +/** Marker file stored at ~/.uncaged/workflow/running/.json */ +export type RunningMarker = { + thread: ThreadId; + workflow: CasRef; + pid: number; + startedAt: number; +}; diff --git a/packages/cli-workflow/src/cli.ts b/packages/cli-workflow/src/cli.ts index a380e7d..3bff36d 100755 --- a/packages/cli-workflow/src/cli.ts +++ b/packages/cli-workflow/src/cli.ts @@ -22,6 +22,7 @@ import { cmdThreadKill, cmdThreadList, cmdThreadRead, + cmdThreadRunning, cmdThreadShow, cmdThreadStart, cmdThreadStep, @@ -114,19 +115,41 @@ thread .argument("", "Thread ULID") .option("--agent ", "Override agent command") .option("-c, --count ", "Number of steps to run (default: 1)") - .action((threadId: string, opts: { agent: string | undefined; count: string | undefined }) => { - const storageRoot = resolveStorageRoot(); - runAction(async () => { - const agentOverride = opts.agent ?? null; - const count = opts.count !== undefined ? Number(opts.count) : 1; - const results = await cmdThreadStep(storageRoot, threadId, agentOverride, count); - if (results.length === 1) { - writeOutput(results[0]); - } else { - writeOutput(results); - } - }); - }); + .option("--background", "Run in background and return immediately") + .option("--_background-worker", "Internal flag for background worker process", false) + .action( + ( + threadId: string, + opts: { + agent: string | undefined; + count: string | undefined; + background: boolean; + _backgroundWorker: boolean; + }, + ) => { + const storageRoot = resolveStorageRoot(); + runAction(async () => { + const agentOverride = opts.agent ?? null; + const count = opts.count !== undefined ? Number(opts.count) : 1; + const background = opts.background ?? false; + const backgroundWorker = opts._backgroundWorker ?? false; + + const results = await cmdThreadStep( + storageRoot, + threadId, + agentOverride, + count, + background, + backgroundWorker, + ); + if (results.length === 1) { + writeOutput(results[0]); + } else { + writeOutput(results); + } + }); + }, + ); thread .command("show") @@ -152,6 +175,17 @@ thread }); }); +thread + .command("running") + .description("List threads currently executing in the background") + .action(() => { + const storageRoot = resolveStorageRoot(); + runAction(async () => { + const result = await cmdThreadRunning(storageRoot); + writeOutput(result); + }); + }); + thread .command("kill") .description("Terminate and archive a thread") diff --git a/packages/cli-workflow/src/commands/thread.ts b/packages/cli-workflow/src/commands/thread.ts index eefb8ab..12d0f0c 100644 --- a/packages/cli-workflow/src/commands/thread.ts +++ b/packages/cli-workflow/src/commands/thread.ts @@ -1,4 +1,4 @@ -import { execFileSync } from "node:child_process"; +import { execFileSync, spawn } from "node:child_process"; import { access, readFile } from "node:fs/promises"; import { dirname, isAbsolute, resolve as resolvePath } from "node:path"; import type { Store as CasStore, JSONSchema } from "@uncaged/json-cas"; @@ -10,6 +10,7 @@ import type { AgentConfig, CasRef, ModeratorContext, + RunningThreadsOutput, StartEntry, StartNodePayload, StartOutput, @@ -27,7 +28,12 @@ import type { import { createProcessLogger, generateUlid, type ProcessLogger } from "@uncaged/workflow-util"; import { config as loadDotenv } from "dotenv"; import { parse, stringify } from "yaml"; - +import { + createMarker, + deleteMarker, + isThreadRunning, + listRunningThreads, +} from "../background/index.js"; import { appendThreadHistory, createUwfStore, @@ -52,6 +58,7 @@ const PL_AGENT_SPAWN = "R5J2W8N4"; const PL_AGENT_DONE = "C6P9E3H7"; const PL_THREAD_ARCHIVED = "F4D8Q2K5"; const PL_STEP_ERROR = "B8T5N1V6"; +const PL_BACKGROUND_START = "X7Q4W9M2"; function failStep(plog: ProcessLogger, message: string): never { plog.log(PL_STEP_ERROR, message, null); @@ -321,6 +328,7 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr thread: threadId, head: activeHead, done: false, + background: null, }; } @@ -331,6 +339,7 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr thread: threadId, head: hist.head, done: true, + background: null, }; } @@ -853,26 +862,60 @@ export async function cmdThreadStep( threadId: ThreadId, agentOverride: string | null, count: number, + background: boolean, + backgroundWorker: boolean, ): Promise { if (count < 1 || !Number.isInteger(count)) { fail(`--count must be a positive integer, got: ${count}`); } + // Check if thread is already running in background (unless we ARE the background worker) + if (!backgroundWorker) { + const runningMarker = await isThreadRunning(storageRoot, threadId); + if (runningMarker !== null) { + fail(`thread already executing in background (PID: ${runningMarker.pid})`); + } + } + const workflowHash = await resolveActiveThreadWorkflowHash(storageRoot, threadId); const plog = createProcessLogger({ storageRoot, context: { thread: threadId, workflow: workflowHash }, }); - const results: StepOutput[] = []; - for (let i = 0; i < count; i++) { - const result = await cmdThreadStepOnce(storageRoot, threadId, agentOverride, plog); - results.push(result); - if (result.done) { - break; + if (background && !backgroundWorker) { + // Spawn background process + return cmdThreadStepBackground(storageRoot, threadId, agentOverride, count, plog, workflowHash); + } + + // If we're the background worker, create marker before execution + let markerCreated = false; + if (backgroundWorker) { + await createMarker(storageRoot, { + thread: threadId, + workflow: workflowHash, + pid: process.pid, + startedAt: Date.now(), + }); + markerCreated = true; + } + + try { + const results: StepOutput[] = []; + for (let i = 0; i < count; i++) { + const result = await cmdThreadStepOnce(storageRoot, threadId, agentOverride, plog); + results.push(result); + if (result.done) { + break; + } + } + return results; + } finally { + // Cleanup marker if we created one + if (markerCreated) { + await deleteMarker(storageRoot, threadId); } } - return results; } async function resolveActiveThreadWorkflowHash( @@ -889,6 +932,57 @@ async function resolveActiveThreadWorkflowHash( return chain.start.workflow; } +async function cmdThreadStepBackground( + storageRoot: string, + threadId: ThreadId, + agentOverride: string | null, + count: number, + plog: ProcessLogger, + workflowHash: CasRef, +): Promise { + // Get current head to return to caller + const index = await loadThreadsIndex(storageRoot); + const headHash = index[threadId]; + if (headHash === undefined) { + failStep(plog, `thread not active: ${threadId}`); + } + + // Spawn detached background process + const scriptPath = process.argv[1]; + if (scriptPath === undefined) { + failStep(plog, "unable to determine script path for background execution"); + } + + const args = ["thread", "step", threadId, "--count", String(count)]; + + if (agentOverride !== null) { + args.push("--agent", agentOverride); + } + + // Internal flag to signal the background worker to create/cleanup markers + args.push("--_background-worker"); + + plog.log(PL_BACKGROUND_START, `spawning background process count=${count}`, null); + + const child = spawn(scriptPath, args, { + detached: true, + stdio: "ignore", + }); + + child.unref(); + + // Return immediately with current state and background flag + return [ + { + workflow: workflowHash, + thread: threadId, + head: headHash, + done: false, + background: true, + }, + ]; +} + async function cmdThreadStepOnce( storageRoot: string, threadId: ThreadId, @@ -926,6 +1020,7 @@ async function cmdThreadStepOnce( thread: threadId, head: headHash, done: true, + background: null, }; } @@ -973,6 +1068,7 @@ async function cmdThreadStepOnce( thread: threadId, head: newHead, done, + background: null, }; } @@ -1109,6 +1205,17 @@ export async function cmdThreadKill(storageRoot: string, threadId: ThreadId): Pr fail(`thread not active: ${threadId}`); } + // Check if thread is running in background and terminate it + const runningMarker = await isThreadRunning(storageRoot, threadId); + if (runningMarker !== null) { + try { + process.kill(runningMarker.pid, "SIGTERM"); + } catch { + // Process may have already exited, ignore error + } + await deleteMarker(storageRoot, threadId); + } + const uwf = await createUwfStore(storageRoot); const workflow = resolveWorkflowFromHead(uwf, head); if (workflow === null) { @@ -1128,3 +1235,8 @@ export async function cmdThreadKill(storageRoot: string, threadId: ThreadId): Pr return { thread: threadId, archived: true }; } + +export async function cmdThreadRunning(storageRoot: string): Promise { + const threads = await listRunningThreads(storageRoot); + return { threads }; +} diff --git a/packages/workflow-protocol/src/index.ts b/packages/workflow-protocol/src/index.ts index fe4a019..99a11f3 100644 --- a/packages/workflow-protocol/src/index.ts +++ b/packages/workflow-protocol/src/index.ts @@ -15,6 +15,8 @@ export type { ProviderConfig, RoleDefinition, RoleName, + RunningThreadItem, + RunningThreadsOutput, Scenario, StartEntry, StartNodePayload, diff --git a/packages/workflow-protocol/src/types.ts b/packages/workflow-protocol/src/types.ts index fe256f4..ddd7f32 100644 --- a/packages/workflow-protocol/src/types.ts +++ b/packages/workflow-protocol/src/types.ts @@ -84,6 +84,7 @@ export type StepOutput = { thread: ThreadId; head: CasRef; done: boolean; + background: boolean | null; }; /** uwf thread steps — single step entry */ @@ -126,6 +127,19 @@ export type ThreadListItem = { head: CasRef; }; +/** uwf thread running — single running thread entry */ +export type RunningThreadItem = { + thread: ThreadId; + workflow: CasRef; + pid: number; + startedAt: number; +}; + +/** uwf thread running output */ +export type RunningThreadsOutput = { + threads: RunningThreadItem[]; +}; + // ── 4.6 配置 ──────────────────────────────────────────────────────── /** Alias types for config references */