From 93b96987a32e3c878ce4747b7d1ac97b97e36b44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=98=9F=E6=9C=88?= Date: Tue, 2 Jun 2026 22:22:38 +0800 Subject: [PATCH] refactor: migrate threads index from YAML to ocas variable store (Phase 4b) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace loadThreadsIndex/saveThreadsIndex with granular variable API: loadAllThreads, getThread, setThread, deleteThread - Variable: @uwf/thread/, value=head hash, tags=suspend metadata - Auto-migration: threads.yaml → variables, renames to .migrated - Updated ~20 call sites in thread.ts, step.ts, shared.ts - workflow-util-agent: getActiveThreadEntry reads from variable store - New test helper: seedThread/seedThreads - biome fix: removed unused imports - 22 files changed Ref #11 --- .../__tests__/adapter-json-roundtrip.test.ts | 6 +- .../src/__tests__/current-role.test.ts | 38 +++--- .../src/__tests__/resolve-head-hash.test.ts | 26 ++--- .../src/__tests__/step-timing.test.ts | 8 +- .../src/__tests__/store-global-cas.test.ts | 48 +++++--- .../src/__tests__/thread-list-filters.test.ts | 52 ++++++--- .../src/__tests__/thread-location.test.ts | 8 +- .../src/__tests__/thread-read-quota.test.ts | 14 +-- .../__tests__/thread-read-xml-tags.test.ts | 21 ++-- .../src/__tests__/thread-resume.test.ts | 27 +++-- .../src/__tests__/thread-show-status.test.ts | 37 +++--- .../__tests__/thread-start-cwd-cli.test.ts | 6 +- .../src/__tests__/thread-suspend-step.test.ts | 13 +-- .../thread-suspended-display.test.ts | 9 +- .../src/__tests__/thread-test-helpers.ts | 37 ++++++ .../cli-workflow/src/__tests__/thread.test.ts | 33 +++--- packages/cli-workflow/src/commands/shared.ts | 10 +- packages/cli-workflow/src/commands/step.ts | 10 +- packages/cli-workflow/src/commands/thread.ts | 110 ++++++++---------- packages/cli-workflow/src/store.ts | 103 ++++++++++++---- packages/workflow-util-agent/src/context.ts | 22 ++-- packages/workflow-util-agent/src/storage.ts | 64 ++++++---- 22 files changed, 419 insertions(+), 283 deletions(-) create mode 100644 packages/cli-workflow/src/__tests__/thread-test-helpers.ts diff --git a/packages/cli-workflow/src/__tests__/adapter-json-roundtrip.test.ts b/packages/cli-workflow/src/__tests__/adapter-json-roundtrip.test.ts index 44c1236..af8a0be 100644 --- a/packages/cli-workflow/src/__tests__/adapter-json-roundtrip.test.ts +++ b/packages/cli-workflow/src/__tests__/adapter-json-roundtrip.test.ts @@ -7,7 +7,7 @@ import { putSchema } from "@ocas/core"; import { createFsStore } from "@ocas/fs"; import type { CasRef, StepNodePayload, ThreadId } from "@united-workforce/protocol"; import { registerUwfSchemas } from "../schemas.js"; -import { saveThreadsIndex } from "../store.js"; +import { seedThreads } from "./thread-test-helpers.js"; // ── schemas ────────────────────────────────────────────────────────────────── @@ -67,8 +67,10 @@ describe("C1: adapter JSON round-trip integration", () => { prompt: "Test round-trip task", }); + process.env.UNCAGED_CAS_DIR = casDir; + const threadId = "01ROUNDTRIPTEST0000000000" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: startHash }); + await seedThreads(tmpDir, { [threadId]: startHash }); // 2. Pre-create CAS nodes that the mock agent would produce const outputHash = await store.put(outputSchemaHash, { diff --git a/packages/cli-workflow/src/__tests__/current-role.test.ts b/packages/cli-workflow/src/__tests__/current-role.test.ts index 2de4c7a..2b75083 100644 --- a/packages/cli-workflow/src/__tests__/current-role.test.ts +++ b/packages/cli-workflow/src/__tests__/current-role.test.ts @@ -9,8 +9,9 @@ import { cmdThreadList, cmdThreadShow, cmdThreadStart } from "../commands/thread import { appendThreadHistory, createUwfStore, - loadThreadsIndex, - saveThreadsIndex, + deleteThread, + loadAllThreads, + setThread, } from "../store.js"; const OUTPUT_SCHEMA = { @@ -174,7 +175,7 @@ async function insertStepNode( outputPayload: Record, ): Promise { const uwf = await createUwfStore(storageRoot); - const index = await loadThreadsIndex(storageRoot); + const index = loadAllThreads(uwf.varStore); const headEntry = index[threadId]; if (headEntry === undefined) throw new Error(`thread ${threadId} not in index`); const head = headEntry.head; @@ -195,13 +196,17 @@ async function insertStepNode( start: startHash, prev: isStart ? null : head, role, - prompt: `Do ${role}`, output: outputHash, detail: detailHash, + agent: "uwf-test", + edgePrompt: `Do ${role}`, + startedAtMs: Date.now(), + completedAtMs: Date.now() + 1, + cwd: storageRoot, + assembledPrompt: null, })) as CasRef; - index[threadId] = { head: stepHash, suspendedRole: null, suspendMessage: null }; - await saveThreadsIndex(storageRoot, index); + setThread(uwf.varStore, threadId, { head: stepHash, suspendedRole: null, suspendMessage: null }); } describe("currentRole field", () => { @@ -280,10 +285,9 @@ describe("currentRole field", () => { const { thread, workflow } = await cmdThreadStart(storageRoot, wf, "test", tmpDir); const tid = thread as ThreadId; - const index = await loadThreadsIndex(storageRoot); - const head = index[tid]!.head; - delete index[tid]; - await saveThreadsIndex(storageRoot, index); + const uwfForIndex = await createUwfStore(storageRoot); + const head = loadAllThreads(uwfForIndex.varStore)[tid]!.head; + deleteThread(uwfForIndex.varStore, tid); await appendThreadHistory(storageRoot, { thread: tid, workflow, @@ -309,10 +313,9 @@ describe("currentRole field", () => { const { thread, workflow } = await cmdThreadStart(storageRoot, wf, "test", tmpDir); const tid = thread as ThreadId; - const index = await loadThreadsIndex(storageRoot); - const head = index[tid]!.head; - delete index[tid]; - await saveThreadsIndex(storageRoot, index); + const uwfForIndex = await createUwfStore(storageRoot); + const head = loadAllThreads(uwfForIndex.varStore)[tid]!.head; + deleteThread(uwfForIndex.varStore, tid); await appendThreadHistory(storageRoot, { thread: tid, workflow, @@ -371,10 +374,9 @@ describe("currentRole field", () => { // completed thread const comp = await cmdThreadStart(storageRoot, wf, "completed", tmpDir); const compId = comp.thread as ThreadId; - const index = await loadThreadsIndex(storageRoot); - const compHead = index[compId]!.head; - delete index[compId]; - await saveThreadsIndex(storageRoot, index); + const uwfForIndex = await createUwfStore(storageRoot); + const compHead = loadAllThreads(uwfForIndex.varStore)[compId]!.head; + deleteThread(uwfForIndex.varStore, compId); await appendThreadHistory(storageRoot, { thread: compId, workflow: comp.workflow, diff --git a/packages/cli-workflow/src/__tests__/resolve-head-hash.test.ts b/packages/cli-workflow/src/__tests__/resolve-head-hash.test.ts index d12cc56..81784a6 100644 --- a/packages/cli-workflow/src/__tests__/resolve-head-hash.test.ts +++ b/packages/cli-workflow/src/__tests__/resolve-head-hash.test.ts @@ -2,9 +2,9 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test"; import { mkdtemp, rm } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; -import type { CasRef, ThreadId } from "@united-workforce/protocol"; +import { type CasRef, createThreadIndexEntry, type ThreadId } from "@united-workforce/protocol"; import { resolveHeadHash } from "../commands/shared.js"; -import { appendThreadHistory, saveThreadsIndex } from "../store.js"; +import { appendThreadHistory } from "../store.js"; let tmpDir: string; @@ -17,11 +17,12 @@ afterEach(async () => { }); describe("resolveHeadHash", () => { - test("returns head hash from threads.yaml for active thread", async () => { + test("returns head hash from variable store for active thread", async () => { const threadId = "01JTEST0000000000000000001" as ThreadId; - const headHash = "active_hash_123" as CasRef; - - await saveThreadsIndex(tmpDir, { [threadId]: headHash }); + const { createUwfStore, setThread } = await import("../store.js"); + const uwf = await createUwfStore(tmpDir); + const headHash = (await uwf.store.put(uwf.schemas.text, "active")) as CasRef; + setThread(uwf.varStore, threadId, createThreadIndexEntry(headHash as CasRef)); const result = await resolveHeadHash(tmpDir, threadId); @@ -34,7 +35,6 @@ describe("resolveHeadHash", () => { const workflowHash = "workflow_hash_789" as CasRef; // No entry in threads.yaml, only in history.jsonl - await saveThreadsIndex(tmpDir, {}); await appendThreadHistory(tmpDir, { thread: threadId, workflow: workflowHash, @@ -54,12 +54,14 @@ describe("resolveHeadHash", () => { test("prioritizes active thread over history when thread exists in both", async () => { const threadId = "01JTEST0000000000000000004" as ThreadId; - const activeHash = "active_hash_v2" as CasRef; const historicalHash = "historical_hash_v1" as CasRef; const workflowHash = "workflow_hash_xyz" as CasRef; - // Thread exists in both locations (should not happen normally, but test the precedence) - await saveThreadsIndex(tmpDir, { [threadId]: activeHash }); + const { createUwfStore, setThread } = await import("../store.js"); + const { createThreadIndexEntry } = await import("@united-workforce/protocol"); + const uwf = await createUwfStore(tmpDir); + const activeHead = (await uwf.store.put(uwf.schemas.text, "active-v2")) as CasRef; + setThread(uwf.varStore, threadId, createThreadIndexEntry(activeHead)); await appendThreadHistory(tmpDir, { thread: threadId, workflow: workflowHash, @@ -71,7 +73,7 @@ describe("resolveHeadHash", () => { const result = await resolveHeadHash(tmpDir, threadId); // Should return the active head, not the historical one - expect(result).toBe(activeHash); + expect(result).toBe(activeHead); }); test("finds thread from multiple history entries", async () => { @@ -82,8 +84,6 @@ describe("resolveHeadHash", () => { const hash2 = "hash_thread2" as CasRef; const hash3 = "hash_thread3" as CasRef; const workflowHash = "workflow_hash_abc" as CasRef; - - await saveThreadsIndex(tmpDir, {}); await appendThreadHistory(tmpDir, { thread: threadId1, workflow: workflowHash, diff --git a/packages/cli-workflow/src/__tests__/step-timing.test.ts b/packages/cli-workflow/src/__tests__/step-timing.test.ts index c494e7b..5cc556f 100644 --- a/packages/cli-workflow/src/__tests__/step-timing.test.ts +++ b/packages/cli-workflow/src/__tests__/step-timing.test.ts @@ -9,7 +9,7 @@ import { STEP_NODE_SCHEMA } from "@united-workforce/protocol"; import { cmdStepList } from "../commands/step.js"; import { cmdThreadRead } from "../commands/thread.js"; import { registerUwfSchemas } from "../schemas.js"; -import { saveThreadsIndex } from "../store.js"; +import { seedThreads } from "./thread-test-helpers.js"; // ── schemas ────────────────────────────────────────────────────────────────── @@ -216,7 +216,7 @@ describe("step list timing", () => { }); const threadId = "01HX2Q3R4S5T6V7W8X9YZ1" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: stepHash }); + await seedThreads(tmpDir, { [threadId]: stepHash }); const result = await cmdStepList(tmpDir, threadId); const stepEntries = result.steps.slice(1); // skip start entry @@ -290,7 +290,7 @@ describe("thread read timing", () => { }); const threadId = "01HX2Q3R4S5T6V7W8X9YZ3" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: stepHash }); + await seedThreads(tmpDir, { [threadId]: stepHash }); const markdown = await cmdThreadRead(tmpDir, threadId, 10000, null, false); expect(markdown).toContain("**Duration:** 42.0s"); @@ -356,7 +356,7 @@ describe("thread read timing", () => { }); const threadId = "01HX2Q3R4S5T6V7W8X9YZ4" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: stepHash }); + await seedThreads(tmpDir, { [threadId]: stepHash }); const markdown = await cmdThreadRead(tmpDir, threadId, 10000, null, false); expect(markdown).toContain("**Duration:** 350ms"); diff --git a/packages/cli-workflow/src/__tests__/store-global-cas.test.ts b/packages/cli-workflow/src/__tests__/store-global-cas.test.ts index bbbdb68..5a6e3ca 100644 --- a/packages/cli-workflow/src/__tests__/store-global-cas.test.ts +++ b/packages/cli-workflow/src/__tests__/store-global-cas.test.ts @@ -2,6 +2,7 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test"; import { mkdir, rm } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; +import { createThreadIndexEntry, type ThreadId } from "@united-workforce/protocol"; import { createUwfStore, getCasDir, @@ -9,6 +10,7 @@ import { getRegistryPath, loadWorkflowRegistry, saveWorkflowRegistry, + setThread, } from "../store.js"; describe("Global CAS directory", () => { @@ -191,29 +193,49 @@ describe("Global CAS directory", () => { expect(migratedContent).toContain(hash); }); - test("thread metadata remains in storageRoot", async () => { + test("migrates threads.yaml to variable store and renames file", async () => { + const globalCasDir = join(tmpDir, "global-cas-threads"); + process.env.UNCAGED_CAS_DIR = globalCasDir; + + const storageRoot = join(tmpDir, "storage-threads-migrate"); + await mkdir(storageRoot, { recursive: true }); + + const threadId = "01JTEST0000000000000000AB" as ThreadId; + const uwfSeed = await createUwfStore(storageRoot); + const headHash = await uwfSeed.store.put(uwfSeed.schemas.text, "migrated-thread-head"); + const { writeFile, access, readFile } = await import("node:fs/promises"); + const threadsPath = join(storageRoot, "threads.yaml"); + await writeFile(threadsPath, `${threadId}: ${headHash}\n`, "utf8"); + + const uwf = await createUwfStore(storageRoot); + const entry = uwf.varStore.list({ exactName: `@uwf/thread/${threadId}` })[0]; + expect(entry?.value).toBe(headHash); + + await expect(access(threadsPath)).rejects.toThrow(); + const migratedContent = await readFile(`${threadsPath}.migrated`, "utf8"); + expect(migratedContent).toContain(threadId); + expect(migratedContent).toContain(headHash); + }); + + test("thread metadata stored in ocas variable store", async () => { const globalCasDir = join(tmpDir, "global-cas"); process.env.UNCAGED_CAS_DIR = globalCasDir; const storageRoot = join(tmpDir, "storage"); await mkdir(storageRoot, { recursive: true }); - await createUwfStore(storageRoot); + const threadId = "01JTEST000000000000000123" as ThreadId; + const uwfSeed = await createUwfStore(storageRoot); + const headHash = await uwfSeed.store.put(uwfSeed.schemas.text, "hash-456"); + setThread(uwfSeed.varStore, threadId, createThreadIndexEntry(headHash)); - // Write threads index - const { saveThreadsIndex } = await import("../store.js"); - await saveThreadsIndex(storageRoot, { "thread-123": "hash-456" }); + const uwf = await createUwfStore(storageRoot); + const entry = uwf.varStore.list({ exactName: `@uwf/thread/${threadId}` })[0]; + expect(entry?.value).toBe(headHash); - // Verify threads.yaml is in storageRoot, not global CAS const { readFile } = await import("node:fs/promises"); const threadsPath = join(storageRoot, "threads.yaml"); - const content = await readFile(threadsPath, "utf8"); - expect(content).toContain("thread-123"); - expect(content).toContain("hash-456"); - - // Verify threads.yaml is NOT in global CAS directory - const globalThreadsPath = join(globalCasDir, "threads.yaml"); - await expect(readFile(globalThreadsPath, "utf8")).rejects.toThrow(); + await expect(readFile(threadsPath, "utf8")).rejects.toThrow(); }); test("history remains in storageRoot", async () => { diff --git a/packages/cli-workflow/src/__tests__/thread-list-filters.test.ts b/packages/cli-workflow/src/__tests__/thread-list-filters.test.ts index 98a74a6..0d2ae44 100644 --- a/packages/cli-workflow/src/__tests__/thread-list-filters.test.ts +++ b/packages/cli-workflow/src/__tests__/thread-list-filters.test.ts @@ -9,7 +9,13 @@ import { createMarker, deleteMarker } from "../background/index.js"; import { cmdThreadList } from "../commands/thread.js"; import { parseTimeInput } from "../commands/thread-time-parser.js"; import type { UwfStore } from "../store.js"; -import { appendThreadHistory, createUwfStore, saveThreadsIndex } from "../store.js"; +import { + appendThreadHistory, + createUwfStore, + deleteThread, + loadAllThreads, + setThread, +} from "../store.js"; // ── helpers ─────────────────────────────────────────────────────────────────── @@ -50,10 +56,7 @@ async function createTestThread( }; const headHash = await uwf.store.put(uwf.schemas.startNode, startPayload); - // Load existing index and add new thread - const existingIndex = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot)); - existingIndex[threadId] = createThreadIndexEntry(headHash); - await saveThreadsIndex(storageRoot, existingIndex); + setThread(uwf.varStore, threadId, createThreadIndexEntry(headHash)); return threadId; } @@ -73,9 +76,8 @@ async function completeThread( workflowHash: CasRef, headHash: CasRef, ) { - const index = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot)); - delete index[threadId]; - await saveThreadsIndex(storageRoot, index); + const uwfIdx = await createUwfStore(storageRoot); + deleteThread(uwfIdx.varStore, threadId); await appendThreadHistory(storageRoot, { thread: threadId, workflow: workflowHash, @@ -110,7 +112,8 @@ describe("cmdThreadList status filter", () => { await markThreadRunning(tmpDir, thread2, workflowHash); - const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); + const uwfIdx = await createUwfStore(tmpDir); + const index = loadAllThreads(uwfIdx.varStore); const thread3Head = index[thread3]!.head; if (thread3Head === undefined) throw new Error("thread3 head not found"); await completeThread(tmpDir, thread3, workflowHash, thread3Head); @@ -134,7 +137,8 @@ describe("cmdThreadList status filter", () => { await markThreadRunning(tmpDir, thread2, workflowHash); - const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); + const uwfIdx = await createUwfStore(tmpDir); + const index = loadAllThreads(uwfIdx.varStore); const thread3Head = index[thread3]!.head; if (thread3Head === undefined) throw new Error("thread3 head not found"); await completeThread(tmpDir, thread3, workflowHash, thread3Head); @@ -158,7 +162,8 @@ describe("cmdThreadList status filter", () => { const _thread2 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 2000); const thread3 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000); - const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); + const uwfIdx = await createUwfStore(tmpDir); + const index = loadAllThreads(uwfIdx.varStore); const thread3Head = index[thread3]!.head; if (thread3Head === undefined) throw new Error("thread3 head not found"); await completeThread(tmpDir, thread3, workflowHash, thread3Head); @@ -180,7 +185,8 @@ describe("cmdThreadList status filter", () => { await markThreadRunning(tmpDir, thread2, workflowHash); - const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); + const uwfIdx = await createUwfStore(tmpDir); + const index = loadAllThreads(uwfIdx.varStore); const thread3Head = index[thread3]!.head; if (thread3Head === undefined) throw new Error("thread3 head not found"); await completeThread(tmpDir, thread3, workflowHash, thread3Head); @@ -352,7 +358,8 @@ describe("combined filters", () => { await markThreadRunning(tmpDir, thread2, workflowHash); - const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); + const uwfIdx = await createUwfStore(tmpDir); + const index = loadAllThreads(uwfIdx.varStore); const thread3Head = index[thread3]!.head; if (thread3Head === undefined) throw new Error("thread3 head not found"); await completeThread(tmpDir, thread3, workflowHash, thread3Head); @@ -376,7 +383,8 @@ describe("combined filters", () => { for (let i = 9; i >= 0; i--) { const thread = await createTestThread(uwf, tmpDir, workflowHash, Date.now() + i * 1000); threads.push(thread); - const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); + const uwfIdx = await createUwfStore(tmpDir); + const index = loadAllThreads(uwfIdx.varStore); const headHash = index[thread]!.head; if (headHash === undefined) throw new Error("head not found"); await completeThread(tmpDir, thread, workflowHash, headHash); @@ -425,7 +433,8 @@ describe("combined filters", () => { threads.push(thread); if (i % 2 === 0) { - const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); + const uwfIdx = await createUwfStore(tmpDir); + const index = loadAllThreads(uwfIdx.varStore); const headHash = index[thread]!.head; if (headHash === undefined) throw new Error("head not found"); await completeThread(tmpDir, thread, workflowHash, headHash); @@ -483,13 +492,20 @@ describe("edge cases", () => { const thread1 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 2000); const thread2 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000); - const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); + const uwfIdx = await createUwfStore(tmpDir); + const index = loadAllThreads(uwfIdx.varStore); + const placeholderHead = (await uwfIdx.store.put( + uwfIdx.schemas.text, + "invalid-ulid-placeholder", + )) as CasRef; index["INVALID_ULID_FORMAT_HERE" as ThreadId] = { - head: "01J6HMVRNQKJV2", + head: placeholderHead, suspendedRole: null, suspendMessage: null, }; - await saveThreadsIndex(tmpDir, index); + for (const [tid, ent] of Object.entries(index)) { + setThread(uwfIdx.varStore, tid as ThreadId, ent); + } const afterMs = Date.now() - 3000; const result = await cmdThreadList(tmpDir, null, afterMs, null, null, null); diff --git a/packages/cli-workflow/src/__tests__/thread-location.test.ts b/packages/cli-workflow/src/__tests__/thread-location.test.ts index df7a14f..1c466f1 100644 --- a/packages/cli-workflow/src/__tests__/thread-location.test.ts +++ b/packages/cli-workflow/src/__tests__/thread-location.test.ts @@ -4,7 +4,7 @@ import { tmpdir } from "node:os"; import { join } from "node:path"; import type { CasRef, StartNodePayload, ThreadId } from "@united-workforce/protocol"; import { cmdThreadStart } from "../commands/thread.js"; -import { createUwfStore } from "../store.js"; +import { createUwfStore, getThread } from "../store.js"; describe("Thread and edge location integration", () => { let tmpDir: string; @@ -79,8 +79,7 @@ graph: // Verify StartNode has the cwd field const uwf = await createUwfStore(storageRoot); - const index = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot)); - const headHash = index[result.thread as ThreadId]!.head; + const headHash = getThread(uwf.varStore, result.thread as ThreadId)!.head; expect(headHash).toBeDefined(); const startNode = uwf.store.get(headHash as CasRef); @@ -174,8 +173,7 @@ graph: const result = await cmdThreadStart(storageRoot, workflowPath, "test", tmpDir); const uwf = await createUwfStore(storageRoot); - const index = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot)); - const headHash = index[result.thread as ThreadId]!.head; + const headHash = getThread(uwf.varStore, result.thread as ThreadId)!.head; const startNode = uwf.store.get(headHash as CasRef); const startPayload = startNode?.payload as StartNodePayload; diff --git a/packages/cli-workflow/src/__tests__/thread-read-quota.test.ts b/packages/cli-workflow/src/__tests__/thread-read-quota.test.ts index 252a223..b68a832 100644 --- a/packages/cli-workflow/src/__tests__/thread-read-quota.test.ts +++ b/packages/cli-workflow/src/__tests__/thread-read-quota.test.ts @@ -7,7 +7,7 @@ import { createFsStore } from "@ocas/fs"; import type { CasRef, ThreadId } from "@united-workforce/protocol"; import { cmdThreadRead } from "../commands/thread.js"; import { registerUwfSchemas } from "../schemas.js"; -import { saveThreadsIndex } from "../store.js"; +import { seedThreads } from "./thread-test-helpers.js"; // ── schemas used in tests ──────────────────────────────────────────────────── @@ -158,7 +158,7 @@ describe("thread read --quota flag", () => { } const threadId = "01HX2Q3R4S5T6V7W8X9YZ0" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: steps[2] as CasRef }); + await seedThreads(tmpDir, { [threadId]: steps[2] as CasRef }); // Set quota to 800 chars - should only fit most recent steps const markdown = await cmdThreadRead(tmpDir, threadId, 800, null, false); @@ -266,7 +266,7 @@ describe("thread read --quota flag", () => { }); const threadId = "01HX2Q3R4S5T6V7W8X9YZ1" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: step2Hash }); + await seedThreads(tmpDir, { [threadId]: step2Hash }); // Set quota to 500 chars const markdown = await cmdThreadRead(tmpDir, threadId, 500, null, false); @@ -354,7 +354,7 @@ describe("thread read --quota flag", () => { } const threadId = "01HX2Q3R4S5T6V7W8X9YZ2" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: steps[1] as CasRef }); + await seedThreads(tmpDir, { [threadId]: steps[1] as CasRef }); // Set tight quota with --start flag const markdown = await cmdThreadRead(tmpDir, threadId, 600, null, true); @@ -432,7 +432,7 @@ describe("thread read --quota flag", () => { }); const threadId = "01HX2Q3R4S5T6V7W8X9YZ4" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: stepHash }); + await seedThreads(tmpDir, { [threadId]: stepHash }); // Minimal quota const markdown = await cmdThreadRead(tmpDir, threadId, 1, null, false); @@ -512,7 +512,7 @@ describe("thread read --quota flag", () => { } const threadId = "01HX2Q3R4S5T6V7W8X9YZ5" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: steps[2] as CasRef }); + await seedThreads(tmpDir, { [threadId]: steps[2] as CasRef }); // Very large quota const markdown = await cmdThreadRead(tmpDir, threadId, 1000000, null, false); @@ -594,7 +594,7 @@ describe("thread read --quota flag", () => { } const threadId = "01HX2Q3R4S5T6V7W8X9YZ6" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: steps[4] as CasRef }); + await seedThreads(tmpDir, { [threadId]: steps[4] as CasRef }); // Use --before to limit to steps 1-2, then set quota that allows only 1 const markdown = await cmdThreadRead(tmpDir, threadId, 500, steps[2] as CasRef, false); diff --git a/packages/cli-workflow/src/__tests__/thread-read-xml-tags.test.ts b/packages/cli-workflow/src/__tests__/thread-read-xml-tags.test.ts index 13f5b8a..a514470 100644 --- a/packages/cli-workflow/src/__tests__/thread-read-xml-tags.test.ts +++ b/packages/cli-workflow/src/__tests__/thread-read-xml-tags.test.ts @@ -7,7 +7,8 @@ import type { createFsStore } from "@ocas/fs"; import type { CasRef, ThreadId } from "@united-workforce/protocol"; import { cmdThreadRead, THREAD_READ_DEFAULT_QUOTA } from "../commands/thread.js"; import type { UwfStore } from "../store.js"; -import { createUwfStore, saveThreadsIndex } from "../store.js"; +import { createUwfStore } from "../store.js"; +import { seedThreads } from "./thread-test-helpers.js"; // ── schemas used in tests ──────────────────────────────────────────────────── @@ -143,7 +144,7 @@ describe("thread read XML tag isolation", () => { }); const threadId = "01JTEST0000000000000001" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: stepHash }); + await seedThreads(tmpDir, { [threadId]: stepHash }); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); @@ -221,7 +222,7 @@ describe("thread read XML tag isolation", () => { }); const threadId = "01JTEST0000000000000002" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: stepHash }); + await seedThreads(tmpDir, { [threadId]: stepHash }); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); @@ -296,7 +297,7 @@ describe("thread read XML tag isolation", () => { }); const threadId = "01JTEST0000000000000003" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: step2 }); + await seedThreads(tmpDir, { [threadId]: step2 }); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); @@ -351,7 +352,7 @@ describe("thread read XML tag isolation", () => { }); const threadId = "01JTEST0000000000000004" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: stepHash }); + await seedThreads(tmpDir, { [threadId]: stepHash }); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); @@ -406,7 +407,7 @@ describe("thread read XML tag isolation", () => { }); const threadId = "01JTEST0000000000000005" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: stepHash }); + await seedThreads(tmpDir, { [threadId]: stepHash }); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); @@ -461,7 +462,7 @@ describe("thread read XML tag isolation", () => { }); const threadId = "01JTEST0000000000000006" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: stepHash }); + await seedThreads(tmpDir, { [threadId]: stepHash }); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, true); @@ -560,7 +561,7 @@ describe("thread read XML tag isolation", () => { }); const threadId = "01JTEST0000000000000007" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: step3 }); + await seedThreads(tmpDir, { [threadId]: step3 }); const markdown = await cmdThreadRead( tmpDir, @@ -641,7 +642,7 @@ describe("thread read XML tag isolation", () => { }); const threadId = "01JTEST0000000000000008" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: stepHash }); + await seedThreads(tmpDir, { [threadId]: stepHash }); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); @@ -701,7 +702,7 @@ describe("thread read XML tag isolation", () => { } const threadId = "01JTEST0000000000000009" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: steps[steps.length - 1]! }); + await seedThreads(tmpDir, { [threadId]: steps[steps.length - 1]! }); // Use very small quota const markdown = await cmdThreadRead(tmpDir, threadId, 1, null, false); diff --git a/packages/cli-workflow/src/__tests__/thread-resume.test.ts b/packages/cli-workflow/src/__tests__/thread-resume.test.ts index a631378..096f87a 100644 --- a/packages/cli-workflow/src/__tests__/thread-resume.test.ts +++ b/packages/cli-workflow/src/__tests__/thread-resume.test.ts @@ -6,10 +6,9 @@ import { join } from "node:path"; import { putSchema } from "@ocas/core"; import { createFsStore } from "@ocas/fs"; import type { CasRef, StepNodePayload, ThreadId } from "@united-workforce/protocol"; -import { parse } from "yaml"; import { cmdThreadShow } from "../commands/thread.js"; import { registerUwfSchemas } from "../schemas.js"; -import { saveThreadsIndex } from "../store.js"; +import { seedThreads } from "./thread-test-helpers.js"; const OUTPUT_SCHEMA = { type: "object" as const, @@ -89,7 +88,8 @@ async function setupSuspendedThread(mode: MockAgentMode): Promise<{ cwd: tmpDir, }); - await saveThreadsIndex(tmpDir, { [THREAD_ID]: startHash }); + process.env.UNCAGED_CAS_DIR = casDir; + await seedThreads(tmpDir, { [THREAD_ID]: startHash }); const outputHash = await store.put(outputSchemaHash, { $status: "needs_input", @@ -114,7 +114,7 @@ async function setupSuspendedThread(mode: MockAgentMode): Promise<{ assembledPrompt: null, }); - await saveThreadsIndex(tmpDir, { + await seedThreads(tmpDir, { [THREAD_ID]: { head: stepHash, suspendedRole: "worker", @@ -241,7 +241,8 @@ describe("uwf thread resume", () => { cwd: tmpDir, }); - await saveThreadsIndex(tmpDir, { [THREAD_ID]: startHash }); + process.env.UNCAGED_CAS_DIR = casDir; + await seedThreads(tmpDir, { [THREAD_ID]: startHash }); const result = runUwf(["thread", "resume", THREAD_ID], casDir); expect(result.status).not.toBe(0); @@ -264,9 +265,12 @@ describe("uwf thread resume", () => { expect(cliOutput.suspendMessage).toBeNull(); expect(cliOutput.done).toBe(false); - const threadsYaml = await readFile(join(tmpDir, "threads.yaml"), "utf8"); - const threadsIndex = parse(threadsYaml) as Record; - expect(threadsIndex[THREAD_ID]).toBe(cliOutput.head); + const { createUwfStore, getThread } = await import("../store.js"); + const uwf = await createUwfStore(tmpDir); + const entry = getThread(uwf.varStore, THREAD_ID); + expect(entry?.head).toBe(cliOutput.head); + expect(entry?.suspendedRole).toBeNull(); + expect(entry?.suspendMessage).toBeNull(); const showResult = await cmdThreadShow(tmpDir, THREAD_ID); expect(showResult.status).toBe("idle"); @@ -338,10 +342,9 @@ describe("uwf thread resume", () => { expect(firstResume.suspendedRole).toBe("worker"); expect(firstResume.suspendMessage).toBe(SUSPEND_MESSAGE); - const threadsAfterFirst = parse( - await readFile(join(tmpDir, "threads.yaml"), "utf8"), - ) as Record; - expect(threadsAfterFirst[THREAD_ID]).toEqual({ + const { createUwfStore, getThread } = await import("../store.js"); + const uwfAfterFirst = await createUwfStore(tmpDir); + expect(getThread(uwfAfterFirst.varStore, THREAD_ID)).toEqual({ head: firstResume.head, suspendedRole: "worker", suspendMessage: SUSPEND_MESSAGE, diff --git a/packages/cli-workflow/src/__tests__/thread-show-status.test.ts b/packages/cli-workflow/src/__tests__/thread-show-status.test.ts index 601b5db..e47cabb 100644 --- a/packages/cli-workflow/src/__tests__/thread-show-status.test.ts +++ b/packages/cli-workflow/src/__tests__/thread-show-status.test.ts @@ -9,8 +9,9 @@ import { cmdThreadShow, cmdThreadStart } from "../commands/thread.js"; import { appendThreadHistory, createUwfStore, - loadThreadsIndex, - saveThreadsIndex, + deleteThread, + loadAllThreads, + setThread, } from "../store.js"; const OUTPUT_SCHEMA = { @@ -89,7 +90,7 @@ async function insertStepNode( outputPayload: Record, ): Promise { const uwf = await createUwfStore(storageRoot); - const index = await loadThreadsIndex(storageRoot); + const index = loadAllThreads(uwf.varStore); const headEntry = index[threadId]; if (headEntry === undefined) throw new Error(`thread ${threadId} not in index`); const head = headEntry.head; @@ -117,8 +118,7 @@ async function insertStepNode( assembledPrompt: null, })) as CasRef; - index[threadId] = { head: stepHash, suspendedRole: null, suspendMessage: null }; - await saveThreadsIndex(storageRoot, index); + setThread(uwf.varStore, threadId, { head: stepHash, suspendedRole: null, suspendMessage: null }); } describe("thread show status field", () => { @@ -203,15 +203,12 @@ describe("thread show status field", () => { const workflow = startResult.workflow; // Get the head hash before moving to history - const index = await loadThreadsIndex(storageRoot); + const uwfForIndex = await createUwfStore(storageRoot); + const index = loadAllThreads(uwfForIndex.varStore); const head = index[threadId]!.head; if (!head) throw new Error("Thread not found in index"); - // Move thread to history with reason 'completed' - const { saveThreadsIndex } = await import("../store.js"); - const newIndex = { ...index }; - delete newIndex[threadId]; - await saveThreadsIndex(storageRoot, newIndex); + deleteThread(uwfForIndex.varStore, threadId); await appendThreadHistory(storageRoot, { thread: threadId, @@ -243,15 +240,12 @@ describe("thread show status field", () => { const workflow = startResult.workflow; // Get the head hash before moving to history - const index = await loadThreadsIndex(storageRoot); + const uwfForIndex = await createUwfStore(storageRoot); + const index = loadAllThreads(uwfForIndex.varStore); const head = index[threadId]!.head; if (!head) throw new Error("Thread not found in index"); - // Move thread to history with reason 'cancelled' - const { saveThreadsIndex } = await import("../store.js"); - const newIndex = { ...index }; - delete newIndex[threadId]; - await saveThreadsIndex(storageRoot, newIndex); + deleteThread(uwfForIndex.varStore, threadId); await appendThreadHistory(storageRoot, { thread: threadId, @@ -283,15 +277,12 @@ describe("thread show status field", () => { const workflow = startResult.workflow; // Get the head hash before moving to history - const index = await loadThreadsIndex(storageRoot); + const uwfForIndex = await createUwfStore(storageRoot); + const index = loadAllThreads(uwfForIndex.varStore); const head = index[threadId]!.head; if (!head) throw new Error("Thread not found in index"); - // Move thread to history with reason null (legacy format) - const { saveThreadsIndex } = await import("../store.js"); - const newIndex = { ...index }; - delete newIndex[threadId]; - await saveThreadsIndex(storageRoot, newIndex); + deleteThread(uwfForIndex.varStore, threadId); await appendThreadHistory(storageRoot, { thread: threadId, diff --git a/packages/cli-workflow/src/__tests__/thread-start-cwd-cli.test.ts b/packages/cli-workflow/src/__tests__/thread-start-cwd-cli.test.ts index 6c8bd2d..3ccd791 100644 --- a/packages/cli-workflow/src/__tests__/thread-start-cwd-cli.test.ts +++ b/packages/cli-workflow/src/__tests__/thread-start-cwd-cli.test.ts @@ -5,7 +5,7 @@ import { tmpdir } from "node:os"; import { join } from "node:path"; import type { CasRef, StartNodePayload, ThreadId } from "@united-workforce/protocol"; import { cmdThreadStart } from "../commands/thread.js"; -import { createUwfStore, loadThreadsIndex } from "../store.js"; +import { createUwfStore, getThread } from "../store.js"; describe("thread start --cwd CLI option", () => { let tmpDir: string; @@ -74,8 +74,8 @@ graph: async function getStartNodeCwd(threadId: string): Promise { const uwf = await createUwfStore(storageRoot); - const index = await loadThreadsIndex(storageRoot); - const headHash = index[threadId as ThreadId]!.head; + const entry = getThread(uwf.varStore, threadId as ThreadId); + const headHash = entry!.head; expect(headHash).toBeDefined(); const startNode = uwf.store.get(headHash as CasRef); diff --git a/packages/cli-workflow/src/__tests__/thread-suspend-step.test.ts b/packages/cli-workflow/src/__tests__/thread-suspend-step.test.ts index 7fe0172..8982426 100644 --- a/packages/cli-workflow/src/__tests__/thread-suspend-step.test.ts +++ b/packages/cli-workflow/src/__tests__/thread-suspend-step.test.ts @@ -1,15 +1,14 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test"; import { execFileSync } from "node:child_process"; -import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; +import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; import { putSchema } from "@ocas/core"; import { createFsStore } from "@ocas/fs"; import type { CasRef, StepNodePayload, ThreadId } from "@united-workforce/protocol"; -import { parse } from "yaml"; import { cmdThreadShow } from "../commands/thread.js"; import { registerUwfSchemas } from "../schemas.js"; -import { saveThreadsIndex } from "../store.js"; +import { seedThreads } from "./thread-test-helpers.js"; const OUTPUT_SCHEMA = { type: "object" as const, @@ -76,7 +75,7 @@ describe("suspend step CAS chain and threads.yaml metadata", () => { }); const threadId = "01SUSPENDSTEPTEST0000000" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: startHash }); + await seedThreads(tmpDir, { [threadId]: startHash }); const outputHash = await store.put(outputSchemaHash, { $status: "needs_input", @@ -155,9 +154,9 @@ describe("suspend step CAS chain and threads.yaml metadata", () => { question: "Which API?", }); - const threadsYaml = await readFile(join(tmpDir, "threads.yaml"), "utf8"); - const threadsIndex = parse(threadsYaml) as Record; - const threadEntry = threadsIndex[threadId]; + const { createUwfStore, getThread } = await import("../store.js"); + const uwf = await createUwfStore(tmpDir); + const threadEntry = getThread(uwf.varStore, threadId); expect(threadEntry).toEqual({ head: stepHash, suspendedRole: "worker", diff --git a/packages/cli-workflow/src/__tests__/thread-suspended-display.test.ts b/packages/cli-workflow/src/__tests__/thread-suspended-display.test.ts index 0979163..cd5b82f 100644 --- a/packages/cli-workflow/src/__tests__/thread-suspended-display.test.ts +++ b/packages/cli-workflow/src/__tests__/thread-suspended-display.test.ts @@ -6,7 +6,8 @@ import { putSchema } from "@ocas/core"; import type { ThreadId } from "@united-workforce/protocol"; import { createThreadIndexEntry, markThreadSuspended } from "@united-workforce/protocol"; import { cmdThreadList, cmdThreadShow } from "../commands/thread.js"; -import { createUwfStore, saveThreadsIndex } from "../store.js"; +import { createUwfStore } from "../store.js"; +import { seedThreads } from "./thread-test-helpers.js"; const OUTPUT_SCHEMA = { type: "object" as const, @@ -109,7 +110,7 @@ describe("suspended thread display", () => { }); const idleEntry = createThreadIndexEntry(idleStartHash); - await saveThreadsIndex(tmpDir, { + await seedThreads(tmpDir, { [suspendedThreadId]: suspendedEntry, [idleThreadId]: idleEntry, }); @@ -205,7 +206,7 @@ describe("suspended thread display", () => { "Need clarification: Which database to use?", ); - await saveThreadsIndex(tmpDir, { [threadId]: suspendedEntry }); + await seedThreads(tmpDir, { [threadId]: suspendedEntry }); // Test thread show const showResult = await cmdThreadShow(tmpDir, threadId); @@ -258,7 +259,7 @@ describe("suspended thread display", () => { }); const threadId = "01NORMALTHREAD000000000" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: createThreadIndexEntry(startHash) }); + await seedThreads(tmpDir, { [threadId]: createThreadIndexEntry(startHash) }); // Test thread show const showResult = await cmdThreadShow(tmpDir, threadId); diff --git a/packages/cli-workflow/src/__tests__/thread-test-helpers.ts b/packages/cli-workflow/src/__tests__/thread-test-helpers.ts new file mode 100644 index 0000000..cd1bdc8 --- /dev/null +++ b/packages/cli-workflow/src/__tests__/thread-test-helpers.ts @@ -0,0 +1,37 @@ +import type { CasRef, ThreadId, ThreadIndexEntry } from "@united-workforce/protocol"; +import { createThreadIndexEntry } from "@united-workforce/protocol"; +import { createUwfStore, setThread } from "../store.js"; + +async function ensureHeadInCas( + uwf: Awaited>, + head: CasRef, + threadId: ThreadId, +): Promise { + if (uwf.store.get(head) !== null) { + return head; + } + return (await uwf.store.put(uwf.schemas.text, `thread-head:${threadId}:${head}`)) as CasRef; +} + +export async function seedThread( + storageRoot: string, + threadId: ThreadId, + entry: ThreadIndexEntry | CasRef, +): Promise { + const uwf = await createUwfStore(storageRoot); + const normalized = typeof entry === "string" ? createThreadIndexEntry(entry) : entry; + const head = await ensureHeadInCas(uwf, normalized.head, threadId); + setThread(uwf.varStore, threadId, { ...normalized, head }); +} + +export async function seedThreads( + storageRoot: string, + entries: Record, +): Promise { + const uwf = await createUwfStore(storageRoot); + for (const [threadId, entry] of Object.entries(entries)) { + const normalized = typeof entry === "string" ? createThreadIndexEntry(entry as CasRef) : entry; + const head = await ensureHeadInCas(uwf, normalized.head, threadId as ThreadId); + setThread(uwf.varStore, threadId as ThreadId, { ...normalized, head }); + } +} diff --git a/packages/cli-workflow/src/__tests__/thread.test.ts b/packages/cli-workflow/src/__tests__/thread.test.ts index c11bf14..0c598ba 100644 --- a/packages/cli-workflow/src/__tests__/thread.test.ts +++ b/packages/cli-workflow/src/__tests__/thread.test.ts @@ -12,7 +12,8 @@ import { THREAD_READ_DEFAULT_QUOTA, } from "../commands/thread.js"; import type { UwfStore } from "../store.js"; -import { appendThreadHistory, createUwfStore, saveThreadsIndex } from "../store.js"; +import { appendThreadHistory, createUwfStore } from "../store.js"; +import { seedThreads } from "./thread-test-helpers.js"; // ── schemas used in tests ──────────────────────────────────────────────────── @@ -258,7 +259,7 @@ describe("cmdThreadRead section", () => { }); const threadId = "01JTEST0000000000000000001" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: stepHash }); + await seedThreads(tmpDir, { [threadId]: stepHash }); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); @@ -303,7 +304,7 @@ describe("cmdThreadRead section", () => { }); const threadId = "01JTEST0000000000000000002" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: stepHash }); + await seedThreads(tmpDir, { [threadId]: stepHash }); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); @@ -438,7 +439,7 @@ describe("cmdThreadRead deduplication", () => { const uwf = await makeUwfStore(tmpDir); const headHash = await makeThreadWithRoles(uwf, ["writer", "writer"]); const threadId = "01JTEST0000000000000003" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: headHash }); + await seedThreads(tmpDir, { [threadId]: headHash }); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); const count = (markdown.match(//g) ?? []).length; @@ -449,7 +450,7 @@ describe("cmdThreadRead deduplication", () => { const uwf = await makeUwfStore(tmpDir); const headHash = await makeThreadWithRoles(uwf, ["planner", "coder"]); const threadId = "01JTEST0000000000000004" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: headHash }); + await seedThreads(tmpDir, { [threadId]: headHash }); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); const count = (markdown.match(//g) ?? []).length; @@ -460,7 +461,7 @@ describe("cmdThreadRead deduplication", () => { const uwf = await makeUwfStore(tmpDir); const headHash = await makeThreadWithRoles(uwf, ["roleA", "roleB", "roleA"]); const threadId = "01JTEST0000000000000005" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: headHash }); + await seedThreads(tmpDir, { [threadId]: headHash }); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); const count = (markdown.match(//g) ?? []).length; @@ -528,7 +529,7 @@ describe("cmdThreadRead start section / before / quota", () => { const uwf = await makeUwfStore(tmpDir); const { stepHashes } = await makeSimpleThread(uwf, ["roleA"]); const threadId = "01JTEST0000000000000006" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: stepHashes[stepHashes.length - 1]! }); + await seedThreads(tmpDir, { [threadId]: stepHashes[stepHashes.length - 1]! }); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, true); expect(markdown).toContain("# Thread"); @@ -540,7 +541,7 @@ describe("cmdThreadRead start section / before / quota", () => { const uwf = await makeUwfStore(tmpDir); const { stepHashes } = await makeSimpleThread(uwf, ["roleA"]); const threadId = "01JTEST0000000000000007" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: stepHashes[stepHashes.length - 1]! }); + await seedThreads(tmpDir, { [threadId]: stepHashes[stepHashes.length - 1]! }); // When before=null, the start section is always shown regardless of showStart const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); @@ -553,7 +554,7 @@ describe("cmdThreadRead start section / before / quota", () => { const { stepHashes } = await makeSimpleThread(uwf, ["roleA", "roleB", "roleC"]); const [_hashA, hashB, hashC] = stepHashes as [CasRef, CasRef, CasRef]; const threadId = "01JTEST0000000000000008" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: hashC }); + await seedThreads(tmpDir, { [threadId]: hashC }); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, hashB, false); expect(markdown).toContain("roleA"); @@ -565,7 +566,7 @@ describe("cmdThreadRead start section / before / quota", () => { const uwf = await makeUwfStore(tmpDir); const { stepHashes } = await makeSimpleThread(uwf, ["roleA", "roleB", "roleC"]); const threadId = "01JTEST000000000000000A" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: stepHashes[stepHashes.length - 1]! }); + await seedThreads(tmpDir, { [threadId]: stepHashes[stepHashes.length - 1]! }); const markdown = await cmdThreadRead(tmpDir, threadId, 1, null, false); expect(markdown).toContain("earlier step"); @@ -575,7 +576,7 @@ describe("cmdThreadRead start section / before / quota", () => { const uwf = await makeUwfStore(tmpDir); const { stepHashes } = await makeSimpleThread(uwf, ["roleA"]); const threadId = "01JTEST000000000000000B" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: stepHashes[0]! }); + await seedThreads(tmpDir, { [threadId]: stepHashes[0]! }); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); expect(markdown).not.toContain("earlier step"); @@ -627,7 +628,7 @@ describe("cmdStepShow (process.exit tests - must be last)", () => { detail: null, agent: "uwf-test", }); - await saveThreadsIndex(tmpDir, { ["01JTEST000000000000000C" as ThreadId]: stepHash as CasRef }); + await seedThreads(tmpDir, { ["01JTEST000000000000000C" as ThreadId]: stepHash as CasRef }); await expect( cmdThreadRead( @@ -692,7 +693,7 @@ describe("cmdStepList with completed threads", () => { }); const threadId = "01JTEST0000000000000000A1" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: step3Hash }); + await seedThreads(tmpDir, { [threadId]: step3Hash }); const result = await cmdStepList(tmpDir, threadId); @@ -744,7 +745,6 @@ describe("cmdStepList with completed threads", () => { const threadId = "01JTEST0000000000000000A2" as ThreadId; // Thread is NOT in threads.yaml (simulating completed thread) - await saveThreadsIndex(tmpDir, {}); // But it IS in history.jsonl await appendThreadHistory(tmpDir, { thread: threadId, @@ -812,7 +812,7 @@ describe("cmdStepShow with completed threads", () => { }); const threadId = "01JTEST0000000000000000B1" as ThreadId; - await saveThreadsIndex(tmpDir, { [threadId]: stepHash }); + await seedThreads(tmpDir, { [threadId]: stepHash }); const result = await cmdStepShow(tmpDir, stepHash); @@ -873,7 +873,6 @@ describe("cmdStepShow with completed threads", () => { const threadId = "01JTEST0000000000000000B2" as ThreadId; // Thread is NOT in threads.yaml - await saveThreadsIndex(tmpDir, {}); // But it IS in history.jsonl await appendThreadHistory(tmpDir, { thread: threadId, @@ -937,7 +936,6 @@ describe("cmdThreadRead with completed threads", () => { const threadId = "01JTEST0000000000000000C1" as ThreadId; // Thread is NOT in threads.yaml - await saveThreadsIndex(tmpDir, {}); // But it IS in history.jsonl await appendThreadHistory(tmpDir, { thread: threadId, @@ -1001,7 +999,6 @@ describe("cmdThreadRead with completed threads", () => { }); const threadId = "01JTEST0000000000000000C2" as ThreadId; - await saveThreadsIndex(tmpDir, {}); await appendThreadHistory(tmpDir, { thread: threadId, workflow: workflowHash, diff --git a/packages/cli-workflow/src/commands/shared.ts b/packages/cli-workflow/src/commands/shared.ts index 3fa9364..f53ee6c 100644 --- a/packages/cli-workflow/src/commands/shared.ts +++ b/packages/cli-workflow/src/commands/shared.ts @@ -6,7 +6,7 @@ import type { StepNodePayload, ThreadId, } from "@united-workforce/protocol"; -import { findThreadInHistory, loadThreadsIndex, type UwfStore } from "../store.js"; +import { createUwfStore, findThreadInHistory, getThread, type UwfStore } from "../store.js"; type ChainState = { startHash: CasRef; @@ -202,10 +202,10 @@ function collectOrderedSteps( } async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise { - const index = await loadThreadsIndex(storageRoot); - const activeHead = index[threadId]?.head; - if (activeHead !== undefined) { - return activeHead; + const uwf = await createUwfStore(storageRoot); + const entry = getThread(uwf.varStore, threadId); + if (entry !== null) { + return entry.head; } const hist = await findThreadInHistory(storageRoot, threadId); if (hist !== null) { diff --git a/packages/cli-workflow/src/commands/step.ts b/packages/cli-workflow/src/commands/step.ts index 2c650bf..d1e5afa 100644 --- a/packages/cli-workflow/src/commands/step.ts +++ b/packages/cli-workflow/src/commands/step.ts @@ -9,7 +9,7 @@ import type { ThreadStepsOutput, } from "@united-workforce/protocol"; import { generateUlid } from "@united-workforce/util"; -import { createUwfStore, loadThreadsIndex, saveThreadsIndex } from "../store.js"; +import { createUwfStore, setThread } from "../store.js"; import { collectOrderedSteps, expandDeep, @@ -112,9 +112,11 @@ export async function cmdStepFork( } const newThreadId = generateUlid(Date.now()) as ThreadId; - const index = await loadThreadsIndex(storageRoot); - index[newThreadId] = { head: stepHash, suspendedRole: null, suspendMessage: null }; - await saveThreadsIndex(storageRoot, index); + setThread(uwf.varStore, newThreadId, { + head: stepHash, + suspendedRole: null, + suspendMessage: null, + }); return { thread: newThreadId, diff --git a/packages/cli-workflow/src/commands/thread.ts b/packages/cli-workflow/src/commands/thread.ts index bd4eaf9..ed7e417 100644 --- a/packages/cli-workflow/src/commands/thread.ts +++ b/packages/cli-workflow/src/commands/thread.ts @@ -1,6 +1,7 @@ import { execFileSync, spawn } from "node:child_process"; import { access, readFile } from "node:fs/promises"; import { dirname, isAbsolute, resolve as resolvePath } from "node:path"; +import type { VariableStore } from "@ocas/core"; import { validate } from "@ocas/core"; import type { AgentAlias, @@ -39,12 +40,14 @@ import { evaluate, isSuspendResult } from "../moderator/index.js"; import { appendThreadHistory, createUwfStore, + deleteThread, findThreadInHistory, + getThread, + loadAllThreads, loadThreadHistory, - loadThreadsIndex, loadWorkflowRegistry, resolveWorkflowHash, - saveThreadsIndex, + setThread, type ThreadHistoryLine, type UwfStore, } from "../store.js"; @@ -136,7 +139,7 @@ function resolveSuspendFieldsForShow( } async function ensureThreadSuspendMetadata( - storageRoot: string, + varStore: VariableStore, threadId: ThreadId, entry: ThreadIndexEntry, suspendedRole: string, @@ -146,9 +149,7 @@ async function ensureThreadSuspendMetadata( return entry; } const updated = markThreadSuspended(entry, suspendedRole, suspendMessage); - const index = await loadThreadsIndex(storageRoot); - index[threadId] = updated; - await saveThreadsIndex(storageRoot, index); + setThread(varStore, threadId, updated); return updated; } @@ -467,9 +468,7 @@ export async function cmdThreadStart( fail("stored StartNode failed schema validation"); } - const index = await loadThreadsIndex(storageRoot); - index[threadId] = createThreadIndexEntry(headHash); - await saveThreadsIndex(storageRoot, index); + setThread(uwf.varStore, threadId, createThreadIndexEntry(headHash)); plog.log( PL_THREAD_START, @@ -484,11 +483,10 @@ export async function cmdThreadShow( storageRoot: string, threadId: ThreadId, ): Promise { - const index = await loadThreadsIndex(storageRoot); - const entry = index[threadId]; - if (entry !== undefined) { + const uwf = await createUwfStore(storageRoot); + const entry = getThread(uwf.varStore, threadId); + if (entry !== null) { const activeHead = entry.head; - const uwf = await createUwfStore(storageRoot); const workflow = resolveWorkflowFromHead(uwf, activeHead); if (workflow === null) { fail(`failed to resolve workflow from head: ${activeHead}`); @@ -661,7 +659,7 @@ export async function cmdThreadList( take: number | null, ): Promise { const uwf = await createUwfStore(storageRoot); - const index = await loadThreadsIndex(storageRoot); + const index = loadAllThreads(uwf.varStore); // Collect active threads let items = await collectActiveThreads(storageRoot, uwf, index); @@ -1038,14 +1036,13 @@ function spawnAgent( } async function archiveThread( + uwf: UwfStore, storageRoot: string, threadId: ThreadId, workflow: CasRef, head: CasRef, ): Promise { - const index = await loadThreadsIndex(storageRoot); - delete index[threadId]; - await saveThreadsIndex(storageRoot, index); + deleteThread(uwf.varStore, threadId); await appendThreadHistory(storageRoot, { thread: threadId, workflow, @@ -1066,13 +1063,12 @@ export async function cmdThreadResume( fail(`thread already executing in background (PID: ${runningMarker.pid})`); } - const index = await loadThreadsIndex(storageRoot); - const entry = index[threadId]; - if (entry === undefined) { + const uwf = await createUwfStore(storageRoot); + const entry = getThread(uwf.varStore, threadId); + if (entry === null) { fail(`thread not active: ${threadId}`); } - const uwf = await createUwfStore(storageRoot); const headHash = entry.head; const chain = walkChain(uwf, headHash); const workflowHash = chain.start.workflow; @@ -1179,12 +1175,11 @@ async function resolveActiveThreadWorkflowHash( storageRoot: string, threadId: ThreadId, ): Promise { - const index = await loadThreadsIndex(storageRoot); - const entry = index[threadId]; - if (entry === undefined) { + const uwf = await createUwfStore(storageRoot); + const entry = getThread(uwf.varStore, threadId); + if (entry === null) { fail(`thread not active: ${threadId}`); } - const uwf = await createUwfStore(storageRoot); const chain = walkChain(uwf, entry.head); return chain.start.workflow; } @@ -1197,16 +1192,13 @@ async function cmdThreadStepBackground( plog: ProcessLogger, workflowHash: CasRef, ): Promise { - // Get current head to return to caller - const index = await loadThreadsIndex(storageRoot); - const entry = index[threadId]; - if (entry === undefined) { + const uwf = await createUwfStore(storageRoot); + const entry = getThread(uwf.varStore, threadId); + if (entry === null) { failStep(plog, `thread not active: ${threadId}`); } const headHash = entry.head; - const uwf = await createUwfStore(storageRoot); - // Spawn detached background process const scriptPath = process.argv[1]; if (scriptPath === undefined) { @@ -1292,7 +1284,7 @@ async function resolveModeratorStepTarget( if (isSuspendResult(nextResult.value)) { await ensureThreadSuspendMetadata( - storageRoot, + uwf.varStore, threadId, entry, nextResult.value.suspendedRole, @@ -1310,7 +1302,7 @@ async function resolveModeratorStepTarget( if (nextResult.value.role === END_ROLE) { plog.log(PL_THREAD_ARCHIVED, `thread archived head=${headHash}`, null); - await archiveThread(storageRoot, threadId, workflowHash, headHash); + await archiveThread(uwf, storageRoot, threadId, workflowHash, headHash); return { workflow: workflowHash, thread: threadId, @@ -1340,10 +1332,8 @@ async function finalizeAgentStep( uwfAfter: UwfStore, plog: ProcessLogger, ): Promise { - const freshIndex = await loadThreadsIndex(storageRoot); - const priorEntry = freshIndex[threadId] ?? createThreadIndexEntry(newHead); - freshIndex[threadId] = updateThreadHead(priorEntry, newHead); - await saveThreadsIndex(storageRoot, freshIndex); + const priorEntry = getThread(uwfAfter.varStore, threadId) ?? createThreadIndexEntry(newHead); + setThread(uwfAfter.varStore, threadId, updateThreadHead(priorEntry, newHead)); const chainAfter = walkChain(uwfAfter, newHead); const { lastRole: lastRoleAfter, lastOutput: lastOutputAfter } = resolveEvaluateArgs( @@ -1356,12 +1346,15 @@ async function finalizeAgentStep( } if (isSuspendResult(afterResult.value)) { - freshIndex[threadId] = markThreadSuspended( - freshIndex[threadId] ?? createThreadIndexEntry(newHead), - afterResult.value.suspendedRole, - afterResult.value.prompt, + setThread( + uwfAfter.varStore, + threadId, + markThreadSuspended( + getThread(uwfAfter.varStore, threadId) ?? createThreadIndexEntry(newHead), + afterResult.value.suspendedRole, + afterResult.value.prompt, + ), ); - await saveThreadsIndex(storageRoot, freshIndex); return buildStepOutputFromEvaluation( workflowHash, threadId, @@ -1375,7 +1368,7 @@ async function finalizeAgentStep( const done = afterResult.value.role === END_ROLE; if (done) { plog.log(PL_THREAD_ARCHIVED, `thread archived head=${newHead}`, null); - await archiveThread(storageRoot, threadId, workflowHash, newHead); + await archiveThread(uwfAfter, storageRoot, threadId, workflowHash, newHead); } const status: ThreadStatus = done ? "completed" : "idle"; @@ -1401,14 +1394,13 @@ async function cmdThreadStepOnce( plog: ProcessLogger, resume: ResumeStepConfig | null = null, ): Promise { - const index = await loadThreadsIndex(storageRoot); - const entry = index[threadId]; - if (entry === undefined) { + const uwf = await createUwfStore(storageRoot); + const entry = getThread(uwf.varStore, threadId); + if (entry === null) { failStep(plog, `thread not active: ${threadId}`); } const headHash = entry.head; - const uwf = await createUwfStore(storageRoot); const chain = walkChain(uwf, headHash); const workflowHash = chain.start.workflow; const workflow = loadWorkflowPayload(uwf, workflowHash); @@ -1459,10 +1451,10 @@ async function cmdThreadStepOnce( } async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise { - const index = await loadThreadsIndex(storageRoot); - const activeHead = index[threadId]?.head; - if (activeHead !== undefined) { - return activeHead; + const uwf = await createUwfStore(storageRoot); + const entry = getThread(uwf.varStore, threadId); + if (entry !== null) { + return entry.head; } const hist = await findThreadInHistory(storageRoot, threadId); if (hist !== null) { @@ -1512,9 +1504,9 @@ export type CancelOutput = { * Stop background execution of a thread (but keep thread active) */ export async function cmdThreadStop(storageRoot: string, threadId: ThreadId): Promise { - const index = await loadThreadsIndex(storageRoot); - const entry = index[threadId]; - if (entry === undefined) { + const uwf = await createUwfStore(storageRoot); + const entry = getThread(uwf.varStore, threadId); + if (entry === null) { fail(`thread not active: ${threadId}`); } @@ -1542,9 +1534,9 @@ export async function cmdThreadCancel( storageRoot: string, threadId: ThreadId, ): Promise { - const index = await loadThreadsIndex(storageRoot); - const entry = index[threadId]; - if (entry === undefined) { + const uwf = await createUwfStore(storageRoot); + const entry = getThread(uwf.varStore, threadId); + if (entry === null) { fail(`thread not active: ${threadId}`); } const head = entry.head; @@ -1560,14 +1552,12 @@ export async function cmdThreadCancel( await deleteMarker(storageRoot, threadId); } - const uwf = await createUwfStore(storageRoot); const workflow = resolveWorkflowFromHead(uwf, head); if (workflow === null) { fail(`failed to resolve workflow from head: ${head}`); } - delete index[threadId]; - await saveThreadsIndex(storageRoot, index); + deleteThread(uwf.varStore, threadId); const historyEntry: ThreadHistoryLine = { thread: threadId, diff --git a/packages/cli-workflow/src/store.ts b/packages/cli-workflow/src/store.ts index 3483692..485bdc5 100644 --- a/packages/cli-workflow/src/store.ts +++ b/packages/cli-workflow/src/store.ts @@ -1,6 +1,6 @@ import type { Dirent } from "node:fs"; import { existsSync, symlinkSync } from "node:fs"; -import { access, appendFile, mkdir, readdir, readFile, rename, writeFile } from "node:fs/promises"; +import { access, appendFile, mkdir, readdir, readFile, rename } from "node:fs/promises"; import { homedir } from "node:os"; import { join } from "node:path"; @@ -14,12 +14,8 @@ import type { ThreadListItem, ThreadsIndex, } from "@united-workforce/protocol"; -import { - createThreadIndexEntry, - parseThreadsIndex, - serializeThreadsIndex, -} from "@united-workforce/protocol"; -import { parse, stringify } from "yaml"; +import { parseThreadsIndex } from "@united-workforce/protocol"; +import { parse } from "yaml"; import { registerUwfSchemas, type UwfSchemaHashes } from "./schemas.js"; @@ -28,6 +24,9 @@ export type WorkflowRegistry = Record; /** Variable name prefix for workflow registry entries (`@uwf/registry/`). */ export const REGISTRY_VAR_PREFIX = "@uwf/registry/"; +/** Variable name prefix for active thread entries (`@uwf/thread/`). */ +export const THREAD_VAR_PREFIX = "@uwf/thread/"; + /** A workflow entry discovered from the project-local .workflows/ directory. */ export type ProjectWorkflowEntry = { /** Workflow name (from YAML `name` field, equals filename stem). */ @@ -214,6 +213,7 @@ export async function createUwfStore(storageRoot: string): Promise { const schemas = await registerUwfSchemas(store); const varStore = createVariableStore(join(casDir, "variables.db"), store); await migrateWorkflowRegistryIfNeeded(storageRoot, varStore); + await migrateThreadsIndexIfNeeded(storageRoot, varStore); return { storageRoot, store, schemas, varStore }; } @@ -294,7 +294,7 @@ export function findRegistryName(registry: WorkflowRegistry, hash: Hash): string return null; } -export async function loadThreadsIndex(storageRoot: string): Promise { +async function loadThreadsIndexFromYaml(storageRoot: string): Promise { const path = getThreadsPath(storageRoot); try { const text = await readFile(path, "utf8"); @@ -309,26 +309,79 @@ export async function loadThreadsIndex(storageRoot: string): Promise; - -function normalizeThreadsIndexInput(index: ThreadsIndexInput): ThreadsIndex { - const normalized: ThreadsIndex = {}; - for (const [threadId, value] of Object.entries(index)) { - normalized[threadId as ThreadId] = - typeof value === "string" ? createThreadIndexEntry(value as CasRef) : value; - } - return normalized; -} - -export async function saveThreadsIndex( +/** One-time migration: `~/.uwf/threads.yaml` → `@uwf/thread/*` variables. */ +export async function migrateThreadsIndexIfNeeded( storageRoot: string, - index: ThreadsIndexInput, + varStore: VariableStore, ): Promise { const path = getThreadsPath(storageRoot); - await mkdir(storageRoot, { recursive: true }); - const text = stringify(serializeThreadsIndex(normalizeThreadsIndexInput(index)), { indent: 2 }); - await writeFile(path, text, "utf8"); + if (!existsSync(path)) { + return; + } + + const index = await loadThreadsIndexFromYaml(storageRoot); + for (const [threadId, entry] of Object.entries(index)) { + setThread(varStore, threadId as ThreadId, entry); + } + + await rename(path, `${path}.migrated`); +} + +function threadVarName(threadId: ThreadId): string { + return `${THREAD_VAR_PREFIX}${threadId}`; +} + +function entryFromVariable(v: { value: string; tags: Record }): ThreadIndexEntry { + return { + head: v.value as CasRef, + suspendedRole: v.tags.suspendedRole ?? null, + suspendMessage: v.tags.suspendMessage ?? null, + }; +} + +/** Load all active threads (equivalent to legacy `loadThreadsIndex`). */ +export function loadAllThreads(varStore: VariableStore): ThreadsIndex { + const vars = varStore.list({ namePrefix: THREAD_VAR_PREFIX }); + const index: ThreadsIndex = {}; + for (const v of vars) { + const threadId = v.name.slice(THREAD_VAR_PREFIX.length) as ThreadId; + index[threadId] = entryFromVariable(v); + } + return index; +} + +/** Get a single active thread entry, or null if not found. */ +export function getThread(varStore: VariableStore, threadId: ThreadId): ThreadIndexEntry | null { + const vars = varStore.list({ exactName: threadVarName(threadId) }); + const v = vars[0]; + if (v === undefined) { + return null; + } + return entryFromVariable(v); +} + +/** Set or update a single active thread entry. */ +export function setThread( + varStore: VariableStore, + threadId: ThreadId, + entry: ThreadIndexEntry, +): void { + const name = threadVarName(threadId); + // Head CAS nodes may use different schemas (StartNode vs StepNode) — clear all variants first. + varStore.remove(name); + const tags: Record = {}; + if (entry.suspendedRole !== null) { + tags.suspendedRole = entry.suspendedRole; + } + if (entry.suspendMessage !== null) { + tags.suspendMessage = entry.suspendMessage; + } + varStore.set(name, entry.head, { tags }); +} + +/** Remove an active thread entry (on complete/cancel). */ +export function deleteThread(varStore: VariableStore, threadId: ThreadId): void { + varStore.remove(threadVarName(threadId)); } export async function loadThreadHistory(storageRoot: string): Promise { diff --git a/packages/workflow-util-agent/src/context.ts b/packages/workflow-util-agent/src/context.ts index c021cc9..9717318 100644 --- a/packages/workflow-util-agent/src/context.ts +++ b/packages/workflow-util-agent/src/context.ts @@ -7,7 +7,7 @@ import type { ThreadId, } from "@united-workforce/protocol"; import type { AgentStore } from "./storage.js"; -import { createAgentStore, loadThreadsIndex, resolveStorageRoot } from "./storage.js"; +import { createAgentStore, getActiveThreadEntry, resolveStorageRoot } from "./storage.js"; import type { AgentContext } from "./types.js"; type ChainState = { @@ -162,13 +162,12 @@ export async function buildContext( const agentStore = await createAgentStore(storageRoot); const { store, schemas } = agentStore; - const index = await loadThreadsIndex(storageRoot); - const headHash = index[threadId]?.head; - if (headHash === undefined) { - fail(`thread not found in threads.yaml: ${threadId}`); + const entry = await getActiveThreadEntry(storageRoot, threadId); + if (entry === null) { + fail(`thread not found in active thread index: ${threadId}`); } - const chain = walkChain(store, schemas, headHash); + const chain = walkChain(store, schemas, entry.head); const workflow = await loadWorkflow(store, schemas, chain.start.workflow); const roleDef = workflow.roles[role]; if (roleDef === undefined) { @@ -211,13 +210,12 @@ export async function buildContextWithMeta( const agentStore = await createAgentStore(storageRoot); const { store, schemas } = agentStore; - const index = await loadThreadsIndex(storageRoot); - const headHash = index[threadId]?.head; - if (headHash === undefined) { - fail(`thread not found in threads.yaml: ${threadId}`); + const entry = await getActiveThreadEntry(storageRoot, threadId); + if (entry === null) { + fail(`thread not found in active thread index: ${threadId}`); } - const chain = walkChain(store, schemas, headHash); + const chain = walkChain(store, schemas, entry.head); const workflow = await loadWorkflow(store, schemas, chain.start.workflow); const roleDef = workflow.roles[role]; if (roleDef === undefined) { @@ -237,6 +235,6 @@ export async function buildContextWithMeta( outputFormatInstruction: "", edgePrompt, isFirstVisit, - meta: { storageRoot, store, schemas, headHash, chain }, + meta: { storageRoot, store, schemas, headHash: entry.head, chain }, }; } diff --git a/packages/workflow-util-agent/src/storage.ts b/packages/workflow-util-agent/src/storage.ts index 386562b..6dbb214 100644 --- a/packages/workflow-util-agent/src/storage.ts +++ b/packages/workflow-util-agent/src/storage.ts @@ -2,21 +2,22 @@ import { readFile } from "node:fs/promises"; import { homedir } from "node:os"; import { join } from "node:path"; -import type { Store } from "@ocas/core"; +import { createVariableStore, type Store } from "@ocas/core"; import { createFsStore } from "@ocas/fs"; import type { AgentAlias, AgentConfig, + CasRef, ModelAlias, ModelConfig, ProviderAlias, ProviderConfig, Scenario, - ThreadsIndex, + ThreadId, + ThreadIndexEntry, WorkflowConfig, WorkflowName, } from "@united-workforce/protocol"; -import { parseThreadsIndex } from "@united-workforce/protocol"; import { parse } from "yaml"; import { registerAgentSchemas } from "./schemas.js"; @@ -58,8 +59,46 @@ export function getEnvPath(storageRoot: string): string { return join(storageRoot, ".env"); } -export function getThreadsPath(storageRoot: string): string { - return join(storageRoot, "threads.yaml"); +const THREAD_VAR_PREFIX = "@uwf/thread/"; + +/** + * Global CAS directory (same as uwf CLI). + * Priority: `OCAS_DIR` → `UNCAGED_CAS_DIR` (legacy) → default ~/.ocas + */ +export function getGlobalCasDir(): string { + const primary = process.env.OCAS_DIR; + if (primary !== undefined && primary !== "") { + return primary; + } + const legacy = process.env.UNCAGED_CAS_DIR; + if (legacy !== undefined && legacy !== "") { + return legacy; + } + return join(homedir(), ".ocas"); +} + +function threadVarName(threadId: ThreadId): string { + return `${THREAD_VAR_PREFIX}${threadId}`; +} + +/** Read active thread head + suspend metadata from ocas variable store. */ +export async function getActiveThreadEntry( + _storageRoot: string, + threadId: ThreadId, +): Promise { + const casDir = getGlobalCasDir(); + const store = createFsStore(casDir); + const varStore = createVariableStore(join(casDir, "variables.db"), store); + const vars = varStore.list({ exactName: threadVarName(threadId) }); + const v = vars[0]; + if (v === undefined) { + return null; + } + return { + head: v.value as CasRef, + suspendedRole: v.tags.suspendedRole ?? null, + suspendMessage: v.tags.suspendMessage ?? null, + }; } export type AgentStore = { @@ -205,18 +244,3 @@ export async function loadWorkflowConfig(storageRoot: string): Promise { - const path = getThreadsPath(storageRoot); - try { - const text = await readFile(path, "utf8"); - const raw = parse(text) as unknown; - return parseThreadsIndex(raw); - } catch (e) { - const err = e as NodeJS.ErrnoException; - if (err.code === "ENOENT") { - return {}; - } - throw e; - } -} -- 2.43.0