From d6fe3f844cfa80426d95ef8258756b124973488f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Sat, 9 May 2026 12:28:33 +0000 Subject: [PATCH] fix: detect crashed threads as failed instead of stuck running MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - resolveThreadListStatus() checks CAS chain for __end__ node - Stale .running markers no longer cause false 'running' status - Distinguish 'failed' (returnCode != 0) from 'completed' - Worker signal handlers (SIGINT/SIGTERM) clean up .running files - listRunningThreads filters out terminated threads with stale markers Fixes #170 小橘 --- .../src/commands/serve/routes-thread.ts | 12 ++- packages/cli-workflow/src/thread-scan.ts | 78 ++++++++++++++++++- packages/cli-workflow/src/worker-spawn.ts | 21 ++++- .../workflow-execute/src/engine/worker.ts | 20 ++++- 4 files changed, 122 insertions(+), 9 deletions(-) diff --git a/packages/cli-workflow/src/commands/serve/routes-thread.ts b/packages/cli-workflow/src/commands/serve/routes-thread.ts index e2955f1..03cc402 100644 --- a/packages/cli-workflow/src/commands/serve/routes-thread.ts +++ b/packages/cli-workflow/src/commands/serve/routes-thread.ts @@ -10,6 +10,7 @@ import type { ResolvedThreadRecord } from "../../thread-scan.js"; import { listHistoricalThreads, listRunningThreads, + resolveThreadListStatus, resolveThreadRecord, } from "../../thread-scan.js"; import { cmdKill, cmdPause, cmdResume } from "../thread/control.js"; @@ -61,7 +62,12 @@ async function buildThreadDetailRecords( const returnCode = fr.payload.meta.returnCode; const summary = fr.payload.meta.summary; if (typeof returnCode === "number" && typeof summary === "string") { - records.push({ type: "workflow-result", returnCode, content: summary, timestamp: fr.payload.timestamp }); + records.push({ + type: "workflow-result", + returnCode, + content: summary, + timestamp: fr.payload.timestamp, + }); } continue; } @@ -92,8 +98,8 @@ export function 
createThreadRoutes(storageRoot: string): Hono { const threads = await Promise.all( rows.map(async (r) => { const runningPath = join(storageRoot, "logs", r.hash, `${r.threadId}.running`); - const isRunning = await pathExists(runningPath); - const status = r.source === "history" ? "completed" : isRunning ? "running" : "active"; + const runningMarkerPresent = await pathExists(runningPath); + const status = await resolveThreadListStatus(storageRoot, r, runningMarkerPresent); return { threadId: r.threadId, workflow: r.workflowName, diff --git a/packages/cli-workflow/src/thread-scan.ts b/packages/cli-workflow/src/thread-scan.ts index 17b9659..6e161de 100644 --- a/packages/cli-workflow/src/thread-scan.ts +++ b/packages/cli-workflow/src/thread-scan.ts @@ -5,7 +5,9 @@ import { readThreadsIndex, type ThreadHistoryEntry, type ThreadIndex, + walkStateFramesNewestFirst, } from "@uncaged/workflow-execute"; +import { END } from "@uncaged/workflow-runtime"; import { getGlobalCasDir } from "@uncaged/workflow-util"; import { pathExists, readTextFileIfExists } from "./fs-utils.js"; @@ -98,6 +100,8 @@ export type HistoricalThreadRow = { source: "active" | "history"; /** `updatedAt` for active threads; `completedAt` for history (ms since epoch). */ activityTs: number; + /** Current CAS head (`threads.json` / history row). */ + head: string; }; export type ResolvedThreadRecord = { @@ -172,6 +176,73 @@ export async function resolveThreadRecord( return null; } +export type ThreadHeadTerminal = + | { kind: "non-terminal" } + | { kind: "terminal"; returnCode: number }; + +/** True when the newest frame at `headHash` is `__end__` (workflow finished in CAS). 
*/
+export async function readThreadTerminalFromHead(
+  storageRoot: string,
+  headHash: string,
+): Promise<ThreadHeadTerminal> {
+  const cas = createCasStore(getGlobalCasDir(storageRoot));
+  const frames = await walkStateFramesNewestFirst(cas, headHash);
+  const newest = frames[0]; // only the first (newest) frame decides the result
+  if (newest === undefined) {
+    return { kind: "non-terminal" }; // no frame found for this head
+  }
+  if (newest.payload.role !== END) {
+    return { kind: "non-terminal" };
+  }
+  const rc = newest.payload.meta.returnCode;
+  if (typeof rc !== "number") {
+    return { kind: "terminal", returnCode: 1 }; // END frame without a numeric code is reported as 1
+  }
+  return { kind: "terminal", returnCode: rc };
+}
+
+export type ThreadListStatus = "running" | "active" | "completed" | "failed";
+
+/** Combines `.running` marker with CAS head: a terminal head always wins, so stale markers do not imply `running`. */
+export async function resolveThreadListStatus(
+  storageRoot: string,
+  row: HistoricalThreadRow,
+  runningMarkerPresent: boolean,
+): Promise<ThreadListStatus> {
+  const terminal = await readThreadTerminalFromHead(storageRoot, row.head);
+  if (terminal.kind === "terminal") {
+    return terminal.returnCode !== 0 ? "failed" : "completed"; // checked before the marker on purpose
+  }
+  if (row.source === "history") {
+    return "completed"; // history rows with a non-terminal head keep the previous "completed" label
+  }
+  if (runningMarkerPresent) {
+    return "running";
+  }
+  return "active";
+}
+
+async function appendRunningThreadRowIfLive(
+  storageRoot: string,
+  hash: string,
+  threadId: string,
+  out: RunningThreadRow[],
+): Promise<void> {
+  const resolved = await resolveThreadRecord(storageRoot, threadId);
+  if (resolved !== null && resolved.bundleHash !== hash) {
+    return; // the resolved record belongs to a different bundle hash
+  }
+  if (resolved !== null) {
+    const terminal = await readThreadTerminalFromHead(storageRoot, resolved.head);
+    if (terminal.kind === "terminal") {
+      return; // head is already `__end__`, so the `.running` marker is stale
+    }
+  }
+  const workflowName =
+    resolved !== null ? await readWorkflowNameFromStartHash(storageRoot, resolved.start) : null;
+  out.push({ threadId, hash, workflowName }); // unresolved threads are still listed, with a null workflowName
+}
+
 /** Threads currently executing — identified via `.running` markers.
*/ export async function listRunningThreads(storageRoot: string): Promise { const logsRoot = join(storageRoot, "logs"); @@ -196,10 +267,7 @@ export async function listRunningThreads(storageRoot: string): Promise { let activeThreads = 0; let shutdownTimer: ReturnType | null = null; + function cleanupAllRunningMarkersSync(): void { + for (const threadId of threads.keys()) { + try { + unlinkSync(join(storageRoot, "logs", hash, `${threadId}.running`)); + } catch { + // ignore missing file or other fs errors + } + } + } + + for (const sig of ["SIGINT", "SIGTERM"] as const) { + process.on(sig, () => { + cleanupAllRunningMarkersSync(); + process.exit(sig === "SIGINT" ? 130 : 143); + }); + } + const cas = createCasStore(getGlobalCasDir(storageRoot)); const workerCtlPath = join(storageRoot, "workers", `${hash}.json`); @@ -498,8 +516,8 @@ async function main(): Promise { const message = e instanceof Error ? e.message : String(e); bootLog("Q3MN8YKW", `thread ${threadId} failed: ${message}`); } finally { - threads.delete(threadId); await unlink(runningPath).catch(() => {}); + threads.delete(threadId); bumpDone(); socket?.end(); }