From 990200230be29b09652ef8e31486b9755518604d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=98=9F=E6=9C=88?= Date: Thu, 7 May 2026 21:30:22 +0800 Subject: [PATCH 1/2] feat(cli): add live command for real-time thread monitoring (#37 Phase 1) - Add cmd-live.ts: tail .data.jsonl with formatted output - Display role steps with timestamp, role name, truncated content, meta - fs.watch for running threads, auto-exit on completion - Write WorkflowResult to .data.jsonl in worker.ts for completion detection - Add live.test.ts with JSONL fixtures Testing: #49 --- .../01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl | 4 + .../01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl | 2 + .../cli-workflow/__tests__/fork-cli.test.ts | 14 +- packages/cli-workflow/__tests__/live.test.ts | 153 +++++++++++ .../cli-workflow/__tests__/thread-cli.test.ts | 6 +- packages/cli-workflow/src/cli-dispatch.ts | 12 + packages/cli-workflow/src/cmd-live.ts | 254 ++++++++++++++++++ .../workflow/__tests__/fork-thread.test.ts | 20 ++ packages/workflow/__tests__/worker.test.ts | 6 +- packages/workflow/src/fork-thread.ts | 53 +++- packages/workflow/src/index.ts | 2 + packages/workflow/src/worker.ts | 9 +- 12 files changed, 508 insertions(+), 27 deletions(-) create mode 100644 packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl create mode 100644 packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl create mode 100644 packages/cli-workflow/__tests__/live.test.ts create mode 100644 packages/cli-workflow/src/cmd-live.ts diff --git a/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl b/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl new file mode 100644 index 0000000..5076f72 --- /dev/null +++ b/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl @@ -0,0 +1,4 @@ +{"name":"demo-live","hash":"C9NMV6V2TQT81","threadId":"01LIVECMPLT01DDDDDDDDDDDDG","parameters":{"prompt":"hello","options":{"maxRounds":5,"depth":0}},"timestamp":1714963200000} +{"role":"planner","content":"alpha\nbeta\ngamma\nLINE4\nLINE5\nLINE6\nLINE7\nLINE8\nLINE9\nLINE10\nLINE11","meta":{"phase":"plan","flags":[1,2]},"refs":[],"timestamp":1714963201000} +{"role":"coder","content":"patch","meta":{},"refs":[],"timestamp":1714963202000} +{"returnCode":0,"summary":"fixture completed"} diff --git a/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl b/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl new file mode 100644 index 0000000..7ad5878 --- /dev/null +++ b/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl @@ -0,0 +1,2 @@ +{"name":"demo-live","hash":"C9NMV6V2TQT81","threadId":"01LIVEINFLY01DDDDDDDDDDDDG","parameters":{"prompt":"hello","options":{"maxRounds":5,"depth":0}},"timestamp":1714963200000} +{"role":"planner","content":"still running","meta":{"x":1},"refs":[],"timestamp":1714963201000} diff --git a/packages/cli-workflow/__tests__/fork-cli.test.ts b/packages/cli-workflow/__tests__/fork-cli.test.ts index 9dbc64e..6f9c88b 100644 --- a/packages/cli-workflow/__tests__/fork-cli.test.ts +++ b/packages/cli-workflow/__tests__/fork-cli.test.ts @@ -111,7 +111,7 @@ describe("cli fork", () => { const sourceData = join(storageRoot, "logs", hash, `${sourceId}.data.jsonl`); const sourceRunning = join(storageRoot, "logs", hash, `${sourceId}.running`); await waitUntilRunningAbsent(sourceRunning); - await waitUntilMinDataLines(sourceData, 4); + await waitUntilMinDataLines(sourceData, 5); const forked = await cmdFork(storageRoot, sourceId, "planner"); expect(forked.ok).toBe(true); @@ -122,14 +122,14 @@ describe("cli fork", () => { const newData = join(storageRoot, "logs", hash, `${newId}.data.jsonl`); const newRunning = join(storageRoot, "logs", hash, `${newId}.running`); await waitUntilRunningAbsent(newRunning); - await waitUntilMinDataLines(newData, 4); + await waitUntilMinDataLines(newData, 5); const text = await readFile(newData, "utf8"); const lines = text .trim() .split("\n") .filter((l) => l !== ""); - expect(lines.length).toBe(4); + expect(lines.length).toBe(5); const start = JSON.parse(lines[0] ?? "{}") as Record; expect(start.threadId).toBe(newId); expect(start.forkFrom).toEqual({ threadId: sourceId }); @@ -162,7 +162,7 @@ describe("cli fork", () => { const sourceData = join(storageRoot, "logs", hash, `${sourceId}.data.jsonl`); const sourceRunning = join(storageRoot, "logs", hash, `${sourceId}.running`); await waitUntilRunningAbsent(sourceRunning); - await waitUntilMinDataLines(sourceData, 4); + await waitUntilMinDataLines(sourceData, 5); const forked = await cmdFork(storageRoot, sourceId, null); expect(forked.ok).toBe(true); @@ -173,14 +173,14 @@ describe("cli fork", () => { const newData = join(storageRoot, "logs", hash, `${newId}.data.jsonl`); const newRunning = join(storageRoot, "logs", hash, `${newId}.running`); await waitUntilRunningAbsent(newRunning); - await waitUntilMinDataLines(newData, 4); + await waitUntilMinDataLines(newData, 5); const text = await readFile(newData, "utf8"); const lines = text .trim() .split("\n") .filter((l) => l !== ""); - expect(lines.length).toBe(4); + expect(lines.length).toBe(5); const replayCoder = JSON.parse(lines[2] ?? "{}") as Record; expect(replayCoder.role).toBe("coder"); @@ -213,7 +213,7 @@ describe("cli fork", () => { const sourceData = join(storageRoot, "logs", added.value.hash, `${sourceId}.data.jsonl`); const sourceRunning = join(storageRoot, "logs", added.value.hash, `${sourceId}.running`); await waitUntilRunningAbsent(sourceRunning); - await waitUntilMinDataLines(sourceData, 4); + await waitUntilMinDataLines(sourceData, 5); const bad = await cmdFork(storageRoot, sourceId, "ghost-role"); expect(bad.ok).toBe(false); diff --git a/packages/cli-workflow/__tests__/live.test.ts b/packages/cli-workflow/__tests__/live.test.ts new file mode 100644 index 0000000..38d9ca3 --- /dev/null +++ b/packages/cli-workflow/__tests__/live.test.ts @@ -0,0 +1,153 @@ +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import { spawn, spawnSync } from "node:child_process"; +import { cp, mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { fileURLToPath } from "node:url"; + +import { + formatLiveTimeLabel, + LIVE_CONTENT_MAX_LINES, + type LiveRoleRow, + renderLiveRoleStepLines, +} from "../src/cmd-live.js"; + +const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url)); +const fixtureRoot = fileURLToPath(new URL("./fixtures/live", import.meta.url)); + +describe("live helpers", () => { + test("formatLiveTimeLabel pads HH:MM:SS", () => { + const label = formatLiveTimeLabel(new Date("2024-06-01T09:08:07.000Z").getTime()); + expect(label).toMatch(/^\d{2}:\d{2}:\d{2}$/); + }); + + test("renderLiveRoleStepLines truncates content to LIVE_CONTENT_MAX_LINES", () => { + const lines = Array.from({ length: LIVE_CONTENT_MAX_LINES + 3 }, (_, i) => `L${i + 1}`); + const row: LiveRoleRow = { + role: "r", + content: lines.join("\n"), + meta: { k: "v" }, + timestamp: 0, + }; + const out = renderLiveRoleStepLines(row, "r"); + const body = out.filter((l) => l.startsWith(" L")); + expect(body.length).toBe(LIVE_CONTENT_MAX_LINES); + expect(out.some((l) => l.includes("more line"))).toBe(true); + expect(out.some((l) => l.startsWith(" meta: "))).toBe(true); + }); +}); + +describe("live CLI", () => { + let prevEnv: string | undefined; + let storageRoot: string; + + beforeEach(async () => { + prevEnv = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT; + storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-live-")); + process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = storageRoot; + await mkdir(join(storageRoot, "logs", "C9NMV6V2TQT81"), { recursive: true }); + await cp( + join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl"), + join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl"), + ); + await cp( + join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl"), + join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl"), + ); + }); + + afterEach(async () => { + if (prevEnv === undefined) { + delete process.env.UNCAGED_WORKFLOW_STORAGE_ROOT; + } else { + process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = prevEnv; + } + await rm(storageRoot, { recursive: true, force: true }); + }); + + test("prints role steps and summary for a completed thread", async () => { + const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; + const proc = spawn(process.execPath, [cliEntryPath, "live", "01LIVECMPLT01DDDDDDDDDDDDG"], { + env, + stdio: ["ignore", "pipe", "pipe"], + }); + const stdout = await new Promise((resolve, reject) => { + let buf = ""; + proc.stdout?.on("data", (c: Buffer) => { + buf += c.toString("utf8"); + }); + proc.stderr?.on("data", (c: Buffer) => { + buf += c.toString("utf8"); + }); + proc.on("error", reject); + proc.on("exit", (code: number | null) => { + if (code === 0) { + resolve(buf); + } else { + reject(new Error(`exit ${code}: ${buf}`)); + } + }); + }); + + expect(stdout).toContain("planner"); + expect(stdout).toContain("coder"); + expect(stdout).toContain("meta:"); + expect(stdout).toContain('"phase":"plan"'); + expect(stdout).toContain("LINE10"); + expect(stdout).not.toContain("LINE11"); + expect(stdout).toContain("more line"); + expect(stdout).toContain("completed: returnCode=0"); + expect(stdout).toContain("fixture completed"); + }); + + test("unknown thread id exits 1", () => { + const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; + const r = spawnSync(process.execPath, [cliEntryPath, "live", "01UNKNOWNXXXXXXXXXXXXXXXXX"], { + env, + encoding: "utf8", + }); + expect(r.status).toBe(1); + expect(String(r.stderr ?? "")).toContain("thread not found"); + }); + + test("follows file until WorkflowResult is appended", async () => { + const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; + const dataPath = join( + storageRoot, + "logs", + "C9NMV6V2TQT81", + "01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl", + ); + + const proc = spawn(process.execPath, [cliEntryPath, "live", "01LIVEINFLY01DDDDDDDDDDDDG"], { + env, + stdio: ["ignore", "pipe", "pipe"], + }); + + await new Promise((r) => setTimeout(r, 120)); + const prior = await readFile(dataPath, "utf8"); + await writeFile(dataPath, `${prior}{"returnCode":0,"summary":"caught up"}\n`, "utf8"); + + const stdout = await new Promise((resolve, reject) => { + let buf = ""; + proc.stdout?.on("data", (c: Buffer) => { + buf += c.toString("utf8"); + }); + proc.stderr?.on("data", (c: Buffer) => { + buf += c.toString("utf8"); + }); + proc.on("error", reject); + proc.on("exit", (code: number | null) => { + if (code === 0) { + resolve(buf); + } else { + reject(new Error(`exit ${code}: ${buf}`)); + } + }); + }); + + expect(stdout).toContain("planner"); + expect(stdout).toContain("completed: returnCode=0"); + expect(stdout).toContain("caught up"); + }); +}); diff --git a/packages/cli-workflow/__tests__/thread-cli.test.ts b/packages/cli-workflow/__tests__/thread-cli.test.ts index 8e76b04..779a205 100644 --- a/packages/cli-workflow/__tests__/thread-cli.test.ts +++ b/packages/cli-workflow/__tests__/thread-cli.test.ts @@ -323,7 +323,7 @@ describe("cli thread commands", () => { .trim() .split("\n") .filter((l) => l !== ""); - expect(lines.length).toBe(2); + expect(lines.length).toBe(3); const runningPath = join(dirname(dataPath), `${threadId}.running`); expect(await pathExists(runningPath)).toBe(false); @@ -362,8 +362,8 @@ describe("cli thread commands", () => { const resumed = await cmdResume(storageRoot, threadId); expect(resumed.ok).toBe(true); - await waitUntilMinDataLines(dataPath, 3, 120); - expect(await countDataJsonlLines(dataPath)).toBe(3); + await waitUntilMinDataLines(dataPath, 4, 120); + expect(await countDataJsonlLines(dataPath)).toBe(4); const runningPath = join(dirname(dataPath), `${threadId}.running`); await waitUntilRunningFileAbsent(runningPath, 100); diff --git a/packages/cli-workflow/src/cli-dispatch.ts b/packages/cli-workflow/src/cli-dispatch.ts index d40fe07..47828c0 100644 --- a/packages/cli-workflow/src/cli-dispatch.ts +++ b/packages/cli-workflow/src/cli-dispatch.ts @@ -7,6 +7,7 @@ import { cmdHistory } from "./cmd-history.js"; import { cmdInitTemplate, cmdInitWorkspace } from "./cmd-init.js"; import { cmdKill } from "./cmd-kill.js"; import { cmdList, formatListLines } from "./cmd-list.js"; +import { cmdLive } from "./cmd-live.js"; import { cmdPause } from "./cmd-pause.js"; import { cmdPs } from "./cmd-ps.js"; import { cmdRemove } from "./cmd-remove.js"; @@ -28,6 +29,7 @@ export function formatCliUsage(): string { " uncaged-workflow run [--prompt ] [--max-rounds N]", " uncaged-workflow ps", " uncaged-workflow kill ", + " uncaged-workflow live ", " uncaged-workflow history ", " uncaged-workflow rollback [hash]", " uncaged-workflow pause ", @@ -190,6 +192,15 @@ async function dispatchKill(storageRoot: string, argv: string[]): Promise { + const threadId = argv[0]; + if (threadId === undefined || argv.length > 1) { + printCliError(`${usage()}\n\nerror: live requires `); + return 1; + } + return cmdLive(storageRoot, threadId); +} + async function dispatchHistory(storageRoot: string, argv: string[]): Promise { const name = argv[0]; if (name === undefined || argv.length > 1) { @@ -435,6 +446,7 @@ const COMMAND_TABLE: Record = { run: dispatchRun, ps: dispatchPs, kill: dispatchKill, + live: dispatchLive, history: dispatchHistory, rollback: dispatchRollback, pause: dispatchPause, diff --git a/packages/cli-workflow/src/cmd-live.ts b/packages/cli-workflow/src/cmd-live.ts new file mode 100644 index 0000000..468fae3 --- /dev/null +++ b/packages/cli-workflow/src/cmd-live.ts @@ -0,0 +1,254 @@ +import { watch } from "node:fs"; +import { readFile } from "node:fs/promises"; + +import { + tryParseRoleStepRecord, + tryParseWorkflowResultRecord, + type WorkflowResult, +} from "@uncaged/workflow"; + +import { printCliError, printCliLine } from "./cli-output.js"; +import { resolveThreadDataPath } from "./thread-scan.js"; + +export const LIVE_CONTENT_MAX_LINES = 10; + +export type LiveRoleRow = { + role: string; + content: string; + meta: Record; + timestamp: number; +}; + +export function formatLiveTimeLabel(timestampMs: number): string { + const d = new Date(timestampMs); + const hh = String(d.getHours()).padStart(2, "0"); + const mm = String(d.getMinutes()).padStart(2, "0"); + const ss = String(d.getSeconds()).padStart(2, "0"); + return `${hh}:${mm}:${ss}`; +} + +function shouldUseColor(): boolean { + return process.stdout.isTTY === true && process.env.NO_COLOR === undefined; +} + +function highlightLiveRole(name: string): string { + if (!shouldUseColor()) { + return name; + } + return `\x1b[1m\x1b[36m${name}\x1b[0m`; +} + +export function renderLiveRoleStepLines(row: LiveRoleRow, roleDisplay: string): string[] { + const header = `[${formatLiveTimeLabel(row.timestamp)}] ▶ ${roleDisplay}`; + const lines: string[] = [header]; + const parts = row.content.split("\n"); + const shown = parts.slice(0, LIVE_CONTENT_MAX_LINES); + for (const ln of shown) { + lines.push(` ${ln}`); + } + const omitted = parts.length - shown.length; + if (omitted > 0) { + lines.push(` … (${omitted} more line${omitted === 1 ? "" : "s"})`); + } + lines.push(` meta: ${JSON.stringify(row.meta)}`); + return lines; +} + +function printSummary(result: WorkflowResult): void { + printCliLine(`completed: returnCode=${result.returnCode} — ${result.summary}`); +} + +type LiveSessionState = { + sawStart: boolean; + completed: boolean; + carry: string; + contentOffset: number; +}; + +function handleJsonlLine( + rawLine: string, + state: LiveSessionState, +): { parseError: string | null; workflowResult: WorkflowResult | null } { + const trimmed = rawLine.trim(); + if (trimmed === "") { + return { parseError: null, workflowResult: null }; + } + + let rec: unknown; + try { + rec = JSON.parse(trimmed) as unknown; + } catch { + return { parseError: "invalid JSON in thread data file", workflowResult: null }; + } + if (rec === null || typeof rec !== "object") { + return { parseError: "invalid record in thread data file", workflowResult: null }; + } + const obj = rec as Record; + + if (!state.sawStart) { + state.sawStart = true; + return { parseError: null, workflowResult: null }; + } + + const wf = tryParseWorkflowResultRecord(obj); + if (wf !== null) { + state.completed = true; + return { parseError: null, workflowResult: wf }; + } + + const roleRow = tryParseRoleStepRecord(obj); + if (roleRow === null) { + return { + parseError: "unrecognized record in thread data (expected role step or result)", + workflowResult: null, + }; + } + + const row: LiveRoleRow = { + role: roleRow.role, + content: roleRow.content, + meta: roleRow.meta, + timestamp: roleRow.timestamp, + }; + for (const outLine of renderLiveRoleStepLines(row, highlightLiveRole(row.role))) { + printCliLine(outLine); + } + return { parseError: null, workflowResult: null }; +} + +async function pumpNewContent(dataPath: string, state: LiveSessionState): Promise { + let text: string; + try { + text = await readFile(dataPath, "utf8"); + } catch { + return null; + } + + if (text.length < state.contentOffset) { + state.contentOffset = 0; + state.carry = ""; + } + + const chunk = text.slice(state.contentOffset); + state.contentOffset = text.length; + state.carry += chunk; + + const parts = state.carry.split("\n"); + state.carry = parts.pop() ?? ""; + + for (const line of parts) { + const { parseError, workflowResult } = handleJsonlLine(line, state); + if (parseError !== null) { + printCliError(parseError); + return 1; + } + if (workflowResult !== null) { + printSummary(workflowResult); + return 0; + } + } + + return null; +} + +function watchLiveFile(params: { + dataPath: string; + state: LiveSessionState; + signal: AbortSignal; +}): Promise { + const { dataPath, state, signal } = params; + + return new Promise((resolve, reject) => { + let settled = false; + const finish = (code: number): void => { + if (settled) { + return; + } + settled = true; + resolve(code); + }; + + /** Serialize reads — `fs.watch` may emit faster than `readFile` completes. */ + let pumpChain: Promise = Promise.resolve(); + + const watcher = watch(dataPath, (eventType) => { + if (eventType === "rename") { + return; + } + schedulePump(); + }); + + watcher.on("error", (err: Error) => { + watcher.close(); + reject(err); + }); + + const onAbort = (): void => { + watcher.close(); + finish(0); + }; + signal.addEventListener("abort", onAbort, { once: true }); + + async function drainQueuedPump(): Promise { + if (settled) { + return; + } + try { + const code = await pumpNewContent(dataPath, state); + if (code !== null) { + watcher.close(); + finish(code); + } + } catch (e) { + watcher.close(); + reject(e instanceof Error ? e : new Error(String(e))); + } + } + + function schedulePump(): void { + pumpChain = pumpChain.then(() => drainQueuedPump()); + } + + schedulePump(); + }); +} + +export async function cmdLive(storageRoot: string, threadId: string): Promise { + const dataPath = await resolveThreadDataPath(storageRoot, threadId); + if (dataPath === null) { + printCliError(`thread not found: ${threadId}`); + return 1; + } + + const state: LiveSessionState = { + sawStart: false, + completed: false, + carry: "", + contentOffset: 0, + }; + + const controller = new AbortController(); + const onSigInt = (): void => { + controller.abort(); + }; + process.on("SIGINT", onSigInt); + + try { + const first = await pumpNewContent(dataPath, state); + if (first !== null) { + return first; + } + + if (state.completed) { + return 0; + } + + return await watchLiveFile({ dataPath, state, signal: controller.signal }); + } catch (e) { + const message = e instanceof Error ? e.message : String(e); + printCliError(`live: ${message}`); + return 1; + } finally { + process.off("SIGINT", onSigInt); + } +} diff --git a/packages/workflow/__tests__/fork-thread.test.ts b/packages/workflow/__tests__/fork-thread.test.ts index 244fd87..4e75084 100644 --- a/packages/workflow/__tests__/fork-thread.test.ts +++ b/packages/workflow/__tests__/fork-thread.test.ts @@ -87,6 +87,26 @@ describe("fork-thread", () => { expect(r.value.runOptions).toEqual({ maxRounds: 5, depth: 0 }); }); + test("parseThreadDataJsonl ignores trailing WorkflowResult line", () => { + const text = `${sampleDataJsonl.trim()}\n{"returnCode":0,"summary":"done"}\n`; + const r = parseThreadDataJsonl(text); + expect(r.ok).toBe(true); + if (!r.ok) { + return; + } + expect(r.value.roleSteps.length).toBe(3); + expect(r.value.roleSteps[2]?.role).toBe("reviewer"); + }); + + test("parseThreadDataJsonl errors when WorkflowResult is not last", () => { + const text = `{"name":"demo","hash":"H","threadId":"01ZZZZZZZZZZZZZZZZZZZZZZ","parameters":{"prompt":"p","options":{"maxRounds":3}},"timestamp":1} +{"returnCode":0,"summary":"early"} +{"role":"planner","content":"x","meta":{},"timestamp":2} +`; + const r = parseThreadDataJsonl(text); + expect(r.ok).toBe(false); + }); + test("parseThreadDataJsonl reads explicit depth from start record", () => { const text = `{"name":"demo","hash":"H","threadId":"01ZZZZZZZZZZZZZZZZZZZZZZ","parameters":{"prompt":"p","options":{"maxRounds":3,"depth":2}},"timestamp":1} {"role":"planner","contentHash":"HP0000000000000000000099","meta":{},"refs":[],"timestamp":2} diff --git a/packages/workflow/__tests__/worker.test.ts b/packages/workflow/__tests__/worker.test.ts index 7067260..78ccd2c 100644 --- a/packages/workflow/__tests__/worker.test.ts +++ b/packages/workflow/__tests__/worker.test.ts @@ -125,7 +125,7 @@ describe("worker process", () => { .trim() .split("\n") .filter((l) => l !== "").length, - ).toBe(3); + ).toBe(4); } finally { await rm(root, { recursive: true, force: true }); } @@ -187,7 +187,7 @@ describe("worker process", () => { .trim() .split("\n") .filter((l) => l !== ""); - expect(lines.length).toBe(3); + expect(lines.length).toBe(4); const start = JSON.parse(lines[0] ?? "{}") as Record; expect(start.forkFrom).toEqual({ threadId: srcId }); const replay = JSON.parse(lines[1] ?? "{}") as Record; @@ -195,6 +195,8 @@ describe("worker process", () => { expect(replay.timestamp).toBe(555); const coder = JSON.parse(lines[2] ?? "{}") as Record; expect(coder.role).toBe("coder"); + const done = JSON.parse(lines[3] ?? "{}") as Record; + expect(done.returnCode).toBe(0); } finally { await rm(root, { recursive: true, force: true }); } diff --git a/packages/workflow/src/fork-thread.ts b/packages/workflow/src/fork-thread.ts index 94a3d85..445381f 100644 --- a/packages/workflow/src/fork-thread.ts +++ b/packages/workflow/src/fork-thread.ts @@ -1,6 +1,6 @@ import { normalizeRefsField } from "./refs-field.js"; import { err, ok, type Result } from "./result.js"; -import type { RoleOutput } from "./types.js"; +import type { RoleOutput, WorkflowResult } from "./types.js"; /** Role steps replayed from `.data.jsonl`, including persisted timestamps. */ export type ForkHistoricalStep = RoleOutput & { timestamp: number }; @@ -14,33 +14,54 @@ export type ParsedThreadStartRecord = { depth: number; }; -function parseRoleLine( - obj: Record, - lineIndex: number, -): Result { +/** Recognizes a persisted workflow completion line (no `role`; has numeric `returnCode` and string `summary`). */ +export function tryParseWorkflowResultRecord(obj: Record): WorkflowResult | null { + if (obj.role !== undefined) { + return null; + } + const returnCode = obj.returnCode; + const summary = obj.summary; + if (typeof returnCode !== "number" || typeof summary !== "string") { + return null; + } + return { returnCode, summary }; +} + +export function tryParseRoleStepRecord(obj: Record): ForkHistoricalStep | null { const role = obj.role; const contentHash = obj.contentHash; const meta = obj.meta; const timestamp = obj.timestamp; if (typeof role !== "string") { - return err(`invalid role record at line ${lineIndex}: missing role`); + return null; } if (typeof contentHash !== "string") { - return err(`invalid role record at line ${lineIndex}: missing contentHash`); + return null; } if (meta === null || typeof meta !== "object") { - return err(`invalid role record at line ${lineIndex}: missing meta`); + return null; } if (typeof timestamp !== "number") { - return err(`invalid role record at line ${lineIndex}: missing timestamp`); + return null; } - return ok({ + return { role, contentHash, meta: meta as Record, refs: normalizeRefsField(obj.refs), timestamp, - }); + }; +} + +function parseRoleLine( + obj: Record, + lineIndex: number, +): Result { + const parsed = tryParseRoleStepRecord(obj); + if (parsed === null) { + return err(`invalid role record at line ${lineIndex}`); + } + return ok(parsed); } function parseStartRecordLine(firstLine: string): Result { @@ -109,7 +130,15 @@ function parseFollowingRoleLines(lines: string[]): Result, i + 1); + const recObj = rec as Record; + const wf = tryParseWorkflowResultRecord(recObj); + if (wf !== null) { + if (i !== lines.length - 1) { + return err("WorkflowResult record must be the final line in `.data.jsonl`"); + } + break; + } + const parsed = parseRoleLine(recObj, i + 1); if (!parsed.ok) { return parsed; } diff --git a/packages/workflow/src/index.ts b/packages/workflow/src/index.ts index aa201d6..7071f99 100644 --- a/packages/workflow/src/index.ts +++ b/packages/workflow/src/index.ts @@ -25,6 +25,8 @@ export { type ParsedThreadStartRecord, parseThreadDataJsonl, selectForkHistoricalSteps, + tryParseRoleStepRecord, + tryParseWorkflowResultRecord, } from "./fork-thread.js"; export { type GcResult, garbageCollectCas } from "./gc.js"; export { stringifyWorkflowDescriptor } from "./generate-descriptor.js"; diff --git a/packages/workflow/src/worker.ts b/packages/workflow/src/worker.ts index 8191b1d..574aaf4 100644 --- a/packages/workflow/src/worker.ts +++ b/packages/workflow/src/worker.ts @@ -1,4 +1,4 @@ -import { mkdir, unlink, writeFile } from "node:fs/promises"; +import { appendFile, mkdir, unlink, writeFile } from "node:fs/promises"; import { createServer, type Socket } from "node:net"; import { dirname, join } from "node:path"; import { importWorkflowBundleModule } from "./bundle-import-env.js"; @@ -11,7 +11,7 @@ import { normalizeRefsField } from "./refs-field.js"; import { err, ok, type Result } from "./result.js"; import { getGlobalCasDir } from "./storage-root.js"; import { createThreadPauseGate, type ThreadPauseGate } from "./thread-pause-gate.js"; -import type { RoleOutput, WorkflowFn } from "./types.js"; +import type { RoleOutput, WorkflowFn, WorkflowResult } from "./types.js"; const bootLog = createLogger({ sink: { kind: "stderr" } }); @@ -404,7 +404,7 @@ async function main(): Promise { }); } - await executeThread( + const runResult = await executeThread( workflowFn, cmd.workflowName, { prompt: cmd.prompt, steps: cmd.steps }, @@ -418,9 +418,12 @@ async function main(): Promise { io, logger, ); + await appendFile(dataJsonlPath, `${JSON.stringify(runResult)}\n`, "utf8"); } catch (e) { const message = e instanceof Error ? e.message : String(e); bootLog("Q3MN8YKW", `thread ${threadId} failed: ${message}`); + const failure: WorkflowResult = { returnCode: 1, summary: message }; + await appendFile(dataJsonlPath, `${JSON.stringify(failure)}\n`, "utf8").catch(() => {}); } finally { threads.delete(threadId); await unlink(runningPath).catch(() => {}); -- 2.43.0 From 8fe26417cfcb978d960e2a0a2e78a5fe4ebffe5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=98=9F=E6=9C=88?= Date: Thu, 7 May 2026 21:34:04 +0800 Subject: [PATCH 2/2] feat(cli): add --latest, --debug, --role flags to live command (#37 Phase 2) - --latest: auto-find most recent thread by start timestamp - --debug: display .info.jsonl debug log with tags - --role: filter output to specific role - Add live-argv.ts for flag parsing - Add fixtures and test coverage for all flags Testing: #50 --- .../01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl | 6 +- .../01LIVECMPLT01DDDDDDDDDDDDG.info.jsonl | 2 + .../01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl | 2 +- .../01LIVEOLDER01DDDDDDDDDDDDG.data.jsonl | 2 + .../cli-workflow/__tests__/fork-cli.test.ts | 12 +- packages/cli-workflow/__tests__/live.test.ts | 218 +++++++++++- packages/cli-workflow/src/cli-dispatch.ts | 12 +- packages/cli-workflow/src/cmd-live.ts | 321 +++++++++++++++--- packages/cli-workflow/src/live-argv.ts | 75 ++++ packages/cli-workflow/src/thread-scan.ts | 68 +++- packages/workflow/src/fork-thread.ts | 8 +- packages/workflow/src/worker.ts | 2 +- 12 files changed, 651 insertions(+), 77 deletions(-) create mode 100644 packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVECMPLT01DDDDDDDDDDDDG.info.jsonl create mode 100644 packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVEOLDER01DDDDDDDDDDDDG.data.jsonl create mode 100644 packages/cli-workflow/src/live-argv.ts diff --git a/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl b/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl index 5076f72..96fc47d 100644 --- a/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl +++ b/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl @@ -1,4 +1,4 @@ -{"name":"demo-live","hash":"C9NMV6V2TQT81","threadId":"01LIVECMPLT01DDDDDDDDDDDDG","parameters":{"prompt":"hello","options":{"maxRounds":5,"depth":0}},"timestamp":1714963200000} -{"role":"planner","content":"alpha\nbeta\ngamma\nLINE4\nLINE5\nLINE6\nLINE7\nLINE8\nLINE9\nLINE10\nLINE11","meta":{"phase":"plan","flags":[1,2]},"refs":[],"timestamp":1714963201000} -{"role":"coder","content":"patch","meta":{},"refs":[],"timestamp":1714963202000} +{"name":"demo-live","hash":"C9NMV6V2TQT81","threadId":"01LIVECMPLT01DDDDDDDDDDDDG","parameters":{"prompt":"hello","options":{"maxRounds":5,"depth":0}},"timestamp":1714963400000} +{"role":"planner","contentHash":"FF7YQ5W3S2EV6","meta":{"phase":"plan","flags":[1,2]},"refs":[],"timestamp":1714963201000} +{"role":"coder","contentHash":"EN34XX1W4WAFJ","meta":{},"refs":[],"timestamp":1714963202000} {"returnCode":0,"summary":"fixture completed"} diff --git a/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVECMPLT01DDDDDDDDDDDDG.info.jsonl b/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVECMPLT01DDDDDDDDDDDDG.info.jsonl new file mode 100644 index 0000000..d9e2d29 --- /dev/null +++ b/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVECMPLT01DDDDDDDDDDDDG.info.jsonl @@ -0,0 +1,2 @@ +{"tag":"DEBUGTAG1","content":"bundle loaded","timestamp":1714963400050} +{"tag":"DEBUGTAG2","content":"multi\nline","timestamp":1714963400500} diff --git a/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl b/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl index 7ad5878..fb9cdec 100644 --- a/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl +++ b/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl @@ -1,2 +1,2 @@ {"name":"demo-live","hash":"C9NMV6V2TQT81","threadId":"01LIVEINFLY01DDDDDDDDDDDDG","parameters":{"prompt":"hello","options":{"maxRounds":5,"depth":0}},"timestamp":1714963200000} -{"role":"planner","content":"still running","meta":{"x":1},"refs":[],"timestamp":1714963201000} +{"role":"planner","contentHash":"P6M9FHE1GSBN0","meta":{"x":1},"refs":[],"timestamp":1714963201000} diff --git a/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVEOLDER01DDDDDDDDDDDDG.data.jsonl b/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVEOLDER01DDDDDDDDDDDDG.data.jsonl new file mode 100644 index 0000000..63ec7df --- /dev/null +++ b/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVEOLDER01DDDDDDDDDDDDG.data.jsonl @@ -0,0 +1,2 @@ +{"name":"demo-live-old","hash":"C9NMV6V2TQT81","threadId":"01LIVEOLDER01DDDDDDDDDDDDG","parameters":{"prompt":"old","options":{"maxRounds":5,"depth":0}},"timestamp":1714963000000} +{"returnCode":0,"summary":"older thread"} diff --git a/packages/cli-workflow/__tests__/fork-cli.test.ts b/packages/cli-workflow/__tests__/fork-cli.test.ts index 6f9c88b..a0fe69d 100644 --- a/packages/cli-workflow/__tests__/fork-cli.test.ts +++ b/packages/cli-workflow/__tests__/fork-cli.test.ts @@ -134,10 +134,10 @@ describe("cli fork", () => { expect(start.threadId).toBe(newId); expect(start.forkFrom).toEqual({ threadId: sourceId }); - const last = JSON.parse(lines[lines.length - 1] ?? "{}") as Record; - expect(last.role).toBe("reviewer"); + const lastRoleLine = JSON.parse(lines[lines.length - 2] ?? "{}") as Record; + expect(lastRoleLine.role).toBe("reviewer"); const cas = createCasStore(getGlobalCasDir(storageRoot)); - expect(await getContentMerklePayload(cas, String(last.contentHash))).toBe("rev-1"); + expect(await getContentMerklePayload(cas, String(lastRoleLine.contentHash))).toBe("rev-1"); }); test("fork without --from-role retries last role", async () => { @@ -187,9 +187,9 @@ describe("cli fork", () => { const cas = createCasStore(getGlobalCasDir(storageRoot)); expect(await getContentMerklePayload(cas, String(replayCoder.contentHash))).toBe("c1"); - const last = JSON.parse(lines[lines.length - 1] ?? "{}") as Record; - expect(last.role).toBe("reviewer"); - expect(await getContentMerklePayload(cas, String(last.contentHash))).toBe("rev-2"); + const lastRoleLine = JSON.parse(lines[lines.length - 2] ?? "{}") as Record; + expect(lastRoleLine.role).toBe("reviewer"); + expect(await getContentMerklePayload(cas, String(lastRoleLine.contentHash))).toBe("rev-2"); }); test("fork rejects unknown role with available names", async () => { diff --git a/packages/cli-workflow/__tests__/live.test.ts b/packages/cli-workflow/__tests__/live.test.ts index 38d9ca3..7d44cda 100644 --- a/packages/cli-workflow/__tests__/live.test.ts +++ b/packages/cli-workflow/__tests__/live.test.ts @@ -5,22 +5,37 @@ import { tmpdir } from "node:os"; import { join } from "node:path"; import { fileURLToPath } from "node:url"; +import { createCasStore, getGlobalCasDir, putContentMerkleNode } from "@uncaged/workflow"; + import { + formatLiveDebugLine, formatLiveTimeLabel, LIVE_CONTENT_MAX_LINES, type LiveRoleRow, renderLiveRoleStepLines, } from "../src/cmd-live.js"; +import { parseLiveArgv } from "../src/live-argv.js"; const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url)); const fixtureRoot = fileURLToPath(new URL("./fixtures/live", import.meta.url)); +/** Bodies for Merkle content nodes; hashes must match `.data.jsonl` fixtures. */ +const LIVE_FIXTURE_PLANNER_BODY = + "alpha\nbeta\ngamma\nLINE4\nLINE5\nLINE6\nLINE7\nLINE8\nLINE9\nLINE10\nLINE11"; + describe("live helpers", () => { test("formatLiveTimeLabel pads HH:MM:SS", () => { const label = formatLiveTimeLabel(new Date("2024-06-01T09:08:07.000Z").getTime()); expect(label).toMatch(/^\d{2}:\d{2}:\d{2}$/); }); + test("formatLiveDebugLine flattens newlines in message", () => { + const line = formatLiveDebugLine(0, "TAG1", "a\nb"); + expect(line).toContain("[TAG1]"); + expect(line).toContain("a b"); + expect(line).not.toContain("\n"); + }); + test("renderLiveRoleStepLines truncates content to LIVE_CONTENT_MAX_LINES", () => { const lines = Array.from({ length: LIVE_CONTENT_MAX_LINES + 3 }, (_, i) => `L${i + 1}`); const row: LiveRoleRow = { @@ -37,6 +52,31 @@ describe("live helpers", () => { }); }); +describe("parseLiveArgv", () => { + test("parses thread id and flags in any order", () => { + const a = parseLiveArgv(["01ABC", "--debug", "--role", "planner"]); + expect(a.ok).toBe(true); + if (a.ok) { + expect(a.value.threadId).toBe("01ABC"); + expect(a.value.latest).toBe(false); + expect(a.value.debug).toBe(true); + expect(a.value.role).toBe("planner"); + } + const b = parseLiveArgv(["--latest", "--role", "x"]); + expect(b.ok).toBe(true); + if (b.ok) { + expect(b.value.latest).toBe(true); + expect(b.value.threadId).toBe(null); + expect(b.value.role).toBe("x"); + } + }); + + test("rejects --latest with thread id", () => { + const r = parseLiveArgv(["--latest", "01ABC"]); + expect(r.ok).toBe(false); + }); +}); + describe("live CLI", () => { let prevEnv: string | undefined; let storageRoot: string; @@ -50,10 +90,23 @@ describe("live CLI", () => { join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl"), join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl"), ); + await cp( + join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.info.jsonl"), + join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.info.jsonl"), + ); await cp( join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl"), join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl"), ); + await cp( + join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVEOLDER01DDDDDDDDDDDDG.data.jsonl"), + join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVEOLDER01DDDDDDDDDDDDG.data.jsonl"), + ); + + const cas = createCasStore(getGlobalCasDir(storageRoot)); + await putContentMerkleNode(cas, LIVE_FIXTURE_PLANNER_BODY); + await putContentMerkleNode(cas, "patch"); + await putContentMerkleNode(cas, "still running"); }); afterEach(async () => { @@ -100,6 +153,135 @@ describe("live CLI", () => { expect(stdout).toContain("fixture completed"); }); + test("--latest tails the newest thread by start timestamp", async () => { + const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; + const proc = spawn(process.execPath, [cliEntryPath, "live", "--latest"], { + env, + stdio: ["ignore", "pipe", "pipe"], + }); + const stdout = await new Promise((resolve, reject) => { + let buf = ""; + proc.stdout?.on("data", (c: Buffer) => { + buf += c.toString("utf8"); + }); + proc.stderr?.on("data", (c: Buffer) => { + buf += c.toString("utf8"); + }); + proc.on("error", reject); + proc.on("exit", (code: number | null) => { + if (code === 0) { + resolve(buf); + } else { + reject(new Error(`exit ${code}: ${buf}`)); + } + }); + }); + + expect(stdout).toContain("fixture completed"); + expect(stdout).not.toContain("older thread"); + }); + + test("--debug prints .info.jsonl records after data output", async () => { + const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; + const proc = spawn( + process.execPath, + [cliEntryPath, "live", "01LIVECMPLT01DDDDDDDDDDDDG", "--debug"], + { + env, + stdio: ["ignore", "pipe", "pipe"], + }, + ); + const stdout = await new Promise((resolve, reject) => { + let buf = ""; + proc.stdout?.on("data", (c: Buffer) => { + buf += c.toString("utf8"); + }); + proc.stderr?.on("data", (c: Buffer) => { + buf += c.toString("utf8"); + }); + proc.on("error", reject); + proc.on("exit", (code: number | null) => { + if (code === 0) { + resolve(buf); + } else { + reject(new Error(`exit ${code}: ${buf}`)); + } + }); + }); + + expect(stdout).toContain("[DEBUGTAG1]"); + expect(stdout).toContain("bundle loaded"); + expect(stdout).toContain("[DEBUGTAG2]"); + expect(stdout).toContain("multi line"); + }); + + test("--role filters out non-matching roles", async () => { + const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; + const proc = spawn( + process.execPath, + [cliEntryPath, "live", "01LIVECMPLT01DDDDDDDDDDDDG", "--role", "planner"], + { + env, + stdio: ["ignore", "pipe", "pipe"], + }, + ); + const stdout = await new Promise((resolve, reject) => { + let buf = ""; + proc.stdout?.on("data", (c: Buffer) => { + buf += c.toString("utf8"); + }); + proc.stderr?.on("data", (c: Buffer) => { + buf += c.toString("utf8"); + }); + proc.on("error", reject); + proc.on("exit", (code: number | null) => { + if (code === 0) { + resolve(buf); + } else { + reject(new Error(`exit ${code}: ${buf}`)); + } + }); + }); + + expect(stdout).toContain("planner"); + expect(stdout).not.toContain("patch"); + expect(stdout).toContain("completed: returnCode=0"); + }); + + test("--latest --debug --role combine", async () => { + const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; + const proc = spawn( + process.execPath, + [cliEntryPath, "live", "--latest", "--debug", "--role", "planner"], + { + env, + stdio: ["ignore", "pipe", "pipe"], + }, + ); + const stdout = await new Promise((resolve, reject) => { + let buf = ""; + proc.stdout?.on("data", (c: Buffer) => { + buf += c.toString("utf8"); + }); + proc.stderr?.on("data", (c: Buffer) => { + buf += c.toString("utf8"); + }); + proc.on("error", reject); + proc.on("exit", (code: number | null) => { + if (code === 0) { + resolve(buf); + } else { + reject(new Error(`exit ${code}: ${buf}`)); + } + }); + }); + + expect(stdout).toContain("[DEBUGTAG1]"); + expect(stdout).toContain("planner"); + expect(stdout).not.toContain("patch"); + expect(stdout).toContain("fixture completed"); + }); + test("unknown thread id exits 1", () => { const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; const r = spawnSync(process.execPath, [cliEntryPath, "live", "01UNKNOWNXXXXXXXXXXXXXXXXX"], { @@ -126,7 +308,11 @@ describe("live CLI", () => { await new Promise((r) => setTimeout(r, 120)); const prior = await readFile(dataPath, "utf8"); - await writeFile(dataPath, `${prior}{"returnCode":0,"summary":"caught up"}\n`, "utf8"); + await writeFile( + dataPath, + `${prior.replace(/\s*$/, "")}\n${JSON.stringify({ returnCode: 0, summary: "caught up" })}\n`, + "utf8", + ); const stdout = await new Promise((resolve, reject) => { let buf = ""; @@ -151,3 +337,33 @@ describe("live CLI", () => { expect(stdout).toContain("caught up"); }); }); + +describe("live --latest with empty storage", () => { + let prevEnv: string | undefined; + let emptyRoot: string; + + beforeEach(async () => { + prevEnv = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT; + emptyRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-live-empty-")); + process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = emptyRoot; + }); + + afterEach(async () => { + if (prevEnv === undefined) { + delete process.env.UNCAGED_WORKFLOW_STORAGE_ROOT; + } else { + process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = prevEnv; + } + await rm(emptyRoot, { recursive: true, force: true }); + }); + + test("exits 1 when no threads exist", () => { + const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: emptyRoot }; + const r = spawnSync(process.execPath, [cliEntryPath, "live", "--latest"], { + env, + encoding: "utf8", + }); + expect(r.status).toBe(1); + expect(String(r.stderr ?? "")).toContain("no threads"); + }); +}); diff --git a/packages/cli-workflow/src/cli-dispatch.ts b/packages/cli-workflow/src/cli-dispatch.ts index 47828c0..7f676c2 100644 --- a/packages/cli-workflow/src/cli-dispatch.ts +++ b/packages/cli-workflow/src/cli-dispatch.ts @@ -17,6 +17,7 @@ import { cmdRun } from "./cmd-run.js"; import { cmdShow, formatShowYaml } from "./cmd-show.js"; import { cmdThreadRemove, cmdThreadShow } from "./cmd-thread.js"; import { cmdThreads } from "./cmd-threads.js"; +import { parseLiveArgv } from "./live-argv.js"; import { parseRunArgv } from "./run-argv.js"; export function formatCliUsage(): string { @@ -29,7 +30,8 @@ export function formatCliUsage(): string { " uncaged-workflow run [--prompt ] [--max-rounds N]", " uncaged-workflow ps", " uncaged-workflow kill ", - " uncaged-workflow live ", + " uncaged-workflow live [--debug] [--role ]", + " uncaged-workflow live --latest [--debug] [--role ]", " uncaged-workflow history ", " uncaged-workflow rollback [hash]", " uncaged-workflow pause ", @@ -193,12 +195,12 @@ async function dispatchKill(storageRoot: string, argv: string[]): Promise { - const threadId = argv[0]; - if (threadId === undefined || argv.length > 1) { - printCliError(`${usage()}\n\nerror: live requires `); + const parsed = parseLiveArgv(argv); + if (!parsed.ok) { + printCliError(`${formatCliUsage()}\n\nerror: ${parsed.error}`); return 1; } - return cmdLive(storageRoot, threadId); + return cmdLive(storageRoot, parsed.value); } async function dispatchHistory(storageRoot: string, argv: string[]): Promise { diff --git a/packages/cli-workflow/src/cmd-live.ts b/packages/cli-workflow/src/cmd-live.ts index 468fae3..ccbfcba 100644 --- a/packages/cli-workflow/src/cmd-live.ts +++ b/packages/cli-workflow/src/cmd-live.ts @@ -1,14 +1,21 @@ import { watch } from "node:fs"; import { readFile } from "node:fs/promises"; +import { dirname, join } from "node:path"; import { + type CasStore, + createCasStore, + getContentMerklePayload, + getGlobalCasDir, tryParseRoleStepRecord, tryParseWorkflowResultRecord, - type WorkflowResult, + type WorkflowCompletion, } from "@uncaged/workflow"; import { printCliError, printCliLine } from "./cli-output.js"; -import { resolveThreadDataPath } from "./thread-scan.js"; +import { pathExists } from "./fs-utils.js"; +import type { ParsedLiveArgv } from "./live-argv.js"; +import { findLatestThreadDataPath, resolveThreadDataPath } from "./thread-scan.js"; export const LIVE_CONTENT_MAX_LINES = 10; @@ -38,6 +45,18 @@ function highlightLiveRole(name: string): string { return `\x1b[1m\x1b[36m${name}\x1b[0m`; } +function dimGreyLine(line: string): string { + if (!shouldUseColor()) { + return line; + } + return `\x1b[2m\x1b[90m${line}\x1b[0m`; +} + +export function formatLiveDebugLine(timestampMs: number, tag: string, message: string): string { + const label = `[${formatLiveTimeLabel(timestampMs)}] [${tag}] ${message.replace(/\n/g, " ")}`; + return dimGreyLine(label); +} + export function renderLiveRoleStepLines(row: LiveRoleRow, roleDisplay: string): string[] { const header = `[${formatLiveTimeLabel(row.timestamp)}] ▶ ${roleDisplay}`; const lines: string[] = [header]; @@ -54,7 +73,7 @@ export function renderLiveRoleStepLines(row: LiveRoleRow, roleDisplay: string): return lines; } -function printSummary(result: WorkflowResult): void { +function printSummary(result: WorkflowCompletion): void { printCliLine(`completed: returnCode=${result.returnCode} — ${result.summary}`); } @@ -65,10 +84,36 @@ type LiveSessionState = { contentOffset: number; }; -function handleJsonlLine( +type InfoLiveState = { + carry: string; + contentOffset: number; +}; + +function tryParseInfoRecord(obj: Record): { + tag: string; + content: string; + timestamp: number; +} | null { + const tag = obj.tag; + const content = obj.content; + const timestamp = obj.timestamp; + if ( + typeof tag !== "string" || + typeof content !== "string" || + typeof timestamp !== "number" || + !Number.isFinite(timestamp) + ) { + return null; + } + return { tag, content, timestamp }; +} + +async function handleJsonlLine( rawLine: string, state: LiveSessionState, -): { parseError: string | null; workflowResult: WorkflowResult | null } { + roleFilter: string | null, + cas: CasStore, +): Promise<{ parseError: string | null; workflowResult: WorkflowCompletion | null }> { const trimmed = rawLine.trim(); if (trimmed === "") { return { parseError: null, workflowResult: null }; @@ -104,9 +149,17 @@ function handleJsonlLine( }; } + if (roleFilter !== null && roleRow.role !== roleFilter) { + return { parseError: null, workflowResult: null }; + } + + const payload = await getContentMerklePayload(cas, roleRow.contentHash); + const content = + payload !== null ? payload : `(content not in CAS; contentHash=${roleRow.contentHash})`; + const row: LiveRoleRow = { role: roleRow.role, - content: roleRow.content, + content, meta: roleRow.meta, timestamp: roleRow.timestamp, }; @@ -116,7 +169,12 @@ function handleJsonlLine( return { parseError: null, workflowResult: null }; } -async function pumpNewContent(dataPath: string, state: LiveSessionState): Promise { +async function pumpNewContent( + dataPath: string, + state: LiveSessionState, + roleFilter: string | null, + cas: CasStore, +): Promise { let text: string; try { text = await readFile(dataPath, "utf8"); @@ -137,7 +195,7 @@ async function pumpNewContent(dataPath: string, state: LiveSessionState): Promis state.carry = parts.pop() ?? ""; for (const line of parts) { - const { parseError, workflowResult } = handleJsonlLine(line, state); + const { parseError, workflowResult } = await handleJsonlLine(line, state, roleFilter, cas); if (parseError !== null) { printCliError(parseError); return 1; @@ -151,12 +209,76 @@ async function pumpNewContent(dataPath: string, state: LiveSessionState): Promis return null; } -function watchLiveFile(params: { - dataPath: string; - state: LiveSessionState; - signal: AbortSignal; -}): Promise { - const { dataPath, state, signal } = params; +async function pumpNewInfoContent(infoPath: string, state: InfoLiveState): Promise { + let text: string; + try { + text = await readFile(infoPath, "utf8"); + } catch { + return; + } + + if (text.length < state.contentOffset) { + state.contentOffset = 0; + state.carry = ""; + } + + const chunk = text.slice(state.contentOffset); + state.contentOffset = text.length; + state.carry += chunk; + + const parts = state.carry.split("\n"); + state.carry = parts.pop() ?? ""; + + for (const line of parts) { + const trimmed = line.trim(); + if (trimmed === "") { + continue; + } + let rec: unknown; + try { + rec = JSON.parse(trimmed) as unknown; + } catch { + continue; + } + if (rec === null || typeof rec !== "object") { + continue; + } + const parsed = tryParseInfoRecord(rec as Record); + if (parsed === null) { + continue; + } + printCliLine(formatLiveDebugLine(parsed.timestamp, parsed.tag, parsed.content)); + } +} + +type WatchPumpTask = { + path: string; + pump: () => Promise; +}; + +async function runWatchPumpStep( + settled: () => boolean, + pump: () => Promise, + closeAll: () => void, + finish: (code: number) => void, +): Promise { + if (settled()) { + return; + } + try { + const code = await pump(); + if (code !== null) { + closeAll(); + finish(code); + } + } catch (e) { + closeAll(); + throw e instanceof Error ? e : new Error(String(e)); + } +} + +function watchLivePaths(params: { tasks: WatchPumpTask[]; signal: AbortSignal }): Promise { + const { tasks, signal } = params; return new Promise((resolve, reject) => { let settled = false; @@ -168,65 +290,138 @@ function watchLiveFile(params: { resolve(code); }; - /** Serialize reads — `fs.watch` may emit faster than `readFile` completes. */ - let pumpChain: Promise = Promise.resolve(); + const pumpChains = new Map>(); + for (const t of tasks) { + pumpChains.set(t.path, Promise.resolve()); + } - const watcher = watch(dataPath, (eventType) => { - if (eventType === "rename") { - return; + const watchers: ReturnType[] = []; + + const closeAll = (): void => { + for (const w of watchers) { + w.close(); } - schedulePump(); - }); + }; - watcher.on("error", (err: Error) => { - watcher.close(); - reject(err); - }); + function schedulePump(path: string, pump: () => Promise): void { + const prev = pumpChains.get(path) ?? Promise.resolve(); + const next = (async () => { + await prev; + await runWatchPumpStep(() => settled, pump, closeAll, finish); + })(); + pumpChains.set(path, next); + } + + for (const { path, pump } of tasks) { + const watcher = watch(path, (eventType) => { + if (eventType === "rename") { + return; + } + schedulePump(path, pump); + }); + watchers.push(watcher); + watcher.on("error", (err: Error) => { + closeAll(); + reject(err); + }); + } const onAbort = (): void => { - watcher.close(); + closeAll(); finish(0); }; signal.addEventListener("abort", onAbort, { once: true }); - async function drainQueuedPump(): Promise { - if (settled) { - return; - } - try { - const code = await pumpNewContent(dataPath, state); - if (code !== null) { - watcher.close(); - finish(code); - } - } catch (e) { - watcher.close(); - reject(e instanceof Error ? e : new Error(String(e))); - } + for (const { path, pump } of tasks) { + schedulePump(path, pump); } - - function schedulePump(): void { - pumpChain = pumpChain.then(() => drainQueuedPump()); - } - - schedulePump(); }); } -export async function cmdLive(storageRoot: string, threadId: string): Promise { - const dataPath = await resolveThreadDataPath(storageRoot, threadId); - if (dataPath === null) { - printCliError(`thread not found: ${threadId}`); +type LiveThreadTarget = { + threadId: string; + dataPath: string; +}; + +async function resolveLiveThreadTarget( + storageRoot: string, + parsed: ParsedLiveArgv, +): Promise { + if (parsed.latest) { + const found = await findLatestThreadDataPath(storageRoot); + if (found === null) { + printCliError("live: no threads found"); + return null; + } + return found; + } + + const id = parsed.threadId; + if (id === null) { + printCliError("live: internal error: missing thread id"); + return null; + } + const resolved = await resolveThreadDataPath(storageRoot, id); + if (resolved === null) { + printCliError(`thread not found: ${id}`); + return null; + } + return { threadId: id, dataPath: resolved }; +} + +async function buildLiveWatchTasks(params: { + dataPath: string; + infoPath: string; + debug: boolean; + dataState: LiveSessionState; + infoState: InfoLiveState; + roleFilter: string | null; + cas: CasStore; +}): Promise { + const { dataPath, infoPath, debug, dataState, infoState, roleFilter, cas } = params; + const tasks: WatchPumpTask[] = [ + { + path: dataPath, + pump: () => pumpNewContent(dataPath, dataState, roleFilter, cas), + }, + ]; + + if (debug && (await pathExists(infoPath))) { + tasks.push({ + path: infoPath, + pump: async () => { + await pumpNewInfoContent(infoPath, infoState); + return null; + }, + }); + } + + return tasks; +} + +export async function cmdLive(storageRoot: string, parsed: ParsedLiveArgv): Promise { + const target = await resolveLiveThreadTarget(storageRoot, parsed); + if (target === null) { return 1; } - const state: LiveSessionState = { + const { threadId, dataPath } = target; + const roleFilter = parsed.role; + const infoPath = join(dirname(dataPath), `${threadId}.info.jsonl`); + const cas = createCasStore(getGlobalCasDir(storageRoot)); + + const dataState: LiveSessionState = { sawStart: false, completed: false, carry: "", contentOffset: 0, }; + const infoState: InfoLiveState = { + carry: "", + contentOffset: 0, + }; + const controller = new AbortController(); const onSigInt = (): void => { controller.abort(); @@ -234,16 +429,30 @@ export async function cmdLive(storageRoot: string, threadId: string): Promise { + const a = argv[i]; + if (a === "--latest") { + s.latest = true; + return ok(i + 1); + } + if (a === "--debug") { + s.debug = true; + return ok(i + 1); + } + if (a === "--role") { + const v = argv[i + 1]; + if (v === undefined || v.startsWith("--")) { + return err("missing value for --role"); + } + s.role = v; + return ok(i + 2); + } + if (a.startsWith("--")) { + return err(`unknown live flag: ${a}`); + } + if (s.threadId !== null) { + return err("unexpected extra argument"); + } + s.threadId = a; + return ok(i + 1); +} + +export function parseLiveArgv(argv: string[]): Result { + const s: LiveArgvScan = { + latest: false, + debug: false, + role: null, + threadId: null, + }; + + let i = 0; + while (i < argv.length) { + const step = applyLiveArgvToken(argv, i, s); + if (!step.ok) { + return step; + } + i = step.value; + } + + if (s.latest && s.threadId !== null) { + return err("live --latest does not take "); + } + if (!s.latest && s.threadId === null) { + return err("live requires or --latest"); + } + + return ok({ + threadId: s.threadId, + latest: s.latest, + debug: s.debug, + role: s.role, + }); +} diff --git a/packages/cli-workflow/src/thread-scan.ts b/packages/cli-workflow/src/thread-scan.ts index df06731..0cf2951 100644 --- a/packages/cli-workflow/src/thread-scan.ts +++ b/packages/cli-workflow/src/thread-scan.ts @@ -1,4 +1,4 @@ -import { readdir } from "node:fs/promises"; +import { readdir, stat } from "node:fs/promises"; import { join } from "node:path"; import { pathExists, readTextFileIfExists } from "./fs-utils.js"; @@ -15,6 +15,28 @@ export type HistoricalThreadRow = { workflowName: string | null; }; +async function readThreadStartTimestampMs(dataPath: string): Promise { + const text = await readTextFileIfExists(dataPath); + if (text === null) { + return null; + } + const firstLine = text.split("\n")[0]; + if (firstLine === undefined || firstLine.trim() === "") { + return null; + } + let parsed: unknown; + try { + parsed = JSON.parse(firstLine) as unknown; + } catch { + return null; + } + if (parsed === null || typeof parsed !== "object") { + return null; + } + const ts = (parsed as Record).timestamp; + return typeof ts === "number" && Number.isFinite(ts) ? ts : null; +} + async function readWorkflowNameFromDataJsonl(dataPath: string): Promise { const text = await readTextFileIfExists(dataPath); if (text === null) { @@ -124,6 +146,50 @@ export async function listHistoricalThreads( return out; } +/** + * Picks the thread whose `.data.jsonl` is newest by start-record `timestamp`, + * falling back to file `mtime` when the timestamp is missing. + * Tie-breaker: larger `mtime` wins when start timestamps are equal. + */ +export async function findLatestThreadDataPath( + storageRoot: string, +): Promise<{ threadId: string; dataPath: string } | null> { + const threads = await listHistoricalThreads(storageRoot, null); + if (threads.length === 0) { + return null; + } + + let best: { + threadId: string; + dataPath: string; + primary: number; + secondary: number; + } | null = null; + + for (const t of threads) { + const dataPath = join(storageRoot, "logs", t.hash, `${t.threadId}.data.jsonl`); + let mtimeMs = 0; + try { + const st = await stat(dataPath); + mtimeMs = st.mtimeMs; + } catch { + continue; + } + const startTs = await readThreadStartTimestampMs(dataPath); + const primary = startTs !== null ? startTs : mtimeMs; + const secondary = mtimeMs; + if ( + best === null || + primary > best.primary || + (primary === best.primary && secondary > best.secondary) + ) { + best = { threadId: t.threadId, dataPath, primary, secondary }; + } + } + + return best === null ? null : { threadId: best.threadId, dataPath: best.dataPath }; +} + export async function resolveThreadDataPath( storageRoot: string, threadId: string, diff --git a/packages/workflow/src/fork-thread.ts b/packages/workflow/src/fork-thread.ts index 445381f..8a6aa76 100644 --- a/packages/workflow/src/fork-thread.ts +++ b/packages/workflow/src/fork-thread.ts @@ -1,6 +1,6 @@ import { normalizeRefsField } from "./refs-field.js"; import { err, ok, type Result } from "./result.js"; -import type { RoleOutput, WorkflowResult } from "./types.js"; +import type { RoleOutput, WorkflowCompletion } from "./types.js"; /** Role steps replayed from `.data.jsonl`, including persisted timestamps. */ export type ForkHistoricalStep = RoleOutput & { timestamp: number }; @@ -14,8 +14,10 @@ export type ParsedThreadStartRecord = { depth: number; }; -/** Recognizes a persisted workflow completion line (no `role`; has numeric `returnCode` and string `summary`). */ -export function tryParseWorkflowResultRecord(obj: Record): WorkflowResult | null { +/** Recognizes a persisted workflow completion line (no `role`; has numeric `returnCode` and string `summary`). Omits `rootHash` when absent. */ +export function tryParseWorkflowResultRecord( + obj: Record, +): WorkflowCompletion | null { if (obj.role !== undefined) { return null; } diff --git a/packages/workflow/src/worker.ts b/packages/workflow/src/worker.ts index 574aaf4..0edbbde 100644 --- a/packages/workflow/src/worker.ts +++ b/packages/workflow/src/worker.ts @@ -422,7 +422,7 @@ async function main(): Promise { } catch (e) { const message = e instanceof Error ? e.message : String(e); bootLog("Q3MN8YKW", `thread ${threadId} failed: ${message}`); - const failure: WorkflowResult = { returnCode: 1, summary: message }; + const failure: WorkflowResult = { returnCode: 1, summary: message, rootHash: "" }; await appendFile(dataJsonlPath, `${JSON.stringify(failure)}\n`, "utf8").catch(() => {}); } finally { threads.delete(threadId); -- 2.43.0