diff --git a/packages/cli/src/__tests__/current-role.test.ts b/packages/cli/src/__tests__/current-role.test.ts index e93b908..369ae24 100644 --- a/packages/cli/src/__tests__/current-role.test.ts +++ b/packages/cli/src/__tests__/current-role.test.ts @@ -7,10 +7,9 @@ import { describe, expect, test } from "vitest"; import { createMarker, deleteMarker } from "../background/index.js"; import { cmdThreadList, cmdThreadShow, cmdThreadStart } from "../commands/thread.js"; import { - addHistoryEntry, + completeThread, createUwfStore, - deleteThread, - loadAllThreads, + loadActiveThreads, setThread, } from "../store.js"; @@ -175,7 +174,7 @@ async function insertStepNode( outputPayload: Record, ): Promise { const uwf = await createUwfStore(storageRoot); - const index = loadAllThreads(uwf.varStore); + const index = loadActiveThreads(uwf.varStore); const headEntry = index[threadId]; if (headEntry === undefined) throw new Error(`thread ${threadId} not in index`); const head = headEntry.head; @@ -206,7 +205,13 @@ async function insertStepNode( assembledPrompt: null, })) as CasRef; - setThread(uwf.varStore, threadId, { head: stepHash, suspendedRole: null, suspendMessage: null }); + setThread(uwf.varStore, threadId, { + head: stepHash, + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, + }); } describe("currentRole field", () => { @@ -286,15 +291,8 @@ describe("currentRole field", () => { const tid = thread as ThreadId; const uwfForIndex = await createUwfStore(storageRoot); - const head = loadAllThreads(uwfForIndex.varStore)[tid]!.head; - deleteThread(uwfForIndex.varStore, tid); - addHistoryEntry(uwfForIndex.varStore, { - thread: tid, - workflow, - head, - completedAt: Date.now(), - reason: "completed", - }); + const head = loadActiveThreads(uwfForIndex.varStore)[tid]!.head; + completeThread(uwfForIndex.varStore, tid, "completed"); const result = await cmdThreadShow(storageRoot, tid); expect(result.status).toBe("completed"); @@ -314,15 +312,8 @@ describe("currentRole field", () => { const tid = thread as ThreadId; const uwfForIndex = await createUwfStore(storageRoot); - const head = loadAllThreads(uwfForIndex.varStore)[tid]!.head; - deleteThread(uwfForIndex.varStore, tid); - addHistoryEntry(uwfForIndex.varStore, { - thread: tid, - workflow, - head, - completedAt: Date.now(), - reason: "cancelled", - }); + const head = loadActiveThreads(uwfForIndex.varStore)[tid]!.head; + completeThread(uwfForIndex.varStore, tid, "cancelled"); const result = await cmdThreadShow(storageRoot, tid); expect(result.status).toBe("cancelled"); @@ -375,15 +366,8 @@ describe("currentRole field", () => { const comp = await cmdThreadStart(storageRoot, wf, "completed", tmpDir); const compId = comp.thread as ThreadId; const uwfForIndex = await createUwfStore(storageRoot); - const compHead = loadAllThreads(uwfForIndex.varStore)[compId]!.head; - deleteThread(uwfForIndex.varStore, compId); - addHistoryEntry(uwfForIndex.varStore, { - thread: compId, - workflow: comp.workflow, - head: compHead, - completedAt: Date.now(), - reason: "completed", - }); + const compHead = loadActiveThreads(uwfForIndex.varStore)[compId]!.head; + completeThread(uwfForIndex.varStore, compId, "completed"); const list = await cmdThreadList(storageRoot, null, null, null, 0, 100); diff --git a/packages/cli/src/__tests__/resolve-head-hash.test.ts b/packages/cli/src/__tests__/resolve-head-hash.test.ts index 4926543..4cea476 100644 --- a/packages/cli/src/__tests__/resolve-head-hash.test.ts +++ b/packages/cli/src/__tests__/resolve-head-hash.test.ts @@ -4,7 +4,7 @@ import { join } from "node:path"; import { type CasRef, createThreadIndexEntry, type ThreadId } from "@united-workforce/protocol"; import { afterEach, beforeEach, describe, expect, test } from "vitest"; import { resolveHeadHash } from "../commands/shared.js"; -import { addHistoryEntry, createUwfStore, setThread } from "../store.js"; +import { completeThread, createUwfStore, setThread } from "../store.js"; let tmpDir: string; @@ -31,19 +31,13 @@ describe("resolveHeadHash", () => { expect(result).toBe(headHash); }); - test("falls back to history variable when thread not in active index", async () => { + test("finds completed thread", async () => { const threadId = "01JTEST0000000000000000002" as ThreadId; - const workflowHash = "workflow_hash_789" as CasRef; const uwf = await createUwfStore(tmpDir); const headHash = (await uwf.store.cas.put(uwf.schemas.text, "completed-head")) as CasRef; - addHistoryEntry(uwf.varStore, { - thread: threadId, - workflow: workflowHash, - head: headHash, - completedAt: Date.now(), - reason: null, - }); + setThread(uwf.varStore, threadId, createThreadIndexEntry(headHash)); + completeThread(uwf.varStore, threadId, "completed"); const result = await resolveHeadHash(tmpDir, threadId); @@ -54,58 +48,36 @@ describe("resolveHeadHash", () => { // calls fail() which does process.exit(1), terminating the test runner. // The error behavior is tested in integration tests below via CLI invocation. - test("prioritizes active thread over history when thread exists in both", async () => { + test("prioritizes active thread", async () => { const threadId = "01JTEST0000000000000000004" as ThreadId; - const workflowHash = "workflow_hash_xyz" as CasRef; const uwf = await createUwfStore(tmpDir); const activeHead = (await uwf.store.cas.put(uwf.schemas.text, "active-v2")) as CasRef; - const historicalHash = (await uwf.store.cas.put(uwf.schemas.text, "historical-v1")) as CasRef; setThread(uwf.varStore, threadId, createThreadIndexEntry(activeHead)); - addHistoryEntry(uwf.varStore, { - thread: threadId, - workflow: workflowHash, - head: historicalHash, - completedAt: Date.now(), - reason: null, - }); const result = await resolveHeadHash(tmpDir, threadId); - // Should return the active head, not the historical one + // Should return the active head expect(result).toBe(activeHead); }); - test("finds thread from multiple history entries", async () => { + test("finds thread from multiple completed threads", async () => { const threadId1 = "01JTEST0000000000000000005" as ThreadId; const threadId2 = "01JTEST0000000000000000006" as ThreadId; const threadId3 = "01JTEST0000000000000000007" as ThreadId; - const workflowHash = "workflow_hash_abc" as CasRef; const uwf = await createUwfStore(tmpDir); const hash1 = (await uwf.store.cas.put(uwf.schemas.text, "hash-thread1")) as CasRef; const hash2 = (await uwf.store.cas.put(uwf.schemas.text, "hash-thread2")) as CasRef; const hash3 = (await uwf.store.cas.put(uwf.schemas.text, "hash-thread3")) as CasRef; - addHistoryEntry(uwf.varStore, { - thread: threadId1, - workflow: workflowHash, - head: hash1, - completedAt: Date.now() - 2000, - reason: null, - }); - addHistoryEntry(uwf.varStore, { - thread: threadId2, - workflow: workflowHash, - head: hash2, - completedAt: Date.now() - 1000, - reason: null, - }); - addHistoryEntry(uwf.varStore, { - thread: threadId3, - workflow: workflowHash, - head: hash3, - completedAt: Date.now(), - reason: null, - }); + + setThread(uwf.varStore, threadId1, createThreadIndexEntry(hash1)); + completeThread(uwf.varStore, threadId1, "completed"); + + setThread(uwf.varStore, threadId2, createThreadIndexEntry(hash2)); + completeThread(uwf.varStore, threadId2, "completed"); + + setThread(uwf.varStore, threadId3, createThreadIndexEntry(hash3)); + completeThread(uwf.varStore, threadId3, "completed"); const result = await resolveHeadHash(tmpDir, threadId2); diff --git a/packages/cli/src/__tests__/store-global-cas.test.ts b/packages/cli/src/__tests__/store-global-cas.test.ts index 86e9436..6eef76c 100644 --- a/packages/cli/src/__tests__/store-global-cas.test.ts +++ b/packages/cli/src/__tests__/store-global-cas.test.ts @@ -226,19 +226,15 @@ describe("Global CAS directory", () => { const uwf = await createUwfStore(storageRoot); const threadId = "thread-123" as ThreadId; const headHash = await uwf.store.cas.put(uwf.schemas.text, "history-head"); - const { addHistoryEntry, findHistoryEntry } = await import("../store.js"); - addHistoryEntry(uwf.varStore, { - thread: threadId, - workflow: "workflow-456", - head: headHash, - completedAt: Date.now(), - reason: "completed", - }); + const { completeThread, setThread, getThread } = await import("../store.js"); + const { createThreadIndexEntry } = await import("@united-workforce/protocol"); - const entry = findHistoryEntry(uwf.varStore, threadId); - expect(entry?.thread).toBe(threadId); - expect(entry?.workflow).toBe("workflow-456"); + setThread(uwf.varStore, threadId, createThreadIndexEntry(headHash)); + completeThread(uwf.varStore, threadId, "completed"); + + const entry = getThread(uwf.varStore, threadId); expect(entry?.head).toBe(headHash); + expect(entry?.status).toBe("completed"); const { access } = await import("node:fs/promises"); await access(join(globalCasDir, "vars")); @@ -274,15 +270,12 @@ describe("Global CAS directory", () => { ); const uwf = await createUwfStore(storageRoot); - const { findHistoryEntry } = await import("../store.js"); - const entry = findHistoryEntry(uwf.varStore, threadId); - expect(entry).toEqual({ - thread: threadId, - workflow: workflowHash, - head: headHash, - completedAt, - reason: "cancelled", - }); + const { getThread } = await import("../store.js"); + const entry = getThread(uwf.varStore, threadId); + expect(entry).not.toBeNull(); + expect(entry?.head).toBe(headHash); + expect(entry?.status).toBe("cancelled"); + expect(entry?.completedAt).toBe(completedAt); await expect(access(historyPath)).rejects.toThrow(); const migratedContent = await readFile(`${historyPath}.migrated`, "utf8"); diff --git a/packages/cli/src/__tests__/store-unified-threads.test.ts b/packages/cli/src/__tests__/store-unified-threads.test.ts new file mode 100644 index 0000000..247db5d --- /dev/null +++ b/packages/cli/src/__tests__/store-unified-threads.test.ts @@ -0,0 +1,235 @@ +import { mkdir, mkdtemp } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import type { CasRef, ThreadId } from "@united-workforce/protocol"; +import { describe, expect, test } from "vitest"; +import { + completeThread, + createUwfStore, + getThread, + loadActiveThreads, + loadHistoryThreads, + setThread, +} from "../store.js"; + +async function makeUwfStore(storageRoot: string) { + const casDir = join(storageRoot, "cas"); + await mkdir(casDir, { recursive: true }); + process.env.OCAS_DIR = casDir; + return createUwfStore(storageRoot); +} + +async function seedThreadHead( + uwf: Awaited>, + label: string, +): Promise { + return (await uwf.store.cas.put(uwf.schemas.text, label)) as CasRef; +} + +describe("unified thread storage", () => { + test("loadActiveThreads excludes completed threads", async () => { + const tmpDir = await mkdtemp(join(tmpdir(), "uwf-active-test-")); + const uwf = await makeUwfStore(tmpDir); + + const threadId1 = "01JTEST000000000000ACTIVE1" as ThreadId; + const threadId2 = "01JTEST000000000000ACTIVE2" as ThreadId; + const head1 = await seedThreadHead(uwf, "active-head"); + const head2 = await seedThreadHead(uwf, "completed-head"); + + setThread(uwf.varStore, threadId1, { + head: head1, + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, + }); + + setThread(uwf.varStore, threadId2, { + head: head2, + status: "completed", + suspendedRole: null, + suspendMessage: null, + completedAt: Date.now(), + }); + + const active = loadActiveThreads(uwf.varStore); + expect(Object.keys(active)).toHaveLength(1); + expect(active[threadId1]).toBeDefined(); + expect(active[threadId2]).toBeUndefined(); + }); + + test("loadActiveThreads excludes cancelled threads", async () => { + const tmpDir = await mkdtemp(join(tmpdir(), "uwf-active-test-")); + const uwf = await makeUwfStore(tmpDir); + + const threadId1 = "01JTEST000000000000ACTIVE3" as ThreadId; + const threadId2 = "01JTEST000000000000ACTIVE4" as ThreadId; + const head1 = await seedThreadHead(uwf, "active-head"); + const head2 = await seedThreadHead(uwf, "cancelled-head"); + + setThread(uwf.varStore, threadId1, { + head: head1, + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, + }); + + setThread(uwf.varStore, threadId2, { + head: head2, + status: "cancelled", + suspendedRole: null, + suspendMessage: null, + completedAt: Date.now(), + }); + + const active = loadActiveThreads(uwf.varStore); + expect(Object.keys(active)).toHaveLength(1); + expect(active[threadId1]).toBeDefined(); + expect(active[threadId2]).toBeUndefined(); + }); + + test("loadHistoryThreads only returns completed and cancelled", async () => { + const tmpDir = await mkdtemp(join(tmpdir(), "uwf-history-test-")); + const uwf = await makeUwfStore(tmpDir); + + const threadId1 = "01JTEST000000000000HISTOR1" as ThreadId; + const threadId2 = "01JTEST000000000000HISTOR2" as ThreadId; + const threadId3 = "01JTEST000000000000HISTOR3" as ThreadId; + const head1 = await seedThreadHead(uwf, "active-head"); + const head2 = await seedThreadHead(uwf, "completed-head"); + const head3 = await seedThreadHead(uwf, "cancelled-head"); + + setThread(uwf.varStore, threadId1, { + head: head1, + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, + }); + + setThread(uwf.varStore, threadId2, { + head: head2, + status: "completed", + suspendedRole: null, + suspendMessage: null, + completedAt: Date.now(), + }); + + setThread(uwf.varStore, threadId3, { + head: head3, + status: "cancelled", + suspendedRole: null, + suspendMessage: null, + completedAt: Date.now(), + }); + + const history = loadHistoryThreads(uwf.varStore); + expect(Object.keys(history)).toHaveLength(2); + expect(history[threadId1]).toBeUndefined(); + expect(history[threadId2]).toBeDefined(); + expect(history[threadId3]).toBeDefined(); + }); + + test("completeThread marks thread as completed", async () => { + const tmpDir = await mkdtemp(join(tmpdir(), "uwf-complete-test-")); + const uwf = await makeUwfStore(tmpDir); + const threadId = "01JTEST000000000000COMPLE1" as ThreadId; + const head = await seedThreadHead(uwf, "active-head"); + + setThread(uwf.varStore, threadId, { + head, + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, + }); + + completeThread(uwf.varStore, threadId, "completed"); + + const entry = getThread(uwf.varStore, threadId); + expect(entry).not.toBeNull(); + expect(entry?.status).toBe("completed"); + expect(entry?.completedAt).toBeDefined(); + expect(entry?.completedAt).toBeGreaterThan(0); + }); + + test("completeThread marks thread as cancelled", async () => { + const tmpDir = await mkdtemp(join(tmpdir(), "uwf-complete-test-")); + const uwf = await makeUwfStore(tmpDir); + const threadId = "01JTEST000000000000COMPLE2" as ThreadId; + const head = await seedThreadHead(uwf, "active-head"); + + setThread(uwf.varStore, threadId, { + head, + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, + }); + + completeThread(uwf.varStore, threadId, "cancelled"); + + const entry = getThread(uwf.varStore, threadId); + expect(entry).not.toBeNull(); + expect(entry?.status).toBe("cancelled"); + expect(entry?.completedAt).toBeDefined(); + expect(entry?.completedAt).toBeGreaterThan(0); + }); + + test("completeThread clears suspend metadata", async () => { + const tmpDir = await mkdtemp(join(tmpdir(), "uwf-complete-test-")); + const uwf = await makeUwfStore(tmpDir); + const threadId = "01JTEST000000000000COMPLE3" as ThreadId; + const head = await seedThreadHead(uwf, "suspended-head"); + + setThread(uwf.varStore, threadId, { + head, + status: "suspended", + suspendedRole: "test-role", + suspendMessage: "test message", + completedAt: null, + }); + + completeThread(uwf.varStore, threadId, "completed"); + + const entry = getThread(uwf.varStore, threadId); + expect(entry).not.toBeNull(); + expect(entry?.status).toBe("completed"); + expect(entry?.suspendedRole).toBeNull(); + expect(entry?.suspendMessage).toBeNull(); + }); + + test("completeThread handles non-existent thread gracefully", async () => { + const tmpDir = await mkdtemp(join(tmpdir(), "uwf-complete-test-")); + const uwf = await makeUwfStore(tmpDir); + const threadId = "01JTEST000000000000NOEXIST" as ThreadId; + + // Should not throw + completeThread(uwf.varStore, threadId, "completed"); + + const entry = getThread(uwf.varStore, threadId); + expect(entry).toBeNull(); + }); + + test("status and completedAt tags are persisted and loaded", async () => { + const tmpDir = await mkdtemp(join(tmpdir(), "uwf-tags-test-")); + const uwf = await makeUwfStore(tmpDir); + const threadId = "01JTEST000000000000TAGTEST" as ThreadId; + const head = await seedThreadHead(uwf, "test-head"); + const now = Date.now(); + + setThread(uwf.varStore, threadId, { + head, + status: "completed", + suspendedRole: null, + suspendMessage: null, + completedAt: now, + }); + + const entry = getThread(uwf.varStore, threadId); + expect(entry).not.toBeNull(); + expect(entry?.status).toBe("completed"); + expect(entry?.completedAt).toBe(now); + }); +}); diff --git a/packages/cli/src/__tests__/thread-cancel-status.test.ts b/packages/cli/src/__tests__/thread-cancel-status.test.ts index 5699fce..7d3ac1a 100644 --- a/packages/cli/src/__tests__/thread-cancel-status.test.ts +++ b/packages/cli/src/__tests__/thread-cancel-status.test.ts @@ -3,7 +3,13 @@ import { tmpdir } from "node:os"; import { join } from "node:path"; import type { CasRef, ThreadId } from "@united-workforce/protocol"; import { describe, expect, test } from "vitest"; -import { addHistoryEntry, createUwfStore, loadAllHistory } from "../store.js"; +import { + completeThread, + createUwfStore, + getThread, + loadHistoryThreads, + setThread, +} from "../store.js"; async function makeUwfStore(storageRoot: string) { const casDir = join(storageRoot, "cas"); @@ -20,88 +26,113 @@ async function seedHistoryHead( } describe("thread cancel status", () => { - test("cancelled history entry has reason 'cancelled'", async () => { + test("cancelled thread has status 'cancelled'", async () => { const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-")); const threadId = "01JTEST000000000000CANCEL1" as ThreadId; const uwf = await makeUwfStore(tmpDir); const head = await seedHistoryHead(uwf, "cancelled-head"); - addHistoryEntry(uwf.varStore, { - thread: threadId, - workflow: "test-workflow", + setThread(uwf.varStore, threadId, { head, - completedAt: Date.now(), - reason: "cancelled", + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, }); - const history = loadAllHistory(uwf.varStore); - expect(history).toHaveLength(1); - expect(history[0]?.reason).toBe("cancelled"); + completeThread(uwf.varStore, threadId, "cancelled"); + + const entry = getThread(uwf.varStore, threadId); + expect(entry).not.toBeNull(); + expect(entry?.status).toBe("cancelled"); }); - test("completed history entry has reason 'completed'", async () => { + test("completed thread has status 'completed'", async () => { const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-")); const threadId = "01JTEST000000000000CANCEL2" as ThreadId; const uwf = await makeUwfStore(tmpDir); const head = await seedHistoryHead(uwf, "completed-head"); - addHistoryEntry(uwf.varStore, { - thread: threadId, - workflow: "test-workflow", + setThread(uwf.varStore, threadId, { head, - completedAt: Date.now(), - reason: "completed", + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, }); - const history = loadAllHistory(uwf.varStore); - expect(history).toHaveLength(1); - expect(history[0]?.reason).toBe("completed"); + completeThread(uwf.varStore, threadId, "completed"); + + const entry = getThread(uwf.varStore, threadId); + expect(entry).not.toBeNull(); + expect(entry?.status).toBe("completed"); }); - test("history entry with null reason is stored as completed", async () => { - const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-")); - const threadId = "01JTEST000000000000CANCEL3" as ThreadId; - const uwf = await makeUwfStore(tmpDir); - const head = await seedHistoryHead(uwf, "legacy-head"); - - addHistoryEntry(uwf.varStore, { - thread: threadId, - workflow: "test-workflow", - head, - completedAt: Date.now(), - reason: null, - }); - - const history = loadAllHistory(uwf.varStore); - expect(history).toHaveLength(1); - expect(history[0]?.reason).toBe("completed"); - }); - - test("mixed completed and cancelled entries preserve distinct reasons", async () => { + test("loadHistoryThreads returns completed and cancelled", async () => { const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-")); const uwf = await makeUwfStore(tmpDir); const head1 = await seedHistoryHead(uwf, "head1"); const head2 = await seedHistoryHead(uwf, "head2"); - addHistoryEntry(uwf.varStore, { - thread: "01JTEST000000000000CANCEL4" as ThreadId, - workflow: "test-workflow", + const threadId1 = "01JTEST000000000000CANCEL4" as ThreadId; + setThread(uwf.varStore, threadId1, { head: head1, - completedAt: Date.now(), - reason: "completed", + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, }); + completeThread(uwf.varStore, threadId1, "completed"); - addHistoryEntry(uwf.varStore, { - thread: "01JTEST000000000000CANCEL5" as ThreadId, - workflow: "test-workflow", + const threadId2 = "01JTEST000000000000CANCEL5" as ThreadId; + setThread(uwf.varStore, threadId2, { head: head2, - completedAt: Date.now(), - reason: "cancelled", + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, }); + completeThread(uwf.varStore, threadId2, "cancelled"); - const history = loadAllHistory(uwf.varStore); - expect(history).toHaveLength(2); - const reasons = history.map((entry) => entry.reason).sort(); - expect(reasons).toEqual(["cancelled", "completed"]); + const history = loadHistoryThreads(uwf.varStore); + expect(Object.keys(history)).toHaveLength(2); + const statuses = Object.values(history) + .map((entry) => entry.status) + .sort(); + expect(statuses).toEqual(["cancelled", "completed"]); + }); + + test("mixed completed and cancelled entries preserve distinct statuses", async () => { + const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-")); + const uwf = await makeUwfStore(tmpDir); + const head1 = await seedHistoryHead(uwf, "head1"); + const head2 = await seedHistoryHead(uwf, "head2"); + + const threadId1 = "01JTEST000000000000CANCEL6" as ThreadId; + setThread(uwf.varStore, threadId1, { + head: head1, + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, + }); + completeThread(uwf.varStore, threadId1, "completed"); + + const threadId2 = "01JTEST000000000000CANCEL7" as ThreadId; + setThread(uwf.varStore, threadId2, { + head: head2, + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, + }); + completeThread(uwf.varStore, threadId2, "cancelled"); + + const history = loadHistoryThreads(uwf.varStore); + expect(Object.keys(history)).toHaveLength(2); + const statuses = Object.values(history) + .map((entry) => entry.status) + .sort(); + expect(statuses).toEqual(["cancelled", "completed"]); }); }); diff --git a/packages/cli/src/__tests__/thread-list-filters.test.ts b/packages/cli/src/__tests__/thread-list-filters.test.ts index dd9344c..e5b74cc 100644 --- a/packages/cli/src/__tests__/thread-list-filters.test.ts +++ b/packages/cli/src/__tests__/thread-list-filters.test.ts @@ -10,9 +10,8 @@ import { cmdThreadList } from "../commands/thread.js"; import { parseTimeInput } from "../commands/thread-time-parser.js"; import type { UwfStore } from "../store.js"; import { - addHistoryEntry, + completeThread as completeThreadInStore, createUwfStore, - deleteThread, loadAllThreads, setThread, } from "../store.js"; @@ -77,14 +76,7 @@ async function completeThread( headHash: CasRef, ) { const uwfIdx = await createUwfStore(storageRoot); - deleteThread(uwfIdx.varStore, threadId); - addHistoryEntry(uwfIdx.varStore, { - thread: threadId, - workflow: workflowHash, - head: headHash, - completedAt: Date.now(), - reason: null, - }); + completeThreadInStore(uwfIdx.varStore, threadId, "completed"); } // ── test setup ──────────────────────────────────────────────────────────────── @@ -500,8 +492,10 @@ describe("edge cases", () => { )) as CasRef; index["INVALID_ULID_FORMAT_HERE" as ThreadId] = { head: placeholderHead, + status: "idle", suspendedRole: null, suspendMessage: null, + completedAt: null, }; for (const [tid, ent] of Object.entries(index)) { setThread(uwfIdx.varStore, tid as ThreadId, ent); diff --git a/packages/cli/src/__tests__/thread-resume.test.ts b/packages/cli/src/__tests__/thread-resume.test.ts index 799bd74..53cf56c 100644 --- a/packages/cli/src/__tests__/thread-resume.test.ts +++ b/packages/cli/src/__tests__/thread-resume.test.ts @@ -118,8 +118,10 @@ async function setupSuspendedThread(mode: MockAgentMode): Promise<{ await seedThreads(tmpDir, { [THREAD_ID]: { head: stepHash, + status: "suspended", suspendedRole: "worker", suspendMessage: SUSPEND_MESSAGE, + completedAt: null, }, }); @@ -247,7 +249,7 @@ describe("uwf thread resume", () => { const result = runUwf(["thread", "resume", THREAD_ID], casDir); expect(result.status).not.toBe(0); - expect(result.stderr).toContain("thread is not suspended"); + expect(result.stderr).toContain("thread cannot be resumed"); }); test("resume suspended thread executes step and becomes idle", async () => { @@ -347,8 +349,10 @@ describe("uwf thread resume", () => { const uwfAfterFirst = await createUwfStore(tmpDir); expect(getThread(uwfAfterFirst.varStore, THREAD_ID)).toEqual({ head: firstResume.head, + status: "suspended", suspendedRole: "worker", suspendMessage: SUSPEND_MESSAGE, + completedAt: null, }); const { mockAgentPath: okMockAgentPath } = await setupOkMockAgent( @@ -444,3 +448,266 @@ echo '${adapterJson}' return { mockAgentPath }; } + +describe("uwf thread resume - completed threads", () => { + test("resume completed thread starts from $START role", async () => { + const casDir = join(tmpDir, "cas"); + await mkdir(casDir, { recursive: true }); + const store = await openStore(casDir); + const schemas = await registerUwfSchemas(store); + const outputSchemaHash = await putSchema(store, OUTPUT_SCHEMA); + + const workflowHash = await store.cas.put(schemas.workflow, { + name: "test-completed-resume", + description: "completed thread resume test", + roles: { + worker: { + description: "Worker role", + goal: "Work", + capabilities: [], + procedure: "work", + output: "result", + frontmatter: outputSchemaHash, + }, + reviewer: { + description: "Reviewer role", + goal: "Review", + capabilities: [], + procedure: "review", + output: "result", + frontmatter: outputSchemaHash, + }, + }, + graph: { + $START: { _: { role: "worker", prompt: "Start work", location: null } }, + worker: { _: { role: "reviewer", prompt: "Review the work", location: null } }, + reviewer: { _: { role: "$END", prompt: "Done", location: null } }, + }, + }); + + const startHash = await store.cas.put(schemas.startNode, { + workflow: workflowHash, + prompt: "Initial task", + cwd: tmpDir, + }); + + process.env.OCAS_DIR = casDir; + + const workerOutputHash = await store.cas.put(outputSchemaHash, { $status: "_" }); + const reviewerOutputHash = await store.cas.put(outputSchemaHash, { $status: "_" }); + const detailHash = await store.cas.put(schemas.text, "mock detail"); + + const workerStepHash = await store.cas.put(schemas.stepNode, { + start: startHash, + prev: null, + role: "worker", + output: workerOutputHash, + detail: detailHash, + agent: "uwf-mock", + edgePrompt: "Start work", + startedAtMs: 1716600000000, + completedAtMs: 1716600001000, + cwd: tmpDir, + assembledPrompt: null, + }); + + const reviewerStepHash = await store.cas.put(schemas.stepNode, { + start: startHash, + prev: workerStepHash, + role: "reviewer", + output: reviewerOutputHash, + detail: detailHash, + agent: "uwf-mock", + edgePrompt: "Review the work", + startedAtMs: 1716600001000, + completedAtMs: 1716600002000, + cwd: tmpDir, + assembledPrompt: null, + }); + + await seedThreads(tmpDir, { + [THREAD_ID]: { + head: reviewerStepHash, + status: "completed", + suspendedRole: null, + suspendMessage: null, + completedAt: 1716600002000, + }, + }); + + // Verify the status was actually set + const { createUwfStore, getThread } = await import("../store.js"); + const verifyUwf = await createUwfStore(tmpDir); + const verifyEntry = getThread(verifyUwf.varStore, THREAD_ID); + // biome-ignore lint/nursery/noConsole: test debugging + console.log("Seeded entry status:", verifyEntry?.status); + // biome-ignore lint/nursery/noConsole: test debugging + console.log("Seeded entry:", JSON.stringify(verifyEntry, null, 2)); + + const promptCapturePath = join(tmpDir, "captured-prompt-completed.txt"); + const mockAgentPath = join(tmpDir, "mock-agent-completed.sh"); + + const newWorkerStepHash = await store.cas.put(schemas.stepNode, { + start: startHash, + prev: reviewerStepHash, + role: "worker", + output: workerOutputHash, + detail: detailHash, + agent: "uwf-mock", + edgePrompt: "Start work", + startedAtMs: 1716600003000, + completedAtMs: 1716600004000, + cwd: tmpDir, + assembledPrompt: null, + }); + + const adapterJson = JSON.stringify({ + stepHash: newWorkerStepHash, + detailHash, + role: "worker", + frontmatter: { $status: "_" }, + body: "", + startedAtMs: 1716600003000, + completedAtMs: 1716600004000, + }); + + await writeFile( + mockAgentPath, + `#!/bin/sh +prompt="" +while [ $# -gt 0 ]; do + if [ "$1" = "--prompt" ]; then + prompt="$2" + shift 2 + else + shift + fi +done +printf '%s' "$prompt" > '${promptCapturePath}' +echo '${adapterJson}' +`, + { mode: 0o755 }, + ); + + const configPath = join(tmpDir, "config.yaml"); + await writeFile( + configPath, + `defaultAgent: uwf-hermes\ndefaultModel: test-model\nagentOverrides: null\nagents: {}\nproviders: {}\nmodels: {}\n`, + ); + + const result = runUwf( + ["thread", "resume", THREAD_ID, "-p", "Additional context", "--agent", mockAgentPath], + casDir, + ); + + if (result.status !== 0) { + // biome-ignore lint/nursery/noConsole: test debugging + console.error("Command failed:", result.stderr); + } + + expect(result.status).toBe(0); + + const cliOutput = JSON.parse(result.stdout.trim()); + expect(cliOutput.status).toBe("idle"); + expect(cliOutput.currentRole).toBe("reviewer"); + expect(cliOutput.done).toBe(false); + + const capturedPrompt = await readFile(promptCapturePath, "utf8"); + expect(capturedPrompt).toContain("Previous run completed"); + expect(capturedPrompt).toContain("Additional context"); + + const storeModule = await import("../store.js"); + const uwf2 = await storeModule.createUwfStore(tmpDir); + const entry2 = storeModule.getThread(uwf2.varStore, THREAD_ID); + expect(entry2?.status).toBe("idle"); + expect(entry2?.completedAt).toBeNull(); + }); + + test("resume cancelled thread returns error", async () => { + const casDir = join(tmpDir, "cas"); + await mkdir(casDir, { recursive: true }); + const store = await openStore(casDir); + const schemas = await registerUwfSchemas(store); + + const workflowHash = await store.cas.put(schemas.workflow, { + name: "cancelled-workflow", + description: "cancelled thread", + roles: { + worker: { + description: "Worker", + goal: "Work", + capabilities: [], + procedure: "work", + output: "result", + frontmatter: await putSchema(store, OUTPUT_SCHEMA), + }, + }, + graph: { + $START: { _: { role: "worker", prompt: "Start", location: null } }, + worker: { _: { role: "$END", prompt: "Done", location: null } }, + }, + }); + + const startHash = await store.cas.put(schemas.startNode, { + workflow: workflowHash, + prompt: "task", + cwd: tmpDir, + }); + + process.env.OCAS_DIR = casDir; + await seedThreads(tmpDir, { + [THREAD_ID]: { + head: startHash, + status: "cancelled", + suspendedRole: null, + suspendMessage: null, + completedAt: null, + }, + }); + + const result = runUwf(["thread", "resume", THREAD_ID], casDir); + expect(result.status).not.toBe(0); + expect(result.stderr).toContain("thread cannot be resumed"); + expect(result.stderr).toContain("cancelled"); + }); + + test("resume idle thread returns error", async () => { + const casDir = join(tmpDir, "cas"); + await mkdir(casDir, { recursive: true }); + const store = await openStore(casDir); + const schemas = await registerUwfSchemas(store); + + const workflowHash = await store.cas.put(schemas.workflow, { + name: "idle-workflow", + description: "idle thread", + roles: { + worker: { + description: "Worker", + goal: "Work", + capabilities: [], + procedure: "work", + output: "result", + frontmatter: await putSchema(store, OUTPUT_SCHEMA), + }, + }, + graph: { + $START: { _: { role: "worker", prompt: "Start", location: null } }, + worker: { _: { role: "$END", prompt: "Done", location: null } }, + }, + }); + + const startHash = await store.cas.put(schemas.startNode, { + workflow: workflowHash, + prompt: "task", + cwd: tmpDir, + }); + + process.env.OCAS_DIR = casDir; + await seedThreads(tmpDir, { [THREAD_ID]: startHash }); + + const result = runUwf(["thread", "resume", THREAD_ID], casDir); + expect(result.status).not.toBe(0); + expect(result.stderr).toContain("thread cannot be resumed"); + expect(result.stderr).toContain("idle"); + }); +}); diff --git a/packages/cli/src/__tests__/thread-show-status.test.ts b/packages/cli/src/__tests__/thread-show-status.test.ts index 60d412b..18df70e 100644 --- a/packages/cli/src/__tests__/thread-show-status.test.ts +++ b/packages/cli/src/__tests__/thread-show-status.test.ts @@ -7,9 +7,8 @@ import { describe, expect, test } from "vitest"; import { createMarker, deleteMarker } from "../background/index.js"; import { cmdThreadShow, cmdThreadStart } from "../commands/thread.js"; import { - addHistoryEntry, + completeThread, createUwfStore, - deleteThread, loadAllThreads, setThread, } from "../store.js"; @@ -118,7 +117,13 @@ async function insertStepNode( assembledPrompt: null, })) as CasRef; - setThread(uwf.varStore, threadId, { head: stepHash, suspendedRole: null, suspendMessage: null }); + setThread(uwf.varStore, threadId, { + head: stepHash, + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, + }); } describe("thread show status field", () => { @@ -208,15 +213,7 @@ describe("thread show status field", () => { const head = index[threadId]!.head; if (!head) throw new Error("Thread not found in index"); - deleteThread(uwfForIndex.varStore, threadId); - - addHistoryEntry(uwfForIndex.varStore, { - thread: threadId, - workflow, - head, - completedAt: Date.now(), - reason: "completed", - }); + completeThread(uwfForIndex.varStore, threadId, "completed"); const result = await cmdThreadShow(storageRoot, threadId); @@ -245,15 +242,7 @@ describe("thread show status field", () => { const head = index[threadId]!.head; if (!head) throw new Error("Thread not found in index"); - deleteThread(uwfForIndex.varStore, threadId); - - addHistoryEntry(uwfForIndex.varStore, { - thread: threadId, - workflow, - head, - completedAt: Date.now(), - reason: "cancelled", - }); + completeThread(uwfForIndex.varStore, threadId, "cancelled"); const result = await cmdThreadShow(storageRoot, threadId); @@ -282,15 +271,7 @@ describe("thread show status field", () => { const head = index[threadId]!.head; if (!head) throw new Error("Thread not found in index"); - deleteThread(uwfForIndex.varStore, threadId); - - addHistoryEntry(uwfForIndex.varStore, { - thread: threadId, - workflow, - head, - completedAt: Date.now(), - reason: null, - }); + completeThread(uwfForIndex.varStore, threadId, "completed"); const result = await cmdThreadShow(storageRoot, threadId); diff --git a/packages/cli/src/__tests__/thread-suspend-step.test.ts b/packages/cli/src/__tests__/thread-suspend-step.test.ts index 16ef4f2..9796837 100644 --- a/packages/cli/src/__tests__/thread-suspend-step.test.ts +++ b/packages/cli/src/__tests__/thread-suspend-step.test.ts @@ -160,8 +160,10 @@ describe("suspend step CAS chain and threads.yaml metadata", () => { const threadEntry = getThread(uwf.varStore, threadId); expect(threadEntry).toEqual({ head: stepHash, + status: "suspended", suspendedRole: "worker", suspendMessage: "Please clarify: Which API?", + completedAt: null, }); const showResult = await cmdThreadShow(tmpDir, threadId); diff --git a/packages/cli/src/__tests__/thread.test.ts b/packages/cli/src/__tests__/thread.test.ts index 959a788..a3ea7e6 100644 --- a/packages/cli/src/__tests__/thread.test.ts +++ b/packages/cli/src/__tests__/thread.test.ts @@ -11,7 +11,7 @@ import { THREAD_READ_DEFAULT_QUOTA, } from "../commands/thread.js"; import type { UwfStore } from "../store.js"; -import { addHistoryEntry, createUwfStore } from "../store.js"; +import { completeThread, createUwfStore, setThread } from "../store.js"; import { seedThreads } from "./thread-test-helpers.js"; // ── schemas used in tests ──────────────────────────────────────────────────── @@ -745,13 +745,14 @@ describe("cmdStepList with completed threads", () => { const threadId = "01JTEST0000000000000000A2" as ThreadId; // Thread is NOT in active index (simulating completed thread) // But it IS in history variable store - addHistoryEntry(uwf.varStore, { - thread: threadId, - workflow: workflowHash, + setThread(uwf.varStore, threadId, { head: step2Hash, - completedAt: Date.now(), - reason: null, + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, }); + completeThread(uwf.varStore, threadId, "completed"); const result = await cmdStepList(tmpDir, threadId); @@ -872,14 +873,15 @@ describe("cmdStepShow with completed threads", () => { const threadId = "01JTEST0000000000000000B2" as ThreadId; // Thread is NOT in active index - // But it IS in history variable store - addHistoryEntry(uwf.varStore, { - thread: threadId, - workflow: workflowHash, + // But it IS in the unified store with completed status + setThread(uwf.varStore, threadId, { head: stepHash, - completedAt: Date.now(), - reason: null, + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, }); + completeThread(uwf.varStore, threadId, "completed"); const result = await cmdStepShow(tmpDir, stepHash); @@ -934,15 +936,15 @@ describe("cmdThreadRead with completed threads", () => { }); const threadId = "01JTEST0000000000000000C1" as ThreadId; - // Thread is NOT in active index - // But it IS in history variable store - addHistoryEntry(uwf.varStore, { - thread: threadId, - workflow: workflowHash, + // Thread is in store with completed status + setThread(uwf.varStore, threadId, { head: stepHash, - completedAt: Date.now(), - reason: null, + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, }); + completeThread(uwf.varStore, threadId, "completed"); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); @@ -998,13 +1000,14 @@ describe("cmdThreadRead with completed threads", () => { }); const threadId = "01JTEST0000000000000000C2" as ThreadId; - addHistoryEntry(uwf.varStore, { - thread: threadId, - workflow: workflowHash, + setThread(uwf.varStore, threadId, { head: step3Hash, - completedAt: Date.now(), - reason: null, + status: "idle", + suspendedRole: null, + suspendMessage: null, + completedAt: null, }); + completeThread(uwf.varStore, threadId, "completed"); const markdown = await cmdThreadRead( tmpDir, diff --git a/packages/cli/src/commands/shared.ts b/packages/cli/src/commands/shared.ts index 872521f..b8a29bc 100644 --- a/packages/cli/src/commands/shared.ts +++ b/packages/cli/src/commands/shared.ts @@ -6,7 +6,7 @@ import type { StepNodePayload, ThreadId, } from "@united-workforce/protocol"; -import { createUwfStore, findHistoryEntry, getThread, type UwfStore } from "../store.js"; +import { createUwfStore, getThread, type UwfStore } from "../store.js"; type ChainState = { startHash: CasRef; @@ -207,10 +207,6 @@ async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise if (entry !== null) { return entry.head; } - const hist = findHistoryEntry(uwf.varStore, threadId); - if (hist !== null) { - return hist.head; - } fail(`thread not found: ${threadId}`); } diff --git a/packages/cli/src/commands/step.ts b/packages/cli/src/commands/step.ts index dd5f50f..6786d4f 100644 --- a/packages/cli/src/commands/step.ts +++ b/packages/cli/src/commands/step.ts @@ -114,8 +114,10 @@ export async function cmdStepFork( const newThreadId = generateUlid(Date.now()) as ThreadId; setThread(uwf.varStore, newThreadId, { head: stepHash, + status: "idle", suspendedRole: null, suspendMessage: null, + completedAt: null, }); return { diff --git a/packages/cli/src/commands/thread.ts b/packages/cli/src/commands/thread.ts index 68b59ca..d2c06d6 100644 --- a/packages/cli/src/commands/thread.ts +++ b/packages/cli/src/commands/thread.ts @@ -38,17 +38,14 @@ import { createMarker, deleteMarker, isThreadRunning } from "../background/index import { createIncludeTag } from "../include.js"; import { evaluate, isSuspendResult } from "../moderator/index.js"; import { - addHistoryEntry, + completeThread, createUwfStore, - deleteThread, - findHistoryEntry, getThread, - loadAllHistory, - loadAllThreads, + loadActiveThreads, + loadHistoryThreads, loadWorkflowRegistry, resolveWorkflowHash, setThread, - type ThreadHistoryLine, type UwfStore, } from "../store.js"; import { checkWorkflowFilenameConsistency, isCasRef, parseWorkflowPayload } from "../validate.js"; @@ -485,61 +482,55 @@ export async function cmdThreadShow( ): Promise { const uwf = await createUwfStore(storageRoot); const entry = getThread(uwf.varStore, threadId); - if (entry !== null) { - const activeHead = entry.head; - const workflow = resolveWorkflowFromHead(uwf, activeHead); - if (workflow === null) { - fail(`failed to resolve workflow from head: ${activeHead}`); - } + if (entry === null) { + fail(`thread not found: ${threadId}`); + } - const status = await resolveActiveThreadStatus( - storageRoot, - threadId, - uwf, - activeHead, - workflow, - ); - const currentRole = resolveCurrentRole(uwf, activeHead, workflow); - const suspendFields = resolveSuspendFieldsForShow(entry, status, uwf, activeHead, workflow); - - const hint = - status === "suspended" - ? `Thread is suspended. Resume with: uwf thread resume ${threadId}` - : null; + const activeHead = entry.head; + const workflow = resolveWorkflowFromHead(uwf, activeHead); + if (workflow === null) { + fail(`failed to resolve workflow from head: ${activeHead}`); + } + // Determine if this is a completed/cancelled thread + if (entry.status === "completed" || entry.status === "cancelled") { + const hint = null; return { workflow, thread: threadId, head: activeHead, - status, - currentRole, - suspendedRole: suspendFields.suspendedRole, - suspendMessage: suspendFields.suspendMessage, - done: false, - background: null, - hint, - }; - } - - const hist = findHistoryEntry(uwf.varStore, threadId); - if (hist !== null) { - const status: ThreadStatus = hist.reason === "cancelled" ? "cancelled" : "completed"; - - return { - workflow: hist.workflow, - thread: threadId, - head: hist.head, - status, + status: entry.status, currentRole: null, suspendedRole: null, suspendMessage: null, done: true, background: null, - hint: null, + hint, }; } - fail(`thread not found: ${threadId}`); + // Active thread + const status = await resolveActiveThreadStatus(storageRoot, threadId, uwf, activeHead, workflow); + const currentRole = resolveCurrentRole(uwf, activeHead, workflow); + const suspendFields = resolveSuspendFieldsForShow(entry, status, uwf, activeHead, workflow); + + const hint = + status === "suspended" + ? `Thread is suspended. Resume with: uwf thread resume ${threadId}` + : null; + + return { + workflow, + thread: threadId, + head: activeHead, + status, + currentRole, + suspendedRole: suspendFields.suspendedRole, + suspendMessage: suspendFields.suspendMessage, + done: false, + background: null, + hint, + }; } export type ThreadListItemWithStatus = ThreadListItem & { @@ -598,15 +589,15 @@ function collectCompletedThreads( activeIds: Set, ): ThreadListItemWithStatus[] { const items: ThreadListItemWithStatus[] = []; - const history = loadAllHistory(varStore); + const history = loadHistoryThreads(varStore); const seen = new Set(); // Deduplication (issue #470) - for (const entry of history) { - if (!activeIds.has(entry.thread) && !seen.has(entry.thread)) { - seen.add(entry.thread); - const status = entry.reason === "cancelled" ? "cancelled" : "completed"; + for (const [threadId, entry] of Object.entries(history)) { + if (!activeIds.has(threadId as ThreadId) && !seen.has(threadId as ThreadId)) { + seen.add(threadId as ThreadId); + const status = entry.status; items.push({ - thread: entry.thread, - workflow: entry.workflow, + thread: threadId as ThreadId, + workflow: "", // Will be resolved later if needed head: entry.head, status, currentRole: null, @@ -659,7 +650,7 @@ export async function cmdThreadList( take: number | null, ): Promise { const uwf = await createUwfStore(storageRoot); - const index = loadAllThreads(uwf.varStore); + const index = loadActiveThreads(uwf.varStore); // Collect active threads let items = await collectActiveThreads(storageRoot, uwf, index); @@ -1035,15 +1026,8 @@ function spawnAgent( return obj as unknown as AdapterOutput; } -function archiveThread(uwf: UwfStore, threadId: ThreadId, workflow: CasRef, head: CasRef): void { - deleteThread(uwf.varStore, threadId); - addHistoryEntry(uwf.varStore, { - thread: threadId, - workflow, - head, - completedAt: Date.now(), - reason: "completed", - }); +function archiveThread(uwf: UwfStore, threadId: ThreadId, _workflow: CasRef, _head: CasRef): void { + completeThread(uwf.varStore, threadId, "completed"); } export async function cmdThreadResume( @@ -1067,40 +1051,84 @@ export async function cmdThreadResume( const chain = walkChain(uwf, headHash); const workflowHash = chain.start.workflow; - const status = await resolveActiveThreadStatus( - storageRoot, - threadId, - uwf, - headHash, - workflowHash, - ); - if (status !== "suspended") { - fail(`thread is not suspended: ${threadId} (status: ${status})`); + // Check entry.status first for completed/cancelled (like in cmdThreadShow) + let status: ThreadStatus; + if (entry.status === "completed" || entry.status === "cancelled") { + status = entry.status; + } else { + status = await resolveActiveThreadStatus( + storageRoot, + threadId, + uwf, + headHash, + workflowHash, + ); } - const suspendFields = resolveSuspendFieldsForShow(entry, status, uwf, headHash, workflowHash); - if (suspendFields.suspendedRole === null) { - fail(`thread is suspended but suspendedRole is missing: ${threadId}`); - } - if (suspendFields.suspendMessage === null) { - fail(`thread is suspended but suspendMessage is missing: ${threadId}`); + if (status !== "suspended" && status !== "completed") { + fail(`thread cannot be resumed: ${threadId} (status: ${status})`); } - const resumePrompt = buildResumePrompt(suspendFields.suspendMessage, supplement); const plog = createProcessLogger({ storageRoot, context: { thread: threadId, workflow: workflowHash }, }); + if (status === "suspended") { + const suspendFields = resolveSuspendFieldsForShow(entry, status, uwf, headHash, workflowHash); + if (suspendFields.suspendedRole === null) { + fail(`thread is suspended but suspendedRole is missing: ${threadId}`); + } + if (suspendFields.suspendMessage === null) { + fail(`thread is suspended but suspendMessage is missing: ${threadId}`); + } + + const resumePrompt = buildResumePrompt(suspendFields.suspendMessage, supplement); + + plog.log( + PL_THREAD_RESUME, + `resume role=${suspendFields.suspendedRole} supplement=${supplement !== null}`, + null, + ); + + return cmdThreadStepOnce(storageRoot, threadId, agentOverride, plog, { + role: suspendFields.suspendedRole, + prompt: resumePrompt, + }); + } + + // status === "completed" + const workflow = loadWorkflowPayload(uwf, workflowHash); + const startResult = evaluate(workflow.graph, START_ROLE, {}); + if (!startResult.ok) { + fail(`failed to evaluate $START: ${startResult.error.message}`); + } + if (isSuspendResult(startResult.value)) { + fail("workflow cannot start with $SUSPEND"); + } + if (startResult.value.role === END_ROLE) { + fail("workflow cannot start with $END"); + } + + const startRole = startResult.value.role; + const completedPromptPrefix = "Previous run completed. Resuming with additional context."; + const completedResumePrompt = + supplement !== null && supplement !== "" + ? `${completedPromptPrefix}\n\n${supplement}` + : completedPromptPrefix; + + const updatedEntry = { ...entry, status: "idle" as const, completedAt: null }; + setThread(uwf.varStore, threadId, updatedEntry); + plog.log( PL_THREAD_RESUME, - `resume role=${suspendFields.suspendedRole} supplement=${supplement !== null}`, + `resume completed role=${startRole} supplement=${supplement !== null}`, null, ); return cmdThreadStepOnce(storageRoot, threadId, agentOverride, plog, { - role: suspendFields.suspendedRole, - prompt: resumePrompt, + role: startRole, + prompt: completedResumePrompt, }); } @@ -1450,10 +1478,6 @@ async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise if (entry !== null) { return entry.head; } - const hist = findHistoryEntry(uwf.varStore, threadId); - if (hist !== null) { - return hist.head; - } fail(`thread not found: ${threadId}`); } @@ -1533,7 +1557,6 @@ export async function cmdThreadCancel( if (entry === null) { fail(`thread not active: ${threadId}`); } - const head = entry.head; // Check if thread is running in background and terminate it const runningMarker = await isThreadRunning(storageRoot, threadId); @@ -1546,21 +1569,7 @@ export async function cmdThreadCancel( await deleteMarker(storageRoot, threadId); } - const workflow = resolveWorkflowFromHead(uwf, head); - if (workflow === null) { - fail(`failed to resolve workflow from head: ${head}`); - } - - deleteThread(uwf.varStore, threadId); - - const historyEntry: ThreadHistoryLine = { - thread: threadId, - workflow, - head, - completedAt: Date.now(), - reason: "cancelled", - }; - addHistoryEntry(uwf.varStore, historyEntry); + completeThread(uwf.varStore, threadId, "cancelled"); return { thread: threadId, cancelled: true }; } diff --git a/packages/cli/src/store.ts b/packages/cli/src/store.ts index b1126c1..5e508ef 100644 --- a/packages/cli/src/store.ts +++ b/packages/cli/src/store.ts @@ -26,8 +26,6 @@ export const REGISTRY_VAR_PREFIX = "@uwf/registry/"; /** Variable name prefix for active thread entries (`@uwf/thread/`). */ export const THREAD_VAR_PREFIX = "@uwf/thread/"; -/** Variable name prefix for completed/cancelled thread history (`@uwf/history/`). */ -export const HISTORY_VAR_PREFIX = "@uwf/history/"; /** A workflow entry discovered from the project-local .workflows/ directory. */ export type ProjectWorkflowEntry = { @@ -156,10 +154,6 @@ export function getThreadsPath(storageRoot: string): string { return join(storageRoot, "threads.yaml"); } -export type ThreadHistoryLine = ThreadListItem & { - completedAt: number; - reason: "completed" | "cancelled" | null; -}; export type UwfStore = { storageRoot: string; @@ -179,6 +173,7 @@ export async function createUwfStore(storageRoot: string): Promise { await migrateWorkflowRegistryIfNeeded(storageRoot, varStore); await migrateThreadsIndexIfNeeded(storageRoot, varStore); await migrateHistoryIfNeeded(storageRoot, varStore); + migrateHistoryVarsToThreadVars(varStore); return { storageRoot, store, schemas, varStore }; } @@ -299,8 +294,10 @@ function threadVarName(threadId: ThreadId): string { function entryFromVariable(v: { value: string; tags: Record }): ThreadIndexEntry { return { head: v.value as CasRef, + status: (v.tags.status ?? "idle") as ThreadIndexEntry["status"], suspendedRole: v.tags.suspendedRole ?? null, suspendMessage: v.tags.suspendMessage ?? null, + completedAt: v.tags.completedAt !== undefined ? Number(v.tags.completedAt) : null, }; } @@ -331,21 +328,75 @@ export function setThread(varStore: VarStore, threadId: ThreadId, entry: ThreadI // Head CAS nodes may use different schemas (StartNode vs StepNode) — clear all variants first. varStore.remove(name); const tags: Record = {}; + if (entry.status !== "idle") { + tags.status = entry.status; + } if (entry.suspendedRole !== null) { tags.suspendedRole = entry.suspendedRole; } if (entry.suspendMessage !== null) { tags.suspendMessage = entry.suspendMessage; } + if (entry.completedAt !== null) { + tags.completedAt = String(entry.completedAt); + } varStore.set(name, entry.head, { tags }); } -/** Remove an active thread entry (on complete/cancel). */ -export function deleteThread(varStore: VarStore, threadId: ThreadId): void { - varStore.remove(threadVarName(threadId)); +/** Load only active threads (status not in completed/cancelled). */ +export function loadActiveThreads(varStore: VarStore): ThreadsIndex { + const all = loadAllThreads(varStore); + const active: ThreadsIndex = {}; + for (const [threadId, entry] of Object.entries(all)) { + if (entry.status !== "completed" && entry.status !== "cancelled") { + active[threadId as ThreadId] = entry; + } + } + return active; } -function parseHistoryJsonlLine(trimmed: string): ThreadHistoryLine | null { +/** Load only completed/cancelled threads (history). */ +export function loadHistoryThreads(varStore: VarStore): ThreadsIndex { + const all = loadAllThreads(varStore); + const history: ThreadsIndex = {}; + for (const [threadId, entry] of Object.entries(all)) { + if (entry.status === "completed" || entry.status === "cancelled") { + history[threadId as ThreadId] = entry; + } + } + return history; +} + +/** Complete a thread by marking it completed or cancelled. */ +export function completeThread( + varStore: VarStore, + threadId: ThreadId, + reason: "completed" | "cancelled", +): void { + const entry = getThread(varStore, threadId); + if (entry === null) { + return; + } + const completed = { + head: entry.head, + status: reason, + suspendedRole: null, + suspendMessage: null, + completedAt: Date.now(), + } as ThreadIndexEntry; + setThread(varStore, threadId, completed); +} + + +type LegacyHistoryEntry = { + thread: ThreadId; + workflow: CasRef; + head: CasRef; + completedAt: number; + reason: "completed" | "cancelled" | null; +}; + +function parseLegacyHistoryJsonlLine(trimmed: string): LegacyHistoryEntry | null { let raw: unknown; try { raw = JSON.parse(trimmed) as unknown; @@ -379,7 +430,7 @@ function parseHistoryJsonlLine(trimmed: string): ThreadHistoryLine | null { return null; } -/** One-time migration: `~/.uwf/history.jsonl` → `@uwf/history/*` variables. */ +/** One-time migration: `~/.uwf/history.jsonl` → `@uwf/thread/*` variables with status tags. */ export async function migrateHistoryIfNeeded( storageRoot: string, varStore: VarStore, @@ -395,47 +446,43 @@ export async function migrateHistoryIfNeeded( if (trimmed === "") { continue; } - const entry = parseHistoryJsonlLine(trimmed); + const entry = parseLegacyHistoryJsonlLine(trimmed); if (entry !== null) { - addHistoryEntry(varStore, entry); + const status = entry.reason === "cancelled" ? "cancelled" : "completed"; + const threadEntry: ThreadIndexEntry = { + head: entry.head, + status: status as ThreadIndexEntry["status"], + suspendedRole: null, + suspendMessage: null, + completedAt: entry.completedAt, + }; + setThread(varStore, entry.thread, threadEntry); } } await rename(path, `${path}.migrated`); } -export function loadAllHistory(varStore: VarStore): ThreadHistoryLine[] { - const vars = varStore.list({ namePrefix: HISTORY_VAR_PREFIX }); - return vars.map((v) => ({ - thread: v.name.slice(HISTORY_VAR_PREFIX.length) as ThreadId, - workflow: v.tags.workflow ?? "", - head: v.value as CasRef, - completedAt: Number(v.tags.completedAt ?? "0"), - reason: v.tags.reason === "completed" || v.tags.reason === "cancelled" ? v.tags.reason : null, - })); -} +/** Migrate `@uwf/history/*` variables to `@uwf/thread/*` with status tags. */ +export function migrateHistoryVarsToThreadVars(varStore: VarStore): void { + const LEGACY_HISTORY_VAR_PREFIX = "@uwf/history/"; + const vars = varStore.list({ namePrefix: LEGACY_HISTORY_VAR_PREFIX }); -export function findHistoryEntry(varStore: VarStore, threadId: ThreadId): ThreadHistoryLine | null { - const vars = varStore.list({ namePrefix: `${HISTORY_VAR_PREFIX}${threadId}` }); - const v = vars.find((entry) => entry.name === `${HISTORY_VAR_PREFIX}${threadId}`); - if (v === undefined) { - return null; + for (const v of vars) { + const threadId = v.name.slice(LEGACY_HISTORY_VAR_PREFIX.length) as ThreadId; + const reason = v.tags.reason; + const status = reason === "cancelled" ? "cancelled" : "completed"; + const completedAt = Number(v.tags.completedAt ?? Date.now()); + + const threadEntry: ThreadIndexEntry = { + head: v.value as CasRef, + status: status as ThreadIndexEntry["status"], + suspendedRole: null, + suspendMessage: null, + completedAt, + }; + + setThread(varStore, threadId, threadEntry); + varStore.remove(v.name); } - return { - thread: threadId, - workflow: v.tags.workflow ?? "", - head: v.value as CasRef, - completedAt: Number(v.tags.completedAt ?? "0"), - reason: v.tags.reason === "completed" || v.tags.reason === "cancelled" ? v.tags.reason : null, - }; -} - -export function addHistoryEntry(varStore: VarStore, entry: ThreadHistoryLine): void { - varStore.set(`${HISTORY_VAR_PREFIX}${entry.thread}`, entry.head, { - tags: { - workflow: entry.workflow, - completedAt: String(entry.completedAt), - reason: entry.reason ?? "completed", - }, - }); } diff --git a/packages/protocol/src/__tests__/thread-index.test.ts b/packages/protocol/src/__tests__/thread-index.test.ts index 2d58e7e..be2bf5c 100644 --- a/packages/protocol/src/__tests__/thread-index.test.ts +++ b/packages/protocol/src/__tests__/thread-index.test.ts @@ -1,6 +1,7 @@ import { describe, expect, test } from "vitest"; import { createThreadIndexEntry, + markThreadCompleted, markThreadSuspended, normalizeThreadIndexEntry, parseThreadsIndex, @@ -16,6 +17,8 @@ describe("thread-index", () => { head: "0123456789ABC", suspendedRole: null, suspendMessage: null, + status: "idle", + completedAt: null, }); }); @@ -29,6 +32,40 @@ describe("thread-index", () => { head: "0123456789ABC", suspendedRole: "worker", suspendMessage: "Please clarify: Which API?", + status: "idle", + completedAt: null, + }); + }); + + test("normalizeThreadIndexEntry preserves status and completedAt from new data", () => { + const entry = normalizeThreadIndexEntry({ + head: "0123456789ABC", + suspendedRole: null, + suspendMessage: null, + status: "completed", + completedAt: 1234567890, + }); + expect(entry).toEqual({ + head: "0123456789ABC", + suspendedRole: null, + suspendMessage: null, + status: "completed", + completedAt: 1234567890, + }); + }); + + test("normalizeThreadIndexEntry defaults status=idle, completedAt=null for old data", () => { + const entry = normalizeThreadIndexEntry({ + head: "0123456789ABC", + suspendedRole: null, + suspendMessage: null, + }); + expect(entry).toEqual({ + head: "0123456789ABC", + suspendedRole: null, + suspendMessage: null, + status: "idle", + completedAt: null, }); }); @@ -47,10 +84,24 @@ describe("thread-index", () => { head: "0123456789ABC", suspendedRole: "worker", suspendMessage: "Please clarify: Which API?", + status: "suspended", }); }); - test("updateThreadHead clears suspend metadata", () => { + test("serialize completed entry as object", () => { + const entry = markThreadCompleted( + createThreadIndexEntry("0123456789ABC"), + "completed", + 1234567890, + ); + expect(serializeThreadIndexEntry(entry)).toEqual({ + head: "0123456789ABC", + status: "completed", + completedAt: 1234567890, + }); + }); + + test("updateThreadHead clears suspend metadata and resets status to idle", () => { const suspended = markThreadSuspended( createThreadIndexEntry("OLDHEAD0123456"), "worker", @@ -61,6 +112,44 @@ describe("thread-index", () => { head: "NEWHEAD01234567", suspendedRole: null, suspendMessage: null, + status: "idle", + completedAt: null, + }); + }); + + test("markThreadSuspended sets status to suspended", () => { + const entry = createThreadIndexEntry("0123456789ABC"); + const suspended = markThreadSuspended(entry, "worker", "Waiting for input"); + expect(suspended).toEqual({ + head: "0123456789ABC", + suspendedRole: "worker", + suspendMessage: "Waiting for input", + status: "suspended", + completedAt: null, + }); + }); + + test("markThreadCompleted sets status and completedAt", () => { + const entry = createThreadIndexEntry("0123456789ABC"); + const completed = markThreadCompleted(entry, "completed", 1234567890); + expect(completed).toEqual({ + head: "0123456789ABC", + suspendedRole: null, + suspendMessage: null, + status: "completed", + completedAt: 1234567890, + }); + }); + + test("markThreadCompleted with cancelled status", () => { + const entry = createThreadIndexEntry("0123456789ABC"); + const cancelled = markThreadCompleted(entry, "cancelled", 9876543210); + expect(cancelled).toEqual({ + head: "0123456789ABC", + suspendedRole: null, + suspendMessage: null, + status: "cancelled", + completedAt: 9876543210, }); }); @@ -71,6 +160,7 @@ describe("thread-index", () => { head: "HEAD00000000002", suspendedRole: "reviewer", suspendMessage: "Need input", + status: "suspended", }, }; const parsed = parseThreadsIndex(raw); diff --git a/packages/protocol/src/index.ts b/packages/protocol/src/index.ts index 9f8bffa..d62c170 100644 --- a/packages/protocol/src/index.ts +++ b/packages/protocol/src/index.ts @@ -5,6 +5,7 @@ export { } from "./schemas.js"; export { createThreadIndexEntry, + markThreadCompleted, markThreadSuspended, normalizeThreadIndexEntry, parseThreadsIndex, diff --git a/packages/protocol/src/thread-index.ts b/packages/protocol/src/thread-index.ts index 91daaaf..f5f0720 100644 --- a/packages/protocol/src/thread-index.ts +++ b/packages/protocol/src/thread-index.ts @@ -15,10 +15,14 @@ export function normalizeThreadIndexEntry(raw: unknown): ThreadIndexEntry | null } const suspendedRole = rec.suspendedRole; const suspendMessage = rec.suspendMessage; + const status = rec.status; + const completedAt = rec.completedAt; return { head: head as CasRef, suspendedRole: typeof suspendedRole === "string" ? suspendedRole : null, suspendMessage: typeof suspendMessage === "string" ? suspendMessage : null, + status: typeof status === "string" ? (status as "idle" | "running" | "suspended" | "completed" | "cancelled") : "idle", + completedAt: typeof completedAt === "number" ? completedAt : null, }; } @@ -27,6 +31,8 @@ export function createThreadIndexEntry(head: CasRef): ThreadIndexEntry { head, suspendedRole: null, suspendMessage: null, + status: "idle", + completedAt: null, }; } @@ -35,6 +41,8 @@ export function updateThreadHead(_entry: ThreadIndexEntry, head: CasRef): Thread head, suspendedRole: null, suspendMessage: null, + status: "idle", + completedAt: null, }; } @@ -47,21 +55,58 @@ export function markThreadSuspended( head: entry.head, suspendedRole, suspendMessage, + status: "suspended", + completedAt: null, + }; +} + +export function markThreadCompleted( + entry: ThreadIndexEntry, + status: "completed" | "cancelled", + now: number, +): ThreadIndexEntry { + return { + head: entry.head, + suspendedRole: null, + suspendMessage: null, + status, + completedAt: now, }; } /** Serialize for variable store — compact string when not suspended. */ export function serializeThreadIndexEntry( entry: ThreadIndexEntry, -): string | Record { - if (entry.suspendedRole === null || entry.suspendMessage === null) { +): string | Record { + // Compact string only for idle status with no suspend metadata + if (entry.status === "idle" && entry.suspendedRole === null && entry.suspendMessage === null && entry.completedAt === null) { return entry.head; } - return { + + // Build object representation + const obj: Record = { head: entry.head, - suspendedRole: entry.suspendedRole, - suspendMessage: entry.suspendMessage, }; + + // Include suspend metadata if present + if (entry.suspendedRole !== null) { + obj.suspendedRole = entry.suspendedRole; + } + if (entry.suspendMessage !== null) { + obj.suspendMessage = entry.suspendMessage; + } + + // Always include status if not idle + if (entry.status !== "idle") { + obj.status = entry.status; + } + + // Include completedAt if present + if (entry.completedAt !== null) { + obj.completedAt = entry.completedAt; + } + + return obj; } export function parseThreadsIndex(raw: unknown): ThreadsIndex { @@ -80,8 +125,8 @@ export function parseThreadsIndex(raw: unknown): ThreadsIndex { export function serializeThreadsIndex( index: ThreadsIndex, -): Record> { - const out: Record> = {}; +): Record> { + const out: Record> = {}; for (const [threadId, entry] of Object.entries(index)) { out[threadId] = serializeThreadIndexEntry(entry); } diff --git a/packages/protocol/src/types.ts b/packages/protocol/src/types.ts index 82b39f7..333eeac 100644 --- a/packages/protocol/src/types.ts +++ b/packages/protocol/src/types.ts @@ -118,6 +118,8 @@ export type ThreadIndexEntry = { head: CasRef; suspendedRole: string | null; suspendMessage: string | null; + status: ThreadStatus; + completedAt: number | null; }; /** uwf thread steps — single step entry */ diff --git a/packages/util-agent/src/storage.ts b/packages/util-agent/src/storage.ts index a065668..bec55cb 100644 --- a/packages/util-agent/src/storage.ts +++ b/packages/util-agent/src/storage.ts @@ -82,8 +82,10 @@ export async function getActiveThreadEntry( } return { head: v.value as CasRef, + status: (v.tags.status ?? "idle") as ThreadIndexEntry["status"], suspendedRole: v.tags.suspendedRole ?? null, suspendMessage: v.tags.suspendMessage ?? null, + completedAt: v.tags.completedAt !== undefined ? Number(v.tags.completedAt) : null, }; }