diff --git a/packages/cli-workflow/src/__tests__/current-role.test.ts b/packages/cli-workflow/src/__tests__/current-role.test.ts index 2b75083..3f9efd6 100644 --- a/packages/cli-workflow/src/__tests__/current-role.test.ts +++ b/packages/cli-workflow/src/__tests__/current-role.test.ts @@ -7,7 +7,7 @@ import type { CasRef, ThreadId } from "@united-workforce/protocol"; import { createMarker, deleteMarker } from "../background/index.js"; import { cmdThreadList, cmdThreadShow, cmdThreadStart } from "../commands/thread.js"; import { - appendThreadHistory, + addHistoryEntry, createUwfStore, deleteThread, loadAllThreads, @@ -288,7 +288,7 @@ describe("currentRole field", () => { const uwfForIndex = await createUwfStore(storageRoot); const head = loadAllThreads(uwfForIndex.varStore)[tid]!.head; deleteThread(uwfForIndex.varStore, tid); - await appendThreadHistory(storageRoot, { + addHistoryEntry(uwfForIndex.varStore, { thread: tid, workflow, head, @@ -316,7 +316,7 @@ describe("currentRole field", () => { const uwfForIndex = await createUwfStore(storageRoot); const head = loadAllThreads(uwfForIndex.varStore)[tid]!.head; deleteThread(uwfForIndex.varStore, tid); - await appendThreadHistory(storageRoot, { + addHistoryEntry(uwfForIndex.varStore, { thread: tid, workflow, head, @@ -377,7 +377,7 @@ describe("currentRole field", () => { const uwfForIndex = await createUwfStore(storageRoot); const compHead = loadAllThreads(uwfForIndex.varStore)[compId]!.head; deleteThread(uwfForIndex.varStore, compId); - await appendThreadHistory(storageRoot, { + addHistoryEntry(uwfForIndex.varStore, { thread: compId, workflow: comp.workflow, head: compHead, diff --git a/packages/cli-workflow/src/__tests__/resolve-head-hash.test.ts b/packages/cli-workflow/src/__tests__/resolve-head-hash.test.ts index 81784a6..39dc1d0 100644 --- a/packages/cli-workflow/src/__tests__/resolve-head-hash.test.ts +++ b/packages/cli-workflow/src/__tests__/resolve-head-hash.test.ts @@ -1,15 +1,18 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test"; -import { mkdtemp, rm } from "node:fs/promises"; +import { mkdir, mkdtemp, rm } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; import { type CasRef, createThreadIndexEntry, type ThreadId } from "@united-workforce/protocol"; import { resolveHeadHash } from "../commands/shared.js"; -import { appendThreadHistory } from "../store.js"; +import { addHistoryEntry, createUwfStore, setThread } from "../store.js"; let tmpDir: string; beforeEach(async () => { tmpDir = await mkdtemp(join(tmpdir(), "cli-uwf-resolve-head-")); + const casDir = join(tmpDir, "cas"); + await mkdir(casDir, { recursive: true }); + process.env.UNCAGED_CAS_DIR = casDir; }); afterEach(async () => { @@ -19,7 +22,6 @@ afterEach(async () => { describe("resolveHeadHash", () => { test("returns head hash from variable store for active thread", async () => { const threadId = "01JTEST0000000000000000001" as ThreadId; - const { createUwfStore, setThread } = await import("../store.js"); const uwf = await createUwfStore(tmpDir); const headHash = (await uwf.store.put(uwf.schemas.text, "active")) as CasRef; setThread(uwf.varStore, threadId, createThreadIndexEntry(headHash as CasRef)); @@ -29,13 +31,13 @@ describe("resolveHeadHash", () => { expect(result).toBe(headHash); }); - test("falls back to history.jsonl when thread not in threads.yaml", async () => { + test("falls back to history variable when thread not in active index", async () => { const threadId = "01JTEST0000000000000000002" as ThreadId; - const headHash = "completed_hash_456" as CasRef; const workflowHash = "workflow_hash_789" as CasRef; - // No entry in threads.yaml, only in history.jsonl - await appendThreadHistory(tmpDir, { + const uwf = await createUwfStore(tmpDir); + const headHash = (await uwf.store.put(uwf.schemas.text, "completed-head")) as CasRef; + addHistoryEntry(uwf.varStore, { thread: threadId, workflow: workflowHash, head: headHash, @@ -54,15 +56,13 @@ describe("resolveHeadHash", () => { test("prioritizes active thread over history when thread exists in both", async () => { const threadId = "01JTEST0000000000000000004" as ThreadId; - const historicalHash = "historical_hash_v1" as CasRef; const workflowHash = "workflow_hash_xyz" as CasRef; - const { createUwfStore, setThread } = await import("../store.js"); - const { createThreadIndexEntry } = await import("@united-workforce/protocol"); const uwf = await createUwfStore(tmpDir); const activeHead = (await uwf.store.put(uwf.schemas.text, "active-v2")) as CasRef; + const historicalHash = (await uwf.store.put(uwf.schemas.text, "historical-v1")) as CasRef; setThread(uwf.varStore, threadId, createThreadIndexEntry(activeHead)); - await appendThreadHistory(tmpDir, { + addHistoryEntry(uwf.varStore, { thread: threadId, workflow: workflowHash, head: historicalHash, @@ -80,25 +80,26 @@ describe("resolveHeadHash", () => { const threadId1 = "01JTEST0000000000000000005" as ThreadId; const threadId2 = "01JTEST0000000000000000006" as ThreadId; const threadId3 = "01JTEST0000000000000000007" as ThreadId; - const hash1 = "hash_thread1" as CasRef; - const hash2 = "hash_thread2" as CasRef; - const hash3 = "hash_thread3" as CasRef; const workflowHash = "workflow_hash_abc" as CasRef; - await appendThreadHistory(tmpDir, { + const uwf = await createUwfStore(tmpDir); + const hash1 = (await uwf.store.put(uwf.schemas.text, "hash-thread1")) as CasRef; + const hash2 = (await uwf.store.put(uwf.schemas.text, "hash-thread2")) as CasRef; + const hash3 = (await uwf.store.put(uwf.schemas.text, "hash-thread3")) as CasRef; + addHistoryEntry(uwf.varStore, { thread: threadId1, workflow: workflowHash, head: hash1, completedAt: Date.now() - 2000, reason: null, }); - await appendThreadHistory(tmpDir, { + addHistoryEntry(uwf.varStore, { thread: threadId2, workflow: workflowHash, head: hash2, completedAt: Date.now() - 1000, reason: null, }); - await appendThreadHistory(tmpDir, { + addHistoryEntry(uwf.varStore, { thread: threadId3, workflow: workflowHash, head: hash3, diff --git a/packages/cli-workflow/src/__tests__/store-global-cas.test.ts b/packages/cli-workflow/src/__tests__/store-global-cas.test.ts index 5a6e3ca..d65de14 100644 --- a/packages/cli-workflow/src/__tests__/store-global-cas.test.ts +++ b/packages/cli-workflow/src/__tests__/store-global-cas.test.ts @@ -238,35 +238,78 @@ describe("Global CAS directory", () => { await expect(readFile(threadsPath, "utf8")).rejects.toThrow(); }); - test("history remains in storageRoot", async () => { + test("history is stored in global CAS variable store", async () => { const globalCasDir = join(tmpDir, "global-cas"); process.env.UNCAGED_CAS_DIR = globalCasDir; const storageRoot = join(tmpDir, "storage"); await mkdir(storageRoot, { recursive: true }); - await createUwfStore(storageRoot); - - // Write history - const { appendThreadHistory } = await import("../store.js"); - await appendThreadHistory(storageRoot, { - thread: "thread-123" as any, + const uwf = await createUwfStore(storageRoot); + const threadId = "thread-123" as ThreadId; + const headHash = await uwf.store.put(uwf.schemas.text, "history-head"); + const { addHistoryEntry, findHistoryEntry } = await import("../store.js"); + addHistoryEntry(uwf.varStore, { + thread: threadId, workflow: "workflow-456", - head: "hash-789", + head: headHash, completedAt: Date.now(), reason: "completed", }); - // Verify history.jsonl is in storageRoot, not global CAS - const { readFile } = await import("node:fs/promises"); - const historyPath = join(storageRoot, "history.jsonl"); - const content = await readFile(historyPath, "utf8"); - expect(content).toContain("thread-123"); - expect(content).toContain("workflow-456"); + const entry = findHistoryEntry(uwf.varStore, threadId); + expect(entry?.thread).toBe(threadId); + expect(entry?.workflow).toBe("workflow-456"); + expect(entry?.head).toBe(headHash); - // Verify history.jsonl is NOT in global CAS directory - const globalHistoryPath = join(globalCasDir, "history.jsonl"); - await expect(readFile(globalHistoryPath, "utf8")).rejects.toThrow(); + const { access } = await import("node:fs/promises"); + await access(join(globalCasDir, "variables.db")); + + const historyPath = join(storageRoot, "history.jsonl"); + await expect(access(historyPath)).rejects.toThrow(); + }); + + test("migrates history.jsonl to variable store and renames file", async () => { + const globalCasDir = join(tmpDir, "global-cas-history"); + process.env.UNCAGED_CAS_DIR = globalCasDir; + + const storageRoot = join(tmpDir, "storage-history-migrate"); + await mkdir(storageRoot, { recursive: true }); + + const threadId = "01JTEST0000000000000000CD" as ThreadId; + const uwfSeed = await createUwfStore(storageRoot); + const workflowHash = await uwfSeed.store.put(uwfSeed.schemas.text, "migrated-workflow"); + const headHash = await uwfSeed.store.put(uwfSeed.schemas.text, "migrated-head"); + const completedAt = 1780410000000; + const { writeFile, access, readFile } = await import("node:fs/promises"); + const historyPath = join(storageRoot, "history.jsonl"); + await writeFile( + historyPath, + `${JSON.stringify({ + thread: threadId, + workflow: workflowHash, + head: headHash, + completedAt, + reason: "cancelled", + })}\n`, + "utf8", + ); + + const uwf = await createUwfStore(storageRoot); + const { findHistoryEntry } = await import("../store.js"); + const entry = findHistoryEntry(uwf.varStore, threadId); + expect(entry).toEqual({ + thread: threadId, + workflow: workflowHash, + head: headHash, + completedAt, + reason: "cancelled", + }); + + await expect(access(historyPath)).rejects.toThrow(); + const migratedContent = await readFile(`${historyPath}.migrated`, "utf8"); + expect(migratedContent).toContain(threadId); + expect(migratedContent).toContain(workflowHash); }); test("CAS nodes are stored in global directory", async () => { diff --git a/packages/cli-workflow/src/__tests__/thread-cancel-status.test.ts b/packages/cli-workflow/src/__tests__/thread-cancel-status.test.ts index 61e53f6..afe4ba4 100644 --- a/packages/cli-workflow/src/__tests__/thread-cancel-status.test.ts +++ b/packages/cli-workflow/src/__tests__/thread-cancel-status.test.ts @@ -1,24 +1,40 @@ import { describe, expect, test } from "bun:test"; -import { mkdtemp } from "node:fs/promises"; +import { mkdir, mkdtemp } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; import type { CasRef, ThreadId } from "@united-workforce/protocol"; -import { appendThreadHistory, loadThreadHistory } from "../store.js"; +import { addHistoryEntry, createUwfStore, loadAllHistory } from "../store.js"; + +async function makeUwfStore(storageRoot: string) { + const casDir = join(storageRoot, "cas"); + await mkdir(casDir, { recursive: true }); + process.env.UNCAGED_CAS_DIR = casDir; + return createUwfStore(storageRoot); +} + +async function seedHistoryHead( + uwf: Awaited>, + label: string, +): Promise { + return (await uwf.store.put(uwf.schemas.text, label)) as CasRef; +} describe("thread cancel status", () => { test("cancelled history entry has reason 'cancelled'", async () => { const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-")); const threadId = "01JTEST000000000000CANCEL1" as ThreadId; + const uwf = await makeUwfStore(tmpDir); + const head = await seedHistoryHead(uwf, "cancelled-head"); - await appendThreadHistory(tmpDir, { + addHistoryEntry(uwf.varStore, { thread: threadId, workflow: "test-workflow", - head: "test-head-hash" as CasRef, + head, completedAt: Date.now(), reason: "cancelled", }); - const history = await loadThreadHistory(tmpDir); + const history = loadAllHistory(uwf.varStore); expect(history).toHaveLength(1); expect(history[0]?.reason).toBe("cancelled"); }); @@ -26,60 +42,66 @@ describe("thread cancel status", () => { test("completed history entry has reason 'completed'", async () => { const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-")); const threadId = "01JTEST000000000000CANCEL2" as ThreadId; + const uwf = await makeUwfStore(tmpDir); + const head = await seedHistoryHead(uwf, "completed-head"); - await appendThreadHistory(tmpDir, { + addHistoryEntry(uwf.varStore, { thread: threadId, workflow: "test-workflow", - head: "test-head-hash" as CasRef, + head, completedAt: Date.now(), reason: "completed", }); - const history = await loadThreadHistory(tmpDir); + const history = loadAllHistory(uwf.varStore); expect(history).toHaveLength(1); expect(history[0]?.reason).toBe("completed"); }); - test("legacy history entry without reason parses as null", async () => { + test("history entry with null reason is stored as completed", async () => { const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-")); const threadId = "01JTEST000000000000CANCEL3" as ThreadId; + const uwf = await makeUwfStore(tmpDir); + const head = await seedHistoryHead(uwf, "legacy-head"); - // Simulate legacy entry without reason field - await appendThreadHistory(tmpDir, { + addHistoryEntry(uwf.varStore, { thread: threadId, workflow: "test-workflow", - head: "test-head-hash" as CasRef, + head, completedAt: Date.now(), reason: null, }); - const history = await loadThreadHistory(tmpDir); + const history = loadAllHistory(uwf.varStore); expect(history).toHaveLength(1); - expect(history[0]?.reason).toBeNull(); + expect(history[0]?.reason).toBe("completed"); }); test("mixed completed and cancelled entries preserve distinct reasons", async () => { const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-")); + const uwf = await makeUwfStore(tmpDir); + const head1 = await seedHistoryHead(uwf, "head1"); + const head2 = await seedHistoryHead(uwf, "head2"); - await appendThreadHistory(tmpDir, { + addHistoryEntry(uwf.varStore, { thread: "01JTEST000000000000CANCEL4" as ThreadId, workflow: "test-workflow", - head: "head1" as CasRef, + head: head1, completedAt: Date.now(), reason: "completed", }); - await appendThreadHistory(tmpDir, { + addHistoryEntry(uwf.varStore, { thread: "01JTEST000000000000CANCEL5" as ThreadId, workflow: "test-workflow", - head: "head2" as CasRef, + head: head2, completedAt: Date.now(), reason: "cancelled", }); - const history = await loadThreadHistory(tmpDir); + const history = loadAllHistory(uwf.varStore); expect(history).toHaveLength(2); - expect(history[0]?.reason).toBe("completed"); - expect(history[1]?.reason).toBe("cancelled"); + const reasons = history.map((entry) => entry.reason).sort(); + expect(reasons).toEqual(["cancelled", "completed"]); }); }); diff --git a/packages/cli-workflow/src/__tests__/thread-list-filters.test.ts b/packages/cli-workflow/src/__tests__/thread-list-filters.test.ts index 0d2ae44..ffe583d 100644 --- a/packages/cli-workflow/src/__tests__/thread-list-filters.test.ts +++ b/packages/cli-workflow/src/__tests__/thread-list-filters.test.ts @@ -10,7 +10,7 @@ import { cmdThreadList } from "../commands/thread.js"; import { parseTimeInput } from "../commands/thread-time-parser.js"; import type { UwfStore } from "../store.js"; import { - appendThreadHistory, + addHistoryEntry, createUwfStore, deleteThread, loadAllThreads, @@ -78,7 +78,7 @@ async function completeThread( ) { const uwfIdx = await createUwfStore(storageRoot); deleteThread(uwfIdx.varStore, threadId); - await appendThreadHistory(storageRoot, { + addHistoryEntry(uwfIdx.varStore, { thread: threadId, workflow: workflowHash, head: headHash, diff --git a/packages/cli-workflow/src/__tests__/thread-show-status.test.ts b/packages/cli-workflow/src/__tests__/thread-show-status.test.ts index e47cabb..50f8447 100644 --- a/packages/cli-workflow/src/__tests__/thread-show-status.test.ts +++ b/packages/cli-workflow/src/__tests__/thread-show-status.test.ts @@ -7,7 +7,7 @@ import type { CasRef, ThreadId } from "@united-workforce/protocol"; import { createMarker, deleteMarker } from "../background/index.js"; import { cmdThreadShow, cmdThreadStart } from "../commands/thread.js"; import { - appendThreadHistory, + addHistoryEntry, createUwfStore, deleteThread, loadAllThreads, @@ -210,7 +210,7 @@ describe("thread show status field", () => { deleteThread(uwfForIndex.varStore, threadId); - await appendThreadHistory(storageRoot, { + addHistoryEntry(uwfForIndex.varStore, { thread: threadId, workflow, head, @@ -247,7 +247,7 @@ describe("thread show status field", () => { deleteThread(uwfForIndex.varStore, threadId); - await appendThreadHistory(storageRoot, { + addHistoryEntry(uwfForIndex.varStore, { thread: threadId, workflow, head, @@ -284,7 +284,7 @@ describe("thread show status field", () => { deleteThread(uwfForIndex.varStore, threadId); - await appendThreadHistory(storageRoot, { + addHistoryEntry(uwfForIndex.varStore, { thread: threadId, workflow, head, diff --git a/packages/cli-workflow/src/__tests__/thread.test.ts b/packages/cli-workflow/src/__tests__/thread.test.ts index 0c598ba..1eefdaa 100644 --- a/packages/cli-workflow/src/__tests__/thread.test.ts +++ b/packages/cli-workflow/src/__tests__/thread.test.ts @@ -12,7 +12,7 @@ import { THREAD_READ_DEFAULT_QUOTA, } from "../commands/thread.js"; import type { UwfStore } from "../store.js"; -import { appendThreadHistory, createUwfStore } from "../store.js"; +import { addHistoryEntry, createUwfStore } from "../store.js"; import { seedThreads } from "./thread-test-helpers.js"; // ── schemas used in tests ──────────────────────────────────────────────────── @@ -744,9 +744,9 @@ describe("cmdStepList with completed threads", () => { }); const threadId = "01JTEST0000000000000000A2" as ThreadId; - // Thread is NOT in threads.yaml (simulating completed thread) - // But it IS in history.jsonl - await appendThreadHistory(tmpDir, { + // Thread is NOT in active index (simulating completed thread) + // But it IS in history variable store + addHistoryEntry(uwf.varStore, { thread: threadId, workflow: workflowHash, head: step2Hash, @@ -872,9 +872,9 @@ describe("cmdStepShow with completed threads", () => { }); const threadId = "01JTEST0000000000000000B2" as ThreadId; - // Thread is NOT in threads.yaml - // But it IS in history.jsonl - await appendThreadHistory(tmpDir, { + // Thread is NOT in active index + // But it IS in history variable store + addHistoryEntry(uwf.varStore, { thread: threadId, workflow: workflowHash, head: stepHash, @@ -935,9 +935,9 @@ describe("cmdThreadRead with completed threads", () => { }); const threadId = "01JTEST0000000000000000C1" as ThreadId; - // Thread is NOT in threads.yaml - // But it IS in history.jsonl - await appendThreadHistory(tmpDir, { + // Thread is NOT in active index + // But it IS in history variable store + addHistoryEntry(uwf.varStore, { thread: threadId, workflow: workflowHash, head: stepHash, @@ -999,7 +999,7 @@ describe("cmdThreadRead with completed threads", () => { }); const threadId = "01JTEST0000000000000000C2" as ThreadId; - await appendThreadHistory(tmpDir, { + addHistoryEntry(uwf.varStore, { thread: threadId, workflow: workflowHash, head: step3Hash, diff --git a/packages/cli-workflow/src/commands/shared.ts b/packages/cli-workflow/src/commands/shared.ts index f53ee6c..5da7dca 100644 --- a/packages/cli-workflow/src/commands/shared.ts +++ b/packages/cli-workflow/src/commands/shared.ts @@ -6,7 +6,7 @@ import type { StepNodePayload, ThreadId, } from "@united-workforce/protocol"; -import { createUwfStore, findThreadInHistory, getThread, type UwfStore } from "../store.js"; +import { createUwfStore, findHistoryEntry, getThread, type UwfStore } from "../store.js"; type ChainState = { startHash: CasRef; @@ -207,7 +207,7 @@ async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise if (entry !== null) { return entry.head; } - const hist = await findThreadInHistory(storageRoot, threadId); + const hist = findHistoryEntry(uwf.varStore, threadId); if (hist !== null) { return hist.head; } diff --git a/packages/cli-workflow/src/commands/thread.ts b/packages/cli-workflow/src/commands/thread.ts index ed7e417..21f4ca2 100644 --- a/packages/cli-workflow/src/commands/thread.ts +++ b/packages/cli-workflow/src/commands/thread.ts @@ -38,13 +38,13 @@ import { createMarker, deleteMarker, isThreadRunning } from "../background/index import { createIncludeTag } from "../include.js"; import { evaluate, isSuspendResult } from "../moderator/index.js"; import { - appendThreadHistory, + addHistoryEntry, createUwfStore, deleteThread, - findThreadInHistory, + findHistoryEntry, getThread, + loadAllHistory, loadAllThreads, - loadThreadHistory, loadWorkflowRegistry, resolveWorkflowHash, setThread, @@ -521,7 +521,7 @@ export async function cmdThreadShow( }; } - const hist = await findThreadInHistory(storageRoot, threadId); + const hist = findHistoryEntry(uwf.varStore, threadId); if (hist !== null) { const status: ThreadStatus = hist.reason === "cancelled" ? "cancelled" : "completed"; @@ -593,12 +593,12 @@ async function collectActiveThreads( return items; } -async function collectCompletedThreads( - storageRoot: string, +function collectCompletedThreads( + varStore: VariableStore, activeIds: Set, -): Promise { +): ThreadListItemWithStatus[] { const items: ThreadListItemWithStatus[] = []; - const history = await loadThreadHistory(storageRoot); + const history = loadAllHistory(varStore); const seen = new Set(); // Deduplication (issue #470) for (const entry of history) { if (!activeIds.has(entry.thread) && !seen.has(entry.thread)) { @@ -671,7 +671,7 @@ export async function cmdThreadList( statusFilter.includes("cancelled"); if (includeCompleted) { const activeIds = new Set(items.map((i) => i.thread)); - const completedItems = await collectCompletedThreads(storageRoot, activeIds); + const completedItems = collectCompletedThreads(uwf.varStore, activeIds); items = items.concat(completedItems); } @@ -1035,15 +1035,9 @@ function spawnAgent( return obj as unknown as AdapterOutput; } -async function archiveThread( - uwf: UwfStore, - storageRoot: string, - threadId: ThreadId, - workflow: CasRef, - head: CasRef, -): Promise { +function archiveThread(uwf: UwfStore, threadId: ThreadId, workflow: CasRef, head: CasRef): void { deleteThread(uwf.varStore, threadId); - await appendThreadHistory(storageRoot, { + addHistoryEntry(uwf.varStore, { thread: threadId, workflow, head, @@ -1302,7 +1296,7 @@ async function resolveModeratorStepTarget( if (nextResult.value.role === END_ROLE) { plog.log(PL_THREAD_ARCHIVED, `thread archived head=${headHash}`, null); - await archiveThread(uwf, storageRoot, threadId, workflowHash, headHash); + archiveThread(uwf, threadId, workflowHash, headHash); return { workflow: workflowHash, thread: threadId, @@ -1368,7 +1362,7 @@ async function finalizeAgentStep( const done = afterResult.value.role === END_ROLE; if (done) { plog.log(PL_THREAD_ARCHIVED, `thread archived head=${newHead}`, null); - await archiveThread(uwfAfter, storageRoot, threadId, workflowHash, newHead); + archiveThread(uwfAfter, threadId, workflowHash, newHead); } const status: ThreadStatus = done ? "completed" : "idle"; @@ -1456,7 +1450,7 @@ async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise if (entry !== null) { return entry.head; } - const hist = await findThreadInHistory(storageRoot, threadId); + const hist = findHistoryEntry(uwf.varStore, threadId); if (hist !== null) { return hist.head; } @@ -1566,7 +1560,7 @@ export async function cmdThreadCancel( completedAt: Date.now(), reason: "cancelled", }; - await appendThreadHistory(storageRoot, historyEntry); + addHistoryEntry(uwf.varStore, historyEntry); return { thread: threadId, cancelled: true }; } diff --git a/packages/cli-workflow/src/store.ts b/packages/cli-workflow/src/store.ts index 485bdc5..d19daf4 100644 --- a/packages/cli-workflow/src/store.ts +++ b/packages/cli-workflow/src/store.ts @@ -1,6 +1,6 @@ import type { Dirent } from "node:fs"; import { existsSync, symlinkSync } from "node:fs"; -import { access, appendFile, mkdir, readdir, readFile, rename } from "node:fs/promises"; +import { access, mkdir, readdir, readFile, rename } from "node:fs/promises"; import { homedir } from "node:os"; import { join } from "node:path"; @@ -27,6 +27,9 @@ export const REGISTRY_VAR_PREFIX = "@uwf/registry/"; /** Variable name prefix for active thread entries (`@uwf/thread/`). */ export const THREAD_VAR_PREFIX = "@uwf/thread/"; +/** Variable name prefix for completed/cancelled thread history (`@uwf/history/`). */ +export const HISTORY_VAR_PREFIX = "@uwf/history/"; + /** A workflow entry discovered from the project-local .workflows/ directory. */ export type ProjectWorkflowEntry = { /** Workflow name (from YAML `name` field, equals filename stem). */ @@ -190,10 +193,6 @@ export function getThreadsPath(storageRoot: string): string { return join(storageRoot, "threads.yaml"); } -export function getHistoryPath(storageRoot: string): string { - return join(storageRoot, "history.jsonl"); -} - export type ThreadHistoryLine = ThreadListItem & { completedAt: number; reason: "completed" | "cancelled" | null; @@ -214,6 +213,7 @@ export async function createUwfStore(storageRoot: string): Promise { const varStore = createVariableStore(join(casDir, "variables.db"), store); await migrateWorkflowRegistryIfNeeded(storageRoot, varStore); await migrateThreadsIndexIfNeeded(storageRoot, varStore); + await migrateHistoryIfNeeded(storageRoot, varStore); return { storageRoot, store, schemas, varStore }; } @@ -384,77 +384,100 @@ export function deleteThread(varStore: VariableStore, threadId: ThreadId): void varStore.remove(threadVarName(threadId)); } -export async function loadThreadHistory(storageRoot: string): Promise { - const path = getHistoryPath(storageRoot); +function parseHistoryJsonlLine(trimmed: string): ThreadHistoryLine | null { + let raw: unknown; try { - const text = await readFile(path, "utf8"); - const lines: ThreadHistoryLine[] = []; - for (const line of text.split("\n")) { - const trimmed = line.trim(); - if (trimmed === "") { - continue; - } - let raw: unknown; - try { - raw = JSON.parse(trimmed) as unknown; - } catch { - continue; - } - if (raw === null || typeof raw !== "object" || Array.isArray(raw)) { - continue; - } - const rec = raw as Record; - const thread = rec.thread; - const workflow = rec.workflow; - const head = rec.head; - const completedAt = rec.completedAt; - if ( - typeof thread === "string" && - typeof workflow === "string" && - typeof head === "string" && - typeof completedAt === "number" - ) { - const reason = rec.reason; - const parsedReason = reason === "completed" || reason === "cancelled" ? reason : null; - lines.push({ - thread: thread as ThreadId, - workflow, - head, - completedAt, - reason: parsedReason, - }); - } - } - return lines; - } catch (e) { - const err = e as NodeJS.ErrnoException; - if (err.code === "ENOENT") { - return []; - } - throw e; + raw = JSON.parse(trimmed) as unknown; + } catch { + return null; } -} - -export async function findThreadInHistory( - storageRoot: string, - threadId: ThreadId, -): Promise { - const history = await loadThreadHistory(storageRoot); - for (let i = history.length - 1; i >= 0; i--) { - const entry = history[i]; - if (entry !== undefined && entry.thread === threadId) { - return entry; - } + if (raw === null || typeof raw !== "object" || Array.isArray(raw)) { + return null; + } + const rec = raw as Record; + const thread = rec.thread; + const workflow = rec.workflow; + const head = rec.head; + const completedAt = rec.completedAt; + if ( + typeof thread === "string" && + typeof workflow === "string" && + typeof head === "string" && + typeof completedAt === "number" + ) { + const reason = rec.reason; + const parsedReason = reason === "completed" || reason === "cancelled" ? reason : null; + return { + thread: thread as ThreadId, + workflow, + head, + completedAt, + reason: parsedReason, + }; } return null; } -export async function appendThreadHistory( +/** One-time migration: `~/.uwf/history.jsonl` → `@uwf/history/*` variables. */ +export async function migrateHistoryIfNeeded( storageRoot: string, - entry: ThreadHistoryLine, + varStore: VariableStore, ): Promise { - const path = getHistoryPath(storageRoot); - await mkdir(storageRoot, { recursive: true }); - const line = `${JSON.stringify(entry)}\n`; - await appendFile(path, line, "utf8"); + const path = join(storageRoot, "history.jsonl"); + if (!existsSync(path)) { + return; + } + + const text = await readFile(path, "utf8"); + for (const line of text.split("\n")) { + const trimmed = line.trim(); + if (trimmed === "") { + continue; + } + const entry = parseHistoryJsonlLine(trimmed); + if (entry !== null) { + addHistoryEntry(varStore, entry); + } + } + + await rename(path, `${path}.migrated`); +} + +export function loadAllHistory(varStore: VariableStore): ThreadHistoryLine[] { + const vars = varStore.list({ namePrefix: HISTORY_VAR_PREFIX }); + return vars.map((v) => ({ + thread: v.name.slice(HISTORY_VAR_PREFIX.length) as ThreadId, + workflow: v.tags.workflow ?? "", + head: v.value as CasRef, + completedAt: Number(v.tags.completedAt ?? "0"), + reason: v.tags.reason === "completed" || v.tags.reason === "cancelled" ? v.tags.reason : null, + })); +} + +export function findHistoryEntry( + varStore: VariableStore, + threadId: ThreadId, +): ThreadHistoryLine | null { + const vars = varStore.list({ namePrefix: `${HISTORY_VAR_PREFIX}${threadId}` }); + const v = vars.find((entry) => entry.name === `${HISTORY_VAR_PREFIX}${threadId}`); + if (v === undefined) { + return null; + } + return { + thread: threadId, + workflow: v.tags.workflow ?? "", + head: v.value as CasRef, + completedAt: Number(v.tags.completedAt ?? "0"), + reason: v.tags.reason === "completed" || v.tags.reason === "cancelled" ? v.tags.reason : null, + }; +} + +export function addHistoryEntry(varStore: VariableStore, entry: ThreadHistoryLine): void { + varStore.set(`${HISTORY_VAR_PREFIX}${entry.thread}`, entry.head, { + tags: { + workflow: entry.workflow, + completedAt: String(entry.completedAt), + reason: entry.reason ?? "completed", + }, + }); }