fix: detect crashed threads as failed instead of stuck running

- resolveThreadListStatus() checks CAS chain for __end__ node
- Stale .running markers no longer cause false 'running' status
- Distinguish 'failed' (returnCode != 0) from 'completed'
- Worker signal handlers (SIGINT/SIGTERM) clean up .running files
- listRunningThreads filters out terminated threads with stale markers

Fixes #170

小橘 <xiaoju@shazhou.work>
This commit is contained in:
2026-05-09 12:28:33 +00:00
parent d0803019b5
commit d6fe3f844c
4 changed files with 122 additions and 9 deletions
@@ -10,6 +10,7 @@ import type { ResolvedThreadRecord } from "../../thread-scan.js";
import {
listHistoricalThreads,
listRunningThreads,
resolveThreadListStatus,
resolveThreadRecord,
} from "../../thread-scan.js";
import { cmdKill, cmdPause, cmdResume } from "../thread/control.js";
@@ -61,7 +62,12 @@ async function buildThreadDetailRecords(
const returnCode = fr.payload.meta.returnCode;
const summary = fr.payload.meta.summary;
if (typeof returnCode === "number" && typeof summary === "string") {
records.push({ type: "workflow-result", returnCode, content: summary, timestamp: fr.payload.timestamp });
records.push({
type: "workflow-result",
returnCode,
content: summary,
timestamp: fr.payload.timestamp,
});
}
continue;
}
@@ -92,8 +98,8 @@ export function createThreadRoutes(storageRoot: string): Hono {
const threads = await Promise.all(
rows.map(async (r) => {
const runningPath = join(storageRoot, "logs", r.hash, `${r.threadId}.running`);
const isRunning = await pathExists(runningPath);
const status = r.source === "history" ? "completed" : isRunning ? "running" : "active";
const runningMarkerPresent = await pathExists(runningPath);
const status = await resolveThreadListStatus(storageRoot, r, runningMarkerPresent);
return {
threadId: r.threadId,
workflow: r.workflowName,
+74 -4
View File
@@ -5,7 +5,9 @@ import {
readThreadsIndex,
type ThreadHistoryEntry,
type ThreadIndex,
walkStateFramesNewestFirst,
} from "@uncaged/workflow-execute";
import { END } from "@uncaged/workflow-runtime";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { pathExists, readTextFileIfExists } from "./fs-utils.js";
@@ -98,6 +100,8 @@ export type HistoricalThreadRow = {
source: "active" | "history";
/** `updatedAt` for active threads; `completedAt` for history (ms since epoch). */
activityTs: number;
/** Current CAS head (`threads.json` / history row). */
head: string;
};
export type ResolvedThreadRecord = {
@@ -172,6 +176,73 @@ export async function resolveThreadRecord(
return null;
}
/**
 * Result of inspecting the newest state frame at a thread's CAS head
 * (see `readThreadTerminalFromHead`).
 *
 * - `non-terminal`: no frame was found, or the newest frame's role is not `END`.
 * - `terminal`: the newest frame's role is `END`; `returnCode` is that frame's
 *   `meta.returnCode`, or `1` when that value is not a number.
 */
export type ThreadHeadTerminal =
| { kind: "non-terminal" }
| { kind: "terminal"; returnCode: number };
/**
 * Inspects the newest state frame reachable from `headHash` in the global CAS store
 * and reports whether it is the `END` frame.
 *
 * @param storageRoot - Storage root used to locate the global CAS directory.
 * @param headHash - CAS hash of the thread head to inspect.
 * @returns `{ kind: "terminal", returnCode }` when the newest frame's role is `END`;
 *   `{ kind: "non-terminal" }` otherwise, including when the walk yields no frames.
 */
export async function readThreadTerminalFromHead(
  storageRoot: string,
  headHash: string,
): Promise<ThreadHeadTerminal> {
  const store = createCasStore(getGlobalCasDir(storageRoot));
  const framesNewestFirst = await walkStateFramesNewestFirst(store, headHash);
  const latestFrame = framesNewestFirst[0];

  const endedInCas = latestFrame !== undefined && latestFrame.payload.role === END;
  if (latestFrame === undefined || !endedInCas) {
    return { kind: "non-terminal" };
  }

  const recordedCode = latestFrame.payload.meta.returnCode;
  // An END frame without a numeric return code is reported with return code 1.
  return {
    kind: "terminal",
    returnCode: typeof recordedCode === "number" ? recordedCode : 1,
  };
}
/**
 * Status label produced by `resolveThreadListStatus` for a thread list row.
 *
 * - `failed`: the CAS head is terminal with a non-zero return code.
 * - `completed`: the CAS head is terminal with return code 0, or the row comes from history.
 * - `running`: non-terminal, non-history row whose `.running` marker is present.
 * - `active`: non-terminal, non-history row without a `.running` marker.
 */
export type ThreadListStatus = "running" | "active" | "completed" | "failed";
/**
 * Derives the list status of a thread row from its CAS head and its `.running` marker.
 *
 * The CAS head is consulted first, so a leftover `.running` marker on a thread whose
 * head is already terminal never yields `running`.
 *
 * @param storageRoot - Storage root used to read the CAS head.
 * @param row - Thread row; its `head` and `source` fields are read.
 * @param runningMarkerPresent - Whether the `.running` marker file exists for the thread.
 * @returns `failed` / `completed` for a terminal head (by return code), `completed` for
 *   history rows, otherwise `running` when the marker is present and `active` when not.
 */
export async function resolveThreadListStatus(
  storageRoot: string,
  row: HistoricalThreadRow,
  runningMarkerPresent: boolean,
): Promise<ThreadListStatus> {
  const headState = await readThreadTerminalFromHead(storageRoot, row.head);

  if (headState.kind === "terminal") {
    return headState.returnCode === 0 ? "completed" : "failed";
  }

  // Non-terminal head: history rows still count as completed.
  if (row.source === "history") {
    return "completed";
  }

  return runningMarkerPresent ? "running" : "active";
}
/**
 * Appends a running-thread row for `threadId` to `out` unless the thread's record shows
 * that it belongs to another bundle or has already reached a terminal CAS head.
 *
 * When no thread record can be resolved, the row is still appended with a `null`
 * workflow name.
 *
 * @param storageRoot - Storage root used for record and CAS lookups.
 * @param hash - Bundle hash of the logs directory where the `.running` marker was found.
 * @param threadId - Thread id taken from the marker file name.
 * @param out - Accumulator that receives the row; mutated in place.
 */
async function appendRunningThreadRowIfLive(
  storageRoot: string,
  hash: string,
  threadId: string,
  out: RunningThreadRow[],
): Promise<void> {
  const record = await resolveThreadRecord(storageRoot, threadId);

  // No record available: keep the row, but the workflow name is unknown.
  if (record === null) {
    out.push({ threadId, hash, workflowName: null });
    return;
  }

  // The marker sits under a different bundle than the one the record names.
  if (record.bundleHash !== hash) {
    return;
  }

  // A terminal head means the marker is stale; do not report the thread as running.
  const headState = await readThreadTerminalFromHead(storageRoot, record.head);
  if (headState.kind === "terminal") {
    return;
  }

  const workflowName = await readWorkflowNameFromStartHash(storageRoot, record.start);
  out.push({ threadId, hash, workflowName });
}
/** Threads currently executing — identified via `<threadId>.running` markers. */
export async function listRunningThreads(storageRoot: string): Promise<RunningThreadRow[]> {
const logsRoot = join(storageRoot, "logs");
@@ -196,10 +267,7 @@ export async function listRunningThreads(storageRoot: string): Promise<RunningTh
continue;
}
const threadId = fileName.slice(0, -".running".length);
const resolved = await resolveThreadRecord(storageRoot, threadId);
const workflowName =
resolved !== null ? await readWorkflowNameFromStartHash(storageRoot, resolved.start) : null;
out.push({ threadId, hash, workflowName });
await appendRunningThreadRowIfLive(storageRoot, hash, threadId, out);
}
}
@@ -253,6 +321,7 @@ export async function listHistoricalThreads(
workflowName,
source: "active",
activityTs: entry.updatedAt,
head: entry.head,
});
}
@@ -287,6 +356,7 @@ export async function listHistoricalThreads(
workflowName,
source: "history",
activityTs: e.completedAt,
head: e.head,
});
}
}
+20 -1
View File
@@ -6,6 +6,7 @@ import { getWorkerHostScriptPath } from "@uncaged/workflow-execute";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { pathExists, readTextFileIfExists } from "./fs-utils.js";
import { readThreadTerminalFromHead, resolveThreadRecord } from "./thread-scan.js";
export type WorkerCtl = {
pid: number;
@@ -269,7 +270,25 @@ export async function resolveRunningHashForThread(
if (!(await pathExists(logsRoot))) {
return err(`thread not running (no logs dir): ${threadId}`);
}
const hashes = await readdir(logsRoot);
const resolved = await resolveThreadRecord(storageRoot, threadId);
if (resolved !== null) {
const runningPath = join(logsRoot, resolved.bundleHash, `${threadId}.running`);
if (!(await pathExists(runningPath))) {
return err(`thread not running: ${threadId}`);
}
const terminal = await readThreadTerminalFromHead(storageRoot, resolved.head);
if (terminal.kind === "terminal") {
return err(`thread not running: ${threadId}`);
}
return ok(resolved.bundleHash);
}
let hashes: string[];
try {
hashes = await readdir(logsRoot);
} catch {
return err(`thread not running: ${threadId}`);
}
for (const hash of hashes) {
const runningPath = join(logsRoot, hash, `${threadId}.running`);
if (await pathExists(runningPath)) {
+19 -1
View File
@@ -1,3 +1,4 @@
import { unlinkSync } from "node:fs";
import { mkdir, unlink, writeFile } from "node:fs/promises";
import { createServer, type Socket } from "node:net";
import { dirname, join } from "node:path";
@@ -382,6 +383,23 @@ async function main(): Promise<void> {
let activeThreads = 0;
let shutdownTimer: ReturnType<typeof setTimeout> | null = null;
/**
 * Synchronously removes the `<threadId>.running` marker of every thread currently
 * tracked in `threads`. Best effort: any error for an individual marker is swallowed
 * and the remaining markers are still attempted.
 */
function cleanupAllRunningMarkersSync(): void {
  for (const trackedId of threads.keys()) {
    try {
      const markerPath = join(storageRoot, "logs", hash, `${trackedId}.running`);
      unlinkSync(markerPath);
    } catch {
      // Marker may already be gone, or removal failed; nothing more to do here.
    }
  }
}
// On SIGINT / SIGTERM, remove the `.running` markers and exit right away.
// The exit codes 130 and 143 equal 128 + the signal number (2 and 15 respectively).
const signalExitCodes: ReadonlyArray<readonly ["SIGINT" | "SIGTERM", number]> = [
  ["SIGINT", 130],
  ["SIGTERM", 143],
];
for (const [signalName, exitCode] of signalExitCodes) {
  process.on(signalName, () => {
    cleanupAllRunningMarkersSync();
    process.exit(exitCode);
  });
}
const cas = createCasStore(getGlobalCasDir(storageRoot));
const workerCtlPath = join(storageRoot, "workers", `${hash}.json`);
@@ -498,8 +516,8 @@ async function main(): Promise<void> {
const message = e instanceof Error ? e.message : String(e);
bootLog("Q3MN8YKW", `thread ${threadId} failed: ${message}`);
} finally {
threads.delete(threadId);
await unlink(runningPath).catch(() => {});
threads.delete(threadId);
bumpDone();
socket?.end();
}