Merge pull request 'feat(cli): thread step --background + thread running' (#457) from fix/456-thread-step-background into main

This commit is contained in:
2026-05-24 05:33:56 +00:00
7 changed files with 351 additions and 22 deletions
@@ -0,0 +1,147 @@
import { mkdir, readdir, readFile, rename, rm, writeFile } from "node:fs/promises";
import { join } from "node:path";
import type { RunningThreadItem, ThreadId } from "@uncaged/workflow-protocol";
import type { RunningMarker } from "./types.js";
/**
* Get the path to the running markers directory.
*/
export function getRunningDir(storageRoot: string): string {
return join(storageRoot, "running");
}
/**
* Get the path to a specific thread's marker file.
*/
export function getMarkerPath(storageRoot: string, threadId: ThreadId): string {
return join(getRunningDir(storageRoot), `${threadId}.json`);
}
/**
* Check if a PID is still running.
* Returns true if the process exists, false otherwise.
*/
export function isPidAlive(pid: number): boolean {
try {
// process.kill with signal 0 checks existence without killing
process.kill(pid, 0);
return true;
} catch {
// ESRCH means process doesn't exist
return false;
}
}
/**
* Create a marker file for a running thread.
* Writes to a temp file in the same directory, then atomically renames.
*/
export async function createMarker(storageRoot: string, marker: RunningMarker): Promise<void> {
const runningDir = getRunningDir(storageRoot);
await mkdir(runningDir, { recursive: true });
const markerPath = getMarkerPath(storageRoot, marker.thread);
const tempPath = join(runningDir, `.${marker.thread}-${process.pid}.tmp`);
const content = JSON.stringify(marker, null, 2);
await writeFile(tempPath, content, "utf8");
await rename(tempPath, markerPath);
}
/**
* Delete a marker file for a thread.
*/
export async function deleteMarker(storageRoot: string, threadId: ThreadId): Promise<void> {
const markerPath = getMarkerPath(storageRoot, threadId);
try {
await rm(markerPath);
} catch {
// Ignore errors if file doesn't exist
}
}
/**
* Read a marker file. Returns null if file doesn't exist or is invalid.
*/
export async function readMarker(
storageRoot: string,
threadId: ThreadId,
): Promise<RunningMarker | null> {
const markerPath = getMarkerPath(storageRoot, threadId);
try {
const content = await readFile(markerPath, "utf8");
const marker = JSON.parse(content) as RunningMarker;
return marker;
} catch {
return null;
}
}
/**
* List all running threads, filtering out stale markers.
*/
export async function listRunningThreads(storageRoot: string): Promise<RunningThreadItem[]> {
const runningDir = getRunningDir(storageRoot);
let files: string[];
try {
files = await readdir(runningDir);
} catch {
// Directory doesn't exist or can't be read
return [];
}
const results: RunningThreadItem[] = [];
for (const filename of files) {
if (!filename.endsWith(".json")) {
continue;
}
const threadId = filename.slice(0, -5) as ThreadId;
const marker = await readMarker(storageRoot, threadId);
if (marker === null) {
// Invalid marker file
continue;
}
if (!isPidAlive(marker.pid)) {
// Stale marker - process no longer exists
await deleteMarker(storageRoot, threadId);
continue;
}
results.push({
thread: marker.thread,
workflow: marker.workflow,
pid: marker.pid,
startedAt: marker.startedAt,
});
}
return results;
}
/**
* Check if a thread is currently executing in the background.
* Returns the marker if running, null otherwise.
*/
export async function isThreadRunning(
storageRoot: string,
threadId: ThreadId,
): Promise<RunningMarker | null> {
const marker = await readMarker(storageRoot, threadId);
if (marker === null) {
return null;
}
if (!isPidAlive(marker.pid)) {
// Stale marker
await deleteMarker(storageRoot, threadId);
return null;
}
return marker;
}
@@ -0,0 +1,11 @@
export {
createMarker,
deleteMarker,
getMarkerPath,
getRunningDir,
isPidAlive,
isThreadRunning,
listRunningThreads,
readMarker,
} from "./background.js";
export type { RunningMarker } from "./types.js";
@@ -0,0 +1,9 @@
import type { CasRef, ThreadId } from "@uncaged/workflow-protocol";
/** Marker file stored at ~/.uncaged/workflow/running/<thread-id>.json */
export type RunningMarker = {
thread: ThreadId;
workflow: CasRef;
pid: number;
startedAt: number;
};
+47 -13
View File
@@ -22,6 +22,7 @@ import {
cmdThreadKill, cmdThreadKill,
cmdThreadList, cmdThreadList,
cmdThreadRead, cmdThreadRead,
cmdThreadRunning,
cmdThreadShow, cmdThreadShow,
cmdThreadStart, cmdThreadStart,
cmdThreadStep, cmdThreadStep,
@@ -114,19 +115,41 @@ thread
.argument("<thread-id>", "Thread ULID") .argument("<thread-id>", "Thread ULID")
.option("--agent <cmd>", "Override agent command") .option("--agent <cmd>", "Override agent command")
.option("-c, --count <number>", "Number of steps to run (default: 1)") .option("-c, --count <number>", "Number of steps to run (default: 1)")
.action((threadId: string, opts: { agent: string | undefined; count: string | undefined }) => { .option("--background", "Run in background and return immediately")
const storageRoot = resolveStorageRoot(); .option("--_background-worker", "Internal flag for background worker process", false)
runAction(async () => { .action(
const agentOverride = opts.agent ?? null; (
const count = opts.count !== undefined ? Number(opts.count) : 1; threadId: string,
const results = await cmdThreadStep(storageRoot, threadId, agentOverride, count); opts: {
if (results.length === 1) { agent: string | undefined;
writeOutput(results[0]); count: string | undefined;
} else { background: boolean;
writeOutput(results); _backgroundWorker: boolean;
} },
}); ) => {
}); const storageRoot = resolveStorageRoot();
runAction(async () => {
const agentOverride = opts.agent ?? null;
const count = opts.count !== undefined ? Number(opts.count) : 1;
const background = opts.background ?? false;
const backgroundWorker = opts._backgroundWorker ?? false;
const results = await cmdThreadStep(
storageRoot,
threadId,
agentOverride,
count,
background,
backgroundWorker,
);
if (results.length === 1) {
writeOutput(results[0]);
} else {
writeOutput(results);
}
});
},
);
thread thread
.command("show") .command("show")
@@ -152,6 +175,17 @@ thread
}); });
}); });
thread
.command("running")
.description("List threads currently executing in the background")
.action(() => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const result = await cmdThreadRunning(storageRoot);
writeOutput(result);
});
});
thread thread
.command("kill") .command("kill")
.description("Terminate and archive a thread") .description("Terminate and archive a thread")
+121 -9
View File
@@ -1,4 +1,4 @@
import { execFileSync } from "node:child_process"; import { execFileSync, spawn } from "node:child_process";
import { access, readFile } from "node:fs/promises"; import { access, readFile } from "node:fs/promises";
import { dirname, isAbsolute, resolve as resolvePath } from "node:path"; import { dirname, isAbsolute, resolve as resolvePath } from "node:path";
import type { Store as CasStore, JSONSchema } from "@uncaged/json-cas"; import type { Store as CasStore, JSONSchema } from "@uncaged/json-cas";
@@ -10,6 +10,7 @@ import type {
AgentConfig, AgentConfig,
CasRef, CasRef,
ModeratorContext, ModeratorContext,
RunningThreadsOutput,
StartEntry, StartEntry,
StartNodePayload, StartNodePayload,
StartOutput, StartOutput,
@@ -27,7 +28,12 @@ import type {
import { createProcessLogger, generateUlid, type ProcessLogger } from "@uncaged/workflow-util"; import { createProcessLogger, generateUlid, type ProcessLogger } from "@uncaged/workflow-util";
import { config as loadDotenv } from "dotenv"; import { config as loadDotenv } from "dotenv";
import { parse, stringify } from "yaml"; import { parse, stringify } from "yaml";
import {
createMarker,
deleteMarker,
isThreadRunning,
listRunningThreads,
} from "../background/index.js";
import { import {
appendThreadHistory, appendThreadHistory,
createUwfStore, createUwfStore,
@@ -52,6 +58,7 @@ const PL_AGENT_SPAWN = "R5J2W8N4";
const PL_AGENT_DONE = "C6P9E3H7"; const PL_AGENT_DONE = "C6P9E3H7";
const PL_THREAD_ARCHIVED = "F4D8Q2K5"; const PL_THREAD_ARCHIVED = "F4D8Q2K5";
const PL_STEP_ERROR = "B8T5N1V6"; const PL_STEP_ERROR = "B8T5N1V6";
const PL_BACKGROUND_START = "X7Q4W9M2";
function failStep(plog: ProcessLogger, message: string): never { function failStep(plog: ProcessLogger, message: string): never {
plog.log(PL_STEP_ERROR, message, null); plog.log(PL_STEP_ERROR, message, null);
@@ -321,6 +328,7 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr
thread: threadId, thread: threadId,
head: activeHead, head: activeHead,
done: false, done: false,
background: null,
}; };
} }
@@ -331,6 +339,7 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr
thread: threadId, thread: threadId,
head: hist.head, head: hist.head,
done: true, done: true,
background: null,
}; };
} }
@@ -853,26 +862,60 @@ export async function cmdThreadStep(
threadId: ThreadId, threadId: ThreadId,
agentOverride: string | null, agentOverride: string | null,
count: number, count: number,
background: boolean,
backgroundWorker: boolean,
): Promise<StepOutput[]> { ): Promise<StepOutput[]> {
if (count < 1 || !Number.isInteger(count)) { if (count < 1 || !Number.isInteger(count)) {
fail(`--count must be a positive integer, got: ${count}`); fail(`--count must be a positive integer, got: ${count}`);
} }
// Check if thread is already running in background (unless we ARE the background worker)
if (!backgroundWorker) {
const runningMarker = await isThreadRunning(storageRoot, threadId);
if (runningMarker !== null) {
fail(`thread already executing in background (PID: ${runningMarker.pid})`);
}
}
const workflowHash = await resolveActiveThreadWorkflowHash(storageRoot, threadId); const workflowHash = await resolveActiveThreadWorkflowHash(storageRoot, threadId);
const plog = createProcessLogger({ const plog = createProcessLogger({
storageRoot, storageRoot,
context: { thread: threadId, workflow: workflowHash }, context: { thread: threadId, workflow: workflowHash },
}); });
const results: StepOutput[] = []; if (background && !backgroundWorker) {
for (let i = 0; i < count; i++) { // Spawn background process
const result = await cmdThreadStepOnce(storageRoot, threadId, agentOverride, plog); return cmdThreadStepBackground(storageRoot, threadId, agentOverride, count, plog, workflowHash);
results.push(result); }
if (result.done) {
break; // If we're the background worker, create marker before execution
let markerCreated = false;
if (backgroundWorker) {
await createMarker(storageRoot, {
thread: threadId,
workflow: workflowHash,
pid: process.pid,
startedAt: Date.now(),
});
markerCreated = true;
}
try {
const results: StepOutput[] = [];
for (let i = 0; i < count; i++) {
const result = await cmdThreadStepOnce(storageRoot, threadId, agentOverride, plog);
results.push(result);
if (result.done) {
break;
}
}
return results;
} finally {
// Cleanup marker if we created one
if (markerCreated) {
await deleteMarker(storageRoot, threadId);
} }
} }
return results;
} }
async function resolveActiveThreadWorkflowHash( async function resolveActiveThreadWorkflowHash(
@@ -889,6 +932,57 @@ async function resolveActiveThreadWorkflowHash(
return chain.start.workflow; return chain.start.workflow;
} }
async function cmdThreadStepBackground(
storageRoot: string,
threadId: ThreadId,
agentOverride: string | null,
count: number,
plog: ProcessLogger,
workflowHash: CasRef,
): Promise<StepOutput[]> {
// Get current head to return to caller
const index = await loadThreadsIndex(storageRoot);
const headHash = index[threadId];
if (headHash === undefined) {
failStep(plog, `thread not active: ${threadId}`);
}
// Spawn detached background process
const scriptPath = process.argv[1];
if (scriptPath === undefined) {
failStep(plog, "unable to determine script path for background execution");
}
const args = ["thread", "step", threadId, "--count", String(count)];
if (agentOverride !== null) {
args.push("--agent", agentOverride);
}
// Internal flag to signal the background worker to create/cleanup markers
args.push("--_background-worker");
plog.log(PL_BACKGROUND_START, `spawning background process count=${count}`, null);
const child = spawn(scriptPath, args, {
detached: true,
stdio: "ignore",
});
child.unref();
// Return immediately with current state and background flag
return [
{
workflow: workflowHash,
thread: threadId,
head: headHash,
done: false,
background: true,
},
];
}
async function cmdThreadStepOnce( async function cmdThreadStepOnce(
storageRoot: string, storageRoot: string,
threadId: ThreadId, threadId: ThreadId,
@@ -926,6 +1020,7 @@ async function cmdThreadStepOnce(
thread: threadId, thread: threadId,
head: headHash, head: headHash,
done: true, done: true,
background: null,
}; };
} }
@@ -973,6 +1068,7 @@ async function cmdThreadStepOnce(
thread: threadId, thread: threadId,
head: newHead, head: newHead,
done, done,
background: null,
}; };
} }
@@ -1109,6 +1205,17 @@ export async function cmdThreadKill(storageRoot: string, threadId: ThreadId): Pr
fail(`thread not active: ${threadId}`); fail(`thread not active: ${threadId}`);
} }
// Check if thread is running in background and terminate it
const runningMarker = await isThreadRunning(storageRoot, threadId);
if (runningMarker !== null) {
try {
process.kill(runningMarker.pid, "SIGTERM");
} catch {
// Process may have already exited, ignore error
}
await deleteMarker(storageRoot, threadId);
}
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const workflow = resolveWorkflowFromHead(uwf, head); const workflow = resolveWorkflowFromHead(uwf, head);
if (workflow === null) { if (workflow === null) {
@@ -1128,3 +1235,8 @@ export async function cmdThreadKill(storageRoot: string, threadId: ThreadId): Pr
return { thread: threadId, archived: true }; return { thread: threadId, archived: true };
} }
export async function cmdThreadRunning(storageRoot: string): Promise<RunningThreadsOutput> {
const threads = await listRunningThreads(storageRoot);
return { threads };
}
+2
View File
@@ -15,6 +15,8 @@ export type {
ProviderConfig, ProviderConfig,
RoleDefinition, RoleDefinition,
RoleName, RoleName,
RunningThreadItem,
RunningThreadsOutput,
Scenario, Scenario,
StartEntry, StartEntry,
StartNodePayload, StartNodePayload,
+14
View File
@@ -84,6 +84,7 @@ export type StepOutput = {
thread: ThreadId; thread: ThreadId;
head: CasRef; head: CasRef;
done: boolean; done: boolean;
background: boolean | null;
}; };
/** uwf thread steps — single step entry */ /** uwf thread steps — single step entry */
@@ -126,6 +127,19 @@ export type ThreadListItem = {
head: CasRef; head: CasRef;
}; };
/** uwf thread running — single running thread entry */
export type RunningThreadItem = {
thread: ThreadId;
workflow: CasRef;
pid: number;
startedAt: number;
};
/** uwf thread running output */
export type RunningThreadsOutput = {
threads: RunningThreadItem[];
};
// ── 4.6 配置 ──────────────────────────────────────────────────────── // ── 4.6 配置 ────────────────────────────────────────────────────────
/** Alias types for config references */ /** Alias types for config references */