feat: Phase 2 — engine write path (CAS nodes + threads.json)

- Engine writes StartNode, StateNode, ContentMerkleNode as CAS blobs
- threads.json tracks active threads; completed threads are removed from it and appended to history/{YYYY-MM-DD}.jsonl
- No more .data.jsonl writes
- ancestors skip-list: [parent, ...parentAncestors] capped at 11
- Tests: 4 pass (engine write path)

Refs #155, closes #157

小橘 <xiaoju@shazhou.work>
This commit is contained in:
2026-05-09 07:53:44 +00:00
parent 6f000512d2
commit 81c582ae0e
13 changed files with 884 additions and 136 deletions
@@ -0,0 +1,317 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { readFileSync } from "node:fs";
import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createCasStore } from "@uncaged/workflow-cas";
import type {
RoleOutput,
ThreadContext,
WorkflowCompletion,
WorkflowFn,
WorkflowRuntime,
} from "@uncaged/workflow-runtime";
import { parse as parseYaml } from "yaml";
import { executeThread } from "../src/engine/engine.js";
import type { ExecuteThreadIo, ExecuteThreadOptions } from "../src/engine/types.js";
const TEST_REGISTRY_YAML = `config:
maxDepth: 3
supervisorInterval: 0
providers:
stub:
baseUrl: http://127.0.0.1:9
apiKey: test
models:
default: stub/m
workflows: {}
`;
/** Builds a logger callback that discards every message, for tests that ignore log output. */
function noLogger(): (tag: string, content: string) => void {
  const discard = (_tag: string, _content: string): void => {
    // Intentionally empty: log lines are irrelevant to these tests.
  };
  return discard;
}
/**
 * Produces a complete `ExecuteThreadOptions` for a test run.
 *
 * Baseline values are defined first; any field present in `overrides` replaces
 * the corresponding baseline value.
 */
function makeOptions(overrides: Partial<ExecuteThreadOptions>): ExecuteThreadOptions {
  const defaults: ExecuteThreadOptions = {
    maxRounds: 5,
    depth: 0,
    // A fresh controller per call, so the signal is never aborted unless overridden.
    signal: new AbortController().signal,
    awaitAfterEachYield: async () => {},
    forkSourceThreadId: null,
    prefilledDiskSteps: null,
    storageRoot: "/tmp/never",
  };
  return { ...defaults, ...overrides };
}
/**
 * Creates a throwaway storage root for one test.
 *
 * Writes `workflow.yaml` into a fresh temp directory and creates the `cas`
 * sub-directory. The bundle directory path is computed but not created here.
 */
async function setupStorage(): Promise<{
  storageRoot: string;
  casDir: string;
  bundleHash: string;
  bundleDir: string;
}> {
  const bundleHash = "TESTHASH00001";
  const storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-engine-"));

  const registryPath = join(storageRoot, "workflow.yaml");
  await writeFile(registryPath, TEST_REGISTRY_YAML, "utf8");

  const casDir = join(storageRoot, "cas");
  await mkdir(casDir, { recursive: true });

  return {
    storageRoot,
    casDir,
    bundleHash,
    bundleDir: join(storageRoot, "bundles", bundleHash),
  };
}
/**
 * Reads the CAS blob stored at `<casDir>/<hash>.txt` and parses it as YAML.
 *
 * @param casDir - Directory holding the CAS blob files.
 * @param hash - Hash of the node; the blob file is named `<hash>.txt`.
 * @returns The parsed node as a key/value record.
 * @throws If the file cannot be read, or the YAML does not parse to a mapping.
 */
