refactor: unified thread storage + resume completed threads #45
@@ -7,10 +7,9 @@ import { describe, expect, test } from "vitest";
|
||||
import { createMarker, deleteMarker } from "../background/index.js";
|
||||
import { cmdThreadList, cmdThreadShow, cmdThreadStart } from "../commands/thread.js";
|
||||
import {
|
||||
addHistoryEntry,
|
||||
completeThread,
|
||||
createUwfStore,
|
||||
deleteThread,
|
||||
loadAllThreads,
|
||||
loadActiveThreads,
|
||||
setThread,
|
||||
} from "../store.js";
|
||||
|
||||
@@ -175,7 +174,7 @@ async function insertStepNode(
|
||||
outputPayload: Record<string, unknown>,
|
||||
): Promise<void> {
|
||||
const uwf = await createUwfStore(storageRoot);
|
||||
const index = loadAllThreads(uwf.varStore);
|
||||
const index = loadActiveThreads(uwf.varStore);
|
||||
const headEntry = index[threadId];
|
||||
if (headEntry === undefined) throw new Error(`thread ${threadId} not in index`);
|
||||
const head = headEntry.head;
|
||||
@@ -206,7 +205,13 @@ async function insertStepNode(
|
||||
assembledPrompt: null,
|
||||
})) as CasRef;
|
||||
|
||||
setThread(uwf.varStore, threadId, { head: stepHash, suspendedRole: null, suspendMessage: null });
|
||||
setThread(uwf.varStore, threadId, {
|
||||
head: stepHash,
|
||||
status: "idle",
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
completedAt: null,
|
||||
});
|
||||
}
|
||||
|
||||
describe("currentRole field", () => {
|
||||
@@ -286,15 +291,8 @@ describe("currentRole field", () => {
|
||||
const tid = thread as ThreadId;
|
||||
|
||||
const uwfForIndex = await createUwfStore(storageRoot);
|
||||
const head = loadAllThreads(uwfForIndex.varStore)[tid]!.head;
|
||||
deleteThread(uwfForIndex.varStore, tid);
|
||||
addHistoryEntry(uwfForIndex.varStore, {
|
||||
thread: tid,
|
||||
workflow,
|
||||
head,
|
||||
completedAt: Date.now(),
|
||||
reason: "completed",
|
||||
});
|
||||
const head = loadActiveThreads(uwfForIndex.varStore)[tid]!.head;
|
||||
completeThread(uwfForIndex.varStore, tid, "completed");
|
||||
|
||||
const result = await cmdThreadShow(storageRoot, tid);
|
||||
expect(result.status).toBe("completed");
|
||||
@@ -314,15 +312,8 @@ describe("currentRole field", () => {
|
||||
const tid = thread as ThreadId;
|
||||
|
||||
const uwfForIndex = await createUwfStore(storageRoot);
|
||||
const head = loadAllThreads(uwfForIndex.varStore)[tid]!.head;
|
||||
deleteThread(uwfForIndex.varStore, tid);
|
||||
addHistoryEntry(uwfForIndex.varStore, {
|
||||
thread: tid,
|
||||
workflow,
|
||||
head,
|
||||
completedAt: Date.now(),
|
||||
reason: "cancelled",
|
||||
});
|
||||
const head = loadActiveThreads(uwfForIndex.varStore)[tid]!.head;
|
||||
completeThread(uwfForIndex.varStore, tid, "cancelled");
|
||||
|
||||
const result = await cmdThreadShow(storageRoot, tid);
|
||||
expect(result.status).toBe("cancelled");
|
||||
@@ -375,15 +366,8 @@ describe("currentRole field", () => {
|
||||
const comp = await cmdThreadStart(storageRoot, wf, "completed", tmpDir);
|
||||
const compId = comp.thread as ThreadId;
|
||||
const uwfForIndex = await createUwfStore(storageRoot);
|
||||
const compHead = loadAllThreads(uwfForIndex.varStore)[compId]!.head;
|
||||
deleteThread(uwfForIndex.varStore, compId);
|
||||
addHistoryEntry(uwfForIndex.varStore, {
|
||||
thread: compId,
|
||||
workflow: comp.workflow,
|
||||
head: compHead,
|
||||
completedAt: Date.now(),
|
||||
reason: "completed",
|
||||
});
|
||||
const compHead = loadActiveThreads(uwfForIndex.varStore)[compId]!.head;
|
||||
completeThread(uwfForIndex.varStore, compId, "completed");
|
||||
|
||||
const list = await cmdThreadList(storageRoot, null, null, null, 0, 100);
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ import { join } from "node:path";
|
||||
import { type CasRef, createThreadIndexEntry, type ThreadId } from "@united-workforce/protocol";
|
||||
import { afterEach, beforeEach, describe, expect, test } from "vitest";
|
||||
import { resolveHeadHash } from "../commands/shared.js";
|
||||
import { addHistoryEntry, createUwfStore, setThread } from "../store.js";
|
||||
import { completeThread, createUwfStore, setThread } from "../store.js";
|
||||
|
||||
let tmpDir: string;
|
||||
|
||||
@@ -31,19 +31,13 @@ describe("resolveHeadHash", () => {
|
||||
expect(result).toBe(headHash);
|
||||
});
|
||||
|
||||
test("falls back to history variable when thread not in active index", async () => {
|
||||
test("finds completed thread", async () => {
|
||||
const threadId = "01JTEST0000000000000000002" as ThreadId;
|
||||
const workflowHash = "workflow_hash_789" as CasRef;
|
||||
|
||||
const uwf = await createUwfStore(tmpDir);
|
||||
const headHash = (await uwf.store.cas.put(uwf.schemas.text, "completed-head")) as CasRef;
|
||||
addHistoryEntry(uwf.varStore, {
|
||||
thread: threadId,
|
||||
workflow: workflowHash,
|
||||
head: headHash,
|
||||
completedAt: Date.now(),
|
||||
reason: null,
|
||||
});
|
||||
setThread(uwf.varStore, threadId, createThreadIndexEntry(headHash));
|
||||
completeThread(uwf.varStore, threadId, "completed");
|
||||
|
||||
const result = await resolveHeadHash(tmpDir, threadId);
|
||||
|
||||
@@ -54,58 +48,36 @@ describe("resolveHeadHash", () => {
|
||||
// calls fail() which does process.exit(1), terminating the test runner.
|
||||
// The error behavior is tested in integration tests below via CLI invocation.
|
||||
|
||||
test("prioritizes active thread over history when thread exists in both", async () => {
|
||||
test("prioritizes active thread", async () => {
|
||||
const threadId = "01JTEST0000000000000000004" as ThreadId;
|
||||
const workflowHash = "workflow_hash_xyz" as CasRef;
|
||||
|
||||
const uwf = await createUwfStore(tmpDir);
|
||||
const activeHead = (await uwf.store.cas.put(uwf.schemas.text, "active-v2")) as CasRef;
|
||||
const historicalHash = (await uwf.store.cas.put(uwf.schemas.text, "historical-v1")) as CasRef;
|
||||
setThread(uwf.varStore, threadId, createThreadIndexEntry(activeHead));
|
||||
addHistoryEntry(uwf.varStore, {
|
||||
thread: threadId,
|
||||
workflow: workflowHash,
|
||||
head: historicalHash,
|
||||
completedAt: Date.now(),
|
||||
reason: null,
|
||||
});
|
||||
|
||||
const result = await resolveHeadHash(tmpDir, threadId);
|
||||
|
||||
// Should return the active head, not the historical one
|
||||
// Should return the active head
|
||||
expect(result).toBe(activeHead);
|
||||
});
|
||||
|
||||
test("finds thread from multiple history entries", async () => {
|
||||
test("finds thread from multiple completed threads", async () => {
|
||||
const threadId1 = "01JTEST0000000000000000005" as ThreadId;
|
||||
const threadId2 = "01JTEST0000000000000000006" as ThreadId;
|
||||
const threadId3 = "01JTEST0000000000000000007" as ThreadId;
|
||||
const workflowHash = "workflow_hash_abc" as CasRef;
|
||||
const uwf = await createUwfStore(tmpDir);
|
||||
const hash1 = (await uwf.store.cas.put(uwf.schemas.text, "hash-thread1")) as CasRef;
|
||||
const hash2 = (await uwf.store.cas.put(uwf.schemas.text, "hash-thread2")) as CasRef;
|
||||
const hash3 = (await uwf.store.cas.put(uwf.schemas.text, "hash-thread3")) as CasRef;
|
||||
addHistoryEntry(uwf.varStore, {
|
||||
thread: threadId1,
|
||||
workflow: workflowHash,
|
||||
head: hash1,
|
||||
completedAt: Date.now() - 2000,
|
||||
reason: null,
|
||||
});
|
||||
addHistoryEntry(uwf.varStore, {
|
||||
thread: threadId2,
|
||||
workflow: workflowHash,
|
||||
head: hash2,
|
||||
completedAt: Date.now() - 1000,
|
||||
reason: null,
|
||||
});
|
||||
addHistoryEntry(uwf.varStore, {
|
||||
thread: threadId3,
|
||||
workflow: workflowHash,
|
||||
head: hash3,
|
||||
completedAt: Date.now(),
|
||||
reason: null,
|
||||
});
|
||||
|
||||
setThread(uwf.varStore, threadId1, createThreadIndexEntry(hash1));
|
||||
completeThread(uwf.varStore, threadId1, "completed");
|
||||
|
||||
setThread(uwf.varStore, threadId2, createThreadIndexEntry(hash2));
|
||||
completeThread(uwf.varStore, threadId2, "completed");
|
||||
|
||||
setThread(uwf.varStore, threadId3, createThreadIndexEntry(hash3));
|
||||
completeThread(uwf.varStore, threadId3, "completed");
|
||||
|
||||
const result = await resolveHeadHash(tmpDir, threadId2);
|
||||
|
||||
|
||||
@@ -226,19 +226,15 @@ describe("Global CAS directory", () => {
|
||||
const uwf = await createUwfStore(storageRoot);
|
||||
const threadId = "thread-123" as ThreadId;
|
||||
const headHash = await uwf.store.cas.put(uwf.schemas.text, "history-head");
|
||||
const { addHistoryEntry, findHistoryEntry } = await import("../store.js");
|
||||
addHistoryEntry(uwf.varStore, {
|
||||
thread: threadId,
|
||||
workflow: "workflow-456",
|
||||
head: headHash,
|
||||
completedAt: Date.now(),
|
||||
reason: "completed",
|
||||
});
|
||||
const { completeThread, setThread, getThread } = await import("../store.js");
|
||||
const { createThreadIndexEntry } = await import("@united-workforce/protocol");
|
||||
|
||||
const entry = findHistoryEntry(uwf.varStore, threadId);
|
||||
expect(entry?.thread).toBe(threadId);
|
||||
expect(entry?.workflow).toBe("workflow-456");
|
||||
setThread(uwf.varStore, threadId, createThreadIndexEntry(headHash));
|
||||
completeThread(uwf.varStore, threadId, "completed");
|
||||
|
||||
const entry = getThread(uwf.varStore, threadId);
|
||||
expect(entry?.head).toBe(headHash);
|
||||
expect(entry?.status).toBe("completed");
|
||||
|
||||
const { access } = await import("node:fs/promises");
|
||||
await access(join(globalCasDir, "vars"));
|
||||
@@ -274,15 +270,12 @@ describe("Global CAS directory", () => {
|
||||
);
|
||||
|
||||
const uwf = await createUwfStore(storageRoot);
|
||||
const { findHistoryEntry } = await import("../store.js");
|
||||
const entry = findHistoryEntry(uwf.varStore, threadId);
|
||||
expect(entry).toEqual({
|
||||
thread: threadId,
|
||||
workflow: workflowHash,
|
||||
head: headHash,
|
||||
completedAt,
|
||||
reason: "cancelled",
|
||||
});
|
||||
const { getThread } = await import("../store.js");
|
||||
const entry = getThread(uwf.varStore, threadId);
|
||||
expect(entry).not.toBeNull();
|
||||
expect(entry?.head).toBe(headHash);
|
||||
expect(entry?.status).toBe("cancelled");
|
||||
expect(entry?.completedAt).toBe(completedAt);
|
||||
|
||||
await expect(access(historyPath)).rejects.toThrow();
|
||||
const migratedContent = await readFile(`${historyPath}.migrated`, "utf8");
|
||||
|
||||
@@ -0,0 +1,235 @@
|
||||
import { mkdir, mkdtemp } from "node:fs/promises";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import type { CasRef, ThreadId } from "@united-workforce/protocol";
|
||||
import { describe, expect, test } from "vitest";
|
||||
import {
|
||||
completeThread,
|
||||
createUwfStore,
|
||||
getThread,
|
||||
loadActiveThreads,
|
||||
loadHistoryThreads,
|
||||
setThread,
|
||||
} from "../store.js";
|
||||
|
||||
async function makeUwfStore(storageRoot: string) {
|
||||
const casDir = join(storageRoot, "cas");
|
||||
await mkdir(casDir, { recursive: true });
|
||||
process.env.OCAS_DIR = casDir;
|
||||
return createUwfStore(storageRoot);
|
||||
}
|
||||
|
||||
async function seedThreadHead(
|
||||
uwf: Awaited<ReturnType<typeof createUwfStore>>,
|
||||
label: string,
|
||||
): Promise<CasRef> {
|
||||
return (await uwf.store.cas.put(uwf.schemas.text, label)) as CasRef;
|
||||
}
|
||||
|
||||
describe("unified thread storage", () => {
|
||||
test("loadActiveThreads excludes completed threads", async () => {
|
||||
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-active-test-"));
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
|
||||
const threadId1 = "01JTEST000000000000ACTIVE1" as ThreadId;
|
||||
const threadId2 = "01JTEST000000000000ACTIVE2" as ThreadId;
|
||||
const head1 = await seedThreadHead(uwf, "active-head");
|
||||
const head2 = await seedThreadHead(uwf, "completed-head");
|
||||
|
||||
setThread(uwf.varStore, threadId1, {
|
||||
head: head1,
|
||||
status: "idle",
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
completedAt: null,
|
||||
});
|
||||
|
||||
setThread(uwf.varStore, threadId2, {
|
||||
head: head2,
|
||||
status: "completed",
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
completedAt: Date.now(),
|
||||
});
|
||||
|
||||
const active = loadActiveThreads(uwf.varStore);
|
||||
expect(Object.keys(active)).toHaveLength(1);
|
||||
expect(active[threadId1]).toBeDefined();
|
||||
expect(active[threadId2]).toBeUndefined();
|
||||
});
|
||||
|
||||
test("loadActiveThreads excludes cancelled threads", async () => {
|
||||
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-active-test-"));
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
|
||||
const threadId1 = "01JTEST000000000000ACTIVE3" as ThreadId;
|
||||
const threadId2 = "01JTEST000000000000ACTIVE4" as ThreadId;
|
||||
const head1 = await seedThreadHead(uwf, "active-head");
|
||||
const head2 = await seedThreadHead(uwf, "cancelled-head");
|
||||
|
||||
setThread(uwf.varStore, threadId1, {
|
||||
head: head1,
|
||||
status: "idle",
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
completedAt: null,
|
||||
});
|
||||
|
||||
setThread(uwf.varStore, threadId2, {
|
||||
head: head2,
|
||||
status: "cancelled",
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
completedAt: Date.now(),
|
||||
});
|
||||
|
||||
const active = loadActiveThreads(uwf.varStore);
|
||||
expect(Object.keys(active)).toHaveLength(1);
|
||||
expect(active[threadId1]).toBeDefined();
|
||||
expect(active[threadId2]).toBeUndefined();
|
||||
});
|
||||
|
||||
test("loadHistoryThreads only returns completed and cancelled", async () => {
|
||||
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-history-test-"));
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
|
||||
const threadId1 = "01JTEST000000000000HISTOR1" as ThreadId;
|
||||
const threadId2 = "01JTEST000000000000HISTOR2" as ThreadId;
|
||||
const threadId3 = "01JTEST000000000000HISTOR3" as ThreadId;
|
||||
const head1 = await seedThreadHead(uwf, "active-head");
|
||||
const head2 = await seedThreadHead(uwf, "completed-head");
|
||||
const head3 = await seedThreadHead(uwf, "cancelled-head");
|
||||
|
||||
setThread(uwf.varStore, threadId1, {
|
||||
head: head1,
|
||||
status: "idle",
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
completedAt: null,
|
||||
});
|
||||
|
||||
setThread(uwf.varStore, threadId2, {
|
||||
head: head2,
|
||||
status: "completed",
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
completedAt: Date.now(),
|
||||
});
|
||||
|
||||
setThread(uwf.varStore, threadId3, {
|
||||
head: head3,
|
||||
status: "cancelled",
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
completedAt: Date.now(),
|
||||
});
|
||||
|
||||
const history = loadHistoryThreads(uwf.varStore);
|
||||
expect(Object.keys(history)).toHaveLength(2);
|
||||
expect(history[threadId1]).toBeUndefined();
|
||||
expect(history[threadId2]).toBeDefined();
|
||||
expect(history[threadId3]).toBeDefined();
|
||||
});
|
||||
|
||||
test("completeThread marks thread as completed", async () => {
|
||||
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-complete-test-"));
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
const threadId = "01JTEST000000000000COMPLE1" as ThreadId;
|
||||
const head = await seedThreadHead(uwf, "active-head");
|
||||
|
||||
setThread(uwf.varStore, threadId, {
|
||||
head,
|
||||
status: "idle",
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
completedAt: null,
|
||||
});
|
||||
|
||||
completeThread(uwf.varStore, threadId, "completed");
|
||||
|
||||
const entry = getThread(uwf.varStore, threadId);
|
||||
expect(entry).not.toBeNull();
|
||||
expect(entry?.status).toBe("completed");
|
||||
expect(entry?.completedAt).toBeDefined();
|
||||
expect(entry?.completedAt).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
test("completeThread marks thread as cancelled", async () => {
|
||||
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-complete-test-"));
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
const threadId = "01JTEST000000000000COMPLE2" as ThreadId;
|
||||
const head = await seedThreadHead(uwf, "active-head");
|
||||
|
||||
setThread(uwf.varStore, threadId, {
|
||||
head,
|
||||
status: "idle",
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
completedAt: null,
|
||||
});
|
||||
|
||||
completeThread(uwf.varStore, threadId, "cancelled");
|
||||
|
||||
const entry = getThread(uwf.varStore, threadId);
|
||||
expect(entry).not.toBeNull();
|
||||
expect(entry?.status).toBe("cancelled");
|
||||
expect(entry?.completedAt).toBeDefined();
|
||||
expect(entry?.completedAt).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
test("completeThread clears suspend metadata", async () => {
|
||||
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-complete-test-"));
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
const threadId = "01JTEST000000000000COMPLE3" as ThreadId;
|
||||
const head = await seedThreadHead(uwf, "suspended-head");
|
||||
|
||||
setThread(uwf.varStore, threadId, {
|
||||
head,
|
||||
status: "suspended",
|
||||
suspendedRole: "test-role",
|
||||
suspendMessage: "test message",
|
||||
completedAt: null,
|
||||
});
|
||||
|
||||
completeThread(uwf.varStore, threadId, "completed");
|
||||
|
||||
const entry = getThread(uwf.varStore, threadId);
|
||||
expect(entry).not.toBeNull();
|
||||
expect(entry?.status).toBe("completed");
|
||||
expect(entry?.suspendedRole).toBeNull();
|
||||
expect(entry?.suspendMessage).toBeNull();
|
||||
});
|
||||
|
||||
test("completeThread handles non-existent thread gracefully", async () => {
|
||||
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-complete-test-"));
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
const threadId = "01JTEST000000000000NOEXIST" as ThreadId;
|
||||
|
||||
// Should not throw
|
||||
completeThread(uwf.varStore, threadId, "completed");
|
||||
|
||||
const entry = getThread(uwf.varStore, threadId);
|
||||
expect(entry).toBeNull();
|
||||
});
|
||||
|
||||
test("status and completedAt tags are persisted and loaded", async () => {
|
||||
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-tags-test-"));
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
const threadId = "01JTEST000000000000TAGTEST" as ThreadId;
|
||||
const head = await seedThreadHead(uwf, "test-head");
|
||||
const now = Date.now();
|
||||
|
||||
setThread(uwf.varStore, threadId, {
|
||||
head,
|
||||
status: "completed",
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
completedAt: now,
|
||||
});
|
||||
|
||||
const entry = getThread(uwf.varStore, threadId);
|
||||
expect(entry).not.toBeNull();
|
||||
expect(entry?.status).toBe("completed");
|
||||
expect(entry?.completedAt).toBe(now);
|
||||
});
|
||||
});
|
||||
@@ -3,7 +3,13 @@ import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import type { CasRef, ThreadId } from "@united-workforce/protocol";
|
||||
import { describe, expect, test } from "vitest";
|
||||
import { addHistoryEntry, createUwfStore, loadAllHistory } from "../store.js";
|
||||
import {
|
||||
completeThread,
|
||||
createUwfStore,
|
||||
getThread,
|
||||
loadHistoryThreads,
|
||||
setThread,
|
||||
} from "../store.js";
|
||||
|
||||
async function makeUwfStore(storageRoot: string) {
|
||||
const casDir = join(storageRoot, "cas");
|
||||
@@ -20,88 +26,113 @@ async function seedHistoryHead(
|
||||
}
|
||||
|
||||
describe("thread cancel status", () => {
|
||||
test("cancelled history entry has reason 'cancelled'", async () => {
|
||||
test("cancelled thread has status 'cancelled'", async () => {
|
||||
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-"));
|
||||
const threadId = "01JTEST000000000000CANCEL1" as ThreadId;
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
const head = await seedHistoryHead(uwf, "cancelled-head");
|
||||
|
||||
addHistoryEntry(uwf.varStore, {
|
||||
thread: threadId,
|
||||
workflow: "test-workflow",
|
||||
setThread(uwf.varStore, threadId, {
|
||||
head,
|
||||
completedAt: Date.now(),
|
||||
reason: "cancelled",
|
||||
status: "idle",
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
completedAt: null,
|
||||
});
|
||||
|
||||
const history = loadAllHistory(uwf.varStore);
|
||||
expect(history).toHaveLength(1);
|
||||
expect(history[0]?.reason).toBe("cancelled");
|
||||
completeThread(uwf.varStore, threadId, "cancelled");
|
||||
|
||||
const entry = getThread(uwf.varStore, threadId);
|
||||
expect(entry).not.toBeNull();
|
||||
expect(entry?.status).toBe("cancelled");
|
||||
});
|
||||
|
||||
test("completed history entry has reason 'completed'", async () => {
|
||||
test("completed thread has status 'completed'", async () => {
|
||||
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-"));
|
||||
const threadId = "01JTEST000000000000CANCEL2" as ThreadId;
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
const head = await seedHistoryHead(uwf, "completed-head");
|
||||
|
||||
addHistoryEntry(uwf.varStore, {
|
||||
thread: threadId,
|
||||
workflow: "test-workflow",
|
||||
setThread(uwf.varStore, threadId, {
|
||||
head,
|
||||
completedAt: Date.now(),
|
||||
reason: "completed",
|
||||
status: "idle",
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
completedAt: null,
|
||||
});
|
||||
|
||||
const history = loadAllHistory(uwf.varStore);
|
||||
expect(history).toHaveLength(1);
|
||||
expect(history[0]?.reason).toBe("completed");
|
||||
completeThread(uwf.varStore, threadId, "completed");
|
||||
|
||||
const entry = getThread(uwf.varStore, threadId);
|
||||
expect(entry).not.toBeNull();
|
||||
expect(entry?.status).toBe("completed");
|
||||
});
|
||||
|
||||
test("history entry with null reason is stored as completed", async () => {
|
||||
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-"));
|
||||
const threadId = "01JTEST000000000000CANCEL3" as ThreadId;
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
const head = await seedHistoryHead(uwf, "legacy-head");
|
||||
|
||||
addHistoryEntry(uwf.varStore, {
|
||||
thread: threadId,
|
||||
workflow: "test-workflow",
|
||||
head,
|
||||
completedAt: Date.now(),
|
||||
reason: null,
|
||||
});
|
||||
|
||||
const history = loadAllHistory(uwf.varStore);
|
||||
expect(history).toHaveLength(1);
|
||||
expect(history[0]?.reason).toBe("completed");
|
||||
});
|
||||
|
||||
test("mixed completed and cancelled entries preserve distinct reasons", async () => {
|
||||
test("loadHistoryThreads returns completed and cancelled", async () => {
|
||||
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-"));
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
const head1 = await seedHistoryHead(uwf, "head1");
|
||||
const head2 = await seedHistoryHead(uwf, "head2");
|
||||
|
||||
addHistoryEntry(uwf.varStore, {
|
||||
thread: "01JTEST000000000000CANCEL4" as ThreadId,
|
||||
workflow: "test-workflow",
|
||||
const threadId1 = "01JTEST000000000000CANCEL4" as ThreadId;
|
||||
setThread(uwf.varStore, threadId1, {
|
||||
head: head1,
|
||||
completedAt: Date.now(),
|
||||
reason: "completed",
|
||||
status: "idle",
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
completedAt: null,
|
||||
});
|
||||
completeThread(uwf.varStore, threadId1, "completed");
|
||||
|
||||
addHistoryEntry(uwf.varStore, {
|
||||
thread: "01JTEST000000000000CANCEL5" as ThreadId,
|
||||
workflow: "test-workflow",
|
||||
const threadId2 = "01JTEST000000000000CANCEL5" as ThreadId;
|
||||
setThread(uwf.varStore, threadId2, {
|
||||
head: head2,
|
||||
completedAt: Date.now(),
|
||||
reason: "cancelled",
|
||||
status: "idle",
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
completedAt: null,
|
||||
});
|
||||
completeThread(uwf.varStore, threadId2, "cancelled");
|
||||
|
||||
const history = loadAllHistory(uwf.varStore);
|
||||
expect(history).toHaveLength(2);
|
||||
const reasons = history.map((entry) => entry.reason).sort();
|
||||
expect(reasons).toEqual(["cancelled", "completed"]);
|
||||
const history = loadHistoryThreads(uwf.varStore);
|
||||
expect(Object.keys(history)).toHaveLength(2);
|
||||
const statuses = Object.values(history)
|
||||
.map((entry) => entry.status)
|
||||
.sort();
|
||||
expect(statuses).toEqual(["cancelled", "completed"]);
|
||||
});
|
||||
|
||||
test("mixed completed and cancelled entries preserve distinct statuses", async () => {
|
||||
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-"));
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
const head1 = await seedHistoryHead(uwf, "head1");
|
||||
const head2 = await seedHistoryHead(uwf, "head2");
|
||||
|
||||
const threadId1 = "01JTEST000000000000CANCEL6" as ThreadId;
|
||||
setThread(uwf.varStore, threadId1, {
|
||||
head: head1,
|
||||
status: "idle",
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
completedAt: null,
|
||||
});
|
||||
completeThread(uwf.varStore, threadId1, "completed");
|
||||
|
||||
const threadId2 = "01JTEST000000000000CANCEL7" as ThreadId;
|
||||
setThread(uwf.varStore, threadId2, {
|
||||
head: head2,
|
||||
status: "idle",
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
completedAt: null,
|
||||
});
|
||||
completeThread(uwf.varStore, threadId2, "cancelled");
|
||||
|
||||
const history = loadHistoryThreads(uwf.varStore);
|
||||
expect(Object.keys(history)).toHaveLength(2);
|
||||
const statuses = Object.values(history)
|
||||
.map((entry) => entry.status)
|
||||
.sort();
|
||||
expect(statuses).toEqual(["cancelled", "completed"]);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -10,9 +10,8 @@ import { cmdThreadList } from "../commands/thread.js";
|
||||
import { parseTimeInput } from "../commands/thread-time-parser.js";
|
||||
import type { UwfStore } from "../store.js";
|
||||
import {
|
||||
addHistoryEntry,
|
||||
completeThread as completeThreadInStore,
|
||||
createUwfStore,
|
||||
deleteThread,
|
||||
loadAllThreads,
|
||||
setThread,
|
||||
} from "../store.js";
|
||||
@@ -77,14 +76,7 @@ async function completeThread(
|
||||
headHash: CasRef,
|
||||
) {
|
||||
const uwfIdx = await createUwfStore(storageRoot);
|
||||
deleteThread(uwfIdx.varStore, threadId);
|
||||
addHistoryEntry(uwfIdx.varStore, {
|
||||
thread: threadId,
|
||||
workflow: workflowHash,
|
||||
head: headHash,
|
||||
completedAt: Date.now(),
|
||||
reason: null,
|
||||
});
|
||||
completeThreadInStore(uwfIdx.varStore, threadId, "completed");
|
||||
}
|
||||
|
||||
// ── test setup ────────────────────────────────────────────────────────────────
|
||||
@@ -500,8 +492,10 @@ describe("edge cases", () => {
|
||||
)) as CasRef;
|
||||
index["INVALID_ULID_FORMAT_HERE" as ThreadId] = {
|
||||
head: placeholderHead,
|
||||
status: "idle",
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
completedAt: null,
|
||||
};
|
||||
for (const [tid, ent] of Object.entries(index)) {
|
||||
setThread(uwfIdx.varStore, tid as ThreadId, ent);
|
||||
|
||||
@@ -118,8 +118,10 @@ async function setupSuspendedThread(mode: MockAgentMode): Promise<{
|
||||
await seedThreads(tmpDir, {
|
||||
[THREAD_ID]: {
|
||||
head: stepHash,
|
||||
status: "suspended",
|
||||
suspendedRole: "worker",
|
||||
suspendMessage: SUSPEND_MESSAGE,
|
||||
completedAt: null,
|
||||
},
|
||||
});
|
||||
|
||||
@@ -347,8 +349,10 @@ describe("uwf thread resume", () => {
|
||||
const uwfAfterFirst = await createUwfStore(tmpDir);
|
||||
expect(getThread(uwfAfterFirst.varStore, THREAD_ID)).toEqual({
|
||||
head: firstResume.head,
|
||||
status: "suspended",
|
||||
suspendedRole: "worker",
|
||||
suspendMessage: SUSPEND_MESSAGE,
|
||||
completedAt: null,
|
||||
});
|
||||
|
||||
const { mockAgentPath: okMockAgentPath } = await setupOkMockAgent(
|
||||
|
||||
@@ -7,9 +7,8 @@ import { describe, expect, test } from "vitest";
|
||||
import { createMarker, deleteMarker } from "../background/index.js";
|
||||
import { cmdThreadShow, cmdThreadStart } from "../commands/thread.js";
|
||||
import {
|
||||
addHistoryEntry,
|
||||
completeThread,
|
||||
createUwfStore,
|
||||
deleteThread,
|
||||
loadAllThreads,
|
||||
setThread,
|
||||
} from "../store.js";
|
||||
@@ -118,7 +117,13 @@ async function insertStepNode(
|
||||
assembledPrompt: null,
|
||||
})) as CasRef;
|
||||
|
||||
setThread(uwf.varStore, threadId, { head: stepHash, suspendedRole: null, suspendMessage: null });
|
||||
setThread(uwf.varStore, threadId, {
|
||||
head: stepHash,
|
||||
status: "idle",
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
completedAt: null,
|
||||
});
|
||||
}
|
||||
|
||||
describe("thread show status field", () => {
|
||||
@@ -208,15 +213,7 @@ describe("thread show status field", () => {
|
||||
const head = index[threadId]!.head;
|
||||
if (!head) throw new Error("Thread not found in index");
|
||||
|
||||
deleteThread(uwfForIndex.varStore, threadId);
|
||||
|
||||
addHistoryEntry(uwfForIndex.varStore, {
|
||||
thread: threadId,
|
||||
workflow,
|
||||
head,
|
||||
completedAt: Date.now(),
|
||||
reason: "completed",
|
||||
});
|
||||
completeThread(uwfForIndex.varStore, threadId, "completed");
|
||||
|
||||
const result = await cmdThreadShow(storageRoot, threadId);
|
||||
|
||||
@@ -245,15 +242,7 @@ describe("thread show status field", () => {
|
||||
const head = index[threadId]!.head;
|
||||
if (!head) throw new Error("Thread not found in index");
|
||||
|
||||
deleteThread(uwfForIndex.varStore, threadId);
|
||||
|
||||
addHistoryEntry(uwfForIndex.varStore, {
|
||||
thread: threadId,
|
||||
workflow,
|
||||
head,
|
||||
completedAt: Date.now(),
|
||||
reason: "cancelled",
|
||||
});
|
||||
completeThread(uwfForIndex.varStore, threadId, "cancelled");
|
||||
|
||||
const result = await cmdThreadShow(storageRoot, threadId);
|
||||
|
||||
@@ -282,15 +271,7 @@ describe("thread show status field", () => {
|
||||
const head = index[threadId]!.head;
|
||||
if (!head) throw new Error("Thread not found in index");
|
||||
|
||||
deleteThread(uwfForIndex.varStore, threadId);
|
||||
|
||||
addHistoryEntry(uwfForIndex.varStore, {
|
||||
thread: threadId,
|
||||
workflow,
|
||||
head,
|
||||
completedAt: Date.now(),
|
||||
reason: null,
|
||||
});
|
||||
completeThread(uwfForIndex.varStore, threadId, "completed");
|
||||
|
||||
const result = await cmdThreadShow(storageRoot, threadId);
|
||||
|
||||
|
||||
@@ -160,8 +160,10 @@ describe("suspend step CAS chain and threads.yaml metadata", () => {
|
||||
const threadEntry = getThread(uwf.varStore, threadId);
|
||||
expect(threadEntry).toEqual({
|
||||
head: stepHash,
|
||||
status: "suspended",
|
||||
suspendedRole: "worker",
|
||||
suspendMessage: "Please clarify: Which API?",
|
||||
completedAt: null,
|
||||
});
|
||||
|
||||
const showResult = await cmdThreadShow(tmpDir, threadId);
|
||||
|
||||
@@ -11,7 +11,7 @@ import {
|
||||
THREAD_READ_DEFAULT_QUOTA,
|
||||
} from "../commands/thread.js";
|
||||
import type { UwfStore } from "../store.js";
|
||||
import { addHistoryEntry, createUwfStore } from "../store.js";
|
||||
import { completeThread, createUwfStore, setThread } from "../store.js";
|
||||
import { seedThreads } from "./thread-test-helpers.js";
|
||||
|
||||
// ── schemas used in tests ────────────────────────────────────────────────────
|
||||
@@ -745,13 +745,14 @@ describe("cmdStepList with completed threads", () => {
|
||||
const threadId = "01JTEST0000000000000000A2" as ThreadId;
|
||||
// Thread is NOT in active index (simulating completed thread)
|
||||
// But it IS in history variable store
|
||||
addHistoryEntry(uwf.varStore, {
|
||||
thread: threadId,
|
||||
workflow: workflowHash,
|
||||
setThread(uwf.varStore, threadId, {
|
||||
head: step2Hash,
|
||||
completedAt: Date.now(),
|
||||
reason: null,
|
||||
status: "idle",
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
completedAt: null,
|
||||
});
|
||||
completeThread(uwf.varStore, threadId, "completed");
|
||||
|
||||
const result = await cmdStepList(tmpDir, threadId);
|
||||
|
||||
@@ -872,14 +873,15 @@ describe("cmdStepShow with completed threads", () => {
|
||||
|
||||
const threadId = "01JTEST0000000000000000B2" as ThreadId;
|
||||
// Thread is NOT in active index
|
||||
// But it IS in history variable store
|
||||
addHistoryEntry(uwf.varStore, {
|
||||
thread: threadId,
|
||||
workflow: workflowHash,
|
||||
// But it IS in the unified store with completed status
|
||||
setThread(uwf.varStore, threadId, {
|
||||
head: stepHash,
|
||||
completedAt: Date.now(),
|
||||
reason: null,
|
||||
status: "idle",
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
completedAt: null,
|
||||
});
|
||||
completeThread(uwf.varStore, threadId, "completed");
|
||||
|
||||
const result = await cmdStepShow(tmpDir, stepHash);
|
||||
|
||||
@@ -934,15 +936,15 @@ describe("cmdThreadRead with completed threads", () => {
|
||||
});
|
||||
|
||||
const threadId = "01JTEST0000000000000000C1" as ThreadId;
|
||||
// Thread is NOT in active index
|
||||
// But it IS in history variable store
|
||||
addHistoryEntry(uwf.varStore, {
|
||||
thread: threadId,
|
||||
workflow: workflowHash,
|
||||
// Thread is in store with completed status
|
||||
setThread(uwf.varStore, threadId, {
|
||||
head: stepHash,
|
||||
completedAt: Date.now(),
|
||||
reason: null,
|
||||
status: "idle",
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
completedAt: null,
|
||||
});
|
||||
completeThread(uwf.varStore, threadId, "completed");
|
||||
|
||||
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
|
||||
|
||||
@@ -998,13 +1000,14 @@ describe("cmdThreadRead with completed threads", () => {
|
||||
});
|
||||
|
||||
const threadId = "01JTEST0000000000000000C2" as ThreadId;
|
||||
addHistoryEntry(uwf.varStore, {
|
||||
thread: threadId,
|
||||
workflow: workflowHash,
|
||||
setThread(uwf.varStore, threadId, {
|
||||
head: step3Hash,
|
||||
completedAt: Date.now(),
|
||||
reason: null,
|
||||
status: "idle",
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
completedAt: null,
|
||||
});
|
||||
completeThread(uwf.varStore, threadId, "completed");
|
||||
|
||||
const markdown = await cmdThreadRead(
|
||||
tmpDir,
|
||||
|
||||
@@ -6,7 +6,7 @@ import type {
|
||||
StepNodePayload,
|
||||
ThreadId,
|
||||
} from "@united-workforce/protocol";
|
||||
import { createUwfStore, findHistoryEntry, getThread, type UwfStore } from "../store.js";
|
||||
import { createUwfStore, getThread, type UwfStore } from "../store.js";
|
||||
|
||||
type ChainState = {
|
||||
startHash: CasRef;
|
||||
@@ -207,10 +207,6 @@ async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise
|
||||
if (entry !== null) {
|
||||
return entry.head;
|
||||
}
|
||||
const hist = findHistoryEntry(uwf.varStore, threadId);
|
||||
if (hist !== null) {
|
||||
return hist.head;
|
||||
}
|
||||
fail(`thread not found: ${threadId}`);
|
||||
}
|
||||
|
||||
|
||||
@@ -114,8 +114,10 @@ export async function cmdStepFork(
|
||||
const newThreadId = generateUlid(Date.now()) as ThreadId;
|
||||
setThread(uwf.varStore, newThreadId, {
|
||||
head: stepHash,
|
||||
status: "idle",
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
completedAt: null,
|
||||
});
|
||||
|
||||
return {
|
||||
|
||||
@@ -38,17 +38,14 @@ import { createMarker, deleteMarker, isThreadRunning } from "../background/index
|
||||
import { createIncludeTag } from "../include.js";
|
||||
import { evaluate, isSuspendResult } from "../moderator/index.js";
|
||||
import {
|
||||
addHistoryEntry,
|
||||
completeThread,
|
||||
createUwfStore,
|
||||
deleteThread,
|
||||
findHistoryEntry,
|
||||
getThread,
|
||||
loadAllHistory,
|
||||
loadAllThreads,
|
||||
loadActiveThreads,
|
||||
loadHistoryThreads,
|
||||
loadWorkflowRegistry,
|
||||
resolveWorkflowHash,
|
||||
setThread,
|
||||
type ThreadHistoryLine,
|
||||
type UwfStore,
|
||||
} from "../store.js";
|
||||
import { checkWorkflowFilenameConsistency, isCasRef, parseWorkflowPayload } from "../validate.js";
|
||||
@@ -485,61 +482,55 @@ export async function cmdThreadShow(
|
||||
): Promise<ThreadShowOutput> {
|
||||
const uwf = await createUwfStore(storageRoot);
|
||||
const entry = getThread(uwf.varStore, threadId);
|
||||
if (entry !== null) {
|
||||
const activeHead = entry.head;
|
||||
const workflow = resolveWorkflowFromHead(uwf, activeHead);
|
||||
if (workflow === null) {
|
||||
fail(`failed to resolve workflow from head: ${activeHead}`);
|
||||
}
|
||||
if (entry === null) {
|
||||
fail(`thread not found: ${threadId}`);
|
||||
}
|
||||
|
||||
const status = await resolveActiveThreadStatus(
|
||||
storageRoot,
|
||||
threadId,
|
||||
uwf,
|
||||
activeHead,
|
||||
workflow,
|
||||
);
|
||||
const currentRole = resolveCurrentRole(uwf, activeHead, workflow);
|
||||
const suspendFields = resolveSuspendFieldsForShow(entry, status, uwf, activeHead, workflow);
|
||||
|
||||
const hint =
|
||||
status === "suspended"
|
||||
? `Thread is suspended. Resume with: uwf thread resume ${threadId}`
|
||||
: null;
|
||||
const activeHead = entry.head;
|
||||
const workflow = resolveWorkflowFromHead(uwf, activeHead);
|
||||
if (workflow === null) {
|
||||
fail(`failed to resolve workflow from head: ${activeHead}`);
|
||||
}
|
||||
|
||||
// Determine if this is a completed/cancelled thread
|
||||
if (entry.status === "completed" || entry.status === "cancelled") {
|
||||
const hint = null;
|
||||
return {
|
||||
workflow,
|
||||
thread: threadId,
|
||||
head: activeHead,
|
||||
status,
|
||||
currentRole,
|
||||
suspendedRole: suspendFields.suspendedRole,
|
||||
suspendMessage: suspendFields.suspendMessage,
|
||||
done: false,
|
||||
background: null,
|
||||
hint,
|
||||
};
|
||||
}
|
||||
|
||||
const hist = findHistoryEntry(uwf.varStore, threadId);
|
||||
if (hist !== null) {
|
||||
const status: ThreadStatus = hist.reason === "cancelled" ? "cancelled" : "completed";
|
||||
|
||||
return {
|
||||
workflow: hist.workflow,
|
||||
thread: threadId,
|
||||
head: hist.head,
|
||||
status,
|
||||
status: entry.status,
|
||||
currentRole: null,
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
done: true,
|
||||
background: null,
|
||||
hint: null,
|
||||
hint,
|
||||
};
|
||||
}
|
||||
|
||||
fail(`thread not found: ${threadId}`);
|
||||
// Active thread
|
||||
const status = await resolveActiveThreadStatus(storageRoot, threadId, uwf, activeHead, workflow);
|
||||
const currentRole = resolveCurrentRole(uwf, activeHead, workflow);
|
||||
const suspendFields = resolveSuspendFieldsForShow(entry, status, uwf, activeHead, workflow);
|
||||
|
||||
const hint =
|
||||
status === "suspended"
|
||||
? `Thread is suspended. Resume with: uwf thread resume ${threadId}`
|
||||
: null;
|
||||
|
||||
return {
|
||||
workflow,
|
||||
thread: threadId,
|
||||
head: activeHead,
|
||||
status,
|
||||
currentRole,
|
||||
suspendedRole: suspendFields.suspendedRole,
|
||||
suspendMessage: suspendFields.suspendMessage,
|
||||
done: false,
|
||||
background: null,
|
||||
hint,
|
||||
};
|
||||
}
|
||||
|
||||
export type ThreadListItemWithStatus = ThreadListItem & {
|
||||
@@ -598,15 +589,15 @@ function collectCompletedThreads(
|
||||
activeIds: Set<ThreadId>,
|
||||
): ThreadListItemWithStatus[] {
|
||||
const items: ThreadListItemWithStatus[] = [];
|
||||
const history = loadAllHistory(varStore);
|
||||
const history = loadHistoryThreads(varStore);
|
||||
const seen = new Set<ThreadId>(); // Deduplication (issue #470)
|
||||
for (const entry of history) {
|
||||
if (!activeIds.has(entry.thread) && !seen.has(entry.thread)) {
|
||||
seen.add(entry.thread);
|
||||
const status = entry.reason === "cancelled" ? "cancelled" : "completed";
|
||||
for (const [threadId, entry] of Object.entries(history)) {
|
||||
if (!activeIds.has(threadId as ThreadId) && !seen.has(threadId as ThreadId)) {
|
||||
seen.add(threadId as ThreadId);
|
||||
const status = entry.status;
|
||||
items.push({
|
||||
thread: entry.thread,
|
||||
workflow: entry.workflow,
|
||||
thread: threadId as ThreadId,
|
||||
workflow: "", // Will be resolved later if needed
|
||||
head: entry.head,
|
||||
status,
|
||||
currentRole: null,
|
||||
@@ -659,7 +650,7 @@ export async function cmdThreadList(
|
||||
take: number | null,
|
||||
): Promise<ThreadListItemWithStatus[]> {
|
||||
const uwf = await createUwfStore(storageRoot);
|
||||
const index = loadAllThreads(uwf.varStore);
|
||||
const index = loadActiveThreads(uwf.varStore);
|
||||
|
||||
// Collect active threads
|
||||
let items = await collectActiveThreads(storageRoot, uwf, index);
|
||||
@@ -1035,15 +1026,8 @@ function spawnAgent(
|
||||
return obj as unknown as AdapterOutput;
|
||||
}
|
||||
|
||||
function archiveThread(uwf: UwfStore, threadId: ThreadId, workflow: CasRef, head: CasRef): void {
|
||||
deleteThread(uwf.varStore, threadId);
|
||||
addHistoryEntry(uwf.varStore, {
|
||||
thread: threadId,
|
||||
workflow,
|
||||
head,
|
||||
completedAt: Date.now(),
|
||||
reason: "completed",
|
||||
});
|
||||
function archiveThread(uwf: UwfStore, threadId: ThreadId, _workflow: CasRef, _head: CasRef): void {
|
||||
completeThread(uwf.varStore, threadId, "completed");
|
||||
}
|
||||
|
||||
export async function cmdThreadResume(
|
||||
@@ -1450,10 +1434,6 @@ async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise
|
||||
if (entry !== null) {
|
||||
return entry.head;
|
||||
}
|
||||
const hist = findHistoryEntry(uwf.varStore, threadId);
|
||||
if (hist !== null) {
|
||||
return hist.head;
|
||||
}
|
||||
fail(`thread not found: ${threadId}`);
|
||||
}
|
||||
|
||||
@@ -1533,7 +1513,6 @@ export async function cmdThreadCancel(
|
||||
if (entry === null) {
|
||||
fail(`thread not active: ${threadId}`);
|
||||
}
|
||||
const head = entry.head;
|
||||
|
||||
// Check if thread is running in background and terminate it
|
||||
const runningMarker = await isThreadRunning(storageRoot, threadId);
|
||||
@@ -1546,21 +1525,7 @@ export async function cmdThreadCancel(
|
||||
await deleteMarker(storageRoot, threadId);
|
||||
}
|
||||
|
||||
const workflow = resolveWorkflowFromHead(uwf, head);
|
||||
if (workflow === null) {
|
||||
fail(`failed to resolve workflow from head: ${head}`);
|
||||
}
|
||||
|
||||
deleteThread(uwf.varStore, threadId);
|
||||
|
||||
const historyEntry: ThreadHistoryLine = {
|
||||
thread: threadId,
|
||||
workflow,
|
||||
head,
|
||||
completedAt: Date.now(),
|
||||
reason: "cancelled",
|
||||
};
|
||||
addHistoryEntry(uwf.varStore, historyEntry);
|
||||
completeThread(uwf.varStore, threadId, "cancelled");
|
||||
|
||||
return { thread: threadId, cancelled: true };
|
||||
}
|
||||
|
||||
+92
-45
@@ -26,8 +26,6 @@ export const REGISTRY_VAR_PREFIX = "@uwf/registry/";
|
||||
/** Variable name prefix for active thread entries (`@uwf/thread/<thread-id>`). */
|
||||
export const THREAD_VAR_PREFIX = "@uwf/thread/";
|
||||
|
||||
/** Variable name prefix for completed/cancelled thread history (`@uwf/history/<thread-id>`). */
|
||||
export const HISTORY_VAR_PREFIX = "@uwf/history/";
|
||||
|
||||
/** A workflow entry discovered from the project-local .workflows/ directory. */
|
||||
export type ProjectWorkflowEntry = {
|
||||
@@ -160,10 +158,6 @@ export function getThreadsPath(storageRoot: string): string {
|
||||
return join(storageRoot, "threads.yaml");
|
||||
}
|
||||
|
||||
export type ThreadHistoryLine = ThreadListItem & {
|
||||
completedAt: number;
|
||||
reason: "completed" | "cancelled" | null;
|
||||
};
|
||||
|
||||
export type UwfStore = {
|
||||
storageRoot: string;
|
||||
@@ -183,6 +177,7 @@ export async function createUwfStore(storageRoot: string): Promise<UwfStore> {
|
||||
await migrateWorkflowRegistryIfNeeded(storageRoot, varStore);
|
||||
await migrateThreadsIndexIfNeeded(storageRoot, varStore);
|
||||
await migrateHistoryIfNeeded(storageRoot, varStore);
|
||||
migrateHistoryVarsToThreadVars(varStore);
|
||||
return { storageRoot, store, schemas, varStore };
|
||||
}
|
||||
|
||||
@@ -303,8 +298,10 @@ function threadVarName(threadId: ThreadId): string {
|
||||
function entryFromVariable(v: { value: string; tags: Record<string, string> }): ThreadIndexEntry {
|
||||
return {
|
||||
head: v.value as CasRef,
|
||||
status: (v.tags.status ?? "idle") as ThreadIndexEntry["status"],
|
||||
suspendedRole: v.tags.suspendedRole ?? null,
|
||||
suspendMessage: v.tags.suspendMessage ?? null,
|
||||
completedAt: v.tags.completedAt !== undefined ? Number(v.tags.completedAt) : null,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -335,21 +332,75 @@ export function setThread(varStore: VarStore, threadId: ThreadId, entry: ThreadI
|
||||
// Head CAS nodes may use different schemas (StartNode vs StepNode) — clear all variants first.
|
||||
varStore.remove(name);
|
||||
const tags: Record<string, string> = {};
|
||||
if (entry.status !== "idle") {
|
||||
tags.status = entry.status;
|
||||
}
|
||||
if (entry.suspendedRole !== null) {
|
||||
tags.suspendedRole = entry.suspendedRole;
|
||||
}
|
||||
if (entry.suspendMessage !== null) {
|
||||
tags.suspendMessage = entry.suspendMessage;
|
||||
}
|
||||
if (entry.completedAt !== null) {
|
||||
tags.completedAt = String(entry.completedAt);
|
||||
}
|
||||
varStore.set(name, entry.head, { tags });
|
||||
}
|
||||
|
||||
/** Remove an active thread entry (on complete/cancel). */
|
||||
export function deleteThread(varStore: VarStore, threadId: ThreadId): void {
|
||||
varStore.remove(threadVarName(threadId));
|
||||
/** Load only active threads (status not in completed/cancelled). */
|
||||
export function loadActiveThreads(varStore: VarStore): ThreadsIndex {
|
||||
const all = loadAllThreads(varStore);
|
||||
const active: ThreadsIndex = {};
|
||||
for (const [threadId, entry] of Object.entries(all)) {
|
||||
if (entry.status !== "completed" && entry.status !== "cancelled") {
|
||||
active[threadId as ThreadId] = entry;
|
||||
}
|
||||
}
|
||||
return active;
|
||||
}
|
||||
|
||||
function parseHistoryJsonlLine(trimmed: string): ThreadHistoryLine | null {
|
||||
/** Load only completed/cancelled threads (history). */
|
||||
export function loadHistoryThreads(varStore: VarStore): ThreadsIndex {
|
||||
const all = loadAllThreads(varStore);
|
||||
const history: ThreadsIndex = {};
|
||||
for (const [threadId, entry] of Object.entries(all)) {
|
||||
if (entry.status === "completed" || entry.status === "cancelled") {
|
||||
history[threadId as ThreadId] = entry;
|
||||
}
|
||||
}
|
||||
return history;
|
||||
}
|
||||
|
||||
/** Complete a thread by marking it completed or cancelled. */
|
||||
export function completeThread(
|
||||
varStore: VarStore,
|
||||
threadId: ThreadId,
|
||||
reason: "completed" | "cancelled",
|
||||
): void {
|
||||
const entry = getThread(varStore, threadId);
|
||||
if (entry === null) {
|
||||
return;
|
||||
}
|
||||
const completed = {
|
||||
head: entry.head,
|
||||
status: reason,
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
completedAt: Date.now(),
|
||||
} as ThreadIndexEntry;
|
||||
setThread(varStore, threadId, completed);
|
||||
}
|
||||
|
||||
|
||||
type LegacyHistoryEntry = {
|
||||
thread: ThreadId;
|
||||
workflow: CasRef;
|
||||
head: CasRef;
|
||||
completedAt: number;
|
||||
reason: "completed" | "cancelled" | null;
|
||||
};
|
||||
|
||||
function parseLegacyHistoryJsonlLine(trimmed: string): LegacyHistoryEntry | null {
|
||||
let raw: unknown;
|
||||
try {
|
||||
raw = JSON.parse(trimmed) as unknown;
|
||||
@@ -383,7 +434,7 @@ function parseHistoryJsonlLine(trimmed: string): ThreadHistoryLine | null {
|
||||
return null;
|
||||
}
|
||||
|
||||
/** One-time migration: `~/.uwf/history.jsonl` → `@uwf/history/*` variables. */
|
||||
/** One-time migration: `~/.uwf/history.jsonl` → `@uwf/thread/*` variables with status tags. */
|
||||
export async function migrateHistoryIfNeeded(
|
||||
storageRoot: string,
|
||||
varStore: VarStore,
|
||||
@@ -399,47 +450,43 @@ export async function migrateHistoryIfNeeded(
|
||||
if (trimmed === "") {
|
||||
continue;
|
||||
}
|
||||
const entry = parseHistoryJsonlLine(trimmed);
|
||||
const entry = parseLegacyHistoryJsonlLine(trimmed);
|
||||
if (entry !== null) {
|
||||
addHistoryEntry(varStore, entry);
|
||||
const status = entry.reason === "cancelled" ? "cancelled" : "completed";
|
||||
const threadEntry: ThreadIndexEntry = {
|
||||
head: entry.head,
|
||||
status: status as ThreadIndexEntry["status"],
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
completedAt: entry.completedAt,
|
||||
};
|
||||
setThread(varStore, entry.thread, threadEntry);
|
||||
}
|
||||
}
|
||||
|
||||
await rename(path, `${path}.migrated`);
|
||||
}
|
||||
|
||||
export function loadAllHistory(varStore: VarStore): ThreadHistoryLine[] {
|
||||
const vars = varStore.list({ namePrefix: HISTORY_VAR_PREFIX });
|
||||
return vars.map((v) => ({
|
||||
thread: v.name.slice(HISTORY_VAR_PREFIX.length) as ThreadId,
|
||||
workflow: v.tags.workflow ?? "",
|
||||
head: v.value as CasRef,
|
||||
completedAt: Number(v.tags.completedAt ?? "0"),
|
||||
reason: v.tags.reason === "completed" || v.tags.reason === "cancelled" ? v.tags.reason : null,
|
||||
}));
|
||||
}
|
||||
/** Migrate `@uwf/history/*` variables to `@uwf/thread/*` with status tags. */
|
||||
export function migrateHistoryVarsToThreadVars(varStore: VarStore): void {
|
||||
const LEGACY_HISTORY_VAR_PREFIX = "@uwf/history/";
|
||||
const vars = varStore.list({ namePrefix: LEGACY_HISTORY_VAR_PREFIX });
|
||||
|
||||
export function findHistoryEntry(varStore: VarStore, threadId: ThreadId): ThreadHistoryLine | null {
|
||||
const vars = varStore.list({ namePrefix: `${HISTORY_VAR_PREFIX}${threadId}` });
|
||||
const v = vars.find((entry) => entry.name === `${HISTORY_VAR_PREFIX}${threadId}`);
|
||||
if (v === undefined) {
|
||||
return null;
|
||||
for (const v of vars) {
|
||||
const threadId = v.name.slice(LEGACY_HISTORY_VAR_PREFIX.length) as ThreadId;
|
||||
const reason = v.tags.reason;
|
||||
const status = reason === "cancelled" ? "cancelled" : "completed";
|
||||
const completedAt = Number(v.tags.completedAt ?? Date.now());
|
||||
|
||||
const threadEntry: ThreadIndexEntry = {
|
||||
head: v.value as CasRef,
|
||||
status: status as ThreadIndexEntry["status"],
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
completedAt,
|
||||
};
|
||||
|
||||
setThread(varStore, threadId, threadEntry);
|
||||
varStore.remove(v.name);
|
||||
}
|
||||
return {
|
||||
thread: threadId,
|
||||
workflow: v.tags.workflow ?? "",
|
||||
head: v.value as CasRef,
|
||||
completedAt: Number(v.tags.completedAt ?? "0"),
|
||||
reason: v.tags.reason === "completed" || v.tags.reason === "cancelled" ? v.tags.reason : null,
|
||||
};
|
||||
}
|
||||
|
||||
export function addHistoryEntry(varStore: VarStore, entry: ThreadHistoryLine): void {
|
||||
varStore.set(`${HISTORY_VAR_PREFIX}${entry.thread}`, entry.head, {
|
||||
tags: {
|
||||
workflow: entry.workflow,
|
||||
completedAt: String(entry.completedAt),
|
||||
reason: entry.reason ?? "completed",
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
@@ -88,8 +88,10 @@ export async function getActiveThreadEntry(
|
||||
}
|
||||
return {
|
||||
head: v.value as CasRef,
|
||||
status: (v.tags.status ?? "idle") as ThreadIndexEntry["status"],
|
||||
suspendedRole: v.tags.suspendedRole ?? null,
|
||||
suspendMessage: v.tags.suspendMessage ?? null,
|
||||
completedAt: v.tags.completedAt !== undefined ? Number(v.tags.completedAt) : null,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user