From ca7b68ca5f0841ea7fdc617e545d295f4f0dc89f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=98=9F=E6=9C=88?= Date: Thu, 4 Jun 2026 15:01:20 +0800 Subject: [PATCH] refactor(cli): unify thread storage, remove history prefix - store.ts: all threads in @uwf/thread/* with status tag - Remove HISTORY_VAR_PREFIX, ThreadHistoryLine, deleteThread - Add loadActiveThreads, loadHistoryThreads, completeThread - Add migrateHistoryVarsToThreadVars migration - thread.ts: replace deleteThread+addHistoryEntry with completeThread - shared.ts: remove findHistoryEntry fallback - Update all tests for unified storage model 422 tests pass. Part of #39, closes #41, closes #42 --- .../cli/src/__tests__/current-role.test.ts | 48 ++-- .../src/__tests__/resolve-head-hash.test.ts | 60 ++--- .../src/__tests__/store-global-cas.test.ts | 33 +-- .../__tests__/store-unified-threads.test.ts | 235 ++++++++++++++++++ .../__tests__/thread-cancel-status.test.ts | 137 ++++++---- .../src/__tests__/thread-list-filters.test.ts | 14 +- .../cli/src/__tests__/thread-resume.test.ts | 4 + .../src/__tests__/thread-show-status.test.ts | 41 +-- .../src/__tests__/thread-suspend-step.test.ts | 2 + packages/cli/src/__tests__/thread.test.ts | 51 ++-- packages/cli/src/commands/shared.ts | 6 +- packages/cli/src/commands/step.ts | 2 + packages/cli/src/commands/thread.ts | 133 ++++------ packages/cli/src/store.ts | 137 ++++++---- packages/util-agent/src/storage.ts | 2 + 15 files changed, 558 insertions(+), 347 deletions(-) create mode 100644 packages/cli/src/__tests__/store-unified-threads.test.ts diff --git a/packages/cli/src/__tests__/current-role.test.ts b/packages/cli/src/__tests__/current-role.test.ts index 8b69816..88d3b4f 100644 --- a/packages/cli/src/__tests__/current-role.test.ts +++ b/packages/cli/src/__tests__/current-role.test.ts @@ -7,10 +7,9 @@ import { describe, expect, test } from "vitest"; import { createMarker, deleteMarker } from "../background/index.js"; import { cmdThreadList, cmdThreadShow, cmdThreadStart } from "../commands/thread.js"; import { - addHistoryEntry, + completeThread, createUwfStore, - deleteThread, - loadAllThreads, + loadActiveThreads, setThread, } from "../store.js"; @@ -175,7 +174,7 @@ async function insertStepNode( outputPayload: Record, ): Promise { const uwf = await createUwfStore(storageRoot); - const index = loadAllThreads(uwf.varStore); + const index = loadActiveThreads(uwf.varStore); const headEntry = index[threadId]; if (headEntry === undefined) throw new Error(`thread ${threadId} not in index`); const head = headEntry.head; @@ -206,7 +205,13 @@ async function insertStepNode( assembledPrompt: null, })) as CasRef; - setThread(uwf.varStore, threadId, { head: stepHash, suspendedRole: null, suspendMessage: null }); + setThread(uwf.varStore, threadId, { + head: stepHash, + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, + }); } describe("currentRole field", () => { @@ -286,15 +291,8 @@ describe("currentRole field", () => { const tid = thread as ThreadId; const uwfForIndex = await createUwfStore(storageRoot); - const head = loadAllThreads(uwfForIndex.varStore)[tid]!.head; - deleteThread(uwfForIndex.varStore, tid); - addHistoryEntry(uwfForIndex.varStore, { - thread: tid, - workflow, - head, - completedAt: Date.now(), - reason: "completed", - }); + const head = loadActiveThreads(uwfForIndex.varStore)[tid]!.head; + completeThread(uwfForIndex.varStore, tid, "completed"); const result = await cmdThreadShow(storageRoot, tid); expect(result.status).toBe("completed"); @@ -314,15 +312,8 @@ describe("currentRole field", () => { const tid = thread as ThreadId; const uwfForIndex = await createUwfStore(storageRoot); - const head = loadAllThreads(uwfForIndex.varStore)[tid]!.head; - deleteThread(uwfForIndex.varStore, tid); - addHistoryEntry(uwfForIndex.varStore, { - thread: tid, - workflow, - head, - completedAt: Date.now(), - reason: "cancelled", - }); + const head = loadActiveThreads(uwfForIndex.varStore)[tid]!.head; + completeThread(uwfForIndex.varStore, tid, "cancelled"); const result = await cmdThreadShow(storageRoot, tid); expect(result.status).toBe("cancelled"); @@ -375,15 +366,8 @@ describe("currentRole field", () => { const comp = await cmdThreadStart(storageRoot, wf, "completed", tmpDir); const compId = comp.thread as ThreadId; const uwfForIndex = await createUwfStore(storageRoot); - const compHead = loadAllThreads(uwfForIndex.varStore)[compId]!.head; - deleteThread(uwfForIndex.varStore, compId); - addHistoryEntry(uwfForIndex.varStore, { - thread: compId, - workflow: comp.workflow, - head: compHead, - completedAt: Date.now(), - reason: "completed", - }); + const compHead = loadActiveThreads(uwfForIndex.varStore)[compId]!.head; + completeThread(uwfForIndex.varStore, compId, "completed"); const list = await cmdThreadList(storageRoot, null, null, null, 0, 100); diff --git a/packages/cli/src/__tests__/resolve-head-hash.test.ts b/packages/cli/src/__tests__/resolve-head-hash.test.ts index b5f68f1..cffcd6d 100644 --- a/packages/cli/src/__tests__/resolve-head-hash.test.ts +++ b/packages/cli/src/__tests__/resolve-head-hash.test.ts @@ -4,7 +4,7 @@ import { join } from "node:path"; import { type CasRef, createThreadIndexEntry, type ThreadId } from "@united-workforce/protocol"; import { afterEach, beforeEach, describe, expect, test } from "vitest"; import { resolveHeadHash } from "../commands/shared.js"; -import { addHistoryEntry, createUwfStore, setThread } from "../store.js"; +import { completeThread, createUwfStore, setThread } from "../store.js"; let tmpDir: string; @@ -31,19 +31,13 @@ describe("resolveHeadHash", () => { expect(result).toBe(headHash); }); - test("falls back to history variable when thread not in active index", async () => { + test("finds completed thread", async () => { const threadId = "01JTEST0000000000000000002" as ThreadId; - const workflowHash = "workflow_hash_789" as CasRef; const uwf = await createUwfStore(tmpDir); const headHash = (await uwf.store.cas.put(uwf.schemas.text, "completed-head")) as CasRef; - addHistoryEntry(uwf.varStore, { - thread: threadId, - workflow: workflowHash, - head: headHash, - completedAt: Date.now(), - reason: null, - }); + setThread(uwf.varStore, threadId, createThreadIndexEntry(headHash)); + completeThread(uwf.varStore, threadId, "completed"); const result = await resolveHeadHash(tmpDir, threadId); @@ -54,58 +48,36 @@ describe("resolveHeadHash", () => { // calls fail() which does process.exit(1), terminating the test runner. // The error behavior is tested in integration tests below via CLI invocation. - test("prioritizes active thread over history when thread exists in both", async () => { + test("prioritizes active thread", async () => { const threadId = "01JTEST0000000000000000004" as ThreadId; - const workflowHash = "workflow_hash_xyz" as CasRef; const uwf = await createUwfStore(tmpDir); const activeHead = (await uwf.store.cas.put(uwf.schemas.text, "active-v2")) as CasRef; - const historicalHash = (await uwf.store.cas.put(uwf.schemas.text, "historical-v1")) as CasRef; setThread(uwf.varStore, threadId, createThreadIndexEntry(activeHead)); - addHistoryEntry(uwf.varStore, { - thread: threadId, - workflow: workflowHash, - head: historicalHash, - completedAt: Date.now(), - reason: null, - }); const result = await resolveHeadHash(tmpDir, threadId); - // Should return the active head, not the historical one + // Should return the active head expect(result).toBe(activeHead); }); - test("finds thread from multiple history entries", async () => { + test("finds thread from multiple completed threads", async () => { const threadId1 = "01JTEST0000000000000000005" as ThreadId; const threadId2 = "01JTEST0000000000000000006" as ThreadId; const threadId3 = "01JTEST0000000000000000007" as ThreadId; - const workflowHash = "workflow_hash_abc" as CasRef; const uwf = await createUwfStore(tmpDir); const hash1 = (await uwf.store.cas.put(uwf.schemas.text, "hash-thread1")) as CasRef; const hash2 = (await uwf.store.cas.put(uwf.schemas.text, "hash-thread2")) as CasRef; const hash3 = (await uwf.store.cas.put(uwf.schemas.text, "hash-thread3")) as CasRef; - addHistoryEntry(uwf.varStore, { - thread: threadId1, - workflow: workflowHash, - head: hash1, - completedAt: Date.now() - 2000, - reason: null, - }); - addHistoryEntry(uwf.varStore, { - thread: threadId2, - workflow: workflowHash, - head: hash2, - completedAt: Date.now() - 1000, - reason: null, - }); - addHistoryEntry(uwf.varStore, { - thread: threadId3, - workflow: workflowHash, - head: hash3, - completedAt: Date.now(), - reason: null, - }); + + setThread(uwf.varStore, threadId1, createThreadIndexEntry(hash1)); + completeThread(uwf.varStore, threadId1, "completed"); + + setThread(uwf.varStore, threadId2, createThreadIndexEntry(hash2)); + completeThread(uwf.varStore, threadId2, "completed"); + + setThread(uwf.varStore, threadId3, createThreadIndexEntry(hash3)); + completeThread(uwf.varStore, threadId3, "completed"); const result = await resolveHeadHash(tmpDir, threadId2); diff --git a/packages/cli/src/__tests__/store-global-cas.test.ts b/packages/cli/src/__tests__/store-global-cas.test.ts index 5e2a727..5b24f87 100644 --- a/packages/cli/src/__tests__/store-global-cas.test.ts +++ b/packages/cli/src/__tests__/store-global-cas.test.ts @@ -226,19 +226,15 @@ describe("Global CAS directory", () => { const uwf = await createUwfStore(storageRoot); const threadId = "thread-123" as ThreadId; const headHash = await uwf.store.cas.put(uwf.schemas.text, "history-head"); - const { addHistoryEntry, findHistoryEntry } = await import("../store.js"); - addHistoryEntry(uwf.varStore, { - thread: threadId, - workflow: "workflow-456", - head: headHash, - completedAt: Date.now(), - reason: "completed", - }); + const { completeThread, setThread, getThread } = await import("../store.js"); + const { createThreadIndexEntry } = await import("@united-workforce/protocol"); - const entry = findHistoryEntry(uwf.varStore, threadId); - expect(entry?.thread).toBe(threadId); - expect(entry?.workflow).toBe("workflow-456"); + setThread(uwf.varStore, threadId, createThreadIndexEntry(headHash)); + completeThread(uwf.varStore, threadId, "completed"); + + const entry = getThread(uwf.varStore, threadId); expect(entry?.head).toBe(headHash); + expect(entry?.status).toBe("completed"); const { access } = await import("node:fs/promises"); await access(join(globalCasDir, "vars")); @@ -274,15 +270,12 @@ describe("Global CAS directory", () => { ); const uwf = await createUwfStore(storageRoot); - const { findHistoryEntry } = await import("../store.js"); - const entry = findHistoryEntry(uwf.varStore, threadId); - expect(entry).toEqual({ - thread: threadId, - workflow: workflowHash, - head: headHash, - completedAt, - reason: "cancelled", - }); + const { getThread } = await import("../store.js"); + const entry = getThread(uwf.varStore, threadId); + expect(entry).not.toBeNull(); + expect(entry?.head).toBe(headHash); + expect(entry?.status).toBe("cancelled"); + expect(entry?.completedAt).toBe(completedAt); await expect(access(historyPath)).rejects.toThrow(); const migratedContent = await readFile(`${historyPath}.migrated`, "utf8"); diff --git a/packages/cli/src/__tests__/store-unified-threads.test.ts b/packages/cli/src/__tests__/store-unified-threads.test.ts new file mode 100644 index 0000000..247db5d --- /dev/null +++ b/packages/cli/src/__tests__/store-unified-threads.test.ts @@ -0,0 +1,235 @@ +import { mkdir, mkdtemp } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import type { CasRef, ThreadId } from "@united-workforce/protocol"; +import { describe, expect, test } from "vitest"; +import { + completeThread, + createUwfStore, + getThread, + loadActiveThreads, + loadHistoryThreads, + setThread, +} from "../store.js"; + +async function makeUwfStore(storageRoot: string) { + const casDir = join(storageRoot, "cas"); + await mkdir(casDir, { recursive: true }); + process.env.OCAS_DIR = casDir; + return createUwfStore(storageRoot); +} + +async function seedThreadHead( + uwf: Awaited>, + label: string, +): Promise { + return (await uwf.store.cas.put(uwf.schemas.text, label)) as CasRef; +} + +describe("unified thread storage", () => { + test("loadActiveThreads excludes completed threads", async () => { + const tmpDir = await mkdtemp(join(tmpdir(), "uwf-active-test-")); + const uwf = await makeUwfStore(tmpDir); + + const threadId1 = "01JTEST000000000000ACTIVE1" as ThreadId; + const threadId2 = "01JTEST000000000000ACTIVE2" as ThreadId; + const head1 = await seedThreadHead(uwf, "active-head"); + const head2 = await seedThreadHead(uwf, "completed-head"); + + setThread(uwf.varStore, threadId1, { + head: head1, + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, + }); + + setThread(uwf.varStore, threadId2, { + head: head2, + status: "completed", + suspendedRole: null, + suspendMessage: null, + completedAt: Date.now(), + }); + + const active = loadActiveThreads(uwf.varStore); + expect(Object.keys(active)).toHaveLength(1); + expect(active[threadId1]).toBeDefined(); + expect(active[threadId2]).toBeUndefined(); + }); + + test("loadActiveThreads excludes cancelled threads", async () => { + const tmpDir = await mkdtemp(join(tmpdir(), "uwf-active-test-")); + const uwf = await makeUwfStore(tmpDir); + + const threadId1 = "01JTEST000000000000ACTIVE3" as ThreadId; + const threadId2 = "01JTEST000000000000ACTIVE4" as ThreadId; + const head1 = await seedThreadHead(uwf, "active-head"); + const head2 = await seedThreadHead(uwf, "cancelled-head"); + + setThread(uwf.varStore, threadId1, { + head: head1, + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, + }); + + setThread(uwf.varStore, threadId2, { + head: head2, + status: "cancelled", + suspendedRole: null, + suspendMessage: null, + completedAt: Date.now(), + }); + + const active = loadActiveThreads(uwf.varStore); + expect(Object.keys(active)).toHaveLength(1); + expect(active[threadId1]).toBeDefined(); + expect(active[threadId2]).toBeUndefined(); + }); + + test("loadHistoryThreads only returns completed and cancelled", async () => { + const tmpDir = await mkdtemp(join(tmpdir(), "uwf-history-test-")); + const uwf = await makeUwfStore(tmpDir); + + const threadId1 = "01JTEST000000000000HISTOR1" as ThreadId; + const threadId2 = "01JTEST000000000000HISTOR2" as ThreadId; + const threadId3 = "01JTEST000000000000HISTOR3" as ThreadId; + const head1 = await seedThreadHead(uwf, "active-head"); + const head2 = await seedThreadHead(uwf, "completed-head"); + const head3 = await seedThreadHead(uwf, "cancelled-head"); + + setThread(uwf.varStore, threadId1, { + head: head1, + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, + }); + + setThread(uwf.varStore, threadId2, { + head: head2, + status: "completed", + suspendedRole: null, + suspendMessage: null, + completedAt: Date.now(), + }); + + setThread(uwf.varStore, threadId3, { + head: head3, + status: "cancelled", + suspendedRole: null, + suspendMessage: null, + completedAt: Date.now(), + }); + + const history = loadHistoryThreads(uwf.varStore); + expect(Object.keys(history)).toHaveLength(2); + expect(history[threadId1]).toBeUndefined(); + expect(history[threadId2]).toBeDefined(); + expect(history[threadId3]).toBeDefined(); + }); + + test("completeThread marks thread as completed", async () => { + const tmpDir = await mkdtemp(join(tmpdir(), "uwf-complete-test-")); + const uwf = await makeUwfStore(tmpDir); + const threadId = "01JTEST000000000000COMPLE1" as ThreadId; + const head = await seedThreadHead(uwf, "active-head"); + + setThread(uwf.varStore, threadId, { + head, + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, + }); + + completeThread(uwf.varStore, threadId, "completed"); + + const entry = getThread(uwf.varStore, threadId); + expect(entry).not.toBeNull(); + expect(entry?.status).toBe("completed"); + expect(entry?.completedAt).toBeDefined(); + expect(entry?.completedAt).toBeGreaterThan(0); + }); + + test("completeThread marks thread as cancelled", async () => { + const tmpDir = await mkdtemp(join(tmpdir(), "uwf-complete-test-")); + const uwf = await makeUwfStore(tmpDir); + const threadId = "01JTEST000000000000COMPLE2" as ThreadId; + const head = await seedThreadHead(uwf, "active-head"); + + setThread(uwf.varStore, threadId, { + head, + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, + }); + + completeThread(uwf.varStore, threadId, "cancelled"); + + const entry = getThread(uwf.varStore, threadId); + expect(entry).not.toBeNull(); + expect(entry?.status).toBe("cancelled"); + expect(entry?.completedAt).toBeDefined(); + expect(entry?.completedAt).toBeGreaterThan(0); + }); + + test("completeThread clears suspend metadata", async () => { + const tmpDir = await mkdtemp(join(tmpdir(), "uwf-complete-test-")); + const uwf = await makeUwfStore(tmpDir); + const threadId = "01JTEST000000000000COMPLE3" as ThreadId; + const head = await seedThreadHead(uwf, "suspended-head"); + + setThread(uwf.varStore, threadId, { + head, + status: "suspended", + suspendedRole: "test-role", + suspendMessage: "test message", + completedAt: null, + }); + + completeThread(uwf.varStore, threadId, "completed"); + + const entry = getThread(uwf.varStore, threadId); + expect(entry).not.toBeNull(); + expect(entry?.status).toBe("completed"); + expect(entry?.suspendedRole).toBeNull(); + expect(entry?.suspendMessage).toBeNull(); + }); + + test("completeThread handles non-existent thread gracefully", async () => { + const tmpDir = await mkdtemp(join(tmpdir(), "uwf-complete-test-")); + const uwf = await makeUwfStore(tmpDir); + const threadId = "01JTEST000000000000NOEXIST" as ThreadId; + + // Should not throw + completeThread(uwf.varStore, threadId, "completed"); + + const entry = getThread(uwf.varStore, threadId); + expect(entry).toBeNull(); + }); + + test("status and completedAt tags are persisted and loaded", async () => { + const tmpDir = await mkdtemp(join(tmpdir(), "uwf-tags-test-")); + const uwf = await makeUwfStore(tmpDir); + const threadId = "01JTEST000000000000TAGTEST" as ThreadId; + const head = await seedThreadHead(uwf, "test-head"); + const now = Date.now(); + + setThread(uwf.varStore, threadId, { + head, + status: "completed", + suspendedRole: null, + suspendMessage: null, + completedAt: now, + }); + + const entry = getThread(uwf.varStore, threadId); + expect(entry).not.toBeNull(); + expect(entry?.status).toBe("completed"); + expect(entry?.completedAt).toBe(now); + }); +}); diff --git a/packages/cli/src/__tests__/thread-cancel-status.test.ts b/packages/cli/src/__tests__/thread-cancel-status.test.ts index 5da810b..b54f60d 100644 --- a/packages/cli/src/__tests__/thread-cancel-status.test.ts +++ b/packages/cli/src/__tests__/thread-cancel-status.test.ts @@ -3,7 +3,13 @@ import { tmpdir } from "node:os"; import { join } from "node:path"; import type { CasRef, ThreadId } from "@united-workforce/protocol"; import { describe, expect, test } from "vitest"; -import { addHistoryEntry, createUwfStore, loadAllHistory } from "../store.js"; +import { + completeThread, + createUwfStore, + getThread, + loadHistoryThreads, + setThread, +} from "../store.js"; async function makeUwfStore(storageRoot: string) { const casDir = join(storageRoot, "cas"); @@ -20,88 +26,113 @@ async function seedHistoryHead( } describe("thread cancel status", () => { - test("cancelled history entry has reason 'cancelled'", async () => { + test("cancelled thread has status 'cancelled'", async () => { const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-")); const threadId = "01JTEST000000000000CANCEL1" as ThreadId; const uwf = await makeUwfStore(tmpDir); const head = await seedHistoryHead(uwf, "cancelled-head"); - addHistoryEntry(uwf.varStore, { - thread: threadId, - workflow: "test-workflow", + setThread(uwf.varStore, threadId, { head, - completedAt: Date.now(), - reason: "cancelled", + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, }); - const history = loadAllHistory(uwf.varStore); - expect(history).toHaveLength(1); - expect(history[0]?.reason).toBe("cancelled"); + completeThread(uwf.varStore, threadId, "cancelled"); + + const entry = getThread(uwf.varStore, threadId); + expect(entry).not.toBeNull(); + expect(entry?.status).toBe("cancelled"); }); - test("completed history entry has reason 'completed'", async () => { + test("completed thread has status 'completed'", async () => { const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-")); const threadId = "01JTEST000000000000CANCEL2" as ThreadId; const uwf = await makeUwfStore(tmpDir); const head = await seedHistoryHead(uwf, "completed-head"); - addHistoryEntry(uwf.varStore, { - thread: threadId, - workflow: "test-workflow", + setThread(uwf.varStore, threadId, { head, - completedAt: Date.now(), - reason: "completed", + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, }); - const history = loadAllHistory(uwf.varStore); - expect(history).toHaveLength(1); - expect(history[0]?.reason).toBe("completed"); + completeThread(uwf.varStore, threadId, "completed"); + + const entry = getThread(uwf.varStore, threadId); + expect(entry).not.toBeNull(); + expect(entry?.status).toBe("completed"); }); - test("history entry with null reason is stored as completed", async () => { - const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-")); - const threadId = "01JTEST000000000000CANCEL3" as ThreadId; - const uwf = await makeUwfStore(tmpDir); - const head = await seedHistoryHead(uwf, "legacy-head"); - - addHistoryEntry(uwf.varStore, { - thread: threadId, - workflow: "test-workflow", - head, - completedAt: Date.now(), - reason: null, - }); - - const history = loadAllHistory(uwf.varStore); - expect(history).toHaveLength(1); - expect(history[0]?.reason).toBe("completed"); - }); - - test("mixed completed and cancelled entries preserve distinct reasons", async () => { + test("loadHistoryThreads returns completed and cancelled", async () => { const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-")); const uwf = await makeUwfStore(tmpDir); const head1 = await seedHistoryHead(uwf, "head1"); const head2 = await seedHistoryHead(uwf, "head2"); - addHistoryEntry(uwf.varStore, { - thread: "01JTEST000000000000CANCEL4" as ThreadId, - workflow: "test-workflow", + const threadId1 = "01JTEST000000000000CANCEL4" as ThreadId; + setThread(uwf.varStore, threadId1, { head: head1, - completedAt: Date.now(), - reason: "completed", + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, }); + completeThread(uwf.varStore, threadId1, "completed"); - addHistoryEntry(uwf.varStore, { - thread: "01JTEST000000000000CANCEL5" as ThreadId, - workflow: "test-workflow", + const threadId2 = "01JTEST000000000000CANCEL5" as ThreadId; + setThread(uwf.varStore, threadId2, { head: head2, - completedAt: Date.now(), - reason: "cancelled", + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, }); + completeThread(uwf.varStore, threadId2, "cancelled"); - const history = loadAllHistory(uwf.varStore); - expect(history).toHaveLength(2); - const reasons = history.map((entry) => entry.reason).sort(); - expect(reasons).toEqual(["cancelled", "completed"]); + const history = loadHistoryThreads(uwf.varStore); + expect(Object.keys(history)).toHaveLength(2); + const statuses = Object.values(history) + .map((entry) => entry.status) + .sort(); + expect(statuses).toEqual(["cancelled", "completed"]); + }); + + test("mixed completed and cancelled entries preserve distinct statuses", async () => { + const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-")); + const uwf = await makeUwfStore(tmpDir); + const head1 = await seedHistoryHead(uwf, "head1"); + const head2 = await seedHistoryHead(uwf, "head2"); + + const threadId1 = "01JTEST000000000000CANCEL6" as ThreadId; + setThread(uwf.varStore, threadId1, { + head: head1, + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, + }); + completeThread(uwf.varStore, threadId1, "completed"); + + const threadId2 = "01JTEST000000000000CANCEL7" as ThreadId; + setThread(uwf.varStore, threadId2, { + head: head2, + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, + }); + completeThread(uwf.varStore, threadId2, "cancelled"); + + const history = loadHistoryThreads(uwf.varStore); + expect(Object.keys(history)).toHaveLength(2); + const statuses = Object.values(history) + .map((entry) => entry.status) + .sort(); + expect(statuses).toEqual(["cancelled", "completed"]); }); }); diff --git a/packages/cli/src/__tests__/thread-list-filters.test.ts b/packages/cli/src/__tests__/thread-list-filters.test.ts index f7cf581..70c9325 100644 --- a/packages/cli/src/__tests__/thread-list-filters.test.ts +++ b/packages/cli/src/__tests__/thread-list-filters.test.ts @@ -10,9 +10,8 @@ import { cmdThreadList } from "../commands/thread.js"; import { parseTimeInput } from "../commands/thread-time-parser.js"; import type { UwfStore } from "../store.js"; import { - addHistoryEntry, + completeThread as completeThreadInStore, createUwfStore, - deleteThread, loadAllThreads, setThread, } from "../store.js"; @@ -77,14 +76,7 @@ async function completeThread( headHash: CasRef, ) { const uwfIdx = await createUwfStore(storageRoot); - deleteThread(uwfIdx.varStore, threadId); - addHistoryEntry(uwfIdx.varStore, { - thread: threadId, - workflow: workflowHash, - head: headHash, - completedAt: Date.now(), - reason: null, - }); + completeThreadInStore(uwfIdx.varStore, threadId, "completed"); } // ── test setup ──────────────────────────────────────────────────────────────── @@ -500,8 +492,10 @@ describe("edge cases", () => { )) as CasRef; index["INVALID_ULID_FORMAT_HERE" as ThreadId] = { head: placeholderHead, + status: "idle", suspendedRole: null, suspendMessage: null, + completedAt: null, }; for (const [tid, ent] of Object.entries(index)) { setThread(uwfIdx.varStore, tid as ThreadId, ent); diff --git a/packages/cli/src/__tests__/thread-resume.test.ts b/packages/cli/src/__tests__/thread-resume.test.ts index 85884bd..af9b044 100644 --- a/packages/cli/src/__tests__/thread-resume.test.ts +++ b/packages/cli/src/__tests__/thread-resume.test.ts @@ -118,8 +118,10 @@ async function setupSuspendedThread(mode: MockAgentMode): Promise<{ await seedThreads(tmpDir, { [THREAD_ID]: { head: stepHash, + status: "suspended", suspendedRole: "worker", suspendMessage: SUSPEND_MESSAGE, + completedAt: null, }, }); @@ -347,8 +349,10 @@ describe("uwf thread resume", () => { const uwfAfterFirst = await createUwfStore(tmpDir); expect(getThread(uwfAfterFirst.varStore, THREAD_ID)).toEqual({ head: firstResume.head, + status: "suspended", suspendedRole: "worker", suspendMessage: SUSPEND_MESSAGE, + completedAt: null, }); const { mockAgentPath: okMockAgentPath } = await setupOkMockAgent( diff --git a/packages/cli/src/__tests__/thread-show-status.test.ts b/packages/cli/src/__tests__/thread-show-status.test.ts index 8c4d0af..8f6948a 100644 --- a/packages/cli/src/__tests__/thread-show-status.test.ts +++ b/packages/cli/src/__tests__/thread-show-status.test.ts @@ -7,9 +7,8 @@ import { describe, expect, test } from "vitest"; import { createMarker, deleteMarker } from "../background/index.js"; import { cmdThreadShow, cmdThreadStart } from "../commands/thread.js"; import { - addHistoryEntry, + completeThread, createUwfStore, - deleteThread, loadAllThreads, setThread, } from "../store.js"; @@ -118,7 +117,13 @@ async function insertStepNode( assembledPrompt: null, })) as CasRef; - setThread(uwf.varStore, threadId, { head: stepHash, suspendedRole: null, suspendMessage: null }); + setThread(uwf.varStore, threadId, { + head: stepHash, + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, + }); } describe("thread show status field", () => { @@ -208,15 +213,7 @@ describe("thread show status field", () => { const head = index[threadId]!.head; if (!head) throw new Error("Thread not found in index"); - deleteThread(uwfForIndex.varStore, threadId); - - addHistoryEntry(uwfForIndex.varStore, { - thread: threadId, - workflow, - head, - completedAt: Date.now(), - reason: "completed", - }); + completeThread(uwfForIndex.varStore, threadId, "completed"); const result = await cmdThreadShow(storageRoot, threadId); @@ -245,15 +242,7 @@ describe("thread show status field", () => { const head = index[threadId]!.head; if (!head) throw new Error("Thread not found in index"); - deleteThread(uwfForIndex.varStore, threadId); - - addHistoryEntry(uwfForIndex.varStore, { - thread: threadId, - workflow, - head, - completedAt: Date.now(), - reason: "cancelled", - }); + completeThread(uwfForIndex.varStore, threadId, "cancelled"); const result = await cmdThreadShow(storageRoot, threadId); @@ -282,15 +271,7 @@ describe("thread show status field", () => { const head = index[threadId]!.head; if (!head) throw new Error("Thread not found in index"); - deleteThread(uwfForIndex.varStore, threadId); - - addHistoryEntry(uwfForIndex.varStore, { - thread: threadId, - workflow, - head, - completedAt: Date.now(), - reason: null, - }); + completeThread(uwfForIndex.varStore, threadId, "completed"); const result = await cmdThreadShow(storageRoot, threadId); diff --git a/packages/cli/src/__tests__/thread-suspend-step.test.ts b/packages/cli/src/__tests__/thread-suspend-step.test.ts index 77f200f..0ba1ca7 100644 --- a/packages/cli/src/__tests__/thread-suspend-step.test.ts +++ b/packages/cli/src/__tests__/thread-suspend-step.test.ts @@ -160,8 +160,10 @@ describe("suspend step CAS chain and threads.yaml metadata", () => { const threadEntry = getThread(uwf.varStore, threadId); expect(threadEntry).toEqual({ head: stepHash, + status: "suspended", suspendedRole: "worker", suspendMessage: "Please clarify: Which API?", + completedAt: null, }); const showResult = await cmdThreadShow(tmpDir, threadId); diff --git a/packages/cli/src/__tests__/thread.test.ts b/packages/cli/src/__tests__/thread.test.ts index ff97665..3d38012 100644 --- a/packages/cli/src/__tests__/thread.test.ts +++ b/packages/cli/src/__tests__/thread.test.ts @@ -11,7 +11,7 @@ import { THREAD_READ_DEFAULT_QUOTA, } from "../commands/thread.js"; import type { UwfStore } from "../store.js"; -import { addHistoryEntry, createUwfStore } from "../store.js"; +import { completeThread, createUwfStore, setThread } from "../store.js"; import { seedThreads } from "./thread-test-helpers.js"; // ── schemas used in tests ──────────────────────────────────────────────────── @@ -745,13 +745,14 @@ describe("cmdStepList with completed threads", () => { const threadId = "01JTEST0000000000000000A2" as ThreadId; // Thread is NOT in active index (simulating completed thread) // But it IS in history variable store - addHistoryEntry(uwf.varStore, { - thread: threadId, - workflow: workflowHash, + setThread(uwf.varStore, threadId, { head: step2Hash, - completedAt: Date.now(), - reason: null, + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, }); + completeThread(uwf.varStore, threadId, "completed"); const result = await cmdStepList(tmpDir, threadId); @@ -872,14 +873,15 @@ describe("cmdStepShow with completed threads", () => { const threadId = "01JTEST0000000000000000B2" as ThreadId; // Thread is NOT in active index - // But it IS in history variable store - addHistoryEntry(uwf.varStore, { - thread: threadId, - workflow: workflowHash, + // But it IS in the unified store with completed status + setThread(uwf.varStore, threadId, { head: stepHash, - completedAt: Date.now(), - reason: null, + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, }); + completeThread(uwf.varStore, threadId, "completed"); const result = await cmdStepShow(tmpDir, stepHash); @@ -934,15 +936,15 @@ describe("cmdThreadRead with completed threads", () => { }); const threadId = "01JTEST0000000000000000C1" as ThreadId; - // Thread is NOT in active index - // But it IS in history variable store - addHistoryEntry(uwf.varStore, { - thread: threadId, - workflow: workflowHash, + // Thread is in store with completed status + setThread(uwf.varStore, threadId, { head: stepHash, - completedAt: Date.now(), - reason: null, + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, }); + completeThread(uwf.varStore, threadId, "completed"); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); @@ -998,13 +1000,14 @@ describe("cmdThreadRead with completed threads", () => { }); const threadId = "01JTEST0000000000000000C2" as ThreadId; - addHistoryEntry(uwf.varStore, { - thread: threadId, - workflow: workflowHash, + setThread(uwf.varStore, threadId, { head: step3Hash, - completedAt: Date.now(), - reason: null, + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, }); + completeThread(uwf.varStore, threadId, "completed"); const markdown = await cmdThreadRead( tmpDir, diff --git a/packages/cli/src/commands/shared.ts b/packages/cli/src/commands/shared.ts index 872521f..b8a29bc 100644 --- a/packages/cli/src/commands/shared.ts +++ b/packages/cli/src/commands/shared.ts @@ -6,7 +6,7 @@ import type { StepNodePayload, ThreadId, } from "@united-workforce/protocol"; -import { createUwfStore, findHistoryEntry, getThread, type UwfStore } from "../store.js"; +import { createUwfStore, getThread, type UwfStore } from "../store.js"; type ChainState = { startHash: CasRef; @@ -207,10 +207,6 @@ async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise if (entry !== null) { return entry.head; } - const hist = findHistoryEntry(uwf.varStore, threadId); - if (hist !== null) { - return hist.head; - } fail(`thread not found: ${threadId}`); } diff --git a/packages/cli/src/commands/step.ts b/packages/cli/src/commands/step.ts index dd5f50f..6786d4f 100644 --- a/packages/cli/src/commands/step.ts +++ b/packages/cli/src/commands/step.ts @@ -114,8 +114,10 @@ export async function cmdStepFork( const newThreadId = generateUlid(Date.now()) as ThreadId; setThread(uwf.varStore, newThreadId, { head: stepHash, + status: "idle", suspendedRole: null, suspendMessage: null, + completedAt: null, }); return { diff --git a/packages/cli/src/commands/thread.ts b/packages/cli/src/commands/thread.ts index 68b59ca..53c6493 100644 --- a/packages/cli/src/commands/thread.ts +++ b/packages/cli/src/commands/thread.ts @@ -38,17 +38,14 @@ import { createMarker, deleteMarker, isThreadRunning } from "../background/index import { createIncludeTag } from "../include.js"; import { evaluate, isSuspendResult } from "../moderator/index.js"; import { - addHistoryEntry, + completeThread, createUwfStore, - deleteThread, - findHistoryEntry, getThread, - loadAllHistory, - loadAllThreads, + loadActiveThreads, + loadHistoryThreads, loadWorkflowRegistry, resolveWorkflowHash, setThread, - type ThreadHistoryLine, type UwfStore, } from "../store.js"; import { checkWorkflowFilenameConsistency, isCasRef, parseWorkflowPayload } from "../validate.js"; @@ -485,61 +482,55 @@ export async function cmdThreadShow( ): Promise { const uwf = await createUwfStore(storageRoot); const entry = getThread(uwf.varStore, threadId); - if (entry !== null) { - const activeHead = entry.head; - const workflow = resolveWorkflowFromHead(uwf, activeHead); - if (workflow === null) { - fail(`failed to resolve workflow from head: ${activeHead}`); - } + if (entry === null) { + fail(`thread not found: ${threadId}`); + } - const status = await resolveActiveThreadStatus( - storageRoot, - threadId, - uwf, - activeHead, - workflow, - ); - const currentRole = resolveCurrentRole(uwf, activeHead, workflow); - const suspendFields = resolveSuspendFieldsForShow(entry, status, uwf, activeHead, workflow); - - const hint = - status === "suspended" - ? `Thread is suspended. Resume with: uwf thread resume ${threadId}` - : null; + const activeHead = entry.head; + const workflow = resolveWorkflowFromHead(uwf, activeHead); + if (workflow === null) { + fail(`failed to resolve workflow from head: ${activeHead}`); + } + // Determine if this is a completed/cancelled thread + if (entry.status === "completed" || entry.status === "cancelled") { + const hint = null; return { workflow, thread: threadId, head: activeHead, - status, - currentRole, - suspendedRole: suspendFields.suspendedRole, - suspendMessage: suspendFields.suspendMessage, - done: false, - background: null, - hint, - }; - } - - const hist = findHistoryEntry(uwf.varStore, threadId); - if (hist !== null) { - const status: ThreadStatus = hist.reason === "cancelled" ? "cancelled" : "completed"; - - return { - workflow: hist.workflow, - thread: threadId, - head: hist.head, - status, + status: entry.status, currentRole: null, suspendedRole: null, suspendMessage: null, done: true, background: null, - hint: null, + hint, }; } - fail(`thread not found: ${threadId}`); + // Active thread + const status = await resolveActiveThreadStatus(storageRoot, threadId, uwf, activeHead, workflow); + const currentRole = resolveCurrentRole(uwf, activeHead, workflow); + const suspendFields = resolveSuspendFieldsForShow(entry, status, uwf, activeHead, workflow); + + const hint = + status === "suspended" + ? `Thread is suspended. Resume with: uwf thread resume ${threadId}` + : null; + + return { + workflow, + thread: threadId, + head: activeHead, + status, + currentRole, + suspendedRole: suspendFields.suspendedRole, + suspendMessage: suspendFields.suspendMessage, + done: false, + background: null, + hint, + }; } export type ThreadListItemWithStatus = ThreadListItem & { @@ -598,15 +589,15 @@ function collectCompletedThreads( activeIds: Set, ): ThreadListItemWithStatus[] { const items: ThreadListItemWithStatus[] = []; - const history = loadAllHistory(varStore); + const history = loadHistoryThreads(varStore); const seen = new Set(); // Deduplication (issue #470) - for (const entry of history) { - if (!activeIds.has(entry.thread) && !seen.has(entry.thread)) { - seen.add(entry.thread); - const status = entry.reason === "cancelled" ? "cancelled" : "completed"; + for (const [threadId, entry] of Object.entries(history)) { + if (!activeIds.has(threadId as ThreadId) && !seen.has(threadId as ThreadId)) { + seen.add(threadId as ThreadId); + const status = entry.status; items.push({ - thread: entry.thread, - workflow: entry.workflow, + thread: threadId as ThreadId, + workflow: "", // Will be resolved later if needed head: entry.head, status, currentRole: null, @@ -659,7 +650,7 @@ export async function cmdThreadList( take: number | null, ): Promise { const uwf = await createUwfStore(storageRoot); - const index = loadAllThreads(uwf.varStore); + const index = loadActiveThreads(uwf.varStore); // Collect active threads let items = await collectActiveThreads(storageRoot, uwf, index); @@ -1035,15 +1026,8 @@ function spawnAgent( return obj as unknown as AdapterOutput; } -function archiveThread(uwf: UwfStore, threadId: ThreadId, workflow: CasRef, head: CasRef): void { - deleteThread(uwf.varStore, threadId); - addHistoryEntry(uwf.varStore, { - thread: threadId, - workflow, - head, - completedAt: Date.now(), - reason: "completed", - }); +function archiveThread(uwf: UwfStore, threadId: ThreadId, _workflow: CasRef, _head: CasRef): void { + completeThread(uwf.varStore, threadId, "completed"); } export async function cmdThreadResume( @@ -1450,10 +1434,6 @@ async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise if (entry !== null) { return entry.head; } - const hist = findHistoryEntry(uwf.varStore, threadId); - if (hist !== null) { - return hist.head; - } fail(`thread not found: ${threadId}`); } @@ -1533,7 +1513,6 @@ export async function cmdThreadCancel( if (entry === null) { fail(`thread not active: ${threadId}`); } - const head = entry.head; // Check if thread is running in background and terminate it const runningMarker = await isThreadRunning(storageRoot, threadId); @@ -1546,21 +1525,7 @@ export async function cmdThreadCancel( await deleteMarker(storageRoot, threadId); } - const workflow = resolveWorkflowFromHead(uwf, head); - if (workflow === null) { - fail(`failed to resolve workflow from head: ${head}`); - } - - deleteThread(uwf.varStore, threadId); - - const historyEntry: ThreadHistoryLine = { - thread: threadId, - workflow, - head, - completedAt: Date.now(), - reason: "cancelled", - }; - addHistoryEntry(uwf.varStore, historyEntry); + completeThread(uwf.varStore, threadId, "cancelled"); return { thread: threadId, cancelled: true }; } diff --git a/packages/cli/src/store.ts b/packages/cli/src/store.ts index 9bc0ddb..c96be21 100644 --- a/packages/cli/src/store.ts +++ b/packages/cli/src/store.ts @@ -26,8 +26,6 @@ export const REGISTRY_VAR_PREFIX = "@uwf/registry/"; /** Variable name prefix for active thread entries (`@uwf/thread/`). */ export const THREAD_VAR_PREFIX = "@uwf/thread/"; -/** Variable name prefix for completed/cancelled thread history (`@uwf/history/`). */ -export const HISTORY_VAR_PREFIX = "@uwf/history/"; /** A workflow entry discovered from the project-local .workflows/ directory. */ export type ProjectWorkflowEntry = { @@ -160,10 +158,6 @@ export function getThreadsPath(storageRoot: string): string { return join(storageRoot, "threads.yaml"); } -export type ThreadHistoryLine = ThreadListItem & { - completedAt: number; - reason: "completed" | "cancelled" | null; -}; export type UwfStore = { storageRoot: string; @@ -183,6 +177,7 @@ export async function createUwfStore(storageRoot: string): Promise { await migrateWorkflowRegistryIfNeeded(storageRoot, varStore); await migrateThreadsIndexIfNeeded(storageRoot, varStore); await migrateHistoryIfNeeded(storageRoot, varStore); + migrateHistoryVarsToThreadVars(varStore); return { storageRoot, store, schemas, varStore }; } @@ -303,8 +298,10 @@ function threadVarName(threadId: ThreadId): string { function entryFromVariable(v: { value: string; tags: Record }): ThreadIndexEntry { return { head: v.value as CasRef, + status: (v.tags.status ?? "idle") as ThreadIndexEntry["status"], suspendedRole: v.tags.suspendedRole ?? null, suspendMessage: v.tags.suspendMessage ?? null, + completedAt: v.tags.completedAt !== undefined ? Number(v.tags.completedAt) : null, }; } @@ -335,21 +332,75 @@ export function setThread(varStore: VarStore, threadId: ThreadId, entry: ThreadI // Head CAS nodes may use different schemas (StartNode vs StepNode) — clear all variants first. varStore.remove(name); const tags: Record = {}; + if (entry.status !== "idle") { + tags.status = entry.status; + } if (entry.suspendedRole !== null) { tags.suspendedRole = entry.suspendedRole; } if (entry.suspendMessage !== null) { tags.suspendMessage = entry.suspendMessage; } + if (entry.completedAt !== null) { + tags.completedAt = String(entry.completedAt); + } varStore.set(name, entry.head, { tags }); } -/** Remove an active thread entry (on complete/cancel). */ -export function deleteThread(varStore: VarStore, threadId: ThreadId): void { - varStore.remove(threadVarName(threadId)); +/** Load only active threads (status not in completed/cancelled). */ +export function loadActiveThreads(varStore: VarStore): ThreadsIndex { + const all = loadAllThreads(varStore); + const active: ThreadsIndex = {}; + for (const [threadId, entry] of Object.entries(all)) { + if (entry.status !== "completed" && entry.status !== "cancelled") { + active[threadId as ThreadId] = entry; + } + } + return active; } -function parseHistoryJsonlLine(trimmed: string): ThreadHistoryLine | null { +/** Load only completed/cancelled threads (history). */ +export function loadHistoryThreads(varStore: VarStore): ThreadsIndex { + const all = loadAllThreads(varStore); + const history: ThreadsIndex = {}; + for (const [threadId, entry] of Object.entries(all)) { + if (entry.status === "completed" || entry.status === "cancelled") { + history[threadId as ThreadId] = entry; + } + } + return history; +} + +/** Complete a thread by marking it completed or cancelled. */ +export function completeThread( + varStore: VarStore, + threadId: ThreadId, + reason: "completed" | "cancelled", +): void { + const entry = getThread(varStore, threadId); + if (entry === null) { + return; + } + const completed = { + head: entry.head, + status: reason, + suspendedRole: null, + suspendMessage: null, + completedAt: Date.now(), + } as ThreadIndexEntry; + setThread(varStore, threadId, completed); +} + + +type LegacyHistoryEntry = { + thread: ThreadId; + workflow: CasRef; + head: CasRef; + completedAt: number; + reason: "completed" | "cancelled" | null; +}; + +function parseLegacyHistoryJsonlLine(trimmed: string): LegacyHistoryEntry | null { let raw: unknown; try { raw = JSON.parse(trimmed) as unknown; @@ -383,7 +434,7 @@ function parseHistoryJsonlLine(trimmed: string): ThreadHistoryLine | null { return null; } -/** One-time migration: `~/.uwf/history.jsonl` → `@uwf/history/*` variables. */ +/** One-time migration: `~/.uwf/history.jsonl` → `@uwf/thread/*` variables with status tags. */ export async function migrateHistoryIfNeeded( storageRoot: string, varStore: VarStore, @@ -399,47 +450,43 @@ export async function migrateHistoryIfNeeded( if (trimmed === "") { continue; } - const entry = parseHistoryJsonlLine(trimmed); + const entry = parseLegacyHistoryJsonlLine(trimmed); if (entry !== null) { - addHistoryEntry(varStore, entry); + const status = entry.reason === "cancelled" ? "cancelled" : "completed"; + const threadEntry: ThreadIndexEntry = { + head: entry.head, + status: status as ThreadIndexEntry["status"], + suspendedRole: null, + suspendMessage: null, + completedAt: entry.completedAt, + }; + setThread(varStore, entry.thread, threadEntry); } } await rename(path, `${path}.migrated`); } -export function loadAllHistory(varStore: VarStore): ThreadHistoryLine[] { - const vars = varStore.list({ namePrefix: HISTORY_VAR_PREFIX }); - return vars.map((v) => ({ - thread: v.name.slice(HISTORY_VAR_PREFIX.length) as ThreadId, - workflow: v.tags.workflow ?? "", - head: v.value as CasRef, - completedAt: Number(v.tags.completedAt ?? "0"), - reason: v.tags.reason === "completed" || v.tags.reason === "cancelled" ? v.tags.reason : null, - })); -} +/** Migrate `@uwf/history/*` variables to `@uwf/thread/*` with status tags. */ +export function migrateHistoryVarsToThreadVars(varStore: VarStore): void { + const LEGACY_HISTORY_VAR_PREFIX = "@uwf/history/"; + const vars = varStore.list({ namePrefix: LEGACY_HISTORY_VAR_PREFIX }); -export function findHistoryEntry(varStore: VarStore, threadId: ThreadId): ThreadHistoryLine | null { - const vars = varStore.list({ namePrefix: `${HISTORY_VAR_PREFIX}${threadId}` }); - const v = vars.find((entry) => entry.name === `${HISTORY_VAR_PREFIX}${threadId}`); - if (v === undefined) { - return null; + for (const v of vars) { + const threadId = v.name.slice(LEGACY_HISTORY_VAR_PREFIX.length) as ThreadId; + const reason = v.tags.reason; + const status = reason === "cancelled" ? "cancelled" : "completed"; + const completedAt = Number(v.tags.completedAt ?? Date.now()); + + const threadEntry: ThreadIndexEntry = { + head: v.value as CasRef, + status: status as ThreadIndexEntry["status"], + suspendedRole: null, + suspendMessage: null, + completedAt, + }; + + setThread(varStore, threadId, threadEntry); + varStore.remove(v.name); } - return { - thread: threadId, - workflow: v.tags.workflow ?? "", - head: v.value as CasRef, - completedAt: Number(v.tags.completedAt ?? "0"), - reason: v.tags.reason === "completed" || v.tags.reason === "cancelled" ? v.tags.reason : null, - }; -} - -export function addHistoryEntry(varStore: VarStore, entry: ThreadHistoryLine): void { - varStore.set(`${HISTORY_VAR_PREFIX}${entry.thread}`, entry.head, { - tags: { - workflow: entry.workflow, - completedAt: String(entry.completedAt), - reason: entry.reason ?? "completed", - }, - }); } diff --git a/packages/util-agent/src/storage.ts b/packages/util-agent/src/storage.ts index 8b963f1..47c47d2 100644 --- a/packages/util-agent/src/storage.ts +++ b/packages/util-agent/src/storage.ts @@ -88,8 +88,10 @@ export async function getActiveThreadEntry( } return { head: v.value as CasRef, + status: (v.tags.status ?? "idle") as ThreadIndexEntry["status"], suspendedRole: v.tags.suspendedRole ?? null, suspendMessage: v.tags.suspendMessage ?? null, + completedAt: v.tags.completedAt !== undefined ? Number(v.tags.completedAt) : null, }; }