function readCasNode(casDir: string, hash: string): Record<string, unknown> {
  const blobPath = join(casDir, `${hash}.txt`);
  // Statically imported readFileSync replaces the previous CommonJS `require`
  // call, which bypassed typing and needed a cast.
  const parsed: unknown = parseYaml(readFileSync(blobPath, "utf8"));
  // Fail with a pointed message instead of letting a scalar/array/null flow into
  // the assertions as if it were a node record.
  if (parsed === null || typeof parsed !== "object" || Array.isArray(parsed)) {
    throw new Error(`CAS node ${hash} at ${blobPath} is not a YAML mapping`);
  }
  return parsed as Record<string, unknown>;
}
describe("executeThread (Phase 2 — CAS thread storage)", () => {
let storageRoot: string;
let casDir: string;
let bundleHash: string;
let bundleDir: string;
beforeEach(async () => {
const setup = await setupStorage();
storageRoot = setup.storageRoot;
casDir = setup.casDir;
bundleHash = setup.bundleHash;
bundleDir = setup.bundleDir;
});
afterEach(async () => {
await rm(storageRoot, { recursive: true, force: true });
});
test("writes a StartNode whose refs[0] is the prompt CAS hash", async () => {
  const cas = createCasStore(casDir);
  // biome-ignore lint/correctness/useYield: deliberately empty generator — exercises the start/end path with no role steps
  const wf: WorkflowFn = async function* (
    _thread: ThreadContext,
    _runtime: WorkflowRuntime,
  ): AsyncGenerator<RoleOutput, WorkflowCompletion> {
    return { returnCode: 0, summary: "no-op" };
  };
  const io: ExecuteThreadIo = {
    threadId: "T01",
    hash: bundleHash,
    infoJsonlPath: join(storageRoot, "logs", bundleHash, "T01.info.jsonl"),
    cas,
  };
  const result = await executeThread(
    wf,
    "demo",
    { prompt: "hello", steps: [] },
    makeOptions({ storageRoot, maxRounds: 5 }),
    io,
    noLogger(),
  );
  expect(result.returnCode).toBe(0);

  // Locate the completion record directly. The previous code guarded this call
  // with a check on `readdir` that was always truthy, leaving a dead `""` branch.
  const historyText = await readFile(await firstHistoryFile(bundleDir), "utf8");
  const histLine = historyText.trim().split("\n")[0] ?? "";
  const histEntry = JSON.parse(histLine) as Record<string, unknown>;
  expect(histEntry.threadId).toBe("T01");

  // The history record names the start node; load it from the CAS directory.
  const startHash = histEntry.start as string;
  const startNode = readCasNode(casDir, startHash);
  expect(startNode.type).toBe("start");
  const startPayload = startNode.payload as Record<string, unknown>;
  expect(startPayload.name).toBe("demo");
  expect(startPayload.hash).toBe(bundleHash);
  expect(startPayload.maxRounds).toBe(5);

  // The start node's single ref must resolve to a blob whose payload is the prompt.
  const refs = startNode.refs as string[];
  expect(refs.length).toBe(1);
  const promptBlob = await cas.get(refs[0] ?? "");
  expect(promptBlob).not.toBeNull();
  const promptParsed = parseYaml(promptBlob ?? "") as Record<string, unknown>;
  expect(promptParsed.payload).toBe("hello");
});
// Verifies that each yielded role step is stored as its own state node, that the
// second node lists the first as its ancestor, and that threads.json exposes a
// different head after each of the two steps.
test("each role yield produces a chained StateNode and updates threads.json head", async () => {
const cas = createCasStore(casDir);
// Two-step workflow: stores a blob and yields it as "planner", then does the same as "coder".
const wf: WorkflowFn = async function* (
_thread: ThreadContext,
runtime: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
const h1 = await runtime.cas.put("plan-text");
yield { role: "planner", contentHash: h1, meta: { plan: 1 }, refs: [h1] };
const h2 = await runtime.cas.put("code-text");
yield { role: "coder", contentHash: h2, meta: { diff: "y" }, refs: [h2] };
return { returnCode: 0, summary: "done" };
};
const io: ExecuteThreadIo = {
threadId: "T02",
hash: bundleHash,
infoJsonlPath: join(storageRoot, "logs", bundleHash, "T02.info.jsonl"),
cas,
};
// Heads captured from threads.json by the hook below: first slot, then second slot.
let observedHead: string | null = null;
let observedHeadAtSecondYield: string | null = null;
const opts = makeOptions({
storageRoot,
maxRounds: 5,
// NOTE(review): presumably the engine invokes this hook once per yielded step,
// after the step's head has been published — the call site is not visible here.
awaitAfterEachYield: async () => {
const text = await readFile(join(bundleDir, "threads.json"), "utf8");
const parsed = JSON.parse(text) as Record<string, { head: string }>;
const head = parsed.T02?.head ?? null;
// Fill the first slot while it is still null; otherwise fill the second slot once.
if (observedHead === null) {
observedHead = head;
} else if (observedHeadAtSecondYield === null) {
observedHeadAtSecondYield = head;
}
},
});
const result = await executeThread(
wf,
"demo",
{ prompt: "p", steps: [] },
opts,
io,
noLogger(),
);
expect(result.returnCode).toBe(0);
// Both snapshots must exist and must differ (the head moved between the two steps).
expect(observedHead).not.toBeNull();
expect(observedHeadAtSecondYield).not.toBeNull();
expect(observedHead).not.toBe(observedHeadAtSecondYield);
// First snapshot: the "planner" state, with an empty ancestors list.
const firstState = readCasNode(casDir, observedHead ?? "");
expect(firstState.type).toBe("state");
expect((firstState.payload as Record<string, unknown>).role).toBe("planner");
expect((firstState.payload as Record<string, unknown>).ancestors).toEqual([]);
// Second snapshot: the "coder" state, whose only ancestor is the first state's hash.
const secondState = readCasNode(casDir, observedHeadAtSecondYield ?? "");
expect(secondState.type).toBe("state");
expect((secondState.payload as Record<string, unknown>).role).toBe("coder");
expect((secondState.payload as Record<string, unknown>).ancestors).toEqual([observedHead]);
// Both states must reference the same start node.
expect((secondState.payload as Record<string, unknown>).start).toBe(
(firstState.payload as Record<string, unknown>).start,
);
});
test("on completion: removes threads.json entry, appends history with __end__ head", async () => {
  const cas = createCasStore(casDir);
  // Single-step workflow: one yield, then a successful completion.
  const wf: WorkflowFn = async function* (
    _thread: ThreadContext,
    runtime: WorkflowRuntime,
  ): AsyncGenerator<RoleOutput, WorkflowCompletion> {
    const stepHash = await runtime.cas.put("only-step");
    yield { role: "only", contentHash: stepHash, meta: {}, refs: [stepHash] };
    return { returnCode: 0, summary: "completed" };
  };
  const infoJsonlPath = join(storageRoot, "logs", bundleHash, "T03.info.jsonl");
  const io: ExecuteThreadIo = { threadId: "T03", hash: bundleHash, infoJsonlPath, cas };
  const options = makeOptions({ storageRoot, maxRounds: 5 });

  const result = await executeThread(wf, "demo", { prompt: "p", steps: [] }, options, io, noLogger());
  expect(result.returnCode).toBe(0);

  // The active-thread index must no longer list the finished thread.
  const activeIndex = JSON.parse(
    await readFile(join(bundleDir, "threads.json"), "utf8"),
  ) as Record<string, unknown>;
  expect(activeIndex).toEqual({});

  // Exactly one history line, pointing at the hash returned as `rootHash`.
  const historyLines = (await readFile(await firstHistoryFile(bundleDir), "utf8"))
    .trim()
    .split("\n");
  expect(historyLines.length).toBe(1);
  const record = JSON.parse(historyLines[0] ?? "") as Record<string, unknown>;
  expect(record.threadId).toBe("T03");
  expect(record.head).toBe(result.rootHash);

  // That head is a state node with the `__end__` role and the completion as meta.
  const endNode = readCasNode(casDir, String(record.head));
  expect(endNode.type).toBe("state");
  const endPayload = endNode.payload as Record<string, unknown>;
  expect(endPayload.role).toBe("__end__");
  expect(endPayload.meta).toEqual({
    returnCode: 0,
    summary: "completed",
  });
});
test("does not write any .data.jsonl file under storageRoot", async () => {
  const cas = createCasStore(casDir);
  const wf: WorkflowFn = async function* (
    _thread: ThreadContext,
    runtime: WorkflowRuntime,
  ): AsyncGenerator<RoleOutput, WorkflowCompletion> {
    const stepHash = await runtime.cas.put("step");
    yield { role: "only", contentHash: stepHash, meta: {}, refs: [stepHash] };
    return { returnCode: 0, summary: "done" };
  };
  const io: ExecuteThreadIo = {
    threadId: "T04",
    hash: bundleHash,
    infoJsonlPath: join(storageRoot, "logs", bundleHash, "T04.info.jsonl"),
    cas,
  };
  await executeThread(
    wf,
    "demo",
    { prompt: "p", steps: [] },
    makeOptions({ storageRoot, maxRounds: 5 }),
    io,
    noLogger(),
  );

  // Walk the whole storage tree with an explicit work list, collecting every
  // regular file whose name ends in ".data.jsonl".
  const fsp = await import("node:fs/promises");
  const found: string[] = [];
  const pending: string[] = [storageRoot];
  while (pending.length > 0) {
    const dir = pending.pop();
    if (dir === undefined) {
      break;
    }
    let entries: { name: string; isDirectory: () => boolean; isFile: () => boolean }[];
    try {
      entries = await fsp.readdir(dir, { withFileTypes: true });
    } catch {
      // Unreadable directories are skipped, as before.
      continue;
    }
    for (const ent of entries) {
      const fullPath = join(dir, ent.name);
      if (ent.isDirectory()) {
        pending.push(fullPath);
        continue;
      }
      if (ent.isFile() && ent.name.endsWith(".data.jsonl")) {
        found.push(fullPath);
      }
    }
  }
  expect(found).toEqual([]);
});
});
/**
 * Returns the path of the first `.jsonl` file, in lexicographic name order,
 * under `<bundleDir>/history`.
 *
 * @param bundleDir - Bundle directory that contains the `history` folder.
 * @returns Absolute or relative path (matching `bundleDir`) of the chosen file.
 * @throws If the directory cannot be read or contains no `.jsonl` file.
 */
