This repository has been archived on 2026-06-08. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
workflow/packages/cli-workflow/src/thread-scan.ts
T
xingyue 8fe26417cf feat(cli): add --latest, --debug, --role flags to live command (#37 Phase 2)
- --latest: auto-find most recent thread by start timestamp
- --debug: display .info.jsonl debug log with tags
- --role: filter output to specific role
- Add live-argv.ts for flag parsing
- Add fixtures and test coverage for all flags

Testing: #50
2026-05-07 21:44:19 +08:00

210 lines
5.6 KiB
TypeScript

import { readdir, stat } from "node:fs/promises";
import { join } from "node:path";
import { pathExists, readTextFileIfExists } from "./fs-utils.js";
/** Row describing one thread that has a `<threadId>.running` marker file (see `listRunningThreads`). */
export type RunningThreadRow = {
/** Marker file name with the `.running` suffix removed. */
threadId: string;
/** Name of the directory under `<storageRoot>/logs` in which the marker was found. */
hash: string;
/** `name` taken from the first line of `<threadId>.data.jsonl`, or `null` when it could not be read. */
workflowName: string | null;
};
/** Row describing one thread discovered through its `<threadId>.data.jsonl` file (see `listHistoricalThreads`). */
export type HistoricalThreadRow = {
/** Data file name with the `.data.jsonl` suffix removed. */
threadId: string;
/** Name of the directory under `<storageRoot>/logs` in which the data file was found. */
hash: string;
/** `name` taken from the first line of the data file, or `null` when it could not be read. */
workflowName: string | null;
};
/**
 * Loads the first line of a `.data.jsonl` file and parses it as a JSON object.
 *
 * Only the text before the first `\n` is inspected, so the rest of the log is
 * never split into lines.
 *
 * @param dataPath Path handed to `readTextFileIfExists`.
 * @returns The parsed first line when it is a non-null JSON object (arrays
 *   included, since they are objects at runtime). `null` when
 *   `readTextFileIfExists` yields `null` (presumably: the file is absent), when
 *   the first line is blank, when it is not valid JSON, or when it parses to a
 *   primitive or `null`.
 */
async function readStartRecord(dataPath: string): Promise<Record<string, unknown> | null> {
  const text = await readTextFileIfExists(dataPath);
  if (text === null) {
    return null;
  }
  const newlineAt = text.indexOf("\n");
  const firstLine = newlineAt === -1 ? text : text.slice(0, newlineAt);
  if (firstLine.trim() === "") {
    return null;
  }
  let parsed: unknown;
  try {
    parsed = JSON.parse(firstLine);
  } catch {
    // A malformed start record is treated the same as a missing one.
    return null;
  }
  if (parsed === null || typeof parsed !== "object") {
    return null;
  }
  return parsed as Record<string, unknown>;
}

/**
 * Reads the `timestamp` field of the first record in a `.data.jsonl` file.
 *
 * @param dataPath Path of the data file.
 * @returns The timestamp when it is a finite number, otherwise `null`
 *   (unreadable first record, missing field, non-number, `NaN` or `Infinity`).
 */
async function readThreadStartTimestampMs(dataPath: string): Promise<number | null> {
  const record = await readStartRecord(dataPath);
  if (record === null) {
    return null;
  }
  const ts = record.timestamp;
  return typeof ts === "number" && Number.isFinite(ts) ? ts : null;
}

/**
 * Reads the `name` field of the first record in a `.data.jsonl` file.
 *
 * @param dataPath Path of the data file.
 * @returns The name when it is a string, otherwise `null`.
 */
async function readWorkflowNameFromDataJsonl(dataPath: string): Promise<string | null> {
  const record = await readStartRecord(dataPath);
  if (record === null) {
    return null;
  }
  const name = record.name;
  return typeof name === "string" ? name : null;
}
/**
 * Enumerates threads that are executing right now, i.e. those that have a
 * `<threadId>.running` marker inside one of the `<storageRoot>/logs/<hash>` directories.
 *
 * @param storageRoot Directory that contains the `logs` folder.
 * @returns One row per marker, ordered by `localeCompare` on `"<hash>/<threadId>"`.
 *   An empty array when the `logs` folder does not exist.
 */
export async function listRunningThreads(storageRoot: string): Promise<RunningThreadRow[]> {
  const markerSuffix = ".running";
  const logsRoot = join(storageRoot, "logs");
  const logsPresent = await pathExists(logsRoot);
  if (!logsPresent) {
    return [];
  }

  const rows: RunningThreadRow[] = [];
  for (const hash of await readdir(logsRoot)) {
    const hashDir = join(logsRoot, hash);
    // Anything under `logs` that cannot be listed (e.g. a plain file) is skipped.
    const names = await readdir(hashDir).catch(() => null);
    if (names === null) {
      continue;
    }
    for (const marker of names.filter((candidate) => candidate.endsWith(markerSuffix))) {
      const threadId = marker.slice(0, marker.length - markerSuffix.length);
      const workflowName = await readWorkflowNameFromDataJsonl(
        join(hashDir, `${threadId}.data.jsonl`),
      );
      rows.push({ threadId, hash, workflowName });
    }
  }

  const sortKey = (row: RunningThreadRow): string => `${row.hash}/${row.threadId}`;
  rows.sort((left, right) => sortKey(left).localeCompare(sortKey(right)));
  return rows;
}
/**
 * Enumerates every thread that left a `<threadId>.data.jsonl` file inside one of
 * the `<storageRoot>/logs/<hash>` directories.
 *
 * @param storageRoot Directory that contains the `logs` folder.
 * @param workflowNameFilter When non-null, keeps only threads whose start-record
 *   `name` equals this value exactly; threads without a readable name are dropped.
 * @returns Matching rows ordered by `localeCompare` on `"<hash>/<threadId>"`.
 *   An empty array when the `logs` folder does not exist.
 */
export async function listHistoricalThreads(
  storageRoot: string,
  workflowNameFilter: string | null,
): Promise<HistoricalThreadRow[]> {
  const dataSuffix = ".data.jsonl";
  const logsRoot = join(storageRoot, "logs");
  const logsPresent = await pathExists(logsRoot);
  if (!logsPresent) {
    return [];
  }

  const rows: HistoricalThreadRow[] = [];
  for (const hash of await readdir(logsRoot)) {
    const hashDir = join(logsRoot, hash);
    // Anything under `logs` that cannot be listed (e.g. a plain file) is skipped.
    const names = await readdir(hashDir).catch(() => null);
    if (names === null) {
      continue;
    }
    for (const dataFile of names) {
      if (!dataFile.endsWith(dataSuffix)) {
        continue;
      }
      const workflowName = await readWorkflowNameFromDataJsonl(join(hashDir, dataFile));
      const accepted = workflowNameFilter === null || workflowName === workflowNameFilter;
      if (accepted) {
        const threadId = dataFile.slice(0, dataFile.length - dataSuffix.length);
        rows.push({ threadId, hash, workflowName });
      }
    }
  }

  const sortKey = (row: HistoricalThreadRow): string => `${row.hash}/${row.threadId}`;
  rows.sort((left, right) => sortKey(left).localeCompare(sortKey(right)));
  return rows;
}
/**
 * Selects the most recent thread among all `*.data.jsonl` files.
 *
 * Ranking, highest first:
 * 1. the start-record `timestamp`, or the file `mtime` when no timestamp is available;
 * 2. on equal rank, the larger file `mtime`.
 * Threads whose data file cannot be `stat`-ed are ignored. When two candidates
 * tie on both keys, the one listed first by `listHistoricalThreads` is kept.
 *
 * @param storageRoot Directory that contains the `logs` folder.
 * @returns The winning thread id and the path of its data file, or `null` when
 *   no usable thread exists.
 */
export async function findLatestThreadDataPath(
  storageRoot: string,
): Promise<{ threadId: string; dataPath: string } | null> {
  type Candidate = { threadId: string; dataPath: string; rank: number; mtimeMs: number };
  let winner: Candidate | null = null;

  for (const { hash, threadId } of await listHistoricalThreads(storageRoot, null)) {
    const dataPath = join(storageRoot, "logs", hash, `${threadId}.data.jsonl`);
    const fileStat = await stat(dataPath).catch(() => null);
    if (fileStat === null) {
      continue;
    }
    const mtimeMs = fileStat.mtimeMs;
    const recordedStart = await readThreadStartTimestampMs(dataPath);
    const rank = recordedStart ?? mtimeMs;

    const isNewer =
      winner === null ||
      rank > winner.rank ||
      (rank === winner.rank && mtimeMs > winner.mtimeMs);
    if (isNewer) {
      winner = { threadId, dataPath, rank, mtimeMs };
    }
  }

  if (winner === null) {
    return null;
  }
  return { threadId: winner.threadId, dataPath: winner.dataPath };
}
/**
 * Locates the `<threadId>.data.jsonl` file of a thread by probing every entry
 * of `<storageRoot>/logs`.
 *
 * @param storageRoot Directory that contains the `logs` folder.
 * @param threadId Thread whose data file is wanted.
 * @returns The first existing candidate path, in `readdir` order, or `null`
 *   when the `logs` folder is absent or no entry holds the file.
 */
export async function resolveThreadDataPath(
  storageRoot: string,
  threadId: string,
): Promise<string | null> {
  const logsRoot = join(storageRoot, "logs");
  const logsPresent = await pathExists(logsRoot);
  if (logsPresent) {
    const dataFileName = `${threadId}.data.jsonl`;
    for (const hash of await readdir(logsRoot)) {
      const candidate = join(logsRoot, hash, dataFileName);
      const found = await pathExists(candidate);
      if (found) {
        return candidate;
      }
    }
  }
  return null;
}