feat(cli): add --latest, --debug, --role flags to live command (#37 Phase 2)
- --latest: auto-find most recent thread by start timestamp - --debug: display .info.jsonl debug log with tags - --role: filter output to specific role - Add live-argv.ts for flag parsing - Add fixtures and test coverage for all flags Testing: #50
This commit is contained in:
+3
-3
@@ -1,4 +1,4 @@
|
||||
{"name":"demo-live","hash":"C9NMV6V2TQT81","threadId":"01LIVECMPLT01DDDDDDDDDDDDG","parameters":{"prompt":"hello","options":{"maxRounds":5,"depth":0}},"timestamp":1714963200000}
|
||||
{"role":"planner","content":"alpha\nbeta\ngamma\nLINE4\nLINE5\nLINE6\nLINE7\nLINE8\nLINE9\nLINE10\nLINE11","meta":{"phase":"plan","flags":[1,2]},"refs":[],"timestamp":1714963201000}
|
||||
{"role":"coder","content":"patch","meta":{},"refs":[],"timestamp":1714963202000}
|
||||
{"name":"demo-live","hash":"C9NMV6V2TQT81","threadId":"01LIVECMPLT01DDDDDDDDDDDDG","parameters":{"prompt":"hello","options":{"maxRounds":5,"depth":0}},"timestamp":1714963400000}
|
||||
{"role":"planner","contentHash":"FF7YQ5W3S2EV6","meta":{"phase":"plan","flags":[1,2]},"refs":[],"timestamp":1714963201000}
|
||||
{"role":"coder","contentHash":"EN34XX1W4WAFJ","meta":{},"refs":[],"timestamp":1714963202000}
|
||||
{"returnCode":0,"summary":"fixture completed"}
|
||||
|
||||
+2
@@ -0,0 +1,2 @@
|
||||
{"tag":"DEBUGTAG1","content":"bundle loaded","timestamp":1714963400050}
|
||||
{"tag":"DEBUGTAG2","content":"multi\nline","timestamp":1714963400500}
|
||||
+1
-1
@@ -1,2 +1,2 @@
|
||||
{"name":"demo-live","hash":"C9NMV6V2TQT81","threadId":"01LIVEINFLY01DDDDDDDDDDDDG","parameters":{"prompt":"hello","options":{"maxRounds":5,"depth":0}},"timestamp":1714963200000}
|
||||
{"role":"planner","content":"still running","meta":{"x":1},"refs":[],"timestamp":1714963201000}
|
||||
{"role":"planner","contentHash":"P6M9FHE1GSBN0","meta":{"x":1},"refs":[],"timestamp":1714963201000}
|
||||
|
||||
+2
@@ -0,0 +1,2 @@
|
||||
{"name":"demo-live-old","hash":"C9NMV6V2TQT81","threadId":"01LIVEOLDER01DDDDDDDDDDDDG","parameters":{"prompt":"old","options":{"maxRounds":5,"depth":0}},"timestamp":1714963000000}
|
||||
{"returnCode":0,"summary":"older thread"}
|
||||
@@ -134,10 +134,10 @@ describe("cli fork", () => {
|
||||
expect(start.threadId).toBe(newId);
|
||||
expect(start.forkFrom).toEqual({ threadId: sourceId });
|
||||
|
||||
const last = JSON.parse(lines[lines.length - 1] ?? "{}") as Record<string, unknown>;
|
||||
expect(last.role).toBe("reviewer");
|
||||
const lastRoleLine = JSON.parse(lines[lines.length - 2] ?? "{}") as Record<string, unknown>;
|
||||
expect(lastRoleLine.role).toBe("reviewer");
|
||||
const cas = createCasStore(getGlobalCasDir(storageRoot));
|
||||
expect(await getContentMerklePayload(cas, String(last.contentHash))).toBe("rev-1");
|
||||
expect(await getContentMerklePayload(cas, String(lastRoleLine.contentHash))).toBe("rev-1");
|
||||
});
|
||||
|
||||
test("fork without --from-role retries last role", async () => {
|
||||
@@ -187,9 +187,9 @@ describe("cli fork", () => {
|
||||
const cas = createCasStore(getGlobalCasDir(storageRoot));
|
||||
expect(await getContentMerklePayload(cas, String(replayCoder.contentHash))).toBe("c1");
|
||||
|
||||
const last = JSON.parse(lines[lines.length - 1] ?? "{}") as Record<string, unknown>;
|
||||
expect(last.role).toBe("reviewer");
|
||||
expect(await getContentMerklePayload(cas, String(last.contentHash))).toBe("rev-2");
|
||||
const lastRoleLine = JSON.parse(lines[lines.length - 2] ?? "{}") as Record<string, unknown>;
|
||||
expect(lastRoleLine.role).toBe("reviewer");
|
||||
expect(await getContentMerklePayload(cas, String(lastRoleLine.contentHash))).toBe("rev-2");
|
||||
});
|
||||
|
||||
test("fork rejects unknown role with available names", async () => {
|
||||
|
||||
@@ -5,22 +5,37 @@ import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
|
||||
import { createCasStore, getGlobalCasDir, putContentMerkleNode } from "@uncaged/workflow";
|
||||
|
||||
import {
|
||||
formatLiveDebugLine,
|
||||
formatLiveTimeLabel,
|
||||
LIVE_CONTENT_MAX_LINES,
|
||||
type LiveRoleRow,
|
||||
renderLiveRoleStepLines,
|
||||
} from "../src/cmd-live.js";
|
||||
import { parseLiveArgv } from "../src/live-argv.js";
|
||||
|
||||
const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url));
|
||||
const fixtureRoot = fileURLToPath(new URL("./fixtures/live", import.meta.url));
|
||||
|
||||
/** Bodies for Merkle content nodes; hashes must match `.data.jsonl` fixtures. */
|
||||
const LIVE_FIXTURE_PLANNER_BODY =
|
||||
"alpha\nbeta\ngamma\nLINE4\nLINE5\nLINE6\nLINE7\nLINE8\nLINE9\nLINE10\nLINE11";
|
||||
|
||||
describe("live helpers", () => {
|
||||
test("formatLiveTimeLabel pads HH:MM:SS", () => {
|
||||
const label = formatLiveTimeLabel(new Date("2024-06-01T09:08:07.000Z").getTime());
|
||||
expect(label).toMatch(/^\d{2}:\d{2}:\d{2}$/);
|
||||
});
|
||||
|
||||
test("formatLiveDebugLine flattens newlines in message", () => {
|
||||
const line = formatLiveDebugLine(0, "TAG1", "a\nb");
|
||||
expect(line).toContain("[TAG1]");
|
||||
expect(line).toContain("a b");
|
||||
expect(line).not.toContain("\n");
|
||||
});
|
||||
|
||||
test("renderLiveRoleStepLines truncates content to LIVE_CONTENT_MAX_LINES", () => {
|
||||
const lines = Array.from({ length: LIVE_CONTENT_MAX_LINES + 3 }, (_, i) => `L${i + 1}`);
|
||||
const row: LiveRoleRow = {
|
||||
@@ -37,6 +52,31 @@ describe("live helpers", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("parseLiveArgv", () => {
|
||||
test("parses thread id and flags in any order", () => {
|
||||
const a = parseLiveArgv(["01ABC", "--debug", "--role", "planner"]);
|
||||
expect(a.ok).toBe(true);
|
||||
if (a.ok) {
|
||||
expect(a.value.threadId).toBe("01ABC");
|
||||
expect(a.value.latest).toBe(false);
|
||||
expect(a.value.debug).toBe(true);
|
||||
expect(a.value.role).toBe("planner");
|
||||
}
|
||||
const b = parseLiveArgv(["--latest", "--role", "x"]);
|
||||
expect(b.ok).toBe(true);
|
||||
if (b.ok) {
|
||||
expect(b.value.latest).toBe(true);
|
||||
expect(b.value.threadId).toBe(null);
|
||||
expect(b.value.role).toBe("x");
|
||||
}
|
||||
});
|
||||
|
||||
test("rejects --latest with thread id", () => {
|
||||
const r = parseLiveArgv(["--latest", "01ABC"]);
|
||||
expect(r.ok).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("live CLI", () => {
|
||||
let prevEnv: string | undefined;
|
||||
let storageRoot: string;
|
||||
@@ -50,10 +90,23 @@ describe("live CLI", () => {
|
||||
join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl"),
|
||||
join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl"),
|
||||
);
|
||||
await cp(
|
||||
join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.info.jsonl"),
|
||||
join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.info.jsonl"),
|
||||
);
|
||||
await cp(
|
||||
join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl"),
|
||||
join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl"),
|
||||
);
|
||||
await cp(
|
||||
join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVEOLDER01DDDDDDDDDDDDG.data.jsonl"),
|
||||
join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVEOLDER01DDDDDDDDDDDDG.data.jsonl"),
|
||||
);
|
||||
|
||||
const cas = createCasStore(getGlobalCasDir(storageRoot));
|
||||
await putContentMerkleNode(cas, LIVE_FIXTURE_PLANNER_BODY);
|
||||
await putContentMerkleNode(cas, "patch");
|
||||
await putContentMerkleNode(cas, "still running");
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
@@ -100,6 +153,135 @@ describe("live CLI", () => {
|
||||
expect(stdout).toContain("fixture completed");
|
||||
});
|
||||
|
||||
test("--latest tails the newest thread by start timestamp", async () => {
|
||||
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
|
||||
const proc = spawn(process.execPath, [cliEntryPath, "live", "--latest"], {
|
||||
env,
|
||||
stdio: ["ignore", "pipe", "pipe"],
|
||||
});
|
||||
const stdout = await new Promise<string>((resolve, reject) => {
|
||||
let buf = "";
|
||||
proc.stdout?.on("data", (c: Buffer) => {
|
||||
buf += c.toString("utf8");
|
||||
});
|
||||
proc.stderr?.on("data", (c: Buffer) => {
|
||||
buf += c.toString("utf8");
|
||||
});
|
||||
proc.on("error", reject);
|
||||
proc.on("exit", (code: number | null) => {
|
||||
if (code === 0) {
|
||||
resolve(buf);
|
||||
} else {
|
||||
reject(new Error(`exit ${code}: ${buf}`));
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
expect(stdout).toContain("fixture completed");
|
||||
expect(stdout).not.toContain("older thread");
|
||||
});
|
||||
|
||||
test("--debug prints .info.jsonl records after data output", async () => {
|
||||
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
|
||||
const proc = spawn(
|
||||
process.execPath,
|
||||
[cliEntryPath, "live", "01LIVECMPLT01DDDDDDDDDDDDG", "--debug"],
|
||||
{
|
||||
env,
|
||||
stdio: ["ignore", "pipe", "pipe"],
|
||||
},
|
||||
);
|
||||
const stdout = await new Promise<string>((resolve, reject) => {
|
||||
let buf = "";
|
||||
proc.stdout?.on("data", (c: Buffer) => {
|
||||
buf += c.toString("utf8");
|
||||
});
|
||||
proc.stderr?.on("data", (c: Buffer) => {
|
||||
buf += c.toString("utf8");
|
||||
});
|
||||
proc.on("error", reject);
|
||||
proc.on("exit", (code: number | null) => {
|
||||
if (code === 0) {
|
||||
resolve(buf);
|
||||
} else {
|
||||
reject(new Error(`exit ${code}: ${buf}`));
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
expect(stdout).toContain("[DEBUGTAG1]");
|
||||
expect(stdout).toContain("bundle loaded");
|
||||
expect(stdout).toContain("[DEBUGTAG2]");
|
||||
expect(stdout).toContain("multi line");
|
||||
});
|
||||
|
||||
test("--role filters out non-matching roles", async () => {
|
||||
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
|
||||
const proc = spawn(
|
||||
process.execPath,
|
||||
[cliEntryPath, "live", "01LIVECMPLT01DDDDDDDDDDDDG", "--role", "planner"],
|
||||
{
|
||||
env,
|
||||
stdio: ["ignore", "pipe", "pipe"],
|
||||
},
|
||||
);
|
||||
const stdout = await new Promise<string>((resolve, reject) => {
|
||||
let buf = "";
|
||||
proc.stdout?.on("data", (c: Buffer) => {
|
||||
buf += c.toString("utf8");
|
||||
});
|
||||
proc.stderr?.on("data", (c: Buffer) => {
|
||||
buf += c.toString("utf8");
|
||||
});
|
||||
proc.on("error", reject);
|
||||
proc.on("exit", (code: number | null) => {
|
||||
if (code === 0) {
|
||||
resolve(buf);
|
||||
} else {
|
||||
reject(new Error(`exit ${code}: ${buf}`));
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
expect(stdout).toContain("planner");
|
||||
expect(stdout).not.toContain("patch");
|
||||
expect(stdout).toContain("completed: returnCode=0");
|
||||
});
|
||||
|
||||
test("--latest --debug --role combine", async () => {
|
||||
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
|
||||
const proc = spawn(
|
||||
process.execPath,
|
||||
[cliEntryPath, "live", "--latest", "--debug", "--role", "planner"],
|
||||
{
|
||||
env,
|
||||
stdio: ["ignore", "pipe", "pipe"],
|
||||
},
|
||||
);
|
||||
const stdout = await new Promise<string>((resolve, reject) => {
|
||||
let buf = "";
|
||||
proc.stdout?.on("data", (c: Buffer) => {
|
||||
buf += c.toString("utf8");
|
||||
});
|
||||
proc.stderr?.on("data", (c: Buffer) => {
|
||||
buf += c.toString("utf8");
|
||||
});
|
||||
proc.on("error", reject);
|
||||
proc.on("exit", (code: number | null) => {
|
||||
if (code === 0) {
|
||||
resolve(buf);
|
||||
} else {
|
||||
reject(new Error(`exit ${code}: ${buf}`));
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
expect(stdout).toContain("[DEBUGTAG1]");
|
||||
expect(stdout).toContain("planner");
|
||||
expect(stdout).not.toContain("patch");
|
||||
expect(stdout).toContain("fixture completed");
|
||||
});
|
||||
|
||||
test("unknown thread id exits 1", () => {
|
||||
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
|
||||
const r = spawnSync(process.execPath, [cliEntryPath, "live", "01UNKNOWNXXXXXXXXXXXXXXXXX"], {
|
||||
@@ -126,7 +308,11 @@ describe("live CLI", () => {
|
||||
|
||||
await new Promise((r) => setTimeout(r, 120));
|
||||
const prior = await readFile(dataPath, "utf8");
|
||||
await writeFile(dataPath, `${prior}{"returnCode":0,"summary":"caught up"}\n`, "utf8");
|
||||
await writeFile(
|
||||
dataPath,
|
||||
`${prior.replace(/\s*$/, "")}\n${JSON.stringify({ returnCode: 0, summary: "caught up" })}\n`,
|
||||
"utf8",
|
||||
);
|
||||
|
||||
const stdout = await new Promise<string>((resolve, reject) => {
|
||||
let buf = "";
|
||||
@@ -151,3 +337,33 @@ describe("live CLI", () => {
|
||||
expect(stdout).toContain("caught up");
|
||||
});
|
||||
});
|
||||
|
||||
describe("live --latest with empty storage", () => {
|
||||
let prevEnv: string | undefined;
|
||||
let emptyRoot: string;
|
||||
|
||||
beforeEach(async () => {
|
||||
prevEnv = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
|
||||
emptyRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-live-empty-"));
|
||||
process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = emptyRoot;
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
if (prevEnv === undefined) {
|
||||
delete process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
|
||||
} else {
|
||||
process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = prevEnv;
|
||||
}
|
||||
await rm(emptyRoot, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
test("exits 1 when no threads exist", () => {
|
||||
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: emptyRoot };
|
||||
const r = spawnSync(process.execPath, [cliEntryPath, "live", "--latest"], {
|
||||
env,
|
||||
encoding: "utf8",
|
||||
});
|
||||
expect(r.status).toBe(1);
|
||||
expect(String(r.stderr ?? "")).toContain("no threads");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -17,6 +17,7 @@ import { cmdRun } from "./cmd-run.js";
|
||||
import { cmdShow, formatShowYaml } from "./cmd-show.js";
|
||||
import { cmdThreadRemove, cmdThreadShow } from "./cmd-thread.js";
|
||||
import { cmdThreads } from "./cmd-threads.js";
|
||||
import { parseLiveArgv } from "./live-argv.js";
|
||||
import { parseRunArgv } from "./run-argv.js";
|
||||
|
||||
export function formatCliUsage(): string {
|
||||
@@ -29,7 +30,8 @@ export function formatCliUsage(): string {
|
||||
" uncaged-workflow run <name> [--prompt <text>] [--max-rounds N]",
|
||||
" uncaged-workflow ps",
|
||||
" uncaged-workflow kill <thread-id>",
|
||||
" uncaged-workflow live <thread-id>",
|
||||
" uncaged-workflow live <thread-id> [--debug] [--role <name>]",
|
||||
" uncaged-workflow live --latest [--debug] [--role <name>]",
|
||||
" uncaged-workflow history <name>",
|
||||
" uncaged-workflow rollback <name> [hash]",
|
||||
" uncaged-workflow pause <thread-id>",
|
||||
@@ -193,12 +195,12 @@ async function dispatchKill(storageRoot: string, argv: string[]): Promise<number
|
||||
}
|
||||
|
||||
async function dispatchLive(storageRoot: string, argv: string[]): Promise<number> {
|
||||
const threadId = argv[0];
|
||||
if (threadId === undefined || argv.length > 1) {
|
||||
printCliError(`${usage()}\n\nerror: live requires <thread-id>`);
|
||||
const parsed = parseLiveArgv(argv);
|
||||
if (!parsed.ok) {
|
||||
printCliError(`${formatCliUsage()}\n\nerror: ${parsed.error}`);
|
||||
return 1;
|
||||
}
|
||||
return cmdLive(storageRoot, threadId);
|
||||
return cmdLive(storageRoot, parsed.value);
|
||||
}
|
||||
|
||||
async function dispatchHistory(storageRoot: string, argv: string[]): Promise<number> {
|
||||
|
||||
@@ -1,14 +1,21 @@
|
||||
import { watch } from "node:fs";
|
||||
import { readFile } from "node:fs/promises";
|
||||
import { dirname, join } from "node:path";
|
||||
|
||||
import {
|
||||
type CasStore,
|
||||
createCasStore,
|
||||
getContentMerklePayload,
|
||||
getGlobalCasDir,
|
||||
tryParseRoleStepRecord,
|
||||
tryParseWorkflowResultRecord,
|
||||
type WorkflowResult,
|
||||
type WorkflowCompletion,
|
||||
} from "@uncaged/workflow";
|
||||
|
||||
import { printCliError, printCliLine } from "./cli-output.js";
|
||||
import { resolveThreadDataPath } from "./thread-scan.js";
|
||||
import { pathExists } from "./fs-utils.js";
|
||||
import type { ParsedLiveArgv } from "./live-argv.js";
|
||||
import { findLatestThreadDataPath, resolveThreadDataPath } from "./thread-scan.js";
|
||||
|
||||
export const LIVE_CONTENT_MAX_LINES = 10;
|
||||
|
||||
@@ -38,6 +45,18 @@ function highlightLiveRole(name: string): string {
|
||||
return `\x1b[1m\x1b[36m${name}\x1b[0m`;
|
||||
}
|
||||
|
||||
function dimGreyLine(line: string): string {
|
||||
if (!shouldUseColor()) {
|
||||
return line;
|
||||
}
|
||||
return `\x1b[2m\x1b[90m${line}\x1b[0m`;
|
||||
}
|
||||
|
||||
export function formatLiveDebugLine(timestampMs: number, tag: string, message: string): string {
|
||||
const label = `[${formatLiveTimeLabel(timestampMs)}] [${tag}] ${message.replace(/\n/g, " ")}`;
|
||||
return dimGreyLine(label);
|
||||
}
|
||||
|
||||
export function renderLiveRoleStepLines(row: LiveRoleRow, roleDisplay: string): string[] {
|
||||
const header = `[${formatLiveTimeLabel(row.timestamp)}] ▶ ${roleDisplay}`;
|
||||
const lines: string[] = [header];
|
||||
@@ -54,7 +73,7 @@ export function renderLiveRoleStepLines(row: LiveRoleRow, roleDisplay: string):
|
||||
return lines;
|
||||
}
|
||||
|
||||
function printSummary(result: WorkflowResult): void {
|
||||
function printSummary(result: WorkflowCompletion): void {
|
||||
printCliLine(`completed: returnCode=${result.returnCode} — ${result.summary}`);
|
||||
}
|
||||
|
||||
@@ -65,10 +84,36 @@ type LiveSessionState = {
|
||||
contentOffset: number;
|
||||
};
|
||||
|
||||
function handleJsonlLine(
|
||||
type InfoLiveState = {
|
||||
carry: string;
|
||||
contentOffset: number;
|
||||
};
|
||||
|
||||
function tryParseInfoRecord(obj: Record<string, unknown>): {
|
||||
tag: string;
|
||||
content: string;
|
||||
timestamp: number;
|
||||
} | null {
|
||||
const tag = obj.tag;
|
||||
const content = obj.content;
|
||||
const timestamp = obj.timestamp;
|
||||
if (
|
||||
typeof tag !== "string" ||
|
||||
typeof content !== "string" ||
|
||||
typeof timestamp !== "number" ||
|
||||
!Number.isFinite(timestamp)
|
||||
) {
|
||||
return null;
|
||||
}
|
||||
return { tag, content, timestamp };
|
||||
}
|
||||
|
||||
async function handleJsonlLine(
|
||||
rawLine: string,
|
||||
state: LiveSessionState,
|
||||
): { parseError: string | null; workflowResult: WorkflowResult | null } {
|
||||
roleFilter: string | null,
|
||||
cas: CasStore,
|
||||
): Promise<{ parseError: string | null; workflowResult: WorkflowCompletion | null }> {
|
||||
const trimmed = rawLine.trim();
|
||||
if (trimmed === "") {
|
||||
return { parseError: null, workflowResult: null };
|
||||
@@ -104,9 +149,17 @@ function handleJsonlLine(
|
||||
};
|
||||
}
|
||||
|
||||
if (roleFilter !== null && roleRow.role !== roleFilter) {
|
||||
return { parseError: null, workflowResult: null };
|
||||
}
|
||||
|
||||
const payload = await getContentMerklePayload(cas, roleRow.contentHash);
|
||||
const content =
|
||||
payload !== null ? payload : `(content not in CAS; contentHash=${roleRow.contentHash})`;
|
||||
|
||||
const row: LiveRoleRow = {
|
||||
role: roleRow.role,
|
||||
content: roleRow.content,
|
||||
content,
|
||||
meta: roleRow.meta,
|
||||
timestamp: roleRow.timestamp,
|
||||
};
|
||||
@@ -116,7 +169,12 @@ function handleJsonlLine(
|
||||
return { parseError: null, workflowResult: null };
|
||||
}
|
||||
|
||||
async function pumpNewContent(dataPath: string, state: LiveSessionState): Promise<number | null> {
|
||||
async function pumpNewContent(
|
||||
dataPath: string,
|
||||
state: LiveSessionState,
|
||||
roleFilter: string | null,
|
||||
cas: CasStore,
|
||||
): Promise<number | null> {
|
||||
let text: string;
|
||||
try {
|
||||
text = await readFile(dataPath, "utf8");
|
||||
@@ -137,7 +195,7 @@ async function pumpNewContent(dataPath: string, state: LiveSessionState): Promis
|
||||
state.carry = parts.pop() ?? "";
|
||||
|
||||
for (const line of parts) {
|
||||
const { parseError, workflowResult } = handleJsonlLine(line, state);
|
||||
const { parseError, workflowResult } = await handleJsonlLine(line, state, roleFilter, cas);
|
||||
if (parseError !== null) {
|
||||
printCliError(parseError);
|
||||
return 1;
|
||||
@@ -151,12 +209,76 @@ async function pumpNewContent(dataPath: string, state: LiveSessionState): Promis
|
||||
return null;
|
||||
}
|
||||
|
||||
function watchLiveFile(params: {
|
||||
dataPath: string;
|
||||
state: LiveSessionState;
|
||||
signal: AbortSignal;
|
||||
}): Promise<number> {
|
||||
const { dataPath, state, signal } = params;
|
||||
async function pumpNewInfoContent(infoPath: string, state: InfoLiveState): Promise<void> {
|
||||
let text: string;
|
||||
try {
|
||||
text = await readFile(infoPath, "utf8");
|
||||
} catch {
|
||||
return;
|
||||
}
|
||||
|
||||
if (text.length < state.contentOffset) {
|
||||
state.contentOffset = 0;
|
||||
state.carry = "";
|
||||
}
|
||||
|
||||
const chunk = text.slice(state.contentOffset);
|
||||
state.contentOffset = text.length;
|
||||
state.carry += chunk;
|
||||
|
||||
const parts = state.carry.split("\n");
|
||||
state.carry = parts.pop() ?? "";
|
||||
|
||||
for (const line of parts) {
|
||||
const trimmed = line.trim();
|
||||
if (trimmed === "") {
|
||||
continue;
|
||||
}
|
||||
let rec: unknown;
|
||||
try {
|
||||
rec = JSON.parse(trimmed) as unknown;
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
if (rec === null || typeof rec !== "object") {
|
||||
continue;
|
||||
}
|
||||
const parsed = tryParseInfoRecord(rec as Record<string, unknown>);
|
||||
if (parsed === null) {
|
||||
continue;
|
||||
}
|
||||
printCliLine(formatLiveDebugLine(parsed.timestamp, parsed.tag, parsed.content));
|
||||
}
|
||||
}
|
||||
|
||||
type WatchPumpTask = {
|
||||
path: string;
|
||||
pump: () => Promise<number | null>;
|
||||
};
|
||||
|
||||
async function runWatchPumpStep(
|
||||
settled: () => boolean,
|
||||
pump: () => Promise<number | null>,
|
||||
closeAll: () => void,
|
||||
finish: (code: number) => void,
|
||||
): Promise<void> {
|
||||
if (settled()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
const code = await pump();
|
||||
if (code !== null) {
|
||||
closeAll();
|
||||
finish(code);
|
||||
}
|
||||
} catch (e) {
|
||||
closeAll();
|
||||
throw e instanceof Error ? e : new Error(String(e));
|
||||
}
|
||||
}
|
||||
|
||||
function watchLivePaths(params: { tasks: WatchPumpTask[]; signal: AbortSignal }): Promise<number> {
|
||||
const { tasks, signal } = params;
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
let settled = false;
|
||||
@@ -168,65 +290,138 @@ function watchLiveFile(params: {
|
||||
resolve(code);
|
||||
};
|
||||
|
||||
/** Serialize reads — `fs.watch` may emit faster than `readFile` completes. */
|
||||
let pumpChain: Promise<void> = Promise.resolve();
|
||||
const pumpChains = new Map<string, Promise<void>>();
|
||||
for (const t of tasks) {
|
||||
pumpChains.set(t.path, Promise.resolve());
|
||||
}
|
||||
|
||||
const watcher = watch(dataPath, (eventType) => {
|
||||
if (eventType === "rename") {
|
||||
return;
|
||||
const watchers: ReturnType<typeof watch>[] = [];
|
||||
|
||||
const closeAll = (): void => {
|
||||
for (const w of watchers) {
|
||||
w.close();
|
||||
}
|
||||
schedulePump();
|
||||
});
|
||||
};
|
||||
|
||||
watcher.on("error", (err: Error) => {
|
||||
watcher.close();
|
||||
reject(err);
|
||||
});
|
||||
function schedulePump(path: string, pump: () => Promise<number | null>): void {
|
||||
const prev = pumpChains.get(path) ?? Promise.resolve();
|
||||
const next = (async () => {
|
||||
await prev;
|
||||
await runWatchPumpStep(() => settled, pump, closeAll, finish);
|
||||
})();
|
||||
pumpChains.set(path, next);
|
||||
}
|
||||
|
||||
for (const { path, pump } of tasks) {
|
||||
const watcher = watch(path, (eventType) => {
|
||||
if (eventType === "rename") {
|
||||
return;
|
||||
}
|
||||
schedulePump(path, pump);
|
||||
});
|
||||
watchers.push(watcher);
|
||||
watcher.on("error", (err: Error) => {
|
||||
closeAll();
|
||||
reject(err);
|
||||
});
|
||||
}
|
||||
|
||||
const onAbort = (): void => {
|
||||
watcher.close();
|
||||
closeAll();
|
||||
finish(0);
|
||||
};
|
||||
signal.addEventListener("abort", onAbort, { once: true });
|
||||
|
||||
async function drainQueuedPump(): Promise<void> {
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
const code = await pumpNewContent(dataPath, state);
|
||||
if (code !== null) {
|
||||
watcher.close();
|
||||
finish(code);
|
||||
}
|
||||
} catch (e) {
|
||||
watcher.close();
|
||||
reject(e instanceof Error ? e : new Error(String(e)));
|
||||
}
|
||||
for (const { path, pump } of tasks) {
|
||||
schedulePump(path, pump);
|
||||
}
|
||||
|
||||
function schedulePump(): void {
|
||||
pumpChain = pumpChain.then(() => drainQueuedPump());
|
||||
}
|
||||
|
||||
schedulePump();
|
||||
});
|
||||
}
|
||||
|
||||
export async function cmdLive(storageRoot: string, threadId: string): Promise<number> {
|
||||
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
|
||||
if (dataPath === null) {
|
||||
printCliError(`thread not found: ${threadId}`);
|
||||
type LiveThreadTarget = {
|
||||
threadId: string;
|
||||
dataPath: string;
|
||||
};
|
||||
|
||||
async function resolveLiveThreadTarget(
|
||||
storageRoot: string,
|
||||
parsed: ParsedLiveArgv,
|
||||
): Promise<LiveThreadTarget | null> {
|
||||
if (parsed.latest) {
|
||||
const found = await findLatestThreadDataPath(storageRoot);
|
||||
if (found === null) {
|
||||
printCliError("live: no threads found");
|
||||
return null;
|
||||
}
|
||||
return found;
|
||||
}
|
||||
|
||||
const id = parsed.threadId;
|
||||
if (id === null) {
|
||||
printCliError("live: internal error: missing thread id");
|
||||
return null;
|
||||
}
|
||||
const resolved = await resolveThreadDataPath(storageRoot, id);
|
||||
if (resolved === null) {
|
||||
printCliError(`thread not found: ${id}`);
|
||||
return null;
|
||||
}
|
||||
return { threadId: id, dataPath: resolved };
|
||||
}
|
||||
|
||||
async function buildLiveWatchTasks(params: {
|
||||
dataPath: string;
|
||||
infoPath: string;
|
||||
debug: boolean;
|
||||
dataState: LiveSessionState;
|
||||
infoState: InfoLiveState;
|
||||
roleFilter: string | null;
|
||||
cas: CasStore;
|
||||
}): Promise<WatchPumpTask[]> {
|
||||
const { dataPath, infoPath, debug, dataState, infoState, roleFilter, cas } = params;
|
||||
const tasks: WatchPumpTask[] = [
|
||||
{
|
||||
path: dataPath,
|
||||
pump: () => pumpNewContent(dataPath, dataState, roleFilter, cas),
|
||||
},
|
||||
];
|
||||
|
||||
if (debug && (await pathExists(infoPath))) {
|
||||
tasks.push({
|
||||
path: infoPath,
|
||||
pump: async () => {
|
||||
await pumpNewInfoContent(infoPath, infoState);
|
||||
return null;
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
return tasks;
|
||||
}
|
||||
|
||||
export async function cmdLive(storageRoot: string, parsed: ParsedLiveArgv): Promise<number> {
|
||||
const target = await resolveLiveThreadTarget(storageRoot, parsed);
|
||||
if (target === null) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
const state: LiveSessionState = {
|
||||
const { threadId, dataPath } = target;
|
||||
const roleFilter = parsed.role;
|
||||
const infoPath = join(dirname(dataPath), `${threadId}.info.jsonl`);
|
||||
const cas = createCasStore(getGlobalCasDir(storageRoot));
|
||||
|
||||
const dataState: LiveSessionState = {
|
||||
sawStart: false,
|
||||
completed: false,
|
||||
carry: "",
|
||||
contentOffset: 0,
|
||||
};
|
||||
|
||||
const infoState: InfoLiveState = {
|
||||
carry: "",
|
||||
contentOffset: 0,
|
||||
};
|
||||
|
||||
const controller = new AbortController();
|
||||
const onSigInt = (): void => {
|
||||
controller.abort();
|
||||
@@ -234,16 +429,30 @@ export async function cmdLive(storageRoot: string, threadId: string): Promise<nu
|
||||
process.on("SIGINT", onSigInt);
|
||||
|
||||
try {
|
||||
const first = await pumpNewContent(dataPath, state);
|
||||
if (first !== null) {
|
||||
return first;
|
||||
const firstData = await pumpNewContent(dataPath, dataState, roleFilter, cas);
|
||||
if (firstData === 1) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (state.completed) {
|
||||
if (parsed.debug && (await pathExists(infoPath))) {
|
||||
await pumpNewInfoContent(infoPath, infoState);
|
||||
}
|
||||
|
||||
if (firstData === 0 || dataState.completed) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return await watchLiveFile({ dataPath, state, signal: controller.signal });
|
||||
const tasks = await buildLiveWatchTasks({
|
||||
dataPath,
|
||||
infoPath,
|
||||
debug: parsed.debug,
|
||||
dataState,
|
||||
infoState,
|
||||
roleFilter,
|
||||
cas,
|
||||
});
|
||||
|
||||
return await watchLivePaths({ tasks, signal: controller.signal });
|
||||
} catch (e) {
|
||||
const message = e instanceof Error ? e.message : String(e);
|
||||
printCliError(`live: ${message}`);
|
||||
|
||||
@@ -0,0 +1,75 @@
|
||||
import { err, ok, type Result } from "@uncaged/workflow";
|
||||
|
||||
export type ParsedLiveArgv = {
|
||||
threadId: string | null;
|
||||
latest: boolean;
|
||||
debug: boolean;
|
||||
role: string | null;
|
||||
};
|
||||
|
||||
type LiveArgvScan = {
|
||||
latest: boolean;
|
||||
debug: boolean;
|
||||
role: string | null;
|
||||
threadId: string | null;
|
||||
};
|
||||
|
||||
function applyLiveArgvToken(argv: string[], i: number, s: LiveArgvScan): Result<number, string> {
|
||||
const a = argv[i];
|
||||
if (a === "--latest") {
|
||||
s.latest = true;
|
||||
return ok(i + 1);
|
||||
}
|
||||
if (a === "--debug") {
|
||||
s.debug = true;
|
||||
return ok(i + 1);
|
||||
}
|
||||
if (a === "--role") {
|
||||
const v = argv[i + 1];
|
||||
if (v === undefined || v.startsWith("--")) {
|
||||
return err("missing value for --role");
|
||||
}
|
||||
s.role = v;
|
||||
return ok(i + 2);
|
||||
}
|
||||
if (a.startsWith("--")) {
|
||||
return err(`unknown live flag: ${a}`);
|
||||
}
|
||||
if (s.threadId !== null) {
|
||||
return err("unexpected extra argument");
|
||||
}
|
||||
s.threadId = a;
|
||||
return ok(i + 1);
|
||||
}
|
||||
|
||||
export function parseLiveArgv(argv: string[]): Result<ParsedLiveArgv, string> {
|
||||
const s: LiveArgvScan = {
|
||||
latest: false,
|
||||
debug: false,
|
||||
role: null,
|
||||
threadId: null,
|
||||
};
|
||||
|
||||
let i = 0;
|
||||
while (i < argv.length) {
|
||||
const step = applyLiveArgvToken(argv, i, s);
|
||||
if (!step.ok) {
|
||||
return step;
|
||||
}
|
||||
i = step.value;
|
||||
}
|
||||
|
||||
if (s.latest && s.threadId !== null) {
|
||||
return err("live --latest does not take <thread-id>");
|
||||
}
|
||||
if (!s.latest && s.threadId === null) {
|
||||
return err("live requires <thread-id> or --latest");
|
||||
}
|
||||
|
||||
return ok({
|
||||
threadId: s.threadId,
|
||||
latest: s.latest,
|
||||
debug: s.debug,
|
||||
role: s.role,
|
||||
});
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
import { readdir } from "node:fs/promises";
|
||||
import { readdir, stat } from "node:fs/promises";
|
||||
import { join } from "node:path";
|
||||
|
||||
import { pathExists, readTextFileIfExists } from "./fs-utils.js";
|
||||
@@ -15,6 +15,28 @@ export type HistoricalThreadRow = {
|
||||
workflowName: string | null;
|
||||
};
|
||||
|
||||
async function readThreadStartTimestampMs(dataPath: string): Promise<number | null> {
|
||||
const text = await readTextFileIfExists(dataPath);
|
||||
if (text === null) {
|
||||
return null;
|
||||
}
|
||||
const firstLine = text.split("\n")[0];
|
||||
if (firstLine === undefined || firstLine.trim() === "") {
|
||||
return null;
|
||||
}
|
||||
let parsed: unknown;
|
||||
try {
|
||||
parsed = JSON.parse(firstLine) as unknown;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
if (parsed === null || typeof parsed !== "object") {
|
||||
return null;
|
||||
}
|
||||
const ts = (parsed as Record<string, unknown>).timestamp;
|
||||
return typeof ts === "number" && Number.isFinite(ts) ? ts : null;
|
||||
}
|
||||
|
||||
async function readWorkflowNameFromDataJsonl(dataPath: string): Promise<string | null> {
|
||||
const text = await readTextFileIfExists(dataPath);
|
||||
if (text === null) {
|
||||
@@ -124,6 +146,50 @@ export async function listHistoricalThreads(
|
||||
return out;
|
||||
}
|
||||
|
||||
/**
|
||||
* Picks the thread whose `.data.jsonl` is newest by start-record `timestamp`,
|
||||
* falling back to file `mtime` when the timestamp is missing.
|
||||
* Tie-breaker: larger `mtime` wins when start timestamps are equal.
|
||||
*/
|
||||
export async function findLatestThreadDataPath(
|
||||
storageRoot: string,
|
||||
): Promise<{ threadId: string; dataPath: string } | null> {
|
||||
const threads = await listHistoricalThreads(storageRoot, null);
|
||||
if (threads.length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
let best: {
|
||||
threadId: string;
|
||||
dataPath: string;
|
||||
primary: number;
|
||||
secondary: number;
|
||||
} | null = null;
|
||||
|
||||
for (const t of threads) {
|
||||
const dataPath = join(storageRoot, "logs", t.hash, `${t.threadId}.data.jsonl`);
|
||||
let mtimeMs = 0;
|
||||
try {
|
||||
const st = await stat(dataPath);
|
||||
mtimeMs = st.mtimeMs;
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
const startTs = await readThreadStartTimestampMs(dataPath);
|
||||
const primary = startTs !== null ? startTs : mtimeMs;
|
||||
const secondary = mtimeMs;
|
||||
if (
|
||||
best === null ||
|
||||
primary > best.primary ||
|
||||
(primary === best.primary && secondary > best.secondary)
|
||||
) {
|
||||
best = { threadId: t.threadId, dataPath, primary, secondary };
|
||||
}
|
||||
}
|
||||
|
||||
return best === null ? null : { threadId: best.threadId, dataPath: best.dataPath };
|
||||
}
|
||||
|
||||
export async function resolveThreadDataPath(
|
||||
storageRoot: string,
|
||||
threadId: string,
|
||||
|
||||
Reference in New Issue
Block a user