Compare commits

...

2 Commits

Author SHA1 Message Date
xiaoju f3aedf8d6c feat: Phase 4 — CAS-based fork + mark-and-sweep GC
- Rewrite fork to create StateNode pointing to fork point (zero duplication)
- Rewrite GC as mark-and-sweep: roots from threads.json + history, findReachableHashes via refs[]
- Remove .data.jsonl code paths
- Fix all 7 previously failing CLI tests
- New: gc-mark-sweep.test.ts verifying shared nodes survive GC
- All 166 tests pass

Refs #155, closes #159

小橘 <xiaoju@shazhou.work>
2026-05-09 08:12:49 +00:00
xiaoju 26cf51366f feat: Phase 3 — engine read path + runtime context builder
- Add buildThreadContext(headHash, cas) to workflow-runtime
- Expand extract phase to return { meta, contentPayload, refs[] }
- Add parseCasThreadNode() to workflow-cas for node type parsing
- Update createWorkflow to write ContentMerkleNode with artifact refs
- Tests: 4 pass (build-context + extract-refs)
- Biome format pass on all files

Refs #155, closes #158

小橘 <xiaoju@shazhou.work>
2026-05-09 08:00:24 +00:00
66 changed files with 2422 additions and 1304 deletions
@@ -2,10 +2,9 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, readFile, rm, unlink, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { createContentMerkleNode, serializeMerkleNode } from "@uncaged/workflow-cas";
import { getRegisteredWorkflow, readWorkflowRegistry } from "@uncaged/workflow-register";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { cmdCasGet, cmdCasList, cmdCasPut, cmdCasRm } from "../src/commands/cas/index.js";
import {
cmdAdd,
@@ -1,12 +1,16 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createCasStore, getContentMerklePayload } from "@uncaged/workflow-cas";
import { FORK_BRANCH_ROLE, walkStateFramesNewestFirst } from "@uncaged/workflow-execute";
import { END } from "@uncaged/workflow-runtime";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { cmdFork, cmdRun } from "../src/commands/thread/index.js";
import { cmdAdd } from "../src/commands/workflow/index.js";
import { pathExists } from "../src/fs-utils.js";
import { resolveThreadRecord } from "../src/thread-scan.js";
import { addCliArgs } from "./bundle-fixture.js";
import { ensureTestWorkflowRegistryConfig } from "./workflow-registry-fixture.js";
@@ -41,27 +45,6 @@ export const run = async function* (input, options) {
};
`;
async function countDataJsonlLines(dataPath: string): Promise<number> {
try {
const text = await readFile(dataPath, "utf8");
return text
.trim()
.split("\n")
.filter((l) => l !== "").length;
} catch {
return 0;
}
}
async function waitUntilMinDataLines(dataPath: string, minLines: number): Promise<void> {
for (let attempt = 0; attempt < 120; attempt++) {
if ((await countDataJsonlLines(dataPath)) >= minLines) {
return;
}
await new Promise((r) => setTimeout(r, 25));
}
}
async function waitUntilRunningAbsent(runningPath: string): Promise<void> {
for (let attempt = 0; attempt < 120; attempt++) {
if (!(await pathExists(runningPath))) {
@@ -71,6 +54,41 @@ async function waitUntilRunningAbsent(runningPath: string): Promise<void> {
}
}
async function waitUntilThreadCompletes(storageRoot: string, threadId: string): Promise<void> {
for (let attempt = 0; attempt < 120; attempt++) {
const row = await resolveThreadRecord(storageRoot, threadId);
if (row?.source === "history") {
return;
}
await new Promise((r) => setTimeout(r, 25));
}
}
async function listMeaningfulRoleContents(
storageRoot: string,
threadId: string,
): Promise<Array<{ role: string; content: string }>> {
const row = await resolveThreadRecord(storageRoot, threadId);
if (row === null) {
return [];
}
const cas = createCasStore(getGlobalCasDir(storageRoot));
const frames = await walkStateFramesNewestFirst(cas, row.head);
const chronological = [...frames].reverse();
const out: Array<{ role: string; content: string }> = [];
for (const fr of chronological) {
if (fr.payload.role === END || fr.payload.role === FORK_BRANCH_ROLE) {
continue;
}
const content = await getContentMerklePayload(cas, fr.payload.content);
out.push({
role: fr.payload.role,
content: content ?? "",
});
}
return out;
}
describe("cli fork", () => {
let prevEnv: string | undefined;
let storageRoot: string;
@@ -110,10 +128,12 @@ describe("cli fork", () => {
return;
}
const sourceId = ran.value.threadId;
const sourceData = join(storageRoot, "logs", hash, `${sourceId}.data.jsonl`);
const sourceRunning = join(storageRoot, "logs", hash, `${sourceId}.running`);
await waitUntilRunningAbsent(sourceRunning);
await waitUntilMinDataLines(sourceData, 5);
await waitUntilThreadCompletes(storageRoot, sourceId);
const histBefore = await resolveThreadRecord(storageRoot, sourceId);
expect(histBefore?.source).toBe("history");
const forked = await cmdFork(storageRoot, sourceId, "planner");
expect(forked.ok).toBe(true);
@@ -121,25 +141,18 @@ describe("cli fork", () => {
return;
}
const newId = forked.value.threadId;
const newData = join(storageRoot, "logs", hash, `${newId}.data.jsonl`);
const newRunning = join(storageRoot, "logs", hash, `${newId}.running`);
await waitUntilRunningAbsent(newRunning);
await waitUntilMinDataLines(newData, 5);
await waitUntilThreadCompletes(storageRoot, newId);
const text = await readFile(newData, "utf8");
const lines = text
.trim()
.split("\n")
.filter((l) => l !== "");
expect(lines.length).toBe(5);
const start = JSON.parse(lines[0] ?? "{}") as Record<string, unknown>;
expect(start.threadId).toBe(newId);
expect(start.forkFrom).toEqual({ threadId: sourceId });
const forkHist = await resolveThreadRecord(storageRoot, newId);
expect(forkHist?.source).toBe("history");
expect(forkHist?.start).toBe(histBefore?.start);
const lastRoleLine = JSON.parse(lines[lines.length - 2] ?? "{}") as Record<string, unknown>;
expect(lastRoleLine.role).toBe("reviewer");
const cas = createCasStore(getGlobalCasDir(storageRoot));
expect(await getContentMerklePayload(cas, String(lastRoleLine.contentHash))).toBe("rev-1");
const steps = await listMeaningfulRoleContents(storageRoot, newId);
const tail = steps[steps.length - 1];
expect(tail?.role).toBe("reviewer");
expect(tail?.content).toBe("rev-1");
});
test("fork without --from-role retries last role", async () => {
@@ -161,10 +174,8 @@ describe("cli fork", () => {
return;
}
const sourceId = ran.value.threadId;
const sourceData = join(storageRoot, "logs", hash, `${sourceId}.data.jsonl`);
const sourceRunning = join(storageRoot, "logs", hash, `${sourceId}.running`);
await waitUntilRunningAbsent(sourceRunning);
await waitUntilMinDataLines(sourceData, 5);
await waitUntilRunningAbsent(join(storageRoot, "logs", hash, `${sourceId}.running`));
await waitUntilThreadCompletes(storageRoot, sourceId);
const forked = await cmdFork(storageRoot, sourceId, null);
expect(forked.ok).toBe(true);
@@ -172,26 +183,17 @@ describe("cli fork", () => {
return;
}
const newId = forked.value.threadId;
const newData = join(storageRoot, "logs", hash, `${newId}.data.jsonl`);
const newRunning = join(storageRoot, "logs", hash, `${newId}.running`);
await waitUntilRunningAbsent(newRunning);
await waitUntilMinDataLines(newData, 5);
await waitUntilRunningAbsent(join(storageRoot, "logs", hash, `${newId}.running`));
await waitUntilThreadCompletes(storageRoot, newId);
const text = await readFile(newData, "utf8");
const lines = text
.trim()
.split("\n")
.filter((l) => l !== "");
expect(lines.length).toBe(5);
const replayCoder = JSON.parse(lines[2] ?? "{}") as Record<string, unknown>;
expect(replayCoder.role).toBe("coder");
const cas = createCasStore(getGlobalCasDir(storageRoot));
expect(await getContentMerklePayload(cas, String(replayCoder.contentHash))).toBe("c1");
const lastRoleLine = JSON.parse(lines[lines.length - 2] ?? "{}") as Record<string, unknown>;
expect(lastRoleLine.role).toBe("reviewer");
expect(await getContentMerklePayload(cas, String(lastRoleLine.contentHash))).toBe("rev-2");
const steps = await listMeaningfulRoleContents(storageRoot, newId);
expect(steps.length).toBeGreaterThanOrEqual(3);
const coderReplay = steps[steps.length - 2];
expect(coderReplay?.role).toBe("coder");
expect(coderReplay?.content).toBe("c1");
const tail = steps[steps.length - 1];
expect(tail?.role).toBe("reviewer");
expect(tail?.content).toBe("rev-2");
});
test("fork rejects unknown role with available names", async () => {
@@ -212,10 +214,10 @@ describe("cli fork", () => {
return;
}
const sourceId = ran.value.threadId;
const sourceData = join(storageRoot, "logs", added.value.hash, `${sourceId}.data.jsonl`);
const sourceRunning = join(storageRoot, "logs", added.value.hash, `${sourceId}.running`);
await waitUntilRunningAbsent(sourceRunning);
await waitUntilMinDataLines(sourceData, 5);
await waitUntilRunningAbsent(
join(storageRoot, "logs", added.value.hash, `${sourceId}.running`),
);
await waitUntilThreadCompletes(storageRoot, sourceId);
const bad = await cmdFork(storageRoot, sourceId, "ghost-role");
expect(bad.ok).toBe(false);
+60 -63
View File
@@ -1,45 +1,17 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { spawnSync } from "node:child_process";
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import { mkdir, mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { fileURLToPath } from "node:url";
import { createCasStore, putContentMerkleNode } from "@uncaged/workflow-cas";
import { createCasStore, putStartNode } from "@uncaged/workflow-cas";
import { garbageCollectCas, getBundleDir, upsertThreadEntry } from "@uncaged/workflow-execute";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { garbageCollectCas } from "@uncaged/workflow-execute";
import { cmdThreadRemove } from "../src/commands/thread/index.js";
import { pathExists } from "../src/fs-utils.js";
const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url));
async function writeDemoDataJsonl(params: {
path: string;
threadId: string;
bundleHash: string;
cas: ReturnType<typeof createCasStore>;
activeHash: string;
}): Promise<void> {
const bodyHash = await putContentMerkleNode(params.cas, "p");
const text = [
JSON.stringify({
name: "demo",
hash: params.bundleHash,
threadId: params.threadId,
parameters: { prompt: "hi", options: { maxRounds: 5 } },
timestamp: 100,
}),
JSON.stringify({
role: "planner",
contentHash: bodyHash,
meta: {},
refs: [params.activeHash, bodyHash],
timestamp: 101,
}),
"",
].join("\n");
await writeFile(params.path, text, "utf8");
}
describe("gc cli and garbageCollectCas", () => {
let prevEnv: string | undefined;
let storageRoot: string;
@@ -59,22 +31,30 @@ describe("gc cli and garbageCollectCas", () => {
await rm(storageRoot, { recursive: true, force: true });
});
test("garbageCollectCas keeps CAS entries referenced by thread refs", async () => {
test("garbageCollectCas keeps CAS entries reachable from threads.json roots", async () => {
const bundleHash = "C9NMV6V2TQT81";
const threadId = "01AAA1111111111111111111";
const logsDir = join(storageRoot, "logs", bundleHash);
await mkdir(logsDir, { recursive: true });
const bundleDir = getBundleDir(storageRoot, bundleHash);
await mkdir(bundleDir, { recursive: true });
const cas = createCasStore(getGlobalCasDir(storageRoot));
const activeHash = await cas.put("active-blob");
const orphanHash = await cas.put("orphan-blob");
await writeDemoDataJsonl({
path: join(logsDir, `${threadId}.data.jsonl`),
threadId,
bundleHash,
const promptHash = await cas.put("prompt-text");
const startHash = await putStartNode(
cas,
activeHash,
{
name: "demo",
hash: bundleHash,
maxRounds: 5,
depth: 0,
},
promptHash,
);
await upsertThreadEntry(bundleDir, threadId, {
head: startHash,
start: startHash,
updatedAt: 100,
});
const gc = await garbageCollectCas(storageRoot);
@@ -82,12 +62,12 @@ describe("gc cli and garbageCollectCas", () => {
if (!gc.ok) {
return;
}
expect(gc.value.scannedThreads).toBe(1);
expect(gc.value.activeRefs).toBe(2);
expect(gc.value.scannedThreads).toBe(2);
expect(gc.value.deletedEntries).toBe(1);
expect(gc.value.deletedHashes).toEqual([orphanHash]);
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${activeHash}.txt`))).toBe(true);
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${promptHash}.txt`))).toBe(true);
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${startHash}.txt`))).toBe(true);
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${orphanHash}.txt`))).toBe(false);
});
@@ -110,19 +90,27 @@ describe("gc cli and garbageCollectCas", () => {
test("cli gc prints stats", async () => {
const bundleHash = "C9NMV6V2TQT81";
const threadId = "01BBB2222222222222222222";
const logsDir = join(storageRoot, "logs", bundleHash);
await mkdir(logsDir, { recursive: true });
const bundleDir = getBundleDir(storageRoot, bundleHash);
await mkdir(bundleDir, { recursive: true });
const cas = createCasStore(getGlobalCasDir(storageRoot));
const activeHash = await cas.put("keep-me");
const promptHash = await cas.put("prompt-text");
const startHash = await putStartNode(
cas,
{
name: "demo",
hash: bundleHash,
maxRounds: 5,
depth: 0,
},
promptHash,
);
await cas.put("drop-me");
await writeDemoDataJsonl({
path: join(logsDir, `${threadId}.data.jsonl`),
threadId,
bundleHash,
cas,
activeHash,
await upsertThreadEntry(bundleDir, threadId, {
head: startHash,
start: startHash,
updatedAt: 100,
});
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
@@ -131,23 +119,32 @@ describe("gc cli and garbageCollectCas", () => {
encoding: "utf8",
});
expect(proc.status).toBe(0);
expect(String(proc.stdout).trim()).toBe("scanned 1 threads, 2 active refs, deleted 1 entries");
expect(String(proc.stdout).trim()).toBe("scanned 2 threads, 2 active refs, deleted 1 entries");
});
test("thread rm triggers gc so unreferenced CAS is removed", async () => {
const bundleHash = "C9NMV6V2TQT81";
const threadId = "01CCC3333333333333333333";
const logsDir = join(storageRoot, "logs", bundleHash);
await mkdir(logsDir, { recursive: true });
const bundleDir = getBundleDir(storageRoot, bundleHash);
await mkdir(bundleDir, { recursive: true });
const cas = createCasStore(getGlobalCasDir(storageRoot));
const activeHash = await cas.put("pinned-by-ref");
await writeDemoDataJsonl({
path: join(logsDir, `${threadId}.data.jsonl`),
threadId,
bundleHash,
const promptHash = await cas.put("prompt-text");
const startHash = await putStartNode(
cas,
activeHash,
{
name: "demo",
hash: bundleHash,
maxRounds: 5,
depth: 0,
},
promptHash,
);
await upsertThreadEntry(bundleDir, threadId, {
head: startHash,
start: startHash,
updatedAt: 100,
});
const orphanHash = await cas.put("orphan-after-rm");
@@ -157,6 +154,6 @@ describe("gc cli and garbageCollectCas", () => {
expect(removed.ok).toBe(true);
expect(await pathExists(orphanPath)).toBe(false);
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${activeHash}.txt`))).toBe(false);
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${promptHash}.txt`))).toBe(false);
});
});
+2 -241
View File
@@ -1,13 +1,10 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { spawn, spawnSync } from "node:child_process";
import { cp, mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { spawnSync } from "node:child_process";
import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { fileURLToPath } from "node:url";
import { createCasStore, putContentMerkleNode } from "@uncaged/workflow-cas";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import {
formatLiveDebugLine,
formatLiveTimeLabel,
@@ -18,11 +15,6 @@ import {
import { parseLiveArgv } from "../src/live-argv.js";
const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url));
const fixtureRoot = fileURLToPath(new URL("./fixtures/live", import.meta.url));
/** Bodies for Merkle content nodes; hashes must match `.data.jsonl` fixtures. */
const LIVE_FIXTURE_PLANNER_BODY =
"alpha\nbeta\ngamma\nLINE4\nLINE5\nLINE6\nLINE7\nLINE8\nLINE9\nLINE10\nLINE11";
describe("live helpers", () => {
test("formatLiveTimeLabel pads HH:MM:SS", () => {
@@ -86,28 +78,6 @@ describe("live CLI", () => {
prevEnv = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-live-"));
process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = storageRoot;
await mkdir(join(storageRoot, "logs", "C9NMV6V2TQT81"), { recursive: true });
await cp(
join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl"),
join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl"),
);
await cp(
join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.info.jsonl"),
join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.info.jsonl"),
);
await cp(
join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl"),
join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl"),
);
await cp(
join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVEOLDER01DDDDDDDDDDDDG.data.jsonl"),
join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVEOLDER01DDDDDDDDDDDDG.data.jsonl"),
);
const cas = createCasStore(getGlobalCasDir(storageRoot));
await putContentMerkleNode(cas, LIVE_FIXTURE_PLANNER_BODY);
await putContentMerkleNode(cas, "patch");
await putContentMerkleNode(cas, "still running");
});
afterEach(async () => {
@@ -119,170 +89,6 @@ describe("live CLI", () => {
await rm(storageRoot, { recursive: true, force: true });
});
test("prints role steps and summary for a completed thread", async () => {
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const proc = spawn(process.execPath, [cliEntryPath, "live", "01LIVECMPLT01DDDDDDDDDDDDG"], {
env,
stdio: ["ignore", "pipe", "pipe"],
});
const stdout = await new Promise<string>((resolve, reject) => {
let buf = "";
proc.stdout?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.stderr?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.on("error", reject);
proc.on("exit", (code: number | null) => {
if (code === 0) {
resolve(buf);
} else {
reject(new Error(`exit ${code}: ${buf}`));
}
});
});
expect(stdout).toContain("planner");
expect(stdout).toContain("coder");
expect(stdout).toContain("meta:");
expect(stdout).toContain('"phase":"plan"');
expect(stdout).toContain("LINE10");
expect(stdout).not.toContain("LINE11");
expect(stdout).toContain("more line");
expect(stdout).toContain("completed: returnCode=0");
expect(stdout).toContain("fixture completed");
});
test("--latest tails the newest thread by start timestamp", async () => {
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const proc = spawn(process.execPath, [cliEntryPath, "live", "--latest"], {
env,
stdio: ["ignore", "pipe", "pipe"],
});
const stdout = await new Promise<string>((resolve, reject) => {
let buf = "";
proc.stdout?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.stderr?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.on("error", reject);
proc.on("exit", (code: number | null) => {
if (code === 0) {
resolve(buf);
} else {
reject(new Error(`exit ${code}: ${buf}`));
}
});
});
expect(stdout).toContain("fixture completed");
expect(stdout).not.toContain("older thread");
});
test("--debug prints .info.jsonl records after data output", async () => {
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const proc = spawn(
process.execPath,
[cliEntryPath, "live", "01LIVECMPLT01DDDDDDDDDDDDG", "--debug"],
{
env,
stdio: ["ignore", "pipe", "pipe"],
},
);
const stdout = await new Promise<string>((resolve, reject) => {
let buf = "";
proc.stdout?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.stderr?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.on("error", reject);
proc.on("exit", (code: number | null) => {
if (code === 0) {
resolve(buf);
} else {
reject(new Error(`exit ${code}: ${buf}`));
}
});
});
expect(stdout).toContain("[DEBUGTAG1]");
expect(stdout).toContain("bundle loaded");
expect(stdout).toContain("[DEBUGTAG2]");
expect(stdout).toContain("multi line");
});
test("--role filters out non-matching roles", async () => {
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const proc = spawn(
process.execPath,
[cliEntryPath, "live", "01LIVECMPLT01DDDDDDDDDDDDG", "--role", "planner"],
{
env,
stdio: ["ignore", "pipe", "pipe"],
},
);
const stdout = await new Promise<string>((resolve, reject) => {
let buf = "";
proc.stdout?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.stderr?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.on("error", reject);
proc.on("exit", (code: number | null) => {
if (code === 0) {
resolve(buf);
} else {
reject(new Error(`exit ${code}: ${buf}`));
}
});
});
expect(stdout).toContain("planner");
expect(stdout).not.toContain("patch");
expect(stdout).toContain("completed: returnCode=0");
});
test("--latest --debug --role combine", async () => {
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const proc = spawn(
process.execPath,
[cliEntryPath, "live", "--latest", "--debug", "--role", "planner"],
{
env,
stdio: ["ignore", "pipe", "pipe"],
},
);
const stdout = await new Promise<string>((resolve, reject) => {
let buf = "";
proc.stdout?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.stderr?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.on("error", reject);
proc.on("exit", (code: number | null) => {
if (code === 0) {
resolve(buf);
} else {
reject(new Error(`exit ${code}: ${buf}`));
}
});
});
expect(stdout).toContain("[DEBUGTAG1]");
expect(stdout).toContain("planner");
expect(stdout).not.toContain("patch");
expect(stdout).toContain("fixture completed");
});
test("unknown thread id exits 1", () => {
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const r = spawnSync(process.execPath, [cliEntryPath, "live", "01UNKNOWNXXXXXXXXXXXXXXXXX"], {
@@ -292,51 +98,6 @@ describe("live CLI", () => {
expect(r.status).toBe(1);
expect(String(r.stderr ?? "")).toContain("thread not found");
});
test("follows file until WorkflowResult is appended", async () => {
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const dataPath = join(
storageRoot,
"logs",
"C9NMV6V2TQT81",
"01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl",
);
const proc = spawn(process.execPath, [cliEntryPath, "live", "01LIVEINFLY01DDDDDDDDDDDDG"], {
env,
stdio: ["ignore", "pipe", "pipe"],
});
await new Promise((r) => setTimeout(r, 120));
const prior = await readFile(dataPath, "utf8");
await writeFile(
dataPath,
`${prior.replace(/\s*$/, "")}\n${JSON.stringify({ returnCode: 0, summary: "caught up" })}\n`,
"utf8",
);
const stdout = await new Promise<string>((resolve, reject) => {
let buf = "";
proc.stdout?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.stderr?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.on("error", reject);
proc.on("exit", (code: number | null) => {
if (code === 0) {
resolve(buf);
} else {
reject(new Error(`exit ${code}: ${buf}`));
}
});
});
expect(stdout).toContain("planner");
expect(stdout).toContain("completed: returnCode=0");
expect(stdout).toContain("caught up");
});
});
describe("live --latest with empty storage", () => {
@@ -1,9 +1,10 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { spawnSync } from "node:child_process";
import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { dirname, join } from "node:path";
import { join } from "node:path";
import { fileURLToPath } from "node:url";
import { getBundleDir, readThreadsIndex } from "@uncaged/workflow-execute";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { cmdCasPut } from "../src/commands/cas/index.js";
import {
@@ -18,6 +19,7 @@ import {
} from "../src/commands/thread/index.js";
import { cmdAdd } from "../src/commands/workflow/index.js";
import { pathExists, readTextFileIfExists } from "../src/fs-utils.js";
import { resolveThreadRecord } from "../src/thread-scan.js";
import { addCliArgs } from "./bundle-fixture.js";
import { ensureTestWorkflowRegistryConfig } from "./workflow-registry-fixture.js";
@@ -101,34 +103,21 @@ export const run = async function* (_input, options) {
};
`;
async function countDataJsonlLines(dataPath: string): Promise<number> {
try {
const text = await readFile(dataPath, "utf8");
return text
.trim()
.split("\n")
.filter((l) => l !== "").length;
} catch {
return 0;
}
}
async function waitUntilMinDataLines(
dataPath: string,
minLines: number,
maxAttempts: number,
): Promise<void> {
async function waitUntilRunningFileAbsent(runningPath: string, maxAttempts: number): Promise<void> {
for (let attempt = 0; attempt < maxAttempts; attempt++) {
if ((await countDataJsonlLines(dataPath)) >= minLines) {
if (!(await pathExists(runningPath))) {
return;
}
await new Promise((r) => setTimeout(r, 25));
}
}
async function waitUntilRunningFileAbsent(runningPath: string, maxAttempts: number): Promise<void> {
async function waitUntilPredicate(
predicate: () => Promise<boolean>,
maxAttempts: number,
): Promise<void> {
for (let attempt = 0; attempt < maxAttempts; attempt++) {
if (!(await pathExists(runningPath))) {
if (await predicate()) {
return;
}
await new Promise((r) => setTimeout(r, 25));
@@ -200,8 +189,7 @@ describe("cli thread commands", () => {
const removed = await cmdThreadRemove(storageRoot, threadId);
expect(removed.ok).toBe(true);
const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`);
expect(await pathExists(dataPath)).toBe(false);
expect(await resolveThreadRecord(storageRoot, threadId)).toBeNull();
});
test("thread rm runs GC and removes CAS blobs not referenced by any remaining thread", async () => {
@@ -234,9 +222,9 @@ describe("cli thread commands", () => {
threads = await cmdThreads(storageRoot, []);
}
const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`);
const runningPath = join(dirname(dataPath), `${threadId}.running`);
const runningPath = join(storageRoot, "logs", added.value.hash, `${threadId}.running`);
await waitUntilRunningFileAbsent(runningPath, 120);
expect((await resolveThreadRecord(storageRoot, threadId))?.source).toBe("history");
const put = await cmdCasPut(storageRoot, "keep-after-thread-rm");
expect(put.ok).toBe(true);
@@ -323,24 +311,20 @@ describe("cli thread commands", () => {
const killed = await cmdKill(storageRoot, threadId);
expect(killed.ok).toBe(true);
await new Promise((r) => setTimeout(r, 900));
await waitUntilPredicate(async () => {
return (await resolveThreadRecord(storageRoot, threadId))?.source === "history";
}, 120);
const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`);
const text = await readFile(dataPath, "utf8");
const lines = text
.trim()
.split("\n")
.filter((l) => l !== "");
expect(lines.length).toBe(3);
expect((await resolveThreadRecord(storageRoot, threadId))?.source).toBe("history");
const runningPath = join(dirname(dataPath), `${threadId}.running`);
const runningPath = join(storageRoot, "logs", added.value.hash, `${threadId}.running`);
expect(await pathExists(runningPath)).toBe(false);
});
test("pause stops between yields and resume completes thread", async () => {
const bundleDir = join(storageRoot, "src");
await mkdir(bundleDir, { recursive: true });
const bundlePath = join(bundleDir, "demo.esm.js");
const srcDir = join(storageRoot, "src");
await mkdir(srcDir, { recursive: true });
const bundlePath = join(srcDir, "demo.esm.js");
await writeFile(bundlePath, pauseResumeBundleSource, "utf8");
const added = await cmdAdd(storageRoot, addCliArgs("solve-issue", bundlePath));
@@ -356,24 +340,33 @@ describe("cli thread commands", () => {
}
const threadId = ran.value.threadId;
const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`);
const bundleDir = getBundleDir(storageRoot, added.value.hash);
await waitUntilMinDataLines(dataPath, 2, 80);
expect(await countDataJsonlLines(dataPath)).toBe(2);
await waitUntilPredicate(async () => {
const idx = await readThreadsIndex(bundleDir);
const ent = idx[threadId];
return ent !== undefined && ent.head !== ent.start;
}, 80);
const idxBeforePause = await readThreadsIndex(bundleDir);
const headAtPause = idxBeforePause[threadId]?.head;
const paused = await cmdPause(storageRoot, threadId);
expect(paused.ok).toBe(true);
await new Promise((r) => setTimeout(r, 400));
expect(await countDataJsonlLines(dataPath)).toBe(2);
const idxPaused = await readThreadsIndex(bundleDir);
expect(idxPaused[threadId]?.head).toBe(headAtPause);
const resumed = await cmdResume(storageRoot, threadId);
expect(resumed.ok).toBe(true);
await waitUntilMinDataLines(dataPath, 4, 120);
expect(await countDataJsonlLines(dataPath)).toBe(4);
await waitUntilPredicate(async () => {
const row = await resolveThreadRecord(storageRoot, threadId);
return row?.source === "history";
}, 120);
const runningPath = join(dirname(dataPath), `${threadId}.running`);
const runningPath = join(storageRoot, "logs", added.value.hash, `${threadId}.running`);
await waitUntilRunningFileAbsent(runningPath, 100);
expect(await pathExists(runningPath)).toBe(false);
});
@@ -397,8 +390,7 @@ describe("cli thread commands", () => {
}
const threadId = ran.value.threadId;
const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`);
const runningPath = join(dirname(dataPath), `${threadId}.running`);
const runningPath = join(storageRoot, "logs", added.value.hash, `${threadId}.running`);
await waitUntilRunningFileAbsent(runningPath, 100);
expect(await pathExists(runningPath)).toBe(false);
+1 -1
View File
@@ -1,5 +1,5 @@
import type { Result } from "@uncaged/workflow-protocol";
import { type GcResult, garbageCollectCas } from "@uncaged/workflow-execute";
import type { Result } from "@uncaged/workflow-protocol";
export async function cmdGc(storageRoot: string): Promise<Result<GcResult, string>> {
return garbageCollectCas(storageRoot);
@@ -1,6 +1,6 @@
import { createCasStore } from "@uncaged/workflow-cas";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { createCasStore } from "@uncaged/workflow-cas";
export async function cmdCasGet(
storageRoot: string,
@@ -1,6 +1,6 @@
import { createCasStore } from "@uncaged/workflow-cas";
import { ok, type Result } from "@uncaged/workflow-protocol";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { createCasStore } from "@uncaged/workflow-cas";
export async function cmdCasList(storageRoot: string): Promise<Result<string[], string>> {
const cas = createCasStore(getGlobalCasDir(storageRoot));
@@ -1,6 +1,6 @@
import { createCasStore } from "@uncaged/workflow-cas";
import { ok, type Result } from "@uncaged/workflow-protocol";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { createCasStore } from "@uncaged/workflow-cas";
export async function cmdCasPut(
storageRoot: string,
+1 -1
View File
@@ -1,6 +1,6 @@
import { createCasStore } from "@uncaged/workflow-cas";
import { ok, type Result } from "@uncaged/workflow-protocol";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { createCasStore } from "@uncaged/workflow-cas";
export async function cmdCasRm(storageRoot: string, hash: string): Promise<Result<void, string>> {
const cas = createCasStore(getGlobalCasDir(storageRoot));
@@ -1,6 +1,6 @@
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { createCasStore } from "@uncaged/workflow-cas";
import { garbageCollectCas } from "@uncaged/workflow-execute";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { Hono } from "hono";
export function createCasRoutes(storageRoot: string): Hono {
@@ -1,9 +1,18 @@
import { statSync, watch } from "node:fs";
import { dirname, join } from "node:path";
import { join } from "node:path";
import { createCasStore, getContentMerklePayload } from "@uncaged/workflow-cas";
import {
FORK_BRANCH_ROLE,
readThreadsIndex,
type ThreadIndex,
walkStateFramesNewestFirst,
} from "@uncaged/workflow-execute";
import { END } from "@uncaged/workflow-runtime";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { Hono } from "hono";
import { streamSSE } from "hono/streaming";
import { resolveThreadDataPath } from "../../thread-scan.js";
import { resolveThreadRecord } from "../../thread-scan.js";
type PumpState = {
contentOffset: number;
@@ -21,7 +30,6 @@ function fileSize(path: string): number {
async function readNewBytes(path: string, state: PumpState): Promise<string | null> {
const size = fileSize(path);
if (size < state.contentOffset) {
// File was truncated — reset
state.contentOffset = 0;
state.carry = "";
}
@@ -42,15 +50,6 @@ function parseJsonLine(line: string): unknown {
}
}
function isWorkflowResult(record: unknown): boolean {
return (
record !== null &&
typeof record === "object" &&
"type" in (record as Record<string, unknown>) &&
(record as Record<string, unknown>).type === "workflow-result"
);
}
function parseNewLines(chunk: string, state: PumpState): string[] {
state.carry += chunk;
@@ -67,52 +66,192 @@ function parseNewLines(chunk: string, state: PumpState): string[] {
return lines;
}
type CasSseState = {
printedHashes: Set<string>;
lastHead: string | null;
completionEmitted: boolean;
};
type LiveSseStream = {
writeSSE: (opts: { event: string; data: string; id: string }) => Promise<void>;
};
function completionFromEndMeta(meta: Record<string, unknown>): {
returnCode: number;
summary: string;
} | null {
const returnCode = meta.returnCode;
const summary = meta.summary;
if (typeof returnCode !== "number" || typeof summary !== "string") {
return null;
}
return { returnCode, summary };
}
async function emitRecordsForHead(params: {
storageRoot: string;
bundleDir: string;
threadId: string;
headHash: string;
sseState: CasSseState;
stream: LiveSseStream;
eventId: { n: number };
}): Promise<boolean> {
const cas = createCasStore(getGlobalCasDir(params.storageRoot));
const frames = await walkStateFramesNewestFirst(cas, params.headHash);
const chronological = [...frames].reverse();
for (const fr of chronological) {
if (params.sseState.printedHashes.has(fr.hash)) {
continue;
}
params.sseState.printedHashes.add(fr.hash);
const role = fr.payload.role;
if (role === FORK_BRANCH_ROLE) {
continue;
}
if (role === END) {
const wf = completionFromEndMeta(fr.payload.meta);
if (wf !== null) {
params.eventId.n++;
await params.stream.writeSSE({
event: "record",
data: JSON.stringify({ type: "workflow-result", ...wf }),
id: String(params.eventId.n),
});
return true;
}
continue;
}
const payloadText = await getContentMerklePayload(cas, fr.payload.content);
const content =
payloadText !== null
? payloadText
: `(content not in CAS; contentHash=${fr.payload.content})`;
params.eventId.n++;
await params.stream.writeSSE({
event: "record",
data: JSON.stringify({
role: fr.payload.role,
contentHash: fr.payload.content,
content,
meta: fr.payload.meta,
timestamp: fr.payload.timestamp,
}),
id: String(params.eventId.n),
});
}
return false;
}
async function pumpThreadsJsonSse(params: {
storageRoot: string;
bundleDir: string;
threadId: string;
sseState: CasSseState;
stream: LiveSseStream;
eventId: { n: number };
}): Promise<boolean> {
let idx: ThreadIndex;
try {
idx = await readThreadsIndex(params.bundleDir);
} catch {
idx = {};
}
const active = idx[params.threadId];
if (active === undefined) {
if (params.sseState.completionEmitted) {
return false;
}
const hist = await resolveThreadRecord(params.storageRoot, params.threadId);
if (hist === null || hist.source !== "history") {
return false;
}
params.sseState.completionEmitted = true;
return await emitRecordsForHead({
storageRoot: params.storageRoot,
bundleDir: params.bundleDir,
threadId: params.threadId,
headHash: hist.head,
sseState: params.sseState,
stream: params.stream,
eventId: params.eventId,
});
}
const head = active.head;
if (params.sseState.lastHead === null) {
params.sseState.lastHead = head;
return await emitRecordsForHead({
storageRoot: params.storageRoot,
bundleDir: params.bundleDir,
threadId: params.threadId,
headHash: head,
sseState: params.sseState,
stream: params.stream,
eventId: params.eventId,
});
}
if (head !== params.sseState.lastHead) {
params.sseState.lastHead = head;
return await emitRecordsForHead({
storageRoot: params.storageRoot,
bundleDir: params.bundleDir,
threadId: params.threadId,
headHash: head,
sseState: params.sseState,
stream: params.stream,
eventId: params.eventId,
});
}
return false;
}
export function createLiveRoutes(storageRoot: string): Hono {
const app = new Hono();
app.get("/:threadId/live", async (c) => {
const threadId = c.req.param("threadId");
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
const resolved = await resolveThreadRecord(storageRoot, threadId);
if (resolved === null) {
return c.json({ error: `thread not found: ${threadId}` }, 404);
}
const resolvedDataPath = dataPath;
const infoPath = join(dirname(resolvedDataPath), `${threadId}.info.jsonl`);
const threadTarget = resolved;
const threadsJsonPath = join(threadTarget.bundleDir, "threads.json");
const infoPath = join(storageRoot, "logs", threadTarget.bundleHash, `${threadId}.info.jsonl`);
return streamSSE(c, async (stream) => {
const dataState: PumpState = { contentOffset: 0, carry: "" };
const infoState: PumpState = { contentOffset: 0, carry: "" };
let eventId = 0;
const sseThreadState: CasSseState = {
printedHashes: new Set<string>(),
lastHead: null,
completionEmitted: false,
};
const eventId = { n: 0 };
async function pumpData(): Promise<boolean> {
let chunk: string | null;
try {
chunk = await readNewBytes(resolvedDataPath, dataState);
} catch {
return false;
}
if (chunk === null) {
return false;
}
const lines = parseNewLines(chunk, dataState);
for (const line of lines) {
const record = parseJsonLine(line);
eventId++;
await stream.writeSSE({
event: "record",
data: JSON.stringify(record),
id: String(eventId),
});
if (isWorkflowResult(record)) {
return true;
}
}
return false;
const finished = await pumpThreadsJsonSse({
storageRoot,
bundleDir: threadTarget.bundleDir,
threadId,
sseState: sseThreadState,
stream,
eventId,
});
return finished;
}
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: SSE newline framing mirrors legacy pump
async function pumpInfo(): Promise<void> {
let chunk: string | null;
try {
@@ -134,28 +273,46 @@ export function createLiveRoutes(storageRoot: string): Hono {
) {
continue;
}
eventId++;
eventId.n++;
await stream.writeSSE({
event: "info",
data: JSON.stringify(record),
id: String(eventId),
id: String(eventId.n),
});
}
}
// Initial pump
eventId.n++;
await stream.writeSSE({
event: "record",
data: JSON.stringify({
type: "thread-start",
threadId: threadTarget.threadId,
bundleHash: threadTarget.bundleHash,
head: threadTarget.head,
start: threadTarget.start,
source: threadTarget.source,
}),
id: String(eventId.n),
});
const done = await pumpData();
await pumpInfo();
try {
await pumpInfo();
} catch {
// optional info file
}
if (done) {
return;
}
// Watch for changes
const controller = new AbortController();
let completed = false;
const dataWatcher = watch(resolvedDataPath, async () => {
if (completed) return;
const dataWatcher = watch(threadsJsonPath, async () => {
if (completed) {
return;
}
const finished = await pumpData();
if (finished) {
completed = true;
@@ -166,7 +323,9 @@ export function createLiveRoutes(storageRoot: string): Hono {
let infoWatcher: ReturnType<typeof watch> | null = null;
try {
infoWatcher = watch(infoPath, async () => {
if (completed) return;
if (completed) {
return;
}
await pumpInfo();
});
} catch {
@@ -179,7 +338,6 @@ export function createLiveRoutes(storageRoot: string): Hono {
infoWatcher?.close();
});
// Keep stream alive until completion or client disconnect
await new Promise<void>((resolve) => {
if (completed) {
resolve();
@@ -1,10 +1,13 @@
import { createCasStore } from "@uncaged/workflow-cas";
import { FORK_BRANCH_ROLE, walkStateFramesNewestFirst } from "@uncaged/workflow-execute";
import { END } from "@uncaged/workflow-runtime";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { Hono } from "hono";
import { readTextFileIfExists } from "../../fs-utils.js";
import {
listHistoricalThreads,
listRunningThreads,
resolveThreadDataPath,
resolveThreadRecord,
} from "../../thread-scan.js";
import { cmdKill, cmdPause, cmdResume } from "../thread/control.js";
import { cmdRun } from "../thread/run.js";
@@ -25,22 +28,46 @@ export function createThreadRoutes(storageRoot: string): Hono {
app.get("/:threadId", async (c) => {
const threadId = c.req.param("threadId");
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
const resolved = await resolveThreadRecord(storageRoot, threadId);
if (resolved === null) {
return c.json({ error: `thread not found: ${threadId}` }, 404);
}
const text = await readTextFileIfExists(dataPath);
if (text === null) {
return c.json({ error: `thread data missing: ${threadId}` }, 404);
}
const lines = text.trim().split("\n");
const records = lines.map((line) => {
try {
return JSON.parse(line) as unknown;
} catch {
return { raw: line };
const cas = createCasStore(getGlobalCasDir(storageRoot));
const frames = await walkStateFramesNewestFirst(cas, resolved.head);
const chronological = [...frames].reverse();
const records: unknown[] = [
{
type: "thread-start",
threadId: resolved.threadId,
bundleHash: resolved.bundleHash,
head: resolved.head,
start: resolved.start,
source: resolved.source,
},
];
for (const fr of chronological) {
if (fr.payload.role === FORK_BRANCH_ROLE) {
continue;
}
});
if (fr.payload.role === END) {
const returnCode = fr.payload.meta.returnCode;
const summary = fr.payload.meta.summary;
if (typeof returnCode === "number" && typeof summary === "string") {
records.push({ type: "workflow-result", returnCode, summary });
}
continue;
}
records.push({
role: fr.payload.role,
contentHash: fr.payload.content,
meta: fr.payload.meta,
timestamp: fr.payload.timestamp,
});
}
return c.json({ threadId, records });
});
@@ -1,11 +1,11 @@
import { join } from "node:path";
import { createCasStore } from "@uncaged/workflow-cas";
import { prepareCasFork } from "@uncaged/workflow-execute";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { generateUlid } from "@uncaged/workflow-util";
import { buildForkPlan } from "@uncaged/workflow-execute";
import { generateUlid, getGlobalCasDir } from "@uncaged/workflow-util";
import { pathExists, readTextFileIfExists } from "../../fs-utils.js";
import { resolveThreadDataPath } from "../../thread-scan.js";
import { pathExists } from "../../fs-utils.js";
import { resolveThreadRecord } from "../../thread-scan.js";
import { ensureWorkerForHash, sendWorkerTcpCommand } from "../../worker-spawn.js";
export async function cmdFork(
@@ -13,49 +13,51 @@ export async function cmdFork(
threadId: string,
fromRole: string | null,
): Promise<Result<{ threadId: string }, string>> {
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
const resolved = await resolveThreadRecord(storageRoot, threadId);
if (resolved === null) {
return err(`thread not found: ${threadId}`);
}
const text = await readTextFileIfExists(dataPath);
if (text === null) {
return err(`thread data missing: ${threadId}`);
const bundlePath = join(storageRoot, "bundles", `${resolved.bundleHash}.esm.js`);
if (!(await pathExists(bundlePath))) {
return err(`bundle file missing for thread hash ${resolved.bundleHash}`);
}
const plan = buildForkPlan(text, fromRole);
const cas = createCasStore(getGlobalCasDir(storageRoot));
const newThreadId = generateUlid(Date.now());
const plan = await prepareCasFork({
cas,
bundleDir: resolved.bundleDir,
bundleHash: resolved.bundleHash,
sourceThreadId: threadId,
headHash: resolved.head,
startHash: resolved.start,
newThreadId,
fromRole,
});
if (!plan.ok) {
return plan;
}
const bundlePath = join(storageRoot, "bundles", `${plan.value.hash}.esm.js`);
if (!(await pathExists(bundlePath))) {
return err(`bundle file missing for thread hash ${plan.value.hash}`);
}
const worker = await ensureWorkerForHash(storageRoot, plan.value.hash, bundlePath);
if (!worker.ok) {
return worker;
}
const newThreadId = generateUlid(Date.now());
const stepsOnWire = plan.value.historicalSteps.map((s) => ({
role: s.role,
contentHash: s.contentHash,
meta: s.meta,
refs: s.refs,
timestamp: s.timestamp,
}));
const p = plan.value;
const sent = await sendWorkerTcpCommand(
worker.value.port,
{
type: "run",
threadId: newThreadId,
workflowName: plan.value.workflowName,
prompt: plan.value.prompt,
options: plan.value.runOptions,
steps: stepsOnWire,
forkSourceThreadId: plan.value.sourceThreadId,
workflowName: p.workflowName,
prompt: p.prompt,
options: p.runOptions,
steps: p.steps,
stepTimestamps: p.stepTimestamps.length > 0 ? p.stepTimestamps : null,
forkSourceThreadId: threadId,
forkContinuation: p.forkContinuation,
},
{ awaitResponseLine: false },
);
+186 -119
View File
@@ -1,17 +1,26 @@
import { watch } from "node:fs";
import { readFile } from "node:fs/promises";
import { mkdir, readFile } from "node:fs/promises";
import { dirname, join } from "node:path";
import type { CasStore, WorkflowCompletion } from "@uncaged/workflow-protocol";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { createCasStore, getContentMerklePayload } from "@uncaged/workflow-cas";
import { tryParseRoleStepRecord, tryParseWorkflowResultRecord } from "@uncaged/workflow-execute";
import {
FORK_BRANCH_ROLE,
readThreadsIndex,
type ThreadIndex,
walkStateFramesNewestFirst,
} from "@uncaged/workflow-execute";
import type { CasStore, WorkflowCompletion } from "@uncaged/workflow-protocol";
import { END } from "@uncaged/workflow-runtime";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { dimGreyLine, highlightLiveRole } from "../../cli-color.js";
import { printCliError, printCliLine } from "../../cli-output.js";
import { pathExists } from "../../fs-utils.js";
import type { ParsedLiveArgv } from "../../live-argv.js";
import { findLatestThreadDataPath, resolveThreadDataPath } from "../../thread-scan.js";
import {
findLatestThreadBundleTarget,
type LatestThreadTarget,
resolveThreadRecord,
} from "../../thread-scan.js";
import type { LiveRoleRow } from "./types.js";
export const LIVE_CONTENT_MAX_LINES = 10;
@@ -49,16 +58,15 @@ function printSummary(result: WorkflowCompletion): void {
printCliLine(`completed: returnCode=${result.returnCode}${result.summary}`);
}
type LiveSessionState = {
sawStart: boolean;
completed: boolean;
type InfoLiveState = {
carry: string;
contentOffset: number;
};
type InfoLiveState = {
carry: string;
contentOffset: number;
type CasLiveState = {
printedHashes: Set<string>;
lastHead: string | null;
completionEmitted: boolean;
};
function tryParseInfoRecord(obj: Record<string, unknown>): {
@@ -80,102 +88,140 @@ function tryParseInfoRecord(obj: Record<string, unknown>): {
return { tag, content, timestamp };
}
async function handleJsonlLine(
rawLine: string,
state: LiveSessionState,
roleFilter: string | null,
cas: CasStore,
): Promise<{ parseError: string | null; workflowResult: WorkflowCompletion | null }> {
const trimmed = rawLine.trim();
if (trimmed === "") {
return { parseError: null, workflowResult: null };
function completionFromEndMeta(meta: Record<string, unknown>): WorkflowCompletion | null {
const returnCode = meta.returnCode;
const summary = meta.summary;
if (typeof returnCode !== "number" || typeof summary !== "string") {
return null;
}
return { returnCode, summary };
}
let rec: unknown;
try {
rec = JSON.parse(trimmed) as unknown;
} catch {
return { parseError: "invalid JSON in thread data file", workflowResult: null };
async function emitRoleStepPrint(params: {
cas: CasStore;
role: string;
contentHash: string;
meta: Record<string, unknown>;
timestamp: number;
roleFilter: string | null;
}): Promise<void> {
if (params.roleFilter !== null && params.role !== params.roleFilter) {
return;
}
if (rec === null || typeof rec !== "object") {
return { parseError: "invalid record in thread data file", workflowResult: null };
}
const obj = rec as Record<string, unknown>;
if (!state.sawStart) {
state.sawStart = true;
return { parseError: null, workflowResult: null };
}
const wf = tryParseWorkflowResultRecord(obj);
if (wf !== null) {
state.completed = true;
return { parseError: null, workflowResult: wf };
}
const roleRow = tryParseRoleStepRecord(obj);
if (roleRow === null) {
return {
parseError: "unrecognized record in thread data (expected role step or result)",
workflowResult: null,
};
}
if (roleFilter !== null && roleRow.role !== roleFilter) {
return { parseError: null, workflowResult: null };
}
const payload = await getContentMerklePayload(cas, roleRow.contentHash);
const payload = await getContentMerklePayload(params.cas, params.contentHash);
const content =
payload !== null ? payload : `(content not in CAS; contentHash=${roleRow.contentHash})`;
payload !== null ? payload : `(content not in CAS; contentHash=${params.contentHash})`;
const row: LiveRoleRow = {
role: roleRow.role,
role: params.role,
content,
meta: roleRow.meta,
timestamp: roleRow.timestamp,
meta: params.meta,
timestamp: params.timestamp,
};
for (const outLine of renderLiveRoleStepLines(row, highlightLiveRole(row.role))) {
printCliLine(outLine);
}
return { parseError: null, workflowResult: null };
}
async function pumpNewContent(
dataPath: string,
state: LiveSessionState,
roleFilter: string | null,
cas: CasStore,
): Promise<number | null> {
let text: string;
async function emitStatesReachableFromHead(params: {
cas: CasStore;
headHash: string;
state: CasLiveState;
roleFilter: string | null;
}): Promise<WorkflowCompletion | null> {
const frames = await walkStateFramesNewestFirst(params.cas, params.headHash);
const chronological = [...frames].reverse();
for (const fr of chronological) {
if (params.state.printedHashes.has(fr.hash)) {
continue;
}
params.state.printedHashes.add(fr.hash);
const role = fr.payload.role;
if (role === FORK_BRANCH_ROLE) {
continue;
}
if (role === END) {
const wf = completionFromEndMeta(fr.payload.meta);
if (wf !== null) {
printSummary(wf);
return wf;
}
continue;
}
await emitRoleStepPrint({
cas: params.cas,
role,
contentHash: fr.payload.content,
meta: fr.payload.meta,
timestamp: fr.payload.timestamp,
roleFilter: params.roleFilter,
});
}
return null;
}
async function pumpThreadsJson(params: {
storageRoot: string;
bundleDir: string;
bundleHash: string;
threadId: string;
state: CasLiveState;
roleFilter: string | null;
cas: CasStore;
}): Promise<number | null> {
let idx: ThreadIndex;
try {
text = await readFile(dataPath, "utf8");
idx = await readThreadsIndex(params.bundleDir);
} catch {
return null;
idx = {};
}
if (text.length < state.contentOffset) {
state.contentOffset = 0;
state.carry = "";
const active = idx[params.threadId];
if (active === undefined) {
if (params.state.completionEmitted) {
return null;
}
const hist = await resolveThreadRecord(params.storageRoot, params.threadId);
if (hist === null || hist.source !== "history") {
return null;
}
params.state.completionEmitted = true;
const wf = await emitStatesReachableFromHead({
cas: params.cas,
headHash: hist.head,
state: params.state,
roleFilter: params.roleFilter,
});
return wf !== null ? 0 : null;
}
const chunk = text.slice(state.contentOffset);
state.contentOffset = text.length;
state.carry += chunk;
const head = active.head;
if (params.state.lastHead === null) {
params.state.lastHead = head;
const wf = await emitStatesReachableFromHead({
cas: params.cas,
headHash: head,
state: params.state,
roleFilter: params.roleFilter,
});
return wf !== null ? 0 : null;
}
const parts = state.carry.split("\n");
state.carry = parts.pop() ?? "";
for (const line of parts) {
const { parseError, workflowResult } = await handleJsonlLine(line, state, roleFilter, cas);
if (parseError !== null) {
printCliError(parseError);
return 1;
}
if (workflowResult !== null) {
printSummary(workflowResult);
return 0;
}
if (head !== params.state.lastHead) {
params.state.lastHead = head;
const wf = await emitStatesReachableFromHead({
cas: params.cas,
headHash: head,
state: params.state,
roleFilter: params.roleFilter,
});
return wf !== null ? 0 : null;
}
return null;
@@ -292,9 +338,9 @@ function watchLivePaths(params: { tasks: WatchPumpTask[]; signal: AbortSignal })
schedulePump(path, pump);
});
watchers.push(watcher);
watcher.on("error", (err: Error) => {
watcher.on("error", (errObj: Error) => {
closeAll();
reject(err);
reject(errObj);
});
}
@@ -310,17 +356,14 @@ function watchLivePaths(params: { tasks: WatchPumpTask[]; signal: AbortSignal })
});
}
type LiveThreadTarget = {
threadId: string;
dataPath: string;
};
type LiveThreadTarget = LatestThreadTarget;
async function resolveLiveThreadTarget(
storageRoot: string,
parsed: ParsedLiveArgv,
): Promise<LiveThreadTarget | null> {
if (parsed.latest) {
const found = await findLatestThreadDataPath(storageRoot);
const found = await findLatestThreadBundleTarget(storageRoot);
if (found === null) {
printCliError("live: no threads found");
return null;
@@ -333,36 +376,56 @@ async function resolveLiveThreadTarget(
printCliError("live: internal error: missing thread id");
return null;
}
const resolved = await resolveThreadDataPath(storageRoot, id);
const resolved = await resolveThreadRecord(storageRoot, id);
if (resolved === null) {
printCliError(`thread not found: ${id}`);
return null;
}
return { threadId: id, dataPath: resolved };
return {
threadId: id,
bundleHash: resolved.bundleHash,
bundleDir: resolved.bundleDir,
threadsJsonPath: join(resolved.bundleDir, "threads.json"),
};
}
async function buildLiveWatchTasks(params: {
dataPath: string;
infoPath: string;
storageRoot: string;
target: LiveThreadTarget;
debug: boolean;
dataState: LiveSessionState;
dataState: CasLiveState;
infoState: InfoLiveState;
roleFilter: string | null;
cas: CasStore;
}): Promise<WatchPumpTask[]> {
const { dataPath, infoPath, debug, dataState, infoState, roleFilter, cas } = params;
const infoPath = join(
params.storageRoot,
"logs",
params.target.bundleHash,
`${params.target.threadId}.info.jsonl`,
);
const tasks: WatchPumpTask[] = [
{
path: dataPath,
pump: () => pumpNewContent(dataPath, dataState, roleFilter, cas),
path: params.target.threadsJsonPath,
pump: () =>
pumpThreadsJson({
storageRoot: params.storageRoot,
bundleDir: params.target.bundleDir,
bundleHash: params.target.bundleHash,
threadId: params.target.threadId,
state: params.dataState,
roleFilter: params.roleFilter,
cas: params.cas,
}),
},
];
if (debug && (await pathExists(infoPath))) {
if (params.debug && (await pathExists(infoPath))) {
tasks.push({
path: infoPath,
pump: async () => {
await pumpNewInfoContent(infoPath, infoState);
await pumpNewInfoContent(infoPath, params.infoState);
return null;
},
});
@@ -377,16 +440,13 @@ export async function cmdLive(storageRoot: string, parsed: ParsedLiveArgv): Prom
return 1;
}
const { threadId, dataPath } = target;
const roleFilter = parsed.role;
const infoPath = join(dirname(dataPath), `${threadId}.info.jsonl`);
const cas = createCasStore(getGlobalCasDir(storageRoot));
const dataState: LiveSessionState = {
sawStart: false,
completed: false,
carry: "",
contentOffset: 0,
const dataState: CasLiveState = {
printedHashes: new Set<string>(),
lastHead: null,
completionEmitted: false,
};
const infoState: InfoLiveState = {
@@ -401,22 +461,29 @@ export async function cmdLive(storageRoot: string, parsed: ParsedLiveArgv): Prom
process.on("SIGINT", onSigInt);
try {
const firstData = await pumpNewContent(dataPath, dataState, roleFilter, cas);
if (firstData === 1) {
return 1;
}
await mkdir(dirname(target.threadsJsonPath), { recursive: true });
const firstData = await pumpThreadsJson({
storageRoot,
bundleDir: target.bundleDir,
bundleHash: target.bundleHash,
threadId: target.threadId,
state: dataState,
roleFilter,
cas,
});
const infoPath = join(storageRoot, "logs", target.bundleHash, `${target.threadId}.info.jsonl`);
if (parsed.debug && (await pathExists(infoPath))) {
await pumpNewInfoContent(infoPath, infoState);
}
if (firstData === 0 || dataState.completed) {
if (firstData === 0) {
return 0;
}
const tasks = await buildLiveWatchTasks({
dataPath,
infoPath,
storageRoot,
target,
debug: parsed.debug,
dataState,
infoState,
+20 -10
View File
@@ -1,25 +1,35 @@
import { unlink } from "node:fs/promises";
import { dirname, join } from "node:path";
import { join } from "node:path";
import {
garbageCollectCas,
removeThreadEntry,
removeThreadHistoryEntries,
} from "@uncaged/workflow-execute";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { garbageCollectCas } from "@uncaged/workflow-execute";
import { resolveThreadDataPath } from "../../thread-scan.js";
import { resolveThreadRecord } from "../../thread-scan.js";
export async function cmdThreadRemove(
storageRoot: string,
threadId: string,
): Promise<Result<void, string>> {
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
const resolved = await resolveThreadRecord(storageRoot, threadId);
if (resolved === null) {
return err(`thread not found: ${threadId}`);
}
const dir = dirname(dataPath);
const infoPath = join(dir, `${threadId}.info.jsonl`);
const runningPath = join(dir, `${threadId}.running`);
if (resolved.source === "active") {
await removeThreadEntry(resolved.bundleDir, threadId);
} else {
const hist = await removeThreadHistoryEntries(resolved.bundleDir, threadId);
if (!hist.ok) {
return hist;
}
}
const infoPath = join(storageRoot, "logs", resolved.bundleHash, `${threadId}.info.jsonl`);
const runningPath = join(storageRoot, "logs", resolved.bundleHash, `${threadId}.running`);
await unlink(dataPath);
await unlink(infoPath).catch(() => {});
await unlink(runningPath).catch(() => {});
@@ -1,8 +1,8 @@
import { join } from "node:path";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { generateUlid } from "@uncaged/workflow-util";
import { getRegisteredWorkflow, readWorkflowRegistry } from "@uncaged/workflow-register";
import { generateUlid } from "@uncaged/workflow-util";
import { ensureWorkerForHash, sendWorkerTcpCommand } from "../../worker-spawn.js";
import { validateCliWorkflowName } from "../../workflow-name.js";
@@ -1,19 +1,44 @@
import { createCasStore } from "@uncaged/workflow-cas";
import { FORK_BRANCH_ROLE, walkStateFramesNewestFirst } from "@uncaged/workflow-execute";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { END } from "@uncaged/workflow-runtime";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { readTextFileIfExists } from "../../fs-utils.js";
import { resolveThreadDataPath } from "../../thread-scan.js";
import { resolveThreadRecord } from "../../thread-scan.js";
export async function cmdThreadShow(
storageRoot: string,
threadId: string,
): Promise<Result<string, string>> {
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
const resolved = await resolveThreadRecord(storageRoot, threadId);
if (resolved === null) {
return err(`thread not found: ${threadId}`);
}
const text = await readTextFileIfExists(dataPath);
if (text === null) {
return err(`thread data missing: ${threadId}`);
const cas = createCasStore(getGlobalCasDir(storageRoot));
const frames = await walkStateFramesNewestFirst(cas, resolved.head);
const chronological = [...frames].reverse();
const steps: Array<{ role: string; hash: string; timestamp: number }> = [];
for (const fr of chronological) {
if (fr.payload.role === END || fr.payload.role === FORK_BRANCH_ROLE) {
continue;
}
steps.push({
role: fr.payload.role,
hash: fr.hash,
timestamp: fr.payload.timestamp,
});
}
return ok(text.endsWith("\n") ? text.slice(0, -1) : text);
const payload = {
threadId: resolved.threadId,
bundleHash: resolved.bundleHash,
head: resolved.head,
start: resolved.start,
source: resolved.source,
steps,
};
return ok(JSON.stringify(payload, null, 2));
}
@@ -1,8 +1,7 @@
import { readFile, stat } from "node:fs/promises";
import { basename, resolve } from "node:path";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { hashWorkflowBundleBytes } from "@uncaged/workflow-cas";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import {
extractBundleExports,
readWorkflowRegistry,
+264 -95
View File
@@ -1,23 +1,87 @@
import { readdir, stat } from "node:fs/promises";
import { join } from "node:path";
import { createCasStore, parseCasThreadNode } from "@uncaged/workflow-cas";
import {
readThreadsIndex,
type ThreadHistoryEntry,
type ThreadIndex,
} from "@uncaged/workflow-execute";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { pathExists, readTextFileIfExists } from "./fs-utils.js";
function parseFirstJsonLineObject(text: string): Record<string, unknown> | null {
const firstLine = text.split("\n")[0];
if (firstLine === undefined || firstLine.trim() === "") {
async function readWorkflowNameFromStartHash(
storageRoot: string,
startHash: string,
): Promise<string | null> {
const cas = createCasStore(getGlobalCasDir(storageRoot));
const yamlText = await cas.get(startHash);
if (yamlText === null) {
return null;
}
let parsed: unknown;
try {
parsed = JSON.parse(firstLine) as unknown;
} catch {
const parsed = parseCasThreadNode(yamlText);
if (parsed === null || parsed.kind !== "start") {
return null;
}
if (parsed === null || typeof parsed !== "object") {
return null;
return parsed.node.payload.name;
}
async function listBundleHashDirs(storageRoot: string): Promise<string[]> {
const bundlesRoot = join(storageRoot, "bundles");
if (!(await pathExists(bundlesRoot))) {
return [];
}
return parsed as Record<string, unknown>;
const names = await readdir(bundlesRoot);
const out: string[] = [];
for (const name of names) {
const p = join(bundlesRoot, name);
try {
const st = await stat(p);
if (st.isDirectory()) {
out.push(name);
}
} catch {}
}
out.sort();
return out;
}
async function parseHistoryFile(path: string): Promise<ThreadHistoryEntry[]> {
const text = await readTextFileIfExists(path);
if (text === null) {
return [];
}
const out: ThreadHistoryEntry[] = [];
for (const line of text.split("\n")) {
const trimmed = line.trim();
if (trimmed === "") {
continue;
}
let raw: unknown;
try {
raw = JSON.parse(trimmed) as unknown;
} catch {
continue;
}
if (raw === null || typeof raw !== "object") {
continue;
}
const rec = raw as Record<string, unknown>;
const threadId = rec.threadId;
const head = rec.head;
const start = rec.start;
const completedAt = rec.completedAt;
if (
typeof threadId !== "string" ||
typeof head !== "string" ||
typeof start !== "string" ||
typeof completedAt !== "number"
) {
continue;
}
out.push({ threadId, head, start, completedAt });
}
return out;
}
export type RunningThreadRow = {
@@ -32,30 +96,76 @@ export type HistoricalThreadRow = {
workflowName: string | null;
};
async function readThreadStartTimestampMs(dataPath: string): Promise<number | null> {
const text = await readTextFileIfExists(dataPath);
if (text === null) {
return null;
}
const parsed = parseFirstJsonLineObject(text);
if (parsed === null) {
return null;
}
const ts = parsed.timestamp;
return typeof ts === "number" && Number.isFinite(ts) ? ts : null;
}
export type ResolvedThreadRecord = {
threadId: string;
bundleHash: string;
bundleDir: string;
head: string;
start: string;
source: "active" | "history";
};
async function readWorkflowNameFromDataJsonl(dataPath: string): Promise<string | null> {
const text = await readTextFileIfExists(dataPath);
if (text === null) {
return null;
/** Resolve a thread via `threads.json` (active) or `history/*.jsonl` (completed). */
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: scans all bundle dirs for thread id
export async function resolveThreadRecord(
storageRoot: string,
threadId: string,
): Promise<ResolvedThreadRecord | null> {
const hashes = await listBundleHashDirs(storageRoot);
for (const bundleHash of hashes) {
const bundleDir = join(storageRoot, "bundles", bundleHash);
let index: ThreadIndex;
try {
index = await readThreadsIndex(bundleDir);
} catch {
continue;
}
const active = index[threadId];
if (active !== undefined) {
return {
threadId,
bundleHash,
bundleDir,
head: active.head,
start: active.start,
source: "active",
};
}
}
const parsed = parseFirstJsonLineObject(text);
if (parsed === null) {
return null;
for (const bundleHash of hashes) {
const bundleDir = join(storageRoot, "bundles", bundleHash);
const histDir = join(bundleDir, "history");
if (!(await pathExists(histDir))) {
continue;
}
let files: string[];
try {
files = await readdir(histDir);
} catch {
continue;
}
for (const name of files) {
if (!name.endsWith(".jsonl")) {
continue;
}
const entries = await parseHistoryFile(join(histDir, name));
for (const e of entries) {
if (e.threadId === threadId) {
return {
threadId,
bundleHash,
bundleDir,
head: e.head,
start: e.start,
source: "history",
};
}
}
}
}
const name = parsed.name;
return typeof name === "string" ? name : null;
return null;
}
/** Threads currently executing — identified via `<threadId>.running` markers. */
@@ -82,8 +192,9 @@ export async function listRunningThreads(storageRoot: string): Promise<RunningTh
continue;
}
const threadId = fileName.slice(0, -".running".length);
const dataPath = join(dir, `${threadId}.data.jsonl`);
const workflowName = await readWorkflowNameFromDataJsonl(dataPath);
const resolved = await resolveThreadRecord(storageRoot, threadId);
const workflowName =
resolved !== null ? await readWorkflowNameFromStartHash(storageRoot, resolved.start) : null;
out.push({ threadId, hash, workflowName });
}
}
@@ -98,41 +209,70 @@ export async function listRunningThreads(storageRoot: string): Promise<RunningTh
}
/**
* Historical threads discovered via `*.data.jsonl`.
* When `workflowNameFilter` is non-null, only threads whose start record `name` matches are returned.
* Threads discovered via `threads.json` (active) and `history/*.jsonl` (completed).
* When `workflowNameFilter` is non-null, only threads whose StartNode `name` matches are returned.
*/
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: merges active index + partitioned history
export async function listHistoricalThreads(
storageRoot: string,
workflowNameFilter: string | null,
): Promise<HistoricalThreadRow[]> {
const logsRoot = join(storageRoot, "logs");
if (!(await pathExists(logsRoot))) {
return [];
}
const hashes = await readdir(logsRoot);
const hashes = await listBundleHashDirs(storageRoot);
const seen = new Set<string>();
const out: HistoricalThreadRow[] = [];
for (const hash of hashes) {
const dir = join(logsRoot, hash);
let entries: string[];
for (const bundleHash of hashes) {
const bundleDir = join(storageRoot, "bundles", bundleHash);
let index: ThreadIndex;
try {
entries = await readdir(dir);
index = await readThreadsIndex(bundleDir);
} catch {
continue;
}
for (const fileName of entries) {
if (!fileName.endsWith(".data.jsonl")) {
for (const threadId of Object.keys(index)) {
const key = `${bundleHash}/${threadId}`;
if (seen.has(key)) {
continue;
}
const threadId = fileName.slice(0, -".data.jsonl".length);
const dataPath = join(dir, fileName);
const workflowName = await readWorkflowNameFromDataJsonl(dataPath);
seen.add(key);
const entry = index[threadId];
if (entry === undefined) {
continue;
}
const workflowName = await readWorkflowNameFromStartHash(storageRoot, entry.start);
if (workflowNameFilter !== null && workflowName !== workflowNameFilter) {
continue;
}
out.push({ threadId, hash, workflowName });
out.push({ threadId, hash: bundleHash, workflowName });
}
const histDir = join(bundleDir, "history");
if (!(await pathExists(histDir))) {
continue;
}
let files: string[];
try {
files = await readdir(histDir);
} catch {
continue;
}
for (const name of files) {
if (!name.endsWith(".jsonl")) {
continue;
}
const entries = await parseHistoryFile(join(histDir, name));
for (const e of entries) {
const key = `${bundleHash}/${e.threadId}`;
if (seen.has(key)) {
continue;
}
seen.add(key);
const workflowName = await readWorkflowNameFromStartHash(storageRoot, e.start);
if (workflowNameFilter !== null && workflowName !== workflowNameFilter) {
continue;
}
out.push({ threadId: e.threadId, hash: bundleHash, workflowName });
}
}
}
@@ -145,64 +285,93 @@ export async function listHistoricalThreads(
return out;
}
export type LatestThreadTarget = {
threadId: string;
bundleHash: string;
bundleDir: string;
threadsJsonPath: string;
};
/**
* Picks the thread whose `.data.jsonl` is newest by start-record `timestamp`,
* falling back to file `mtime` when the timestamp is missing.
* Tie-breaker: larger `mtime` wins when start timestamps are equal.
* Picks the newest thread by StartNode timestamp approximation (`updatedAt` active,
* else `completedAt` history), falling back to lexical thread id order.
*/
export async function findLatestThreadDataPath(
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: compares active heads vs history tails
export async function findLatestThreadBundleTarget(
storageRoot: string,
): Promise<{ threadId: string; dataPath: string } | null> {
const threads = await listHistoricalThreads(storageRoot, null);
if (threads.length === 0) {
return null;
}
): Promise<LatestThreadTarget | null> {
const hashes = await listBundleHashDirs(storageRoot);
let best: {
threadId: string;
dataPath: string;
primary: number;
secondary: number;
bundleHash: string;
bundleDir: string;
ts: number;
} | null = null;
for (const t of threads) {
const dataPath = join(storageRoot, "logs", t.hash, `${t.threadId}.data.jsonl`);
let mtimeMs = 0;
for (const bundleHash of hashes) {
const bundleDir = join(storageRoot, "bundles", bundleHash);
let index: ThreadIndex;
try {
const st = await stat(dataPath);
mtimeMs = st.mtimeMs;
index = await readThreadsIndex(bundleDir);
} catch {
continue;
}
const startTs = await readThreadStartTimestampMs(dataPath);
const primary = startTs !== null ? startTs : mtimeMs;
const secondary = mtimeMs;
if (
best === null ||
primary > best.primary ||
(primary === best.primary && secondary > best.secondary)
) {
best = { threadId: t.threadId, dataPath, primary, secondary };
for (const threadId of Object.keys(index)) {
const ent = index[threadId];
if (ent === undefined) {
continue;
}
const ts = ent.updatedAt;
const cand = { threadId, bundleHash, bundleDir, ts };
if (
best === null ||
cand.ts > best.ts ||
(cand.ts === best.ts &&
`${cand.bundleHash}/${cand.threadId}` > `${best.bundleHash}/${best.threadId}`)
) {
best = cand;
}
}
const histDir = join(bundleDir, "history");
if (!(await pathExists(histDir))) {
continue;
}
let files: string[];
try {
files = await readdir(histDir);
} catch {
continue;
}
for (const name of files) {
if (!name.endsWith(".jsonl")) {
continue;
}
const entries = await parseHistoryFile(join(histDir, name));
for (const e of entries) {
const ts = e.completedAt;
const cand = { threadId: e.threadId, bundleHash, bundleDir, ts };
if (
best === null ||
cand.ts > best.ts ||
(cand.ts === best.ts &&
`${cand.bundleHash}/${cand.threadId}` > `${best.bundleHash}/${best.threadId}`)
) {
best = cand;
}
}
}
}
return best === null ? null : { threadId: best.threadId, dataPath: best.dataPath };
}
export async function resolveThreadDataPath(
storageRoot: string,
threadId: string,
): Promise<string | null> {
const logsRoot = join(storageRoot, "logs");
if (!(await pathExists(logsRoot))) {
if (best === null) {
return null;
}
const hashes = await readdir(logsRoot);
for (const hash of hashes) {
const candidate = join(logsRoot, hash, `${threadId}.data.jsonl`);
if (await pathExists(candidate)) {
return candidate;
}
}
return null;
return {
threadId: best.threadId,
bundleHash: best.bundleHash,
bundleDir: best.bundleDir,
threadsJsonPath: join(best.bundleDir, "threads.json"),
};
}
+1 -2
View File
@@ -2,9 +2,8 @@ import { type ChildProcess, spawn } from "node:child_process";
import { mkdir, readdir, unlink, writeFile } from "node:fs/promises";
import { createConnection } from "node:net";
import { join } from "node:path";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { getWorkerHostScriptPath } from "@uncaged/workflow-execute";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { pathExists, readTextFileIfExists } from "./fs-utils.js";
@@ -7,7 +7,11 @@ const testExtract: ExtractFn = async <T extends Record<string, unknown>>(
_schema: z.ZodType<T>,
_prompt: string,
_ctx: ExtractContext,
): Promise<T> => ({ workspace: "/tmp" }) as unknown as T;
): Promise<{ meta: T; contentPayload: string; refs: string[] }> => ({
meta: { workspace: "/tmp" } as unknown as T,
contentPayload: "",
refs: [],
});
describe("validateCursorAgentConfig", () => {
test("accepts valid config", () => {
+2 -1
View File
@@ -48,11 +48,12 @@ export function createCursorAgent(config: CursorAgentConfig): AgentFn {
...ctx,
agentContent: "",
};
const { workspace } = await config.extract(
const extracted = await config.extract(
cursorWorkspaceSchema,
"From the thread context, determine the absolute filesystem path where the project/repository is located.",
extractCtx,
);
const { workspace } = extracted.meta;
const fullPrompt = await buildAgentPrompt(ctx);
const args = [
"-p",
+1 -3
View File
@@ -1,8 +1,6 @@
import { Buffer } from "node:buffer";
import XXH from "xxhashjs";
import { encodeUint64AsCrockford } from "@uncaged/workflow-util";
import XXH from "xxhashjs";
function digestToUint64(digest: { toString(radix?: number): string }): bigint {
const hex = digest.toString(16).padStart(16, "0");
+2
View File
@@ -10,8 +10,10 @@ export {
putThreadMerkleNode,
serializeMerkleNode,
} from "./merkle.js";
export type { ParsedCasThreadNode } from "./nodes.js";
export {
isCasNodeYaml,
parseCasThreadNode,
putContentNodeWithRefs,
putStartNode,
putStateNode,
+100 -1
View File
@@ -1,9 +1,108 @@
import type { ContentMerkleNode, StartNode, StateNode } from "@uncaged/workflow-protocol";
import type {
ContentMerkleNode,
StartNode,
StartNodePayload,
StateNode,
StateNodePayload,
} from "@uncaged/workflow-protocol";
import { parse, stringify } from "yaml";
import { collectRefs } from "./collect-refs.js";
import type { CasStore } from "./types.js";
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
function isStartPayload(value: unknown): value is StartNodePayload {
if (!isRecord(value)) {
return false;
}
return (
typeof value.name === "string" &&
typeof value.hash === "string" &&
typeof value.maxRounds === "number" &&
typeof value.depth === "number"
);
}
function isStatePayload(value: unknown): value is StateNodePayload {
if (!isRecord(value)) {
return false;
}
const compact = value.compact;
if (!(compact === null || typeof compact === "string")) {
return false;
}
const ancestors = value.ancestors;
if (!Array.isArray(ancestors) || !ancestors.every((h) => typeof h === "string")) {
return false;
}
const meta = value.meta;
if (!isRecord(meta)) {
return false;
}
return (
typeof value.role === "string" &&
typeof value.start === "string" &&
typeof value.content === "string" &&
typeof value.timestamp === "number"
);
}
/** Parses a YAML CAS blob into a typed RFC v3 thread node (or legacy content layout with `children`). */
export function parseCasThreadNode(yamlText: string): ParsedCasThreadNode | null {
let raw: unknown;
try {
raw = parse(yamlText) as unknown;
} catch {
return null;
}
if (!isRecord(raw)) {
return null;
}
const type = raw.type;
if (type !== "start" && type !== "state" && type !== "content") {
return null;
}
let refsRaw: unknown = raw.refs;
if (refsRaw === undefined && type === "content") {
refsRaw = raw.children;
}
if (!Array.isArray(refsRaw) || !refsRaw.every((r) => typeof r === "string")) {
return null;
}
const refs = refsRaw as string[];
if (type === "content") {
if (typeof raw.payload !== "string") {
return null;
}
const node: ContentMerkleNode = { type: "content", payload: raw.payload, refs: [...refs] };
return { kind: "content", node };
}
if (type === "start") {
if (!isStartPayload(raw.payload)) {
return null;
}
const node: StartNode = { type: "start", payload: raw.payload, refs: [...refs] };
return { kind: "start", node };
}
if (!isStatePayload(raw.payload)) {
return null;
}
const node: StateNode = { type: "state", payload: raw.payload, refs: [...refs] };
return { kind: "state", node };
}
export type ParsedCasThreadNode =
| { kind: "start"; node: StartNode }
| { kind: "state"; node: StateNode }
| { kind: "content"; node: ContentMerkleNode };
/** YAML-serialize a CAS node carrying `{type, payload, refs}` (RFC v3 thread storage format). */
export function serializeCasNode(node: StartNode | StateNode | ContentMerkleNode): string {
return stringify({ type: node.type, payload: node.payload, refs: node.refs }, { indent: 2 });
@@ -40,6 +40,8 @@ function makeOptions(overrides: Partial<ExecuteThreadOptions>): ExecuteThreadOpt
awaitAfterEachYield: async () => {},
forkSourceThreadId: null,
prefilledDiskSteps: null,
forkContinuation: null,
replayTimestamps: null,
storageRoot: "/tmp/never",
...overrides,
};
@@ -0,0 +1,72 @@
import { afterEach, describe, expect, test } from "bun:test";
import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createCasStore } from "@uncaged/workflow-cas";
import { type ExtractContext, START } from "@uncaged/workflow-runtime";
import * as z from "zod/v4";
import { createExtract } from "../src/extract/extract-fn.js";
function installPlainJsonExtractMock(meta: Record<string, unknown>): () => void {
const origFetch = globalThis.fetch;
const mockFetch = async (): Promise<Response> =>
new Response(
JSON.stringify({
choices: [{ message: { content: JSON.stringify(meta) } }],
}),
{ status: 200, headers: { "Content-Type": "application/json" } },
);
globalThis.fetch = Object.assign(mockFetch, {
preconnect: origFetch.preconnect.bind(origFetch),
}) as typeof fetch;
return () => {
globalThis.fetch = origFetch;
};
}
describe("createExtract — ExtractResult shape", () => {
let restoreFetch: (() => void) | null = null;
afterEach(() => {
restoreFetch?.();
restoreFetch = null;
});
test("returns meta, contentPayload, and refs[]", async () => {
restoreFetch = installPlainJsonExtractMock({ confidence: 0.9 });
const dir = await mkdtemp(join(tmpdir(), "wf-extract-refs-"));
try {
const cas = createCasStore(join(dir, "cas"));
const extract = createExtract(
{ baseUrl: "http://127.0.0.1:9", apiKey: "key", model: "m" },
{ cas },
);
const schema = z.object({ confidence: z.number() });
const ctx: ExtractContext = {
threadId: "01THREADTESTAAAAAAAAAAAAAA",
depth: 0,
start: {
role: START,
content: "task text",
meta: { maxRounds: 10 },
timestamp: 100,
},
steps: [],
currentRole: { name: "analyst", systemPrompt: "be precise" },
agentContent: "model says hello",
};
const out = await extract(schema, "extract fields", ctx);
expect(out.meta).toEqual({ confidence: 0.9 });
expect(out.contentPayload).toBe("model says hello");
expect(Array.isArray(out.refs)).toBe(true);
expect(out.refs).toEqual([]);
} finally {
await rm(dir, { recursive: true, force: true });
}
});
});
@@ -0,0 +1,112 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import {
createCasStore,
putContentNodeWithRefs,
putStartNode,
putStateNode,
} from "@uncaged/workflow-cas";
import type { StateNodePayload } from "@uncaged/workflow-protocol";
import { FORK_BRANCH_ROLE } from "../src/engine/fork-thread.js";
import { garbageCollectCas } from "../src/engine/gc.js";
import { getBundleDir, removeThreadEntry, upsertThreadEntry } from "../src/engine/threads-index.js";
describe("garbageCollectCas (mark-and-sweep)", () => {
let storageRoot: string;
let casDir: string;
beforeEach(async () => {
storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-gc-ms-"));
casDir = join(storageRoot, "cas");
await mkdir(casDir, { recursive: true });
await writeFile(
join(storageRoot, "workflow.yaml"),
"config:\n maxDepth: 1\n supervisorInterval: 0\n providers: {}\n models: {}\nworkflows: {}\n",
"utf8",
);
});
afterEach(async () => {
await rm(storageRoot, { recursive: true, force: true });
});
test("shared CAS prefix survives when one fork thread index entry is removed", async () => {
const bundleHash = "TESTGC0000001";
const bundleDir = getBundleDir(storageRoot, bundleHash);
await mkdir(bundleDir, { recursive: true });
const cas = createCasStore(casDir);
const promptHash = await cas.put("prompt");
const startHash = await putStartNode(
cas,
{
name: "demo",
hash: bundleHash,
maxRounds: 5,
depth: 0,
},
promptHash,
);
const c1 = await putContentNodeWithRefs(cas, "p1", []);
const h1 = await putStateNode(cas, {
role: "planner",
meta: {},
start: startHash,
content: c1,
ancestors: [],
compact: null,
timestamp: 1,
} satisfies StateNodePayload);
const c2 = await putContentNodeWithRefs(cas, "c1", []);
const h2 = await putStateNode(cas, {
role: "coder",
meta: {},
start: startHash,
content: c2,
ancestors: [h1],
compact: null,
timestamp: 2,
} satisfies StateNodePayload);
const ec = await putContentNodeWithRefs(cas, "", []);
const fm = await putStateNode(cas, {
role: FORK_BRANCH_ROLE,
meta: {},
start: startHash,
content: ec,
ancestors: [h1],
compact: null,
timestamp: 3,
} satisfies StateNodePayload);
await upsertThreadEntry(bundleDir, "THREAD_AAAAAAA", {
head: h2,
start: startHash,
updatedAt: 10,
});
await upsertThreadEntry(bundleDir, "THREAD_BBBBBBB", {
head: fm,
start: startHash,
updatedAt: 20,
});
await removeThreadEntry(bundleDir, "THREAD_AAAAAAA");
const gc = await garbageCollectCas(storageRoot);
expect(gc.ok).toBe(true);
if (!gc.ok) {
return;
}
expect(await cas.get(h2)).toBeNull();
expect(await cas.get(h1)).not.toBeNull();
expect(await cas.get(startHash)).not.toBeNull();
expect(await cas.get(promptHash)).not.toBeNull();
expect(await cas.get(fm)).not.toBeNull();
});
});
+42 -30
View File
@@ -33,20 +33,12 @@ import {
removeThreadEntry,
upsertThreadEntry,
} from "./threads-index.js";
import type { ExecuteThreadIo, ExecuteThreadOptions } from "./types.js";
import type { ChainState, ExecuteThreadIo, ExecuteThreadOptions } from "./types.js";
import { EMPTY_CHAIN_STATE } from "./types.js";
/** Cap for {@link StateNode}.payload.ancestors: 1 parent + 10 skip-list. */
const ANCESTORS_CAP = 11;
type ChainState = {
/** State hash of the most recently written {@link StateNode}, or `null` before the first step. */
parentStateHash: string | null;
/** Ancestors recorded on the most recently written {@link StateNode}. */
parentAncestors: readonly string[];
};
const EMPTY_CHAIN: ChainState = { parentStateHash: null, parentAncestors: [] };
function computeAncestors(chain: ChainState): string[] {
if (chain.parentStateHash === null) {
return [];
@@ -408,36 +400,56 @@ export async function executeThread(
await mkdir(dirname(io.infoJsonlPath), { recursive: true });
const prefilled = options.prefilledDiskSteps;
const fork = options.forkContinuation;
if (fork !== null && prefilled !== null) {
throw new Error("forkContinuation and prefilledDiskSteps cannot both be set");
}
if (prefilled !== null && prefilled.length !== input.steps.length) {
throw new Error(
`prefilledDiskSteps length (${prefilled.length}) must match input.steps length (${input.steps.length})`,
);
}
const replayTs = options.replayTimestamps;
if (replayTs !== null && replayTs.length !== input.steps.length) {
throw new Error(
`replayTimestamps length (${replayTs.length}) must match input.steps length (${input.steps.length})`,
);
}
const bundleDir = getBundleDir(options.storageRoot, io.hash);
const promptHash = await io.cas.put(input.prompt);
const startHash = await putStartNode(
io.cas,
{
name: workflowName,
hash: io.hash,
maxRounds: options.maxRounds,
depth: options.depth,
},
promptHash,
);
let startHash: string;
await publishHead({
bundleDir,
threadId: io.threadId,
startHash,
headHash: startHash,
});
if (fork !== null) {
startHash = fork.startHash;
logger("T9HQ2KHM", `thread ${io.threadId} continued fork for workflow ${workflowName}`);
} else {
const promptHash = await io.cas.put(input.prompt);
startHash = await putStartNode(
io.cas,
{
name: workflowName,
hash: io.hash,
maxRounds: options.maxRounds,
depth: options.depth,
},
promptHash,
);
logger("T9HQ2KHM", `thread ${io.threadId} started for workflow ${workflowName}`);
await publishHead({
bundleDir,
threadId: io.threadId,
startHash,
headHash: startHash,
});
let chain: ChainState = EMPTY_CHAIN;
logger("T9HQ2KHM", `thread ${io.threadId} started for workflow ${workflowName}`);
}
let chain: ChainState = fork !== null ? fork.initialChain : EMPTY_CHAIN_STATE;
if (prefilled !== null) {
for (const row of prefilled) {
@@ -497,7 +509,7 @@ export async function executeThread(
contentHash: out.contentHash,
meta: out.meta,
refs: out.refs,
timestamp: prefilled?.[i]?.timestamp ?? nowMs + i,
timestamp: replayTs?.[i] ?? prefilled?.[i]?.timestamp ?? nowMs + i,
})),
};
@@ -1,9 +1,29 @@
import type { WorkflowCompletion } from "@uncaged/workflow-runtime";
import { err, normalizeRefsField, ok, type Result } from "@uncaged/workflow-util";
import type { CasStore } from "@uncaged/workflow-cas";
import { parseCasThreadNode, putContentNodeWithRefs, putStateNode } from "@uncaged/workflow-cas";
import type { StateNodePayload } from "@uncaged/workflow-protocol";
import type { RoleOutput, WorkflowCompletion } from "@uncaged/workflow-runtime";
import { END } from "@uncaged/workflow-runtime";
import { err, ok, type Result } from "@uncaged/workflow-util";
import { parse as parseYaml } from "yaml";
import type { ForkHistoricalStep, ForkPlan, ParsedThreadStartRecord } from "./types.js";
import { upsertThreadEntry } from "./threads-index.js";
import type { CasForkPlan, ChainState, ForkContinuationOptions } from "./types.js";
import { EMPTY_CHAIN_STATE } from "./types.js";
/** Recognizes a persisted workflow completion line (no `role`; has numeric `returnCode` and string `summary`). Omits `rootHash` when absent. */
/** Internal branch marker; skipped when presenting fork selection / replay slices. */
export const FORK_BRANCH_ROLE = "__fork__";
/** Cap for {@link StateNodePayload}.ancestors: 1 parent + 10 skip-list. */
const ANCESTORS_CAP = 11;
function computeAncestors(chain: ChainState): string[] {
if (chain.parentStateHash === null) {
return [];
}
return [chain.parentStateHash, ...chain.parentAncestors].slice(0, ANCESTORS_CAP);
}
/** Recognizes a persisted workflow completion line (no `role`; has numeric `returnCode` and string `summary`). */
export function tryParseWorkflowResultRecord(
obj: Record<string, unknown>,
): WorkflowCompletion | null {
@@ -18,227 +38,288 @@ export function tryParseWorkflowResultRecord(
return { returnCode, summary };
}
export function tryParseRoleStepRecord(obj: Record<string, unknown>): ForkHistoricalStep | null {
const role = obj.role;
const contentHash = obj.contentHash;
const meta = obj.meta;
const timestamp = obj.timestamp;
if (typeof role !== "string") {
return null;
}
if (typeof contentHash !== "string") {
return null;
}
if (meta === null || typeof meta !== "object") {
return null;
}
if (typeof timestamp !== "number") {
return null;
}
return {
role,
contentHash,
meta: meta as Record<string, unknown>,
refs: normalizeRefsField(obj.refs),
timestamp,
};
}
function parseRoleLine(
obj: Record<string, unknown>,
lineIndex: number,
): Result<ForkHistoricalStep, string> {
const parsed = tryParseRoleStepRecord(obj);
if (parsed === null) {
return err(`invalid role record at line ${lineIndex}`);
}
return ok(parsed);
}
function parseStartRecordLine(firstLine: string): Result<ParsedThreadStartRecord, string> {
let startParsed: unknown;
try {
startParsed = JSON.parse(firstLine) as unknown;
} catch {
return err("invalid JSON on line 1 (start record)");
}
if (startParsed === null || typeof startParsed !== "object") {
return err("invalid start record shape");
}
const startRec = startParsed as Record<string, unknown>;
const name = startRec.name;
const hash = startRec.hash;
const threadId = startRec.threadId;
const parameters = startRec.parameters;
if (typeof name !== "string" || typeof hash !== "string" || typeof threadId !== "string") {
return err("start record missing name, hash, or threadId");
}
if (parameters === null || typeof parameters !== "object") {
return err("start record missing parameters");
}
const paramsRec = parameters as Record<string, unknown>;
const prompt = paramsRec.prompt;
const options = paramsRec.options;
if (typeof prompt !== "string") {
return err("start record missing parameters.prompt");
}
if (options === null || typeof options !== "object") {
return err("start record missing parameters.options");
}
const optRec = options as Record<string, unknown>;
const maxRounds = optRec.maxRounds;
if (typeof maxRounds !== "number") {
return err("start record missing parameters.options.maxRounds");
}
const depthRaw = optRec.depth;
const depth =
typeof depthRaw === "number" && Number.isFinite(depthRaw) ? Math.trunc(depthRaw) : 0;
return ok({
workflowName: name,
hash,
threadId,
prompt,
maxRounds,
depth,
});
}
function parseFollowingRoleLines(lines: string[]): Result<ForkHistoricalStep[], string> {
const roleSteps: ForkHistoricalStep[] = [];
for (let i = 1; i < lines.length; i++) {
const line = lines[i];
if (line === undefined) {
/** Walk {@link StateNode} hashes from head toward the first step (newest → oldest). */
export async function walkStateFramesNewestFirst(
cas: CasStore,
headHash: string,
): Promise<Array<{ hash: string; payload: StateNodePayload }>> {
const frames: Array<{ hash: string; payload: StateNodePayload }> = [];
let cur = headHash;
while (true) {
const yamlText = await cas.get(cur);
if (yamlText === null) {
break;
}
let rec: unknown;
try {
rec = JSON.parse(line) as unknown;
} catch {
return err(`invalid JSON at line ${i + 1}`);
}
if (rec === null || typeof rec !== "object") {
return err(`invalid record at line ${i + 1}`);
}
const recObj = rec as Record<string, unknown>;
const wf = tryParseWorkflowResultRecord(recObj);
if (wf !== null) {
if (i !== lines.length - 1) {
return err("WorkflowResult record must be the final line in `.data.jsonl`");
}
const parsed = parseCasThreadNode(yamlText);
if (parsed === null || parsed.kind !== "state") {
break;
}
const parsed = parseRoleLine(recObj, i + 1);
if (!parsed.ok) {
return parsed;
frames.push({ hash: cur, payload: parsed.node.payload });
const ancestors = parsed.node.payload.ancestors;
if (ancestors.length === 0) {
break;
}
roleSteps.push(parsed.value);
const parent = ancestors[0];
if (parent === undefined || parent === "") {
break;
}
cur = parent;
}
return ok(roleSteps);
return frames;
}
/**
* Parse RFC-001 `.data.jsonl`: line 1 start record, line 2+ role outputs.
*/
export function parseThreadDataJsonl(text: string): Result<
{
start: ParsedThreadStartRecord;
roleSteps: ForkHistoricalStep[];
},
string
> {
const lines = text
.split("\n")
.map((l) => l.trim())
.filter((l) => l !== "");
if (lines.length === 0) {
return err("thread data is empty");
}
const firstLine = lines[0];
if (firstLine === undefined) {
return err("thread data is empty");
}
const start = parseStartRecordLine(firstLine);
if (!start.ok) {
return start;
}
const roleSteps = parseFollowingRoleLines(lines);
if (!roleSteps.ok) {
return roleSteps;
}
return ok({
start: start.value,
roleSteps: roleSteps.value,
});
}
function orderedUniqueRoles(roleSteps: ForkHistoricalStep[]): string[] {
function orderedUniqueRoles(roles: string[]): string[] {
const seen = new Set<string>();
const out: string[] = [];
for (const s of roleSteps) {
if (!seen.has(s.role)) {
seen.add(s.role);
out.push(s.role);
for (const r of roles) {
if (!seen.has(r)) {
seen.add(r);
out.push(r);
}
}
return out;
}
/**
* Select historical steps for a fork:
* - `fromRole === null`: drop the last step (retry the last role).
* - `fromRole !== null`: keep steps through the first occurrence of that role (inclusive).
*/
export function selectForkHistoricalSteps(
roleSteps: ForkHistoricalStep[],
async function readPromptText(cas: CasStore, promptHash: string): Promise<Result<string, string>> {
const yamlText = await cas.get(promptHash);
if (yamlText === null) {
return err(`prompt CAS blob missing: ${promptHash}`);
}
let raw: unknown;
try {
raw = parseYaml(yamlText) as unknown;
} catch {
return err(`prompt CAS blob is not valid YAML: ${promptHash}`);
}
if (raw === null || typeof raw !== "object") {
return err(`prompt CAS blob has unexpected shape: ${promptHash}`);
}
const payload = (raw as Record<string, unknown>).payload;
if (typeof payload !== "string") {
return err(`prompt CAS blob missing string payload: ${promptHash}`);
}
return ok(payload);
}
async function readStartWorkflowIdentity(params: {
cas: CasStore;
startHash: string;
}): Promise<
Result<{ workflowName: string; maxRounds: number; depth: number; prompt: string }, string>
> {
const yamlText = await params.cas.get(params.startHash);
if (yamlText === null) {
return err(`start node missing in CAS: ${params.startHash}`);
}
const parsed = parseCasThreadNode(yamlText);
if (parsed === null || parsed.kind !== "start") {
return err(`CAS blob is not a StartNode: ${params.startHash}`);
}
const refs = parsed.node.refs;
const promptHash = refs[0];
if (typeof promptHash !== "string") {
return err("StartNode refs[0] must be the prompt hash");
}
const prompt = await readPromptText(params.cas, promptHash);
if (!prompt.ok) {
return prompt;
}
const p = parsed.node.payload;
return ok({
workflowName: p.name,
maxRounds: p.maxRounds,
depth: p.depth,
prompt: prompt.value,
});
}
async function payloadToRoleOutput(cas: CasStore, payload: StateNodePayload): Promise<RoleOutput> {
let refs: string[] = [];
const blob = await cas.get(payload.content);
if (blob !== null) {
const cn = parseCasThreadNode(blob);
if (cn?.kind === "content") {
refs = [...cn.node.refs];
}
}
return {
role: payload.role,
contentHash: payload.content,
meta: payload.meta,
refs,
};
}
function meaningfulFramesOldestFirst(
newestFirst: Array<{ hash: string; payload: StateNodePayload }>,
): Array<{ hash: string; payload: StateNodePayload }> {
const chronological = [...newestFirst].reverse();
return chronological.filter((f) => f.payload.role !== END && f.payload.role !== FORK_BRANCH_ROLE);
}
function selectForkPointStateHash(
meaningfulOldestFirst: Array<{ hash: string; payload: StateNodePayload }>,
fromRole: string | null,
): Result<ForkHistoricalStep[], string> {
if (roleSteps.length === 0) {
): Result<string | null, string> {
if (meaningfulOldestFirst.length === 0) {
return err("thread has no completed role steps to fork from");
}
if (fromRole === null) {
if (roleSteps.length === 1) {
return ok([]);
if (meaningfulOldestFirst.length === 1) {
return ok(null);
}
return ok(roleSteps.slice(0, -1));
const forkFrame = meaningfulOldestFirst[meaningfulOldestFirst.length - 2];
if (forkFrame === undefined) {
return err("thread has no completed role steps to fork from");
}
return ok(forkFrame.hash);
}
const idx = roleSteps.findIndex((s) => s.role === fromRole);
const idx = meaningfulOldestFirst.findIndex((f) => f.payload.role === fromRole);
if (idx < 0) {
const available = orderedUniqueRoles(roleSteps);
const available = orderedUniqueRoles(meaningfulOldestFirst.map((f) => f.payload.role));
return err(`role not found in thread: ${fromRole} (available: ${available.join(", ")})`);
}
return ok(roleSteps.slice(0, idx + 1));
const forkFrame = meaningfulOldestFirst[idx];
if (forkFrame === undefined) {
return err("fork frame missing");
}
return ok(forkFrame.hash);
}
function replayFramesThroughForkPoint(
meaningfulOldestFirst: Array<{ hash: string; payload: StateNodePayload }>,
forkPointHash: string | null,
): Array<{ hash: string; payload: StateNodePayload }> {
if (forkPointHash === null) {
return [];
}
const idx = meaningfulOldestFirst.findIndex((f) => f.hash === forkPointHash);
if (idx < 0) {
return [];
}
return meaningfulOldestFirst.slice(0, idx + 1);
}
async function buildForkContinuation(params: {
cas: CasStore;
sourceThreadId: string;
startHash: string;
forkPointStateHash: string | null;
}): Promise<Result<ForkContinuationOptions, string>> {
const { cas, sourceThreadId, startHash, forkPointStateHash } = params;
if (forkPointStateHash === null) {
return ok({
startHash,
forkHeadHash: startHash,
initialChain: EMPTY_CHAIN_STATE,
});
}
const yamlText = await cas.get(forkPointStateHash);
if (yamlText === null) {
return err(`fork point state missing in CAS: ${forkPointStateHash}`);
}
const parsed = parseCasThreadNode(yamlText);
if (parsed === null || parsed.kind !== "state") {
return err(`fork point blob is not a StateNode: ${forkPointStateHash}`);
}
const fpPayload = parsed.node.payload;
const chainBefore: ChainState = {
parentStateHash: forkPointStateHash,
parentAncestors: fpPayload.ancestors,
};
const ancestorsMarker = computeAncestors(chainBefore);
const emptyContentHash = await putContentNodeWithRefs(cas, "", []);
const markerPayload: StateNodePayload = {
role: FORK_BRANCH_ROLE,
meta: { forkFrom: sourceThreadId },
start: startHash,
content: emptyContentHash,
ancestors: ancestorsMarker,
compact: null,
timestamp: Date.now(),
};
const markerHash = await putStateNode(cas, markerPayload);
const initialChain: ChainState = {
parentStateHash: markerHash,
parentAncestors: ancestorsMarker,
};
return ok({
startHash,
forkHeadHash: markerHash,
initialChain,
});
}
/**
* Read `.data.jsonl` text and compute fork payload for the worker `run` command.
* Prepare a CAS fork: writes the branch marker {@link StateNode}, registers `threads.json`,
* and returns worker payload fields (shared {@link StartNode}, zero ancestor duplication).
*/
export function buildForkPlan(
dataJsonlText: string,
fromRole: string | null,
): Result<ForkPlan, string> {
const parsed = parseThreadDataJsonl(dataJsonlText);
if (!parsed.ok) {
return parsed;
export async function prepareCasFork(params: {
cas: CasStore;
bundleDir: string;
bundleHash: string;
sourceThreadId: string;
headHash: string;
startHash: string;
newThreadId: string;
fromRole: string | null;
}): Promise<Result<CasForkPlan, string>> {
const id = await readStartWorkflowIdentity({
cas: params.cas,
startHash: params.startHash,
});
if (!id.ok) {
return id;
}
const selected = selectForkHistoricalSteps(parsed.value.roleSteps, fromRole);
if (!selected.ok) {
return selected;
const newestFirst = await walkStateFramesNewestFirst(params.cas, params.headHash);
const meaningful = meaningfulFramesOldestFirst(newestFirst);
const forkPoint = selectForkPointStateHash(meaningful, params.fromRole);
if (!forkPoint.ok) {
return forkPoint;
}
const { start } = parsed.value;
const replayFrames = replayFramesThroughForkPoint(meaningful, forkPoint.value);
const steps: RoleOutput[] = [];
const stepTimestamps: number[] = [];
for (const fr of replayFrames) {
steps.push(await payloadToRoleOutput(params.cas, fr.payload));
stepTimestamps.push(fr.payload.timestamp);
}
const cont = await buildForkContinuation({
cas: params.cas,
sourceThreadId: params.sourceThreadId,
startHash: params.startHash,
forkPointStateHash: forkPoint.value,
});
if (!cont.ok) {
return cont;
}
await upsertThreadEntry(params.bundleDir, params.newThreadId, {
head: cont.value.forkHeadHash,
start: params.startHash,
updatedAt: Date.now(),
});
return ok({
workflowName: start.workflowName,
hash: start.hash,
sourceThreadId: start.threadId,
prompt: start.prompt,
runOptions: { maxRounds: start.maxRounds, depth: start.depth },
historicalSteps: selected.value,
workflowName: id.value.workflowName,
hash: params.bundleHash,
sourceThreadId: params.sourceThreadId,
prompt: id.value.prompt,
runOptions: { maxRounds: id.value.maxRounds, depth: id.value.depth },
steps,
stepTimestamps,
forkContinuation: cont.value,
});
}
+129 -69
View File
@@ -1,122 +1,182 @@
import { readdir, readFile } from "node:fs/promises";
import type { Stats } from "node:fs";
import { readdir, readFile, stat } from "node:fs/promises";
import { join } from "node:path";
import { type CasStore, createCasStore } from "@uncaged/workflow-cas";
import { type CasStore, createCasStore, findReachableHashes } from "@uncaged/workflow-cas";
import { err, getGlobalCasDir, ok, type Result } from "@uncaged/workflow-util";
import { parseThreadDataJsonl } from "./fork-thread.js";
import type { ThreadHistoryEntry, ThreadIndex } from "./threads-index.js";
import { readThreadsIndex } from "./threads-index.js";
import type { GcResult } from "./types.js";
async function listThreadDataJsonlPaths(storageRoot: string): Promise<Result<string[], string>> {
const logsRoot = join(storageRoot, "logs");
const paths: string[] = [];
let hashes: string[];
function isPlainObject(v: unknown): v is Record<string, unknown> {
return v !== null && typeof v === "object" && !Array.isArray(v);
}
function parseHistoryLine(jsonLine: string): ThreadHistoryEntry | null {
let raw: unknown;
try {
hashes = await readdir(logsRoot);
raw = JSON.parse(jsonLine) as unknown;
} catch {
return null;
}
if (!isPlainObject(raw)) {
return null;
}
const threadId = raw.threadId;
const head = raw.head;
const start = raw.start;
const completedAt = raw.completedAt;
if (
typeof threadId !== "string" ||
typeof head !== "string" ||
typeof start !== "string" ||
typeof completedAt !== "number"
) {
return null;
}
return { threadId, head, start, completedAt };
}
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: walks threads index + optional history dir
async function collectGcRootsFromBundle(bundleDir: string): Promise<Result<string[], string>> {
const roots: string[] = [];
let activeIndex: ThreadIndex;
try {
activeIndex = await readThreadsIndex(bundleDir);
} catch (e) {
return err(`failed to read threads.json under ${bundleDir}: ${String(e)}`);
}
for (const entry of Object.values(activeIndex)) {
roots.push(entry.head);
roots.push(entry.start);
}
const histDir = join(bundleDir, "history");
let histFiles: string[];
try {
histFiles = await readdir(histDir);
} catch (e) {
const errObj = e as NodeJS.ErrnoException;
if (errObj.code === "ENOENT") {
return ok(roots);
}
return err(`failed to read history directory ${histDir}: ${String(e)}`);
}
for (const name of histFiles) {
if (!name.endsWith(".jsonl")) {
continue;
}
let text: string;
try {
text = await readFile(join(histDir, name), "utf8");
} catch (e) {
return err(`failed to read history file ${name}: ${String(e)}`);
}
for (const line of text.split("\n")) {
const trimmed = line.trim();
if (trimmed === "") {
continue;
}
const entry = parseHistoryLine(trimmed);
if (entry === null) {
continue;
}
roots.push(entry.head);
roots.push(entry.start);
}
}
return ok(roots);
}
async function collectAllGcRoots(storageRoot: string): Promise<Result<string[], string>> {
const bundlesRoot = join(storageRoot, "bundles");
let entries: string[];
try {
entries = await readdir(bundlesRoot);
} catch (e) {
const errObj = e as NodeJS.ErrnoException;
if (errObj.code === "ENOENT") {
return ok([]);
}
return err(`failed to read logs directory: ${String(e)}`);
return err(`failed to read bundles directory: ${String(e)}`);
}
for (const hash of hashes) {
const dir = join(logsRoot, hash);
let entries: string[];
const roots: string[] = [];
for (const name of entries) {
const bundleDir = join(bundlesRoot, name);
let st: Stats;
try {
entries = await readdir(dir);
st = await stat(bundleDir);
} catch {
continue;
}
for (const fileName of entries) {
if (fileName.endsWith(".data.jsonl")) {
paths.push(join(dir, fileName));
}
if (!st.isDirectory()) {
continue;
}
const chunk = await collectGcRootsFromBundle(bundleDir);
if (!chunk.ok) {
return chunk;
}
roots.push(...chunk.value);
}
paths.sort();
return ok(paths);
return ok(roots);
}
async function collectActiveRefsFromDataPaths(
dataPaths: string[],
): Promise<Result<Set<string>, string>> {
const activeRefs = new Set<string>();
for (const dataPath of dataPaths) {
let text: string;
try {
text = await readFile(dataPath, "utf8");
} catch (e) {
return err(`failed to read ${dataPath}: ${String(e)}`);
}
const parsed = parseThreadDataJsonl(text);
if (!parsed.ok) {
return err(`${dataPath}: ${parsed.error}`);
}
for (const step of parsed.value.roleSteps) {
for (const ref of step.refs) {
activeRefs.add(ref);
}
}
}
return ok(activeRefs);
}
async function deleteCasNotInSet(
cas: CasStore,
activeRefs: Set<string>,
): Promise<Result<string[], string>> {
async function deleteCasNotMarked(cas: CasStore, marked: ReadonlySet<string>): Promise<string[]> {
let listed: string[];
try {
listed = await cas.list();
} catch (e) {
return err(`failed to list cas entries: ${String(e)}`);
throw new Error(`failed to list cas entries: ${String(e)}`);
}
const deletedHashes: string[] = [];
for (const hash of listed) {
if (activeRefs.has(hash)) {
if (marked.has(hash)) {
continue;
}
try {
await cas.delete(hash);
} catch (e) {
return err(`failed to delete cas ${hash}: ${String(e)}`);
throw new Error(`failed to delete cas ${hash}: ${String(e)}`);
}
deletedHashes.push(hash);
}
deletedHashes.sort();
return ok(deletedHashes);
return deletedHashes;
}
/**
* Mark-and-sweep CAS GC: collect `refs` from all thread `.data.jsonl` files under `storageRoot`,
* then delete CAS blobs not referenced by any surviving thread data.
* Mark-and-sweep CAS GC: roots are every `head` / `start` hash from `threads.json` and
* `history/*.jsonl` across bundle dirs; marks closure via `refs[]`; deletes unreachable blobs.
*/
export async function garbageCollectCas(storageRoot: string): Promise<Result<GcResult, string>> {
const pathsResult = await listThreadDataJsonlPaths(storageRoot);
if (!pathsResult.ok) {
return pathsResult;
const rootsResult = await collectAllGcRoots(storageRoot);
if (!rootsResult.ok) {
return rootsResult;
}
const paths = pathsResult.value;
const refsResult = await collectActiveRefsFromDataPaths(paths);
if (!refsResult.ok) {
return refsResult;
}
const activeRefs = refsResult.value;
const roots = rootsResult.value;
const cas = createCasStore(getGlobalCasDir(storageRoot));
const deletedResult = await deleteCasNotInSet(cas, activeRefs);
if (!deletedResult.ok) {
return deletedResult;
const marked = await findReachableHashes(roots, cas);
let deletedHashes: string[];
try {
deletedHashes = await deleteCasNotMarked(cas, marked);
} catch (e) {
return err(String(e));
}
const deletedHashes = deletedResult.value;
return ok({
scannedThreads: paths.length,
activeRefs: activeRefs.size,
scannedThreads: roots.length,
activeRefs: marked.size,
deletedEntries: deletedHashes.length,
deletedHashes,
});
+10 -7
View File
@@ -1,11 +1,10 @@
export { createWorkflow } from "./create-workflow.js";
export { executeThread } from "./engine.js";
export {
buildForkPlan,
parseThreadDataJsonl,
selectForkHistoricalSteps,
tryParseRoleStepRecord,
FORK_BRANCH_ROLE,
prepareCasFork,
tryParseWorkflowResultRecord,
walkStateFramesNewestFirst,
} from "./fork-thread.js";
export { garbageCollectCas } from "./gc.js";
export { createThreadPauseGate } from "./thread-pause-gate.js";
@@ -13,18 +12,22 @@ export type { ThreadHistoryEntry, ThreadIndex, ThreadIndexEntry } from "./thread
export {
appendThreadHistoryEntry,
getBundleDir,
readThreadsIndex,
removeThreadEntry,
removeThreadHistoryEntries,
upsertThreadEntry,
writeThreadsIndex,
} from "./threads-index.js";
export type {
CasForkPlan,
ChainState,
ExecuteThreadIo,
ExecuteThreadOptions,
ForkHistoricalStep,
ForkPlan,
ForkContinuationOptions,
GcResult,
ParsedThreadStartRecord,
PrefilledDiskStep,
SupervisorDecision,
ThreadPauseGate,
} from "./types.js";
export { EMPTY_CHAIN_STATE } from "./types.js";
export { getWorkerHostScriptPath } from "./worker-entry-path.js";
@@ -1,10 +1,9 @@
import * as z from "zod/v4";
import { resolveModel } from "@uncaged/workflow-register";
import { extractFunctionToolFromZodSchema } from "../extract/index.js";
import { createLlmFn, createThreadReactor } from "@uncaged/workflow-reactor";
import type { WorkflowConfig } from "@uncaged/workflow-register";
import { resolveModel } from "@uncaged/workflow-register";
import { err, type LogFn, ok, type Result } from "@uncaged/workflow-util";
import * as z from "zod/v4";
import { extractFunctionToolFromZodSchema } from "../extract/index.js";
import type { SupervisorDecision } from "./types.js";
@@ -1,6 +1,8 @@
import { appendFile, mkdir, readFile, rename, writeFile } from "node:fs/promises";
import { appendFile, mkdir, readdir, readFile, rename, writeFile } from "node:fs/promises";
import { dirname, join } from "node:path";
import { err, ok, type Result } from "@uncaged/workflow-util";
/**
* Active-thread index entry stored in `<bundleDir>/threads.json`.
*
@@ -71,7 +73,8 @@ function parseThreadIndex(text: string): ThreadIndex {
return out;
}
async function readThreadIndex(bundleDir: string): Promise<ThreadIndex> {
/** Read `<bundleDir>/threads.json` (empty object when missing or invalid). */
export async function readThreadsIndex(bundleDir: string): Promise<ThreadIndex> {
const path = threadsJsonPath(bundleDir);
let text: string;
try {
@@ -86,7 +89,7 @@ async function readThreadIndex(bundleDir: string): Promise<ThreadIndex> {
return parseThreadIndex(text);
}
async function writeThreadIndex(bundleDir: string, index: ThreadIndex): Promise<void> {
export async function writeThreadsIndex(bundleDir: string, index: ThreadIndex): Promise<void> {
const path = threadsJsonPath(bundleDir);
await mkdir(dirname(path), { recursive: true });
const tmp = `${path}.tmp.${process.pid}.${Date.now()}`;
@@ -101,19 +104,19 @@ export async function upsertThreadEntry(
threadId: string,
entry: ThreadIndexEntry,
): Promise<void> {
const index = await readThreadIndex(bundleDir);
const index = await readThreadsIndex(bundleDir);
index[threadId] = entry;
await writeThreadIndex(bundleDir, index);
await writeThreadsIndex(bundleDir, index);
}
/** Remove a thread entry from `threads.json` (no-op when absent). */
export async function removeThreadEntry(bundleDir: string, threadId: string): Promise<void> {
const index = await readThreadIndex(bundleDir);
const index = await readThreadsIndex(bundleDir);
if (!(threadId in index)) {
return;
}
delete index[threadId];
await writeThreadIndex(bundleDir, index);
await writeThreadsIndex(bundleDir, index);
}
function dateKey(epochMs: number): string {
@@ -134,3 +137,63 @@ export async function appendThreadHistoryEntry(
const line = `${JSON.stringify(entry)}\n`;
await appendFile(path, line, "utf8");
}
/** Removes every `history/*.jsonl` line whose `threadId` matches (rewrite files in place). */
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: per-file JSONL filtering keeps RM deterministic
export async function removeThreadHistoryEntries(
bundleDir: string,
threadId: string,
): Promise<Result<number, string>> {
const histRoot = join(bundleDir, "history");
let files: string[];
try {
files = await readdir(histRoot);
} catch (e) {
const errObj = e as NodeJS.ErrnoException;
if (errObj.code === "ENOENT") {
return ok(0);
}
return err(`failed to read history directory: ${String(e)}`);
}
let removed = 0;
for (const name of files) {
if (!name.endsWith(".jsonl")) {
continue;
}
const path = join(histRoot, name);
let text: string;
try {
text = await readFile(path, "utf8");
} catch {
continue;
}
const kept: string[] = [];
for (const line of text.split("\n")) {
const trimmed = line.trim();
if (trimmed === "") {
continue;
}
let rec: unknown;
try {
rec = JSON.parse(trimmed) as unknown;
} catch {
kept.push(`${trimmed}\n`);
continue;
}
if (rec === null || typeof rec !== "object") {
kept.push(`${trimmed}\n`);
continue;
}
const id = (rec as Record<string, unknown>).threadId;
if (id === threadId) {
removed++;
continue;
}
kept.push(`${trimmed}\n`);
}
await writeFile(path, kept.join(""), "utf8");
}
return ok(removed);
}
+33 -16
View File
@@ -11,7 +11,25 @@ export type ExecuteThreadIo = {
cas: CasStore;
};
/** One persisted role line in `.data.jsonl` (engine adds these for fork replay before running the generator). */
/** CAS chain tail state before the next appended {@link StateNode}. */
export type ChainState = {
parentStateHash: string | null;
parentAncestors: readonly string[];
};
export const EMPTY_CHAIN_STATE: ChainState = { parentStateHash: null, parentAncestors: [] };
/**
* When forking, the worker continues from an existing {@link StartNode} plus an optional
* branch marker {@link StateNode} instead of allocating a new start blob.
*/
export type ForkContinuationOptions = {
startHash: string;
forkHeadHash: string;
initialChain: ChainState;
};
/** One replayed role step (prefill) before the generator runs (same layout as disk replay rows). */
export type PrefilledDiskStep = {
role: string;
contentHash: string;
@@ -30,37 +48,36 @@ export type ExecuteThreadOptions = {
/** When non-null, written into the start record so tooling can trace lineage. */
forkSourceThreadId: string | null;
/**
* Written to `.data.jsonl` immediately after the start record, before the generator runs.
* When non-null, replays these steps into CAS before the generator runs.
* Must match `input.steps` length and order when present.
*/
prefilledDiskSteps: PrefilledDiskStep[] | null;
/** When non-null, skip creating a new {@link StartNode} and continue this CAS chain. */
forkContinuation: ForkContinuationOptions | null;
/**
* When non-null, must match `input.steps.length`; supplies persisted timestamps for
* {@link ThreadContext.steps} (used when restoring history without prefilled CAS replay).
*/
replayTimestamps: readonly number[] | null;
/** Workspace root containing `workflow.yaml`; used to resolve the `extract` scene for meta extraction. */
storageRoot: string;
};
/** Role steps replayed from `.data.jsonl`, including persisted timestamps. */
export type ForkHistoricalStep = RoleOutput & { timestamp: number };
export type ParsedThreadStartRecord = {
workflowName: string;
hash: string;
threadId: string;
prompt: string;
maxRounds: number;
depth: number;
};
export type ForkPlan = {
export type CasForkPlan = {
workflowName: string;
hash: string;
sourceThreadId: string;
prompt: string;
runOptions: { maxRounds: number; depth: number };
historicalSteps: ForkHistoricalStep[];
steps: RoleOutput[];
stepTimestamps: number[];
forkContinuation: ForkContinuationOptions;
};
export type GcResult = {
/** Count of root hashes seeded from thread indexes (`head`/`start` per entry). */
scannedThreads: number;
/** Reachable CAS blobs after the mark phase. */
activeRefs: number;
deletedEntries: number;
deletedHashes: string[];
+80 -5
View File
@@ -1,9 +1,12 @@
import { mkdir, unlink, writeFile } from "node:fs/promises";
import { createServer, type Socket } from "node:net";
import { dirname, join } from "node:path";
import type { RoleOutput, WorkflowFn } from "@uncaged/workflow-runtime";
import { ensureUncagedWorkflowSymlink, importWorkflowBundleModule } from "@uncaged/workflow-register";
import { createCasStore } from "@uncaged/workflow-cas";
import {
ensureUncagedWorkflowSymlink,
importWorkflowBundleModule,
} from "@uncaged/workflow-register";
import type { RoleOutput, WorkflowFn } from "@uncaged/workflow-runtime";
import {
createLogger,
err,
@@ -14,7 +17,12 @@ import {
} from "@uncaged/workflow-util";
import { executeThread } from "./engine.js";
import { createThreadPauseGate } from "./thread-pause-gate.js";
import type { ExecuteThreadIo, PrefilledDiskStep, ThreadPauseGate } from "./types.js";
import type {
ExecuteThreadIo,
ForkContinuationOptions,
PrefilledDiskStep,
ThreadPauseGate,
} from "./types.js";
const bootLog = createLogger({ sink: { kind: "stderr" } });
@@ -25,9 +33,10 @@ type RunCommand = {
prompt: string;
options: { maxRounds: number; depth: number };
steps: RoleOutput[];
/** Timestamps aligned with `steps` for `.data.jsonl` replay; length must match `steps` when non-null. */
/** Timestamps aligned with `steps` for replay / fork restore; length must match `steps` when steps are non-empty. */
stepTimestamps: number[] | null;
forkSourceThreadId: string | null;
forkContinuation: ForkContinuationOptions | null;
};
type KillCommand = {
@@ -70,6 +79,7 @@ function parseRoleOutputRecord(obj: Record<string, unknown>): RoleOutput | null
};
}
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: mirrors permissive worker IPC decoding shape checks
function parseRunStepsPayload(rec: Record<string, unknown>): {
steps: RoleOutput[];
stepTimestamps: number[] | null;
@@ -104,12 +114,60 @@ function parseRunStepsPayload(rec: Record<string, unknown>): {
return null;
}
}
const parallelTsRaw = rec.stepTimestamps;
if (
steps.length > 0 &&
Array.isArray(parallelTsRaw) &&
parallelTsRaw.length === steps.length &&
parallelTsRaw.every((x): x is number => typeof x === "number")
) {
return { steps, stepTimestamps: [...parallelTsRaw] };
}
return {
steps,
stepTimestamps: anyTimestamp ? timestamps : null,
};
}
function parseForkContinuation(rec: Record<string, unknown>): ForkContinuationOptions | null {
const raw = rec.forkContinuation;
if (raw === undefined || raw === null) {
return null;
}
if (typeof raw !== "object") {
return null;
}
const o = raw as Record<string, unknown>;
const startHash = o.startHash;
const forkHeadHash = o.forkHeadHash;
const ic = o.initialChain;
if (typeof startHash !== "string" || typeof forkHeadHash !== "string") {
return null;
}
if (ic === null || typeof ic !== "object") {
return null;
}
const ich = ic as Record<string, unknown>;
const pph = ich.parentStateHash;
const pa = ich.parentAncestors;
if (!(pph === null || typeof pph === "string")) {
return null;
}
if (!Array.isArray(pa) || !pa.every((x) => typeof x === "string")) {
return null;
}
return {
startHash,
forkHeadHash,
initialChain: {
parentStateHash: pph,
parentAncestors: pa,
},
};
}
function parseRunControlPayload(rec: Record<string, unknown>): RunCommand | null {
const threadId = rec.threadId;
const workflowName = rec.workflowName;
@@ -145,6 +203,7 @@ function parseRunControlPayload(rec: Record<string, unknown>): RunCommand | null
}
forkSourceThreadId = rawFork;
}
const forkContinuation = parseForkContinuation(rec);
return {
type: "run",
threadId,
@@ -154,6 +213,7 @@ function parseRunControlPayload(rec: Record<string, unknown>): RunCommand | null
steps: parsedSteps.steps,
stepTimestamps: parsedSteps.stepTimestamps,
forkSourceThreadId,
forkContinuation,
};
}
@@ -354,6 +414,7 @@ async function main(): Promise<void> {
}
}
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: TCP worker multiplexes lifecycle + runs
async function dispatchCommand(cmd: ControlCommand, socket: Socket | null): Promise<void> {
if (cmd.type !== "run") {
dispatchThreadLifecycleCommand(threads, socket, cmd);
@@ -391,7 +452,19 @@ async function main(): Promise<void> {
const baseTs = Date.now();
let prefilledDiskSteps: PrefilledDiskStep[] | null = null;
if (cmd.steps.length > 0) {
let replayTimestamps: readonly number[] | null = null;
if (cmd.forkContinuation !== null) {
if (
cmd.steps.length > 0 &&
(cmd.stepTimestamps === null || cmd.stepTimestamps.length !== cmd.steps.length)
) {
bootLog("J5WQ8NXT", "forkContinuation requires stepTimestamps aligned with steps");
throw new Error("forkContinuation requires stepTimestamps aligned with steps");
}
replayTimestamps =
cmd.steps.length === 0 ? null : (cmd.stepTimestamps as readonly number[]);
} else if (cmd.steps.length > 0) {
prefilledDiskSteps = cmd.steps.map((step, i) => {
const ts = cmd.stepTimestamps?.[i];
return {
@@ -414,6 +487,8 @@ async function main(): Promise<void> {
awaitAfterEachYield: () => pauseGate.awaitAfterYield(),
forkSourceThreadId: cmd.forkSourceThreadId,
prefilledDiskSteps,
forkContinuation: cmd.forkContinuation,
replayTimestamps,
storageRoot,
},
io,
@@ -1,7 +1,12 @@
import type { ExtractContext, ExtractFn, LlmProvider } from "@uncaged/workflow-runtime";
import type * as z from "zod/v4";
import { type CasStore, getContentMerklePayload } from "@uncaged/workflow-cas";
import { createLlmFn, createThreadReactor } from "@uncaged/workflow-reactor";
import type {
ExtractContext,
ExtractFn,
ExtractResult,
LlmProvider,
} from "@uncaged/workflow-runtime";
import type * as z from "zod/v4";
import { extractFunctionToolFromZodSchema } from "./llm-extract.js";
export type ExtractDeps = {
@@ -121,7 +126,7 @@ export function createExtract(provider: LlmProvider, deps: ExtractDeps): Extract
schema: z.ZodType<T>,
prompt: string,
ctx: ExtractContext,
): Promise<T> => {
): Promise<ExtractResult<T>> => {
const text = await buildExtractUserContent(ctx, prompt, deps);
const result = await reactor({
thread: { cas: deps.cas },
@@ -131,6 +136,10 @@ export function createExtract(provider: LlmProvider, deps: ExtractDeps): Extract
if (!result.ok) {
throw new Error(`extract failed: ${result.error}`);
}
return result.value;
return {
meta: result.value,
contentPayload: ctx.agentContent,
refs: [],
};
};
}
@@ -1,6 +1,5 @@
import * as z from "zod/v4";
import { err, ok, type Result } from "@uncaged/workflow-util";
import * as z from "zod/v4";
import type { LlmError, LlmExtractArgs } from "./types.js";
+23 -11
View File
@@ -1,35 +1,47 @@
export { createWorkflow } from "./engine/create-workflow.js";
export { executeThread } from "./engine/engine.js";
export {
buildForkPlan,
parseThreadDataJsonl,
selectForkHistoricalSteps,
tryParseRoleStepRecord,
FORK_BRANCH_ROLE,
prepareCasFork,
tryParseWorkflowResultRecord,
walkStateFramesNewestFirst,
} from "./engine/fork-thread.js";
export { garbageCollectCas } from "./engine/gc.js";
export { createThreadPauseGate } from "./engine/thread-pause-gate.js";
export type {
ThreadHistoryEntry,
ThreadIndex,
ThreadIndexEntry,
} from "./engine/threads-index.js";
export {
appendThreadHistoryEntry,
getBundleDir,
readThreadsIndex,
removeThreadEntry,
removeThreadHistoryEntries,
upsertThreadEntry,
writeThreadsIndex,
} from "./engine/threads-index.js";
export type {
CasForkPlan,
ChainState,
ExecuteThreadIo,
ExecuteThreadOptions,
ForkHistoricalStep,
ForkPlan,
ForkContinuationOptions,
GcResult,
ParsedThreadStartRecord,
PrefilledDiskStep,
SupervisorDecision,
ThreadPauseGate,
} from "./engine/types.js";
export { EMPTY_CHAIN_STATE } from "./engine/types.js";
export { getWorkerHostScriptPath } from "./engine/worker-entry-path.js";
export type { ExtractFn, LlmError, LlmExtractArgs } from "./extract/index.js";
export {
buildExtractUserContent,
createExtract,
type ExtractThreadContext,
} from "./extract/index.js";
export {
extractFunctionToolFromZodSchema,
llmErrorToCause,
llmExtract,
} from "./extract/index.js";
export type { ExtractFn, LlmError, LlmExtractArgs } from "./extract/index.js";
export { workflowAsAgent, type WorkflowAsAgentOptions } from "./workflow-as-agent.js";
export { type WorkflowAsAgentOptions, workflowAsAgent } from "./workflow-as-agent.js";
@@ -1,17 +1,20 @@
import { join } from "node:path";
import type { AgentContext, AgentFn } from "@uncaged/workflow-runtime";
import { extractBundleExports } from "@uncaged/workflow-register";
import { createCasStore } from "@uncaged/workflow-cas";
import type { ExecuteThreadIo } from "./engine/index.js";
import { executeThread } from "./engine/index.js";
import type { WorkflowConfig } from "@uncaged/workflow-register";
import { getRegisteredWorkflow, readWorkflowRegistry } from "@uncaged/workflow-register";
import {
extractBundleExports,
getRegisteredWorkflow,
readWorkflowRegistry,
} from "@uncaged/workflow-register";
import type { AgentContext, AgentFn } from "@uncaged/workflow-runtime";
import {
createLogger,
generateUlid,
getDefaultWorkflowStorageRoot,
getGlobalCasDir,
} from "@uncaged/workflow-util";
import type { ExecuteThreadIo } from "./engine/index.js";
import { executeThread } from "./engine/index.js";
const DEFAULT_WORKFLOW_AS_AGENT_MAX_DEPTH = 3;
@@ -98,6 +101,8 @@ export function workflowAsAgent(
awaitAfterEachYield: async () => {},
forkSourceThreadId: ctx.threadId,
prefilledDiskSteps: null,
forkContinuation: null,
replayTimestamps: null,
storageRoot,
},
io,
+3
View File
@@ -3,7 +3,9 @@
export type {
ContentMerkleNode,
StartNode,
StartNodePayload,
StateNode,
StateNodePayload,
} from "./cas-types.js";
export type {
@@ -14,6 +16,7 @@ export type {
CasStore,
ExtractContext,
ExtractFn,
ExtractResult,
LlmProvider,
Moderator,
ModeratorContext,
+2 -2
View File
@@ -1,9 +1,9 @@
import type { Result } from "./types.js";
export function ok<T>(value: T): Result<T, never> {
return { ok: true, value };
return { ok: true, value };
}
export function err<E>(error: E): Result<never, E> {
return { ok: false, error };
return { ok: false, error };
}
+75 -68
View File
@@ -12,10 +12,10 @@ export type Result<T, E = Error> = { ok: true; value: T } | { ok: false; error:
// ── CAS ────────────────────────────────────────────────────────────
export type CasStore = {
put(content: string): Promise<string>;
get(hash: string): Promise<string | null>;
delete(hash: string): Promise<void>;
list(): Promise<string[]>;
put(content: string): Promise<string>;
get(hash: string): Promise<string | null>;
delete(hash: string): Promise<void>;
list(): Promise<string[]>;
};
// ── Workflow Descriptor ────────────────────────────────────────────
@@ -23,13 +23,13 @@ export type CasStore = {
export type WorkflowRoleSchema = Record<string, unknown>;
export type WorkflowRoleDescriptor = {
description: string;
schema: WorkflowRoleSchema;
description: string;
schema: WorkflowRoleSchema;
};
export type WorkflowDescriptor = {
description: string;
roles: Record<string, WorkflowRoleDescriptor>;
description: string;
roles: Record<string, WorkflowRoleDescriptor>;
};
// ── Role & Thread ──────────────────────────────────────────────────
@@ -37,131 +37,138 @@ export type WorkflowDescriptor = {
export type RoleMeta = Record<string, Record<string, unknown>>;
export type RoleOutput = {
role: string;
contentHash: string;
meta: Record<string, unknown>;
refs: string[];
role: string;
contentHash: string;
meta: Record<string, unknown>;
refs: string[];
};
export type StartStep = {
role: typeof START;
content: string;
meta: { maxRounds: number };
timestamp: number;
role: typeof START;
content: string;
meta: { maxRounds: number };
timestamp: number;
};
export type RoleStep<M extends RoleMeta> = {
[K in keyof M & string]: {
role: K;
meta: M[K];
contentHash: string;
refs: string[];
timestamp: number;
};
[K in keyof M & string]: {
role: K;
meta: M[K];
contentHash: string;
refs: string[];
timestamp: number;
};
}[keyof M & string];
export type ThreadContext<M extends RoleMeta = RoleMeta> = {
threadId: string;
depth: number;
start: StartStep;
steps: RoleStep<M>[];
threadId: string;
depth: number;
start: StartStep;
steps: RoleStep<M>[];
};
export type ModeratorContext<M extends RoleMeta = RoleMeta> = ThreadContext<M>;
export type AgentContext<M extends RoleMeta = RoleMeta> = ModeratorContext<M> & {
currentRole: {
name: string;
systemPrompt: string;
};
currentRole: {
name: string;
systemPrompt: string;
};
};
export type ExtractContext<M extends RoleMeta = RoleMeta> = AgentContext<M> & {
agentContent: string;
agentContent: string;
};
// ── Workflow Completion ────────────────────────────────────────────
export type WorkflowCompletion = {
returnCode: number;
summary: string;
returnCode: number;
summary: string;
};
export type WorkflowResult = WorkflowCompletion & {
rootHash: string;
rootHash: string;
};
// ── LLM Provider ───────────────────────────────────────────────────
export type LlmProvider = {
baseUrl: string;
apiKey: string;
model: string;
baseUrl: string;
apiKey: string;
model: string;
};
export type ProviderConfig = {
baseUrl: string;
apiKey: string;
baseUrl: string;
apiKey: string;
};
export type ResolvedModel = {
baseUrl: string;
apiKey: string;
model: string;
baseUrl: string;
apiKey: string;
model: string;
};
export type WorkflowConfig = {
maxDepth: number;
supervisorInterval: number;
providers: Record<string, ProviderConfig>;
models: Record<string, string>;
maxDepth: number;
supervisorInterval: number;
providers: Record<string, ProviderConfig>;
models: Record<string, string>;
};
// ── Functions ──────────────────────────────────────────────────────
/** Structured output of the extract phase (RFC v3 content Merkle + artifact refs). */
export type ExtractResult<T extends Record<string, unknown>> = {
meta: T;
contentPayload: string;
refs: string[];
};
export type ExtractFn = <T extends Record<string, unknown>>(
schema: z.ZodType<T>,
prompt: string,
ctx: ExtractContext,
) => Promise<T>;
schema: z.ZodType<T>,
prompt: string,
ctx: ExtractContext,
) => Promise<ExtractResult<T>>;
export type AgentFn = (ctx: AgentContext) => Promise<string>;
export type AgentBinding = {
agent: AgentFn;
overrides: Partial<Record<string, AgentFn>> | null;
agent: AgentFn;
overrides: Partial<Record<string, AgentFn>> | null;
};
// ── Workflow Runtime & Definition ──────────────────────────────────
export type WorkflowRuntime = {
cas: CasStore;
extract: ExtractFn;
cas: CasStore;
extract: ExtractFn;
};
export type WorkflowFn = (
thread: ThreadContext,
runtime: WorkflowRuntime,
thread: ThreadContext,
runtime: WorkflowRuntime,
) => AsyncGenerator<RoleOutput, WorkflowCompletion>;
export type RoleDefinition<Meta extends Record<string, unknown>> = {
description: string;
systemPrompt: string;
extractPrompt: string;
schema: z.ZodType<Meta>;
extractRefs: ((meta: Meta) => string[]) | null;
description: string;
systemPrompt: string;
extractPrompt: string;
schema: z.ZodType<Meta>;
extractRefs: ((meta: Meta) => string[]) | null;
};
export type Moderator<M extends RoleMeta> = (
ctx: ModeratorContext<M>,
ctx: ModeratorContext<M>,
) => (keyof M & string) | typeof END;
export type WorkflowDefinition<M extends RoleMeta> = {
description: string;
roles: { [K in keyof M & string]: RoleDefinition<M[K]> };
moderator: Moderator<M>;
description: string;
roles: { [K in keyof M & string]: RoleDefinition<M[K]> };
moderator: Moderator<M>;
};
export type AdvanceOutcome<M extends RoleMeta> =
| { kind: "complete"; completion: WorkflowCompletion }
| { kind: "yield"; output: RoleOutput; step: RoleStep<M> };
| { kind: "complete"; completion: WorkflowCompletion }
| { kind: "yield"; output: RoleOutput; step: RoleStep<M> };
@@ -1,6 +1,5 @@
import type * as z from "zod/v4";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import type * as z from "zod/v4";
import type {
ChatMessage,
+1 -2
View File
@@ -1,6 +1,5 @@
import type * as z from "zod/v4";
import type { Result } from "@uncaged/workflow-protocol";
import type * as z from "zod/v4";
export type ToolCall = {
id: string;
+1 -3
View File
@@ -5,7 +5,5 @@
"outDir": "dist"
},
"include": ["src"],
"references": [
{ "path": "../workflow-protocol" }
]
"references": [{ "path": "../workflow-protocol" }]
}
@@ -1,4 +1,5 @@
import { isBuiltin } from "node:module";
import { err, ok, type Result } from "@uncaged/workflow-util";
import type {
CallExpression,
ExportAllDeclaration,
@@ -12,8 +13,6 @@ import type {
} from "acorn";
import * as acorn from "acorn";
import { err, ok, type Result } from "@uncaged/workflow-util";
import type { WorkflowBundleValidationInput } from "./types.js";
/** Acorn Node with index-access for property traversal. */
@@ -2,9 +2,9 @@ import type { WorkflowDescriptor, WorkflowFn } from "@uncaged/workflow-protocol"
export type {
WorkflowDescriptor,
WorkflowFn,
WorkflowRoleDescriptor,
WorkflowRoleSchema,
WorkflowFn,
} from "@uncaged/workflow-protocol";
export type WorkflowBundleValidationInput = {
+17 -19
View File
@@ -1,12 +1,3 @@
export {
buildDescriptor,
importWorkflowBundleModule,
validateWorkflowBundle,
ensureUncagedWorkflowSymlink,
extractBundleExports,
stringifyWorkflowDescriptor,
validateWorkflowDescriptor,
} from "./bundle/index.js";
export type {
ExtractBundleExportsOptions,
ExtractedBundleExports,
@@ -15,7 +6,23 @@ export type {
WorkflowRoleDescriptor,
WorkflowRoleSchema,
} from "./bundle/index.js";
export {
buildDescriptor,
ensureUncagedWorkflowSymlink,
extractBundleExports,
importWorkflowBundleModule,
stringifyWorkflowDescriptor,
validateWorkflowBundle,
validateWorkflowDescriptor,
} from "./bundle/index.js";
export type { ProviderConfig, ResolvedModel } from "./config/index.js";
export { resolveModel, splitProviderModelRef } from "./config/index.js";
export type {
WorkflowConfig,
WorkflowHistoryEntry,
WorkflowRegistryEntry,
WorkflowRegistryFile,
} from "./registry/index.js";
export {
getRegisteredWorkflow,
listRegisteredWorkflowNames,
@@ -28,12 +35,3 @@ export {
workflowRegistryPath,
writeWorkflowRegistry,
} from "./registry/index.js";
export type {
WorkflowConfig,
WorkflowHistoryEntry,
WorkflowRegistryEntry,
WorkflowRegistryFile,
} from "./registry/index.js";
export { resolveModel, splitProviderModelRef } from "./config/index.js";
export type { ProviderConfig, ResolvedModel } from "./config/index.js";
@@ -1,6 +1,6 @@
import type { ProviderConfig } from "@uncaged/workflow-protocol";
import { splitProviderModelRef } from "../config/index.js";
import { createLogger, err, ok, type Result } from "@uncaged/workflow-util";
import { splitProviderModelRef } from "../config/index.js";
import type {
WorkflowConfig,
WorkflowHistoryEntry,
@@ -1,8 +1,7 @@
import { mkdir, readFile, writeFile } from "node:fs/promises";
import { dirname, join } from "node:path";
import { parseDocument, stringify } from "yaml";
import { err, ok, type Result } from "@uncaged/workflow-util";
import { parseDocument, stringify } from "yaml";
import { normalizeWorkflowRegistryRoot } from "./registry-normalize.js";
import type { WorkflowHistoryEntry, WorkflowRegistryEntry, WorkflowRegistryFile } from "./types.js";
+1 -4
View File
@@ -1,8 +1,5 @@
{
"references": [
{ "path": "../workflow-protocol" },
{ "path": "../workflow-util" }
],
"references": [{ "path": "../workflow-protocol" }, { "path": "../workflow-util" }],
"compilerOptions": {
"target": "ES2022",
"lib": ["ES2022"],
@@ -0,0 +1,121 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import {
createCasStore,
putContentNodeWithRefs,
putStartNode,
putStateNode,
} from "@uncaged/workflow-cas";
import { buildThreadContext, END, START } from "../src/index.js";
describe("buildThreadContext", () => {
let dir: string;
beforeEach(async () => {
dir = await mkdtemp(join(tmpdir(), "wf-build-ctx-"));
});
afterEach(async () => {
await rm(dir, { recursive: true, force: true });
});
test("walks ancestor chain, resolves prompt, orders steps chronologically", async () => {
const cas = createCasStore(join(dir, "cas"));
const promptHash = await cas.put("hello-task");
const bundleHash = "BHAAAAAAAAAAA";
const startHash = await putStartNode(
cas,
{ name: "demo", hash: bundleHash, maxRounds: 99, depth: 2 },
promptHash,
);
const art = await cas.put("artifact-a");
const chPlan = await putContentNodeWithRefs(cas, "plan body", [art]);
const statePlan = await putStateNode(cas, {
role: "planner",
meta: { phase: 1 },
start: startHash,
content: chPlan,
ancestors: [],
compact: null,
timestamp: 1000,
});
const chCode = await putContentNodeWithRefs(cas, "code body", []);
const stateCode = await putStateNode(cas, {
role: "coder",
meta: { phase: 2 },
start: startHash,
content: chCode,
ancestors: [statePlan],
compact: null,
timestamp: 2000,
});
const ctx = await buildThreadContext(stateCode, cas);
expect(ctx.threadId).toBe("");
expect(ctx.depth).toBe(2);
expect(ctx.start.role).toBe(START);
expect(ctx.start.content).toBe("hello-task");
expect(ctx.start.meta.maxRounds).toBe(99);
expect(ctx.steps.map((s) => s.role)).toEqual(["planner", "coder"]);
expect(ctx.steps[0]?.refs).toEqual([art]);
expect(ctx.steps[1]?.refs).toEqual([]);
expect(ctx.steps[0]?.timestamp).toBe(1000);
expect(ctx.steps[1]?.timestamp).toBe(2000);
});
test("StartNode head yields empty steps", async () => {
const cas = createCasStore(join(dir, "cas"));
const promptHash = await cas.put("only-prompt");
const startHash = await putStartNode(
cas,
{ name: "solo", hash: "BHBBBBBBBBBBB", maxRounds: 3, depth: 1 },
promptHash,
);
const ctx = await buildThreadContext(startHash, cas);
expect(ctx.steps).toEqual([]);
expect(ctx.start.content).toBe("only-prompt");
expect(ctx.depth).toBe(1);
expect(ctx.start.meta.maxRounds).toBe(3);
});
test("omits __end__ states from steps", async () => {
const cas = createCasStore(join(dir, "cas"));
const promptHash = await cas.put("task");
const bundleHash = "BHCCCCCCCCCCC";
const startHash = await putStartNode(
cas,
{ name: "demo", hash: bundleHash, maxRounds: 10, depth: 0 },
promptHash,
);
const ch1 = await putContentNodeWithRefs(cas, "step-one", []);
const state1 = await putStateNode(cas, {
role: "worker",
meta: { done: false },
start: startHash,
content: ch1,
ancestors: [],
compact: null,
timestamp: 500,
});
const endContent = await putContentNodeWithRefs(cas, "finished", []);
const endState = await putStateNode(cas, {
role: END,
meta: { returnCode: 0, summary: "finished" },
start: startHash,
content: endContent,
ancestors: [state1],
compact: null,
timestamp: 600,
});
const ctx = await buildThreadContext(endState, cas);
expect(ctx.steps.map((s) => s.role)).toEqual(["worker"]);
});
});
+1
View File
@@ -8,6 +8,7 @@
"test": "bun test"
},
"dependencies": {
"@uncaged/workflow-cas": "workspace:*",
"@uncaged/workflow-protocol": "workspace:*"
},
"peerDependencies": {
@@ -0,0 +1,153 @@
import { getContentMerklePayload, parseCasThreadNode } from "@uncaged/workflow-cas";
import type {
CasStore,
RoleMeta,
RoleStep,
StartNode,
StateNode,
ThreadContext,
} from "@uncaged/workflow-protocol";
import { END, START } from "@uncaged/workflow-protocol";
async function loadParsedNode(cas: CasStore, hash: string) {
const yamlText = await cas.get(hash);
if (yamlText === null) {
return null;
}
return parseCasThreadNode(yamlText);
}
async function resolvePromptText(cas: CasStore, promptHash: string): Promise<string> {
const text = await getContentMerklePayload(cas, promptHash);
if (text !== null) {
return text;
}
throw new Error(`buildThreadContext: could not resolve prompt text at ${promptHash}`);
}
async function collectStateChainFromHead(cas: CasStore, headHash: string): Promise<StateNode[]> {
const reversed: StateNode[] = [];
let hash: string | null = headHash;
while (hash !== null) {
const parsed = await loadParsedNode(cas, hash);
if (parsed === null || parsed.kind !== "state") {
throw new Error(`buildThreadContext: expected state node at ${hash}`);
}
reversed.push(parsed.node);
const anc = parsed.node.payload.ancestors;
hash = anc.length > 0 ? anc[0] : null;
}
reversed.reverse();
return reversed;
}
async function threadFromStartHead<M extends RoleMeta>(
node: StartNode,
cas: CasStore,
): Promise<ThreadContext<M>> {
const promptHash = node.refs[0];
if (promptHash === undefined) {
throw new Error("buildThreadContext: StartNode missing refs[0] prompt");
}
const prompt = await resolvePromptText(cas, promptHash);
const p = node.payload;
return {
threadId: "",
depth: p.depth,
start: {
role: START,
content: prompt,
meta: { maxRounds: p.maxRounds },
timestamp: 0,
},
steps: [],
};
}
async function buildRoleStepsFromStates<M extends RoleMeta>(
chronologicalStates: StateNode[],
cas: CasStore,
): Promise<RoleStep<M>[]> {
const steps: RoleStep<M>[] = [];
for (const st of chronologicalStates) {
if (st.payload.role === END) {
continue;
}
const contentParsed = await loadParsedNode(cas, st.payload.content);
if (contentParsed === null || contentParsed.kind !== "content") {
throw new Error(`buildThreadContext: expected content node at ${st.payload.content}`);
}
steps.push({
role: st.payload.role,
meta: st.payload.meta,
contentHash: st.payload.content,
refs: [...contentParsed.node.refs],
timestamp: st.payload.timestamp,
} as RoleStep<M>);
}
return steps;
}
async function threadFromStateHead<M extends RoleMeta>(
headHash: string,
cas: CasStore,
): Promise<ThreadContext<M>> {
const chronologicalStates = await collectStateChainFromHead(cas, headHash);
const firstState = chronologicalStates[0];
if (firstState === undefined) {
throw new Error("buildThreadContext: empty state chain");
}
const startBlob = await loadParsedNode(cas, firstState.payload.start);
if (startBlob === null || startBlob.kind !== "start") {
throw new Error(`buildThreadContext: StartNode missing at ${firstState.payload.start}`);
}
const promptHash = startBlob.node.refs[0];
if (promptHash === undefined) {
throw new Error("buildThreadContext: StartNode missing refs[0] prompt");
}
const prompt = await resolvePromptText(cas, promptHash);
const sp = startBlob.node.payload;
const steps = await buildRoleStepsFromStates<M>(chronologicalStates, cas);
const firstTs = steps[0]?.timestamp ?? 0;
return {
threadId: "",
depth: sp.depth,
start: {
role: START,
content: prompt,
meta: { maxRounds: sp.maxRounds },
timestamp: firstTs,
},
steps,
};
}
/**
* Reconstructs {@link ThreadContext} by walking the CAS state chain from {@link headHash}.
*
* Walks each {@link StateNode} via `payload.ancestors[0]` until the ancestor list is empty,
* resolves the prompt from the shared {@link StartNode} (`refs[0]` → prompt blob), and builds
* steps from non-`__end__` states in chronological order.
*
* `threadId` is set to `""` — callers that load from `threads.json` should overwrite it.
*/
export async function buildThreadContext<M extends RoleMeta = RoleMeta>(
headHash: string,
cas: CasStore,
): Promise<ThreadContext<M>> {
const headParsed = await loadParsedNode(cas, headHash);
if (headParsed === null) {
throw new Error(`buildThreadContext: missing or invalid CAS blob ${headHash}`);
}
if (headParsed.kind === "start") {
return threadFromStartHead(headParsed.node, cas);
}
if (headParsed.kind !== "state") {
throw new Error(`buildThreadContext: head ${headHash} must be start or state node`);
}
return threadFromStateHead(headHash, cas);
}
@@ -1,3 +1,4 @@
import { putContentNodeWithRefs } from "@uncaged/workflow-cas";
import type * as z from "zod/v4";
import {
@@ -5,7 +6,6 @@ import {
type AgentBinding,
type AgentContext,
type AgentFn,
type CasStore,
END,
type ExtractContext,
type ModeratorContext,
@@ -38,8 +38,16 @@ function resolveExtractedRefs(
return extractRefsFn(meta as Record<string, unknown>);
}
async function putContentBlob(store: CasStore, raw: string): Promise<string> {
return store.put(raw);
function mergeUniqueHashes(a: readonly string[], b: readonly string[]): string[] {
const seen = new Set<string>();
const out: string[] = [];
for (const h of [...a, ...b]) {
if (!seen.has(h)) {
seen.add(h);
out.push(h);
}
}
return out;
}
function agentForRole(binding: AgentBinding, roleName: string): AgentFn {
@@ -86,23 +94,29 @@ async function advanceOneRound<M extends RoleMeta>(
agentContent: raw,
};
const meta = await runtime.extract(
const extracted = await runtime.extract(
roleDef.schema as z.ZodType<Record<string, unknown>>,
roleDef.extractPrompt,
extractCtx as unknown as ExtractContext,
);
const contentHash = await putContentBlob(runtime.cas, raw);
const refsFromMeta = resolveExtractedRefs(
roleDef as unknown as RoleDefinition<Record<string, unknown>>,
meta,
extracted.meta,
);
const refs = refsFromMeta.includes(contentHash) ? refsFromMeta : [...refsFromMeta, contentHash];
const artifactRefs = mergeUniqueHashes(extracted.refs, refsFromMeta);
const contentHash = await putContentNodeWithRefs(
runtime.cas,
extracted.contentPayload,
artifactRefs,
);
const refs = artifactRefs.includes(contentHash) ? artifactRefs : [...artifactRefs, contentHash];
const step = {
role: next,
contentHash,
meta,
meta: extracted.meta,
refs,
timestamp: Date.now(),
} as RoleStep<M>;
+26 -24
View File
@@ -1,29 +1,31 @@
export { buildThreadContext } from "./build-context.js";
export { createWorkflow } from "./create-workflow.js";
export { err, ok } from "./result.js";
export type {
AgentBinding,
AgentContext,
AgentFn,
CasStore,
ExtractContext,
ExtractFn,
LlmProvider,
Moderator,
ModeratorContext,
Result,
RoleDefinition,
RoleMeta,
RoleOutput,
RoleStep,
StartStep,
ThreadContext,
WorkflowCompletion,
WorkflowDefinition,
WorkflowDescriptor,
WorkflowFn,
WorkflowResult,
WorkflowRoleDescriptor,
WorkflowRoleSchema,
WorkflowRuntime,
AgentBinding,
AgentContext,
AgentFn,
CasStore,
ExtractContext,
ExtractFn,
ExtractResult,
LlmProvider,
Moderator,
ModeratorContext,
Result,
RoleDefinition,
RoleMeta,
RoleOutput,
RoleStep,
StartStep,
ThreadContext,
WorkflowCompletion,
WorkflowDefinition,
WorkflowDescriptor,
WorkflowFn,
WorkflowResult,
WorkflowRoleDescriptor,
WorkflowRoleSchema,
WorkflowRuntime,
} from "./types.js";
export { END, START } from "./types.js";
+26 -25
View File
@@ -3,31 +3,32 @@
// imports from "@uncaged/workflow-runtime" continues to work.
export type {
AgentBinding,
AgentContext,
AgentFn,
AdvanceOutcome,
CasStore,
ExtractContext,
ExtractFn,
LlmProvider,
Moderator,
ModeratorContext,
Result,
RoleDefinition,
RoleMeta,
RoleOutput,
RoleStep,
StartStep,
ThreadContext,
WorkflowCompletion,
WorkflowDefinition,
WorkflowDescriptor,
WorkflowFn,
WorkflowResult,
WorkflowRoleDescriptor,
WorkflowRoleSchema,
WorkflowRuntime,
AdvanceOutcome,
AgentBinding,
AgentContext,
AgentFn,
CasStore,
ExtractContext,
ExtractFn,
ExtractResult,
LlmProvider,
Moderator,
ModeratorContext,
Result,
RoleDefinition,
RoleMeta,
RoleOutput,
RoleStep,
StartStep,
ThreadContext,
WorkflowCompletion,
WorkflowDefinition,
WorkflowDescriptor,
WorkflowFn,
WorkflowResult,
WorkflowRoleDescriptor,
WorkflowRoleSchema,
WorkflowRuntime,
} from "@uncaged/workflow-protocol";
export { END, START } from "@uncaged/workflow-protocol";
+1 -3
View File
@@ -18,7 +18,5 @@
"types": ["bun-types"]
},
"include": ["src/**/*.ts"],
"references": [
{ "path": "../workflow-protocol" }
]
"references": [{ "path": "../workflow-cas" }, { "path": "../workflow-protocol" }]
}
@@ -6,8 +6,5 @@
"composite": true
},
"include": ["src/**/*.ts"],
"references": [
{ "path": "../workflow-register" },
{ "path": "../workflow-runtime" }
]
"references": [{ "path": "../workflow-register" }, { "path": "../workflow-runtime" }]
}
@@ -5,8 +5,13 @@ import { join } from "node:path";
import { createCasStore } from "@uncaged/workflow-cas";
import { createExtract } from "@uncaged/workflow-execute";
import { validateWorkflowDescriptor } from "@uncaged/workflow-register";
import { createWorkflow } from "@uncaged/workflow-runtime";
import { END, type ModeratorContext, type RoleStep, START } from "@uncaged/workflow-runtime";
import {
createWorkflow,
END,
type ModeratorContext,
type RoleStep,
START,
} from "@uncaged/workflow-runtime";
import { buildSolveIssueDescriptor } from "../src/descriptor.js";
import type { DeveloperMeta } from "../src/developer.js";
import { solveIssueModerator, solveIssueWorkflowDefinition } from "../src/index.js";
@@ -6,8 +6,5 @@
"composite": true
},
"include": ["src/**/*.ts"],
"references": [
{ "path": "../workflow-register" },
{ "path": "../workflow-runtime" }
]
"references": [{ "path": "../workflow-register" }, { "path": "../workflow-runtime" }]
}
+6 -6
View File
@@ -1,13 +1,13 @@
export { err, ok } from "@uncaged/workflow-protocol";
export {
CROCKFORD_BASE32_ALPHABET,
decodeCrockfordBase32Bits,
decodeCrockfordToUint64,
encodeCrockfordBase32Bits,
encodeUint64AsCrockford,
CROCKFORD_BASE32_ALPHABET,
decodeCrockfordBase32Bits,
decodeCrockfordToUint64,
encodeCrockfordBase32Bits,
encodeUint64AsCrockford,
} from "./base32.js";
export { createLogger } from "./logger.js";
export { mergeRefsWithContentHash, normalizeRefsField } from "./refs-field.js";
export { ok, err } from "@uncaged/workflow-protocol";
export { getDefaultWorkflowStorageRoot, getGlobalCasDir } from "./storage-root.js";
export type { CreateLoggerOptions, LogFn, LoggerSink, Result } from "./types.js";
export { generateUlid } from "./ulid.js";
+1 -1
View File
@@ -3,7 +3,7 @@ export type { Result } from "@uncaged/workflow-protocol";
export type LoggerSink = { kind: "stderr" } | { kind: "file"; path: string };
export type CreateLoggerOptions = {
sink: LoggerSink;
sink: LoggerSink;
};
export type LogFn = (tag: string, content: string) => void;
+1 -3
View File
@@ -5,7 +5,5 @@
"outDir": "dist"
},
"include": ["src"],
"references": [
{ "path": "../workflow-protocol" }
]
"references": [{ "path": "../workflow-protocol" }]
}