feat: Phase 4 — CAS-based fork + mark-and-sweep GC

- Rewrite fork to create StateNode pointing to fork point (zero duplication)
- Rewrite GC as mark-and-sweep: roots from threads.json + history, findReachableHashes via refs[]
- Remove .data.jsonl code paths
- Fix all 7 previously failing CLI tests
- New: gc-mark-sweep.test.ts verifying shared nodes survive GC
- All 166 tests pass

Refs #155, closes #159

小橘 <xiaoju@shazhou.work>
This commit is contained in:
2026-05-09 08:12:49 +00:00
parent 26cf51366f
commit f3aedf8d6c
22 changed files with 1724 additions and 1073 deletions
@@ -40,6 +40,8 @@ function makeOptions(overrides: Partial<ExecuteThreadOptions>): ExecuteThreadOpt
awaitAfterEachYield: async () => {},
forkSourceThreadId: null,
prefilledDiskSteps: null,
forkContinuation: null,
replayTimestamps: null,
storageRoot: "/tmp/never",
...overrides,
};
@@ -0,0 +1,112 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import {
createCasStore,
putContentNodeWithRefs,
putStartNode,
putStateNode,
} from "@uncaged/workflow-cas";
import type { StateNodePayload } from "@uncaged/workflow-protocol";
import { FORK_BRANCH_ROLE } from "../src/engine/fork-thread.js";
import { garbageCollectCas } from "../src/engine/gc.js";
import { getBundleDir, removeThreadEntry, upsertThreadEntry } from "../src/engine/threads-index.js";
describe("garbageCollectCas (mark-and-sweep)", () => {
let storageRoot: string;
let casDir: string;
beforeEach(async () => {
storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-gc-ms-"));
casDir = join(storageRoot, "cas");
await mkdir(casDir, { recursive: true });
await writeFile(
join(storageRoot, "workflow.yaml"),
"config:\n maxDepth: 1\n supervisorInterval: 0\n providers: {}\n models: {}\nworkflows: {}\n",
"utf8",
);
});
afterEach(async () => {
await rm(storageRoot, { recursive: true, force: true });
});
test("shared CAS prefix survives when one fork thread index entry is removed", async () => {
const bundleHash = "TESTGC0000001";
const bundleDir = getBundleDir(storageRoot, bundleHash);
await mkdir(bundleDir, { recursive: true });
const cas = createCasStore(casDir);
const promptHash = await cas.put("prompt");
const startHash = await putStartNode(
cas,
{
name: "demo",
hash: bundleHash,
maxRounds: 5,
depth: 0,
},
promptHash,
);
const c1 = await putContentNodeWithRefs(cas, "p1", []);
const h1 = await putStateNode(cas, {
role: "planner",
meta: {},
start: startHash,
content: c1,
ancestors: [],
compact: null,
timestamp: 1,
} satisfies StateNodePayload);
const c2 = await putContentNodeWithRefs(cas, "c1", []);
const h2 = await putStateNode(cas, {
role: "coder",
meta: {},
start: startHash,
content: c2,
ancestors: [h1],
compact: null,
timestamp: 2,
} satisfies StateNodePayload);
const ec = await putContentNodeWithRefs(cas, "", []);
const fm = await putStateNode(cas, {
role: FORK_BRANCH_ROLE,
meta: {},
start: startHash,
content: ec,
ancestors: [h1],
compact: null,
timestamp: 3,
} satisfies StateNodePayload);
await upsertThreadEntry(bundleDir, "THREAD_AAAAAAA", {
head: h2,
start: startHash,
updatedAt: 10,
});
await upsertThreadEntry(bundleDir, "THREAD_BBBBBBB", {
head: fm,
start: startHash,
updatedAt: 20,
});
await removeThreadEntry(bundleDir, "THREAD_AAAAAAA");
const gc = await garbageCollectCas(storageRoot);
expect(gc.ok).toBe(true);
if (!gc.ok) {
return;
}
expect(await cas.get(h2)).toBeNull();
expect(await cas.get(h1)).not.toBeNull();
expect(await cas.get(startHash)).not.toBeNull();
expect(await cas.get(promptHash)).not.toBeNull();
expect(await cas.get(fm)).not.toBeNull();
});
});
+42 -30
View File
@@ -33,20 +33,12 @@ import {
removeThreadEntry,
upsertThreadEntry,
} from "./threads-index.js";
import type { ExecuteThreadIo, ExecuteThreadOptions } from "./types.js";
import type { ChainState, ExecuteThreadIo, ExecuteThreadOptions } from "./types.js";
import { EMPTY_CHAIN_STATE } from "./types.js";
/** Cap for {@link StateNode}.payload.ancestors: 1 parent + 10 skip-list. */
const ANCESTORS_CAP = 11;
type ChainState = {
/** State hash of the most recently written {@link StateNode}, or `null` before the first step. */
parentStateHash: string | null;
/** Ancestors recorded on the most recently written {@link StateNode}. */
parentAncestors: readonly string[];
};
const EMPTY_CHAIN: ChainState = { parentStateHash: null, parentAncestors: [] };
function computeAncestors(chain: ChainState): string[] {
if (chain.parentStateHash === null) {
return [];
@@ -408,36 +400,56 @@ export async function executeThread(
await mkdir(dirname(io.infoJsonlPath), { recursive: true });
const prefilled = options.prefilledDiskSteps;
const fork = options.forkContinuation;
if (fork !== null && prefilled !== null) {
throw new Error("forkContinuation and prefilledDiskSteps cannot both be set");
}
if (prefilled !== null && prefilled.length !== input.steps.length) {
throw new Error(
`prefilledDiskSteps length (${prefilled.length}) must match input.steps length (${input.steps.length})`,
);
}
const replayTs = options.replayTimestamps;
if (replayTs !== null && replayTs.length !== input.steps.length) {
throw new Error(
`replayTimestamps length (${replayTs.length}) must match input.steps length (${input.steps.length})`,
);
}
const bundleDir = getBundleDir(options.storageRoot, io.hash);
const promptHash = await io.cas.put(input.prompt);
const startHash = await putStartNode(
io.cas,
{
name: workflowName,
hash: io.hash,
maxRounds: options.maxRounds,
depth: options.depth,
},
promptHash,
);
let startHash: string;
await publishHead({
bundleDir,
threadId: io.threadId,
startHash,
headHash: startHash,
});
if (fork !== null) {
startHash = fork.startHash;
logger("T9HQ2KHM", `thread ${io.threadId} continued fork for workflow ${workflowName}`);
} else {
const promptHash = await io.cas.put(input.prompt);
startHash = await putStartNode(
io.cas,
{
name: workflowName,
hash: io.hash,
maxRounds: options.maxRounds,
depth: options.depth,
},
promptHash,
);
logger("T9HQ2KHM", `thread ${io.threadId} started for workflow ${workflowName}`);
await publishHead({
bundleDir,
threadId: io.threadId,
startHash,
headHash: startHash,
});
let chain: ChainState = EMPTY_CHAIN;
logger("T9HQ2KHM", `thread ${io.threadId} started for workflow ${workflowName}`);
}
let chain: ChainState = fork !== null ? fork.initialChain : EMPTY_CHAIN_STATE;
if (prefilled !== null) {
for (const row of prefilled) {
@@ -497,7 +509,7 @@ export async function executeThread(
contentHash: out.contentHash,
meta: out.meta,
refs: out.refs,
timestamp: prefilled?.[i]?.timestamp ?? nowMs + i,
timestamp: replayTs?.[i] ?? prefilled?.[i]?.timestamp ?? nowMs + i,
})),
};
@@ -1,9 +1,29 @@
import type { WorkflowCompletion } from "@uncaged/workflow-runtime";
import { err, normalizeRefsField, ok, type Result } from "@uncaged/workflow-util";
import type { CasStore } from "@uncaged/workflow-cas";
import { parseCasThreadNode, putContentNodeWithRefs, putStateNode } from "@uncaged/workflow-cas";
import type { StateNodePayload } from "@uncaged/workflow-protocol";
import type { RoleOutput, WorkflowCompletion } from "@uncaged/workflow-runtime";
import { END } from "@uncaged/workflow-runtime";
import { err, ok, type Result } from "@uncaged/workflow-util";
import { parse as parseYaml } from "yaml";
import type { ForkHistoricalStep, ForkPlan, ParsedThreadStartRecord } from "./types.js";
import { upsertThreadEntry } from "./threads-index.js";
import type { CasForkPlan, ChainState, ForkContinuationOptions } from "./types.js";
import { EMPTY_CHAIN_STATE } from "./types.js";
/** Recognizes a persisted workflow completion line (no `role`; has numeric `returnCode` and string `summary`). Omits `rootHash` when absent. */
/** Internal branch marker; skipped when presenting fork selection / replay slices. */
export const FORK_BRANCH_ROLE = "__fork__";
/** Cap for {@link StateNodePayload}.ancestors: 1 parent + 10 skip-list. */
const ANCESTORS_CAP = 11;
function computeAncestors(chain: ChainState): string[] {
if (chain.parentStateHash === null) {
return [];
}
return [chain.parentStateHash, ...chain.parentAncestors].slice(0, ANCESTORS_CAP);
}
/** Recognizes a persisted workflow completion line (no `role`; has numeric `returnCode` and string `summary`). */
export function tryParseWorkflowResultRecord(
obj: Record<string, unknown>,
): WorkflowCompletion | null {
@@ -18,227 +38,288 @@ export function tryParseWorkflowResultRecord(
return { returnCode, summary };
}
export function tryParseRoleStepRecord(obj: Record<string, unknown>): ForkHistoricalStep | null {
const role = obj.role;
const contentHash = obj.contentHash;
const meta = obj.meta;
const timestamp = obj.timestamp;
if (typeof role !== "string") {
return null;
}
if (typeof contentHash !== "string") {
return null;
}
if (meta === null || typeof meta !== "object") {
return null;
}
if (typeof timestamp !== "number") {
return null;
}
return {
role,
contentHash,
meta: meta as Record<string, unknown>,
refs: normalizeRefsField(obj.refs),
timestamp,
};
}
function parseRoleLine(
obj: Record<string, unknown>,
lineIndex: number,
): Result<ForkHistoricalStep, string> {
const parsed = tryParseRoleStepRecord(obj);
if (parsed === null) {
return err(`invalid role record at line ${lineIndex}`);
}
return ok(parsed);
}
function parseStartRecordLine(firstLine: string): Result<ParsedThreadStartRecord, string> {
let startParsed: unknown;
try {
startParsed = JSON.parse(firstLine) as unknown;
} catch {
return err("invalid JSON on line 1 (start record)");
}
if (startParsed === null || typeof startParsed !== "object") {
return err("invalid start record shape");
}
const startRec = startParsed as Record<string, unknown>;
const name = startRec.name;
const hash = startRec.hash;
const threadId = startRec.threadId;
const parameters = startRec.parameters;
if (typeof name !== "string" || typeof hash !== "string" || typeof threadId !== "string") {
return err("start record missing name, hash, or threadId");
}
if (parameters === null || typeof parameters !== "object") {
return err("start record missing parameters");
}
const paramsRec = parameters as Record<string, unknown>;
const prompt = paramsRec.prompt;
const options = paramsRec.options;
if (typeof prompt !== "string") {
return err("start record missing parameters.prompt");
}
if (options === null || typeof options !== "object") {
return err("start record missing parameters.options");
}
const optRec = options as Record<string, unknown>;
const maxRounds = optRec.maxRounds;
if (typeof maxRounds !== "number") {
return err("start record missing parameters.options.maxRounds");
}
const depthRaw = optRec.depth;
const depth =
typeof depthRaw === "number" && Number.isFinite(depthRaw) ? Math.trunc(depthRaw) : 0;
return ok({
workflowName: name,
hash,
threadId,
prompt,
maxRounds,
depth,
});
}
function parseFollowingRoleLines(lines: string[]): Result<ForkHistoricalStep[], string> {
const roleSteps: ForkHistoricalStep[] = [];
for (let i = 1; i < lines.length; i++) {
const line = lines[i];
if (line === undefined) {
/** Walk {@link StateNode} hashes from head toward the first step (newest → oldest). */
export async function walkStateFramesNewestFirst(
cas: CasStore,
headHash: string,
): Promise<Array<{ hash: string; payload: StateNodePayload }>> {
const frames: Array<{ hash: string; payload: StateNodePayload }> = [];
let cur = headHash;
while (true) {
const yamlText = await cas.get(cur);
if (yamlText === null) {
break;
}
let rec: unknown;
try {
rec = JSON.parse(line) as unknown;
} catch {
return err(`invalid JSON at line ${i + 1}`);
}
if (rec === null || typeof rec !== "object") {
return err(`invalid record at line ${i + 1}`);
}
const recObj = rec as Record<string, unknown>;
const wf = tryParseWorkflowResultRecord(recObj);
if (wf !== null) {
if (i !== lines.length - 1) {
return err("WorkflowResult record must be the final line in `.data.jsonl`");
}
const parsed = parseCasThreadNode(yamlText);
if (parsed === null || parsed.kind !== "state") {
break;
}
const parsed = parseRoleLine(recObj, i + 1);
if (!parsed.ok) {
return parsed;
frames.push({ hash: cur, payload: parsed.node.payload });
const ancestors = parsed.node.payload.ancestors;
if (ancestors.length === 0) {
break;
}
roleSteps.push(parsed.value);
const parent = ancestors[0];
if (parent === undefined || parent === "") {
break;
}
cur = parent;
}
return ok(roleSteps);
return frames;
}
/**
* Parse RFC-001 `.data.jsonl`: line 1 start record, line 2+ role outputs.
*/
export function parseThreadDataJsonl(text: string): Result<
{
start: ParsedThreadStartRecord;
roleSteps: ForkHistoricalStep[];
},
string
> {
const lines = text
.split("\n")
.map((l) => l.trim())
.filter((l) => l !== "");
if (lines.length === 0) {
return err("thread data is empty");
}
const firstLine = lines[0];
if (firstLine === undefined) {
return err("thread data is empty");
}
const start = parseStartRecordLine(firstLine);
if (!start.ok) {
return start;
}
const roleSteps = parseFollowingRoleLines(lines);
if (!roleSteps.ok) {
return roleSteps;
}
return ok({
start: start.value,
roleSteps: roleSteps.value,
});
}
function orderedUniqueRoles(roleSteps: ForkHistoricalStep[]): string[] {
function orderedUniqueRoles(roles: string[]): string[] {
const seen = new Set<string>();
const out: string[] = [];
for (const s of roleSteps) {
if (!seen.has(s.role)) {
seen.add(s.role);
out.push(s.role);
for (const r of roles) {
if (!seen.has(r)) {
seen.add(r);
out.push(r);
}
}
return out;
}
/**
* Select historical steps for a fork:
* - `fromRole === null`: drop the last step (retry the last role).
* - `fromRole !== null`: keep steps through the first occurrence of that role (inclusive).
*/
export function selectForkHistoricalSteps(
roleSteps: ForkHistoricalStep[],
async function readPromptText(cas: CasStore, promptHash: string): Promise<Result<string, string>> {
const yamlText = await cas.get(promptHash);
if (yamlText === null) {
return err(`prompt CAS blob missing: ${promptHash}`);
}
let raw: unknown;
try {
raw = parseYaml(yamlText) as unknown;
} catch {
return err(`prompt CAS blob is not valid YAML: ${promptHash}`);
}
if (raw === null || typeof raw !== "object") {
return err(`prompt CAS blob has unexpected shape: ${promptHash}`);
}
const payload = (raw as Record<string, unknown>).payload;
if (typeof payload !== "string") {
return err(`prompt CAS blob missing string payload: ${promptHash}`);
}
return ok(payload);
}
async function readStartWorkflowIdentity(params: {
cas: CasStore;
startHash: string;
}): Promise<
Result<{ workflowName: string; maxRounds: number; depth: number; prompt: string }, string>
> {
const yamlText = await params.cas.get(params.startHash);
if (yamlText === null) {
return err(`start node missing in CAS: ${params.startHash}`);
}
const parsed = parseCasThreadNode(yamlText);
if (parsed === null || parsed.kind !== "start") {
return err(`CAS blob is not a StartNode: ${params.startHash}`);
}
const refs = parsed.node.refs;
const promptHash = refs[0];
if (typeof promptHash !== "string") {
return err("StartNode refs[0] must be the prompt hash");
}
const prompt = await readPromptText(params.cas, promptHash);
if (!prompt.ok) {
return prompt;
}
const p = parsed.node.payload;
return ok({
workflowName: p.name,
maxRounds: p.maxRounds,
depth: p.depth,
prompt: prompt.value,
});
}
async function payloadToRoleOutput(cas: CasStore, payload: StateNodePayload): Promise<RoleOutput> {
let refs: string[] = [];
const blob = await cas.get(payload.content);
if (blob !== null) {
const cn = parseCasThreadNode(blob);
if (cn?.kind === "content") {
refs = [...cn.node.refs];
}
}
return {
role: payload.role,
contentHash: payload.content,
meta: payload.meta,
refs,
};
}
function meaningfulFramesOldestFirst(
newestFirst: Array<{ hash: string; payload: StateNodePayload }>,
): Array<{ hash: string; payload: StateNodePayload }> {
const chronological = [...newestFirst].reverse();
return chronological.filter((f) => f.payload.role !== END && f.payload.role !== FORK_BRANCH_ROLE);
}
function selectForkPointStateHash(
meaningfulOldestFirst: Array<{ hash: string; payload: StateNodePayload }>,
fromRole: string | null,
): Result<ForkHistoricalStep[], string> {
if (roleSteps.length === 0) {
): Result<string | null, string> {
if (meaningfulOldestFirst.length === 0) {
return err("thread has no completed role steps to fork from");
}
if (fromRole === null) {
if (roleSteps.length === 1) {
return ok([]);
if (meaningfulOldestFirst.length === 1) {
return ok(null);
}
return ok(roleSteps.slice(0, -1));
const forkFrame = meaningfulOldestFirst[meaningfulOldestFirst.length - 2];
if (forkFrame === undefined) {
return err("thread has no completed role steps to fork from");
}
return ok(forkFrame.hash);
}
const idx = roleSteps.findIndex((s) => s.role === fromRole);
const idx = meaningfulOldestFirst.findIndex((f) => f.payload.role === fromRole);
if (idx < 0) {
const available = orderedUniqueRoles(roleSteps);
const available = orderedUniqueRoles(meaningfulOldestFirst.map((f) => f.payload.role));
return err(`role not found in thread: ${fromRole} (available: ${available.join(", ")})`);
}
return ok(roleSteps.slice(0, idx + 1));
const forkFrame = meaningfulOldestFirst[idx];
if (forkFrame === undefined) {
return err("fork frame missing");
}
return ok(forkFrame.hash);
}
function replayFramesThroughForkPoint(
meaningfulOldestFirst: Array<{ hash: string; payload: StateNodePayload }>,
forkPointHash: string | null,
): Array<{ hash: string; payload: StateNodePayload }> {
if (forkPointHash === null) {
return [];
}
const idx = meaningfulOldestFirst.findIndex((f) => f.hash === forkPointHash);
if (idx < 0) {
return [];
}
return meaningfulOldestFirst.slice(0, idx + 1);
}
async function buildForkContinuation(params: {
cas: CasStore;
sourceThreadId: string;
startHash: string;
forkPointStateHash: string | null;
}): Promise<Result<ForkContinuationOptions, string>> {
const { cas, sourceThreadId, startHash, forkPointStateHash } = params;
if (forkPointStateHash === null) {
return ok({
startHash,
forkHeadHash: startHash,
initialChain: EMPTY_CHAIN_STATE,
});
}
const yamlText = await cas.get(forkPointStateHash);
if (yamlText === null) {
return err(`fork point state missing in CAS: ${forkPointStateHash}`);
}
const parsed = parseCasThreadNode(yamlText);
if (parsed === null || parsed.kind !== "state") {
return err(`fork point blob is not a StateNode: ${forkPointStateHash}`);
}
const fpPayload = parsed.node.payload;
const chainBefore: ChainState = {
parentStateHash: forkPointStateHash,
parentAncestors: fpPayload.ancestors,
};
const ancestorsMarker = computeAncestors(chainBefore);
const emptyContentHash = await putContentNodeWithRefs(cas, "", []);
const markerPayload: StateNodePayload = {
role: FORK_BRANCH_ROLE,
meta: { forkFrom: sourceThreadId },
start: startHash,
content: emptyContentHash,
ancestors: ancestorsMarker,
compact: null,
timestamp: Date.now(),
};
const markerHash = await putStateNode(cas, markerPayload);
const initialChain: ChainState = {
parentStateHash: markerHash,
parentAncestors: ancestorsMarker,
};
return ok({
startHash,
forkHeadHash: markerHash,
initialChain,
});
}
/**
* Read `.data.jsonl` text and compute fork payload for the worker `run` command.
* Prepare a CAS fork: writes the branch marker {@link StateNode}, registers `threads.json`,
* and returns worker payload fields (shared {@link StartNode}, zero ancestor duplication).
*/
export function buildForkPlan(
dataJsonlText: string,
fromRole: string | null,
): Result<ForkPlan, string> {
const parsed = parseThreadDataJsonl(dataJsonlText);
if (!parsed.ok) {
return parsed;
export async function prepareCasFork(params: {
cas: CasStore;
bundleDir: string;
bundleHash: string;
sourceThreadId: string;
headHash: string;
startHash: string;
newThreadId: string;
fromRole: string | null;
}): Promise<Result<CasForkPlan, string>> {
const id = await readStartWorkflowIdentity({
cas: params.cas,
startHash: params.startHash,
});
if (!id.ok) {
return id;
}
const selected = selectForkHistoricalSteps(parsed.value.roleSteps, fromRole);
if (!selected.ok) {
return selected;
const newestFirst = await walkStateFramesNewestFirst(params.cas, params.headHash);
const meaningful = meaningfulFramesOldestFirst(newestFirst);
const forkPoint = selectForkPointStateHash(meaningful, params.fromRole);
if (!forkPoint.ok) {
return forkPoint;
}
const { start } = parsed.value;
const replayFrames = replayFramesThroughForkPoint(meaningful, forkPoint.value);
const steps: RoleOutput[] = [];
const stepTimestamps: number[] = [];
for (const fr of replayFrames) {
steps.push(await payloadToRoleOutput(params.cas, fr.payload));
stepTimestamps.push(fr.payload.timestamp);
}
const cont = await buildForkContinuation({
cas: params.cas,
sourceThreadId: params.sourceThreadId,
startHash: params.startHash,
forkPointStateHash: forkPoint.value,
});
if (!cont.ok) {
return cont;
}
await upsertThreadEntry(params.bundleDir, params.newThreadId, {
head: cont.value.forkHeadHash,
start: params.startHash,
updatedAt: Date.now(),
});
return ok({
workflowName: start.workflowName,
hash: start.hash,
sourceThreadId: start.threadId,
prompt: start.prompt,
runOptions: { maxRounds: start.maxRounds, depth: start.depth },
historicalSteps: selected.value,
workflowName: id.value.workflowName,
hash: params.bundleHash,
sourceThreadId: params.sourceThreadId,
prompt: id.value.prompt,
runOptions: { maxRounds: id.value.maxRounds, depth: id.value.depth },
steps,
stepTimestamps,
forkContinuation: cont.value,
});
}
+129 -69
View File
@@ -1,122 +1,182 @@
import { readdir, readFile } from "node:fs/promises";
import type { Stats } from "node:fs";
import { readdir, readFile, stat } from "node:fs/promises";
import { join } from "node:path";
import { type CasStore, createCasStore } from "@uncaged/workflow-cas";
import { type CasStore, createCasStore, findReachableHashes } from "@uncaged/workflow-cas";
import { err, getGlobalCasDir, ok, type Result } from "@uncaged/workflow-util";
import { parseThreadDataJsonl } from "./fork-thread.js";
import type { ThreadHistoryEntry, ThreadIndex } from "./threads-index.js";
import { readThreadsIndex } from "./threads-index.js";
import type { GcResult } from "./types.js";
async function listThreadDataJsonlPaths(storageRoot: string): Promise<Result<string[], string>> {
const logsRoot = join(storageRoot, "logs");
const paths: string[] = [];
let hashes: string[];
function isPlainObject(v: unknown): v is Record<string, unknown> {
return v !== null && typeof v === "object" && !Array.isArray(v);
}
function parseHistoryLine(jsonLine: string): ThreadHistoryEntry | null {
let raw: unknown;
try {
hashes = await readdir(logsRoot);
raw = JSON.parse(jsonLine) as unknown;
} catch {
return null;
}
if (!isPlainObject(raw)) {
return null;
}
const threadId = raw.threadId;
const head = raw.head;
const start = raw.start;
const completedAt = raw.completedAt;
if (
typeof threadId !== "string" ||
typeof head !== "string" ||
typeof start !== "string" ||
typeof completedAt !== "number"
) {
return null;
}
return { threadId, head, start, completedAt };
}
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: walks threads index + optional history dir
async function collectGcRootsFromBundle(bundleDir: string): Promise<Result<string[], string>> {
const roots: string[] = [];
let activeIndex: ThreadIndex;
try {
activeIndex = await readThreadsIndex(bundleDir);
} catch (e) {
return err(`failed to read threads.json under ${bundleDir}: ${String(e)}`);
}
for (const entry of Object.values(activeIndex)) {
roots.push(entry.head);
roots.push(entry.start);
}
const histDir = join(bundleDir, "history");
let histFiles: string[];
try {
histFiles = await readdir(histDir);
} catch (e) {
const errObj = e as NodeJS.ErrnoException;
if (errObj.code === "ENOENT") {
return ok(roots);
}
return err(`failed to read history directory ${histDir}: ${String(e)}`);
}
for (const name of histFiles) {
if (!name.endsWith(".jsonl")) {
continue;
}
let text: string;
try {
text = await readFile(join(histDir, name), "utf8");
} catch (e) {
return err(`failed to read history file ${name}: ${String(e)}`);
}
for (const line of text.split("\n")) {
const trimmed = line.trim();
if (trimmed === "") {
continue;
}
const entry = parseHistoryLine(trimmed);
if (entry === null) {
continue;
}
roots.push(entry.head);
roots.push(entry.start);
}
}
return ok(roots);
}
async function collectAllGcRoots(storageRoot: string): Promise<Result<string[], string>> {
const bundlesRoot = join(storageRoot, "bundles");
let entries: string[];
try {
entries = await readdir(bundlesRoot);
} catch (e) {
const errObj = e as NodeJS.ErrnoException;
if (errObj.code === "ENOENT") {
return ok([]);
}
return err(`failed to read logs directory: ${String(e)}`);
return err(`failed to read bundles directory: ${String(e)}`);
}
for (const hash of hashes) {
const dir = join(logsRoot, hash);
let entries: string[];
const roots: string[] = [];
for (const name of entries) {
const bundleDir = join(bundlesRoot, name);
let st: Stats;
try {
entries = await readdir(dir);
st = await stat(bundleDir);
} catch {
continue;
}
for (const fileName of entries) {
if (fileName.endsWith(".data.jsonl")) {
paths.push(join(dir, fileName));
}
if (!st.isDirectory()) {
continue;
}
const chunk = await collectGcRootsFromBundle(bundleDir);
if (!chunk.ok) {
return chunk;
}
roots.push(...chunk.value);
}
paths.sort();
return ok(paths);
return ok(roots);
}
async function collectActiveRefsFromDataPaths(
dataPaths: string[],
): Promise<Result<Set<string>, string>> {
const activeRefs = new Set<string>();
for (const dataPath of dataPaths) {
let text: string;
try {
text = await readFile(dataPath, "utf8");
} catch (e) {
return err(`failed to read ${dataPath}: ${String(e)}`);
}
const parsed = parseThreadDataJsonl(text);
if (!parsed.ok) {
return err(`${dataPath}: ${parsed.error}`);
}
for (const step of parsed.value.roleSteps) {
for (const ref of step.refs) {
activeRefs.add(ref);
}
}
}
return ok(activeRefs);
}
async function deleteCasNotInSet(
cas: CasStore,
activeRefs: Set<string>,
): Promise<Result<string[], string>> {
async function deleteCasNotMarked(cas: CasStore, marked: ReadonlySet<string>): Promise<string[]> {
let listed: string[];
try {
listed = await cas.list();
} catch (e) {
return err(`failed to list cas entries: ${String(e)}`);
throw new Error(`failed to list cas entries: ${String(e)}`);
}
const deletedHashes: string[] = [];
for (const hash of listed) {
if (activeRefs.has(hash)) {
if (marked.has(hash)) {
continue;
}
try {
await cas.delete(hash);
} catch (e) {
return err(`failed to delete cas ${hash}: ${String(e)}`);
throw new Error(`failed to delete cas ${hash}: ${String(e)}`);
}
deletedHashes.push(hash);
}
deletedHashes.sort();
return ok(deletedHashes);
return deletedHashes;
}
/**
* Mark-and-sweep CAS GC: collect `refs` from all thread `.data.jsonl` files under `storageRoot`,
* then delete CAS blobs not referenced by any surviving thread data.
* Mark-and-sweep CAS GC: roots are every `head` / `start` hash from `threads.json` and
* `history/*.jsonl` across bundle dirs; marks closure via `refs[]`; deletes unreachable blobs.
*/
export async function garbageCollectCas(storageRoot: string): Promise<Result<GcResult, string>> {
const pathsResult = await listThreadDataJsonlPaths(storageRoot);
if (!pathsResult.ok) {
return pathsResult;
const rootsResult = await collectAllGcRoots(storageRoot);
if (!rootsResult.ok) {
return rootsResult;
}
const paths = pathsResult.value;
const refsResult = await collectActiveRefsFromDataPaths(paths);
if (!refsResult.ok) {
return refsResult;
}
const activeRefs = refsResult.value;
const roots = rootsResult.value;
const cas = createCasStore(getGlobalCasDir(storageRoot));
const deletedResult = await deleteCasNotInSet(cas, activeRefs);
if (!deletedResult.ok) {
return deletedResult;
const marked = await findReachableHashes(roots, cas);
let deletedHashes: string[];
try {
deletedHashes = await deleteCasNotMarked(cas, marked);
} catch (e) {
return err(String(e));
}
const deletedHashes = deletedResult.value;
return ok({
scannedThreads: paths.length,
activeRefs: activeRefs.size,
scannedThreads: roots.length,
activeRefs: marked.size,
deletedEntries: deletedHashes.length,
deletedHashes,
});
+10 -7
View File
@@ -1,11 +1,10 @@
export { createWorkflow } from "./create-workflow.js";
export { executeThread } from "./engine.js";
export {
buildForkPlan,
parseThreadDataJsonl,
selectForkHistoricalSteps,
tryParseRoleStepRecord,
FORK_BRANCH_ROLE,
prepareCasFork,
tryParseWorkflowResultRecord,
walkStateFramesNewestFirst,
} from "./fork-thread.js";
export { garbageCollectCas } from "./gc.js";
export { createThreadPauseGate } from "./thread-pause-gate.js";
@@ -13,18 +12,22 @@ export type { ThreadHistoryEntry, ThreadIndex, ThreadIndexEntry } from "./thread
export {
appendThreadHistoryEntry,
getBundleDir,
readThreadsIndex,
removeThreadEntry,
removeThreadHistoryEntries,
upsertThreadEntry,
writeThreadsIndex,
} from "./threads-index.js";
export type {
CasForkPlan,
ChainState,
ExecuteThreadIo,
ExecuteThreadOptions,
ForkHistoricalStep,
ForkPlan,
ForkContinuationOptions,
GcResult,
ParsedThreadStartRecord,
PrefilledDiskStep,
SupervisorDecision,
ThreadPauseGate,
} from "./types.js";
export { EMPTY_CHAIN_STATE } from "./types.js";
export { getWorkerHostScriptPath } from "./worker-entry-path.js";
@@ -1,6 +1,8 @@
import { appendFile, mkdir, readFile, rename, writeFile } from "node:fs/promises";
import { appendFile, mkdir, readdir, readFile, rename, writeFile } from "node:fs/promises";
import { dirname, join } from "node:path";
import { err, ok, type Result } from "@uncaged/workflow-util";
/**
* Active-thread index entry stored in `<bundleDir>/threads.json`.
*
@@ -71,7 +73,8 @@ function parseThreadIndex(text: string): ThreadIndex {
return out;
}
async function readThreadIndex(bundleDir: string): Promise<ThreadIndex> {
/** Read `<bundleDir>/threads.json` (empty object when missing or invalid). */
export async function readThreadsIndex(bundleDir: string): Promise<ThreadIndex> {
const path = threadsJsonPath(bundleDir);
let text: string;
try {
@@ -86,7 +89,7 @@ async function readThreadIndex(bundleDir: string): Promise<ThreadIndex> {
return parseThreadIndex(text);
}
async function writeThreadIndex(bundleDir: string, index: ThreadIndex): Promise<void> {
export async function writeThreadsIndex(bundleDir: string, index: ThreadIndex): Promise<void> {
const path = threadsJsonPath(bundleDir);
await mkdir(dirname(path), { recursive: true });
const tmp = `${path}.tmp.${process.pid}.${Date.now()}`;
@@ -101,19 +104,19 @@ export async function upsertThreadEntry(
threadId: string,
entry: ThreadIndexEntry,
): Promise<void> {
const index = await readThreadIndex(bundleDir);
const index = await readThreadsIndex(bundleDir);
index[threadId] = entry;
await writeThreadIndex(bundleDir, index);
await writeThreadsIndex(bundleDir, index);
}
/** Remove a thread entry from `threads.json` (no-op when absent). */
export async function removeThreadEntry(bundleDir: string, threadId: string): Promise<void> {
const index = await readThreadIndex(bundleDir);
const index = await readThreadsIndex(bundleDir);
if (!(threadId in index)) {
return;
}
delete index[threadId];
await writeThreadIndex(bundleDir, index);
await writeThreadsIndex(bundleDir, index);
}
function dateKey(epochMs: number): string {
@@ -134,3 +137,63 @@ export async function appendThreadHistoryEntry(
const line = `${JSON.stringify(entry)}\n`;
await appendFile(path, line, "utf8");
}
/** Removes every `history/*.jsonl` line whose `threadId` matches (rewrite files in place). */
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: per-file JSONL filtering keeps RM deterministic
export async function removeThreadHistoryEntries(
bundleDir: string,
threadId: string,
): Promise<Result<number, string>> {
const histRoot = join(bundleDir, "history");
let files: string[];
try {
files = await readdir(histRoot);
} catch (e) {
const errObj = e as NodeJS.ErrnoException;
if (errObj.code === "ENOENT") {
return ok(0);
}
return err(`failed to read history directory: ${String(e)}`);
}
let removed = 0;
for (const name of files) {
if (!name.endsWith(".jsonl")) {
continue;
}
const path = join(histRoot, name);
let text: string;
try {
text = await readFile(path, "utf8");
} catch {
continue;
}
const kept: string[] = [];
for (const line of text.split("\n")) {
const trimmed = line.trim();
if (trimmed === "") {
continue;
}
let rec: unknown;
try {
rec = JSON.parse(trimmed) as unknown;
} catch {
kept.push(`${trimmed}\n`);
continue;
}
if (rec === null || typeof rec !== "object") {
kept.push(`${trimmed}\n`);
continue;
}
const id = (rec as Record<string, unknown>).threadId;
if (id === threadId) {
removed++;
continue;
}
kept.push(`${trimmed}\n`);
}
await writeFile(path, kept.join(""), "utf8");
}
return ok(removed);
}
+33 -16
View File
@@ -11,7 +11,25 @@ export type ExecuteThreadIo = {
cas: CasStore;
};
/** One persisted role line in `.data.jsonl` (engine adds these for fork replay before running the generator). */
/** CAS chain tail state before the next appended {@link StateNode}. */
export type ChainState = {
parentStateHash: string | null;
parentAncestors: readonly string[];
};
export const EMPTY_CHAIN_STATE: ChainState = { parentStateHash: null, parentAncestors: [] };
/**
* When forking, the worker continues from an existing {@link StartNode} plus an optional
* branch marker {@link StateNode} instead of allocating a new start blob.
*/
export type ForkContinuationOptions = {
startHash: string;
forkHeadHash: string;
initialChain: ChainState;
};
/** One replayed role step (prefill) before the generator runs (same layout as disk replay rows). */
export type PrefilledDiskStep = {
role: string;
contentHash: string;
@@ -30,37 +48,36 @@ export type ExecuteThreadOptions = {
/** When non-null, written into the start record so tooling can trace lineage. */
forkSourceThreadId: string | null;
/**
* Written to `.data.jsonl` immediately after the start record, before the generator runs.
* When non-null, replays these steps into CAS before the generator runs.
* Must match `input.steps` length and order when present.
*/
prefilledDiskSteps: PrefilledDiskStep[] | null;
/** When non-null, skip creating a new {@link StartNode} and continue this CAS chain. */
forkContinuation: ForkContinuationOptions | null;
/**
* When non-null, must match `input.steps.length`; supplies persisted timestamps for
* {@link ThreadContext.steps} (used when restoring history without prefilled CAS replay).
*/
replayTimestamps: readonly number[] | null;
/** Workspace root containing `workflow.yaml`; used to resolve the `extract` scene for meta extraction. */
storageRoot: string;
};
/** Role steps replayed from `.data.jsonl`, including persisted timestamps. */
export type ForkHistoricalStep = RoleOutput & { timestamp: number };
export type ParsedThreadStartRecord = {
workflowName: string;
hash: string;
threadId: string;
prompt: string;
maxRounds: number;
depth: number;
};
export type ForkPlan = {
export type CasForkPlan = {
workflowName: string;
hash: string;
sourceThreadId: string;
prompt: string;
runOptions: { maxRounds: number; depth: number };
historicalSteps: ForkHistoricalStep[];
steps: RoleOutput[];
stepTimestamps: number[];
forkContinuation: ForkContinuationOptions;
};
export type GcResult = {
/** Count of root hashes seeded from thread indexes (`head`/`start` per entry). */
scannedThreads: number;
/** Reachable CAS blobs after the mark phase. */
activeRefs: number;
deletedEntries: number;
deletedHashes: string[];
+75 -3
View File
@@ -17,7 +17,12 @@ import {
} from "@uncaged/workflow-util";
import { executeThread } from "./engine.js";
import { createThreadPauseGate } from "./thread-pause-gate.js";
import type { ExecuteThreadIo, PrefilledDiskStep, ThreadPauseGate } from "./types.js";
import type {
ExecuteThreadIo,
ForkContinuationOptions,
PrefilledDiskStep,
ThreadPauseGate,
} from "./types.js";
const bootLog = createLogger({ sink: { kind: "stderr" } });
@@ -28,9 +33,10 @@ type RunCommand = {
prompt: string;
options: { maxRounds: number; depth: number };
steps: RoleOutput[];
/** Timestamps aligned with `steps` for `.data.jsonl` replay; length must match `steps` when non-null. */
/** Timestamps aligned with `steps` for replay / fork restore; length must match `steps` when steps are non-empty. */
stepTimestamps: number[] | null;
forkSourceThreadId: string | null;
forkContinuation: ForkContinuationOptions | null;
};
type KillCommand = {
@@ -73,6 +79,7 @@ function parseRoleOutputRecord(obj: Record<string, unknown>): RoleOutput | null
};
}
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: mirrors permissive worker IPC decoding shape checks
function parseRunStepsPayload(rec: Record<string, unknown>): {
steps: RoleOutput[];
stepTimestamps: number[] | null;
@@ -107,12 +114,60 @@ function parseRunStepsPayload(rec: Record<string, unknown>): {
return null;
}
}
const parallelTsRaw = rec.stepTimestamps;
if (
steps.length > 0 &&
Array.isArray(parallelTsRaw) &&
parallelTsRaw.length === steps.length &&
parallelTsRaw.every((x): x is number => typeof x === "number")
) {
return { steps, stepTimestamps: [...parallelTsRaw] };
}
return {
steps,
stepTimestamps: anyTimestamp ? timestamps : null,
};
}
function parseForkContinuation(rec: Record<string, unknown>): ForkContinuationOptions | null {
const raw = rec.forkContinuation;
if (raw === undefined || raw === null) {
return null;
}
if (typeof raw !== "object") {
return null;
}
const o = raw as Record<string, unknown>;
const startHash = o.startHash;
const forkHeadHash = o.forkHeadHash;
const ic = o.initialChain;
if (typeof startHash !== "string" || typeof forkHeadHash !== "string") {
return null;
}
if (ic === null || typeof ic !== "object") {
return null;
}
const ich = ic as Record<string, unknown>;
const pph = ich.parentStateHash;
const pa = ich.parentAncestors;
if (!(pph === null || typeof pph === "string")) {
return null;
}
if (!Array.isArray(pa) || !pa.every((x) => typeof x === "string")) {
return null;
}
return {
startHash,
forkHeadHash,
initialChain: {
parentStateHash: pph,
parentAncestors: pa,
},
};
}
function parseRunControlPayload(rec: Record<string, unknown>): RunCommand | null {
const threadId = rec.threadId;
const workflowName = rec.workflowName;
@@ -148,6 +203,7 @@ function parseRunControlPayload(rec: Record<string, unknown>): RunCommand | null
}
forkSourceThreadId = rawFork;
}
const forkContinuation = parseForkContinuation(rec);
return {
type: "run",
threadId,
@@ -157,6 +213,7 @@ function parseRunControlPayload(rec: Record<string, unknown>): RunCommand | null
steps: parsedSteps.steps,
stepTimestamps: parsedSteps.stepTimestamps,
forkSourceThreadId,
forkContinuation,
};
}
@@ -357,6 +414,7 @@ async function main(): Promise<void> {
}
}
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: TCP worker multiplexes lifecycle + runs
async function dispatchCommand(cmd: ControlCommand, socket: Socket | null): Promise<void> {
if (cmd.type !== "run") {
dispatchThreadLifecycleCommand(threads, socket, cmd);
@@ -394,7 +452,19 @@ async function main(): Promise<void> {
const baseTs = Date.now();
let prefilledDiskSteps: PrefilledDiskStep[] | null = null;
if (cmd.steps.length > 0) {
let replayTimestamps: readonly number[] | null = null;
if (cmd.forkContinuation !== null) {
if (
cmd.steps.length > 0 &&
(cmd.stepTimestamps === null || cmd.stepTimestamps.length !== cmd.steps.length)
) {
bootLog("J5WQ8NXT", "forkContinuation requires stepTimestamps aligned with steps");
throw new Error("forkContinuation requires stepTimestamps aligned with steps");
}
replayTimestamps =
cmd.steps.length === 0 ? null : (cmd.stepTimestamps as readonly number[]);
} else if (cmd.steps.length > 0) {
prefilledDiskSteps = cmd.steps.map((step, i) => {
const ts = cmd.stepTimestamps?.[i];
return {
@@ -417,6 +487,8 @@ async function main(): Promise<void> {
awaitAfterEachYield: () => pauseGate.awaitAfterYield(),
forkSourceThreadId: cmd.forkSourceThreadId,
prefilledDiskSteps,
forkContinuation: cmd.forkContinuation,
replayTimestamps,
storageRoot,
},
io,
+21 -7
View File
@@ -1,25 +1,39 @@
export { createWorkflow } from "./engine/create-workflow.js";
export { executeThread } from "./engine/engine.js";
export {
buildForkPlan,
parseThreadDataJsonl,
selectForkHistoricalSteps,
tryParseRoleStepRecord,
FORK_BRANCH_ROLE,
prepareCasFork,
tryParseWorkflowResultRecord,
walkStateFramesNewestFirst,
} from "./engine/fork-thread.js";
export { garbageCollectCas } from "./engine/gc.js";
export { createThreadPauseGate } from "./engine/thread-pause-gate.js";
export type {
ThreadHistoryEntry,
ThreadIndex,
ThreadIndexEntry,
} from "./engine/threads-index.js";
export {
appendThreadHistoryEntry,
getBundleDir,
readThreadsIndex,
removeThreadEntry,
removeThreadHistoryEntries,
upsertThreadEntry,
writeThreadsIndex,
} from "./engine/threads-index.js";
export type {
CasForkPlan,
ChainState,
ExecuteThreadIo,
ExecuteThreadOptions,
ForkHistoricalStep,
ForkPlan,
ForkContinuationOptions,
GcResult,
ParsedThreadStartRecord,
PrefilledDiskStep,
SupervisorDecision,
ThreadPauseGate,
} from "./engine/types.js";
export { EMPTY_CHAIN_STATE } from "./engine/types.js";
export { getWorkerHostScriptPath } from "./engine/worker-entry-path.js";
export type { ExtractFn, LlmError, LlmExtractArgs } from "./extract/index.js";
export {
@@ -101,6 +101,8 @@ export function workflowAsAgent(
awaitAfterEachYield: async () => {},
forkSourceThreadId: ctx.threadId,
prefilledDiskSteps: null,
forkContinuation: null,
replayTimestamps: null,
storageRoot,
},
io,