feat(cli): add live command for real-time thread monitoring (#37 Phase 1)
- Add cmd-live.ts: tail .data.jsonl with formatted output - Display role steps with timestamp, role name, truncated content, meta - fs.watch for running threads, auto-exit on completion - Write WorkflowResult to .data.jsonl in worker.ts for completion detection - Add live.test.ts with JSONL fixtures Testing: #49
This commit is contained in:
+4
@@ -0,0 +1,4 @@
|
|||||||
|
{"name":"demo-live","hash":"C9NMV6V2TQT81","threadId":"01LIVECMPLT01DDDDDDDDDDDDG","parameters":{"prompt":"hello","options":{"maxRounds":5,"depth":0}},"timestamp":1714963200000}
|
||||||
|
{"role":"planner","content":"alpha\nbeta\ngamma\nLINE4\nLINE5\nLINE6\nLINE7\nLINE8\nLINE9\nLINE10\nLINE11","meta":{"phase":"plan","flags":[1,2]},"refs":[],"timestamp":1714963201000}
|
||||||
|
{"role":"coder","content":"patch","meta":{},"refs":[],"timestamp":1714963202000}
|
||||||
|
{"returnCode":0,"summary":"fixture completed"}
|
||||||
+2
@@ -0,0 +1,2 @@
|
|||||||
|
{"name":"demo-live","hash":"C9NMV6V2TQT81","threadId":"01LIVEINFLY01DDDDDDDDDDDDG","parameters":{"prompt":"hello","options":{"maxRounds":5,"depth":0}},"timestamp":1714963200000}
|
||||||
|
{"role":"planner","content":"still running","meta":{"x":1},"refs":[],"timestamp":1714963201000}
|
||||||
@@ -111,7 +111,7 @@ describe("cli fork", () => {
|
|||||||
const sourceData = join(storageRoot, "logs", hash, `${sourceId}.data.jsonl`);
|
const sourceData = join(storageRoot, "logs", hash, `${sourceId}.data.jsonl`);
|
||||||
const sourceRunning = join(storageRoot, "logs", hash, `${sourceId}.running`);
|
const sourceRunning = join(storageRoot, "logs", hash, `${sourceId}.running`);
|
||||||
await waitUntilRunningAbsent(sourceRunning);
|
await waitUntilRunningAbsent(sourceRunning);
|
||||||
await waitUntilMinDataLines(sourceData, 4);
|
await waitUntilMinDataLines(sourceData, 5);
|
||||||
|
|
||||||
const forked = await cmdFork(storageRoot, sourceId, "planner");
|
const forked = await cmdFork(storageRoot, sourceId, "planner");
|
||||||
expect(forked.ok).toBe(true);
|
expect(forked.ok).toBe(true);
|
||||||
@@ -122,14 +122,14 @@ describe("cli fork", () => {
|
|||||||
const newData = join(storageRoot, "logs", hash, `${newId}.data.jsonl`);
|
const newData = join(storageRoot, "logs", hash, `${newId}.data.jsonl`);
|
||||||
const newRunning = join(storageRoot, "logs", hash, `${newId}.running`);
|
const newRunning = join(storageRoot, "logs", hash, `${newId}.running`);
|
||||||
await waitUntilRunningAbsent(newRunning);
|
await waitUntilRunningAbsent(newRunning);
|
||||||
await waitUntilMinDataLines(newData, 4);
|
await waitUntilMinDataLines(newData, 5);
|
||||||
|
|
||||||
const text = await readFile(newData, "utf8");
|
const text = await readFile(newData, "utf8");
|
||||||
const lines = text
|
const lines = text
|
||||||
.trim()
|
.trim()
|
||||||
.split("\n")
|
.split("\n")
|
||||||
.filter((l) => l !== "");
|
.filter((l) => l !== "");
|
||||||
expect(lines.length).toBe(4);
|
expect(lines.length).toBe(5);
|
||||||
const start = JSON.parse(lines[0] ?? "{}") as Record<string, unknown>;
|
const start = JSON.parse(lines[0] ?? "{}") as Record<string, unknown>;
|
||||||
expect(start.threadId).toBe(newId);
|
expect(start.threadId).toBe(newId);
|
||||||
expect(start.forkFrom).toEqual({ threadId: sourceId });
|
expect(start.forkFrom).toEqual({ threadId: sourceId });
|
||||||
@@ -162,7 +162,7 @@ describe("cli fork", () => {
|
|||||||
const sourceData = join(storageRoot, "logs", hash, `${sourceId}.data.jsonl`);
|
const sourceData = join(storageRoot, "logs", hash, `${sourceId}.data.jsonl`);
|
||||||
const sourceRunning = join(storageRoot, "logs", hash, `${sourceId}.running`);
|
const sourceRunning = join(storageRoot, "logs", hash, `${sourceId}.running`);
|
||||||
await waitUntilRunningAbsent(sourceRunning);
|
await waitUntilRunningAbsent(sourceRunning);
|
||||||
await waitUntilMinDataLines(sourceData, 4);
|
await waitUntilMinDataLines(sourceData, 5);
|
||||||
|
|
||||||
const forked = await cmdFork(storageRoot, sourceId, null);
|
const forked = await cmdFork(storageRoot, sourceId, null);
|
||||||
expect(forked.ok).toBe(true);
|
expect(forked.ok).toBe(true);
|
||||||
@@ -173,14 +173,14 @@ describe("cli fork", () => {
|
|||||||
const newData = join(storageRoot, "logs", hash, `${newId}.data.jsonl`);
|
const newData = join(storageRoot, "logs", hash, `${newId}.data.jsonl`);
|
||||||
const newRunning = join(storageRoot, "logs", hash, `${newId}.running`);
|
const newRunning = join(storageRoot, "logs", hash, `${newId}.running`);
|
||||||
await waitUntilRunningAbsent(newRunning);
|
await waitUntilRunningAbsent(newRunning);
|
||||||
await waitUntilMinDataLines(newData, 4);
|
await waitUntilMinDataLines(newData, 5);
|
||||||
|
|
||||||
const text = await readFile(newData, "utf8");
|
const text = await readFile(newData, "utf8");
|
||||||
const lines = text
|
const lines = text
|
||||||
.trim()
|
.trim()
|
||||||
.split("\n")
|
.split("\n")
|
||||||
.filter((l) => l !== "");
|
.filter((l) => l !== "");
|
||||||
expect(lines.length).toBe(4);
|
expect(lines.length).toBe(5);
|
||||||
|
|
||||||
const replayCoder = JSON.parse(lines[2] ?? "{}") as Record<string, unknown>;
|
const replayCoder = JSON.parse(lines[2] ?? "{}") as Record<string, unknown>;
|
||||||
expect(replayCoder.role).toBe("coder");
|
expect(replayCoder.role).toBe("coder");
|
||||||
@@ -213,7 +213,7 @@ describe("cli fork", () => {
|
|||||||
const sourceData = join(storageRoot, "logs", added.value.hash, `${sourceId}.data.jsonl`);
|
const sourceData = join(storageRoot, "logs", added.value.hash, `${sourceId}.data.jsonl`);
|
||||||
const sourceRunning = join(storageRoot, "logs", added.value.hash, `${sourceId}.running`);
|
const sourceRunning = join(storageRoot, "logs", added.value.hash, `${sourceId}.running`);
|
||||||
await waitUntilRunningAbsent(sourceRunning);
|
await waitUntilRunningAbsent(sourceRunning);
|
||||||
await waitUntilMinDataLines(sourceData, 4);
|
await waitUntilMinDataLines(sourceData, 5);
|
||||||
|
|
||||||
const bad = await cmdFork(storageRoot, sourceId, "ghost-role");
|
const bad = await cmdFork(storageRoot, sourceId, "ghost-role");
|
||||||
expect(bad.ok).toBe(false);
|
expect(bad.ok).toBe(false);
|
||||||
|
|||||||
@@ -0,0 +1,153 @@
|
|||||||
|
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
|
||||||
|
import { spawn, spawnSync } from "node:child_process";
|
||||||
|
import { cp, mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
|
||||||
|
import { tmpdir } from "node:os";
|
||||||
|
import { join } from "node:path";
|
||||||
|
import { fileURLToPath } from "node:url";
|
||||||
|
|
||||||
|
import {
|
||||||
|
formatLiveTimeLabel,
|
||||||
|
LIVE_CONTENT_MAX_LINES,
|
||||||
|
type LiveRoleRow,
|
||||||
|
renderLiveRoleStepLines,
|
||||||
|
} from "../src/cmd-live.js";
|
||||||
|
|
||||||
|
const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url));
|
||||||
|
const fixtureRoot = fileURLToPath(new URL("./fixtures/live", import.meta.url));
|
||||||
|
|
||||||
|
describe("live helpers", () => {
|
||||||
|
test("formatLiveTimeLabel pads HH:MM:SS", () => {
|
||||||
|
const label = formatLiveTimeLabel(new Date("2024-06-01T09:08:07.000Z").getTime());
|
||||||
|
expect(label).toMatch(/^\d{2}:\d{2}:\d{2}$/);
|
||||||
|
});
|
||||||
|
|
||||||
|
test("renderLiveRoleStepLines truncates content to LIVE_CONTENT_MAX_LINES", () => {
|
||||||
|
const lines = Array.from({ length: LIVE_CONTENT_MAX_LINES + 3 }, (_, i) => `L${i + 1}`);
|
||||||
|
const row: LiveRoleRow = {
|
||||||
|
role: "r",
|
||||||
|
content: lines.join("\n"),
|
||||||
|
meta: { k: "v" },
|
||||||
|
timestamp: 0,
|
||||||
|
};
|
||||||
|
const out = renderLiveRoleStepLines(row, "r");
|
||||||
|
const body = out.filter((l) => l.startsWith(" L"));
|
||||||
|
expect(body.length).toBe(LIVE_CONTENT_MAX_LINES);
|
||||||
|
expect(out.some((l) => l.includes("more line"))).toBe(true);
|
||||||
|
expect(out.some((l) => l.startsWith(" meta: "))).toBe(true);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("live CLI", () => {
|
||||||
|
let prevEnv: string | undefined;
|
||||||
|
let storageRoot: string;
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
prevEnv = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
|
||||||
|
storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-live-"));
|
||||||
|
process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = storageRoot;
|
||||||
|
await mkdir(join(storageRoot, "logs", "C9NMV6V2TQT81"), { recursive: true });
|
||||||
|
await cp(
|
||||||
|
join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl"),
|
||||||
|
join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl"),
|
||||||
|
);
|
||||||
|
await cp(
|
||||||
|
join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl"),
|
||||||
|
join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl"),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(async () => {
|
||||||
|
if (prevEnv === undefined) {
|
||||||
|
delete process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
|
||||||
|
} else {
|
||||||
|
process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = prevEnv;
|
||||||
|
}
|
||||||
|
await rm(storageRoot, { recursive: true, force: true });
|
||||||
|
});
|
||||||
|
|
||||||
|
test("prints role steps and summary for a completed thread", async () => {
|
||||||
|
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
|
||||||
|
const proc = spawn(process.execPath, [cliEntryPath, "live", "01LIVECMPLT01DDDDDDDDDDDDG"], {
|
||||||
|
env,
|
||||||
|
stdio: ["ignore", "pipe", "pipe"],
|
||||||
|
});
|
||||||
|
const stdout = await new Promise<string>((resolve, reject) => {
|
||||||
|
let buf = "";
|
||||||
|
proc.stdout?.on("data", (c: Buffer) => {
|
||||||
|
buf += c.toString("utf8");
|
||||||
|
});
|
||||||
|
proc.stderr?.on("data", (c: Buffer) => {
|
||||||
|
buf += c.toString("utf8");
|
||||||
|
});
|
||||||
|
proc.on("error", reject);
|
||||||
|
proc.on("exit", (code: number | null) => {
|
||||||
|
if (code === 0) {
|
||||||
|
resolve(buf);
|
||||||
|
} else {
|
||||||
|
reject(new Error(`exit ${code}: ${buf}`));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(stdout).toContain("planner");
|
||||||
|
expect(stdout).toContain("coder");
|
||||||
|
expect(stdout).toContain("meta:");
|
||||||
|
expect(stdout).toContain('"phase":"plan"');
|
||||||
|
expect(stdout).toContain("LINE10");
|
||||||
|
expect(stdout).not.toContain("LINE11");
|
||||||
|
expect(stdout).toContain("more line");
|
||||||
|
expect(stdout).toContain("completed: returnCode=0");
|
||||||
|
expect(stdout).toContain("fixture completed");
|
||||||
|
});
|
||||||
|
|
||||||
|
test("unknown thread id exits 1", () => {
|
||||||
|
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
|
||||||
|
const r = spawnSync(process.execPath, [cliEntryPath, "live", "01UNKNOWNXXXXXXXXXXXXXXXXX"], {
|
||||||
|
env,
|
||||||
|
encoding: "utf8",
|
||||||
|
});
|
||||||
|
expect(r.status).toBe(1);
|
||||||
|
expect(String(r.stderr ?? "")).toContain("thread not found");
|
||||||
|
});
|
||||||
|
|
||||||
|
test("follows file until WorkflowResult is appended", async () => {
|
||||||
|
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
|
||||||
|
const dataPath = join(
|
||||||
|
storageRoot,
|
||||||
|
"logs",
|
||||||
|
"C9NMV6V2TQT81",
|
||||||
|
"01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl",
|
||||||
|
);
|
||||||
|
|
||||||
|
const proc = spawn(process.execPath, [cliEntryPath, "live", "01LIVEINFLY01DDDDDDDDDDDDG"], {
|
||||||
|
env,
|
||||||
|
stdio: ["ignore", "pipe", "pipe"],
|
||||||
|
});
|
||||||
|
|
||||||
|
await new Promise((r) => setTimeout(r, 120));
|
||||||
|
const prior = await readFile(dataPath, "utf8");
|
||||||
|
await writeFile(dataPath, `${prior}{"returnCode":0,"summary":"caught up"}\n`, "utf8");
|
||||||
|
|
||||||
|
const stdout = await new Promise<string>((resolve, reject) => {
|
||||||
|
let buf = "";
|
||||||
|
proc.stdout?.on("data", (c: Buffer) => {
|
||||||
|
buf += c.toString("utf8");
|
||||||
|
});
|
||||||
|
proc.stderr?.on("data", (c: Buffer) => {
|
||||||
|
buf += c.toString("utf8");
|
||||||
|
});
|
||||||
|
proc.on("error", reject);
|
||||||
|
proc.on("exit", (code: number | null) => {
|
||||||
|
if (code === 0) {
|
||||||
|
resolve(buf);
|
||||||
|
} else {
|
||||||
|
reject(new Error(`exit ${code}: ${buf}`));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(stdout).toContain("planner");
|
||||||
|
expect(stdout).toContain("completed: returnCode=0");
|
||||||
|
expect(stdout).toContain("caught up");
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -323,7 +323,7 @@ describe("cli thread commands", () => {
|
|||||||
.trim()
|
.trim()
|
||||||
.split("\n")
|
.split("\n")
|
||||||
.filter((l) => l !== "");
|
.filter((l) => l !== "");
|
||||||
expect(lines.length).toBe(2);
|
expect(lines.length).toBe(3);
|
||||||
|
|
||||||
const runningPath = join(dirname(dataPath), `${threadId}.running`);
|
const runningPath = join(dirname(dataPath), `${threadId}.running`);
|
||||||
expect(await pathExists(runningPath)).toBe(false);
|
expect(await pathExists(runningPath)).toBe(false);
|
||||||
@@ -362,8 +362,8 @@ describe("cli thread commands", () => {
|
|||||||
const resumed = await cmdResume(storageRoot, threadId);
|
const resumed = await cmdResume(storageRoot, threadId);
|
||||||
expect(resumed.ok).toBe(true);
|
expect(resumed.ok).toBe(true);
|
||||||
|
|
||||||
await waitUntilMinDataLines(dataPath, 3, 120);
|
await waitUntilMinDataLines(dataPath, 4, 120);
|
||||||
expect(await countDataJsonlLines(dataPath)).toBe(3);
|
expect(await countDataJsonlLines(dataPath)).toBe(4);
|
||||||
|
|
||||||
const runningPath = join(dirname(dataPath), `${threadId}.running`);
|
const runningPath = join(dirname(dataPath), `${threadId}.running`);
|
||||||
await waitUntilRunningFileAbsent(runningPath, 100);
|
await waitUntilRunningFileAbsent(runningPath, 100);
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import { cmdHistory } from "./cmd-history.js";
|
|||||||
import { cmdInitTemplate, cmdInitWorkspace } from "./cmd-init.js";
|
import { cmdInitTemplate, cmdInitWorkspace } from "./cmd-init.js";
|
||||||
import { cmdKill } from "./cmd-kill.js";
|
import { cmdKill } from "./cmd-kill.js";
|
||||||
import { cmdList, formatListLines } from "./cmd-list.js";
|
import { cmdList, formatListLines } from "./cmd-list.js";
|
||||||
|
import { cmdLive } from "./cmd-live.js";
|
||||||
import { cmdPause } from "./cmd-pause.js";
|
import { cmdPause } from "./cmd-pause.js";
|
||||||
import { cmdPs } from "./cmd-ps.js";
|
import { cmdPs } from "./cmd-ps.js";
|
||||||
import { cmdRemove } from "./cmd-remove.js";
|
import { cmdRemove } from "./cmd-remove.js";
|
||||||
@@ -28,6 +29,7 @@ export function formatCliUsage(): string {
|
|||||||
" uncaged-workflow run <name> [--prompt <text>] [--max-rounds N]",
|
" uncaged-workflow run <name> [--prompt <text>] [--max-rounds N]",
|
||||||
" uncaged-workflow ps",
|
" uncaged-workflow ps",
|
||||||
" uncaged-workflow kill <thread-id>",
|
" uncaged-workflow kill <thread-id>",
|
||||||
|
" uncaged-workflow live <thread-id>",
|
||||||
" uncaged-workflow history <name>",
|
" uncaged-workflow history <name>",
|
||||||
" uncaged-workflow rollback <name> [hash]",
|
" uncaged-workflow rollback <name> [hash]",
|
||||||
" uncaged-workflow pause <thread-id>",
|
" uncaged-workflow pause <thread-id>",
|
||||||
@@ -190,6 +192,15 @@ async function dispatchKill(storageRoot: string, argv: string[]): Promise<number
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function dispatchLive(storageRoot: string, argv: string[]): Promise<number> {
|
||||||
|
const threadId = argv[0];
|
||||||
|
if (threadId === undefined || argv.length > 1) {
|
||||||
|
printCliError(`${usage()}\n\nerror: live requires <thread-id>`);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return cmdLive(storageRoot, threadId);
|
||||||
|
}
|
||||||
|
|
||||||
async function dispatchHistory(storageRoot: string, argv: string[]): Promise<number> {
|
async function dispatchHistory(storageRoot: string, argv: string[]): Promise<number> {
|
||||||
const name = argv[0];
|
const name = argv[0];
|
||||||
if (name === undefined || argv.length > 1) {
|
if (name === undefined || argv.length > 1) {
|
||||||
@@ -435,6 +446,7 @@ const COMMAND_TABLE: Record<string, DispatchFn> = {
|
|||||||
run: dispatchRun,
|
run: dispatchRun,
|
||||||
ps: dispatchPs,
|
ps: dispatchPs,
|
||||||
kill: dispatchKill,
|
kill: dispatchKill,
|
||||||
|
live: dispatchLive,
|
||||||
history: dispatchHistory,
|
history: dispatchHistory,
|
||||||
rollback: dispatchRollback,
|
rollback: dispatchRollback,
|
||||||
pause: dispatchPause,
|
pause: dispatchPause,
|
||||||
|
|||||||
@@ -0,0 +1,254 @@
|
|||||||
|
import { watch } from "node:fs";
|
||||||
|
import { readFile } from "node:fs/promises";
|
||||||
|
|
||||||
|
import {
|
||||||
|
tryParseRoleStepRecord,
|
||||||
|
tryParseWorkflowResultRecord,
|
||||||
|
type WorkflowResult,
|
||||||
|
} from "@uncaged/workflow";
|
||||||
|
|
||||||
|
import { printCliError, printCliLine } from "./cli-output.js";
|
||||||
|
import { resolveThreadDataPath } from "./thread-scan.js";
|
||||||
|
|
||||||
|
export const LIVE_CONTENT_MAX_LINES = 10;
|
||||||
|
|
||||||
|
export type LiveRoleRow = {
|
||||||
|
role: string;
|
||||||
|
content: string;
|
||||||
|
meta: Record<string, unknown>;
|
||||||
|
timestamp: number;
|
||||||
|
};
|
||||||
|
|
||||||
|
export function formatLiveTimeLabel(timestampMs: number): string {
|
||||||
|
const d = new Date(timestampMs);
|
||||||
|
const hh = String(d.getHours()).padStart(2, "0");
|
||||||
|
const mm = String(d.getMinutes()).padStart(2, "0");
|
||||||
|
const ss = String(d.getSeconds()).padStart(2, "0");
|
||||||
|
return `${hh}:${mm}:${ss}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
function shouldUseColor(): boolean {
|
||||||
|
return process.stdout.isTTY === true && process.env.NO_COLOR === undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
function highlightLiveRole(name: string): string {
|
||||||
|
if (!shouldUseColor()) {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
return `\x1b[1m\x1b[36m${name}\x1b[0m`;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function renderLiveRoleStepLines(row: LiveRoleRow, roleDisplay: string): string[] {
|
||||||
|
const header = `[${formatLiveTimeLabel(row.timestamp)}] ▶ ${roleDisplay}`;
|
||||||
|
const lines: string[] = [header];
|
||||||
|
const parts = row.content.split("\n");
|
||||||
|
const shown = parts.slice(0, LIVE_CONTENT_MAX_LINES);
|
||||||
|
for (const ln of shown) {
|
||||||
|
lines.push(` ${ln}`);
|
||||||
|
}
|
||||||
|
const omitted = parts.length - shown.length;
|
||||||
|
if (omitted > 0) {
|
||||||
|
lines.push(` … (${omitted} more line${omitted === 1 ? "" : "s"})`);
|
||||||
|
}
|
||||||
|
lines.push(` meta: ${JSON.stringify(row.meta)}`);
|
||||||
|
return lines;
|
||||||
|
}
|
||||||
|
|
||||||
|
function printSummary(result: WorkflowResult): void {
|
||||||
|
printCliLine(`completed: returnCode=${result.returnCode} — ${result.summary}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
type LiveSessionState = {
|
||||||
|
sawStart: boolean;
|
||||||
|
completed: boolean;
|
||||||
|
carry: string;
|
||||||
|
contentOffset: number;
|
||||||
|
};
|
||||||
|
|
||||||
|
function handleJsonlLine(
|
||||||
|
rawLine: string,
|
||||||
|
state: LiveSessionState,
|
||||||
|
): { parseError: string | null; workflowResult: WorkflowResult | null } {
|
||||||
|
const trimmed = rawLine.trim();
|
||||||
|
if (trimmed === "") {
|
||||||
|
return { parseError: null, workflowResult: null };
|
||||||
|
}
|
||||||
|
|
||||||
|
let rec: unknown;
|
||||||
|
try {
|
||||||
|
rec = JSON.parse(trimmed) as unknown;
|
||||||
|
} catch {
|
||||||
|
return { parseError: "invalid JSON in thread data file", workflowResult: null };
|
||||||
|
}
|
||||||
|
if (rec === null || typeof rec !== "object") {
|
||||||
|
return { parseError: "invalid record in thread data file", workflowResult: null };
|
||||||
|
}
|
||||||
|
const obj = rec as Record<string, unknown>;
|
||||||
|
|
||||||
|
if (!state.sawStart) {
|
||||||
|
state.sawStart = true;
|
||||||
|
return { parseError: null, workflowResult: null };
|
||||||
|
}
|
||||||
|
|
||||||
|
const wf = tryParseWorkflowResultRecord(obj);
|
||||||
|
if (wf !== null) {
|
||||||
|
state.completed = true;
|
||||||
|
return { parseError: null, workflowResult: wf };
|
||||||
|
}
|
||||||
|
|
||||||
|
const roleRow = tryParseRoleStepRecord(obj);
|
||||||
|
if (roleRow === null) {
|
||||||
|
return {
|
||||||
|
parseError: "unrecognized record in thread data (expected role step or result)",
|
||||||
|
workflowResult: null,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
const row: LiveRoleRow = {
|
||||||
|
role: roleRow.role,
|
||||||
|
content: roleRow.content,
|
||||||
|
meta: roleRow.meta,
|
||||||
|
timestamp: roleRow.timestamp,
|
||||||
|
};
|
||||||
|
for (const outLine of renderLiveRoleStepLines(row, highlightLiveRole(row.role))) {
|
||||||
|
printCliLine(outLine);
|
||||||
|
}
|
||||||
|
return { parseError: null, workflowResult: null };
|
||||||
|
}
|
||||||
|
|
||||||
|
async function pumpNewContent(dataPath: string, state: LiveSessionState): Promise<number | null> {
|
||||||
|
let text: string;
|
||||||
|
try {
|
||||||
|
text = await readFile(dataPath, "utf8");
|
||||||
|
} catch {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (text.length < state.contentOffset) {
|
||||||
|
state.contentOffset = 0;
|
||||||
|
state.carry = "";
|
||||||
|
}
|
||||||
|
|
||||||
|
const chunk = text.slice(state.contentOffset);
|
||||||
|
state.contentOffset = text.length;
|
||||||
|
state.carry += chunk;
|
||||||
|
|
||||||
|
const parts = state.carry.split("\n");
|
||||||
|
state.carry = parts.pop() ?? "";
|
||||||
|
|
||||||
|
for (const line of parts) {
|
||||||
|
const { parseError, workflowResult } = handleJsonlLine(line, state);
|
||||||
|
if (parseError !== null) {
|
||||||
|
printCliError(parseError);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
if (workflowResult !== null) {
|
||||||
|
printSummary(workflowResult);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
function watchLiveFile(params: {
|
||||||
|
dataPath: string;
|
||||||
|
state: LiveSessionState;
|
||||||
|
signal: AbortSignal;
|
||||||
|
}): Promise<number> {
|
||||||
|
const { dataPath, state, signal } = params;
|
||||||
|
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
let settled = false;
|
||||||
|
const finish = (code: number): void => {
|
||||||
|
if (settled) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
settled = true;
|
||||||
|
resolve(code);
|
||||||
|
};
|
||||||
|
|
||||||
|
/** Serialize reads — `fs.watch` may emit faster than `readFile` completes. */
|
||||||
|
let pumpChain: Promise<void> = Promise.resolve();
|
||||||
|
|
||||||
|
const watcher = watch(dataPath, (eventType) => {
|
||||||
|
if (eventType === "rename") {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
schedulePump();
|
||||||
|
});
|
||||||
|
|
||||||
|
watcher.on("error", (err: Error) => {
|
||||||
|
watcher.close();
|
||||||
|
reject(err);
|
||||||
|
});
|
||||||
|
|
||||||
|
const onAbort = (): void => {
|
||||||
|
watcher.close();
|
||||||
|
finish(0);
|
||||||
|
};
|
||||||
|
signal.addEventListener("abort", onAbort, { once: true });
|
||||||
|
|
||||||
|
async function drainQueuedPump(): Promise<void> {
|
||||||
|
if (settled) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
const code = await pumpNewContent(dataPath, state);
|
||||||
|
if (code !== null) {
|
||||||
|
watcher.close();
|
||||||
|
finish(code);
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
watcher.close();
|
||||||
|
reject(e instanceof Error ? e : new Error(String(e)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function schedulePump(): void {
|
||||||
|
pumpChain = pumpChain.then(() => drainQueuedPump());
|
||||||
|
}
|
||||||
|
|
||||||
|
schedulePump();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function cmdLive(storageRoot: string, threadId: string): Promise<number> {
|
||||||
|
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
|
||||||
|
if (dataPath === null) {
|
||||||
|
printCliError(`thread not found: ${threadId}`);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
const state: LiveSessionState = {
|
||||||
|
sawStart: false,
|
||||||
|
completed: false,
|
||||||
|
carry: "",
|
||||||
|
contentOffset: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
const controller = new AbortController();
|
||||||
|
const onSigInt = (): void => {
|
||||||
|
controller.abort();
|
||||||
|
};
|
||||||
|
process.on("SIGINT", onSigInt);
|
||||||
|
|
||||||
|
try {
|
||||||
|
const first = await pumpNewContent(dataPath, state);
|
||||||
|
if (first !== null) {
|
||||||
|
return first;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (state.completed) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return await watchLiveFile({ dataPath, state, signal: controller.signal });
|
||||||
|
} catch (e) {
|
||||||
|
const message = e instanceof Error ? e.message : String(e);
|
||||||
|
printCliError(`live: ${message}`);
|
||||||
|
return 1;
|
||||||
|
} finally {
|
||||||
|
process.off("SIGINT", onSigInt);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -87,6 +87,26 @@ describe("fork-thread", () => {
|
|||||||
expect(r.value.runOptions).toEqual({ maxRounds: 5, depth: 0 });
|
expect(r.value.runOptions).toEqual({ maxRounds: 5, depth: 0 });
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test("parseThreadDataJsonl ignores trailing WorkflowResult line", () => {
|
||||||
|
const text = `${sampleDataJsonl.trim()}\n{"returnCode":0,"summary":"done"}\n`;
|
||||||
|
const r = parseThreadDataJsonl(text);
|
||||||
|
expect(r.ok).toBe(true);
|
||||||
|
if (!r.ok) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
expect(r.value.roleSteps.length).toBe(3);
|
||||||
|
expect(r.value.roleSteps[2]?.role).toBe("reviewer");
|
||||||
|
});
|
||||||
|
|
||||||
|
test("parseThreadDataJsonl errors when WorkflowResult is not last", () => {
|
||||||
|
const text = `{"name":"demo","hash":"H","threadId":"01ZZZZZZZZZZZZZZZZZZZZZZ","parameters":{"prompt":"p","options":{"maxRounds":3}},"timestamp":1}
|
||||||
|
{"returnCode":0,"summary":"early"}
|
||||||
|
{"role":"planner","content":"x","meta":{},"timestamp":2}
|
||||||
|
`;
|
||||||
|
const r = parseThreadDataJsonl(text);
|
||||||
|
expect(r.ok).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
test("parseThreadDataJsonl reads explicit depth from start record", () => {
|
test("parseThreadDataJsonl reads explicit depth from start record", () => {
|
||||||
const text = `{"name":"demo","hash":"H","threadId":"01ZZZZZZZZZZZZZZZZZZZZZZ","parameters":{"prompt":"p","options":{"maxRounds":3,"depth":2}},"timestamp":1}
|
const text = `{"name":"demo","hash":"H","threadId":"01ZZZZZZZZZZZZZZZZZZZZZZ","parameters":{"prompt":"p","options":{"maxRounds":3,"depth":2}},"timestamp":1}
|
||||||
{"role":"planner","contentHash":"HP0000000000000000000099","meta":{},"refs":[],"timestamp":2}
|
{"role":"planner","contentHash":"HP0000000000000000000099","meta":{},"refs":[],"timestamp":2}
|
||||||
|
|||||||
@@ -125,7 +125,7 @@ describe("worker process", () => {
|
|||||||
.trim()
|
.trim()
|
||||||
.split("\n")
|
.split("\n")
|
||||||
.filter((l) => l !== "").length,
|
.filter((l) => l !== "").length,
|
||||||
).toBe(3);
|
).toBe(4);
|
||||||
} finally {
|
} finally {
|
||||||
await rm(root, { recursive: true, force: true });
|
await rm(root, { recursive: true, force: true });
|
||||||
}
|
}
|
||||||
@@ -187,7 +187,7 @@ describe("worker process", () => {
|
|||||||
.trim()
|
.trim()
|
||||||
.split("\n")
|
.split("\n")
|
||||||
.filter((l) => l !== "");
|
.filter((l) => l !== "");
|
||||||
expect(lines.length).toBe(3);
|
expect(lines.length).toBe(4);
|
||||||
const start = JSON.parse(lines[0] ?? "{}") as Record<string, unknown>;
|
const start = JSON.parse(lines[0] ?? "{}") as Record<string, unknown>;
|
||||||
expect(start.forkFrom).toEqual({ threadId: srcId });
|
expect(start.forkFrom).toEqual({ threadId: srcId });
|
||||||
const replay = JSON.parse(lines[1] ?? "{}") as Record<string, unknown>;
|
const replay = JSON.parse(lines[1] ?? "{}") as Record<string, unknown>;
|
||||||
@@ -195,6 +195,8 @@ describe("worker process", () => {
|
|||||||
expect(replay.timestamp).toBe(555);
|
expect(replay.timestamp).toBe(555);
|
||||||
const coder = JSON.parse(lines[2] ?? "{}") as Record<string, unknown>;
|
const coder = JSON.parse(lines[2] ?? "{}") as Record<string, unknown>;
|
||||||
expect(coder.role).toBe("coder");
|
expect(coder.role).toBe("coder");
|
||||||
|
const done = JSON.parse(lines[3] ?? "{}") as Record<string, unknown>;
|
||||||
|
expect(done.returnCode).toBe(0);
|
||||||
} finally {
|
} finally {
|
||||||
await rm(root, { recursive: true, force: true });
|
await rm(root, { recursive: true, force: true });
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
import { normalizeRefsField } from "./refs-field.js";
|
import { normalizeRefsField } from "./refs-field.js";
|
||||||
import { err, ok, type Result } from "./result.js";
|
import { err, ok, type Result } from "./result.js";
|
||||||
import type { RoleOutput } from "./types.js";
|
import type { RoleOutput, WorkflowResult } from "./types.js";
|
||||||
|
|
||||||
/** Role steps replayed from `.data.jsonl`, including persisted timestamps. */
|
/** Role steps replayed from `.data.jsonl`, including persisted timestamps. */
|
||||||
export type ForkHistoricalStep = RoleOutput & { timestamp: number };
|
export type ForkHistoricalStep = RoleOutput & { timestamp: number };
|
||||||
@@ -14,33 +14,54 @@ export type ParsedThreadStartRecord = {
|
|||||||
depth: number;
|
depth: number;
|
||||||
};
|
};
|
||||||
|
|
||||||
function parseRoleLine(
|
/** Recognizes a persisted workflow completion line (no `role`; has numeric `returnCode` and string `summary`). */
|
||||||
obj: Record<string, unknown>,
|
export function tryParseWorkflowResultRecord(obj: Record<string, unknown>): WorkflowResult | null {
|
||||||
lineIndex: number,
|
if (obj.role !== undefined) {
|
||||||
): Result<ForkHistoricalStep, string> {
|
return null;
|
||||||
|
}
|
||||||
|
const returnCode = obj.returnCode;
|
||||||
|
const summary = obj.summary;
|
||||||
|
if (typeof returnCode !== "number" || typeof summary !== "string") {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return { returnCode, summary };
|
||||||
|
}
|
||||||
|
|
||||||
|
export function tryParseRoleStepRecord(obj: Record<string, unknown>): ForkHistoricalStep | null {
|
||||||
const role = obj.role;
|
const role = obj.role;
|
||||||
const contentHash = obj.contentHash;
|
const contentHash = obj.contentHash;
|
||||||
const meta = obj.meta;
|
const meta = obj.meta;
|
||||||
const timestamp = obj.timestamp;
|
const timestamp = obj.timestamp;
|
||||||
if (typeof role !== "string") {
|
if (typeof role !== "string") {
|
||||||
return err(`invalid role record at line ${lineIndex}: missing role`);
|
return null;
|
||||||
}
|
}
|
||||||
if (typeof contentHash !== "string") {
|
if (typeof contentHash !== "string") {
|
||||||
return err(`invalid role record at line ${lineIndex}: missing contentHash`);
|
return null;
|
||||||
}
|
}
|
||||||
if (meta === null || typeof meta !== "object") {
|
if (meta === null || typeof meta !== "object") {
|
||||||
return err(`invalid role record at line ${lineIndex}: missing meta`);
|
return null;
|
||||||
}
|
}
|
||||||
if (typeof timestamp !== "number") {
|
if (typeof timestamp !== "number") {
|
||||||
return err(`invalid role record at line ${lineIndex}: missing timestamp`);
|
return null;
|
||||||
}
|
}
|
||||||
return ok({
|
return {
|
||||||
role,
|
role,
|
||||||
contentHash,
|
contentHash,
|
||||||
meta: meta as Record<string, unknown>,
|
meta: meta as Record<string, unknown>,
|
||||||
refs: normalizeRefsField(obj.refs),
|
refs: normalizeRefsField(obj.refs),
|
||||||
timestamp,
|
timestamp,
|
||||||
});
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function parseRoleLine(
|
||||||
|
obj: Record<string, unknown>,
|
||||||
|
lineIndex: number,
|
||||||
|
): Result<ForkHistoricalStep, string> {
|
||||||
|
const parsed = tryParseRoleStepRecord(obj);
|
||||||
|
if (parsed === null) {
|
||||||
|
return err(`invalid role record at line ${lineIndex}`);
|
||||||
|
}
|
||||||
|
return ok(parsed);
|
||||||
}
|
}
|
||||||
|
|
||||||
function parseStartRecordLine(firstLine: string): Result<ParsedThreadStartRecord, string> {
|
function parseStartRecordLine(firstLine: string): Result<ParsedThreadStartRecord, string> {
|
||||||
@@ -109,7 +130,15 @@ function parseFollowingRoleLines(lines: string[]): Result<ForkHistoricalStep[],
|
|||||||
if (rec === null || typeof rec !== "object") {
|
if (rec === null || typeof rec !== "object") {
|
||||||
return err(`invalid record at line ${i + 1}`);
|
return err(`invalid record at line ${i + 1}`);
|
||||||
}
|
}
|
||||||
const parsed = parseRoleLine(rec as Record<string, unknown>, i + 1);
|
const recObj = rec as Record<string, unknown>;
|
||||||
|
const wf = tryParseWorkflowResultRecord(recObj);
|
||||||
|
if (wf !== null) {
|
||||||
|
if (i !== lines.length - 1) {
|
||||||
|
return err("WorkflowResult record must be the final line in `.data.jsonl`");
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
const parsed = parseRoleLine(recObj, i + 1);
|
||||||
if (!parsed.ok) {
|
if (!parsed.ok) {
|
||||||
return parsed;
|
return parsed;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,6 +25,8 @@ export {
|
|||||||
type ParsedThreadStartRecord,
|
type ParsedThreadStartRecord,
|
||||||
parseThreadDataJsonl,
|
parseThreadDataJsonl,
|
||||||
selectForkHistoricalSteps,
|
selectForkHistoricalSteps,
|
||||||
|
tryParseRoleStepRecord,
|
||||||
|
tryParseWorkflowResultRecord,
|
||||||
} from "./fork-thread.js";
|
} from "./fork-thread.js";
|
||||||
export { type GcResult, garbageCollectCas } from "./gc.js";
|
export { type GcResult, garbageCollectCas } from "./gc.js";
|
||||||
export { stringifyWorkflowDescriptor } from "./generate-descriptor.js";
|
export { stringifyWorkflowDescriptor } from "./generate-descriptor.js";
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import { mkdir, unlink, writeFile } from "node:fs/promises";
|
import { appendFile, mkdir, unlink, writeFile } from "node:fs/promises";
|
||||||
import { createServer, type Socket } from "node:net";
|
import { createServer, type Socket } from "node:net";
|
||||||
import { dirname, join } from "node:path";
|
import { dirname, join } from "node:path";
|
||||||
import { importWorkflowBundleModule } from "./bundle-import-env.js";
|
import { importWorkflowBundleModule } from "./bundle-import-env.js";
|
||||||
@@ -11,7 +11,7 @@ import { normalizeRefsField } from "./refs-field.js";
|
|||||||
import { err, ok, type Result } from "./result.js";
|
import { err, ok, type Result } from "./result.js";
|
||||||
import { getGlobalCasDir } from "./storage-root.js";
|
import { getGlobalCasDir } from "./storage-root.js";
|
||||||
import { createThreadPauseGate, type ThreadPauseGate } from "./thread-pause-gate.js";
|
import { createThreadPauseGate, type ThreadPauseGate } from "./thread-pause-gate.js";
|
||||||
import type { RoleOutput, WorkflowFn } from "./types.js";
|
import type { RoleOutput, WorkflowFn, WorkflowResult } from "./types.js";
|
||||||
|
|
||||||
const bootLog = createLogger({ sink: { kind: "stderr" } });
|
const bootLog = createLogger({ sink: { kind: "stderr" } });
|
||||||
|
|
||||||
@@ -404,7 +404,7 @@ async function main(): Promise<void> {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
await executeThread(
|
const runResult = await executeThread(
|
||||||
workflowFn,
|
workflowFn,
|
||||||
cmd.workflowName,
|
cmd.workflowName,
|
||||||
{ prompt: cmd.prompt, steps: cmd.steps },
|
{ prompt: cmd.prompt, steps: cmd.steps },
|
||||||
@@ -418,9 +418,12 @@ async function main(): Promise<void> {
|
|||||||
io,
|
io,
|
||||||
logger,
|
logger,
|
||||||
);
|
);
|
||||||
|
await appendFile(dataJsonlPath, `${JSON.stringify(runResult)}\n`, "utf8");
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
const message = e instanceof Error ? e.message : String(e);
|
const message = e instanceof Error ? e.message : String(e);
|
||||||
bootLog("Q3MN8YKW", `thread ${threadId} failed: ${message}`);
|
bootLog("Q3MN8YKW", `thread ${threadId} failed: ${message}`);
|
||||||
|
const failure: WorkflowResult = { returnCode: 1, summary: message };
|
||||||
|
await appendFile(dataJsonlPath, `${JSON.stringify(failure)}\n`, "utf8").catch(() => {});
|
||||||
} finally {
|
} finally {
|
||||||
threads.delete(threadId);
|
threads.delete(threadId);
|
||||||
await unlink(runningPath).catch(() => {});
|
await unlink(runningPath).catch(() => {});
|
||||||
|
|||||||
Reference in New Issue
Block a user