From 81c582ae0e79a9390dba65cc11311d1c425d1e10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Sat, 9 May 2026 07:53:44 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20Phase=202=20=E2=80=94=20engine=20write?= =?UTF-8?q?=20path=20(CAS=20nodes=20+=20threads.json)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Engine writes StartNode, StateNode, ContentMerkleNode as CAS blobs - threads.json tracks active threads, completed → history/{date}.jsonl - No more .data.jsonl writes - ancestors skip-list: [parent, ...parentAncestors] capped at 11 - Tests: 4 pass (engine write path) Refs #155, closes #157 小橘 --- packages/workflow-cas/src/cas.ts | 4 + packages/workflow-cas/src/index.ts | 7 + packages/workflow-cas/src/merkle.ts | 17 +- packages/workflow-cas/src/nodes.ts | 79 ++++ .../workflow-execute/__tests__/engine.test.ts | 317 ++++++++++++++++ .../__tests__/threads-index.test.ts | 91 +++++ .../workflow-execute/src/engine/engine.ts | 343 ++++++++++++------ packages/workflow-execute/src/engine/index.ts | 7 + .../workflow-execute/src/engine/supervisor.ts | 2 +- .../src/engine/threads-index.ts | 136 +++++++ packages/workflow-execute/src/engine/types.ts | 3 +- .../workflow-execute/src/engine/worker.ts | 12 +- .../workflow-execute/src/workflow-as-agent.ts | 2 - 13 files changed, 884 insertions(+), 136 deletions(-) create mode 100644 packages/workflow-cas/src/nodes.ts create mode 100644 packages/workflow-execute/__tests__/engine.test.ts create mode 100644 packages/workflow-execute/__tests__/threads-index.test.ts create mode 100644 packages/workflow-execute/src/engine/threads-index.ts diff --git a/packages/workflow-cas/src/cas.ts b/packages/workflow-cas/src/cas.ts index b6a1cce..e84b0a4 100644 --- a/packages/workflow-cas/src/cas.ts +++ b/packages/workflow-cas/src/cas.ts @@ -3,10 +3,14 @@ import { join } from "node:path"; import { hashString } from "./hash.js"; import { createContentMerkleNode, parseMerkleNode, 
serializeMerkleNode } from "./merkle.js"; +import { isCasNodeYaml } from "./nodes.js"; import type { CasStore } from "./types.js"; /** Raw strings become content merkle YAML; already-valid merkle documents pass through. */ function normalizeCasPutContent(content: string): string { + if (isCasNodeYaml(content)) { + return content; + } try { parseMerkleNode(content); return content; diff --git a/packages/workflow-cas/src/index.ts b/packages/workflow-cas/src/index.ts index b993d45..bbea3cf 100644 --- a/packages/workflow-cas/src/index.ts +++ b/packages/workflow-cas/src/index.ts @@ -10,6 +10,13 @@ export { putThreadMerkleNode, serializeMerkleNode, } from "./merkle.js"; +export { + isCasNodeYaml, + putContentNodeWithRefs, + putStartNode, + putStateNode, + serializeCasNode, +} from "./nodes.js"; export { findReachableHashes } from "./reachable.js"; export type { CasStore, diff --git a/packages/workflow-cas/src/merkle.ts b/packages/workflow-cas/src/merkle.ts index 178f51e..6dba6fd 100644 --- a/packages/workflow-cas/src/merkle.ts +++ b/packages/workflow-cas/src/merkle.ts @@ -82,7 +82,12 @@ export async function putContentMerkleNode(store: CasStore, content: string): Pr return store.put(content); } -/** Loads a CAS blob and returns the payload string for a `content` Merkle node. */ +/** + * Loads a CAS blob and returns the payload string for a `content` node. + * + * Accepts both the legacy `{type:content, payload, children}` Merkle layout + * and the RFC v3 `{type:content, payload, refs}` content node layout. 
+ */ export async function getContentMerklePayload( store: CasStore, hash: string, @@ -91,9 +96,13 @@ export async function getContentMerklePayload( if (yamlText === null) { return null; } - const node = parseMerkleNode(yamlText); - if (node.type !== "content" || typeof node.payload !== "string") { + const raw = parse(yamlText) as unknown; + if (raw === null || typeof raw !== "object") { return null; } - return node.payload; + const rec = raw as Record; + if (rec.type !== "content" || typeof rec.payload !== "string") { + return null; + } + return rec.payload; } diff --git a/packages/workflow-cas/src/nodes.ts b/packages/workflow-cas/src/nodes.ts new file mode 100644 index 0000000..f821a26 --- /dev/null +++ b/packages/workflow-cas/src/nodes.ts @@ -0,0 +1,79 @@ +import type { ContentMerkleNode, StartNode, StateNode } from "@uncaged/workflow-protocol"; +import { parse, stringify } from "yaml"; + +import { collectRefs } from "./collect-refs.js"; +import type { CasStore } from "./types.js"; + +/** YAML-serialize a CAS node carrying `{type, payload, refs}` (RFC v3 thread storage format). */ +export function serializeCasNode(node: StartNode | StateNode | ContentMerkleNode): string { + return stringify({ type: node.type, payload: node.payload, refs: node.refs }, { indent: 2 }); +} + +/** + * Recognizes a YAML CAS blob with the `{type, payload, refs[]}` shape used by + * `start` / `state` / `content` thread nodes. Used by {@link createCasStore} + * to skip the legacy auto-wrap step when the caller already supplied a + * pre-serialized RFC v3 node. 
+ */ +export function isCasNodeYaml(content: string): boolean { + let raw: unknown; + try { + raw = parse(content) as unknown; + } catch { + return false; + } + if (raw === null || typeof raw !== "object" || Array.isArray(raw)) { + return false; + } + const rec = raw as Record; + if (typeof rec.type !== "string") { + return false; + } + if (!Array.isArray(rec.refs)) { + return false; + } + for (const r of rec.refs) { + if (typeof r !== "string") { + return false; + } + } + return true; +} + +export async function putStartNode( + store: CasStore, + payload: StartNode["payload"], + promptHash: string, +): Promise { + const node: StartNode = { + type: "start", + payload, + refs: [promptHash], + }; + return store.put(serializeCasNode(node)); +} + +export async function putStateNode( + store: CasStore, + payload: StateNode["payload"], +): Promise { + const node: StateNode = { + type: "state", + payload, + refs: collectRefs(payload), + }; + return store.put(serializeCasNode(node)); +} + +export async function putContentNodeWithRefs( + store: CasStore, + payload: string, + refs: readonly string[], +): Promise { + const node: ContentMerkleNode = { + type: "content", + payload, + refs: [...refs], + }; + return store.put(serializeCasNode(node)); +} diff --git a/packages/workflow-execute/__tests__/engine.test.ts b/packages/workflow-execute/__tests__/engine.test.ts new file mode 100644 index 0000000..220dec1 --- /dev/null +++ b/packages/workflow-execute/__tests__/engine.test.ts @@ -0,0 +1,317 @@ +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { createCasStore } from "@uncaged/workflow-cas"; + +import type { + RoleOutput, + ThreadContext, + WorkflowCompletion, + WorkflowFn, + WorkflowRuntime, +} from "@uncaged/workflow-runtime"; +import { parse as parseYaml } from "yaml"; + +import { executeThread } 
from "../src/engine/engine.js"; +import type { ExecuteThreadIo, ExecuteThreadOptions } from "../src/engine/types.js"; + +const TEST_REGISTRY_YAML = `config: + maxDepth: 3 + supervisorInterval: 0 + providers: + stub: + baseUrl: http://127.0.0.1:9 + apiKey: test + models: + default: stub/m +workflows: {} +`; + +function noLogger(): (tag: string, content: string) => void { + return () => {}; +} + +function makeOptions(overrides: Partial): ExecuteThreadOptions { + return { + maxRounds: 5, + depth: 0, + signal: new AbortController().signal, + awaitAfterEachYield: async () => {}, + forkSourceThreadId: null, + prefilledDiskSteps: null, + storageRoot: "/tmp/never", + ...overrides, + }; +} + +async function setupStorage(): Promise<{ + storageRoot: string; + casDir: string; + bundleHash: string; + bundleDir: string; +}> { + const storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-engine-")); + await writeFile(join(storageRoot, "workflow.yaml"), TEST_REGISTRY_YAML, "utf8"); + const casDir = join(storageRoot, "cas"); + await mkdir(casDir, { recursive: true }); + const bundleHash = "TESTHASH00001"; + const bundleDir = join(storageRoot, "bundles", bundleHash); + return { storageRoot, casDir, bundleHash, bundleDir }; +} + +function readCasNode(casDir: string, hash: string): Record { + const text = require("node:fs").readFileSync(join(casDir, `${hash}.txt`), "utf8") as string; + return parseYaml(text) as Record; +} + +describe("executeThread (Phase 2 — CAS thread storage)", () => { + let storageRoot: string; + let casDir: string; + let bundleHash: string; + let bundleDir: string; + + beforeEach(async () => { + const setup = await setupStorage(); + storageRoot = setup.storageRoot; + casDir = setup.casDir; + bundleHash = setup.bundleHash; + bundleDir = setup.bundleDir; + }); + + afterEach(async () => { + await rm(storageRoot, { recursive: true, force: true }); + }); + + test("writes a StartNode whose refs[0] is the prompt CAS hash", async () => { + const cas = 
createCasStore(casDir); + + // biome-ignore lint/correctness/useYield: deliberately empty generator — exercises the start/end path with no role steps + const wf: WorkflowFn = async function* ( + _thread: ThreadContext, + _runtime: WorkflowRuntime, + ): AsyncGenerator { + return { returnCode: 0, summary: "no-op" }; + }; + + const io: ExecuteThreadIo = { + threadId: "T01", + hash: bundleHash, + infoJsonlPath: join(storageRoot, "logs", bundleHash, "T01.info.jsonl"), + cas, + }; + + const result = await executeThread( + wf, + "demo", + { prompt: "hello", steps: [] }, + makeOptions({ storageRoot, maxRounds: 5 }), + io, + noLogger(), + ); + + expect(result.returnCode).toBe(0); + + const historyText = await readFile( + (await import("node:fs/promises")).readdir ? await firstHistoryFile(bundleDir) : "", + "utf8", + ); + const histLine = historyText.trim().split("\n")[0] ?? ""; + const histEntry = JSON.parse(histLine) as Record; + expect(histEntry.threadId).toBe("T01"); + + const startHash = histEntry.start as string; + const startNode = readCasNode(casDir, startHash); + expect(startNode.type).toBe("start"); + expect((startNode.payload as Record).name).toBe("demo"); + expect((startNode.payload as Record).hash).toBe(bundleHash); + expect((startNode.payload as Record).maxRounds).toBe(5); + + const refs = startNode.refs as string[]; + expect(refs.length).toBe(1); + + const promptBlob = await cas.get(refs[0] ?? ""); + expect(promptBlob).not.toBeNull(); + const promptParsed = parseYaml(promptBlob ?? 
"") as Record; + expect(promptParsed.payload).toBe("hello"); + }); + + test("each role yield produces a chained StateNode and updates threads.json head", async () => { + const cas = createCasStore(casDir); + + const wf: WorkflowFn = async function* ( + _thread: ThreadContext, + runtime: WorkflowRuntime, + ): AsyncGenerator { + const h1 = await runtime.cas.put("plan-text"); + yield { role: "planner", contentHash: h1, meta: { plan: 1 }, refs: [h1] }; + const h2 = await runtime.cas.put("code-text"); + yield { role: "coder", contentHash: h2, meta: { diff: "y" }, refs: [h2] }; + return { returnCode: 0, summary: "done" }; + }; + + const io: ExecuteThreadIo = { + threadId: "T02", + hash: bundleHash, + infoJsonlPath: join(storageRoot, "logs", bundleHash, "T02.info.jsonl"), + cas, + }; + + let observedHead: string | null = null; + let observedHeadAtSecondYield: string | null = null; + + const opts = makeOptions({ + storageRoot, + maxRounds: 5, + awaitAfterEachYield: async () => { + const text = await readFile(join(bundleDir, "threads.json"), "utf8"); + const parsed = JSON.parse(text) as Record; + const head = parsed.T02?.head ?? null; + if (observedHead === null) { + observedHead = head; + } else if (observedHeadAtSecondYield === null) { + observedHeadAtSecondYield = head; + } + }, + }); + + const result = await executeThread( + wf, + "demo", + { prompt: "p", steps: [] }, + opts, + io, + noLogger(), + ); + expect(result.returnCode).toBe(0); + + expect(observedHead).not.toBeNull(); + expect(observedHeadAtSecondYield).not.toBeNull(); + expect(observedHead).not.toBe(observedHeadAtSecondYield); + + const firstState = readCasNode(casDir, observedHead ?? ""); + expect(firstState.type).toBe("state"); + expect((firstState.payload as Record).role).toBe("planner"); + expect((firstState.payload as Record).ancestors).toEqual([]); + + const secondState = readCasNode(casDir, observedHeadAtSecondYield ?? 
""); + expect(secondState.type).toBe("state"); + expect((secondState.payload as Record).role).toBe("coder"); + expect((secondState.payload as Record).ancestors).toEqual([observedHead]); + expect((secondState.payload as Record).start).toBe( + (firstState.payload as Record).start, + ); + }); + + test("on completion: removes threads.json entry, appends history with __end__ head", async () => { + const cas = createCasStore(casDir); + + const wf: WorkflowFn = async function* ( + _thread: ThreadContext, + runtime: WorkflowRuntime, + ): AsyncGenerator { + const h = await runtime.cas.put("only-step"); + yield { role: "only", contentHash: h, meta: {}, refs: [h] }; + return { returnCode: 0, summary: "completed" }; + }; + + const io: ExecuteThreadIo = { + threadId: "T03", + hash: bundleHash, + infoJsonlPath: join(storageRoot, "logs", bundleHash, "T03.info.jsonl"), + cas, + }; + + const result = await executeThread( + wf, + "demo", + { prompt: "p", steps: [] }, + makeOptions({ storageRoot, maxRounds: 5 }), + io, + noLogger(), + ); + + expect(result.returnCode).toBe(0); + + const indexText = await readFile(join(bundleDir, "threads.json"), "utf8"); + const indexParsed = JSON.parse(indexText) as Record; + expect(indexParsed).toEqual({}); + + const historyPath = await firstHistoryFile(bundleDir); + const historyText = await readFile(historyPath, "utf8"); + const lines = historyText.trim().split("\n"); + expect(lines.length).toBe(1); + const entry = JSON.parse(lines[0] ?? 
"") as Record; + expect(entry.threadId).toBe("T03"); + expect(entry.head).toBe(result.rootHash); + + const endNode = readCasNode(casDir, String(entry.head)); + expect(endNode.type).toBe("state"); + expect((endNode.payload as Record).role).toBe("__end__"); + expect((endNode.payload as Record).meta).toEqual({ + returnCode: 0, + summary: "completed", + }); + }); + + test("does not write any .data.jsonl file under storageRoot", async () => { + const cas = createCasStore(casDir); + + const wf: WorkflowFn = async function* ( + _thread: ThreadContext, + runtime: WorkflowRuntime, + ): AsyncGenerator { + const h = await runtime.cas.put("step"); + yield { role: "only", contentHash: h, meta: {}, refs: [h] }; + return { returnCode: 0, summary: "done" }; + }; + + const io: ExecuteThreadIo = { + threadId: "T04", + hash: bundleHash, + infoJsonlPath: join(storageRoot, "logs", bundleHash, "T04.info.jsonl"), + cas, + }; + + await executeThread( + wf, + "demo", + { prompt: "p", steps: [] }, + makeOptions({ storageRoot, maxRounds: 5 }), + io, + noLogger(), + ); + + const fsp = await import("node:fs/promises"); + const found: string[] = []; + async function walk(dir: string): Promise { + let entries: { name: string; isDirectory: () => boolean; isFile: () => boolean }[]; + try { + entries = await fsp.readdir(dir, { withFileTypes: true }); + } catch { + return; + } + for (const ent of entries) { + const p = join(dir, ent.name); + if (ent.isDirectory()) { + await walk(p); + } else if (ent.isFile() && ent.name.endsWith(".data.jsonl")) { + found.push(p); + } + } + } + await walk(storageRoot); + expect(found).toEqual([]); + }); +}); + +async function firstHistoryFile(bundleDir: string): Promise { + const fsp = await import("node:fs/promises"); + const dir = join(bundleDir, "history"); + const entries = await fsp.readdir(dir); + const file = entries.find((n) => n.endsWith(".jsonl")); + if (file === undefined) { + throw new Error(`no history file under ${dir}`); + } + return join(dir, file); 
+} diff --git a/packages/workflow-execute/__tests__/threads-index.test.ts b/packages/workflow-execute/__tests__/threads-index.test.ts new file mode 100644 index 0000000..5c8ac1d --- /dev/null +++ b/packages/workflow-execute/__tests__/threads-index.test.ts @@ -0,0 +1,91 @@ +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import { mkdtemp, readdir, readFile, rm } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; + +import { + appendThreadHistoryEntry, + removeThreadEntry, + upsertThreadEntry, +} from "../src/engine/threads-index.js"; + +describe("threads-index", () => { + let bundleDir: string; + + beforeEach(async () => { + bundleDir = await mkdtemp(join(tmpdir(), "uncaged-wf-threads-")); + }); + + afterEach(async () => { + await rm(bundleDir, { recursive: true, force: true }); + }); + + test("upsertThreadEntry creates threads.json and persists entries", async () => { + await upsertThreadEntry(bundleDir, "T1", { head: "H1", start: "S1", updatedAt: 100 }); + const text = await readFile(join(bundleDir, "threads.json"), "utf8"); + const parsed = JSON.parse(text) as Record; + expect(parsed).toEqual({ + T1: { head: "H1", start: "S1", updatedAt: 100 }, + }); + }); + + test("upsertThreadEntry overwrites the head while preserving siblings", async () => { + await upsertThreadEntry(bundleDir, "T1", { head: "H1", start: "S1", updatedAt: 100 }); + await upsertThreadEntry(bundleDir, "T2", { head: "H2", start: "S2", updatedAt: 200 }); + await upsertThreadEntry(bundleDir, "T1", { head: "H1B", start: "S1", updatedAt: 300 }); + const text = await readFile(join(bundleDir, "threads.json"), "utf8"); + const parsed = JSON.parse(text) as Record; + expect(parsed).toEqual({ + T1: { head: "H1B", start: "S1", updatedAt: 300 }, + T2: { head: "H2", start: "S2", updatedAt: 200 }, + }); + }); + + test("removeThreadEntry deletes the entry but keeps the file", async () => { + await upsertThreadEntry(bundleDir, "T1", { head: 
"H1", start: "S1", updatedAt: 100 }); + await upsertThreadEntry(bundleDir, "T2", { head: "H2", start: "S2", updatedAt: 200 }); + await removeThreadEntry(bundleDir, "T1"); + const text = await readFile(join(bundleDir, "threads.json"), "utf8"); + const parsed = JSON.parse(text) as Record; + expect(parsed).toEqual({ + T2: { head: "H2", start: "S2", updatedAt: 200 }, + }); + }); + + test("removeThreadEntry on a missing thread is a no-op", async () => { + await removeThreadEntry(bundleDir, "MISSING"); + const dirEntries = await readdir(bundleDir); + expect(dirEntries.includes("threads.json")).toBe(false); + }); + + test("appendThreadHistoryEntry writes one JSONL line per call into a date-keyed file", async () => { + const ts = Date.UTC(2026, 4, 9, 12, 0, 0); + await appendThreadHistoryEntry(bundleDir, { + threadId: "T1", + head: "H1", + start: "S1", + completedAt: ts, + }); + await appendThreadHistoryEntry(bundleDir, { + threadId: "T2", + head: "H2", + start: "S2", + completedAt: ts, + }); + const text = await readFile(join(bundleDir, "history", "2026-05-09.jsonl"), "utf8"); + const lines = text.trim().split("\n"); + expect(lines.length).toBe(2); + expect(JSON.parse(lines[0] ?? "{}")).toEqual({ + threadId: "T1", + head: "H1", + start: "S1", + completedAt: ts, + }); + expect(JSON.parse(lines[1] ?? 
"{}")).toEqual({ + threadId: "T2", + head: "H2", + start: "S2", + completedAt: ts, + }); + }); +}); diff --git a/packages/workflow-execute/src/engine/engine.ts b/packages/workflow-execute/src/engine/engine.ts index 16eafc4..b713314 100644 --- a/packages/workflow-execute/src/engine/engine.ts +++ b/packages/workflow-execute/src/engine/engine.ts @@ -1,5 +1,18 @@ -import { appendFile, mkdir } from "node:fs/promises"; +import { mkdir } from "node:fs/promises"; import { dirname } from "node:path"; +import { + type CasStore, + getContentMerklePayload, + putContentNodeWithRefs, + putStartNode, + putStateNode, +} from "@uncaged/workflow-cas"; +import type { StateNode } from "@uncaged/workflow-protocol"; +import { + readWorkflowRegistry, + resolveModel, + type WorkflowConfig, +} from "@uncaged/workflow-register"; import type { LlmProvider, RoleOutput, @@ -9,21 +22,38 @@ import type { WorkflowResult, WorkflowRuntime, } from "@uncaged/workflow-runtime"; -import { START } from "@uncaged/workflow-runtime"; -import { - type CasStore, - getContentMerklePayload, - putStepMerkleNode, - putThreadMerkleNode, -} from "@uncaged/workflow-cas"; -import { resolveModel } from "@uncaged/workflow-register"; -import { createExtract } from "../extract/index.js"; -import { readWorkflowRegistry, type WorkflowConfig } from "@uncaged/workflow-register"; -import { err, type LogFn, normalizeRefsField, ok, type Result } from "@uncaged/workflow-util"; +import { END, START } from "@uncaged/workflow-runtime"; +import { err, type LogFn, ok, type Result } from "@uncaged/workflow-util"; +import { createExtract } from "../extract/index.js"; import { runSupervisor } from "./supervisor.js"; +import { + appendThreadHistoryEntry, + getBundleDir, + removeThreadEntry, + upsertThreadEntry, +} from "./threads-index.js"; import type { ExecuteThreadIo, ExecuteThreadOptions } from "./types.js"; +/** Cap for {@link StateNode}.payload.ancestors: 1 parent + 10 skip-list. 
*/ +const ANCESTORS_CAP = 11; + +type ChainState = { + /** State hash of the most recently written {@link StateNode}, or `null` before the first step. */ + parentStateHash: string | null; + /** Ancestors recorded on the most recently written {@link StateNode}. */ + parentAncestors: readonly string[]; +}; + +const EMPTY_CHAIN: ChainState = { parentStateHash: null, parentAncestors: [] }; + +function computeAncestors(chain: ChainState): string[] { + if (chain.parentStateHash === null) { + return []; + } + return [chain.parentStateHash, ...chain.parentAncestors].slice(0, ANCESTORS_CAP); +} + async function resolveEngineRegistryRuntime( storageRoot: string, cas: CasStore, @@ -57,51 +87,108 @@ async function resolveEngineRegistryRuntime( return ok({ extract: createExtract(llmProvider, { cas }), workflowConfig: cfg }); } -async function appendDataLine(path: string, record: unknown): Promise { - const line = `${JSON.stringify(record)}\n`; - await appendFile(path, line, "utf8"); +async function appendStateForStep(params: { + cas: CasStore; + startHash: string; + chain: ChainState; + role: string; + contentHash: string; + meta: Record; + refs: readonly string[]; + timestamp: number; +}): Promise<{ stateHash: string; chain: ChainState }> { + const text = await getContentMerklePayload(params.cas, params.contentHash); + if (text === null) { + throw new Error( + `role step ${params.role}: CAS blob missing for contentHash ${params.contentHash}`, + ); + } + const artifactRefs = params.refs.filter((r) => r !== params.contentHash); + const contentHash = await putContentNodeWithRefs(params.cas, text, artifactRefs); + const ancestors = computeAncestors(params.chain); + const payload: StateNode["payload"] = { + role: params.role, + meta: params.meta, + start: params.startHash, + content: contentHash, + ancestors, + compact: null, + timestamp: params.timestamp, + }; + const stateHash = await putStateNode(params.cas, payload); + return { + stateHash, + chain: { parentStateHash: 
stateHash, parentAncestors: ancestors }, + }; } -async function finalizeThreadResult(params: { +async function appendEndState(params: { cas: CasStore; - workflowName: string; + startHash: string; + chain: ChainState; + completion: WorkflowCompletion; + timestamp: number; +}): Promise { + const contentHash = await putContentNodeWithRefs(params.cas, params.completion.summary, []); + const ancestors = computeAncestors(params.chain); + const payload: StateNode["payload"] = { + role: END, + meta: { returnCode: params.completion.returnCode, summary: params.completion.summary }, + start: params.startHash, + content: contentHash, + ancestors, + compact: null, + timestamp: params.timestamp, + }; + return putStateNode(params.cas, payload); +} + +async function finalizeThread(params: { + cas: CasStore; + bundleDir: string; threadId: string; - stepMerkleHashes: readonly string[]; + startHash: string; + chain: ChainState; completion: WorkflowCompletion; }): Promise { - const rootHash = await putThreadMerkleNode( - params.cas, - { - workflow: params.workflowName, - threadId: params.threadId, - result: { - returnCode: params.completion.returnCode, - summary: params.completion.summary, - }, - }, - params.stepMerkleHashes, - ); + const ts = Date.now(); + const endHash = await appendEndState({ + cas: params.cas, + startHash: params.startHash, + chain: params.chain, + completion: params.completion, + timestamp: ts, + }); + await removeThreadEntry(params.bundleDir, params.threadId); + await appendThreadHistoryEntry(params.bundleDir, { + threadId: params.threadId, + head: endHash, + start: params.startHash, + completedAt: ts, + }); return { returnCode: params.completion.returnCode, summary: params.completion.summary, - rootHash, + rootHash: endHash, }; } async function finalizeAbortedThread(params: { cas: CasStore; - workflowName: string; + bundleDir: string; threadId: string; - stepMerkleHashes: string[]; + startHash: string; + chain: ChainState; logger: LogFn; abortLogTag: string; 
}): Promise { params.logger(params.abortLogTag, `thread ${params.threadId} aborted`); - return finalizeThreadResult({ + return finalizeThread({ cas: params.cas, - workflowName: params.workflowName, + bundleDir: params.bundleDir, threadId: params.threadId, - stepMerkleHashes: params.stepMerkleHashes, + startHash: params.startHash, + chain: params.chain, completion: { returnCode: 130, summary: "thread aborted" }, }); } @@ -114,8 +201,9 @@ async function maybeSupervisorHaltsThread(params: { logger: LogFn; threadId: string; cas: CasStore; - workflowName: string; - stepMerkleHashes: string[]; + bundleDir: string; + startHash: string; + chain: ChainState; }): Promise { const interval = params.workflowConfig.supervisorInterval; if (interval <= 0 || params.written % interval !== 0) { @@ -135,41 +223,55 @@ async function maybeSupervisorHaltsThread(params: { return null; } params.logger("M4QX8VHN", `thread ${params.threadId} stopped by supervisor`); - return finalizeThreadResult({ + return finalizeThread({ cas: params.cas, - workflowName: params.workflowName, + bundleDir: params.bundleDir, threadId: params.threadId, - stepMerkleHashes: params.stepMerkleHashes, + startHash: params.startHash, + chain: params.chain, completion: { returnCode: 0, summary: "completed: supervisor stopped thread" }, }); } +async function publishHead(params: { + bundleDir: string; + threadId: string; + startHash: string; + headHash: string; +}): Promise { + await upsertThreadEntry(params.bundleDir, params.threadId, { + head: params.headHash, + start: params.startHash, + updatedAt: Date.now(), + }); +} + async function driveWorkflowGenerator(params: { fn: WorkflowFn; - workflowName: string; workflowConfig: WorkflowConfig; thread: ThreadContext; runtime: WorkflowRuntime; executeOptions: ExecuteThreadOptions; - dataJsonlPath: string; threadId: string; logger: LogFn; cas: CasStore; - stepMerkleHashes: string[]; + bundleDir: string; + startHash: string; + chain: ChainState; }): Promise { const { fn, - 
workflowName, workflowConfig, thread, runtime, executeOptions, - dataJsonlPath, threadId, logger, cas, - stepMerkleHashes, + bundleDir, + startHash, } = params; + let chain: ChainState = params.chain; const gen = fn(thread, runtime); let written = 0; const recentSupervisorSteps: { role: string; summary: string }[] = thread.steps.map((s) => ({ @@ -181,9 +283,10 @@ async function driveWorkflowGenerator(params: { if (executeOptions.signal.aborted) { return await finalizeAbortedThread({ cas, - workflowName, + bundleDir, threadId, - stepMerkleHashes, + startHash, + chain, logger, abortLogTag: "V8JX4NP2", }); @@ -191,11 +294,12 @@ async function driveWorkflowGenerator(params: { if (written >= executeOptions.maxRounds) { logger("R3CW7YBQ", `thread ${threadId} stopped at maxRounds=${executeOptions.maxRounds}`); - return await finalizeThreadResult({ + return await finalizeThread({ cas, - workflowName, + bundleDir, threadId, - stepMerkleHashes, + startHash, + chain, completion: { returnCode: 0, summary: `completed: reached maxRounds (${executeOptions.maxRounds})`, @@ -207,39 +311,31 @@ async function driveWorkflowGenerator(params: { if (iterResult.done) { logger("F3HN8QKP", `thread ${threadId} generator finished`); - const completion = iterResult.value; - return await finalizeThreadResult({ + return await finalizeThread({ cas, - workflowName, + bundleDir, threadId, - stepMerkleHashes, - completion, + startHash, + chain, + completion: iterResult.value, }); } written++; const step = iterResult.value; - const resolved = await getContentMerklePayload(cas, step.contentHash); - if (resolved === null) { - throw new Error( - `role step ${step.role}: CAS blob missing for contentHash ${step.contentHash}`, - ); - } const ts = Date.now(); - await appendDataLine(dataJsonlPath, { + const written_ = await appendStateForStep({ + cas, + startHash, + chain, role: step.role, contentHash: step.contentHash, meta: step.meta, - refs: normalizeRefsField(step.refs), + refs: step.refs, timestamp: ts, 
}); - - const stepNodeHash = await putStepMerkleNode( - cas, - { role: step.role, meta: step.meta }, - step.contentHash, - ); - stepMerkleHashes.push(stepNodeHash); + chain = written_.chain; + await publishHead({ bundleDir, threadId, startHash, headHash: written_.stateHash }); logger("N7BW4YHQ", `thread ${threadId} wrote role ${step.role}`); @@ -262,9 +358,10 @@ async function driveWorkflowGenerator(params: { if (executeOptions.signal.aborted) { return await finalizeAbortedThread({ cas, - workflowName, + bundleDir, threadId, - stepMerkleHashes, + startHash, + chain, logger, abortLogTag: "V8JX4NP4", }); @@ -278,8 +375,9 @@ async function driveWorkflowGenerator(params: { logger, threadId, cas, - workflowName, - stepMerkleHashes, + bundleDir, + startHash, + chain, }); if (supervised !== null) { return supervised; @@ -288,8 +386,16 @@ async function driveWorkflowGenerator(params: { } /** - * Execute a workflow thread: drive the bundle's AsyncGenerator, RFC-001 `.data.jsonl` records, - * debug lines via `logger` to `.info.jsonl`. + * Execute a workflow thread by driving the bundle's `AsyncGenerator`. + * + * Persistence layout (RFC v3 — CAS-based thread storage): + * - Thread chain is written as immutable CAS blobs: a single {@link StartNode} + * plus one {@link StateNode} per role step (including a final `__end__` + * state on completion / abort / `maxRounds`). + * - The active thread head is published in `/threads.json`; on + * completion it is removed and a record is appended to + * `/history/{YYYY-MM-DD}.jsonl`. + * - Debug logging continues to flow through `logger` to `.info.jsonl`. 
*/ export async function executeThread( fn: WorkflowFn, @@ -299,7 +405,6 @@ export async function executeThread( io: ExecuteThreadIo, logger: LogFn, ): Promise { - await mkdir(dirname(io.dataJsonlPath), { recursive: true }); await mkdir(dirname(io.infoJsonlPath), { recursive: true }); const prefilled = options.prefilledDiskSteps; @@ -309,61 +414,63 @@ export async function executeThread( ); } - const nowMs = Date.now(); - const startRecord: Record = { - name: workflowName, - hash: io.hash, - threadId: io.threadId, - parameters: { - prompt: input.prompt, - options: { - maxRounds: options.maxRounds, - depth: options.depth, - }, - }, - timestamp: nowMs, - }; - if (options.forkSourceThreadId !== null) { - startRecord.forkFrom = { threadId: options.forkSourceThreadId }; - } + const bundleDir = getBundleDir(options.storageRoot, io.hash); - await appendDataLine(io.dataJsonlPath, startRecord); + const promptHash = await io.cas.put(input.prompt); + const startHash = await putStartNode( + io.cas, + { + name: workflowName, + hash: io.hash, + maxRounds: options.maxRounds, + depth: options.depth, + }, + promptHash, + ); + + await publishHead({ + bundleDir, + threadId: io.threadId, + startHash, + headHash: startHash, + }); logger("T9HQ2KHM", `thread ${io.threadId} started for workflow ${workflowName}`); - const stepMerkleHashes: string[] = []; + let chain: ChainState = EMPTY_CHAIN; if (prefilled !== null) { for (const row of prefilled) { - const prefilledPayload = await getContentMerklePayload(io.cas, row.contentHash); - if (prefilledPayload === null) { - throw new Error( - `prefilled step ${row.role}: CAS blob missing for contentHash ${row.contentHash}`, - ); - } - await appendDataLine(io.dataJsonlPath, { + const written = await appendStateForStep({ + cas: io.cas, + startHash, + chain, role: row.role, contentHash: row.contentHash, meta: row.meta, - refs: normalizeRefsField(row.refs), + refs: row.refs, timestamp: row.timestamp, }); - const stepNodeHash = await putStepMerkleNode( 
- io.cas, - { role: row.role, meta: row.meta }, - row.contentHash, - ); - stepMerkleHashes.push(stepNodeHash); + chain = written.chain; + await publishHead({ + bundleDir, + threadId: io.threadId, + startHash, + headHash: written.stateHash, + }); } } + const nowMs = Date.now(); + if (options.maxRounds <= 0) { logger("R3CW7YBQ", `thread ${io.threadId} stopped at maxRounds=${options.maxRounds}`); - return await finalizeThreadResult({ + return await finalizeThread({ cas: io.cas, - workflowName, + bundleDir, threadId: io.threadId, - stepMerkleHashes, + startHash, + chain, completion: { returnCode: 0, summary: `completed: reached maxRounds (${options.maxRounds})`, @@ -401,15 +508,15 @@ export async function executeThread( return await driveWorkflowGenerator({ fn, - workflowName, workflowConfig: registryRuntime.value.workflowConfig, thread, runtime, executeOptions: options, - dataJsonlPath: io.dataJsonlPath, threadId: io.threadId, logger, cas: io.cas, - stepMerkleHashes, + bundleDir, + startHash, + chain, }); } diff --git a/packages/workflow-execute/src/engine/index.ts b/packages/workflow-execute/src/engine/index.ts index f7b6d74..85bcaa4 100644 --- a/packages/workflow-execute/src/engine/index.ts +++ b/packages/workflow-execute/src/engine/index.ts @@ -9,6 +9,13 @@ export { } from "./fork-thread.js"; export { garbageCollectCas } from "./gc.js"; export { createThreadPauseGate } from "./thread-pause-gate.js"; +export type { ThreadHistoryEntry, ThreadIndex, ThreadIndexEntry } from "./threads-index.js"; +export { + appendThreadHistoryEntry, + getBundleDir, + removeThreadEntry, + upsertThreadEntry, +} from "./threads-index.js"; export type { ExecuteThreadIo, ExecuteThreadOptions, diff --git a/packages/workflow-execute/src/engine/supervisor.ts b/packages/workflow-execute/src/engine/supervisor.ts index b5debe0..88c0940 100644 --- a/packages/workflow-execute/src/engine/supervisor.ts +++ b/packages/workflow-execute/src/engine/supervisor.ts @@ -75,7 +75,7 @@ export async function 
runSupervisor( }); if (!result.ok) { - args.logger("R9CW4PLM", `supervisor failed: ${result.error}`); + args.logger("R9CW4PHM", `supervisor failed: ${result.error}`); return err(`supervisor: ${result.error}`); }
diff --git a/packages/workflow-execute/src/engine/threads-index.ts b/packages/workflow-execute/src/engine/threads-index.ts
new file mode 100644
index 0000000..89d698a
--- /dev/null
+++ b/packages/workflow-execute/src/engine/threads-index.ts
@@ -0,0 +1,172 @@
+import { appendFile, mkdir, readFile, rename, writeFile } from "node:fs/promises";
+import { dirname, join } from "node:path";
+
+/**
+ * Active-thread index entry stored in `<bundleDir>/threads.json`.
+ *
+ * Once the thread reaches `__end__`, the entry is removed from `threads.json`
+ * and a corresponding line is appended to `history/{YYYY-MM-DD}.jsonl`.
+ */
+export type ThreadIndexEntry = {
+  head: string;
+  start: string;
+  updatedAt: number;
+};
+
+/** One record of `history/{YYYY-MM-DD}.jsonl`, as written by `appendThreadHistoryEntry`. */
+export type ThreadHistoryEntry = {
+  threadId: string;
+  head: string;
+  start: string;
+  completedAt: number;
+};
+
+/** In-memory form of `threads.json`: thread id -> index entry. */
+export type ThreadIndex = Record<string, ThreadIndexEntry>;
+
+/** Returns `<storageRoot>/bundles/<bundleHash>`. */
+export function getBundleDir(storageRoot: string, bundleHash: string): string {
+  return join(storageRoot, "bundles", bundleHash);
+}
+
+/** Returns `<bundleDir>/threads.json`. */
+function threadsJsonPath(bundleDir: string): string {
+  return join(bundleDir, "threads.json");
+}
+
+/** Narrows `v` to a non-null, non-array object. */
+function isPlainObject(v: unknown): v is Record<string, unknown> {
+  return v !== null && typeof v === "object" && !Array.isArray(v);
+}
+
+/**
+ * Validates one raw `threads.json` value. Returns `null` unless `head` and
+ * `start` are strings and `updatedAt` is a number; extra fields are dropped.
+ */
+function parseThreadIndexEntry(raw: unknown): ThreadIndexEntry | null {
+  if (!isPlainObject(raw)) {
+    return null;
+  }
+  const head = raw.head;
+  const start = raw.start;
+  const updatedAt = raw.updatedAt;
+  if (typeof head !== "string" || typeof start !== "string" || typeof updatedAt !== "number") {
+    return null;
+  }
+  return { head, start, updatedAt };
+}
+
+/**
+ * Parses the text of `threads.json`. Empty text, invalid JSON, and non-object
+ * roots all yield `{}`; entries that fail validation are skipped.
+ *
+ * NOTE(review): because corruption reads as "no threads", the next
+ * `upsertThreadEntry` rewrites the file with only its own entry. Confirm
+ * that discarding the other entries is intended.
+ */
+function parseThreadIndex(text: string): ThreadIndex {
+  const trimmed = text.trim();
+  if (trimmed === "") {
+    return {};
+  }
+  let raw: unknown;
+  try {
+    raw = JSON.parse(trimmed) as unknown;
+  } catch {
+    return {};
+  }
+  if (!isPlainObject(raw)) {
+    return {};
+  }
+  const out: ThreadIndex = {};
+  for (const [k, v] of Object.entries(raw)) {
+    const entry = parseThreadIndexEntry(v);
+    if (entry !== null) {
+      out[k] = entry;
+    }
+  }
+  return out;
+}
+
+/**
+ * Reads and parses `threads.json`. A missing file (`ENOENT`) yields `{}`;
+ * any other read error is rethrown.
+ */
+async function readThreadIndex(bundleDir: string): Promise<ThreadIndex> {
+  const path = threadsJsonPath(bundleDir);
+  let text: string;
+  try {
+    text = await readFile(path, "utf8");
+  } catch (e) {
+    const errObj = e as NodeJS.ErrnoException;
+    if (errObj.code === "ENOENT") {
+      return {};
+    }
+    throw e;
+  }
+  return parseThreadIndex(text);
+}
+
+/**
+ * Writes `threads.json` as 2-space-indented JSON via a temp file in the same
+ * directory followed by `rename`, creating the directory first if needed.
+ */
+async function writeThreadIndex(bundleDir: string, index: ThreadIndex): Promise<void> {
+  const path = threadsJsonPath(bundleDir);
+  await mkdir(dirname(path), { recursive: true });
+  const tmp = `${path}.tmp.${process.pid}.${Date.now()}`;
+  const json = `${JSON.stringify(index, null, 2)}\n`;
+  await writeFile(tmp, json, "utf8");
+  await rename(tmp, path);
+}
+
+/**
+ * Insert/update a thread entry in `threads.json`.
+ *
+ * NOTE(review): this is an unguarded read-modify-write; overlapping calls for
+ * the same bundle can presumably lose an update. Verify callers serialize.
+ */
+export async function upsertThreadEntry(
+  bundleDir: string,
+  threadId: string,
+  entry: ThreadIndexEntry,
+): Promise<void> {
+  const index = await readThreadIndex(bundleDir);
+  index[threadId] = entry;
+  await writeThreadIndex(bundleDir, index);
+}
+
+/** Remove a thread entry from `threads.json` (no-op when absent). */
+export async function removeThreadEntry(bundleDir: string, threadId: string): Promise<void> {
+  const index = await readThreadIndex(bundleDir);
+  if (!(threadId in index)) {
+    return;
+  }
+  delete index[threadId];
+  await writeThreadIndex(bundleDir, index);
+}
+
+/** Formats `epochMs` as a UTC `YYYY-MM-DD` string. */
+function dateKey(epochMs: number): string {
+  const d = new Date(epochMs);
+  const y = d.getUTCFullYear().toString().padStart(4, "0");
+  const m = (d.getUTCMonth() + 1).toString().padStart(2, "0");
+  const day = d.getUTCDate().toString().padStart(2, "0");
+  return `${y}-${m}-${day}`;
+}
+
+/**
+ * Append a completion record to `history/{YYYY-MM-DD}.jsonl` keyed off `completedAt`.
+ *
+ * The date in the file name is the UTC date of `completedAt`; the `history`
+ * directory is created if it does not exist.
+ */
+export async function appendThreadHistoryEntry(
+  bundleDir: string,
+  entry: ThreadHistoryEntry,
+): Promise<void> {
+  const path = join(bundleDir, "history", `${dateKey(entry.completedAt)}.jsonl`);
+  await mkdir(dirname(path), { recursive: true });
+  const line = `${JSON.stringify(entry)}\n`;
+  await appendFile(path, line, "utf8");
+}
diff --git a/packages/workflow-execute/src/engine/types.ts b/packages/workflow-execute/src/engine/types.ts index 32f7b54..0bbc863 100644 --- a/packages/workflow-execute/src/engine/types.ts +++ b/packages/workflow-execute/src/engine/types.ts @@ -1,5 +1,5 @@ -import type { RoleOutput } from "@uncaged/workflow-runtime"; import type { CasStore } from "@uncaged/workflow-cas"; +import type { RoleOutput } from "@uncaged/workflow-runtime"; import type { Result } from "@uncaged/workflow-util"; export type SupervisorDecision = "continue" | "stop"; @@ -7,7 +7,6 @@ export type SupervisorDecision = "continue" | "stop"; export type ExecuteThreadIo = { threadId: string; hash: string; - dataJsonlPath: string; infoJsonlPath: string; cas: CasStore; }; diff --git a/packages/workflow-execute/src/engine/worker.ts b/packages/workflow-execute/src/engine/worker.ts index 6c85901..a29826d 100644 --- a/packages/workflow-execute/src/engine/worker.ts +++ b/packages/workflow-execute/src/engine/worker.ts @@ -1,7 +1,7 @@ -import { appendFile, mkdir, unlink, writeFile } from "node:fs/promises"; +import { mkdir, unlink, writeFile } from "node:fs/promises"; import { createServer, type Socket } from "node:net"; import { dirname, join } from "node:path"; -import type { RoleOutput, WorkflowFn, WorkflowResult } from "@uncaged/workflow-runtime"; +import type { RoleOutput, WorkflowFn } from "@uncaged/workflow-runtime"; import { ensureUncagedWorkflowSymlink, importWorkflowBundleModule } from "@uncaged/workflow-register"; import { createCasStore } from "@uncaged/workflow-cas"; import { @@ -364,13 +364,11 @@ async function main(): Promise { const threadId = cmd.threadId; const 
runningPath = join(storageRoot, "logs", hash, `${threadId}.running`); - const dataJsonlPath = join(storageRoot, "logs", hash, `${threadId}.data.jsonl`); const infoJsonlPath = join(storageRoot, "logs", hash, `${threadId}.info.jsonl`); const io: ExecuteThreadIo = { threadId, hash, - dataJsonlPath, infoJsonlPath, cas, }; @@ -387,7 +385,6 @@ async function main(): Promise { try { await mkdir(dirname(runningPath), { recursive: true }); - await mkdir(dirname(dataJsonlPath), { recursive: true }); await writeFile(runningPath, "", "utf8"); const logger = createLogger({ sink: { kind: "file", path: infoJsonlPath } }); @@ -407,7 +404,7 @@ async function main(): Promise { }); } - const runResult = await executeThread( + await executeThread( workflowFn, cmd.workflowName, { prompt: cmd.prompt, steps: cmd.steps }, @@ -422,12 +419,9 @@ async function main(): Promise { io, logger, ); - await appendFile(dataJsonlPath, `${JSON.stringify(runResult)}\n`, "utf8"); } catch (e) { const message = e instanceof Error ? 
e.message : String(e); bootLog("Q3MN8YKW", `thread ${threadId} failed: ${message}`); - const failure: WorkflowResult = { returnCode: 1, summary: message, rootHash: "" }; - await appendFile(dataJsonlPath, `${JSON.stringify(failure)}\n`, "utf8").catch(() => {}); } finally { threads.delete(threadId); await unlink(runningPath).catch(() => {}); diff --git a/packages/workflow-execute/src/workflow-as-agent.ts b/packages/workflow-execute/src/workflow-as-agent.ts index 2813361..e59b3ac 100644 --- a/packages/workflow-execute/src/workflow-as-agent.ts +++ b/packages/workflow-execute/src/workflow-as-agent.ts @@ -74,13 +74,11 @@ export function workflowAsAgent( }; const childThreadId = generateUlid(Date.now()); - const dataJsonlPath = join(storageRoot, "logs", entry.hash, `${childThreadId}.data.jsonl`); const infoJsonlPath = join(storageRoot, "logs", entry.hash, `${childThreadId}.info.jsonl`); const io: ExecuteThreadIo = { threadId: childThreadId, hash: entry.hash, - dataJsonlPath, infoJsonlPath, cas: createCasStore(getGlobalCasDir(storageRoot)), };