async function firstHistoryFile(bundleDir: string): Promise<string> {
  const { readdir } = await import("node:fs/promises");
  const dir = join(bundleDir, "history");
  // readdir() makes no ordering promise; sort so that "first" is deterministic
  // when more than one history file exists.
  const candidates = (await readdir(dir)).filter((n) => n.endsWith(".jsonl")).sort();
  const file = candidates[0];
  if (file === undefined) {
    throw new Error(`no history file under ${dir}`);
  }
  return join(dir, file);
}
@@ -0,0 +1,91 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdtemp, readdir, readFile, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import {
appendThreadHistoryEntry,
removeThreadEntry,
upsertThreadEntry,
} from "../src/engine/threads-index.js";
describe("threads-index", () => {
  let bundleDir: string;

  /** Loads and parses `<bundleDir>/threads.json` as currently written on disk. */
  const loadIndexFile = async (): Promise<Record<string, unknown>> => {
    const raw = await readFile(join(bundleDir, "threads.json"), "utf8");
    return JSON.parse(raw) as Record<string, unknown>;
  };

  beforeEach(async () => {
    // Each test gets its own empty bundle directory.
    bundleDir = await mkdtemp(join(tmpdir(), "uncaged-wf-threads-"));
  });

  afterEach(async () => {
    await rm(bundleDir, { recursive: true, force: true });
  });

  test("upsertThreadEntry creates threads.json and persists entries", async () => {
    await upsertThreadEntry(bundleDir, "T1", { head: "H1", start: "S1", updatedAt: 100 });

    expect(await loadIndexFile()).toEqual({
      T1: { head: "H1", start: "S1", updatedAt: 100 },
    });
  });

  test("upsertThreadEntry overwrites the head while preserving siblings", async () => {
    await upsertThreadEntry(bundleDir, "T1", { head: "H1", start: "S1", updatedAt: 100 });
    await upsertThreadEntry(bundleDir, "T2", { head: "H2", start: "S2", updatedAt: 200 });
    // Second write for T1 must replace only T1's entry.
    await upsertThreadEntry(bundleDir, "T1", { head: "H1B", start: "S1", updatedAt: 300 });

    expect(await loadIndexFile()).toEqual({
      T1: { head: "H1B", start: "S1", updatedAt: 300 },
      T2: { head: "H2", start: "S2", updatedAt: 200 },
    });
  });

  test("removeThreadEntry deletes the entry but keeps the file", async () => {
    await upsertThreadEntry(bundleDir, "T1", { head: "H1", start: "S1", updatedAt: 100 });
    await upsertThreadEntry(bundleDir, "T2", { head: "H2", start: "S2", updatedAt: 200 });

    await removeThreadEntry(bundleDir, "T1");

    expect(await loadIndexFile()).toEqual({
      T2: { head: "H2", start: "S2", updatedAt: 200 },
    });
  });

  test("removeThreadEntry on a missing thread is a no-op", async () => {
    await removeThreadEntry(bundleDir, "MISSING");

    // Nothing should have been written into the bundle directory.
    const names = await readdir(bundleDir);
    expect(names.includes("threads.json")).toBe(false);
  });

  test("appendThreadHistoryEntry writes one JSONL line per call into a date-keyed file", async () => {
    const ts = Date.UTC(2026, 4, 9, 12, 0, 0);

    await appendThreadHistoryEntry(bundleDir, {
      threadId: "T1",
      head: "H1",
      start: "S1",
      completedAt: ts,
    });
    await appendThreadHistoryEntry(bundleDir, {
      threadId: "T2",
      head: "H2",
      start: "S2",
      completedAt: ts,
    });

    const historyPath = join(bundleDir, "history", "2026-05-09.jsonl");
    const lines = (await readFile(historyPath, "utf8")).trim().split("\n");
    expect(lines.length).toBe(2);

    const [firstLine, secondLine] = lines;
    expect(JSON.parse(firstLine ?? "{}")).toEqual({
      threadId: "T1",
      head: "H1",
      start: "S1",
      completedAt: ts,
    });
    expect(JSON.parse(secondLine ?? "{}")).toEqual({
      threadId: "T2",
      head: "H2",
      start: "S2",
      completedAt: ts,
    });
  });
});
+225 -118
View File
@@ -1,5 +1,18 @@
import { appendFile, mkdir } from "node:fs/promises";
import { mkdir } from "node:fs/promises";
import { dirname } from "node:path";
import {
type CasStore,
getContentMerklePayload,
putContentNodeWithRefs,
putStartNode,
putStateNode,
} from "@uncaged/workflow-cas";
import type { StateNode } from "@uncaged/workflow-protocol";
import {
readWorkflowRegistry,
resolveModel,
type WorkflowConfig,
} from "@uncaged/workflow-register";
import type {
LlmProvider,
RoleOutput,
@@ -9,21 +22,38 @@ import type {
WorkflowResult,
WorkflowRuntime,
} from "@uncaged/workflow-runtime";
import { START } from "@uncaged/workflow-runtime";
import {
type CasStore,
getContentMerklePayload,
putStepMerkleNode,
putThreadMerkleNode,
} from "@uncaged/workflow-cas";
import { resolveModel } from "@uncaged/workflow-register";
import { createExtract } from "../extract/index.js";
import { readWorkflowRegistry, type WorkflowConfig } from "@uncaged/workflow-register";
import { err, type LogFn, normalizeRefsField, ok, type Result } from "@uncaged/workflow-util";
import { END, START } from "@uncaged/workflow-runtime";
import { err, type LogFn, ok, type Result } from "@uncaged/workflow-util";
import { createExtract } from "../extract/index.js";
import { runSupervisor } from "./supervisor.js";
import {
appendThreadHistoryEntry,
getBundleDir,
removeThreadEntry,
upsertThreadEntry,
} from "./threads-index.js";
import type { ExecuteThreadIo, ExecuteThreadOptions } from "./types.js";
/** Cap for {@link StateNode}.payload.ancestors: 1 parent + 10 skip-list. */
const ANCESTORS_CAP = 11;
type ChainState = {
/** State hash of the most recently written {@link StateNode}, or `null` before the first step. */
parentStateHash: string | null;
/** Ancestors recorded on the most recently written {@link StateNode}. */
parentAncestors: readonly string[];
};
const EMPTY_CHAIN: ChainState = { parentStateHash: null, parentAncestors: [] };
/**
 * Derives the `ancestors` list for the next state node.
 *
 * The list is the parent state hash followed by the parent's own ancestors,
 * truncated to `ANCESTORS_CAP` entries. It is empty when the chain has no
 * parent yet.
 */
function computeAncestors(chain: ChainState): string[] {
  const { parentStateHash, parentAncestors } = chain;
  if (parentStateHash === null) {
    return [];
  }
  const lineage = [parentStateHash, ...parentAncestors];
  return lineage.slice(0, ANCESTORS_CAP);
}
async function resolveEngineRegistryRuntime(
storageRoot: string,
cas: CasStore,
@@ -57,51 +87,108 @@ async function resolveEngineRegistryRuntime(
return ok({ extract: createExtract(llmProvider, { cas }), workflowConfig: cfg });
}
async function appendDataLine(path: string, record: unknown): Promise<void> {
const line = `${JSON.stringify(record)}\n`;
await appendFile(path, line, "utf8");
/**
 * Persists one role step as a state node chained onto the current thread state.
 *
 * Loads the step's text with `getContentMerklePayload`, stores it again through
 * `putContentNodeWithRefs` (passing `refs` minus the step's own `contentHash`),
 * then writes a state node whose payload carries the role, meta, start hash,
 * the newly stored content hash, the computed ancestors, `compact: null`, and
 * the timestamp.
 *
 * @param params - CAS store, start-node hash, current chain state, and the step's fields.
 * @returns The new state node's hash and the chain state to use for the next step.
 * @throws If `getContentMerklePayload` returns `null` for `params.contentHash`.
 */
async function appendStateForStep(params: {
cas: CasStore;
startHash: string;
chain: ChainState;
role: string;
contentHash: string;
meta: Record<string, unknown>;
refs: readonly string[];
timestamp: number;
}): Promise<{ stateHash: string; chain: ChainState }> {
const text = await getContentMerklePayload(params.cas, params.contentHash);
if (text === null) {
throw new Error(
`role step ${params.role}: CAS blob missing for contentHash ${params.contentHash}`,
);
}
// Drop the step's own content hash from the refs passed alongside the content.
const artifactRefs = params.refs.filter((r) => r !== params.contentHash);
// Shadows params.contentHash: from here on, `contentHash` is the hash returned by the store.
const contentHash = await putContentNodeWithRefs(params.cas, text, artifactRefs);
const ancestors = computeAncestors(params.chain);
const payload: StateNode["payload"] = {
role: params.role,
meta: params.meta,
start: params.startHash,
content: contentHash,
ancestors,
compact: null,
timestamp: params.timestamp,
};
const stateHash = await putStateNode(params.cas, payload);
// The node just written becomes the parent for whatever is appended next.
return {
stateHash,
chain: { parentStateHash: stateHash, parentAncestors: ancestors },
};
}
async function finalizeThreadResult(params: {
async function appendEndState(params: {
cas: CasStore;
workflowName: string;
startHash: string;
chain: ChainState;
completion: WorkflowCompletion;
timestamp: number;
}): Promise<string> {
const contentHash = await putContentNodeWithRefs(params.cas, params.completion.summary, []);
const ancestors = computeAncestors(params.chain);
const payload: StateNode["payload"] = {
role: END,
meta: { returnCode: params.completion.returnCode, summary: params.completion.summary },
start: params.startHash,
content: contentHash,
ancestors,
compact: null,
timestamp: params.timestamp,
};
return putStateNode(params.cas, payload);
}
async function finalizeThread(params: {
cas: CasStore;
bundleDir: string;
threadId: string;
stepMerkleHashes: readonly string[];
startHash: string;
chain: ChainState;
completion: WorkflowCompletion;
}): Promise<WorkflowResult> {
const rootHash = await putThreadMerkleNode(
params.cas,
{
workflow: params.workflowName,
threadId: params.threadId,
result: {
returnCode: params.completion.returnCode,
summary: params.completion.summary,
},
},
params.stepMerkleHashes,
);
const ts = Date.now();
const endHash = await appendEndState({
cas: params.cas,
startHash: params.startHash,
chain: params.chain,
completion: params.completion,
timestamp: ts,
});
await removeThreadEntry(params.bundleDir, params.threadId);
await appendThreadHistoryEntry(params.bundleDir, {
threadId: params.threadId,
head: endHash,
start: params.startHash,
completedAt: ts,
});
return {
returnCode: params.completion.returnCode,
summary: params.completion.summary,
rootHash,
rootHash: endHash,
};
}
async function finalizeAbortedThread(params: {
cas: CasStore;
workflowName: string;
bundleDir: string;
threadId: string;
stepMerkleHashes: string[];
startHash: string;
chain: ChainState;
logger: LogFn;
abortLogTag: string;
}): Promise<WorkflowResult> {
params.logger(params.abortLogTag, `thread ${params.threadId} aborted`);
return finalizeThreadResult({
return finalizeThread({
cas: params.cas,
workflowName: params.workflowName,
bundleDir: params.bundleDir,
threadId: params.threadId,
stepMerkleHashes: params.stepMerkleHashes,
startHash: params.startHash,
chain: params.chain,
completion: { returnCode: 130, summary: "thread aborted" },
});
}
@@ -114,8 +201,9 @@ async function maybeSupervisorHaltsThread(params: {
logger: LogFn;
threadId: string;
cas: CasStore;
workflowName: string;
stepMerkleHashes: string[];
bundleDir: string;
startHash: string;
chain: ChainState;
}): Promise<WorkflowResult | null> {
const interval = params.workflowConfig.supervisorInterval;
if (interval <= 0 || params.written % interval !== 0) {
@@ -135,41 +223,55 @@ async function maybeSupervisorHaltsThread(params: {
return null;
}
params.logger("M4QX8VHN", `thread ${params.threadId} stopped by supervisor`);
return finalizeThreadResult({
return finalizeThread({
cas: params.cas,
workflowName: params.workflowName,
bundleDir: params.bundleDir,
threadId: params.threadId,
stepMerkleHashes: params.stepMerkleHashes,
startHash: params.startHash,
chain: params.chain,
completion: { returnCode: 0, summary: "completed: supervisor stopped thread" },
});
}
/**
 * Records `headHash` as the current head of `threadId` in the bundle's
 * `threads.json`, stamped with the current time.
 */
async function publishHead(params: {
  bundleDir: string;
  threadId: string;
  startHash: string;
  headHash: string;
}): Promise<void> {
  const { bundleDir, threadId, startHash, headHash } = params;
  const entry = { head: headHash, start: startHash, updatedAt: Date.now() };
  await upsertThreadEntry(bundleDir, threadId, entry);
}
async function driveWorkflowGenerator(params: {
fn: WorkflowFn;
workflowName: string;
workflowConfig: WorkflowConfig;
thread: ThreadContext;
runtime: WorkflowRuntime;
executeOptions: ExecuteThreadOptions;
dataJsonlPath: string;
threadId: string;
logger: LogFn;
cas: CasStore;
stepMerkleHashes: string[];
bundleDir: string;
startHash: string;
chain: ChainState;
}): Promise<WorkflowResult> {
const {
fn,
workflowName,
workflowConfig,
thread,
runtime,
executeOptions,
dataJsonlPath,
threadId,
logger,
cas,
stepMerkleHashes,
bundleDir,
startHash,
} = params;
let chain: ChainState = params.chain;
const gen = fn(thread, runtime);
let written = 0;
const recentSupervisorSteps: { role: string; summary: string }[] = thread.steps.map((s) => ({
@@ -181,9 +283,10 @@ async function driveWorkflowGenerator(params: {
if (executeOptions.signal.aborted) {
return await finalizeAbortedThread({
cas,
workflowName,
bundleDir,
threadId,
stepMerkleHashes,
startHash,
chain,
logger,
abortLogTag: "V8JX4NP2",
});
@@ -191,11 +294,12 @@ async function driveWorkflowGenerator(params: {
if (written >= executeOptions.maxRounds) {
logger("R3CW7YBQ", `thread ${threadId} stopped at maxRounds=${executeOptions.maxRounds}`);
return await finalizeThreadResult({
return await finalizeThread({
cas,
workflowName,
bundleDir,
threadId,
stepMerkleHashes,
startHash,
chain,
completion: {
returnCode: 0,
summary: `completed: reached maxRounds (${executeOptions.maxRounds})`,
@@ -207,39 +311,31 @@ async function driveWorkflowGenerator(params: {
if (iterResult.done) {
logger("F3HN8QKP", `thread ${threadId} generator finished`);
const completion = iterResult.value;
return await finalizeThreadResult({
return await finalizeThread({
cas,
workflowName,
bundleDir,
threadId,
stepMerkleHashes,
completion,
startHash,
chain,
completion: iterResult.value,
});
}
written++;
const step = iterResult.value;
const resolved = await getContentMerklePayload(cas, step.contentHash);
if (resolved === null) {
throw new Error(
`role step ${step.role}: CAS blob missing for contentHash ${step.contentHash}`,
);
}
const ts = Date.now();
await appendDataLine(dataJsonlPath, {
const written_ = await appendStateForStep({
cas,
startHash,
chain,
role: step.role,
contentHash: step.contentHash,
meta: step.meta,
refs: normalizeRefsField(step.refs),
refs: step.refs,
timestamp: ts,
});
const stepNodeHash = await putStepMerkleNode(
cas,
{ role: step.role, meta: step.meta },
step.contentHash,
);
stepMerkleHashes.push(stepNodeHash);
chain = written_.chain;
await publishHead({ bundleDir, threadId, startHash, headHash: written_.stateHash });
logger("N7BW4YHQ", `thread ${threadId} wrote role ${step.role}`);
@@ -262,9 +358,10 @@ async function driveWorkflowGenerator(params: {
if (executeOptions.signal.aborted) {
return await finalizeAbortedThread({
cas,
workflowName,
bundleDir,
threadId,
stepMerkleHashes,
startHash,
chain,
logger,
abortLogTag: "V8JX4NP4",
});
@@ -278,8 +375,9 @@ async function driveWorkflowGenerator(params: {
logger,
threadId,
cas,
workflowName,
stepMerkleHashes,
bundleDir,
startHash,
chain,
});
if (supervised !== null) {
return supervised;
@@ -288,8 +386,16 @@ async function driveWorkflowGenerator(params: {
}
/**
* Execute a workflow thread: drive the bundle's AsyncGenerator, RFC-001 `.data.jsonl` records,
* debug lines via `logger` to `.info.jsonl`.
* Execute a workflow thread by driving the bundle's `AsyncGenerator`.
*
* Persistence layout (RFC v3 — CAS-based thread storage):
* - Thread chain is written as immutable CAS blobs: a single {@link StartNode}
* plus one {@link StateNode} per role step (including a final `__end__`
* state on completion / abort / `maxRounds`).
* - The active thread head is published in `<bundleDir>/threads.json`; on
* completion it is removed and a record is appended to
* `<bundleDir>/history/{YYYY-MM-DD}.jsonl`.
* - Debug logging continues to flow through `logger` to `.info.jsonl`.
*/
export async function executeThread(
fn: WorkflowFn,
@@ -299,7 +405,6 @@ export async function executeThread(
io: ExecuteThreadIo,
logger: LogFn,
): Promise<WorkflowResult> {
await mkdir(dirname(io.dataJsonlPath), { recursive: true });
await mkdir(dirname(io.infoJsonlPath), { recursive: true });
const prefilled = options.prefilledDiskSteps;
@@ -309,61 +414,63 @@ export async function executeThread(
);
}
const nowMs = Date.now();
const startRecord: Record<string, unknown> = {
name: workflowName,
hash: io.hash,
threadId: io.threadId,
parameters: {
prompt: input.prompt,
options: {
maxRounds: options.maxRounds,
depth: options.depth,
},
},
timestamp: nowMs,
};
if (options.forkSourceThreadId !== null) {
startRecord.forkFrom = { threadId: options.forkSourceThreadId };
}
const bundleDir = getBundleDir(options.storageRoot, io.hash);
await appendDataLine(io.dataJsonlPath, startRecord);
const promptHash = await io.cas.put(input.prompt);
const startHash = await putStartNode(
io.cas,
{
name: workflowName,
hash: io.hash,
maxRounds: options.maxRounds,
depth: options.depth,
},
promptHash,
);
await publishHead({
bundleDir,
threadId: io.threadId,
startHash,
headHash: startHash,
});
logger("T9HQ2KHM", `thread ${io.threadId} started for workflow ${workflowName}`);
const stepMerkleHashes: string[] = [];
let chain: ChainState = EMPTY_CHAIN;
if (prefilled !== null) {
for (const row of prefilled) {
const prefilledPayload = await getContentMerklePayload(io.cas, row.contentHash);
if (prefilledPayload === null) {
throw new Error(
`prefilled step ${row.role}: CAS blob missing for contentHash ${row.contentHash}`,
);
}
await appendDataLine(io.dataJsonlPath, {
const written = await appendStateForStep({
cas: io.cas,
startHash,
chain,
role: row.role,
contentHash: row.contentHash,
meta: row.meta,
refs: normalizeRefsField(row.refs),
refs: row.refs,
timestamp: row.timestamp,
});
const stepNodeHash = await putStepMerkleNode(
io.cas,
{ role: row.role, meta: row.meta },
row.contentHash,
);
stepMerkleHashes.push(stepNodeHash);
chain = written.chain;
await publishHead({
bundleDir,
threadId: io.threadId,
startHash,
headHash: written.stateHash,
});
}
}
const nowMs = Date.now();
if (options.maxRounds <= 0) {
logger("R3CW7YBQ", `thread ${io.threadId} stopped at maxRounds=${options.maxRounds}`);
return await finalizeThreadResult({
return await finalizeThread({
cas: io.cas,
workflowName,
bundleDir,
threadId: io.threadId,
stepMerkleHashes,
startHash,
chain,
completion: {
returnCode: 0,
summary: `completed: reached maxRounds (${options.maxRounds})`,
@@ -401,15 +508,15 @@ export async function executeThread(
return await driveWorkflowGenerator({
fn,
workflowName,
workflowConfig: registryRuntime.value.workflowConfig,
thread,
runtime,
executeOptions: options,
dataJsonlPath: io.dataJsonlPath,
threadId: io.threadId,
logger,
cas: io.cas,
stepMerkleHashes,
bundleDir,
startHash,
chain,
});
}
@@ -9,6 +9,13 @@ export {
} from "./fork-thread.js";
export { garbageCollectCas } from "./gc.js";
export { createThreadPauseGate } from "./thread-pause-gate.js";
export type { ThreadHistoryEntry, ThreadIndex, ThreadIndexEntry } from "./threads-index.js";
export {
appendThreadHistoryEntry,
getBundleDir,
removeThreadEntry,
upsertThreadEntry,
} from "./threads-index.js";
export type {
ExecuteThreadIo,
ExecuteThreadOptions,
@@ -75,7 +75,7 @@ export async function runSupervisor(
});
if (!result.ok) {
args.logger("R9CW4PLM", `supervisor failed: ${result.error}`);
args.logger("R9CW4PHM", `supervisor failed: ${result.error}`);
return err(`supervisor: ${result.error}`);
}
@@ -0,0 +1,136 @@
import { appendFile, mkdir, readFile, rename, writeFile } from "node:fs/promises";
import { dirname, join } from "node:path";
/**
* Active-thread index entry stored in `<bundleDir>/threads.json`.
*
* Once the thread reaches `__end__`, the entry is removed from `threads.json`
* and a corresponding line is appended to `history/{YYYY-MM-DD}.jsonl`.
*/
export type ThreadIndexEntry = {
/** Hash of the node currently published as the thread's head. */
head: string;
/** Hash of the thread's start node. */
start: string;
/** Epoch milliseconds of the last update to this entry. */
updatedAt: number;
};
/** Completion record serialized as one JSON line in `history/{YYYY-MM-DD}.jsonl`. */
export type ThreadHistoryEntry = {
/** Identifier of the completed thread. */
threadId: string;
/** Hash of the thread's head node at completion. */
head: string;
/** Hash of the thread's start node. */
start: string;
/** Epoch milliseconds of completion; its UTC date selects the history file. */
completedAt: number;
};
/** In-memory form of `threads.json`: thread id mapped to its index entry. */
export type ThreadIndex = Record<string, ThreadIndexEntry>;
/** Resolves the per-bundle directory: `<storageRoot>/bundles/<bundleHash>`. */
export function getBundleDir(storageRoot: string, bundleHash: string): string {
  const bundlesRoot = join(storageRoot, "bundles");
  return join(bundlesRoot, bundleHash);
}
/** Location of the active-thread index file inside a bundle directory. */
function threadsJsonPath(bundleDir: string): string {
  const indexFileName = "threads.json";
  return join(bundleDir, indexFileName);
}
/**
 * Type guard: true for any non-null, non-array value whose `typeof` is `"object"`.
 * Class instances such as `Date` also pass; only `null` and arrays are excluded.
 */
function isPlainObject(v: unknown): v is Record<string, unknown> {
  if (v === null || Array.isArray(v)) {
    return false;
  }
  return typeof v === "object";
}
/**
 * Validates one raw `threads.json` value.
 *
 * @returns A normalized entry containing only `head`, `start`, and `updatedAt`,
 * or `null` when `raw` is not an object or any of those fields has the wrong type.
 */
function parseThreadIndexEntry(raw: unknown): ThreadIndexEntry | null {
  if (!isPlainObject(raw)) {
    return null;
  }
  const { head, start, updatedAt } = raw;
  if (typeof head === "string" && typeof start === "string" && typeof updatedAt === "number") {
    // Rebuild the object so that unknown extra keys are not carried along.
    return { head, start, updatedAt };
  }
  return null;
}
/**
 * Parses the text of `threads.json` into a validated index.
 *
 * Lenient by design: blank text, invalid JSON, and a non-object root all yield
 * an empty index, and individual entries that fail validation are dropped.
 */
function parseThreadIndex(text: string): ThreadIndex {
  const index: ThreadIndex = {};
  const body = text.trim();
  if (body === "") {
    return index;
  }

  let decoded: unknown;
  try {
    decoded = JSON.parse(body) as unknown;
  } catch {
    return index;
  }
  if (!isPlainObject(decoded)) {
    return index;
  }

  for (const key of Object.keys(decoded)) {
    const entry = parseThreadIndexEntry(decoded[key]);
    if (entry === null) {
      continue;
    }
    index[key] = entry;
  }
  return index;
}
/**
 * Loads the thread index for a bundle.
 *
 * A missing `threads.json` (ENOENT) is treated as an empty index; any other
 * read error is rethrown unchanged.
 */
async function readThreadIndex(bundleDir: string): Promise<ThreadIndex> {
  const indexPath = threadsJsonPath(bundleDir);
  let contents: string | null = null;
  try {
    contents = await readFile(indexPath, "utf8");
  } catch (e) {
    const fileMissing = (e as NodeJS.ErrnoException).code === "ENOENT";
    if (!fileMissing) {
      throw e;
    }
  }
  return contents === null ? {} : parseThreadIndex(contents);
}
/**
 * Writes `index` to `<bundleDir>/threads.json`, creating the directory if needed.
 *
 * The JSON is written to a temporary sibling file first and then renamed over
 * the target, so readers never observe a partially written index.
 *
 * NOTE(review): callers read, modify, and write the index without any lock, so
 * two overlapping updates can still overwrite each other's changes; a failed
 * write or rename also leaves the temporary file behind.
 *
 * @param bundleDir - Bundle directory that holds `threads.json`.
 * @param index - Complete index to persist.
 */
async function writeThreadIndex(bundleDir: string, index: ThreadIndex): Promise<void> {
  const path = threadsJsonPath(bundleDir);
  await mkdir(dirname(path), { recursive: true });
  // pid + timestamp alone repeats when two writes start within the same
  // millisecond in one process; they would then share a temp file and the
  // second rename would fail. A random suffix gives each writer its own file.
  const nonce = Math.random().toString(36).slice(2, 10);
  const tmp = `${path}.tmp.${process.pid}.${Date.now()}.${nonce}`;
  const json = `${JSON.stringify(index, null, 2)}\n`;
  await writeFile(tmp, json, "utf8");
  await rename(tmp, path);
}
/**
 * Stores `entry` under `threadId` in `threads.json`.
 *
 * An existing entry for the same thread is replaced; entries for other threads
 * are left as they are. The file is created when it does not exist yet.
 */
export async function upsertThreadEntry(
  bundleDir: string,
  threadId: string,
  entry: ThreadIndexEntry,
): Promise<void> {
  const current = await readThreadIndex(bundleDir);
  current[threadId] = entry;
  return writeThreadIndex(bundleDir, current);
}
/**
 * Removes a thread's entry from `threads.json`.
 *
 * When the index has no entry for `threadId`, nothing is written (so a missing
 * file stays missing).
 *
 * @param bundleDir - Bundle directory that holds `threads.json`.
 * @param threadId - Identifier of the thread to remove.
 */
export async function removeThreadEntry(bundleDir: string, threadId: string): Promise<void> {
  const index = await readThreadIndex(bundleDir);
  // Check own keys only: `in` also matches inherited names such as
  // "constructor" or "toString", which would defeat the no-op and rewrite
  // (or create) the file for a thread that was never indexed.
  if (!Object.prototype.hasOwnProperty.call(index, threadId)) {
    return;
  }
  delete index[threadId];
  await writeThreadIndex(bundleDir, index);
}
/** Formats an epoch-millisecond timestamp as its UTC calendar date, `YYYY-MM-DD`. */
function dateKey(epochMs: number): string {
  const when = new Date(epochMs);
  const pad = (value: number, width: number): string => value.toString().padStart(width, "0");
  const parts = [
    pad(when.getUTCFullYear(), 4),
    pad(when.getUTCMonth() + 1, 2), // getUTCMonth() is zero-based
    pad(when.getUTCDate(), 2),
  ];
  return parts.join("-");
}
/**
 * Appends `entry` as a single JSON line to the history file for its completion
 * date: `<bundleDir>/history/{YYYY-MM-DD}.jsonl`, where the date is the UTC date
 * of `entry.completedAt`. The `history` directory is created on demand.
 */
export async function appendThreadHistoryEntry(
  bundleDir: string,
  entry: ThreadHistoryEntry,
): Promise<void> {
  const historyDir = join(bundleDir, "history");
  const target = join(historyDir, `${dateKey(entry.completedAt)}.jsonl`);
  await mkdir(historyDir, { recursive: true });
  await appendFile(target, `${JSON.stringify(entry)}\n`, "utf8");
}
@@ -1,5 +1,5 @@
import type { RoleOutput } from "@uncaged/workflow-runtime";
import type { CasStore } from "@uncaged/workflow-cas";
import type { RoleOutput } from "@uncaged/workflow-runtime";
import type { Result } from "@uncaged/workflow-util";
export type SupervisorDecision = "continue" | "stop";
@@ -7,7 +7,6 @@ export type SupervisorDecision = "continue" | "stop";
export type ExecuteThreadIo = {
threadId: string;
hash: string;
dataJsonlPath: string;
infoJsonlPath: string;
cas: CasStore;
};
@@ -1,7 +1,7 @@
import { appendFile, mkdir, unlink, writeFile } from "node:fs/promises";
import { mkdir, unlink, writeFile } from "node:fs/promises";
import { createServer, type Socket } from "node:net";
import { dirname, join } from "node:path";
import type { RoleOutput, WorkflowFn, WorkflowResult } from "@uncaged/workflow-runtime";
import type { RoleOutput, WorkflowFn } from "@uncaged/workflow-runtime";
import { ensureUncagedWorkflowSymlink, importWorkflowBundleModule } from "@uncaged/workflow-register";
import { createCasStore } from "@uncaged/workflow-cas";
import {
@@ -364,13 +364,11 @@ async function main(): Promise<void> {
const threadId = cmd.threadId;
const runningPath = join(storageRoot, "logs", hash, `${threadId}.running`);
const dataJsonlPath = join(storageRoot, "logs", hash, `${threadId}.data.jsonl`);
const infoJsonlPath = join(storageRoot, "logs", hash, `${threadId}.info.jsonl`);
const io: ExecuteThreadIo = {
threadId,
hash,
dataJsonlPath,
infoJsonlPath,
cas,
};
@@ -387,7 +385,6 @@ async function main(): Promise<void> {
try {
await mkdir(dirname(runningPath), { recursive: true });
await mkdir(dirname(dataJsonlPath), { recursive: true });
await writeFile(runningPath, "", "utf8");
const logger = createLogger({ sink: { kind: "file", path: infoJsonlPath } });
@@ -407,7 +404,7 @@ async function main(): Promise<void> {
});
}
const runResult = await executeThread(
await executeThread(
workflowFn,
cmd.workflowName,
{ prompt: cmd.prompt, steps: cmd.steps },
@@ -422,12 +419,9 @@ async function main(): Promise<void> {
io,
logger,
);
await appendFile(dataJsonlPath, `${JSON.stringify(runResult)}\n`, "utf8");
} catch (e) {
const message = e instanceof Error ? e.message : String(e);
bootLog("Q3MN8YKW", `thread ${threadId} failed: ${message}`);
const failure: WorkflowResult = { returnCode: 1, summary: message, rootHash: "" };
await appendFile(dataJsonlPath, `${JSON.stringify(failure)}\n`, "utf8").catch(() => {});
} finally {
threads.delete(threadId);
await unlink(runningPath).catch(() => {});
@@ -74,13 +74,11 @@ export function workflowAsAgent(
};
const childThreadId = generateUlid(Date.now());
const dataJsonlPath = join(storageRoot, "logs", entry.hash, `${childThreadId}.data.jsonl`);
const infoJsonlPath = join(storageRoot, "logs", entry.hash, `${childThreadId}.info.jsonl`);
const io: ExecuteThreadIo = {
threadId: childThreadId,
hash: entry.hash,
dataJsonlPath,
infoJsonlPath,
cas: createCasStore(getGlobalCasDir(storageRoot)),
};