diff --git a/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl b/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl index 5076f72..96fc47d 100644 --- a/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl +++ b/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl @@ -1,4 +1,4 @@ -{"name":"demo-live","hash":"C9NMV6V2TQT81","threadId":"01LIVECMPLT01DDDDDDDDDDDDG","parameters":{"prompt":"hello","options":{"maxRounds":5,"depth":0}},"timestamp":1714963200000} -{"role":"planner","content":"alpha\nbeta\ngamma\nLINE4\nLINE5\nLINE6\nLINE7\nLINE8\nLINE9\nLINE10\nLINE11","meta":{"phase":"plan","flags":[1,2]},"refs":[],"timestamp":1714963201000} -{"role":"coder","content":"patch","meta":{},"refs":[],"timestamp":1714963202000} +{"name":"demo-live","hash":"C9NMV6V2TQT81","threadId":"01LIVECMPLT01DDDDDDDDDDDDG","parameters":{"prompt":"hello","options":{"maxRounds":5,"depth":0}},"timestamp":1714963400000} +{"role":"planner","contentHash":"FF7YQ5W3S2EV6","meta":{"phase":"plan","flags":[1,2]},"refs":[],"timestamp":1714963201000} +{"role":"coder","contentHash":"EN34XX1W4WAFJ","meta":{},"refs":[],"timestamp":1714963202000} {"returnCode":0,"summary":"fixture completed"} diff --git a/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVECMPLT01DDDDDDDDDDDDG.info.jsonl b/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVECMPLT01DDDDDDDDDDDDG.info.jsonl new file mode 100644 index 0000000..d9e2d29 --- /dev/null +++ b/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVECMPLT01DDDDDDDDDDDDG.info.jsonl @@ -0,0 +1,2 @@ +{"tag":"DEBUGTAG1","content":"bundle loaded","timestamp":1714963400050} +{"tag":"DEBUGTAG2","content":"multi\nline","timestamp":1714963400500} diff --git a/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl b/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl index 7ad5878..fb9cdec 100644 --- a/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl +++ b/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl @@ -1,2 +1,2 @@ {"name":"demo-live","hash":"C9NMV6V2TQT81","threadId":"01LIVEINFLY01DDDDDDDDDDDDG","parameters":{"prompt":"hello","options":{"maxRounds":5,"depth":0}},"timestamp":1714963200000} -{"role":"planner","content":"still running","meta":{"x":1},"refs":[],"timestamp":1714963201000} +{"role":"planner","contentHash":"P6M9FHE1GSBN0","meta":{"x":1},"refs":[],"timestamp":1714963201000} diff --git a/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVEOLDER01DDDDDDDDDDDDG.data.jsonl b/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVEOLDER01DDDDDDDDDDDDG.data.jsonl new file mode 100644 index 0000000..63ec7df --- /dev/null +++ b/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVEOLDER01DDDDDDDDDDDDG.data.jsonl @@ -0,0 +1,2 @@ +{"name":"demo-live-old","hash":"C9NMV6V2TQT81","threadId":"01LIVEOLDER01DDDDDDDDDDDDG","parameters":{"prompt":"old","options":{"maxRounds":5,"depth":0}},"timestamp":1714963000000} +{"returnCode":0,"summary":"older thread"} diff --git a/packages/cli-workflow/__tests__/fork-cli.test.ts b/packages/cli-workflow/__tests__/fork-cli.test.ts index 6f9c88b..a0fe69d 100644 --- a/packages/cli-workflow/__tests__/fork-cli.test.ts +++ b/packages/cli-workflow/__tests__/fork-cli.test.ts @@ -134,10 +134,10 @@ describe("cli fork", () => { expect(start.threadId).toBe(newId); expect(start.forkFrom).toEqual({ threadId: sourceId }); - const last = JSON.parse(lines[lines.length - 1] ?? "{}") as Record; - expect(last.role).toBe("reviewer"); + const lastRoleLine = JSON.parse(lines[lines.length - 2] ?? "{}") as Record; + expect(lastRoleLine.role).toBe("reviewer"); const cas = createCasStore(getGlobalCasDir(storageRoot)); - expect(await getContentMerklePayload(cas, String(last.contentHash))).toBe("rev-1"); + expect(await getContentMerklePayload(cas, String(lastRoleLine.contentHash))).toBe("rev-1"); }); test("fork without --from-role retries last role", async () => { @@ -187,9 +187,9 @@ describe("cli fork", () => { const cas = createCasStore(getGlobalCasDir(storageRoot)); expect(await getContentMerklePayload(cas, String(replayCoder.contentHash))).toBe("c1"); - const last = JSON.parse(lines[lines.length - 1] ?? "{}") as Record; - expect(last.role).toBe("reviewer"); - expect(await getContentMerklePayload(cas, String(last.contentHash))).toBe("rev-2"); + const lastRoleLine = JSON.parse(lines[lines.length - 2] ?? "{}") as Record; + expect(lastRoleLine.role).toBe("reviewer"); + expect(await getContentMerklePayload(cas, String(lastRoleLine.contentHash))).toBe("rev-2"); }); test("fork rejects unknown role with available names", async () => { diff --git a/packages/cli-workflow/__tests__/live.test.ts b/packages/cli-workflow/__tests__/live.test.ts index 38d9ca3..7d44cda 100644 --- a/packages/cli-workflow/__tests__/live.test.ts +++ b/packages/cli-workflow/__tests__/live.test.ts @@ -5,22 +5,37 @@ import { tmpdir } from "node:os"; import { join } from "node:path"; import { fileURLToPath } from "node:url"; +import { createCasStore, getGlobalCasDir, putContentMerkleNode } from "@uncaged/workflow"; + import { + formatLiveDebugLine, formatLiveTimeLabel, LIVE_CONTENT_MAX_LINES, type LiveRoleRow, renderLiveRoleStepLines, } from "../src/cmd-live.js"; +import { parseLiveArgv } from "../src/live-argv.js"; const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url)); const fixtureRoot = fileURLToPath(new URL("./fixtures/live", import.meta.url)); +/** Bodies for Merkle content nodes; hashes must match `.data.jsonl` fixtures. */ +const LIVE_FIXTURE_PLANNER_BODY = + "alpha\nbeta\ngamma\nLINE4\nLINE5\nLINE6\nLINE7\nLINE8\nLINE9\nLINE10\nLINE11"; + describe("live helpers", () => { test("formatLiveTimeLabel pads HH:MM:SS", () => { const label = formatLiveTimeLabel(new Date("2024-06-01T09:08:07.000Z").getTime()); expect(label).toMatch(/^\d{2}:\d{2}:\d{2}$/); }); + test("formatLiveDebugLine flattens newlines in message", () => { + const line = formatLiveDebugLine(0, "TAG1", "a\nb"); + expect(line).toContain("[TAG1]"); + expect(line).toContain("a b"); + expect(line).not.toContain("\n"); + }); + test("renderLiveRoleStepLines truncates content to LIVE_CONTENT_MAX_LINES", () => { const lines = Array.from({ length: LIVE_CONTENT_MAX_LINES + 3 }, (_, i) => `L${i + 1}`); const row: LiveRoleRow = { @@ -37,6 +52,31 @@ describe("live helpers", () => { }); }); +describe("parseLiveArgv", () => { + test("parses thread id and flags in any order", () => { + const a = parseLiveArgv(["01ABC", "--debug", "--role", "planner"]); + expect(a.ok).toBe(true); + if (a.ok) { + expect(a.value.threadId).toBe("01ABC"); + expect(a.value.latest).toBe(false); + expect(a.value.debug).toBe(true); + expect(a.value.role).toBe("planner"); + } + const b = parseLiveArgv(["--latest", "--role", "x"]); + expect(b.ok).toBe(true); + if (b.ok) { + expect(b.value.latest).toBe(true); + expect(b.value.threadId).toBe(null); + expect(b.value.role).toBe("x"); + } + }); + + test("rejects --latest with thread id", () => { + const r = parseLiveArgv(["--latest", "01ABC"]); + expect(r.ok).toBe(false); + }); +}); + describe("live CLI", () => { let prevEnv: string | undefined; let storageRoot: string; @@ -50,10 +90,23 @@ describe("live CLI", () => { join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl"), join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl"), ); + await cp( + join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.info.jsonl"), + join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.info.jsonl"), + ); await cp( join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl"), join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl"), ); + await cp( + join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVEOLDER01DDDDDDDDDDDDG.data.jsonl"), + join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVEOLDER01DDDDDDDDDDDDG.data.jsonl"), + ); + + const cas = createCasStore(getGlobalCasDir(storageRoot)); + await putContentMerkleNode(cas, LIVE_FIXTURE_PLANNER_BODY); + await putContentMerkleNode(cas, "patch"); + await putContentMerkleNode(cas, "still running"); }); afterEach(async () => { @@ -100,6 +153,135 @@ describe("live CLI", () => { expect(stdout).toContain("fixture completed"); }); + test("--latest tails the newest thread by start timestamp", async () => { + const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; + const proc = spawn(process.execPath, [cliEntryPath, "live", "--latest"], { + env, + stdio: ["ignore", "pipe", "pipe"], + }); + const stdout = await new Promise((resolve, reject) => { + let buf = ""; + proc.stdout?.on("data", (c: Buffer) => { + buf += c.toString("utf8"); + }); + proc.stderr?.on("data", (c: Buffer) => { + buf += c.toString("utf8"); + }); + proc.on("error", reject); + proc.on("exit", (code: number | null) => { + if (code === 0) { + resolve(buf); + } else { + reject(new Error(`exit ${code}: ${buf}`)); + } + }); + }); + + expect(stdout).toContain("fixture completed"); + expect(stdout).not.toContain("older thread"); + }); + + test("--debug prints .info.jsonl records after data output", async () => { + const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; + const proc = spawn( + process.execPath, + [cliEntryPath, "live", "01LIVECMPLT01DDDDDDDDDDDDG", "--debug"], + { + env, + stdio: ["ignore", "pipe", "pipe"], + }, + ); + const stdout = await new Promise((resolve, reject) => { + let buf = ""; + proc.stdout?.on("data", (c: Buffer) => { + buf += c.toString("utf8"); + }); + proc.stderr?.on("data", (c: Buffer) => { + buf += c.toString("utf8"); + }); + proc.on("error", reject); + proc.on("exit", (code: number | null) => { + if (code === 0) { + resolve(buf); + } else { + reject(new Error(`exit ${code}: ${buf}`)); + } + }); + }); + + expect(stdout).toContain("[DEBUGTAG1]"); + expect(stdout).toContain("bundle loaded"); + expect(stdout).toContain("[DEBUGTAG2]"); + expect(stdout).toContain("multi line"); + }); + + test("--role filters out non-matching roles", async () => { + const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; + const proc = spawn( + process.execPath, + [cliEntryPath, "live", "01LIVECMPLT01DDDDDDDDDDDDG", "--role", "planner"], + { + env, + stdio: ["ignore", "pipe", "pipe"], + }, + ); + const stdout = await new Promise((resolve, reject) => { + let buf = ""; + proc.stdout?.on("data", (c: Buffer) => { + buf += c.toString("utf8"); + }); + proc.stderr?.on("data", (c: Buffer) => { + buf += c.toString("utf8"); + }); + proc.on("error", reject); + proc.on("exit", (code: number | null) => { + if (code === 0) { + resolve(buf); + } else { + reject(new Error(`exit ${code}: ${buf}`)); + } + }); + }); + + expect(stdout).toContain("planner"); + expect(stdout).not.toContain("patch"); + expect(stdout).toContain("completed: returnCode=0"); + }); + + test("--latest --debug --role combine", async () => { + const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; + const proc = spawn( + process.execPath, + [cliEntryPath, "live", "--latest", "--debug", "--role", "planner"], + { + env, + stdio: ["ignore", "pipe", "pipe"], + }, + ); + const stdout = await new Promise((resolve, reject) => { + let buf = ""; + proc.stdout?.on("data", (c: Buffer) => { + buf += c.toString("utf8"); + }); + proc.stderr?.on("data", (c: Buffer) => { + buf += c.toString("utf8"); + }); + proc.on("error", reject); + proc.on("exit", (code: number | null) => { + if (code === 0) { + resolve(buf); + } else { + reject(new Error(`exit ${code}: ${buf}`)); + } + }); + }); + + expect(stdout).toContain("[DEBUGTAG1]"); + expect(stdout).toContain("planner"); + expect(stdout).not.toContain("patch"); + expect(stdout).toContain("fixture completed"); + }); + test("unknown thread id exits 1", () => { const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; const r = spawnSync(process.execPath, [cliEntryPath, "live", "01UNKNOWNXXXXXXXXXXXXXXXXX"], { @@ -126,7 +308,11 @@ describe("live CLI", () => { await new Promise((r) => setTimeout(r, 120)); const prior = await readFile(dataPath, "utf8"); - await writeFile(dataPath, `${prior}{"returnCode":0,"summary":"caught up"}\n`, "utf8"); + await writeFile( + dataPath, + `${prior.replace(/\s*$/, "")}\n${JSON.stringify({ returnCode: 0, summary: "caught up" })}\n`, + "utf8", + ); const stdout = await new Promise((resolve, reject) => { let buf = ""; @@ -151,3 +337,33 @@ describe("live CLI", () => { expect(stdout).toContain("caught up"); }); }); + +describe("live --latest with empty storage", () => { + let prevEnv: string | undefined; + let emptyRoot: string; + + beforeEach(async () => { + prevEnv = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT; + emptyRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-live-empty-")); + process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = emptyRoot; + }); + + afterEach(async () => { + if (prevEnv === undefined) { + delete process.env.UNCAGED_WORKFLOW_STORAGE_ROOT; + } else { + process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = prevEnv; + } + await rm(emptyRoot, { recursive: true, force: true }); + }); + + test("exits 1 when no threads exist", () => { + const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: emptyRoot }; + const r = spawnSync(process.execPath, [cliEntryPath, "live", "--latest"], { + env, + encoding: "utf8", + }); + expect(r.status).toBe(1); + expect(String(r.stderr ?? "")).toContain("no threads"); + }); +}); diff --git a/packages/cli-workflow/src/cli-dispatch.ts b/packages/cli-workflow/src/cli-dispatch.ts index 47828c0..7f676c2 100644 --- a/packages/cli-workflow/src/cli-dispatch.ts +++ b/packages/cli-workflow/src/cli-dispatch.ts @@ -17,6 +17,7 @@ import { cmdRun } from "./cmd-run.js"; import { cmdShow, formatShowYaml } from "./cmd-show.js"; import { cmdThreadRemove, cmdThreadShow } from "./cmd-thread.js"; import { cmdThreads } from "./cmd-threads.js"; +import { parseLiveArgv } from "./live-argv.js"; import { parseRunArgv } from "./run-argv.js"; export function formatCliUsage(): string { @@ -29,7 +30,8 @@ export function formatCliUsage(): string { " uncaged-workflow run [--prompt ] [--max-rounds N]", " uncaged-workflow ps", " uncaged-workflow kill ", - " uncaged-workflow live ", + " uncaged-workflow live [--debug] [--role ]", + " uncaged-workflow live --latest [--debug] [--role ]", " uncaged-workflow history ", " uncaged-workflow rollback [hash]", " uncaged-workflow pause ", @@ -193,12 +195,12 @@ async function dispatchKill(storageRoot: string, argv: string[]): Promise { - const threadId = argv[0]; - if (threadId === undefined || argv.length > 1) { - printCliError(`${usage()}\n\nerror: live requires `); + const parsed = parseLiveArgv(argv); + if (!parsed.ok) { + printCliError(`${formatCliUsage()}\n\nerror: ${parsed.error}`); return 1; } - return cmdLive(storageRoot, threadId); + return cmdLive(storageRoot, parsed.value); } async function dispatchHistory(storageRoot: string, argv: string[]): Promise { diff --git a/packages/cli-workflow/src/cmd-live.ts b/packages/cli-workflow/src/cmd-live.ts index 468fae3..ccbfcba 100644 --- a/packages/cli-workflow/src/cmd-live.ts +++ b/packages/cli-workflow/src/cmd-live.ts @@ -1,14 +1,21 @@ import { watch } from "node:fs"; import { readFile } from "node:fs/promises"; +import { dirname, join } from "node:path"; import { + type CasStore, + createCasStore, + getContentMerklePayload, + getGlobalCasDir, tryParseRoleStepRecord, tryParseWorkflowResultRecord, - type WorkflowResult, + type WorkflowCompletion, } from "@uncaged/workflow"; import { printCliError, printCliLine } from "./cli-output.js"; -import { resolveThreadDataPath } from "./thread-scan.js"; +import { pathExists } from "./fs-utils.js"; +import type { ParsedLiveArgv } from "./live-argv.js"; +import { findLatestThreadDataPath, resolveThreadDataPath } from "./thread-scan.js"; export const LIVE_CONTENT_MAX_LINES = 10; @@ -38,6 +45,18 @@ function highlightLiveRole(name: string): string { return `\x1b[1m\x1b[36m${name}\x1b[0m`; } +function dimGreyLine(line: string): string { + if (!shouldUseColor()) { + return line; + } + return `\x1b[2m\x1b[90m${line}\x1b[0m`; +} + +export function formatLiveDebugLine(timestampMs: number, tag: string, message: string): string { + const label = `[${formatLiveTimeLabel(timestampMs)}] [${tag}] ${message.replace(/\n/g, " ")}`; + return dimGreyLine(label); +} + export function renderLiveRoleStepLines(row: LiveRoleRow, roleDisplay: string): string[] { const header = `[${formatLiveTimeLabel(row.timestamp)}] ▶ ${roleDisplay}`; const lines: string[] = [header]; @@ -54,7 +73,7 @@ export function renderLiveRoleStepLines(row: LiveRoleRow, roleDisplay: string): return lines; } -function printSummary(result: WorkflowResult): void { +function printSummary(result: WorkflowCompletion): void { printCliLine(`completed: returnCode=${result.returnCode} — ${result.summary}`); } @@ -65,10 +84,36 @@ type LiveSessionState = { contentOffset: number; }; -function handleJsonlLine( +type InfoLiveState = { + carry: string; + contentOffset: number; +}; + +function tryParseInfoRecord(obj: Record): { + tag: string; + content: string; + timestamp: number; +} | null { + const tag = obj.tag; + const content = obj.content; + const timestamp = obj.timestamp; + if ( + typeof tag !== "string" || + typeof content !== "string" || + typeof timestamp !== "number" || + !Number.isFinite(timestamp) + ) { + return null; + } + return { tag, content, timestamp }; +} + +async function handleJsonlLine( rawLine: string, state: LiveSessionState, -): { parseError: string | null; workflowResult: WorkflowResult | null } { + roleFilter: string | null, + cas: CasStore, +): Promise<{ parseError: string | null; workflowResult: WorkflowCompletion | null }> { const trimmed = rawLine.trim(); if (trimmed === "") { return { parseError: null, workflowResult: null }; @@ -104,9 +149,17 @@ function handleJsonlLine( }; } + if (roleFilter !== null && roleRow.role !== roleFilter) { + return { parseError: null, workflowResult: null }; + } + + const payload = await getContentMerklePayload(cas, roleRow.contentHash); + const content = + payload !== null ? payload : `(content not in CAS; contentHash=${roleRow.contentHash})`; + const row: LiveRoleRow = { role: roleRow.role, - content: roleRow.content, + content, meta: roleRow.meta, timestamp: roleRow.timestamp, }; @@ -116,7 +169,12 @@ function handleJsonlLine( return { parseError: null, workflowResult: null }; } -async function pumpNewContent(dataPath: string, state: LiveSessionState): Promise { +async function pumpNewContent( + dataPath: string, + state: LiveSessionState, + roleFilter: string | null, + cas: CasStore, +): Promise { let text: string; try { text = await readFile(dataPath, "utf8"); @@ -137,7 +195,7 @@ async function pumpNewContent(dataPath: string, state: LiveSessionState): Promis state.carry = parts.pop() ?? ""; for (const line of parts) { - const { parseError, workflowResult } = handleJsonlLine(line, state); + const { parseError, workflowResult } = await handleJsonlLine(line, state, roleFilter, cas); if (parseError !== null) { printCliError(parseError); return 1; @@ -151,12 +209,76 @@ async function pumpNewContent(dataPath: string, state: LiveSessionState): Promis return null; } -function watchLiveFile(params: { - dataPath: string; - state: LiveSessionState; - signal: AbortSignal; -}): Promise { - const { dataPath, state, signal } = params; +async function pumpNewInfoContent(infoPath: string, state: InfoLiveState): Promise { + let text: string; + try { + text = await readFile(infoPath, "utf8"); + } catch { + return; + } + + if (text.length < state.contentOffset) { + state.contentOffset = 0; + state.carry = ""; + } + + const chunk = text.slice(state.contentOffset); + state.contentOffset = text.length; + state.carry += chunk; + + const parts = state.carry.split("\n"); + state.carry = parts.pop() ?? ""; + + for (const line of parts) { + const trimmed = line.trim(); + if (trimmed === "") { + continue; + } + let rec: unknown; + try { + rec = JSON.parse(trimmed) as unknown; + } catch { + continue; + } + if (rec === null || typeof rec !== "object") { + continue; + } + const parsed = tryParseInfoRecord(rec as Record); + if (parsed === null) { + continue; + } + printCliLine(formatLiveDebugLine(parsed.timestamp, parsed.tag, parsed.content)); + } +} + +type WatchPumpTask = { + path: string; + pump: () => Promise; +}; + +async function runWatchPumpStep( + settled: () => boolean, + pump: () => Promise, + closeAll: () => void, + finish: (code: number) => void, +): Promise { + if (settled()) { + return; + } + try { + const code = await pump(); + if (code !== null) { + closeAll(); + finish(code); + } + } catch (e) { + closeAll(); + throw e instanceof Error ? e : new Error(String(e)); + } +} + +function watchLivePaths(params: { tasks: WatchPumpTask[]; signal: AbortSignal }): Promise { + const { tasks, signal } = params; return new Promise((resolve, reject) => { let settled = false; @@ -168,65 +290,138 @@ function watchLiveFile(params: { resolve(code); }; - /** Serialize reads — `fs.watch` may emit faster than `readFile` completes. */ - let pumpChain: Promise = Promise.resolve(); + const pumpChains = new Map>(); + for (const t of tasks) { + pumpChains.set(t.path, Promise.resolve()); + } - const watcher = watch(dataPath, (eventType) => { - if (eventType === "rename") { - return; + const watchers: ReturnType[] = []; + + const closeAll = (): void => { + for (const w of watchers) { + w.close(); } - schedulePump(); - }); + }; - watcher.on("error", (err: Error) => { - watcher.close(); - reject(err); - }); + function schedulePump(path: string, pump: () => Promise): void { + const prev = pumpChains.get(path) ?? Promise.resolve(); + const next = (async () => { + await prev; + await runWatchPumpStep(() => settled, pump, closeAll, finish); + })(); + pumpChains.set(path, next); + } + + for (const { path, pump } of tasks) { + const watcher = watch(path, (eventType) => { + if (eventType === "rename") { + return; + } + schedulePump(path, pump); + }); + watchers.push(watcher); + watcher.on("error", (err: Error) => { + closeAll(); + reject(err); + }); + } const onAbort = (): void => { - watcher.close(); + closeAll(); finish(0); }; signal.addEventListener("abort", onAbort, { once: true }); - async function drainQueuedPump(): Promise { - if (settled) { - return; - } - try { - const code = await pumpNewContent(dataPath, state); - if (code !== null) { - watcher.close(); - finish(code); - } - } catch (e) { - watcher.close(); - reject(e instanceof Error ? e : new Error(String(e))); - } + for (const { path, pump } of tasks) { + schedulePump(path, pump); } - - function schedulePump(): void { - pumpChain = pumpChain.then(() => drainQueuedPump()); - } - - schedulePump(); }); } -export async function cmdLive(storageRoot: string, threadId: string): Promise { - const dataPath = await resolveThreadDataPath(storageRoot, threadId); - if (dataPath === null) { - printCliError(`thread not found: ${threadId}`); +type LiveThreadTarget = { + threadId: string; + dataPath: string; +}; + +async function resolveLiveThreadTarget( + storageRoot: string, + parsed: ParsedLiveArgv, +): Promise { + if (parsed.latest) { + const found = await findLatestThreadDataPath(storageRoot); + if (found === null) { + printCliError("live: no threads found"); + return null; + } + return found; + } + + const id = parsed.threadId; + if (id === null) { + printCliError("live: internal error: missing thread id"); + return null; + } + const resolved = await resolveThreadDataPath(storageRoot, id); + if (resolved === null) { + printCliError(`thread not found: ${id}`); + return null; + } + return { threadId: id, dataPath: resolved }; +} + +async function buildLiveWatchTasks(params: { + dataPath: string; + infoPath: string; + debug: boolean; + dataState: LiveSessionState; + infoState: InfoLiveState; + roleFilter: string | null; + cas: CasStore; +}): Promise { + const { dataPath, infoPath, debug, dataState, infoState, roleFilter, cas } = params; + const tasks: WatchPumpTask[] = [ + { + path: dataPath, + pump: () => pumpNewContent(dataPath, dataState, roleFilter, cas), + }, + ]; + + if (debug && (await pathExists(infoPath))) { + tasks.push({ + path: infoPath, + pump: async () => { + await pumpNewInfoContent(infoPath, infoState); + return null; + }, + }); + } + + return tasks; +} + +export async function cmdLive(storageRoot: string, parsed: ParsedLiveArgv): Promise { + const target = await resolveLiveThreadTarget(storageRoot, parsed); + if (target === null) { return 1; } - const state: LiveSessionState = { + const { threadId, dataPath } = target; + const roleFilter = parsed.role; + const infoPath = join(dirname(dataPath), `${threadId}.info.jsonl`); + const cas = createCasStore(getGlobalCasDir(storageRoot)); + + const dataState: LiveSessionState = { sawStart: false, completed: false, carry: "", contentOffset: 0, }; + const infoState: InfoLiveState = { + carry: "", + contentOffset: 0, + }; + const controller = new AbortController(); const onSigInt = (): void => { controller.abort(); @@ -234,16 +429,30 @@ export async function cmdLive(storageRoot: string, threadId: string): Promise { + const a = argv[i]; + if (a === "--latest") { + s.latest = true; + return ok(i + 1); + } + if (a === "--debug") { + s.debug = true; + return ok(i + 1); + } + if (a === "--role") { + const v = argv[i + 1]; + if (v === undefined || v.startsWith("--")) { + return err("missing value for --role"); + } + s.role = v; + return ok(i + 2); + } + if (a.startsWith("--")) { + return err(`unknown live flag: ${a}`); + } + if (s.threadId !== null) { + return err("unexpected extra argument"); + } + s.threadId = a; + return ok(i + 1); +} + +export function parseLiveArgv(argv: string[]): Result { + const s: LiveArgvScan = { + latest: false, + debug: false, + role: null, + threadId: null, + }; + + let i = 0; + while (i < argv.length) { + const step = applyLiveArgvToken(argv, i, s); + if (!step.ok) { + return step; + } + i = step.value; + } + + if (s.latest && s.threadId !== null) { + return err("live --latest does not take "); + } + if (!s.latest && s.threadId === null) { + return err("live requires or --latest"); + } + + return ok({ + threadId: s.threadId, + latest: s.latest, + debug: s.debug, + role: s.role, + }); +} diff --git a/packages/cli-workflow/src/thread-scan.ts b/packages/cli-workflow/src/thread-scan.ts index df06731..0cf2951 100644 --- a/packages/cli-workflow/src/thread-scan.ts +++ b/packages/cli-workflow/src/thread-scan.ts @@ -1,4 +1,4 @@ -import { readdir } from "node:fs/promises"; +import { readdir, stat } from "node:fs/promises"; import { join } from "node:path"; import { pathExists, readTextFileIfExists } from "./fs-utils.js"; @@ -15,6 +15,28 @@ export type HistoricalThreadRow = { workflowName: string | null; }; +async function readThreadStartTimestampMs(dataPath: string): Promise { + const text = await readTextFileIfExists(dataPath); + if (text === null) { + return null; + } + const firstLine = text.split("\n")[0]; + if (firstLine === undefined || firstLine.trim() === "") { + return null; + } + let parsed: unknown; + try { + parsed = JSON.parse(firstLine) as unknown; + } catch { + return null; + } + if (parsed === null || typeof parsed !== "object") { + return null; + } + const ts = (parsed as Record).timestamp; + return typeof ts === "number" && Number.isFinite(ts) ? ts : null; +} + async function readWorkflowNameFromDataJsonl(dataPath: string): Promise { const text = await readTextFileIfExists(dataPath); if (text === null) { @@ -124,6 +146,50 @@ export async function listHistoricalThreads( return out; } +/** + * Picks the thread whose `.data.jsonl` is newest by start-record `timestamp`, + * falling back to file `mtime` when the timestamp is missing. + * Tie-breaker: larger `mtime` wins when start timestamps are equal. + */ +export async function findLatestThreadDataPath( + storageRoot: string, +): Promise<{ threadId: string; dataPath: string } | null> { + const threads = await listHistoricalThreads(storageRoot, null); + if (threads.length === 0) { + return null; + } + + let best: { + threadId: string; + dataPath: string; + primary: number; + secondary: number; + } | null = null; + + for (const t of threads) { + const dataPath = join(storageRoot, "logs", t.hash, `${t.threadId}.data.jsonl`); + let mtimeMs = 0; + try { + const st = await stat(dataPath); + mtimeMs = st.mtimeMs; + } catch { + continue; + } + const startTs = await readThreadStartTimestampMs(dataPath); + const primary = startTs !== null ? startTs : mtimeMs; + const secondary = mtimeMs; + if ( + best === null || + primary > best.primary || + (primary === best.primary && secondary > best.secondary) + ) { + best = { threadId: t.threadId, dataPath, primary, secondary }; + } + } + + return best === null ? null : { threadId: best.threadId, dataPath: best.dataPath }; +} + export async function resolveThreadDataPath( storageRoot: string, threadId: string, diff --git a/packages/workflow/src/fork-thread.ts b/packages/workflow/src/fork-thread.ts index 445381f..8a6aa76 100644 --- a/packages/workflow/src/fork-thread.ts +++ b/packages/workflow/src/fork-thread.ts @@ -1,6 +1,6 @@ import { normalizeRefsField } from "./refs-field.js"; import { err, ok, type Result } from "./result.js"; -import type { RoleOutput, WorkflowResult } from "./types.js"; +import type { RoleOutput, WorkflowCompletion } from "./types.js"; /** Role steps replayed from `.data.jsonl`, including persisted timestamps. */ export type ForkHistoricalStep = RoleOutput & { timestamp: number }; @@ -14,8 +14,10 @@ export type ParsedThreadStartRecord = { depth: number; }; -/** Recognizes a persisted workflow completion line (no `role`; has numeric `returnCode` and string `summary`). */ -export function tryParseWorkflowResultRecord(obj: Record): WorkflowResult | null { +/** Recognizes a persisted workflow completion line (no `role`; has numeric `returnCode` and string `summary`). Omits `rootHash` when absent. */ +export function tryParseWorkflowResultRecord( + obj: Record, +): WorkflowCompletion | null { if (obj.role !== undefined) { return null; } diff --git a/packages/workflow/src/worker.ts b/packages/workflow/src/worker.ts index 574aaf4..0edbbde 100644 --- a/packages/workflow/src/worker.ts +++ b/packages/workflow/src/worker.ts @@ -422,7 +422,7 @@ async function main(): Promise { } catch (e) { const message = e instanceof Error ? e.message : String(e); bootLog("Q3MN8YKW", `thread ${threadId} failed: ${message}`); - const failure: WorkflowResult = { returnCode: 1, summary: message }; + const failure: WorkflowResult = { returnCode: 1, summary: message, rootHash: "" }; await appendFile(dataJsonlPath, `${JSON.stringify(failure)}\n`, "utf8").catch(() => {}); } finally { threads.delete(threadId);