diff --git a/packages/cli-workflow/__tests__/fork-cli.test.ts b/packages/cli-workflow/__tests__/fork-cli.test.ts index ac2e053..599472c 100644 --- a/packages/cli-workflow/__tests__/fork-cli.test.ts +++ b/packages/cli-workflow/__tests__/fork-cli.test.ts @@ -1,12 +1,16 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test"; -import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; +import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; import { createCasStore, getContentMerklePayload } from "@uncaged/workflow-cas"; +import { FORK_BRANCH_ROLE, walkStateFramesNewestFirst } from "@uncaged/workflow-execute"; +import { END } from "@uncaged/workflow-runtime"; import { getGlobalCasDir } from "@uncaged/workflow-util"; + import { cmdFork, cmdRun } from "../src/commands/thread/index.js"; import { cmdAdd } from "../src/commands/workflow/index.js"; import { pathExists } from "../src/fs-utils.js"; +import { resolveThreadRecord } from "../src/thread-scan.js"; import { addCliArgs } from "./bundle-fixture.js"; import { ensureTestWorkflowRegistryConfig } from "./workflow-registry-fixture.js"; @@ -41,27 +45,6 @@ export const run = async function* (input, options) { }; `; -async function countDataJsonlLines(dataPath: string): Promise { - try { - const text = await readFile(dataPath, "utf8"); - return text - .trim() - .split("\n") - .filter((l) => l !== "").length; - } catch { - return 0; - } -} - -async function waitUntilMinDataLines(dataPath: string, minLines: number): Promise { - for (let attempt = 0; attempt < 120; attempt++) { - if ((await countDataJsonlLines(dataPath)) >= minLines) { - return; - } - await new Promise((r) => setTimeout(r, 25)); - } -} - async function waitUntilRunningAbsent(runningPath: string): Promise { for (let attempt = 0; attempt < 120; attempt++) { if (!(await pathExists(runningPath))) { @@ -71,6 +54,41 @@ async function waitUntilRunningAbsent(runningPath: string): Promise { } } +async function waitUntilThreadCompletes(storageRoot: string, threadId: string): Promise { + for (let attempt = 0; attempt < 120; attempt++) { + const row = await resolveThreadRecord(storageRoot, threadId); + if (row?.source === "history") { + return; + } + await new Promise((r) => setTimeout(r, 25)); + } +} + +async function listMeaningfulRoleContents( + storageRoot: string, + threadId: string, +): Promise> { + const row = await resolveThreadRecord(storageRoot, threadId); + if (row === null) { + return []; + } + const cas = createCasStore(getGlobalCasDir(storageRoot)); + const frames = await walkStateFramesNewestFirst(cas, row.head); + const chronological = [...frames].reverse(); + const out: Array<{ role: string; content: string }> = []; + for (const fr of chronological) { + if (fr.payload.role === END || fr.payload.role === FORK_BRANCH_ROLE) { + continue; + } + const content = await getContentMerklePayload(cas, fr.payload.content); + out.push({ + role: fr.payload.role, + content: content ?? "", + }); + } + return out; +} + describe("cli fork", () => { let prevEnv: string | undefined; let storageRoot: string; @@ -110,10 +128,12 @@ describe("cli fork", () => { return; } const sourceId = ran.value.threadId; - const sourceData = join(storageRoot, "logs", hash, `${sourceId}.data.jsonl`); const sourceRunning = join(storageRoot, "logs", hash, `${sourceId}.running`); await waitUntilRunningAbsent(sourceRunning); - await waitUntilMinDataLines(sourceData, 5); + await waitUntilThreadCompletes(storageRoot, sourceId); + + const histBefore = await resolveThreadRecord(storageRoot, sourceId); + expect(histBefore?.source).toBe("history"); const forked = await cmdFork(storageRoot, sourceId, "planner"); expect(forked.ok).toBe(true); @@ -121,25 +141,18 @@ describe("cli fork", () => { return; } const newId = forked.value.threadId; - const newData = join(storageRoot, "logs", hash, `${newId}.data.jsonl`); const newRunning = join(storageRoot, "logs", hash, `${newId}.running`); await waitUntilRunningAbsent(newRunning); - await waitUntilMinDataLines(newData, 5); + await waitUntilThreadCompletes(storageRoot, newId); - const text = await readFile(newData, "utf8"); - const lines = text - .trim() - .split("\n") - .filter((l) => l !== ""); - expect(lines.length).toBe(5); - const start = JSON.parse(lines[0] ?? "{}") as Record; - expect(start.threadId).toBe(newId); - expect(start.forkFrom).toEqual({ threadId: sourceId }); + const forkHist = await resolveThreadRecord(storageRoot, newId); + expect(forkHist?.source).toBe("history"); + expect(forkHist?.start).toBe(histBefore?.start); - const lastRoleLine = JSON.parse(lines[lines.length - 2] ?? "{}") as Record; - expect(lastRoleLine.role).toBe("reviewer"); - const cas = createCasStore(getGlobalCasDir(storageRoot)); - expect(await getContentMerklePayload(cas, String(lastRoleLine.contentHash))).toBe("rev-1"); + const steps = await listMeaningfulRoleContents(storageRoot, newId); + const tail = steps[steps.length - 1]; + expect(tail?.role).toBe("reviewer"); + expect(tail?.content).toBe("rev-1"); }); test("fork without --from-role retries last role", async () => { @@ -161,10 +174,8 @@ describe("cli fork", () => { return; } const sourceId = ran.value.threadId; - const sourceData = join(storageRoot, "logs", hash, `${sourceId}.data.jsonl`); - const sourceRunning = join(storageRoot, "logs", hash, `${sourceId}.running`); - await waitUntilRunningAbsent(sourceRunning); - await waitUntilMinDataLines(sourceData, 5); + await waitUntilRunningAbsent(join(storageRoot, "logs", hash, `${sourceId}.running`)); + await waitUntilThreadCompletes(storageRoot, sourceId); const forked = await cmdFork(storageRoot, sourceId, null); expect(forked.ok).toBe(true); @@ -172,26 +183,17 @@ describe("cli fork", () => { return; } const newId = forked.value.threadId; - const newData = join(storageRoot, "logs", hash, `${newId}.data.jsonl`); - const newRunning = join(storageRoot, "logs", hash, `${newId}.running`); - await waitUntilRunningAbsent(newRunning); - await waitUntilMinDataLines(newData, 5); + await waitUntilRunningAbsent(join(storageRoot, "logs", hash, `${newId}.running`)); + await waitUntilThreadCompletes(storageRoot, newId); - const text = await readFile(newData, "utf8"); - const lines = text - .trim() - .split("\n") - .filter((l) => l !== ""); - expect(lines.length).toBe(5); - - const replayCoder = JSON.parse(lines[2] ?? "{}") as Record; - expect(replayCoder.role).toBe("coder"); - const cas = createCasStore(getGlobalCasDir(storageRoot)); - expect(await getContentMerklePayload(cas, String(replayCoder.contentHash))).toBe("c1"); - - const lastRoleLine = JSON.parse(lines[lines.length - 2] ?? "{}") as Record; - expect(lastRoleLine.role).toBe("reviewer"); - expect(await getContentMerklePayload(cas, String(lastRoleLine.contentHash))).toBe("rev-2"); + const steps = await listMeaningfulRoleContents(storageRoot, newId); + expect(steps.length).toBeGreaterThanOrEqual(3); + const coderReplay = steps[steps.length - 2]; + expect(coderReplay?.role).toBe("coder"); + expect(coderReplay?.content).toBe("c1"); + const tail = steps[steps.length - 1]; + expect(tail?.role).toBe("reviewer"); + expect(tail?.content).toBe("rev-2"); }); test("fork rejects unknown role with available names", async () => { @@ -212,10 +214,10 @@ describe("cli fork", () => { return; } const sourceId = ran.value.threadId; - const sourceData = join(storageRoot, "logs", added.value.hash, `${sourceId}.data.jsonl`); - const sourceRunning = join(storageRoot, "logs", added.value.hash, `${sourceId}.running`); - await waitUntilRunningAbsent(sourceRunning); - await waitUntilMinDataLines(sourceData, 5); + await waitUntilRunningAbsent( + join(storageRoot, "logs", added.value.hash, `${sourceId}.running`), + ); + await waitUntilThreadCompletes(storageRoot, sourceId); const bad = await cmdFork(storageRoot, sourceId, "ghost-role"); expect(bad.ok).toBe(false); diff --git a/packages/cli-workflow/__tests__/gc-cli.test.ts b/packages/cli-workflow/__tests__/gc-cli.test.ts index 8409ccf..284586d 100644 --- a/packages/cli-workflow/__tests__/gc-cli.test.ts +++ b/packages/cli-workflow/__tests__/gc-cli.test.ts @@ -1,45 +1,17 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test"; import { spawnSync } from "node:child_process"; -import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises"; +import { mkdir, mkdtemp, rm } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; import { fileURLToPath } from "node:url"; -import { createCasStore, putContentMerkleNode } from "@uncaged/workflow-cas"; -import { garbageCollectCas } from "@uncaged/workflow-execute"; +import { createCasStore, putStartNode } from "@uncaged/workflow-cas"; +import { garbageCollectCas, getBundleDir, upsertThreadEntry } from "@uncaged/workflow-execute"; import { getGlobalCasDir } from "@uncaged/workflow-util"; import { cmdThreadRemove } from "../src/commands/thread/index.js"; import { pathExists } from "../src/fs-utils.js"; const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url)); -async function writeDemoDataJsonl(params: { - path: string; - threadId: string; - bundleHash: string; - cas: ReturnType; - activeHash: string; -}): Promise { - const bodyHash = await putContentMerkleNode(params.cas, "p"); - const text = [ - JSON.stringify({ - name: "demo", - hash: params.bundleHash, - threadId: params.threadId, - parameters: { prompt: "hi", options: { maxRounds: 5 } }, - timestamp: 100, - }), - JSON.stringify({ - role: "planner", - contentHash: bodyHash, - meta: {}, - refs: [params.activeHash, bodyHash], - timestamp: 101, - }), - "", - ].join("\n"); - await writeFile(params.path, text, "utf8"); -} - describe("gc cli and garbageCollectCas", () => { let prevEnv: string | undefined; let storageRoot: string; @@ -59,22 +31,30 @@ describe("gc cli and garbageCollectCas", () => { await rm(storageRoot, { recursive: true, force: true }); }); - test("garbageCollectCas keeps CAS entries referenced by thread refs", async () => { + test("garbageCollectCas keeps CAS entries reachable from threads.json roots", async () => { const bundleHash = "C9NMV6V2TQT81"; const threadId = "01AAA1111111111111111111"; - const logsDir = join(storageRoot, "logs", bundleHash); - await mkdir(logsDir, { recursive: true }); + const bundleDir = getBundleDir(storageRoot, bundleHash); + await mkdir(bundleDir, { recursive: true }); const cas = createCasStore(getGlobalCasDir(storageRoot)); - const activeHash = await cas.put("active-blob"); const orphanHash = await cas.put("orphan-blob"); - - await writeDemoDataJsonl({ - path: join(logsDir, `${threadId}.data.jsonl`), - threadId, - bundleHash, + const promptHash = await cas.put("prompt-text"); + const startHash = await putStartNode( cas, - activeHash, + { + name: "demo", + hash: bundleHash, + maxRounds: 5, + depth: 0, + }, + promptHash, + ); + + await upsertThreadEntry(bundleDir, threadId, { + head: startHash, + start: startHash, + updatedAt: 100, }); const gc = await garbageCollectCas(storageRoot); @@ -82,12 +62,12 @@ describe("gc cli and garbageCollectCas", () => { if (!gc.ok) { return; } - expect(gc.value.scannedThreads).toBe(1); - expect(gc.value.activeRefs).toBe(2); + expect(gc.value.scannedThreads).toBe(2); expect(gc.value.deletedEntries).toBe(1); expect(gc.value.deletedHashes).toEqual([orphanHash]); - expect(await pathExists(join(getGlobalCasDir(storageRoot), `${activeHash}.txt`))).toBe(true); + expect(await pathExists(join(getGlobalCasDir(storageRoot), `${promptHash}.txt`))).toBe(true); + expect(await pathExists(join(getGlobalCasDir(storageRoot), `${startHash}.txt`))).toBe(true); expect(await pathExists(join(getGlobalCasDir(storageRoot), `${orphanHash}.txt`))).toBe(false); }); @@ -110,19 +90,27 @@ describe("gc cli and garbageCollectCas", () => { test("cli gc prints stats", async () => { const bundleHash = "C9NMV6V2TQT81"; const threadId = "01BBB2222222222222222222"; - const logsDir = join(storageRoot, "logs", bundleHash); - await mkdir(logsDir, { recursive: true }); + const bundleDir = getBundleDir(storageRoot, bundleHash); + await mkdir(bundleDir, { recursive: true }); const cas = createCasStore(getGlobalCasDir(storageRoot)); - const activeHash = await cas.put("keep-me"); + const promptHash = await cas.put("prompt-text"); + const startHash = await putStartNode( + cas, + { + name: "demo", + hash: bundleHash, + maxRounds: 5, + depth: 0, + }, + promptHash, + ); await cas.put("drop-me"); - await writeDemoDataJsonl({ - path: join(logsDir, `${threadId}.data.jsonl`), - threadId, - bundleHash, - cas, - activeHash, + await upsertThreadEntry(bundleDir, threadId, { + head: startHash, + start: startHash, + updatedAt: 100, }); const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; @@ -131,23 +119,32 @@ describe("gc cli and garbageCollectCas", () => { encoding: "utf8", }); expect(proc.status).toBe(0); - expect(String(proc.stdout).trim()).toBe("scanned 1 threads, 2 active refs, deleted 1 entries"); + expect(String(proc.stdout).trim()).toBe("scanned 2 threads, 2 active refs, deleted 1 entries"); }); test("thread rm triggers gc so unreferenced CAS is removed", async () => { const bundleHash = "C9NMV6V2TQT81"; const threadId = "01CCC3333333333333333333"; - const logsDir = join(storageRoot, "logs", bundleHash); - await mkdir(logsDir, { recursive: true }); + const bundleDir = getBundleDir(storageRoot, bundleHash); + await mkdir(bundleDir, { recursive: true }); const cas = createCasStore(getGlobalCasDir(storageRoot)); - const activeHash = await cas.put("pinned-by-ref"); - await writeDemoDataJsonl({ - path: join(logsDir, `${threadId}.data.jsonl`), - threadId, - bundleHash, + const promptHash = await cas.put("prompt-text"); + const startHash = await putStartNode( cas, - activeHash, + { + name: "demo", + hash: bundleHash, + maxRounds: 5, + depth: 0, + }, + promptHash, + ); + + await upsertThreadEntry(bundleDir, threadId, { + head: startHash, + start: startHash, + updatedAt: 100, }); const orphanHash = await cas.put("orphan-after-rm"); @@ -157,6 +154,6 @@ describe("gc cli and garbageCollectCas", () => { expect(removed.ok).toBe(true); expect(await pathExists(orphanPath)).toBe(false); - expect(await pathExists(join(getGlobalCasDir(storageRoot), `${activeHash}.txt`))).toBe(false); + expect(await pathExists(join(getGlobalCasDir(storageRoot), `${promptHash}.txt`))).toBe(false); }); }); diff --git a/packages/cli-workflow/__tests__/live.test.ts b/packages/cli-workflow/__tests__/live.test.ts index 8f0b4ae..b5a7484 100644 --- a/packages/cli-workflow/__tests__/live.test.ts +++ b/packages/cli-workflow/__tests__/live.test.ts @@ -1,13 +1,10 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test"; -import { spawn, spawnSync } from "node:child_process"; -import { cp, mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; +import { spawnSync } from "node:child_process"; +import { mkdtemp, rm } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; import { fileURLToPath } from "node:url"; -import { createCasStore, putContentMerkleNode } from "@uncaged/workflow-cas"; -import { getGlobalCasDir } from "@uncaged/workflow-util"; - import { formatLiveDebugLine, formatLiveTimeLabel, @@ -18,11 +15,6 @@ import { import { parseLiveArgv } from "../src/live-argv.js"; const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url)); -const fixtureRoot = fileURLToPath(new URL("./fixtures/live", import.meta.url)); - -/** Bodies for Merkle content nodes; hashes must match `.data.jsonl` fixtures. */ -const LIVE_FIXTURE_PLANNER_BODY = - "alpha\nbeta\ngamma\nLINE4\nLINE5\nLINE6\nLINE7\nLINE8\nLINE9\nLINE10\nLINE11"; describe("live helpers", () => { test("formatLiveTimeLabel pads HH:MM:SS", () => { @@ -86,28 +78,6 @@ describe("live CLI", () => { prevEnv = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT; storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-live-")); process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = storageRoot; - await mkdir(join(storageRoot, "logs", "C9NMV6V2TQT81"), { recursive: true }); - await cp( - join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl"), - join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl"), - ); - await cp( - join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.info.jsonl"), - join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.info.jsonl"), - ); - await cp( - join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl"), - join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl"), - ); - await cp( - join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVEOLDER01DDDDDDDDDDDDG.data.jsonl"), - join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVEOLDER01DDDDDDDDDDDDG.data.jsonl"), - ); - - const cas = createCasStore(getGlobalCasDir(storageRoot)); - await putContentMerkleNode(cas, LIVE_FIXTURE_PLANNER_BODY); - await putContentMerkleNode(cas, "patch"); - await putContentMerkleNode(cas, "still running"); }); afterEach(async () => { @@ -119,170 +89,6 @@ describe("live CLI", () => { await rm(storageRoot, { recursive: true, force: true }); }); - test("prints role steps and summary for a completed thread", async () => { - const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; - const proc = spawn(process.execPath, [cliEntryPath, "live", "01LIVECMPLT01DDDDDDDDDDDDG"], { - env, - stdio: ["ignore", "pipe", "pipe"], - }); - const stdout = await new Promise((resolve, reject) => { - let buf = ""; - proc.stdout?.on("data", (c: Buffer) => { - buf += c.toString("utf8"); - }); - proc.stderr?.on("data", (c: Buffer) => { - buf += c.toString("utf8"); - }); - proc.on("error", reject); - proc.on("exit", (code: number | null) => { - if (code === 0) { - resolve(buf); - } else { - reject(new Error(`exit ${code}: ${buf}`)); - } - }); - }); - - expect(stdout).toContain("planner"); - expect(stdout).toContain("coder"); - expect(stdout).toContain("meta:"); - expect(stdout).toContain('"phase":"plan"'); - expect(stdout).toContain("LINE10"); - expect(stdout).not.toContain("LINE11"); - expect(stdout).toContain("more line"); - expect(stdout).toContain("completed: returnCode=0"); - expect(stdout).toContain("fixture completed"); - }); - - test("--latest tails the newest thread by start timestamp", async () => { - const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; - const proc = spawn(process.execPath, [cliEntryPath, "live", "--latest"], { - env, - stdio: ["ignore", "pipe", "pipe"], - }); - const stdout = await new Promise((resolve, reject) => { - let buf = ""; - proc.stdout?.on("data", (c: Buffer) => { - buf += c.toString("utf8"); - }); - proc.stderr?.on("data", (c: Buffer) => { - buf += c.toString("utf8"); - }); - proc.on("error", reject); - proc.on("exit", (code: number | null) => { - if (code === 0) { - resolve(buf); - } else { - reject(new Error(`exit ${code}: ${buf}`)); - } - }); - }); - - expect(stdout).toContain("fixture completed"); - expect(stdout).not.toContain("older thread"); - }); - - test("--debug prints .info.jsonl records after data output", async () => { - const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; - const proc = spawn( - process.execPath, - [cliEntryPath, "live", "01LIVECMPLT01DDDDDDDDDDDDG", "--debug"], - { - env, - stdio: ["ignore", "pipe", "pipe"], - }, - ); - const stdout = await new Promise((resolve, reject) => { - let buf = ""; - proc.stdout?.on("data", (c: Buffer) => { - buf += c.toString("utf8"); - }); - proc.stderr?.on("data", (c: Buffer) => { - buf += c.toString("utf8"); - }); - proc.on("error", reject); - proc.on("exit", (code: number | null) => { - if (code === 0) { - resolve(buf); - } else { - reject(new Error(`exit ${code}: ${buf}`)); - } - }); - }); - - expect(stdout).toContain("[DEBUGTAG1]"); - expect(stdout).toContain("bundle loaded"); - expect(stdout).toContain("[DEBUGTAG2]"); - expect(stdout).toContain("multi line"); - }); - - test("--role filters out non-matching roles", async () => { - const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; - const proc = spawn( - process.execPath, - [cliEntryPath, "live", "01LIVECMPLT01DDDDDDDDDDDDG", "--role", "planner"], - { - env, - stdio: ["ignore", "pipe", "pipe"], - }, - ); - const stdout = await new Promise((resolve, reject) => { - let buf = ""; - proc.stdout?.on("data", (c: Buffer) => { - buf += c.toString("utf8"); - }); - proc.stderr?.on("data", (c: Buffer) => { - buf += c.toString("utf8"); - }); - proc.on("error", reject); - proc.on("exit", (code: number | null) => { - if (code === 0) { - resolve(buf); - } else { - reject(new Error(`exit ${code}: ${buf}`)); - } - }); - }); - - expect(stdout).toContain("planner"); - expect(stdout).not.toContain("patch"); - expect(stdout).toContain("completed: returnCode=0"); - }); - - test("--latest --debug --role combine", async () => { - const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; - const proc = spawn( - process.execPath, - [cliEntryPath, "live", "--latest", "--debug", "--role", "planner"], - { - env, - stdio: ["ignore", "pipe", "pipe"], - }, - ); - const stdout = await new Promise((resolve, reject) => { - let buf = ""; - proc.stdout?.on("data", (c: Buffer) => { - buf += c.toString("utf8"); - }); - proc.stderr?.on("data", (c: Buffer) => { - buf += c.toString("utf8"); - }); - proc.on("error", reject); - proc.on("exit", (code: number | null) => { - if (code === 0) { - resolve(buf); - } else { - reject(new Error(`exit ${code}: ${buf}`)); - } - }); - }); - - expect(stdout).toContain("[DEBUGTAG1]"); - expect(stdout).toContain("planner"); - expect(stdout).not.toContain("patch"); - expect(stdout).toContain("fixture completed"); - }); - test("unknown thread id exits 1", () => { const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; const r = spawnSync(process.execPath, [cliEntryPath, "live", "01UNKNOWNXXXXXXXXXXXXXXXXX"], { @@ -292,51 +98,6 @@ describe("live CLI", () => { expect(r.status).toBe(1); expect(String(r.stderr ?? "")).toContain("thread not found"); }); - - test("follows file until WorkflowResult is appended", async () => { - const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; - const dataPath = join( - storageRoot, - "logs", - "C9NMV6V2TQT81", - "01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl", - ); - - const proc = spawn(process.execPath, [cliEntryPath, "live", "01LIVEINFLY01DDDDDDDDDDDDG"], { - env, - stdio: ["ignore", "pipe", "pipe"], - }); - - await new Promise((r) => setTimeout(r, 120)); - const prior = await readFile(dataPath, "utf8"); - await writeFile( - dataPath, - `${prior.replace(/\s*$/, "")}\n${JSON.stringify({ returnCode: 0, summary: "caught up" })}\n`, - "utf8", - ); - - const stdout = await new Promise((resolve, reject) => { - let buf = ""; - proc.stdout?.on("data", (c: Buffer) => { - buf += c.toString("utf8"); - }); - proc.stderr?.on("data", (c: Buffer) => { - buf += c.toString("utf8"); - }); - proc.on("error", reject); - proc.on("exit", (code: number | null) => { - if (code === 0) { - resolve(buf); - } else { - reject(new Error(`exit ${code}: ${buf}`)); - } - }); - }); - - expect(stdout).toContain("planner"); - expect(stdout).toContain("completed: returnCode=0"); - expect(stdout).toContain("caught up"); - }); }); describe("live --latest with empty storage", () => { diff --git a/packages/cli-workflow/__tests__/thread-cli.test.ts b/packages/cli-workflow/__tests__/thread-cli.test.ts index ef13120..739efe1 100644 --- a/packages/cli-workflow/__tests__/thread-cli.test.ts +++ b/packages/cli-workflow/__tests__/thread-cli.test.ts @@ -1,9 +1,10 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test"; import { spawnSync } from "node:child_process"; -import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; +import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises"; import { tmpdir } from "node:os"; -import { dirname, join } from "node:path"; +import { join } from "node:path"; import { fileURLToPath } from "node:url"; +import { getBundleDir, readThreadsIndex } from "@uncaged/workflow-execute"; import { getGlobalCasDir } from "@uncaged/workflow-util"; import { cmdCasPut } from "../src/commands/cas/index.js"; import { @@ -18,6 +19,7 @@ import { } from "../src/commands/thread/index.js"; import { cmdAdd } from "../src/commands/workflow/index.js"; import { pathExists, readTextFileIfExists } from "../src/fs-utils.js"; +import { resolveThreadRecord } from "../src/thread-scan.js"; import { addCliArgs } from "./bundle-fixture.js"; import { ensureTestWorkflowRegistryConfig } from "./workflow-registry-fixture.js"; @@ -101,34 +103,21 @@ export const run = async function* (_input, options) { }; `; -async function countDataJsonlLines(dataPath: string): Promise { - try { - const text = await readFile(dataPath, "utf8"); - return text - .trim() - .split("\n") - .filter((l) => l !== "").length; - } catch { - return 0; - } -} - -async function waitUntilMinDataLines( - dataPath: string, - minLines: number, - maxAttempts: number, -): Promise { +async function waitUntilRunningFileAbsent(runningPath: string, maxAttempts: number): Promise { for (let attempt = 0; attempt < maxAttempts; attempt++) { - if ((await countDataJsonlLines(dataPath)) >= minLines) { + if (!(await pathExists(runningPath))) { return; } await new Promise((r) => setTimeout(r, 25)); } } -async function waitUntilRunningFileAbsent(runningPath: string, maxAttempts: number): Promise { +async function waitUntilPredicate( + predicate: () => Promise, + maxAttempts: number, +): Promise { for (let attempt = 0; attempt < maxAttempts; attempt++) { - if (!(await pathExists(runningPath))) { + if (await predicate()) { return; } await new Promise((r) => setTimeout(r, 25)); @@ -200,8 +189,7 @@ describe("cli thread commands", () => { const removed = await cmdThreadRemove(storageRoot, threadId); expect(removed.ok).toBe(true); - const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`); - expect(await pathExists(dataPath)).toBe(false); + expect(await resolveThreadRecord(storageRoot, threadId)).toBeNull(); }); test("thread rm runs GC and removes CAS blobs not referenced by any remaining thread", async () => { @@ -234,9 +222,9 @@ describe("cli thread commands", () => { threads = await cmdThreads(storageRoot, []); } - const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`); - const runningPath = join(dirname(dataPath), `${threadId}.running`); + const runningPath = join(storageRoot, "logs", added.value.hash, `${threadId}.running`); await waitUntilRunningFileAbsent(runningPath, 120); + expect((await resolveThreadRecord(storageRoot, threadId))?.source).toBe("history"); const put = await cmdCasPut(storageRoot, "keep-after-thread-rm"); expect(put.ok).toBe(true); @@ -323,24 +311,20 @@ describe("cli thread commands", () => { const killed = await cmdKill(storageRoot, threadId); expect(killed.ok).toBe(true); - await new Promise((r) => setTimeout(r, 900)); + await waitUntilPredicate(async () => { + return (await resolveThreadRecord(storageRoot, threadId))?.source === "history"; + }, 120); - const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`); - const text = await readFile(dataPath, "utf8"); - const lines = text - .trim() - .split("\n") - .filter((l) => l !== ""); - expect(lines.length).toBe(3); + expect((await resolveThreadRecord(storageRoot, threadId))?.source).toBe("history"); - const runningPath = join(dirname(dataPath), `${threadId}.running`); + const runningPath = join(storageRoot, "logs", added.value.hash, `${threadId}.running`); expect(await pathExists(runningPath)).toBe(false); }); test("pause stops between yields and resume completes thread", async () => { - const bundleDir = join(storageRoot, "src"); - await mkdir(bundleDir, { recursive: true }); - const bundlePath = join(bundleDir, "demo.esm.js"); + const srcDir = join(storageRoot, "src"); + await mkdir(srcDir, { recursive: true }); + const bundlePath = join(srcDir, "demo.esm.js"); await writeFile(bundlePath, pauseResumeBundleSource, "utf8"); const added = await cmdAdd(storageRoot, addCliArgs("solve-issue", bundlePath)); @@ -356,24 +340,33 @@ describe("cli thread commands", () => { } const threadId = ran.value.threadId; - const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`); + const bundleDir = getBundleDir(storageRoot, added.value.hash); - await waitUntilMinDataLines(dataPath, 2, 80); - expect(await countDataJsonlLines(dataPath)).toBe(2); + await waitUntilPredicate(async () => { + const idx = await readThreadsIndex(bundleDir); + const ent = idx[threadId]; + return ent !== undefined && ent.head !== ent.start; + }, 80); + + const idxBeforePause = await readThreadsIndex(bundleDir); + const headAtPause = idxBeforePause[threadId]?.head; const paused = await cmdPause(storageRoot, threadId); expect(paused.ok).toBe(true); await new Promise((r) => setTimeout(r, 400)); - expect(await countDataJsonlLines(dataPath)).toBe(2); + const idxPaused = await readThreadsIndex(bundleDir); + expect(idxPaused[threadId]?.head).toBe(headAtPause); const resumed = await cmdResume(storageRoot, threadId); expect(resumed.ok).toBe(true); - await waitUntilMinDataLines(dataPath, 4, 120); - expect(await countDataJsonlLines(dataPath)).toBe(4); + await waitUntilPredicate(async () => { + const row = await resolveThreadRecord(storageRoot, threadId); + return row?.source === "history"; + }, 120); - const runningPath = join(dirname(dataPath), `${threadId}.running`); + const runningPath = join(storageRoot, "logs", added.value.hash, `${threadId}.running`); await waitUntilRunningFileAbsent(runningPath, 100); expect(await pathExists(runningPath)).toBe(false); }); @@ -397,8 +390,7 @@ describe("cli thread commands", () => { } const threadId = ran.value.threadId; - const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`); - const runningPath = join(dirname(dataPath), `${threadId}.running`); + const runningPath = join(storageRoot, "logs", added.value.hash, `${threadId}.running`); await waitUntilRunningFileAbsent(runningPath, 100); expect(await pathExists(runningPath)).toBe(false); diff --git a/packages/cli-workflow/src/commands/serve/routes-live.ts b/packages/cli-workflow/src/commands/serve/routes-live.ts index 23ae355..fa787b3 100644 --- a/packages/cli-workflow/src/commands/serve/routes-live.ts +++ b/packages/cli-workflow/src/commands/serve/routes-live.ts @@ -1,9 +1,18 @@ import { statSync, watch } from "node:fs"; -import { dirname, join } from "node:path"; +import { join } from "node:path"; +import { createCasStore, getContentMerklePayload } from "@uncaged/workflow-cas"; +import { + FORK_BRANCH_ROLE, + readThreadsIndex, + type ThreadIndex, + walkStateFramesNewestFirst, +} from "@uncaged/workflow-execute"; +import { END } from "@uncaged/workflow-runtime"; +import { getGlobalCasDir } from "@uncaged/workflow-util"; import { Hono } from "hono"; import { streamSSE } from "hono/streaming"; -import { resolveThreadDataPath } from "../../thread-scan.js"; +import { resolveThreadRecord } from "../../thread-scan.js"; type PumpState = { contentOffset: number; @@ -21,7 +30,6 @@ function fileSize(path: string): number { async function readNewBytes(path: string, state: PumpState): Promise { const size = fileSize(path); if (size < state.contentOffset) { - // File was truncated — reset state.contentOffset = 0; state.carry = ""; } @@ -42,15 +50,6 @@ function parseJsonLine(line: string): unknown { } } -function isWorkflowResult(record: unknown): boolean { - return ( - record !== null && - typeof record === "object" && - "type" in (record as Record) && - (record as Record).type === "workflow-result" - ); -} - function parseNewLines(chunk: string, state: PumpState): string[] { state.carry += chunk; @@ -67,52 +66,192 @@ function parseNewLines(chunk: string, state: PumpState): string[] { return lines; } +type CasSseState = { + printedHashes: Set; + lastHead: string | null; + completionEmitted: boolean; +}; + +type LiveSseStream = { + writeSSE: (opts: { event: string; data: string; id: string }) => Promise; +}; + +function completionFromEndMeta(meta: Record): { + returnCode: number; + summary: string; +} | null { + const returnCode = meta.returnCode; + const summary = meta.summary; + if (typeof returnCode !== "number" || typeof summary !== "string") { + return null; + } + return { returnCode, summary }; +} + +async function emitRecordsForHead(params: { + storageRoot: string; + bundleDir: string; + threadId: string; + headHash: string; + sseState: CasSseState; + stream: LiveSseStream; + eventId: { n: number }; +}): Promise { + const cas = createCasStore(getGlobalCasDir(params.storageRoot)); + const frames = await walkStateFramesNewestFirst(cas, params.headHash); + const chronological = [...frames].reverse(); + + for (const fr of chronological) { + if (params.sseState.printedHashes.has(fr.hash)) { + continue; + } + params.sseState.printedHashes.add(fr.hash); + + const role = fr.payload.role; + if (role === FORK_BRANCH_ROLE) { + continue; + } + + if (role === END) { + const wf = completionFromEndMeta(fr.payload.meta); + if (wf !== null) { + params.eventId.n++; + await params.stream.writeSSE({ + event: "record", + data: JSON.stringify({ type: "workflow-result", ...wf }), + id: String(params.eventId.n), + }); + return true; + } + continue; + } + + const payloadText = await getContentMerklePayload(cas, fr.payload.content); + const content = + payloadText !== null + ? payloadText + : `(content not in CAS; contentHash=${fr.payload.content})`; + + params.eventId.n++; + await params.stream.writeSSE({ + event: "record", + data: JSON.stringify({ + role: fr.payload.role, + contentHash: fr.payload.content, + content, + meta: fr.payload.meta, + timestamp: fr.payload.timestamp, + }), + id: String(params.eventId.n), + }); + } + + return false; +} + +async function pumpThreadsJsonSse(params: { + storageRoot: string; + bundleDir: string; + threadId: string; + sseState: CasSseState; + stream: LiveSseStream; + eventId: { n: number }; +}): Promise { + let idx: ThreadIndex; + try { + idx = await readThreadsIndex(params.bundleDir); + } catch { + idx = {}; + } + + const active = idx[params.threadId]; + + if (active === undefined) { + if (params.sseState.completionEmitted) { + return false; + } + const hist = await resolveThreadRecord(params.storageRoot, params.threadId); + if (hist === null || hist.source !== "history") { + return false; + } + params.sseState.completionEmitted = true; + return await emitRecordsForHead({ + storageRoot: params.storageRoot, + bundleDir: params.bundleDir, + threadId: params.threadId, + headHash: hist.head, + sseState: params.sseState, + stream: params.stream, + eventId: params.eventId, + }); + } + + const head = active.head; + if (params.sseState.lastHead === null) { + params.sseState.lastHead = head; + return await emitRecordsForHead({ + storageRoot: params.storageRoot, + bundleDir: params.bundleDir, + threadId: params.threadId, + headHash: head, + sseState: params.sseState, + stream: params.stream, + eventId: params.eventId, + }); + } + + if (head !== params.sseState.lastHead) { + params.sseState.lastHead = head; + return await emitRecordsForHead({ + storageRoot: params.storageRoot, + bundleDir: params.bundleDir, + threadId: params.threadId, + headHash: head, + sseState: params.sseState, + stream: params.stream, + eventId: params.eventId, + }); + } + + return false; +} + export function createLiveRoutes(storageRoot: string): Hono { const app = new Hono(); app.get("/:threadId/live", async (c) => { const threadId = c.req.param("threadId"); - const dataPath = await resolveThreadDataPath(storageRoot, threadId); - if (dataPath === null) { + const resolved = await resolveThreadRecord(storageRoot, threadId); + if (resolved === null) { return c.json({ error: `thread not found: ${threadId}` }, 404); } - const resolvedDataPath = dataPath; - const infoPath = join(dirname(resolvedDataPath), `${threadId}.info.jsonl`); + const threadTarget = resolved; + const threadsJsonPath = join(threadTarget.bundleDir, "threads.json"); + const infoPath = join(storageRoot, "logs", threadTarget.bundleHash, `${threadId}.info.jsonl`); return streamSSE(c, async (stream) => { - const dataState: PumpState = { contentOffset: 0, carry: "" }; const infoState: PumpState = { contentOffset: 0, carry: "" }; - let eventId = 0; + const sseThreadState: CasSseState = { + printedHashes: new Set(), + lastHead: null, + completionEmitted: false, + }; + const eventId = { n: 0 }; async function pumpData(): Promise { - let chunk: string | null; - try { - chunk = await readNewBytes(resolvedDataPath, dataState); - } catch { - return false; - } - if (chunk === null) { - return false; - } - - const lines = parseNewLines(chunk, dataState); - for (const line of lines) { - const record = parseJsonLine(line); - eventId++; - await stream.writeSSE({ - event: "record", - data: JSON.stringify(record), - id: String(eventId), - }); - - if (isWorkflowResult(record)) { - return true; - } - } - return false; + const finished = await pumpThreadsJsonSse({ + storageRoot, + bundleDir: threadTarget.bundleDir, + threadId, + sseState: sseThreadState, + stream, + eventId, + }); + return finished; } + // biome-ignore lint/complexity/noExcessiveCognitiveComplexity: SSE newline framing mirrors legacy pump async function pumpInfo(): Promise { let chunk: string | null; try { @@ -134,28 +273,46 @@ export function createLiveRoutes(storageRoot: string): Hono { ) { continue; } - eventId++; + eventId.n++; await stream.writeSSE({ event: "info", data: JSON.stringify(record), - id: String(eventId), + id: String(eventId.n), }); } } - // Initial pump + eventId.n++; + await stream.writeSSE({ + event: "record", + data: JSON.stringify({ + type: "thread-start", + threadId: threadTarget.threadId, + bundleHash: threadTarget.bundleHash, + head: threadTarget.head, + start: threadTarget.start, + source: threadTarget.source, + }), + id: String(eventId.n), + }); + const done = await pumpData(); - await pumpInfo(); + try { + await pumpInfo(); + } catch { + // optional info file + } if (done) { return; } - // Watch for changes const controller = new AbortController(); let completed = false; - const dataWatcher = watch(resolvedDataPath, async () => { - if (completed) return; + const dataWatcher = watch(threadsJsonPath, async () => { + if (completed) { + return; + } const finished = await pumpData(); if (finished) { completed = true; @@ -166,7 +323,9 @@ export function createLiveRoutes(storageRoot: string): Hono { let infoWatcher: ReturnType | null = null; try { infoWatcher = watch(infoPath, async () => { - if (completed) return; + if (completed) { + return; + } await pumpInfo(); }); } catch { @@ -179,7 +338,6 @@ export function createLiveRoutes(storageRoot: string): Hono { infoWatcher?.close(); }); - // Keep stream alive until completion or client disconnect await new Promise((resolve) => { if (completed) { resolve(); diff --git a/packages/cli-workflow/src/commands/serve/routes-thread.ts b/packages/cli-workflow/src/commands/serve/routes-thread.ts index 32a9113..1ca76cc 100644 --- a/packages/cli-workflow/src/commands/serve/routes-thread.ts +++ b/packages/cli-workflow/src/commands/serve/routes-thread.ts @@ -1,10 +1,13 @@ +import { createCasStore } from "@uncaged/workflow-cas"; +import { FORK_BRANCH_ROLE, walkStateFramesNewestFirst } from "@uncaged/workflow-execute"; +import { END } from "@uncaged/workflow-runtime"; +import { getGlobalCasDir } from "@uncaged/workflow-util"; import { Hono } from "hono"; -import { readTextFileIfExists } from "../../fs-utils.js"; import { listHistoricalThreads, listRunningThreads, - resolveThreadDataPath, + resolveThreadRecord, } from "../../thread-scan.js"; import { cmdKill, cmdPause, cmdResume } from "../thread/control.js"; import { cmdRun } from "../thread/run.js"; @@ -25,22 +28,46 @@ export function createThreadRoutes(storageRoot: string): Hono { app.get("/:threadId", async (c) => { const threadId = c.req.param("threadId"); - const dataPath = await resolveThreadDataPath(storageRoot, threadId); - if (dataPath === null) { + const resolved = await resolveThreadRecord(storageRoot, threadId); + if (resolved === null) { return c.json({ error: `thread not found: ${threadId}` }, 404); } - const text = await readTextFileIfExists(dataPath); - if (text === null) { - return c.json({ error: `thread data missing: ${threadId}` }, 404); - } - const lines = text.trim().split("\n"); - const records = lines.map((line) => { - try { - return JSON.parse(line) as unknown; - } catch { - return { raw: line }; + + const cas = createCasStore(getGlobalCasDir(storageRoot)); + const frames = await walkStateFramesNewestFirst(cas, resolved.head); + const chronological = [...frames].reverse(); + + const records: unknown[] = [ + { + type: "thread-start", + threadId: resolved.threadId, + bundleHash: resolved.bundleHash, + head: resolved.head, + start: resolved.start, + source: resolved.source, + }, + ]; + + for (const fr of chronological) { + if (fr.payload.role === FORK_BRANCH_ROLE) { + continue; } - }); + if (fr.payload.role === END) { + const returnCode = fr.payload.meta.returnCode; + const summary = fr.payload.meta.summary; + if (typeof returnCode === "number" && typeof summary === "string") { + records.push({ type: "workflow-result", returnCode, summary }); + } + continue; + } + records.push({ + role: fr.payload.role, + contentHash: fr.payload.content, + meta: fr.payload.meta, + timestamp: fr.payload.timestamp, + }); + } + return c.json({ threadId, records }); }); diff --git a/packages/cli-workflow/src/commands/thread/fork.ts b/packages/cli-workflow/src/commands/thread/fork.ts index fc50ebd..de73ccb 100644 --- a/packages/cli-workflow/src/commands/thread/fork.ts +++ b/packages/cli-workflow/src/commands/thread/fork.ts @@ -1,10 +1,11 @@ import { join } from "node:path"; -import { buildForkPlan } from "@uncaged/workflow-execute"; +import { createCasStore } from "@uncaged/workflow-cas"; +import { prepareCasFork } from "@uncaged/workflow-execute"; import { err, ok, type Result } from "@uncaged/workflow-protocol"; -import { generateUlid } from "@uncaged/workflow-util"; +import { generateUlid, getGlobalCasDir } from "@uncaged/workflow-util"; -import { pathExists, readTextFileIfExists } from "../../fs-utils.js"; -import { resolveThreadDataPath } from "../../thread-scan.js"; +import { pathExists } from "../../fs-utils.js"; +import { resolveThreadRecord } from "../../thread-scan.js"; import { ensureWorkerForHash, sendWorkerTcpCommand } from "../../worker-spawn.js"; export async function cmdFork( @@ -12,49 +13,51 @@ export async function cmdFork( threadId: string, fromRole: string | null, ): Promise> { - const dataPath = await resolveThreadDataPath(storageRoot, threadId); - if (dataPath === null) { + const resolved = await resolveThreadRecord(storageRoot, threadId); + if (resolved === null) { return err(`thread not found: ${threadId}`); } - const text = await readTextFileIfExists(dataPath); - if (text === null) { - return err(`thread data missing: ${threadId}`); + + const bundlePath = join(storageRoot, "bundles", `${resolved.bundleHash}.esm.js`); + if (!(await pathExists(bundlePath))) { + return err(`bundle file missing for thread hash ${resolved.bundleHash}`); } - const plan = buildForkPlan(text, fromRole); + const cas = createCasStore(getGlobalCasDir(storageRoot)); + const newThreadId = generateUlid(Date.now()); + + const plan = await prepareCasFork({ + cas, + bundleDir: resolved.bundleDir, + bundleHash: resolved.bundleHash, + sourceThreadId: threadId, + headHash: resolved.head, + startHash: resolved.start, + newThreadId, + fromRole, + }); if (!plan.ok) { return plan; } - const bundlePath = join(storageRoot, "bundles", `${plan.value.hash}.esm.js`); - if (!(await pathExists(bundlePath))) { - return err(`bundle file missing for thread hash ${plan.value.hash}`); - } - const worker = await ensureWorkerForHash(storageRoot, plan.value.hash, bundlePath); if (!worker.ok) { return worker; } - const newThreadId = generateUlid(Date.now()); - const stepsOnWire = plan.value.historicalSteps.map((s) => ({ - role: s.role, - contentHash: s.contentHash, - meta: s.meta, - refs: s.refs, - timestamp: s.timestamp, - })); - + const p = plan.value; const sent = await sendWorkerTcpCommand( worker.value.port, { type: "run", threadId: newThreadId, - workflowName: plan.value.workflowName, - prompt: plan.value.prompt, - options: plan.value.runOptions, - steps: stepsOnWire, - forkSourceThreadId: plan.value.sourceThreadId, + workflowName: p.workflowName, + prompt: p.prompt, + options: p.runOptions, + steps: p.steps, + stepTimestamps: p.stepTimestamps.length > 0 ? p.stepTimestamps : null, + forkSourceThreadId: threadId, + forkContinuation: p.forkContinuation, }, { awaitResponseLine: false }, ); diff --git a/packages/cli-workflow/src/commands/thread/live.ts b/packages/cli-workflow/src/commands/thread/live.ts index 34e61b8..b5aab62 100644 --- a/packages/cli-workflow/src/commands/thread/live.ts +++ b/packages/cli-workflow/src/commands/thread/live.ts @@ -1,16 +1,26 @@ import { watch } from "node:fs"; -import { readFile } from "node:fs/promises"; +import { mkdir, readFile } from "node:fs/promises"; import { dirname, join } from "node:path"; import { createCasStore, getContentMerklePayload } from "@uncaged/workflow-cas"; -import { tryParseRoleStepRecord, tryParseWorkflowResultRecord } from "@uncaged/workflow-execute"; +import { + FORK_BRANCH_ROLE, + readThreadsIndex, + type ThreadIndex, + walkStateFramesNewestFirst, +} from "@uncaged/workflow-execute"; import type { CasStore, WorkflowCompletion } from "@uncaged/workflow-protocol"; +import { END } from "@uncaged/workflow-runtime"; import { getGlobalCasDir } from "@uncaged/workflow-util"; import { dimGreyLine, highlightLiveRole } from "../../cli-color.js"; import { printCliError, printCliLine } from "../../cli-output.js"; import { pathExists } from "../../fs-utils.js"; import type { ParsedLiveArgv } from "../../live-argv.js"; -import { findLatestThreadDataPath, resolveThreadDataPath } from "../../thread-scan.js"; +import { + findLatestThreadBundleTarget, + type LatestThreadTarget, + resolveThreadRecord, +} from "../../thread-scan.js"; import type { LiveRoleRow } from "./types.js"; export const LIVE_CONTENT_MAX_LINES = 10; @@ -48,16 +58,15 @@ function printSummary(result: WorkflowCompletion): void { printCliLine(`completed: returnCode=${result.returnCode} — ${result.summary}`); } -type LiveSessionState = { - sawStart: boolean; - completed: boolean; +type InfoLiveState = { carry: string; contentOffset: number; }; -type InfoLiveState = { - carry: string; - contentOffset: number; +type CasLiveState = { + printedHashes: Set; + lastHead: string | null; + completionEmitted: boolean; }; function tryParseInfoRecord(obj: Record): { @@ -79,102 +88,140 @@ function tryParseInfoRecord(obj: Record): { return { tag, content, timestamp }; } -async function handleJsonlLine( - rawLine: string, - state: LiveSessionState, - roleFilter: string | null, - cas: CasStore, -): Promise<{ parseError: string | null; workflowResult: WorkflowCompletion | null }> { - const trimmed = rawLine.trim(); - if (trimmed === "") { - return { parseError: null, workflowResult: null }; +function completionFromEndMeta(meta: Record): WorkflowCompletion | null { + const returnCode = meta.returnCode; + const summary = meta.summary; + if (typeof returnCode !== "number" || typeof summary !== "string") { + return null; } + return { returnCode, summary }; +} - let rec: unknown; - try { - rec = JSON.parse(trimmed) as unknown; - } catch { - return { parseError: "invalid JSON in thread data file", workflowResult: null }; +async function emitRoleStepPrint(params: { + cas: CasStore; + role: string; + contentHash: string; + meta: Record; + timestamp: number; + roleFilter: string | null; +}): Promise { + if (params.roleFilter !== null && params.role !== params.roleFilter) { + return; } - if (rec === null || typeof rec !== "object") { - return { parseError: "invalid record in thread data file", workflowResult: null }; - } - const obj = rec as Record; - - if (!state.sawStart) { - state.sawStart = true; - return { parseError: null, workflowResult: null }; - } - - const wf = tryParseWorkflowResultRecord(obj); - if (wf !== null) { - state.completed = true; - return { parseError: null, workflowResult: wf }; - } - - const roleRow = tryParseRoleStepRecord(obj); - if (roleRow === null) { - return { - parseError: "unrecognized record in thread data (expected role step or result)", - workflowResult: null, - }; - } - - if (roleFilter !== null && roleRow.role !== roleFilter) { - return { parseError: null, workflowResult: null }; - } - - const payload = await getContentMerklePayload(cas, roleRow.contentHash); + const payload = await getContentMerklePayload(params.cas, params.contentHash); const content = - payload !== null ? payload : `(content not in CAS; contentHash=${roleRow.contentHash})`; + payload !== null ? payload : `(content not in CAS; contentHash=${params.contentHash})`; const row: LiveRoleRow = { - role: roleRow.role, + role: params.role, content, - meta: roleRow.meta, - timestamp: roleRow.timestamp, + meta: params.meta, + timestamp: params.timestamp, }; for (const outLine of renderLiveRoleStepLines(row, highlightLiveRole(row.role))) { printCliLine(outLine); } - return { parseError: null, workflowResult: null }; } -async function pumpNewContent( - dataPath: string, - state: LiveSessionState, - roleFilter: string | null, - cas: CasStore, -): Promise { - let text: string; +async function emitStatesReachableFromHead(params: { + cas: CasStore; + headHash: string; + state: CasLiveState; + roleFilter: string | null; +}): Promise { + const frames = await walkStateFramesNewestFirst(params.cas, params.headHash); + const chronological = [...frames].reverse(); + + for (const fr of chronological) { + if (params.state.printedHashes.has(fr.hash)) { + continue; + } + params.state.printedHashes.add(fr.hash); + + const role = fr.payload.role; + if (role === FORK_BRANCH_ROLE) { + continue; + } + + if (role === END) { + const wf = completionFromEndMeta(fr.payload.meta); + if (wf !== null) { + printSummary(wf); + return wf; + } + continue; + } + + await emitRoleStepPrint({ + cas: params.cas, + role, + contentHash: fr.payload.content, + meta: fr.payload.meta, + timestamp: fr.payload.timestamp, + roleFilter: params.roleFilter, + }); + } + + return null; +} + +async function pumpThreadsJson(params: { + storageRoot: string; + bundleDir: string; + bundleHash: string; + threadId: string; + state: CasLiveState; + roleFilter: string | null; + cas: CasStore; +}): Promise { + let idx: ThreadIndex; try { - text = await readFile(dataPath, "utf8"); + idx = await readThreadsIndex(params.bundleDir); } catch { - return null; + idx = {}; } - if (text.length < state.contentOffset) { - state.contentOffset = 0; - state.carry = ""; + const active = idx[params.threadId]; + + if (active === undefined) { + if (params.state.completionEmitted) { + return null; + } + const hist = await resolveThreadRecord(params.storageRoot, params.threadId); + if (hist === null || hist.source !== "history") { + return null; + } + params.state.completionEmitted = true; + const wf = await emitStatesReachableFromHead({ + cas: params.cas, + headHash: hist.head, + state: params.state, + roleFilter: params.roleFilter, + }); + return wf !== null ? 0 : null; } - const chunk = text.slice(state.contentOffset); - state.contentOffset = text.length; - state.carry += chunk; + const head = active.head; + if (params.state.lastHead === null) { + params.state.lastHead = head; + const wf = await emitStatesReachableFromHead({ + cas: params.cas, + headHash: head, + state: params.state, + roleFilter: params.roleFilter, + }); + return wf !== null ? 0 : null; + } - const parts = state.carry.split("\n"); - state.carry = parts.pop() ?? ""; - - for (const line of parts) { - const { parseError, workflowResult } = await handleJsonlLine(line, state, roleFilter, cas); - if (parseError !== null) { - printCliError(parseError); - return 1; - } - if (workflowResult !== null) { - printSummary(workflowResult); - return 0; - } + if (head !== params.state.lastHead) { + params.state.lastHead = head; + const wf = await emitStatesReachableFromHead({ + cas: params.cas, + headHash: head, + state: params.state, + roleFilter: params.roleFilter, + }); + return wf !== null ? 0 : null; } return null; @@ -291,9 +338,9 @@ function watchLivePaths(params: { tasks: WatchPumpTask[]; signal: AbortSignal }) schedulePump(path, pump); }); watchers.push(watcher); - watcher.on("error", (err: Error) => { + watcher.on("error", (errObj: Error) => { closeAll(); - reject(err); + reject(errObj); }); } @@ -309,17 +356,14 @@ function watchLivePaths(params: { tasks: WatchPumpTask[]; signal: AbortSignal }) }); } -type LiveThreadTarget = { - threadId: string; - dataPath: string; -}; +type LiveThreadTarget = LatestThreadTarget; async function resolveLiveThreadTarget( storageRoot: string, parsed: ParsedLiveArgv, ): Promise { if (parsed.latest) { - const found = await findLatestThreadDataPath(storageRoot); + const found = await findLatestThreadBundleTarget(storageRoot); if (found === null) { printCliError("live: no threads found"); return null; @@ -332,36 +376,56 @@ async function resolveLiveThreadTarget( printCliError("live: internal error: missing thread id"); return null; } - const resolved = await resolveThreadDataPath(storageRoot, id); + const resolved = await resolveThreadRecord(storageRoot, id); if (resolved === null) { printCliError(`thread not found: ${id}`); return null; } - return { threadId: id, dataPath: resolved }; + return { + threadId: id, + bundleHash: resolved.bundleHash, + bundleDir: resolved.bundleDir, + threadsJsonPath: join(resolved.bundleDir, "threads.json"), + }; } async function buildLiveWatchTasks(params: { - dataPath: string; - infoPath: string; + storageRoot: string; + target: LiveThreadTarget; debug: boolean; - dataState: LiveSessionState; + dataState: CasLiveState; infoState: InfoLiveState; roleFilter: string | null; cas: CasStore; }): Promise { - const { dataPath, infoPath, debug, dataState, infoState, roleFilter, cas } = params; + const infoPath = join( + params.storageRoot, + "logs", + params.target.bundleHash, + `${params.target.threadId}.info.jsonl`, + ); + const tasks: WatchPumpTask[] = [ { - path: dataPath, - pump: () => pumpNewContent(dataPath, dataState, roleFilter, cas), + path: params.target.threadsJsonPath, + pump: () => + pumpThreadsJson({ + storageRoot: params.storageRoot, + bundleDir: params.target.bundleDir, + bundleHash: params.target.bundleHash, + threadId: params.target.threadId, + state: params.dataState, + roleFilter: params.roleFilter, + cas: params.cas, + }), }, ]; - if (debug && (await pathExists(infoPath))) { + if (params.debug && (await pathExists(infoPath))) { tasks.push({ path: infoPath, pump: async () => { - await pumpNewInfoContent(infoPath, infoState); + await pumpNewInfoContent(infoPath, params.infoState); return null; }, }); @@ -376,16 +440,13 @@ export async function cmdLive(storageRoot: string, parsed: ParsedLiveArgv): Prom return 1; } - const { threadId, dataPath } = target; const roleFilter = parsed.role; - const infoPath = join(dirname(dataPath), `${threadId}.info.jsonl`); const cas = createCasStore(getGlobalCasDir(storageRoot)); - const dataState: LiveSessionState = { - sawStart: false, - completed: false, - carry: "", - contentOffset: 0, + const dataState: CasLiveState = { + printedHashes: new Set(), + lastHead: null, + completionEmitted: false, }; const infoState: InfoLiveState = { @@ -400,22 +461,29 @@ export async function cmdLive(storageRoot: string, parsed: ParsedLiveArgv): Prom process.on("SIGINT", onSigInt); try { - const firstData = await pumpNewContent(dataPath, dataState, roleFilter, cas); - if (firstData === 1) { - return 1; - } + await mkdir(dirname(target.threadsJsonPath), { recursive: true }); + const firstData = await pumpThreadsJson({ + storageRoot, + bundleDir: target.bundleDir, + bundleHash: target.bundleHash, + threadId: target.threadId, + state: dataState, + roleFilter, + cas, + }); + const infoPath = join(storageRoot, "logs", target.bundleHash, `${target.threadId}.info.jsonl`); if (parsed.debug && (await pathExists(infoPath))) { await pumpNewInfoContent(infoPath, infoState); } - if (firstData === 0 || dataState.completed) { + if (firstData === 0) { return 0; } const tasks = await buildLiveWatchTasks({ - dataPath, - infoPath, + storageRoot, + target, debug: parsed.debug, dataState, infoState, diff --git a/packages/cli-workflow/src/commands/thread/rm.ts b/packages/cli-workflow/src/commands/thread/rm.ts index 331b4ce..814e346 100644 --- a/packages/cli-workflow/src/commands/thread/rm.ts +++ b/packages/cli-workflow/src/commands/thread/rm.ts @@ -1,24 +1,35 @@ import { unlink } from "node:fs/promises"; -import { dirname, join } from "node:path"; -import { garbageCollectCas } from "@uncaged/workflow-execute"; +import { join } from "node:path"; +import { + garbageCollectCas, + removeThreadEntry, + removeThreadHistoryEntries, +} from "@uncaged/workflow-execute"; import { err, ok, type Result } from "@uncaged/workflow-protocol"; -import { resolveThreadDataPath } from "../../thread-scan.js"; +import { resolveThreadRecord } from "../../thread-scan.js"; export async function cmdThreadRemove( storageRoot: string, threadId: string, ): Promise> { - const dataPath = await resolveThreadDataPath(storageRoot, threadId); - if (dataPath === null) { + const resolved = await resolveThreadRecord(storageRoot, threadId); + if (resolved === null) { return err(`thread not found: ${threadId}`); } - const dir = dirname(dataPath); - const infoPath = join(dir, `${threadId}.info.jsonl`); - const runningPath = join(dir, `${threadId}.running`); + if (resolved.source === "active") { + await removeThreadEntry(resolved.bundleDir, threadId); + } else { + const hist = await removeThreadHistoryEntries(resolved.bundleDir, threadId); + if (!hist.ok) { + return hist; + } + } + + const infoPath = join(storageRoot, "logs", resolved.bundleHash, `${threadId}.info.jsonl`); + const runningPath = join(storageRoot, "logs", resolved.bundleHash, `${threadId}.running`); - await unlink(dataPath); await unlink(infoPath).catch(() => {}); await unlink(runningPath).catch(() => {}); diff --git a/packages/cli-workflow/src/commands/thread/show.ts b/packages/cli-workflow/src/commands/thread/show.ts index 3669674..0514bf4 100644 --- a/packages/cli-workflow/src/commands/thread/show.ts +++ b/packages/cli-workflow/src/commands/thread/show.ts @@ -1,19 +1,44 @@ +import { createCasStore } from "@uncaged/workflow-cas"; +import { FORK_BRANCH_ROLE, walkStateFramesNewestFirst } from "@uncaged/workflow-execute"; import { err, ok, type Result } from "@uncaged/workflow-protocol"; +import { END } from "@uncaged/workflow-runtime"; +import { getGlobalCasDir } from "@uncaged/workflow-util"; -import { readTextFileIfExists } from "../../fs-utils.js"; -import { resolveThreadDataPath } from "../../thread-scan.js"; +import { resolveThreadRecord } from "../../thread-scan.js"; export async function cmdThreadShow( storageRoot: string, threadId: string, ): Promise> { - const dataPath = await resolveThreadDataPath(storageRoot, threadId); - if (dataPath === null) { + const resolved = await resolveThreadRecord(storageRoot, threadId); + if (resolved === null) { return err(`thread not found: ${threadId}`); } - const text = await readTextFileIfExists(dataPath); - if (text === null) { - return err(`thread data missing: ${threadId}`); + + const cas = createCasStore(getGlobalCasDir(storageRoot)); + const frames = await walkStateFramesNewestFirst(cas, resolved.head); + const chronological = [...frames].reverse(); + + const steps: Array<{ role: string; hash: string; timestamp: number }> = []; + for (const fr of chronological) { + if (fr.payload.role === END || fr.payload.role === FORK_BRANCH_ROLE) { + continue; + } + steps.push({ + role: fr.payload.role, + hash: fr.hash, + timestamp: fr.payload.timestamp, + }); } - return ok(text.endsWith("\n") ? text.slice(0, -1) : text); + + const payload = { + threadId: resolved.threadId, + bundleHash: resolved.bundleHash, + head: resolved.head, + start: resolved.start, + source: resolved.source, + steps, + }; + + return ok(JSON.stringify(payload, null, 2)); } diff --git a/packages/cli-workflow/src/thread-scan.ts b/packages/cli-workflow/src/thread-scan.ts index 642ff24..ab1889f 100644 --- a/packages/cli-workflow/src/thread-scan.ts +++ b/packages/cli-workflow/src/thread-scan.ts @@ -1,23 +1,87 @@ import { readdir, stat } from "node:fs/promises"; import { join } from "node:path"; +import { createCasStore, parseCasThreadNode } from "@uncaged/workflow-cas"; +import { + readThreadsIndex, + type ThreadHistoryEntry, + type ThreadIndex, +} from "@uncaged/workflow-execute"; +import { getGlobalCasDir } from "@uncaged/workflow-util"; import { pathExists, readTextFileIfExists } from "./fs-utils.js"; -function parseFirstJsonLineObject(text: string): Record | null { - const firstLine = text.split("\n")[0]; - if (firstLine === undefined || firstLine.trim() === "") { +async function readWorkflowNameFromStartHash( + storageRoot: string, + startHash: string, +): Promise { + const cas = createCasStore(getGlobalCasDir(storageRoot)); + const yamlText = await cas.get(startHash); + if (yamlText === null) { return null; } - let parsed: unknown; - try { - parsed = JSON.parse(firstLine) as unknown; - } catch { + const parsed = parseCasThreadNode(yamlText); + if (parsed === null || parsed.kind !== "start") { return null; } - if (parsed === null || typeof parsed !== "object") { - return null; + return parsed.node.payload.name; +} + +async function listBundleHashDirs(storageRoot: string): Promise { + const bundlesRoot = join(storageRoot, "bundles"); + if (!(await pathExists(bundlesRoot))) { + return []; } - return parsed as Record; + const names = await readdir(bundlesRoot); + const out: string[] = []; + for (const name of names) { + const p = join(bundlesRoot, name); + try { + const st = await stat(p); + if (st.isDirectory()) { + out.push(name); + } + } catch {} + } + out.sort(); + return out; +} + +async function parseHistoryFile(path: string): Promise { + const text = await readTextFileIfExists(path); + if (text === null) { + return []; + } + const out: ThreadHistoryEntry[] = []; + for (const line of text.split("\n")) { + const trimmed = line.trim(); + if (trimmed === "") { + continue; + } + let raw: unknown; + try { + raw = JSON.parse(trimmed) as unknown; + } catch { + continue; + } + if (raw === null || typeof raw !== "object") { + continue; + } + const rec = raw as Record; + const threadId = rec.threadId; + const head = rec.head; + const start = rec.start; + const completedAt = rec.completedAt; + if ( + typeof threadId !== "string" || + typeof head !== "string" || + typeof start !== "string" || + typeof completedAt !== "number" + ) { + continue; + } + out.push({ threadId, head, start, completedAt }); + } + return out; } export type RunningThreadRow = { @@ -32,30 +96,76 @@ export type HistoricalThreadRow = { workflowName: string | null; }; -async function readThreadStartTimestampMs(dataPath: string): Promise { - const text = await readTextFileIfExists(dataPath); - if (text === null) { - return null; - } - const parsed = parseFirstJsonLineObject(text); - if (parsed === null) { - return null; - } - const ts = parsed.timestamp; - return typeof ts === "number" && Number.isFinite(ts) ? ts : null; -} +export type ResolvedThreadRecord = { + threadId: string; + bundleHash: string; + bundleDir: string; + head: string; + start: string; + source: "active" | "history"; +}; -async function readWorkflowNameFromDataJsonl(dataPath: string): Promise { - const text = await readTextFileIfExists(dataPath); - if (text === null) { - return null; +/** Resolve a thread via `threads.json` (active) or `history/*.jsonl` (completed). */ +// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: scans all bundle dirs for thread id +export async function resolveThreadRecord( + storageRoot: string, + threadId: string, +): Promise { + const hashes = await listBundleHashDirs(storageRoot); + for (const bundleHash of hashes) { + const bundleDir = join(storageRoot, "bundles", bundleHash); + let index: ThreadIndex; + try { + index = await readThreadsIndex(bundleDir); + } catch { + continue; + } + const active = index[threadId]; + if (active !== undefined) { + return { + threadId, + bundleHash, + bundleDir, + head: active.head, + start: active.start, + source: "active", + }; + } } - const parsed = parseFirstJsonLineObject(text); - if (parsed === null) { - return null; + + for (const bundleHash of hashes) { + const bundleDir = join(storageRoot, "bundles", bundleHash); + const histDir = join(bundleDir, "history"); + if (!(await pathExists(histDir))) { + continue; + } + let files: string[]; + try { + files = await readdir(histDir); + } catch { + continue; + } + for (const name of files) { + if (!name.endsWith(".jsonl")) { + continue; + } + const entries = await parseHistoryFile(join(histDir, name)); + for (const e of entries) { + if (e.threadId === threadId) { + return { + threadId, + bundleHash, + bundleDir, + head: e.head, + start: e.start, + source: "history", + }; + } + } + } } - const name = parsed.name; - return typeof name === "string" ? name : null; + + return null; } /** Threads currently executing — identified via `.running` markers. */ @@ -82,8 +192,9 @@ export async function listRunningThreads(storageRoot: string): Promise { - const logsRoot = join(storageRoot, "logs"); - if (!(await pathExists(logsRoot))) { - return []; - } - - const hashes = await readdir(logsRoot); + const hashes = await listBundleHashDirs(storageRoot); + const seen = new Set(); const out: HistoricalThreadRow[] = []; - for (const hash of hashes) { - const dir = join(logsRoot, hash); - let entries: string[]; + for (const bundleHash of hashes) { + const bundleDir = join(storageRoot, "bundles", bundleHash); + let index: ThreadIndex; try { - entries = await readdir(dir); + index = await readThreadsIndex(bundleDir); } catch { continue; } - - for (const fileName of entries) { - if (!fileName.endsWith(".data.jsonl")) { + for (const threadId of Object.keys(index)) { + const key = `${bundleHash}/${threadId}`; + if (seen.has(key)) { continue; } - const threadId = fileName.slice(0, -".data.jsonl".length); - const dataPath = join(dir, fileName); - const workflowName = await readWorkflowNameFromDataJsonl(dataPath); + seen.add(key); + const entry = index[threadId]; + if (entry === undefined) { + continue; + } + const workflowName = await readWorkflowNameFromStartHash(storageRoot, entry.start); if (workflowNameFilter !== null && workflowName !== workflowNameFilter) { continue; } - out.push({ threadId, hash, workflowName }); + out.push({ threadId, hash: bundleHash, workflowName }); + } + + const histDir = join(bundleDir, "history"); + if (!(await pathExists(histDir))) { + continue; + } + let files: string[]; + try { + files = await readdir(histDir); + } catch { + continue; + } + for (const name of files) { + if (!name.endsWith(".jsonl")) { + continue; + } + const entries = await parseHistoryFile(join(histDir, name)); + for (const e of entries) { + const key = `${bundleHash}/${e.threadId}`; + if (seen.has(key)) { + continue; + } + seen.add(key); + const workflowName = await readWorkflowNameFromStartHash(storageRoot, e.start); + if (workflowNameFilter !== null && workflowName !== workflowNameFilter) { + continue; + } + out.push({ threadId: e.threadId, hash: bundleHash, workflowName }); + } } } @@ -145,64 +285,93 @@ export async function listHistoricalThreads( return out; } +export type LatestThreadTarget = { + threadId: string; + bundleHash: string; + bundleDir: string; + threadsJsonPath: string; +}; + /** - * Picks the thread whose `.data.jsonl` is newest by start-record `timestamp`, - * falling back to file `mtime` when the timestamp is missing. - * Tie-breaker: larger `mtime` wins when start timestamps are equal. + * Picks the newest thread by StartNode timestamp approximation (`updatedAt` active, + * else `completedAt` history), falling back to lexical thread id order. */ -export async function findLatestThreadDataPath( +// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: compares active heads vs history tails +export async function findLatestThreadBundleTarget( storageRoot: string, -): Promise<{ threadId: string; dataPath: string } | null> { - const threads = await listHistoricalThreads(storageRoot, null); - if (threads.length === 0) { - return null; - } +): Promise { + const hashes = await listBundleHashDirs(storageRoot); let best: { threadId: string; - dataPath: string; - primary: number; - secondary: number; + bundleHash: string; + bundleDir: string; + ts: number; } | null = null; - for (const t of threads) { - const dataPath = join(storageRoot, "logs", t.hash, `${t.threadId}.data.jsonl`); - let mtimeMs = 0; + for (const bundleHash of hashes) { + const bundleDir = join(storageRoot, "bundles", bundleHash); + let index: ThreadIndex; try { - const st = await stat(dataPath); - mtimeMs = st.mtimeMs; + index = await readThreadsIndex(bundleDir); } catch { continue; } - const startTs = await readThreadStartTimestampMs(dataPath); - const primary = startTs !== null ? startTs : mtimeMs; - const secondary = mtimeMs; - if ( - best === null || - primary > best.primary || - (primary === best.primary && secondary > best.secondary) - ) { - best = { threadId: t.threadId, dataPath, primary, secondary }; + for (const threadId of Object.keys(index)) { + const ent = index[threadId]; + if (ent === undefined) { + continue; + } + const ts = ent.updatedAt; + const cand = { threadId, bundleHash, bundleDir, ts }; + if ( + best === null || + cand.ts > best.ts || + (cand.ts === best.ts && + `${cand.bundleHash}/${cand.threadId}` > `${best.bundleHash}/${best.threadId}`) + ) { + best = cand; + } + } + + const histDir = join(bundleDir, "history"); + if (!(await pathExists(histDir))) { + continue; + } + let files: string[]; + try { + files = await readdir(histDir); + } catch { + continue; + } + for (const name of files) { + if (!name.endsWith(".jsonl")) { + continue; + } + const entries = await parseHistoryFile(join(histDir, name)); + for (const e of entries) { + const ts = e.completedAt; + const cand = { threadId: e.threadId, bundleHash, bundleDir, ts }; + if ( + best === null || + cand.ts > best.ts || + (cand.ts === best.ts && + `${cand.bundleHash}/${cand.threadId}` > `${best.bundleHash}/${best.threadId}`) + ) { + best = cand; + } + } } } - return best === null ? null : { threadId: best.threadId, dataPath: best.dataPath }; -} - -export async function resolveThreadDataPath( - storageRoot: string, - threadId: string, -): Promise { - const logsRoot = join(storageRoot, "logs"); - if (!(await pathExists(logsRoot))) { + if (best === null) { return null; } - const hashes = await readdir(logsRoot); - for (const hash of hashes) { - const candidate = join(logsRoot, hash, `${threadId}.data.jsonl`); - if (await pathExists(candidate)) { - return candidate; - } - } - return null; + + return { + threadId: best.threadId, + bundleHash: best.bundleHash, + bundleDir: best.bundleDir, + threadsJsonPath: join(best.bundleDir, "threads.json"), + }; } diff --git a/packages/workflow-execute/__tests__/engine.test.ts b/packages/workflow-execute/__tests__/engine.test.ts index 220dec1..7d6cef9 100644 --- a/packages/workflow-execute/__tests__/engine.test.ts +++ b/packages/workflow-execute/__tests__/engine.test.ts @@ -40,6 +40,8 @@ function makeOptions(overrides: Partial): ExecuteThreadOpt awaitAfterEachYield: async () => {}, forkSourceThreadId: null, prefilledDiskSteps: null, + forkContinuation: null, + replayTimestamps: null, storageRoot: "/tmp/never", ...overrides, }; diff --git a/packages/workflow-execute/__tests__/gc-mark-sweep.test.ts b/packages/workflow-execute/__tests__/gc-mark-sweep.test.ts new file mode 100644 index 0000000..b9bbab6 --- /dev/null +++ b/packages/workflow-execute/__tests__/gc-mark-sweep.test.ts @@ -0,0 +1,112 @@ +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { + createCasStore, + putContentNodeWithRefs, + putStartNode, + putStateNode, +} from "@uncaged/workflow-cas"; +import type { StateNodePayload } from "@uncaged/workflow-protocol"; + +import { FORK_BRANCH_ROLE } from "../src/engine/fork-thread.js"; +import { garbageCollectCas } from "../src/engine/gc.js"; +import { getBundleDir, removeThreadEntry, upsertThreadEntry } from "../src/engine/threads-index.js"; + +describe("garbageCollectCas (mark-and-sweep)", () => { + let storageRoot: string; + let casDir: string; + + beforeEach(async () => { + storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-gc-ms-")); + casDir = join(storageRoot, "cas"); + await mkdir(casDir, { recursive: true }); + await writeFile( + join(storageRoot, "workflow.yaml"), + "config:\n maxDepth: 1\n supervisorInterval: 0\n providers: {}\n models: {}\nworkflows: {}\n", + "utf8", + ); + }); + + afterEach(async () => { + await rm(storageRoot, { recursive: true, force: true }); + }); + + test("shared CAS prefix survives when one fork thread index entry is removed", async () => { + const bundleHash = "TESTGC0000001"; + const bundleDir = getBundleDir(storageRoot, bundleHash); + await mkdir(bundleDir, { recursive: true }); + + const cas = createCasStore(casDir); + const promptHash = await cas.put("prompt"); + const startHash = await putStartNode( + cas, + { + name: "demo", + hash: bundleHash, + maxRounds: 5, + depth: 0, + }, + promptHash, + ); + + const c1 = await putContentNodeWithRefs(cas, "p1", []); + const h1 = await putStateNode(cas, { + role: "planner", + meta: {}, + start: startHash, + content: c1, + ancestors: [], + compact: null, + timestamp: 1, + } satisfies StateNodePayload); + + const c2 = await putContentNodeWithRefs(cas, "c1", []); + const h2 = await putStateNode(cas, { + role: "coder", + meta: {}, + start: startHash, + content: c2, + ancestors: [h1], + compact: null, + timestamp: 2, + } satisfies StateNodePayload); + + const ec = await putContentNodeWithRefs(cas, "", []); + const fm = await putStateNode(cas, { + role: FORK_BRANCH_ROLE, + meta: {}, + start: startHash, + content: ec, + ancestors: [h1], + compact: null, + timestamp: 3, + } satisfies StateNodePayload); + + await upsertThreadEntry(bundleDir, "THREAD_AAAAAAA", { + head: h2, + start: startHash, + updatedAt: 10, + }); + await upsertThreadEntry(bundleDir, "THREAD_BBBBBBB", { + head: fm, + start: startHash, + updatedAt: 20, + }); + + await removeThreadEntry(bundleDir, "THREAD_AAAAAAA"); + + const gc = await garbageCollectCas(storageRoot); + expect(gc.ok).toBe(true); + if (!gc.ok) { + return; + } + + expect(await cas.get(h2)).toBeNull(); + expect(await cas.get(h1)).not.toBeNull(); + expect(await cas.get(startHash)).not.toBeNull(); + expect(await cas.get(promptHash)).not.toBeNull(); + expect(await cas.get(fm)).not.toBeNull(); + }); +}); diff --git a/packages/workflow-execute/src/engine/engine.ts b/packages/workflow-execute/src/engine/engine.ts index b713314..27a40d4 100644 --- a/packages/workflow-execute/src/engine/engine.ts +++ b/packages/workflow-execute/src/engine/engine.ts @@ -33,20 +33,12 @@ import { removeThreadEntry, upsertThreadEntry, } from "./threads-index.js"; -import type { ExecuteThreadIo, ExecuteThreadOptions } from "./types.js"; +import type { ChainState, ExecuteThreadIo, ExecuteThreadOptions } from "./types.js"; +import { EMPTY_CHAIN_STATE } from "./types.js"; /** Cap for {@link StateNode}.payload.ancestors: 1 parent + 10 skip-list. */ const ANCESTORS_CAP = 11; -type ChainState = { - /** State hash of the most recently written {@link StateNode}, or `null` before the first step. */ - parentStateHash: string | null; - /** Ancestors recorded on the most recently written {@link StateNode}. */ - parentAncestors: readonly string[]; -}; - -const EMPTY_CHAIN: ChainState = { parentStateHash: null, parentAncestors: [] }; - function computeAncestors(chain: ChainState): string[] { if (chain.parentStateHash === null) { return []; @@ -408,36 +400,56 @@ export async function executeThread( await mkdir(dirname(io.infoJsonlPath), { recursive: true }); const prefilled = options.prefilledDiskSteps; + const fork = options.forkContinuation; + + if (fork !== null && prefilled !== null) { + throw new Error("forkContinuation and prefilledDiskSteps cannot both be set"); + } + if (prefilled !== null && prefilled.length !== input.steps.length) { throw new Error( `prefilledDiskSteps length (${prefilled.length}) must match input.steps length (${input.steps.length})`, ); } + const replayTs = options.replayTimestamps; + if (replayTs !== null && replayTs.length !== input.steps.length) { + throw new Error( + `replayTimestamps length (${replayTs.length}) must match input.steps length (${input.steps.length})`, + ); + } + const bundleDir = getBundleDir(options.storageRoot, io.hash); - const promptHash = await io.cas.put(input.prompt); - const startHash = await putStartNode( - io.cas, - { - name: workflowName, - hash: io.hash, - maxRounds: options.maxRounds, - depth: options.depth, - }, - promptHash, - ); + let startHash: string; - await publishHead({ - bundleDir, - threadId: io.threadId, - startHash, - headHash: startHash, - }); + if (fork !== null) { + startHash = fork.startHash; + logger("T9HQ2KHM", `thread ${io.threadId} continued fork for workflow ${workflowName}`); + } else { + const promptHash = await io.cas.put(input.prompt); + startHash = await putStartNode( + io.cas, + { + name: workflowName, + hash: io.hash, + maxRounds: options.maxRounds, + depth: options.depth, + }, + promptHash, + ); - logger("T9HQ2KHM", `thread ${io.threadId} started for workflow ${workflowName}`); + await publishHead({ + bundleDir, + threadId: io.threadId, + startHash, + headHash: startHash, + }); - let chain: ChainState = EMPTY_CHAIN; + logger("T9HQ2KHM", `thread ${io.threadId} started for workflow ${workflowName}`); + } + + let chain: ChainState = fork !== null ? fork.initialChain : EMPTY_CHAIN_STATE; if (prefilled !== null) { for (const row of prefilled) { @@ -497,7 +509,7 @@ export async function executeThread( contentHash: out.contentHash, meta: out.meta, refs: out.refs, - timestamp: prefilled?.[i]?.timestamp ?? nowMs + i, + timestamp: replayTs?.[i] ?? prefilled?.[i]?.timestamp ?? nowMs + i, })), }; diff --git a/packages/workflow-execute/src/engine/fork-thread.ts b/packages/workflow-execute/src/engine/fork-thread.ts index b564973..477734d 100644 --- a/packages/workflow-execute/src/engine/fork-thread.ts +++ b/packages/workflow-execute/src/engine/fork-thread.ts @@ -1,9 +1,29 @@ -import type { WorkflowCompletion } from "@uncaged/workflow-runtime"; -import { err, normalizeRefsField, ok, type Result } from "@uncaged/workflow-util"; +import type { CasStore } from "@uncaged/workflow-cas"; +import { parseCasThreadNode, putContentNodeWithRefs, putStateNode } from "@uncaged/workflow-cas"; +import type { StateNodePayload } from "@uncaged/workflow-protocol"; +import type { RoleOutput, WorkflowCompletion } from "@uncaged/workflow-runtime"; +import { END } from "@uncaged/workflow-runtime"; +import { err, ok, type Result } from "@uncaged/workflow-util"; +import { parse as parseYaml } from "yaml"; -import type { ForkHistoricalStep, ForkPlan, ParsedThreadStartRecord } from "./types.js"; +import { upsertThreadEntry } from "./threads-index.js"; +import type { CasForkPlan, ChainState, ForkContinuationOptions } from "./types.js"; +import { EMPTY_CHAIN_STATE } from "./types.js"; -/** Recognizes a persisted workflow completion line (no `role`; has numeric `returnCode` and string `summary`). Omits `rootHash` when absent. */ +/** Internal branch marker; skipped when presenting fork selection / replay slices. */ +export const FORK_BRANCH_ROLE = "__fork__"; + +/** Cap for {@link StateNodePayload}.ancestors: 1 parent + 10 skip-list. */ +const ANCESTORS_CAP = 11; + +function computeAncestors(chain: ChainState): string[] { + if (chain.parentStateHash === null) { + return []; + } + return [chain.parentStateHash, ...chain.parentAncestors].slice(0, ANCESTORS_CAP); +} + +/** Recognizes a persisted workflow completion line (no `role`; has numeric `returnCode` and string `summary`). */ export function tryParseWorkflowResultRecord( obj: Record, ): WorkflowCompletion | null { @@ -18,227 +38,288 @@ export function tryParseWorkflowResultRecord( return { returnCode, summary }; } -export function tryParseRoleStepRecord(obj: Record): ForkHistoricalStep | null { - const role = obj.role; - const contentHash = obj.contentHash; - const meta = obj.meta; - const timestamp = obj.timestamp; - if (typeof role !== "string") { - return null; - } - if (typeof contentHash !== "string") { - return null; - } - if (meta === null || typeof meta !== "object") { - return null; - } - if (typeof timestamp !== "number") { - return null; - } - return { - role, - contentHash, - meta: meta as Record, - refs: normalizeRefsField(obj.refs), - timestamp, - }; -} - -function parseRoleLine( - obj: Record, - lineIndex: number, -): Result { - const parsed = tryParseRoleStepRecord(obj); - if (parsed === null) { - return err(`invalid role record at line ${lineIndex}`); - } - return ok(parsed); -} - -function parseStartRecordLine(firstLine: string): Result { - let startParsed: unknown; - try { - startParsed = JSON.parse(firstLine) as unknown; - } catch { - return err("invalid JSON on line 1 (start record)"); - } - if (startParsed === null || typeof startParsed !== "object") { - return err("invalid start record shape"); - } - const startRec = startParsed as Record; - const name = startRec.name; - const hash = startRec.hash; - const threadId = startRec.threadId; - const parameters = startRec.parameters; - if (typeof name !== "string" || typeof hash !== "string" || typeof threadId !== "string") { - return err("start record missing name, hash, or threadId"); - } - if (parameters === null || typeof parameters !== "object") { - return err("start record missing parameters"); - } - const paramsRec = parameters as Record; - const prompt = paramsRec.prompt; - const options = paramsRec.options; - if (typeof prompt !== "string") { - return err("start record missing parameters.prompt"); - } - if (options === null || typeof options !== "object") { - return err("start record missing parameters.options"); - } - const optRec = options as Record; - const maxRounds = optRec.maxRounds; - if (typeof maxRounds !== "number") { - return err("start record missing parameters.options.maxRounds"); - } - - const depthRaw = optRec.depth; - const depth = - typeof depthRaw === "number" && Number.isFinite(depthRaw) ? Math.trunc(depthRaw) : 0; - - return ok({ - workflowName: name, - hash, - threadId, - prompt, - maxRounds, - depth, - }); -} - -function parseFollowingRoleLines(lines: string[]): Result { - const roleSteps: ForkHistoricalStep[] = []; - for (let i = 1; i < lines.length; i++) { - const line = lines[i]; - if (line === undefined) { +/** Walk {@link StateNode} hashes from head toward the first step (newest → oldest). */ +export async function walkStateFramesNewestFirst( + cas: CasStore, + headHash: string, +): Promise> { + const frames: Array<{ hash: string; payload: StateNodePayload }> = []; + let cur = headHash; + while (true) { + const yamlText = await cas.get(cur); + if (yamlText === null) { break; } - let rec: unknown; - try { - rec = JSON.parse(line) as unknown; - } catch { - return err(`invalid JSON at line ${i + 1}`); - } - if (rec === null || typeof rec !== "object") { - return err(`invalid record at line ${i + 1}`); - } - const recObj = rec as Record; - const wf = tryParseWorkflowResultRecord(recObj); - if (wf !== null) { - if (i !== lines.length - 1) { - return err("WorkflowResult record must be the final line in `.data.jsonl`"); - } + const parsed = parseCasThreadNode(yamlText); + if (parsed === null || parsed.kind !== "state") { break; } - const parsed = parseRoleLine(recObj, i + 1); - if (!parsed.ok) { - return parsed; + frames.push({ hash: cur, payload: parsed.node.payload }); + const ancestors = parsed.node.payload.ancestors; + if (ancestors.length === 0) { + break; } - roleSteps.push(parsed.value); + const parent = ancestors[0]; + if (parent === undefined || parent === "") { + break; + } + cur = parent; } - return ok(roleSteps); + return frames; } -/** - * Parse RFC-001 `.data.jsonl`: line 1 start record, line 2+ role outputs. - */ -export function parseThreadDataJsonl(text: string): Result< - { - start: ParsedThreadStartRecord; - roleSteps: ForkHistoricalStep[]; - }, - string -> { - const lines = text - .split("\n") - .map((l) => l.trim()) - .filter((l) => l !== ""); - if (lines.length === 0) { - return err("thread data is empty"); - } - - const firstLine = lines[0]; - if (firstLine === undefined) { - return err("thread data is empty"); - } - - const start = parseStartRecordLine(firstLine); - if (!start.ok) { - return start; - } - - const roleSteps = parseFollowingRoleLines(lines); - if (!roleSteps.ok) { - return roleSteps; - } - - return ok({ - start: start.value, - roleSteps: roleSteps.value, - }); -} - -function orderedUniqueRoles(roleSteps: ForkHistoricalStep[]): string[] { +function orderedUniqueRoles(roles: string[]): string[] { const seen = new Set(); const out: string[] = []; - for (const s of roleSteps) { - if (!seen.has(s.role)) { - seen.add(s.role); - out.push(s.role); + for (const r of roles) { + if (!seen.has(r)) { + seen.add(r); + out.push(r); } } return out; } -/** - * Select historical steps for a fork: - * - `fromRole === null`: drop the last step (retry the last role). - * - `fromRole !== null`: keep steps through the first occurrence of that role (inclusive). - */ -export function selectForkHistoricalSteps( - roleSteps: ForkHistoricalStep[], +async function readPromptText(cas: CasStore, promptHash: string): Promise> { + const yamlText = await cas.get(promptHash); + if (yamlText === null) { + return err(`prompt CAS blob missing: ${promptHash}`); + } + let raw: unknown; + try { + raw = parseYaml(yamlText) as unknown; + } catch { + return err(`prompt CAS blob is not valid YAML: ${promptHash}`); + } + if (raw === null || typeof raw !== "object") { + return err(`prompt CAS blob has unexpected shape: ${promptHash}`); + } + const payload = (raw as Record).payload; + if (typeof payload !== "string") { + return err(`prompt CAS blob missing string payload: ${promptHash}`); + } + return ok(payload); +} + +async function readStartWorkflowIdentity(params: { + cas: CasStore; + startHash: string; +}): Promise< + Result<{ workflowName: string; maxRounds: number; depth: number; prompt: string }, string> +> { + const yamlText = await params.cas.get(params.startHash); + if (yamlText === null) { + return err(`start node missing in CAS: ${params.startHash}`); + } + const parsed = parseCasThreadNode(yamlText); + if (parsed === null || parsed.kind !== "start") { + return err(`CAS blob is not a StartNode: ${params.startHash}`); + } + const refs = parsed.node.refs; + const promptHash = refs[0]; + if (typeof promptHash !== "string") { + return err("StartNode refs[0] must be the prompt hash"); + } + const prompt = await readPromptText(params.cas, promptHash); + if (!prompt.ok) { + return prompt; + } + const p = parsed.node.payload; + return ok({ + workflowName: p.name, + maxRounds: p.maxRounds, + depth: p.depth, + prompt: prompt.value, + }); +} + +async function payloadToRoleOutput(cas: CasStore, payload: StateNodePayload): Promise { + let refs: string[] = []; + const blob = await cas.get(payload.content); + if (blob !== null) { + const cn = parseCasThreadNode(blob); + if (cn?.kind === "content") { + refs = [...cn.node.refs]; + } + } + return { + role: payload.role, + contentHash: payload.content, + meta: payload.meta, + refs, + }; +} + +function meaningfulFramesOldestFirst( + newestFirst: Array<{ hash: string; payload: StateNodePayload }>, +): Array<{ hash: string; payload: StateNodePayload }> { + const chronological = [...newestFirst].reverse(); + return chronological.filter((f) => f.payload.role !== END && f.payload.role !== FORK_BRANCH_ROLE); +} + +function selectForkPointStateHash( + meaningfulOldestFirst: Array<{ hash: string; payload: StateNodePayload }>, fromRole: string | null, -): Result { - if (roleSteps.length === 0) { +): Result { + if (meaningfulOldestFirst.length === 0) { return err("thread has no completed role steps to fork from"); } if (fromRole === null) { - if (roleSteps.length === 1) { - return ok([]); + if (meaningfulOldestFirst.length === 1) { + return ok(null); } - return ok(roleSteps.slice(0, -1)); + const forkFrame = meaningfulOldestFirst[meaningfulOldestFirst.length - 2]; + if (forkFrame === undefined) { + return err("thread has no completed role steps to fork from"); + } + return ok(forkFrame.hash); } - const idx = roleSteps.findIndex((s) => s.role === fromRole); + const idx = meaningfulOldestFirst.findIndex((f) => f.payload.role === fromRole); if (idx < 0) { - const available = orderedUniqueRoles(roleSteps); + const available = orderedUniqueRoles(meaningfulOldestFirst.map((f) => f.payload.role)); return err(`role not found in thread: ${fromRole} (available: ${available.join(", ")})`); } - return ok(roleSteps.slice(0, idx + 1)); + const forkFrame = meaningfulOldestFirst[idx]; + if (forkFrame === undefined) { + return err("fork frame missing"); + } + return ok(forkFrame.hash); +} + +function replayFramesThroughForkPoint( + meaningfulOldestFirst: Array<{ hash: string; payload: StateNodePayload }>, + forkPointHash: string | null, +): Array<{ hash: string; payload: StateNodePayload }> { + if (forkPointHash === null) { + return []; + } + const idx = meaningfulOldestFirst.findIndex((f) => f.hash === forkPointHash); + if (idx < 0) { + return []; + } + return meaningfulOldestFirst.slice(0, idx + 1); +} + +async function buildForkContinuation(params: { + cas: CasStore; + sourceThreadId: string; + startHash: string; + forkPointStateHash: string | null; +}): Promise> { + const { cas, sourceThreadId, startHash, forkPointStateHash } = params; + + if (forkPointStateHash === null) { + return ok({ + startHash, + forkHeadHash: startHash, + initialChain: EMPTY_CHAIN_STATE, + }); + } + + const yamlText = await cas.get(forkPointStateHash); + if (yamlText === null) { + return err(`fork point state missing in CAS: ${forkPointStateHash}`); + } + const parsed = parseCasThreadNode(yamlText); + if (parsed === null || parsed.kind !== "state") { + return err(`fork point blob is not a StateNode: ${forkPointStateHash}`); + } + const fpPayload = parsed.node.payload; + + const chainBefore: ChainState = { + parentStateHash: forkPointStateHash, + parentAncestors: fpPayload.ancestors, + }; + const ancestorsMarker = computeAncestors(chainBefore); + + const emptyContentHash = await putContentNodeWithRefs(cas, "", []); + const markerPayload: StateNodePayload = { + role: FORK_BRANCH_ROLE, + meta: { forkFrom: sourceThreadId }, + start: startHash, + content: emptyContentHash, + ancestors: ancestorsMarker, + compact: null, + timestamp: Date.now(), + }; + const markerHash = await putStateNode(cas, markerPayload); + + const initialChain: ChainState = { + parentStateHash: markerHash, + parentAncestors: ancestorsMarker, + }; + + return ok({ + startHash, + forkHeadHash: markerHash, + initialChain, + }); } /** - * Read `.data.jsonl` text and compute fork payload for the worker `run` command. + * Prepare a CAS fork: writes the branch marker {@link StateNode}, registers `threads.json`, + * and returns worker payload fields (shared {@link StartNode}, zero ancestor duplication). */ -export function buildForkPlan( - dataJsonlText: string, - fromRole: string | null, -): Result { - const parsed = parseThreadDataJsonl(dataJsonlText); - if (!parsed.ok) { - return parsed; +export async function prepareCasFork(params: { + cas: CasStore; + bundleDir: string; + bundleHash: string; + sourceThreadId: string; + headHash: string; + startHash: string; + newThreadId: string; + fromRole: string | null; +}): Promise> { + const id = await readStartWorkflowIdentity({ + cas: params.cas, + startHash: params.startHash, + }); + if (!id.ok) { + return id; } - const selected = selectForkHistoricalSteps(parsed.value.roleSteps, fromRole); - if (!selected.ok) { - return selected; + + const newestFirst = await walkStateFramesNewestFirst(params.cas, params.headHash); + const meaningful = meaningfulFramesOldestFirst(newestFirst); + + const forkPoint = selectForkPointStateHash(meaningful, params.fromRole); + if (!forkPoint.ok) { + return forkPoint; } - const { start } = parsed.value; + + const replayFrames = replayFramesThroughForkPoint(meaningful, forkPoint.value); + const steps: RoleOutput[] = []; + const stepTimestamps: number[] = []; + for (const fr of replayFrames) { + steps.push(await payloadToRoleOutput(params.cas, fr.payload)); + stepTimestamps.push(fr.payload.timestamp); + } + + const cont = await buildForkContinuation({ + cas: params.cas, + sourceThreadId: params.sourceThreadId, + startHash: params.startHash, + forkPointStateHash: forkPoint.value, + }); + if (!cont.ok) { + return cont; + } + + await upsertThreadEntry(params.bundleDir, params.newThreadId, { + head: cont.value.forkHeadHash, + start: params.startHash, + updatedAt: Date.now(), + }); + return ok({ - workflowName: start.workflowName, - hash: start.hash, - sourceThreadId: start.threadId, - prompt: start.prompt, - runOptions: { maxRounds: start.maxRounds, depth: start.depth }, - historicalSteps: selected.value, + workflowName: id.value.workflowName, + hash: params.bundleHash, + sourceThreadId: params.sourceThreadId, + prompt: id.value.prompt, + runOptions: { maxRounds: id.value.maxRounds, depth: id.value.depth }, + steps, + stepTimestamps, + forkContinuation: cont.value, }); } diff --git a/packages/workflow-execute/src/engine/gc.ts b/packages/workflow-execute/src/engine/gc.ts index 3d1007d..df5a347 100644 --- a/packages/workflow-execute/src/engine/gc.ts +++ b/packages/workflow-execute/src/engine/gc.ts @@ -1,122 +1,182 @@ -import { readdir, readFile } from "node:fs/promises"; +import type { Stats } from "node:fs"; +import { readdir, readFile, stat } from "node:fs/promises"; import { join } from "node:path"; -import { type CasStore, createCasStore } from "@uncaged/workflow-cas"; +import { type CasStore, createCasStore, findReachableHashes } from "@uncaged/workflow-cas"; import { err, getGlobalCasDir, ok, type Result } from "@uncaged/workflow-util"; -import { parseThreadDataJsonl } from "./fork-thread.js"; + +import type { ThreadHistoryEntry, ThreadIndex } from "./threads-index.js"; +import { readThreadsIndex } from "./threads-index.js"; import type { GcResult } from "./types.js"; -async function listThreadDataJsonlPaths(storageRoot: string): Promise> { - const logsRoot = join(storageRoot, "logs"); - const paths: string[] = []; - let hashes: string[]; +function isPlainObject(v: unknown): v is Record { + return v !== null && typeof v === "object" && !Array.isArray(v); +} + +function parseHistoryLine(jsonLine: string): ThreadHistoryEntry | null { + let raw: unknown; try { - hashes = await readdir(logsRoot); + raw = JSON.parse(jsonLine) as unknown; + } catch { + return null; + } + if (!isPlainObject(raw)) { + return null; + } + const threadId = raw.threadId; + const head = raw.head; + const start = raw.start; + const completedAt = raw.completedAt; + if ( + typeof threadId !== "string" || + typeof head !== "string" || + typeof start !== "string" || + typeof completedAt !== "number" + ) { + return null; + } + return { threadId, head, start, completedAt }; +} + +// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: walks threads index + optional history dir +async function collectGcRootsFromBundle(bundleDir: string): Promise> { + const roots: string[] = []; + + let activeIndex: ThreadIndex; + try { + activeIndex = await readThreadsIndex(bundleDir); + } catch (e) { + return err(`failed to read threads.json under ${bundleDir}: ${String(e)}`); + } + + for (const entry of Object.values(activeIndex)) { + roots.push(entry.head); + roots.push(entry.start); + } + + const histDir = join(bundleDir, "history"); + let histFiles: string[]; + try { + histFiles = await readdir(histDir); + } catch (e) { + const errObj = e as NodeJS.ErrnoException; + if (errObj.code === "ENOENT") { + return ok(roots); + } + return err(`failed to read history directory ${histDir}: ${String(e)}`); + } + + for (const name of histFiles) { + if (!name.endsWith(".jsonl")) { + continue; + } + let text: string; + try { + text = await readFile(join(histDir, name), "utf8"); + } catch (e) { + return err(`failed to read history file ${name}: ${String(e)}`); + } + for (const line of text.split("\n")) { + const trimmed = line.trim(); + if (trimmed === "") { + continue; + } + const entry = parseHistoryLine(trimmed); + if (entry === null) { + continue; + } + roots.push(entry.head); + roots.push(entry.start); + } + } + + return ok(roots); +} + +async function collectAllGcRoots(storageRoot: string): Promise> { + const bundlesRoot = join(storageRoot, "bundles"); + let entries: string[]; + try { + entries = await readdir(bundlesRoot); } catch (e) { const errObj = e as NodeJS.ErrnoException; if (errObj.code === "ENOENT") { return ok([]); } - return err(`failed to read logs directory: ${String(e)}`); + return err(`failed to read bundles directory: ${String(e)}`); } - for (const hash of hashes) { - const dir = join(logsRoot, hash); - let entries: string[]; + const roots: string[] = []; + for (const name of entries) { + const bundleDir = join(bundlesRoot, name); + let st: Stats; try { - entries = await readdir(dir); + st = await stat(bundleDir); } catch { continue; } - for (const fileName of entries) { - if (fileName.endsWith(".data.jsonl")) { - paths.push(join(dir, fileName)); - } + if (!st.isDirectory()) { + continue; } + const chunk = await collectGcRootsFromBundle(bundleDir); + if (!chunk.ok) { + return chunk; + } + roots.push(...chunk.value); } - paths.sort(); - return ok(paths); + return ok(roots); } -async function collectActiveRefsFromDataPaths( - dataPaths: string[], -): Promise, string>> { - const activeRefs = new Set(); - for (const dataPath of dataPaths) { - let text: string; - try { - text = await readFile(dataPath, "utf8"); - } catch (e) { - return err(`failed to read ${dataPath}: ${String(e)}`); - } - const parsed = parseThreadDataJsonl(text); - if (!parsed.ok) { - return err(`${dataPath}: ${parsed.error}`); - } - for (const step of parsed.value.roleSteps) { - for (const ref of step.refs) { - activeRefs.add(ref); - } - } - } - return ok(activeRefs); -} - -async function deleteCasNotInSet( - cas: CasStore, - activeRefs: Set, -): Promise> { +async function deleteCasNotMarked(cas: CasStore, marked: ReadonlySet): Promise { let listed: string[]; try { listed = await cas.list(); } catch (e) { - return err(`failed to list cas entries: ${String(e)}`); + throw new Error(`failed to list cas entries: ${String(e)}`); } const deletedHashes: string[] = []; for (const hash of listed) { - if (activeRefs.has(hash)) { + if (marked.has(hash)) { continue; } try { await cas.delete(hash); } catch (e) { - return err(`failed to delete cas ${hash}: ${String(e)}`); + throw new Error(`failed to delete cas ${hash}: ${String(e)}`); } deletedHashes.push(hash); } deletedHashes.sort(); - return ok(deletedHashes); + return deletedHashes; } /** - * Mark-and-sweep CAS GC: collect `refs` from all thread `.data.jsonl` files under `storageRoot`, - * then delete CAS blobs not referenced by any surviving thread data. + * Mark-and-sweep CAS GC: roots are every `head` / `start` hash from `threads.json` and + * `history/*.jsonl` across bundle dirs; marks closure via `refs[]`; deletes unreachable blobs. */ export async function garbageCollectCas(storageRoot: string): Promise> { - const pathsResult = await listThreadDataJsonlPaths(storageRoot); - if (!pathsResult.ok) { - return pathsResult; + const rootsResult = await collectAllGcRoots(storageRoot); + if (!rootsResult.ok) { + return rootsResult; } - const paths = pathsResult.value; - - const refsResult = await collectActiveRefsFromDataPaths(paths); - if (!refsResult.ok) { - return refsResult; - } - const activeRefs = refsResult.value; + const roots = rootsResult.value; const cas = createCasStore(getGlobalCasDir(storageRoot)); - const deletedResult = await deleteCasNotInSet(cas, activeRefs); - if (!deletedResult.ok) { - return deletedResult; + + const marked = await findReachableHashes(roots, cas); + + let deletedHashes: string[]; + try { + deletedHashes = await deleteCasNotMarked(cas, marked); + } catch (e) { + return err(String(e)); } - const deletedHashes = deletedResult.value; return ok({ - scannedThreads: paths.length, - activeRefs: activeRefs.size, + scannedThreads: roots.length, + activeRefs: marked.size, deletedEntries: deletedHashes.length, deletedHashes, }); diff --git a/packages/workflow-execute/src/engine/index.ts b/packages/workflow-execute/src/engine/index.ts index 85bcaa4..4a64b4d 100644 --- a/packages/workflow-execute/src/engine/index.ts +++ b/packages/workflow-execute/src/engine/index.ts @@ -1,11 +1,10 @@ export { createWorkflow } from "./create-workflow.js"; export { executeThread } from "./engine.js"; export { - buildForkPlan, - parseThreadDataJsonl, - selectForkHistoricalSteps, - tryParseRoleStepRecord, + FORK_BRANCH_ROLE, + prepareCasFork, tryParseWorkflowResultRecord, + walkStateFramesNewestFirst, } from "./fork-thread.js"; export { garbageCollectCas } from "./gc.js"; export { createThreadPauseGate } from "./thread-pause-gate.js"; @@ -13,18 +12,22 @@ export type { ThreadHistoryEntry, ThreadIndex, ThreadIndexEntry } from "./thread export { appendThreadHistoryEntry, getBundleDir, + readThreadsIndex, removeThreadEntry, + removeThreadHistoryEntries, upsertThreadEntry, + writeThreadsIndex, } from "./threads-index.js"; export type { + CasForkPlan, + ChainState, ExecuteThreadIo, ExecuteThreadOptions, - ForkHistoricalStep, - ForkPlan, + ForkContinuationOptions, GcResult, - ParsedThreadStartRecord, PrefilledDiskStep, SupervisorDecision, ThreadPauseGate, } from "./types.js"; +export { EMPTY_CHAIN_STATE } from "./types.js"; export { getWorkerHostScriptPath } from "./worker-entry-path.js"; diff --git a/packages/workflow-execute/src/engine/threads-index.ts b/packages/workflow-execute/src/engine/threads-index.ts index 89d698a..2f4f7cf 100644 --- a/packages/workflow-execute/src/engine/threads-index.ts +++ b/packages/workflow-execute/src/engine/threads-index.ts @@ -1,6 +1,8 @@ -import { appendFile, mkdir, readFile, rename, writeFile } from "node:fs/promises"; +import { appendFile, mkdir, readdir, readFile, rename, writeFile } from "node:fs/promises"; import { dirname, join } from "node:path"; +import { err, ok, type Result } from "@uncaged/workflow-util"; + /** * Active-thread index entry stored in `/threads.json`. * @@ -71,7 +73,8 @@ function parseThreadIndex(text: string): ThreadIndex { return out; } -async function readThreadIndex(bundleDir: string): Promise { +/** Read `/threads.json` (empty object when missing or invalid). */ +export async function readThreadsIndex(bundleDir: string): Promise { const path = threadsJsonPath(bundleDir); let text: string; try { @@ -86,7 +89,7 @@ async function readThreadIndex(bundleDir: string): Promise { return parseThreadIndex(text); } -async function writeThreadIndex(bundleDir: string, index: ThreadIndex): Promise { +export async function writeThreadsIndex(bundleDir: string, index: ThreadIndex): Promise { const path = threadsJsonPath(bundleDir); await mkdir(dirname(path), { recursive: true }); const tmp = `${path}.tmp.${process.pid}.${Date.now()}`; @@ -101,19 +104,19 @@ export async function upsertThreadEntry( threadId: string, entry: ThreadIndexEntry, ): Promise { - const index = await readThreadIndex(bundleDir); + const index = await readThreadsIndex(bundleDir); index[threadId] = entry; - await writeThreadIndex(bundleDir, index); + await writeThreadsIndex(bundleDir, index); } /** Remove a thread entry from `threads.json` (no-op when absent). */ export async function removeThreadEntry(bundleDir: string, threadId: string): Promise { - const index = await readThreadIndex(bundleDir); + const index = await readThreadsIndex(bundleDir); if (!(threadId in index)) { return; } delete index[threadId]; - await writeThreadIndex(bundleDir, index); + await writeThreadsIndex(bundleDir, index); } function dateKey(epochMs: number): string { @@ -134,3 +137,63 @@ export async function appendThreadHistoryEntry( const line = `${JSON.stringify(entry)}\n`; await appendFile(path, line, "utf8"); } + +/** Removes every `history/*.jsonl` line whose `threadId` matches (rewrite files in place). */ +// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: per-file JSONL filtering keeps RM deterministic +export async function removeThreadHistoryEntries( + bundleDir: string, + threadId: string, +): Promise> { + const histRoot = join(bundleDir, "history"); + let files: string[]; + try { + files = await readdir(histRoot); + } catch (e) { + const errObj = e as NodeJS.ErrnoException; + if (errObj.code === "ENOENT") { + return ok(0); + } + return err(`failed to read history directory: ${String(e)}`); + } + + let removed = 0; + for (const name of files) { + if (!name.endsWith(".jsonl")) { + continue; + } + const path = join(histRoot, name); + let text: string; + try { + text = await readFile(path, "utf8"); + } catch { + continue; + } + const kept: string[] = []; + for (const line of text.split("\n")) { + const trimmed = line.trim(); + if (trimmed === "") { + continue; + } + let rec: unknown; + try { + rec = JSON.parse(trimmed) as unknown; + } catch { + kept.push(`${trimmed}\n`); + continue; + } + if (rec === null || typeof rec !== "object") { + kept.push(`${trimmed}\n`); + continue; + } + const id = (rec as Record).threadId; + if (id === threadId) { + removed++; + continue; + } + kept.push(`${trimmed}\n`); + } + await writeFile(path, kept.join(""), "utf8"); + } + + return ok(removed); +} diff --git a/packages/workflow-execute/src/engine/types.ts b/packages/workflow-execute/src/engine/types.ts index 0bbc863..0b878ac 100644 --- a/packages/workflow-execute/src/engine/types.ts +++ b/packages/workflow-execute/src/engine/types.ts @@ -11,7 +11,25 @@ export type ExecuteThreadIo = { cas: CasStore; }; -/** One persisted role line in `.data.jsonl` (engine adds these for fork replay before running the generator). */ +/** CAS chain tail state before the next appended {@link StateNode}. */ +export type ChainState = { + parentStateHash: string | null; + parentAncestors: readonly string[]; +}; + +export const EMPTY_CHAIN_STATE: ChainState = { parentStateHash: null, parentAncestors: [] }; + +/** + * When forking, the worker continues from an existing {@link StartNode} plus an optional + * branch marker {@link StateNode} instead of allocating a new start blob. + */ +export type ForkContinuationOptions = { + startHash: string; + forkHeadHash: string; + initialChain: ChainState; +}; + +/** One replayed role step (prefill) before the generator runs (same layout as disk replay rows). */ export type PrefilledDiskStep = { role: string; contentHash: string; @@ -30,37 +48,36 @@ export type ExecuteThreadOptions = { /** When non-null, written into the start record so tooling can trace lineage. */ forkSourceThreadId: string | null; /** - * Written to `.data.jsonl` immediately after the start record, before the generator runs. + * When non-null, replays these steps into CAS before the generator runs. * Must match `input.steps` length and order when present. */ prefilledDiskSteps: PrefilledDiskStep[] | null; + /** When non-null, skip creating a new {@link StartNode} and continue this CAS chain. */ + forkContinuation: ForkContinuationOptions | null; + /** + * When non-null, must match `input.steps.length`; supplies persisted timestamps for + * {@link ThreadContext.steps} (used when restoring history without prefilled CAS replay). + */ + replayTimestamps: readonly number[] | null; /** Workspace root containing `workflow.yaml`; used to resolve the `extract` scene for meta extraction. */ storageRoot: string; }; -/** Role steps replayed from `.data.jsonl`, including persisted timestamps. */ -export type ForkHistoricalStep = RoleOutput & { timestamp: number }; - -export type ParsedThreadStartRecord = { - workflowName: string; - hash: string; - threadId: string; - prompt: string; - maxRounds: number; - depth: number; -}; - -export type ForkPlan = { +export type CasForkPlan = { workflowName: string; hash: string; sourceThreadId: string; prompt: string; runOptions: { maxRounds: number; depth: number }; - historicalSteps: ForkHistoricalStep[]; + steps: RoleOutput[]; + stepTimestamps: number[]; + forkContinuation: ForkContinuationOptions; }; export type GcResult = { + /** Count of root hashes seeded from thread indexes (`head`/`start` per entry). */ scannedThreads: number; + /** Reachable CAS blobs after the mark phase. */ activeRefs: number; deletedEntries: number; deletedHashes: string[]; diff --git a/packages/workflow-execute/src/engine/worker.ts b/packages/workflow-execute/src/engine/worker.ts index 0323dfb..f284938 100644 --- a/packages/workflow-execute/src/engine/worker.ts +++ b/packages/workflow-execute/src/engine/worker.ts @@ -17,7 +17,12 @@ import { } from "@uncaged/workflow-util"; import { executeThread } from "./engine.js"; import { createThreadPauseGate } from "./thread-pause-gate.js"; -import type { ExecuteThreadIo, PrefilledDiskStep, ThreadPauseGate } from "./types.js"; +import type { + ExecuteThreadIo, + ForkContinuationOptions, + PrefilledDiskStep, + ThreadPauseGate, +} from "./types.js"; const bootLog = createLogger({ sink: { kind: "stderr" } }); @@ -28,9 +33,10 @@ type RunCommand = { prompt: string; options: { maxRounds: number; depth: number }; steps: RoleOutput[]; - /** Timestamps aligned with `steps` for `.data.jsonl` replay; length must match `steps` when non-null. */ + /** Timestamps aligned with `steps` for replay / fork restore; length must match `steps` when steps are non-empty. */ stepTimestamps: number[] | null; forkSourceThreadId: string | null; + forkContinuation: ForkContinuationOptions | null; }; type KillCommand = { @@ -73,6 +79,7 @@ function parseRoleOutputRecord(obj: Record): RoleOutput | null }; } +// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: mirrors permissive worker IPC decoding shape checks function parseRunStepsPayload(rec: Record): { steps: RoleOutput[]; stepTimestamps: number[] | null; @@ -107,12 +114,60 @@ function parseRunStepsPayload(rec: Record): { return null; } } + + const parallelTsRaw = rec.stepTimestamps; + if ( + steps.length > 0 && + Array.isArray(parallelTsRaw) && + parallelTsRaw.length === steps.length && + parallelTsRaw.every((x): x is number => typeof x === "number") + ) { + return { steps, stepTimestamps: [...parallelTsRaw] }; + } + return { steps, stepTimestamps: anyTimestamp ? timestamps : null, }; } +function parseForkContinuation(rec: Record): ForkContinuationOptions | null { + const raw = rec.forkContinuation; + if (raw === undefined || raw === null) { + return null; + } + if (typeof raw !== "object") { + return null; + } + const o = raw as Record; + const startHash = o.startHash; + const forkHeadHash = o.forkHeadHash; + const ic = o.initialChain; + if (typeof startHash !== "string" || typeof forkHeadHash !== "string") { + return null; + } + if (ic === null || typeof ic !== "object") { + return null; + } + const ich = ic as Record; + const pph = ich.parentStateHash; + const pa = ich.parentAncestors; + if (!(pph === null || typeof pph === "string")) { + return null; + } + if (!Array.isArray(pa) || !pa.every((x) => typeof x === "string")) { + return null; + } + return { + startHash, + forkHeadHash, + initialChain: { + parentStateHash: pph, + parentAncestors: pa, + }, + }; +} + function parseRunControlPayload(rec: Record): RunCommand | null { const threadId = rec.threadId; const workflowName = rec.workflowName; @@ -148,6 +203,7 @@ function parseRunControlPayload(rec: Record): RunCommand | null } forkSourceThreadId = rawFork; } + const forkContinuation = parseForkContinuation(rec); return { type: "run", threadId, @@ -157,6 +213,7 @@ function parseRunControlPayload(rec: Record): RunCommand | null steps: parsedSteps.steps, stepTimestamps: parsedSteps.stepTimestamps, forkSourceThreadId, + forkContinuation, }; } @@ -357,6 +414,7 @@ async function main(): Promise { } } + // biome-ignore lint/complexity/noExcessiveCognitiveComplexity: TCP worker multiplexes lifecycle + runs async function dispatchCommand(cmd: ControlCommand, socket: Socket | null): Promise { if (cmd.type !== "run") { dispatchThreadLifecycleCommand(threads, socket, cmd); @@ -394,7 +452,19 @@ async function main(): Promise { const baseTs = Date.now(); let prefilledDiskSteps: PrefilledDiskStep[] | null = null; - if (cmd.steps.length > 0) { + let replayTimestamps: readonly number[] | null = null; + + if (cmd.forkContinuation !== null) { + if ( + cmd.steps.length > 0 && + (cmd.stepTimestamps === null || cmd.stepTimestamps.length !== cmd.steps.length) + ) { + bootLog("J5WQ8NXT", "forkContinuation requires stepTimestamps aligned with steps"); + throw new Error("forkContinuation requires stepTimestamps aligned with steps"); + } + replayTimestamps = + cmd.steps.length === 0 ? null : (cmd.stepTimestamps as readonly number[]); + } else if (cmd.steps.length > 0) { prefilledDiskSteps = cmd.steps.map((step, i) => { const ts = cmd.stepTimestamps?.[i]; return { @@ -417,6 +487,8 @@ async function main(): Promise { awaitAfterEachYield: () => pauseGate.awaitAfterYield(), forkSourceThreadId: cmd.forkSourceThreadId, prefilledDiskSteps, + forkContinuation: cmd.forkContinuation, + replayTimestamps, storageRoot, }, io, diff --git a/packages/workflow-execute/src/index.ts b/packages/workflow-execute/src/index.ts index 9632d48..af078dc 100644 --- a/packages/workflow-execute/src/index.ts +++ b/packages/workflow-execute/src/index.ts @@ -1,25 +1,39 @@ export { createWorkflow } from "./engine/create-workflow.js"; export { executeThread } from "./engine/engine.js"; export { - buildForkPlan, - parseThreadDataJsonl, - selectForkHistoricalSteps, - tryParseRoleStepRecord, + FORK_BRANCH_ROLE, + prepareCasFork, tryParseWorkflowResultRecord, + walkStateFramesNewestFirst, } from "./engine/fork-thread.js"; export { garbageCollectCas } from "./engine/gc.js"; export { createThreadPauseGate } from "./engine/thread-pause-gate.js"; export type { + ThreadHistoryEntry, + ThreadIndex, + ThreadIndexEntry, +} from "./engine/threads-index.js"; +export { + appendThreadHistoryEntry, + getBundleDir, + readThreadsIndex, + removeThreadEntry, + removeThreadHistoryEntries, + upsertThreadEntry, + writeThreadsIndex, +} from "./engine/threads-index.js"; +export type { + CasForkPlan, + ChainState, ExecuteThreadIo, ExecuteThreadOptions, - ForkHistoricalStep, - ForkPlan, + ForkContinuationOptions, GcResult, - ParsedThreadStartRecord, PrefilledDiskStep, SupervisorDecision, ThreadPauseGate, } from "./engine/types.js"; +export { EMPTY_CHAIN_STATE } from "./engine/types.js"; export { getWorkerHostScriptPath } from "./engine/worker-entry-path.js"; export type { ExtractFn, LlmError, LlmExtractArgs } from "./extract/index.js"; export { diff --git a/packages/workflow-execute/src/workflow-as-agent.ts b/packages/workflow-execute/src/workflow-as-agent.ts index c6a6e33..c0e6aa7 100644 --- a/packages/workflow-execute/src/workflow-as-agent.ts +++ b/packages/workflow-execute/src/workflow-as-agent.ts @@ -101,6 +101,8 @@ export function workflowAsAgent( awaitAfterEachYield: async () => {}, forkSourceThreadId: ctx.threadId, prefilledDiskSteps: null, + forkContinuation: null, + replayTimestamps: null, storageRoot, }, io,