From a98431a12a2f9594ceaa3cdc98af445da3f6ff77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Tue, 12 May 2026 01:42:10 +0000 Subject: [PATCH] =?UTF-8?q?feat(#194):=20Phase=201=20=E2=80=94=20parentSta?= =?UTF-8?q?te=20/=20childThread=20in=20CAS=20nodes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Protocol: StartNodePayload.parentState, StateNodePayload.childThread - CAS: putStartNode refs include parentState, collectRefs includes childThread - Parsing: legacy nodes without new fields default to null - Engine + fork: all callers pass parentState: null / childThread: null - Tests: 8 new cases for refs, parsing, collect-refs (+208 lines) Phase 1 of #194 (Merkle Call Stack). Closes #195. 小橘 --- .../cli-workflow/__tests__/gc-cli.test.ts | 6 +- .../__tests__/collect-refs.test.ts | 29 ++++ packages/workflow-cas/__tests__/nodes.test.ts | 161 ++++++++++++++++++ .../__tests__/gc-mark-sweep.test.ts | 4 + .../workflow-execute/src/engine/engine.ts | 3 + .../src/engine/fork-thread.ts | 1 + .../__tests__/build-context.test.ts | 10 +- 7 files changed, 208 insertions(+), 6 deletions(-) create mode 100644 packages/workflow-cas/__tests__/nodes.test.ts diff --git a/packages/cli-workflow/__tests__/gc-cli.test.ts b/packages/cli-workflow/__tests__/gc-cli.test.ts index 4ba455c..6b72502 100644 --- a/packages/cli-workflow/__tests__/gc-cli.test.ts +++ b/packages/cli-workflow/__tests__/gc-cli.test.ts @@ -45,8 +45,8 @@ describe("gc cli and garbageCollectCas", () => { { name: "demo", hash: bundleHash, - depth: 0, + parentState: null, }, promptHash, ); @@ -100,8 +100,8 @@ describe("gc cli and garbageCollectCas", () => { { name: "demo", hash: bundleHash, - depth: 0, + parentState: null, }, promptHash, ); @@ -135,8 +135,8 @@ describe("gc cli and garbageCollectCas", () => { { name: "demo", hash: bundleHash, - depth: 0, + parentState: null, }, promptHash, ); diff --git a/packages/workflow-cas/__tests__/collect-refs.test.ts b/packages/workflow-cas/__tests__/collect-refs.test.ts index 05a259c..cc7812e 100644 --- a/packages/workflow-cas/__tests__/collect-refs.test.ts +++ b/packages/workflow-cas/__tests__/collect-refs.test.ts @@ -14,6 +14,7 @@ function payload( ancestors: partial.ancestors ?? [], compact: partial.compact ?? null, timestamp: partial.timestamp ?? 0, + childThread: partial.childThread ?? null, }; } @@ -62,4 +63,32 @@ describe("collectRefs", () => { ); expect(refs).toEqual(["S2", "C2"]); }); + + test("includes childThread hash when childThread is non-null", () => { + const refs = collectRefs( + payload({ + role: "developer", + start: "S3", + content: "C3", + ancestors: ["A3"], + compact: null, + childThread: "CHILDEND000000000000001", + }), + ); + expect(refs).toEqual(["S3", "C3", "A3", "CHILDEND000000000000001"]); + }); + + test("does not include childThread when childThread is null", () => { + const refs = collectRefs( + payload({ + role: "developer", + start: "S4", + content: "C4", + ancestors: [], + compact: null, + childThread: null, + }), + ); + expect(refs).toEqual(["S4", "C4"]); + }); }); diff --git a/packages/workflow-cas/__tests__/nodes.test.ts b/packages/workflow-cas/__tests__/nodes.test.ts new file mode 100644 index 0000000..f8e4748 --- /dev/null +++ b/packages/workflow-cas/__tests__/nodes.test.ts @@ -0,0 +1,161 @@ +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import { mkdtemp, rm } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { stringify } from "yaml"; + +import { createCasStore } from "../src/cas.js"; +import { parseCasThreadNode, putStartNode, putStateNode } from "../src/nodes.js"; + +describe("putStartNode — parentState in refs", () => { + let dir: string; + + beforeEach(async () => { + dir = await mkdtemp(join(tmpdir(), "wf-cas-nodes-")); + }); + + afterEach(async () => { + await rm(dir, { recursive: true, force: true }); + }); + + test("refs contains only promptHash when parentState is null", async () => { + const cas = createCasStore(join(dir, "cas")); + const promptHash = await cas.put("hello"); + const startHash = await putStartNode( + cas, + { name: "demo", hash: "BUNDLEAAAAAAAAA", depth: 0, parentState: null }, + promptHash, + ); + + const blob = await cas.get(startHash); + expect(blob).not.toBeNull(); + const parsed = parseCasThreadNode(blob ?? ""); + expect(parsed).not.toBeNull(); + expect(parsed?.kind).toBe("start"); + if (parsed?.kind !== "start") return; + + expect(parsed.node.refs).toEqual([promptHash]); + expect(parsed.node.payload.parentState).toBeNull(); + }); + + test("refs contains [promptHash, parentStateHash] when parentState is set", async () => { + const cas = createCasStore(join(dir, "cas")); + const parentStateHash = await cas.put("fake-parent-state"); + const promptHash = await cas.put("child-prompt"); + const startHash = await putStartNode( + cas, + { name: "develop", hash: "BUNDLEBBBBBBBBB", depth: 1, parentState: parentStateHash }, + promptHash, + ); + + const blob = await cas.get(startHash); + expect(blob).not.toBeNull(); + const parsed = parseCasThreadNode(blob ?? ""); + expect(parsed).not.toBeNull(); + expect(parsed?.kind).toBe("start"); + if (parsed?.kind !== "start") return; + + expect(parsed.node.refs).toEqual([promptHash, parentStateHash]); + expect(parsed.node.payload.parentState).toBe(parentStateHash); + }); +}); + +describe("putStateNode — childThread in refs", () => { + let dir: string; + + beforeEach(async () => { + dir = await mkdtemp(join(tmpdir(), "wf-cas-nodes-state-")); + }); + + afterEach(async () => { + await rm(dir, { recursive: true, force: true }); + }); + + test("refs does not include childThread when childThread is null", async () => { + const cas = createCasStore(join(dir, "cas")); + const startHash = await cas.put("start"); + const contentHash = await cas.put("content"); + const stateHash = await putStateNode(cas, { + role: "planner", + meta: {}, + start: startHash, + content: contentHash, + ancestors: [], + compact: null, + timestamp: 1000, + childThread: null, + }); + + const blob = await cas.get(stateHash); + expect(blob).not.toBeNull(); + const parsed = parseCasThreadNode(blob ?? ""); + expect(parsed?.kind).toBe("state"); + if (parsed?.kind !== "state") return; + + expect(parsed.node.refs).not.toContain("anything-else"); + expect(parsed.node.refs).toEqual([startHash, contentHash]); + expect(parsed.node.payload.childThread).toBeNull(); + }); + + test("refs includes childThread hash when childThread is set", async () => { + const cas = createCasStore(join(dir, "cas")); + const startHash = await cas.put("start"); + const contentHash = await cas.put("content"); + const childEndHash = await cas.put("child-end-state"); + const stateHash = await putStateNode(cas, { + role: "developer", + meta: { pr: 42 }, + start: startHash, + content: contentHash, + ancestors: [], + compact: null, + timestamp: 2000, + childThread: childEndHash, + }); + + const blob = await cas.get(stateHash); + expect(blob).not.toBeNull(); + const parsed = parseCasThreadNode(blob ?? ""); + expect(parsed?.kind).toBe("state"); + if (parsed?.kind !== "state") return; + + expect(parsed.node.refs).toContain(childEndHash); + expect(parsed.node.payload.childThread).toBe(childEndHash); + }); +}); + +describe("parseCasThreadNode — legacy node compatibility", () => { + test("start node without parentState field defaults to null", () => { + const yaml = stringify({ + type: "start", + payload: { name: "demo", hash: "BUNDLEAAAAAAAAA", depth: 0 }, + refs: ["PROMPTHASH00001"], + }); + const parsed = parseCasThreadNode(yaml); + expect(parsed).not.toBeNull(); + expect(parsed?.kind).toBe("start"); + if (parsed?.kind !== "start") return; + expect(parsed.node.payload.parentState).toBeNull(); + }); + + test("state node without childThread field defaults to null", () => { + const yaml = stringify({ + type: "state", + payload: { + role: "planner", + meta: {}, + start: "STARTHASH00001", + content: "CONTENTHASH0001", + ancestors: [], + compact: null, + timestamp: 1000, + }, + refs: ["STARTHASH00001", "CONTENTHASH0001"], + }); + const parsed = parseCasThreadNode(yaml); + expect(parsed).not.toBeNull(); + expect(parsed?.kind).toBe("state"); + if (parsed?.kind !== "state") return; + expect(parsed.node.payload.childThread).toBeNull(); + }); +}); diff --git a/packages/workflow-execute/__tests__/gc-mark-sweep.test.ts b/packages/workflow-execute/__tests__/gc-mark-sweep.test.ts index 96afe02..9143dc0 100644 --- a/packages/workflow-execute/__tests__/gc-mark-sweep.test.ts +++ b/packages/workflow-execute/__tests__/gc-mark-sweep.test.ts @@ -46,6 +46,7 @@ describe("garbageCollectCas (mark-and-sweep)", () => { name: "demo", hash: bundleHash, depth: 0, + parentState: null, }, promptHash, ); @@ -59,6 +60,7 @@ describe("garbageCollectCas (mark-and-sweep)", () => { ancestors: [], compact: null, timestamp: 1, + childThread: null, } satisfies StateNodePayload); const c2 = await putContentNodeWithRefs(cas, "c1", []); @@ -70,6 +72,7 @@ describe("garbageCollectCas (mark-and-sweep)", () => { ancestors: [h1], compact: null, timestamp: 2, + childThread: null, } satisfies StateNodePayload); const ec = await putContentNodeWithRefs(cas, "", []); @@ -81,6 +84,7 @@ describe("garbageCollectCas (mark-and-sweep)", () => { ancestors: [h1], compact: null, timestamp: 3, + childThread: null, } satisfies StateNodePayload); await upsertThreadEntry(bundleDir, "THREAD_AAAAAAA", { diff --git a/packages/workflow-execute/src/engine/engine.ts b/packages/workflow-execute/src/engine/engine.ts index 6198e78..8adde0f 100644 --- a/packages/workflow-execute/src/engine/engine.ts +++ b/packages/workflow-execute/src/engine/engine.ts @@ -112,6 +112,7 @@ async function appendStateForStep(params: { ancestors, compact: null, timestamp: params.timestamp, + childThread: null, }; const stateHash = await putStateNode(params.cas, payload); return { @@ -137,6 +138,7 @@ async function appendEndState(params: { ancestors, compact: null, timestamp: params.timestamp, + childThread: null, }; return putStateNode(params.cas, payload); } @@ -439,6 +441,7 @@ export async function executeThread( name: workflowName, hash: io.hash, depth: options.depth, + parentState: null, }, promptHash, ); diff --git a/packages/workflow-execute/src/engine/fork-thread.ts b/packages/workflow-execute/src/engine/fork-thread.ts index c01b859..a4a3084 100644 --- a/packages/workflow-execute/src/engine/fork-thread.ts +++ b/packages/workflow-execute/src/engine/fork-thread.ts @@ -240,6 +240,7 @@ async function buildForkContinuation(params: { ancestors: ancestorsMarker, compact: null, timestamp: Date.now(), + childThread: null, }; const markerHash = await putStateNode(cas, markerPayload); diff --git a/packages/workflow-runtime/__tests__/build-context.test.ts b/packages/workflow-runtime/__tests__/build-context.test.ts index 6632f20..76c760d 100644 --- a/packages/workflow-runtime/__tests__/build-context.test.ts +++ b/packages/workflow-runtime/__tests__/build-context.test.ts @@ -27,7 +27,7 @@ describe("buildThreadContext", () => { const bundleHash = "BHAAAAAAAAAAA"; const startHash = await putStartNode( cas, - { name: "demo", hash: bundleHash, depth: 2 }, + { name: "demo", hash: bundleHash, depth: 2, parentState: null }, promptHash, ); @@ -41,6 +41,7 @@ describe("buildThreadContext", () => { ancestors: [], compact: null, timestamp: 1000, + childThread: null, }); const chCode = await putContentNodeWithRefs(cas, "code body", []); @@ -52,6 +53,7 @@ describe("buildThreadContext", () => { ancestors: [statePlan], compact: null, timestamp: 2000, + childThread: null, }); const ctx = await buildThreadContext(stateCode, cas); @@ -71,7 +73,7 @@ describe("buildThreadContext", () => { const promptHash = await cas.put("only-prompt"); const startHash = await putStartNode( cas, - { name: "solo", hash: "BHBBBBBBBBBBB", depth: 1 }, + { name: "solo", hash: "BHBBBBBBBBBBB", depth: 1, parentState: null }, promptHash, ); @@ -87,7 +89,7 @@ describe("buildThreadContext", () => { const bundleHash = "BHCCCCCCCCCCC"; const startHash = await putStartNode( cas, - { name: "demo", hash: bundleHash, depth: 0 }, + { name: "demo", hash: bundleHash, depth: 0, parentState: null }, promptHash, ); @@ -100,6 +102,7 @@ describe("buildThreadContext", () => { ancestors: [], compact: null, timestamp: 500, + childThread: null, }); const endContent = await putContentNodeWithRefs(cas, "finished", []); @@ -111,6 +114,7 @@ describe("buildThreadContext", () => { ancestors: [state1], compact: null, timestamp: 600, + childThread: null, }); const ctx = await buildThreadContext(endState, cas);