diff --git a/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl b/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl new file mode 100644 index 0000000..5076f72 --- /dev/null +++ b/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl @@ -0,0 +1,4 @@ +{"name":"demo-live","hash":"C9NMV6V2TQT81","threadId":"01LIVECMPLT01DDDDDDDDDDDDG","parameters":{"prompt":"hello","options":{"maxRounds":5,"depth":0}},"timestamp":1714963200000} +{"role":"planner","content":"alpha\nbeta\ngamma\nLINE4\nLINE5\nLINE6\nLINE7\nLINE8\nLINE9\nLINE10\nLINE11","meta":{"phase":"plan","flags":[1,2]},"refs":[],"timestamp":1714963201000} +{"role":"coder","content":"patch","meta":{},"refs":[],"timestamp":1714963202000} +{"returnCode":0,"summary":"fixture completed"} diff --git a/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl b/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl new file mode 100644 index 0000000..7ad5878 --- /dev/null +++ b/packages/cli-workflow/__tests__/fixtures/live/logs/C9NMV6V2TQT81/01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl @@ -0,0 +1,2 @@ +{"name":"demo-live","hash":"C9NMV6V2TQT81","threadId":"01LIVEINFLY01DDDDDDDDDDDDG","parameters":{"prompt":"hello","options":{"maxRounds":5,"depth":0}},"timestamp":1714963200000} +{"role":"planner","content":"still running","meta":{"x":1},"refs":[],"timestamp":1714963201000} diff --git a/packages/cli-workflow/__tests__/fork-cli.test.ts b/packages/cli-workflow/__tests__/fork-cli.test.ts index 9dbc64e..6f9c88b 100644 --- a/packages/cli-workflow/__tests__/fork-cli.test.ts +++ b/packages/cli-workflow/__tests__/fork-cli.test.ts @@ -111,7 +111,7 @@ describe("cli fork", () => { const sourceData = join(storageRoot, "logs", hash, `${sourceId}.data.jsonl`); const sourceRunning = join(storageRoot, "logs", hash, `${sourceId}.running`); await waitUntilRunningAbsent(sourceRunning); - await waitUntilMinDataLines(sourceData, 4); + await waitUntilMinDataLines(sourceData, 5); const forked = await cmdFork(storageRoot, sourceId, "planner"); expect(forked.ok).toBe(true); @@ -122,14 +122,14 @@ describe("cli fork", () => { const newData = join(storageRoot, "logs", hash, `${newId}.data.jsonl`); const newRunning = join(storageRoot, "logs", hash, `${newId}.running`); await waitUntilRunningAbsent(newRunning); - await waitUntilMinDataLines(newData, 4); + await waitUntilMinDataLines(newData, 5); const text = await readFile(newData, "utf8"); const lines = text .trim() .split("\n") .filter((l) => l !== ""); - expect(lines.length).toBe(4); + expect(lines.length).toBe(5); const start = JSON.parse(lines[0] ?? "{}") as Record; expect(start.threadId).toBe(newId); expect(start.forkFrom).toEqual({ threadId: sourceId }); @@ -162,7 +162,7 @@ describe("cli fork", () => { const sourceData = join(storageRoot, "logs", hash, `${sourceId}.data.jsonl`); const sourceRunning = join(storageRoot, "logs", hash, `${sourceId}.running`); await waitUntilRunningAbsent(sourceRunning); - await waitUntilMinDataLines(sourceData, 4); + await waitUntilMinDataLines(sourceData, 5); const forked = await cmdFork(storageRoot, sourceId, null); expect(forked.ok).toBe(true); @@ -173,14 +173,14 @@ describe("cli fork", () => { const newData = join(storageRoot, "logs", hash, `${newId}.data.jsonl`); const newRunning = join(storageRoot, "logs", hash, `${newId}.running`); await waitUntilRunningAbsent(newRunning); - await waitUntilMinDataLines(newData, 4); + await waitUntilMinDataLines(newData, 5); const text = await readFile(newData, "utf8"); const lines = text .trim() .split("\n") .filter((l) => l !== ""); - expect(lines.length).toBe(4); + expect(lines.length).toBe(5); const replayCoder = JSON.parse(lines[2] ?? "{}") as Record; expect(replayCoder.role).toBe("coder"); @@ -213,7 +213,7 @@ describe("cli fork", () => { const sourceData = join(storageRoot, "logs", added.value.hash, `${sourceId}.data.jsonl`); const sourceRunning = join(storageRoot, "logs", added.value.hash, `${sourceId}.running`); await waitUntilRunningAbsent(sourceRunning); - await waitUntilMinDataLines(sourceData, 4); + await waitUntilMinDataLines(sourceData, 5); const bad = await cmdFork(storageRoot, sourceId, "ghost-role"); expect(bad.ok).toBe(false); diff --git a/packages/cli-workflow/__tests__/live.test.ts b/packages/cli-workflow/__tests__/live.test.ts new file mode 100644 index 0000000..38d9ca3 --- /dev/null +++ b/packages/cli-workflow/__tests__/live.test.ts @@ -0,0 +1,153 @@ +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import { spawn, spawnSync } from "node:child_process"; +import { cp, mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { fileURLToPath } from "node:url"; + +import { + formatLiveTimeLabel, + LIVE_CONTENT_MAX_LINES, + type LiveRoleRow, + renderLiveRoleStepLines, +} from "../src/cmd-live.js"; + +const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url)); +const fixtureRoot = fileURLToPath(new URL("./fixtures/live", import.meta.url)); + +describe("live helpers", () => { + test("formatLiveTimeLabel pads HH:MM:SS", () => { + const label = formatLiveTimeLabel(new Date("2024-06-01T09:08:07.000Z").getTime()); + expect(label).toMatch(/^\d{2}:\d{2}:\d{2}$/); + }); + + test("renderLiveRoleStepLines truncates content to LIVE_CONTENT_MAX_LINES", () => { + const lines = Array.from({ length: LIVE_CONTENT_MAX_LINES + 3 }, (_, i) => `L${i + 1}`); + const row: LiveRoleRow = { + role: "r", + content: lines.join("\n"), + meta: { k: "v" }, + timestamp: 0, + }; + const out = renderLiveRoleStepLines(row, "r"); + const body = out.filter((l) => l.startsWith(" L")); + expect(body.length).toBe(LIVE_CONTENT_MAX_LINES); + expect(out.some((l) => l.includes("more line"))).toBe(true); + expect(out.some((l) => l.startsWith(" meta: "))).toBe(true); + }); +}); + +describe("live CLI", () => { + let prevEnv: string | undefined; + let storageRoot: string; + + beforeEach(async () => { + prevEnv = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT; + storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-live-")); + process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = storageRoot; + await mkdir(join(storageRoot, "logs", "C9NMV6V2TQT81"), { recursive: true }); + await cp( + join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl"), + join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl"), + ); + await cp( + join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl"), + join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl"), + ); + }); + + afterEach(async () => { + if (prevEnv === undefined) { + delete process.env.UNCAGED_WORKFLOW_STORAGE_ROOT; + } else { + process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = prevEnv; + } + await rm(storageRoot, { recursive: true, force: true }); + }); + + test("prints role steps and summary for a completed thread", async () => { + const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; + const proc = spawn(process.execPath, [cliEntryPath, "live", "01LIVECMPLT01DDDDDDDDDDDDG"], { + env, + stdio: ["ignore", "pipe", "pipe"], + }); + const stdout = await new Promise((resolve, reject) => { + let buf = ""; + proc.stdout?.on("data", (c: Buffer) => { + buf += c.toString("utf8"); + }); + proc.stderr?.on("data", (c: Buffer) => { + buf += c.toString("utf8"); + }); + proc.on("error", reject); + proc.on("exit", (code: number | null) => { + if (code === 0) { + resolve(buf); + } else { + reject(new Error(`exit ${code}: ${buf}`)); + } + }); + }); + + expect(stdout).toContain("planner"); + expect(stdout).toContain("coder"); + expect(stdout).toContain("meta:"); + expect(stdout).toContain('"phase":"plan"'); + expect(stdout).toContain("LINE10"); + expect(stdout).not.toContain("LINE11"); + expect(stdout).toContain("more line"); + expect(stdout).toContain("completed: returnCode=0"); + expect(stdout).toContain("fixture completed"); + }); + + test("unknown thread id exits 1", () => { + const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; + const r = spawnSync(process.execPath, [cliEntryPath, "live", "01UNKNOWNXXXXXXXXXXXXXXXXX"], { + env, + encoding: "utf8", + }); + expect(r.status).toBe(1); + expect(String(r.stderr ?? "")).toContain("thread not found"); + }); + + test("follows file until WorkflowResult is appended", async () => { + const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; + const dataPath = join( + storageRoot, + "logs", + "C9NMV6V2TQT81", + "01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl", + ); + + const proc = spawn(process.execPath, [cliEntryPath, "live", "01LIVEINFLY01DDDDDDDDDDDDG"], { + env, + stdio: ["ignore", "pipe", "pipe"], + }); + + await new Promise((r) => setTimeout(r, 120)); + const prior = await readFile(dataPath, "utf8"); + await writeFile(dataPath, `${prior}{"returnCode":0,"summary":"caught up"}\n`, "utf8"); + + const stdout = await new Promise((resolve, reject) => { + let buf = ""; + proc.stdout?.on("data", (c: Buffer) => { + buf += c.toString("utf8"); + }); + proc.stderr?.on("data", (c: Buffer) => { + buf += c.toString("utf8"); + }); + proc.on("error", reject); + proc.on("exit", (code: number | null) => { + if (code === 0) { + resolve(buf); + } else { + reject(new Error(`exit ${code}: ${buf}`)); + } + }); + }); + + expect(stdout).toContain("planner"); + expect(stdout).toContain("completed: returnCode=0"); + expect(stdout).toContain("caught up"); + }); +}); diff --git a/packages/cli-workflow/__tests__/thread-cli.test.ts b/packages/cli-workflow/__tests__/thread-cli.test.ts index 8e76b04..779a205 100644 --- a/packages/cli-workflow/__tests__/thread-cli.test.ts +++ b/packages/cli-workflow/__tests__/thread-cli.test.ts @@ -323,7 +323,7 @@ describe("cli thread commands", () => { .trim() .split("\n") .filter((l) => l !== ""); - expect(lines.length).toBe(2); + expect(lines.length).toBe(3); const runningPath = join(dirname(dataPath), `${threadId}.running`); expect(await pathExists(runningPath)).toBe(false); @@ -362,8 +362,8 @@ describe("cli thread commands", () => { const resumed = await cmdResume(storageRoot, threadId); expect(resumed.ok).toBe(true); - await waitUntilMinDataLines(dataPath, 3, 120); - expect(await countDataJsonlLines(dataPath)).toBe(3); + await waitUntilMinDataLines(dataPath, 4, 120); + expect(await countDataJsonlLines(dataPath)).toBe(4); const runningPath = join(dirname(dataPath), `${threadId}.running`); await waitUntilRunningFileAbsent(runningPath, 100); diff --git a/packages/cli-workflow/src/cli-dispatch.ts b/packages/cli-workflow/src/cli-dispatch.ts index d40fe07..47828c0 100644 --- a/packages/cli-workflow/src/cli-dispatch.ts +++ b/packages/cli-workflow/src/cli-dispatch.ts @@ -7,6 +7,7 @@ import { cmdHistory } from "./cmd-history.js"; import { cmdInitTemplate, cmdInitWorkspace } from "./cmd-init.js"; import { cmdKill } from "./cmd-kill.js"; import { cmdList, formatListLines } from "./cmd-list.js"; +import { cmdLive } from "./cmd-live.js"; import { cmdPause } from "./cmd-pause.js"; import { cmdPs } from "./cmd-ps.js"; import { cmdRemove } from "./cmd-remove.js"; @@ -28,6 +29,7 @@ export function formatCliUsage(): string { " uncaged-workflow run [--prompt ] [--max-rounds N]", " uncaged-workflow ps", " uncaged-workflow kill ", + " uncaged-workflow live ", " uncaged-workflow history ", " uncaged-workflow rollback [hash]", " uncaged-workflow pause ", @@ -190,6 +192,15 @@ async function dispatchKill(storageRoot: string, argv: string[]): Promise { + const threadId = argv[0]; + if (threadId === undefined || argv.length > 1) { + printCliError(`${usage()}\n\nerror: live requires `); + return 1; + } + return cmdLive(storageRoot, threadId); +} + async function dispatchHistory(storageRoot: string, argv: string[]): Promise { const name = argv[0]; if (name === undefined || argv.length > 1) { @@ -435,6 +446,7 @@ const COMMAND_TABLE: Record = { run: dispatchRun, ps: dispatchPs, kill: dispatchKill, + live: dispatchLive, history: dispatchHistory, rollback: dispatchRollback, pause: dispatchPause, diff --git a/packages/cli-workflow/src/cmd-live.ts b/packages/cli-workflow/src/cmd-live.ts new file mode 100644 index 0000000..468fae3 --- /dev/null +++ b/packages/cli-workflow/src/cmd-live.ts @@ -0,0 +1,254 @@ +import { watch } from "node:fs"; +import { readFile } from "node:fs/promises"; + +import { + tryParseRoleStepRecord, + tryParseWorkflowResultRecord, + type WorkflowResult, +} from "@uncaged/workflow"; + +import { printCliError, printCliLine } from "./cli-output.js"; +import { resolveThreadDataPath } from "./thread-scan.js"; + +export const LIVE_CONTENT_MAX_LINES = 10; + +export type LiveRoleRow = { + role: string; + content: string; + meta: Record; + timestamp: number; +}; + +export function formatLiveTimeLabel(timestampMs: number): string { + const d = new Date(timestampMs); + const hh = String(d.getHours()).padStart(2, "0"); + const mm = String(d.getMinutes()).padStart(2, "0"); + const ss = String(d.getSeconds()).padStart(2, "0"); + return `${hh}:${mm}:${ss}`; +} + +function shouldUseColor(): boolean { + return process.stdout.isTTY === true && process.env.NO_COLOR === undefined; +} + +function highlightLiveRole(name: string): string { + if (!shouldUseColor()) { + return name; + } + return `\x1b[1m\x1b[36m${name}\x1b[0m`; +} + +export function renderLiveRoleStepLines(row: LiveRoleRow, roleDisplay: string): string[] { + const header = `[${formatLiveTimeLabel(row.timestamp)}] ▶ ${roleDisplay}`; + const lines: string[] = [header]; + const parts = row.content.split("\n"); + const shown = parts.slice(0, LIVE_CONTENT_MAX_LINES); + for (const ln of shown) { + lines.push(` ${ln}`); + } + const omitted = parts.length - shown.length; + if (omitted > 0) { + lines.push(` … (${omitted} more line${omitted === 1 ? "" : "s"})`); + } + lines.push(` meta: ${JSON.stringify(row.meta)}`); + return lines; +} + +function printSummary(result: WorkflowResult): void { + printCliLine(`completed: returnCode=${result.returnCode} — ${result.summary}`); +} + +type LiveSessionState = { + sawStart: boolean; + completed: boolean; + carry: string; + contentOffset: number; +}; + +function handleJsonlLine( + rawLine: string, + state: LiveSessionState, +): { parseError: string | null; workflowResult: WorkflowResult | null } { + const trimmed = rawLine.trim(); + if (trimmed === "") { + return { parseError: null, workflowResult: null }; + } + + let rec: unknown; + try { + rec = JSON.parse(trimmed) as unknown; + } catch { + return { parseError: "invalid JSON in thread data file", workflowResult: null }; + } + if (rec === null || typeof rec !== "object") { + return { parseError: "invalid record in thread data file", workflowResult: null }; + } + const obj = rec as Record; + + if (!state.sawStart) { + state.sawStart = true; + return { parseError: null, workflowResult: null }; + } + + const wf = tryParseWorkflowResultRecord(obj); + if (wf !== null) { + state.completed = true; + return { parseError: null, workflowResult: wf }; + } + + const roleRow = tryParseRoleStepRecord(obj); + if (roleRow === null) { + return { + parseError: "unrecognized record in thread data (expected role step or result)", + workflowResult: null, + }; + } + + const row: LiveRoleRow = { + role: roleRow.role, + content: roleRow.content, + meta: roleRow.meta, + timestamp: roleRow.timestamp, + }; + for (const outLine of renderLiveRoleStepLines(row, highlightLiveRole(row.role))) { + printCliLine(outLine); + } + return { parseError: null, workflowResult: null }; +} + +async function pumpNewContent(dataPath: string, state: LiveSessionState): Promise { + let text: string; + try { + text = await readFile(dataPath, "utf8"); + } catch { + return null; + } + + if (text.length < state.contentOffset) { + state.contentOffset = 0; + state.carry = ""; + } + + const chunk = text.slice(state.contentOffset); + state.contentOffset = text.length; + state.carry += chunk; + + const parts = state.carry.split("\n"); + state.carry = parts.pop() ?? ""; + + for (const line of parts) { + const { parseError, workflowResult } = handleJsonlLine(line, state); + if (parseError !== null) { + printCliError(parseError); + return 1; + } + if (workflowResult !== null) { + printSummary(workflowResult); + return 0; + } + } + + return null; +} + +function watchLiveFile(params: { + dataPath: string; + state: LiveSessionState; + signal: AbortSignal; +}): Promise { + const { dataPath, state, signal } = params; + + return new Promise((resolve, reject) => { + let settled = false; + const finish = (code: number): void => { + if (settled) { + return; + } + settled = true; + resolve(code); + }; + + /** Serialize reads — `fs.watch` may emit faster than `readFile` completes. */ + let pumpChain: Promise = Promise.resolve(); + + const watcher = watch(dataPath, (eventType) => { + if (eventType === "rename") { + return; + } + schedulePump(); + }); + + watcher.on("error", (err: Error) => { + watcher.close(); + reject(err); + }); + + const onAbort = (): void => { + watcher.close(); + finish(0); + }; + signal.addEventListener("abort", onAbort, { once: true }); + + async function drainQueuedPump(): Promise { + if (settled) { + return; + } + try { + const code = await pumpNewContent(dataPath, state); + if (code !== null) { + watcher.close(); + finish(code); + } + } catch (e) { + watcher.close(); + reject(e instanceof Error ? e : new Error(String(e))); + } + } + + function schedulePump(): void { + pumpChain = pumpChain.then(() => drainQueuedPump()); + } + + schedulePump(); + }); +} + +export async function cmdLive(storageRoot: string, threadId: string): Promise { + const dataPath = await resolveThreadDataPath(storageRoot, threadId); + if (dataPath === null) { + printCliError(`thread not found: ${threadId}`); + return 1; + } + + const state: LiveSessionState = { + sawStart: false, + completed: false, + carry: "", + contentOffset: 0, + }; + + const controller = new AbortController(); + const onSigInt = (): void => { + controller.abort(); + }; + process.on("SIGINT", onSigInt); + + try { + const first = await pumpNewContent(dataPath, state); + if (first !== null) { + return first; + } + + if (state.completed) { + return 0; + } + + return await watchLiveFile({ dataPath, state, signal: controller.signal }); + } catch (e) { + const message = e instanceof Error ? e.message : String(e); + printCliError(`live: ${message}`); + return 1; + } finally { + process.off("SIGINT", onSigInt); + } +} diff --git a/packages/workflow/__tests__/fork-thread.test.ts b/packages/workflow/__tests__/fork-thread.test.ts index 244fd87..4e75084 100644 --- a/packages/workflow/__tests__/fork-thread.test.ts +++ b/packages/workflow/__tests__/fork-thread.test.ts @@ -87,6 +87,26 @@ describe("fork-thread", () => { expect(r.value.runOptions).toEqual({ maxRounds: 5, depth: 0 }); }); + test("parseThreadDataJsonl ignores trailing WorkflowResult line", () => { + const text = `${sampleDataJsonl.trim()}\n{"returnCode":0,"summary":"done"}\n`; + const r = parseThreadDataJsonl(text); + expect(r.ok).toBe(true); + if (!r.ok) { + return; + } + expect(r.value.roleSteps.length).toBe(3); + expect(r.value.roleSteps[2]?.role).toBe("reviewer"); + }); + + test("parseThreadDataJsonl errors when WorkflowResult is not last", () => { + const text = `{"name":"demo","hash":"H","threadId":"01ZZZZZZZZZZZZZZZZZZZZZZ","parameters":{"prompt":"p","options":{"maxRounds":3}},"timestamp":1} +{"returnCode":0,"summary":"early"} +{"role":"planner","content":"x","meta":{},"timestamp":2} +`; + const r = parseThreadDataJsonl(text); + expect(r.ok).toBe(false); + }); + test("parseThreadDataJsonl reads explicit depth from start record", () => { const text = `{"name":"demo","hash":"H","threadId":"01ZZZZZZZZZZZZZZZZZZZZZZ","parameters":{"prompt":"p","options":{"maxRounds":3,"depth":2}},"timestamp":1} {"role":"planner","contentHash":"HP0000000000000000000099","meta":{},"refs":[],"timestamp":2} diff --git a/packages/workflow/__tests__/worker.test.ts b/packages/workflow/__tests__/worker.test.ts index 7067260..78ccd2c 100644 --- a/packages/workflow/__tests__/worker.test.ts +++ b/packages/workflow/__tests__/worker.test.ts @@ -125,7 +125,7 @@ describe("worker process", () => { .trim() .split("\n") .filter((l) => l !== "").length, - ).toBe(3); + ).toBe(4); } finally { await rm(root, { recursive: true, force: true }); } @@ -187,7 +187,7 @@ describe("worker process", () => { .trim() .split("\n") .filter((l) => l !== ""); - expect(lines.length).toBe(3); + expect(lines.length).toBe(4); const start = JSON.parse(lines[0] ?? "{}") as Record; expect(start.forkFrom).toEqual({ threadId: srcId }); const replay = JSON.parse(lines[1] ?? "{}") as Record; @@ -195,6 +195,8 @@ describe("worker process", () => { expect(replay.timestamp).toBe(555); const coder = JSON.parse(lines[2] ?? "{}") as Record; expect(coder.role).toBe("coder"); + const done = JSON.parse(lines[3] ?? "{}") as Record; + expect(done.returnCode).toBe(0); } finally { await rm(root, { recursive: true, force: true }); } diff --git a/packages/workflow/src/fork-thread.ts b/packages/workflow/src/fork-thread.ts index 94a3d85..445381f 100644 --- a/packages/workflow/src/fork-thread.ts +++ b/packages/workflow/src/fork-thread.ts @@ -1,6 +1,6 @@ import { normalizeRefsField } from "./refs-field.js"; import { err, ok, type Result } from "./result.js"; -import type { RoleOutput } from "./types.js"; +import type { RoleOutput, WorkflowResult } from "./types.js"; /** Role steps replayed from `.data.jsonl`, including persisted timestamps. */ export type ForkHistoricalStep = RoleOutput & { timestamp: number }; @@ -14,33 +14,54 @@ export type ParsedThreadStartRecord = { depth: number; }; -function parseRoleLine( - obj: Record, - lineIndex: number, -): Result { +/** Recognizes a persisted workflow completion line (no `role`; has numeric `returnCode` and string `summary`). */ +export function tryParseWorkflowResultRecord(obj: Record): WorkflowResult | null { + if (obj.role !== undefined) { + return null; + } + const returnCode = obj.returnCode; + const summary = obj.summary; + if (typeof returnCode !== "number" || typeof summary !== "string") { + return null; + } + return { returnCode, summary }; +} + +export function tryParseRoleStepRecord(obj: Record): ForkHistoricalStep | null { const role = obj.role; const contentHash = obj.contentHash; const meta = obj.meta; const timestamp = obj.timestamp; if (typeof role !== "string") { - return err(`invalid role record at line ${lineIndex}: missing role`); + return null; } if (typeof contentHash !== "string") { - return err(`invalid role record at line ${lineIndex}: missing contentHash`); + return null; } if (meta === null || typeof meta !== "object") { - return err(`invalid role record at line ${lineIndex}: missing meta`); + return null; } if (typeof timestamp !== "number") { - return err(`invalid role record at line ${lineIndex}: missing timestamp`); + return null; } - return ok({ + return { role, contentHash, meta: meta as Record, refs: normalizeRefsField(obj.refs), timestamp, - }); + }; +} + +function parseRoleLine( + obj: Record, + lineIndex: number, +): Result { + const parsed = tryParseRoleStepRecord(obj); + if (parsed === null) { + return err(`invalid role record at line ${lineIndex}`); + } + return ok(parsed); } function parseStartRecordLine(firstLine: string): Result { @@ -109,7 +130,15 @@ function parseFollowingRoleLines(lines: string[]): Result, i + 1); + const recObj = rec as Record; + const wf = tryParseWorkflowResultRecord(recObj); + if (wf !== null) { + if (i !== lines.length - 1) { + return err("WorkflowResult record must be the final line in `.data.jsonl`"); + } + break; + } + const parsed = parseRoleLine(recObj, i + 1); if (!parsed.ok) { return parsed; } diff --git a/packages/workflow/src/index.ts b/packages/workflow/src/index.ts index aa201d6..7071f99 100644 --- a/packages/workflow/src/index.ts +++ b/packages/workflow/src/index.ts @@ -25,6 +25,8 @@ export { type ParsedThreadStartRecord, parseThreadDataJsonl, selectForkHistoricalSteps, + tryParseRoleStepRecord, + tryParseWorkflowResultRecord, } from "./fork-thread.js"; export { type GcResult, garbageCollectCas } from "./gc.js"; export { stringifyWorkflowDescriptor } from "./generate-descriptor.js"; diff --git a/packages/workflow/src/worker.ts b/packages/workflow/src/worker.ts index 8191b1d..574aaf4 100644 --- a/packages/workflow/src/worker.ts +++ b/packages/workflow/src/worker.ts @@ -1,4 +1,4 @@ -import { mkdir, unlink, writeFile } from "node:fs/promises"; +import { appendFile, mkdir, unlink, writeFile } from "node:fs/promises"; import { createServer, type Socket } from "node:net"; import { dirname, join } from "node:path"; import { importWorkflowBundleModule } from "./bundle-import-env.js"; @@ -11,7 +11,7 @@ import { normalizeRefsField } from "./refs-field.js"; import { err, ok, type Result } from "./result.js"; import { getGlobalCasDir } from "./storage-root.js"; import { createThreadPauseGate, type ThreadPauseGate } from "./thread-pause-gate.js"; -import type { RoleOutput, WorkflowFn } from "./types.js"; +import type { RoleOutput, WorkflowFn, WorkflowResult } from "./types.js"; const bootLog = createLogger({ sink: { kind: "stderr" } }); @@ -404,7 +404,7 @@ async function main(): Promise { }); } - await executeThread( + const runResult = await executeThread( workflowFn, cmd.workflowName, { prompt: cmd.prompt, steps: cmd.steps }, @@ -418,9 +418,12 @@ async function main(): Promise { io, logger, ); + await appendFile(dataJsonlPath, `${JSON.stringify(runResult)}\n`, "utf8"); } catch (e) { const message = e instanceof Error ? e.message : String(e); bootLog("Q3MN8YKW", `thread ${threadId} failed: ${message}`); + const failure: WorkflowResult = { returnCode: 1, summary: message }; + await appendFile(dataJsonlPath, `${JSON.stringify(failure)}\n`, "utf8").catch(() => {}); } finally { threads.delete(threadId); await unlink(runningPath).catch(() => {});