Merge pull request 'refactor: migrate threads index from YAML to ocas variable (Phase 4b)' (#17) from refactor/threads-to-ocas-variable into main
CI / check (push) Failing after 10m24s
CI / check (pull_request) Failing after 10m33s

This commit was merged in pull request #17.
This commit is contained in:
2026-06-02 14:28:17 +00:00
22 changed files with 419 additions and 283 deletions
@@ -7,7 +7,7 @@ import { putSchema } from "@ocas/core";
import { createFsStore } from "@ocas/fs"; import { createFsStore } from "@ocas/fs";
import type { CasRef, StepNodePayload, ThreadId } from "@united-workforce/protocol"; import type { CasRef, StepNodePayload, ThreadId } from "@united-workforce/protocol";
import { registerUwfSchemas } from "../schemas.js"; import { registerUwfSchemas } from "../schemas.js";
import { saveThreadsIndex } from "../store.js"; import { seedThreads } from "./thread-test-helpers.js";
// ── schemas ────────────────────────────────────────────────────────────────── // ── schemas ──────────────────────────────────────────────────────────────────
@@ -67,8 +67,10 @@ describe("C1: adapter JSON round-trip integration", () => {
prompt: "Test round-trip task", prompt: "Test round-trip task",
}); });
process.env.UNCAGED_CAS_DIR = casDir;
const threadId = "01ROUNDTRIPTEST0000000000" as ThreadId; const threadId = "01ROUNDTRIPTEST0000000000" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: startHash }); await seedThreads(tmpDir, { [threadId]: startHash });
// 2. Pre-create CAS nodes that the mock agent would produce // 2. Pre-create CAS nodes that the mock agent would produce
const outputHash = await store.put(outputSchemaHash, { const outputHash = await store.put(outputSchemaHash, {
@@ -9,8 +9,9 @@ import { cmdThreadList, cmdThreadShow, cmdThreadStart } from "../commands/thread
import { import {
appendThreadHistory, appendThreadHistory,
createUwfStore, createUwfStore,
loadThreadsIndex, deleteThread,
saveThreadsIndex, loadAllThreads,
setThread,
} from "../store.js"; } from "../store.js";
const OUTPUT_SCHEMA = { const OUTPUT_SCHEMA = {
@@ -174,7 +175,7 @@ async function insertStepNode(
outputPayload: Record<string, unknown>, outputPayload: Record<string, unknown>,
): Promise<void> { ): Promise<void> {
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const index = await loadThreadsIndex(storageRoot); const index = loadAllThreads(uwf.varStore);
const headEntry = index[threadId]; const headEntry = index[threadId];
if (headEntry === undefined) throw new Error(`thread ${threadId} not in index`); if (headEntry === undefined) throw new Error(`thread ${threadId} not in index`);
const head = headEntry.head; const head = headEntry.head;
@@ -195,13 +196,17 @@ async function insertStepNode(
start: startHash, start: startHash,
prev: isStart ? null : head, prev: isStart ? null : head,
role, role,
prompt: `Do ${role}`,
output: outputHash, output: outputHash,
detail: detailHash, detail: detailHash,
agent: "uwf-test",
edgePrompt: `Do ${role}`,
startedAtMs: Date.now(),
completedAtMs: Date.now() + 1,
cwd: storageRoot,
assembledPrompt: null,
})) as CasRef; })) as CasRef;
index[threadId] = { head: stepHash, suspendedRole: null, suspendMessage: null }; setThread(uwf.varStore, threadId, { head: stepHash, suspendedRole: null, suspendMessage: null });
await saveThreadsIndex(storageRoot, index);
} }
describe("currentRole field", () => { describe("currentRole field", () => {
@@ -280,10 +285,9 @@ describe("currentRole field", () => {
const { thread, workflow } = await cmdThreadStart(storageRoot, wf, "test", tmpDir); const { thread, workflow } = await cmdThreadStart(storageRoot, wf, "test", tmpDir);
const tid = thread as ThreadId; const tid = thread as ThreadId;
const index = await loadThreadsIndex(storageRoot); const uwfForIndex = await createUwfStore(storageRoot);
const head = index[tid]!.head; const head = loadAllThreads(uwfForIndex.varStore)[tid]!.head;
delete index[tid]; deleteThread(uwfForIndex.varStore, tid);
await saveThreadsIndex(storageRoot, index);
await appendThreadHistory(storageRoot, { await appendThreadHistory(storageRoot, {
thread: tid, thread: tid,
workflow, workflow,
@@ -309,10 +313,9 @@ describe("currentRole field", () => {
const { thread, workflow } = await cmdThreadStart(storageRoot, wf, "test", tmpDir); const { thread, workflow } = await cmdThreadStart(storageRoot, wf, "test", tmpDir);
const tid = thread as ThreadId; const tid = thread as ThreadId;
const index = await loadThreadsIndex(storageRoot); const uwfForIndex = await createUwfStore(storageRoot);
const head = index[tid]!.head; const head = loadAllThreads(uwfForIndex.varStore)[tid]!.head;
delete index[tid]; deleteThread(uwfForIndex.varStore, tid);
await saveThreadsIndex(storageRoot, index);
await appendThreadHistory(storageRoot, { await appendThreadHistory(storageRoot, {
thread: tid, thread: tid,
workflow, workflow,
@@ -371,10 +374,9 @@ describe("currentRole field", () => {
// completed thread // completed thread
const comp = await cmdThreadStart(storageRoot, wf, "completed", tmpDir); const comp = await cmdThreadStart(storageRoot, wf, "completed", tmpDir);
const compId = comp.thread as ThreadId; const compId = comp.thread as ThreadId;
const index = await loadThreadsIndex(storageRoot); const uwfForIndex = await createUwfStore(storageRoot);
const compHead = index[compId]!.head; const compHead = loadAllThreads(uwfForIndex.varStore)[compId]!.head;
delete index[compId]; deleteThread(uwfForIndex.varStore, compId);
await saveThreadsIndex(storageRoot, index);
await appendThreadHistory(storageRoot, { await appendThreadHistory(storageRoot, {
thread: compId, thread: compId,
workflow: comp.workflow, workflow: comp.workflow,
@@ -2,9 +2,9 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdtemp, rm } from "node:fs/promises"; import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os"; import { tmpdir } from "node:os";
import { join } from "node:path"; import { join } from "node:path";
import type { CasRef, ThreadId } from "@united-workforce/protocol"; import { type CasRef, createThreadIndexEntry, type ThreadId } from "@united-workforce/protocol";
import { resolveHeadHash } from "../commands/shared.js"; import { resolveHeadHash } from "../commands/shared.js";
import { appendThreadHistory, saveThreadsIndex } from "../store.js"; import { appendThreadHistory } from "../store.js";
let tmpDir: string; let tmpDir: string;
@@ -17,11 +17,12 @@ afterEach(async () => {
}); });
describe("resolveHeadHash", () => { describe("resolveHeadHash", () => {
test("returns head hash from threads.yaml for active thread", async () => { test("returns head hash from variable store for active thread", async () => {
const threadId = "01JTEST0000000000000000001" as ThreadId; const threadId = "01JTEST0000000000000000001" as ThreadId;
const headHash = "active_hash_123" as CasRef; const { createUwfStore, setThread } = await import("../store.js");
const uwf = await createUwfStore(tmpDir);
await saveThreadsIndex(tmpDir, { [threadId]: headHash }); const headHash = (await uwf.store.put(uwf.schemas.text, "active")) as CasRef;
setThread(uwf.varStore, threadId, createThreadIndexEntry(headHash as CasRef));
const result = await resolveHeadHash(tmpDir, threadId); const result = await resolveHeadHash(tmpDir, threadId);
@@ -34,7 +35,6 @@ describe("resolveHeadHash", () => {
const workflowHash = "workflow_hash_789" as CasRef; const workflowHash = "workflow_hash_789" as CasRef;
// No entry in threads.yaml, only in history.jsonl // No entry in threads.yaml, only in history.jsonl
await saveThreadsIndex(tmpDir, {});
await appendThreadHistory(tmpDir, { await appendThreadHistory(tmpDir, {
thread: threadId, thread: threadId,
workflow: workflowHash, workflow: workflowHash,
@@ -54,12 +54,14 @@ describe("resolveHeadHash", () => {
test("prioritizes active thread over history when thread exists in both", async () => { test("prioritizes active thread over history when thread exists in both", async () => {
const threadId = "01JTEST0000000000000000004" as ThreadId; const threadId = "01JTEST0000000000000000004" as ThreadId;
const activeHash = "active_hash_v2" as CasRef;
const historicalHash = "historical_hash_v1" as CasRef; const historicalHash = "historical_hash_v1" as CasRef;
const workflowHash = "workflow_hash_xyz" as CasRef; const workflowHash = "workflow_hash_xyz" as CasRef;
// Thread exists in both locations (should not happen normally, but test the precedence) const { createUwfStore, setThread } = await import("../store.js");
await saveThreadsIndex(tmpDir, { [threadId]: activeHash }); const { createThreadIndexEntry } = await import("@united-workforce/protocol");
const uwf = await createUwfStore(tmpDir);
const activeHead = (await uwf.store.put(uwf.schemas.text, "active-v2")) as CasRef;
setThread(uwf.varStore, threadId, createThreadIndexEntry(activeHead));
await appendThreadHistory(tmpDir, { await appendThreadHistory(tmpDir, {
thread: threadId, thread: threadId,
workflow: workflowHash, workflow: workflowHash,
@@ -71,7 +73,7 @@ describe("resolveHeadHash", () => {
const result = await resolveHeadHash(tmpDir, threadId); const result = await resolveHeadHash(tmpDir, threadId);
// Should return the active head, not the historical one // Should return the active head, not the historical one
expect(result).toBe(activeHash); expect(result).toBe(activeHead);
}); });
test("finds thread from multiple history entries", async () => { test("finds thread from multiple history entries", async () => {
@@ -82,8 +84,6 @@ describe("resolveHeadHash", () => {
const hash2 = "hash_thread2" as CasRef; const hash2 = "hash_thread2" as CasRef;
const hash3 = "hash_thread3" as CasRef; const hash3 = "hash_thread3" as CasRef;
const workflowHash = "workflow_hash_abc" as CasRef; const workflowHash = "workflow_hash_abc" as CasRef;
await saveThreadsIndex(tmpDir, {});
await appendThreadHistory(tmpDir, { await appendThreadHistory(tmpDir, {
thread: threadId1, thread: threadId1,
workflow: workflowHash, workflow: workflowHash,
@@ -9,7 +9,7 @@ import { STEP_NODE_SCHEMA } from "@united-workforce/protocol";
import { cmdStepList } from "../commands/step.js"; import { cmdStepList } from "../commands/step.js";
import { cmdThreadRead } from "../commands/thread.js"; import { cmdThreadRead } from "../commands/thread.js";
import { registerUwfSchemas } from "../schemas.js"; import { registerUwfSchemas } from "../schemas.js";
import { saveThreadsIndex } from "../store.js"; import { seedThreads } from "./thread-test-helpers.js";
// ── schemas ────────────────────────────────────────────────────────────────── // ── schemas ──────────────────────────────────────────────────────────────────
@@ -216,7 +216,7 @@ describe("step list timing", () => {
}); });
const threadId = "01HX2Q3R4S5T6V7W8X9YZ1" as ThreadId; const threadId = "01HX2Q3R4S5T6V7W8X9YZ1" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHash }); await seedThreads(tmpDir, { [threadId]: stepHash });
const result = await cmdStepList(tmpDir, threadId); const result = await cmdStepList(tmpDir, threadId);
const stepEntries = result.steps.slice(1); // skip start entry const stepEntries = result.steps.slice(1); // skip start entry
@@ -290,7 +290,7 @@ describe("thread read timing", () => {
}); });
const threadId = "01HX2Q3R4S5T6V7W8X9YZ3" as ThreadId; const threadId = "01HX2Q3R4S5T6V7W8X9YZ3" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHash }); await seedThreads(tmpDir, { [threadId]: stepHash });
const markdown = await cmdThreadRead(tmpDir, threadId, 10000, null, false); const markdown = await cmdThreadRead(tmpDir, threadId, 10000, null, false);
expect(markdown).toContain("**Duration:** 42.0s"); expect(markdown).toContain("**Duration:** 42.0s");
@@ -356,7 +356,7 @@ describe("thread read timing", () => {
}); });
const threadId = "01HX2Q3R4S5T6V7W8X9YZ4" as ThreadId; const threadId = "01HX2Q3R4S5T6V7W8X9YZ4" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHash }); await seedThreads(tmpDir, { [threadId]: stepHash });
const markdown = await cmdThreadRead(tmpDir, threadId, 10000, null, false); const markdown = await cmdThreadRead(tmpDir, threadId, 10000, null, false);
expect(markdown).toContain("**Duration:** 350ms"); expect(markdown).toContain("**Duration:** 350ms");
@@ -2,6 +2,7 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdir, rm } from "node:fs/promises"; import { mkdir, rm } from "node:fs/promises";
import { tmpdir } from "node:os"; import { tmpdir } from "node:os";
import { join } from "node:path"; import { join } from "node:path";
import { createThreadIndexEntry, type ThreadId } from "@united-workforce/protocol";
import { import {
createUwfStore, createUwfStore,
getCasDir, getCasDir,
@@ -9,6 +10,7 @@ import {
getRegistryPath, getRegistryPath,
loadWorkflowRegistry, loadWorkflowRegistry,
saveWorkflowRegistry, saveWorkflowRegistry,
setThread,
} from "../store.js"; } from "../store.js";
describe("Global CAS directory", () => { describe("Global CAS directory", () => {
@@ -191,29 +193,49 @@ describe("Global CAS directory", () => {
expect(migratedContent).toContain(hash); expect(migratedContent).toContain(hash);
}); });
test("thread metadata remains in storageRoot", async () => { test("migrates threads.yaml to variable store and renames file", async () => {
const globalCasDir = join(tmpDir, "global-cas-threads");
process.env.UNCAGED_CAS_DIR = globalCasDir;
const storageRoot = join(tmpDir, "storage-threads-migrate");
await mkdir(storageRoot, { recursive: true });
const threadId = "01JTEST0000000000000000AB" as ThreadId;
const uwfSeed = await createUwfStore(storageRoot);
const headHash = await uwfSeed.store.put(uwfSeed.schemas.text, "migrated-thread-head");
const { writeFile, access, readFile } = await import("node:fs/promises");
const threadsPath = join(storageRoot, "threads.yaml");
await writeFile(threadsPath, `${threadId}: ${headHash}\n`, "utf8");
const uwf = await createUwfStore(storageRoot);
const entry = uwf.varStore.list({ exactName: `@uwf/thread/${threadId}` })[0];
expect(entry?.value).toBe(headHash);
await expect(access(threadsPath)).rejects.toThrow();
const migratedContent = await readFile(`${threadsPath}.migrated`, "utf8");
expect(migratedContent).toContain(threadId);
expect(migratedContent).toContain(headHash);
});
test("thread metadata stored in ocas variable store", async () => {
const globalCasDir = join(tmpDir, "global-cas"); const globalCasDir = join(tmpDir, "global-cas");
process.env.UNCAGED_CAS_DIR = globalCasDir; process.env.UNCAGED_CAS_DIR = globalCasDir;
const storageRoot = join(tmpDir, "storage"); const storageRoot = join(tmpDir, "storage");
await mkdir(storageRoot, { recursive: true }); await mkdir(storageRoot, { recursive: true });
await createUwfStore(storageRoot); const threadId = "01JTEST000000000000000123" as ThreadId;
const uwfSeed = await createUwfStore(storageRoot);
const headHash = await uwfSeed.store.put(uwfSeed.schemas.text, "hash-456");
setThread(uwfSeed.varStore, threadId, createThreadIndexEntry(headHash));
// Write threads index const uwf = await createUwfStore(storageRoot);
const { saveThreadsIndex } = await import("../store.js"); const entry = uwf.varStore.list({ exactName: `@uwf/thread/${threadId}` })[0];
await saveThreadsIndex(storageRoot, { "thread-123": "hash-456" }); expect(entry?.value).toBe(headHash);
// Verify threads.yaml is in storageRoot, not global CAS
const { readFile } = await import("node:fs/promises"); const { readFile } = await import("node:fs/promises");
const threadsPath = join(storageRoot, "threads.yaml"); const threadsPath = join(storageRoot, "threads.yaml");
const content = await readFile(threadsPath, "utf8"); await expect(readFile(threadsPath, "utf8")).rejects.toThrow();
expect(content).toContain("thread-123");
expect(content).toContain("hash-456");
// Verify threads.yaml is NOT in global CAS directory
const globalThreadsPath = join(globalCasDir, "threads.yaml");
await expect(readFile(globalThreadsPath, "utf8")).rejects.toThrow();
}); });
test("history remains in storageRoot", async () => { test("history remains in storageRoot", async () => {
@@ -9,7 +9,13 @@ import { createMarker, deleteMarker } from "../background/index.js";
import { cmdThreadList } from "../commands/thread.js"; import { cmdThreadList } from "../commands/thread.js";
import { parseTimeInput } from "../commands/thread-time-parser.js"; import { parseTimeInput } from "../commands/thread-time-parser.js";
import type { UwfStore } from "../store.js"; import type { UwfStore } from "../store.js";
import { appendThreadHistory, createUwfStore, saveThreadsIndex } from "../store.js"; import {
appendThreadHistory,
createUwfStore,
deleteThread,
loadAllThreads,
setThread,
} from "../store.js";
// ── helpers ─────────────────────────────────────────────────────────────────── // ── helpers ───────────────────────────────────────────────────────────────────
@@ -50,10 +56,7 @@ async function createTestThread(
}; };
const headHash = await uwf.store.put(uwf.schemas.startNode, startPayload); const headHash = await uwf.store.put(uwf.schemas.startNode, startPayload);
// Load existing index and add new thread setThread(uwf.varStore, threadId, createThreadIndexEntry(headHash));
const existingIndex = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot));
existingIndex[threadId] = createThreadIndexEntry(headHash);
await saveThreadsIndex(storageRoot, existingIndex);
return threadId; return threadId;
} }
@@ -73,9 +76,8 @@ async function completeThread(
workflowHash: CasRef, workflowHash: CasRef,
headHash: CasRef, headHash: CasRef,
) { ) {
const index = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot)); const uwfIdx = await createUwfStore(storageRoot);
delete index[threadId]; deleteThread(uwfIdx.varStore, threadId);
await saveThreadsIndex(storageRoot, index);
await appendThreadHistory(storageRoot, { await appendThreadHistory(storageRoot, {
thread: threadId, thread: threadId,
workflow: workflowHash, workflow: workflowHash,
@@ -110,7 +112,8 @@ describe("cmdThreadList status filter", () => {
await markThreadRunning(tmpDir, thread2, workflowHash); await markThreadRunning(tmpDir, thread2, workflowHash);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); const uwfIdx = await createUwfStore(tmpDir);
const index = loadAllThreads(uwfIdx.varStore);
const thread3Head = index[thread3]!.head; const thread3Head = index[thread3]!.head;
if (thread3Head === undefined) throw new Error("thread3 head not found"); if (thread3Head === undefined) throw new Error("thread3 head not found");
await completeThread(tmpDir, thread3, workflowHash, thread3Head); await completeThread(tmpDir, thread3, workflowHash, thread3Head);
@@ -134,7 +137,8 @@ describe("cmdThreadList status filter", () => {
await markThreadRunning(tmpDir, thread2, workflowHash); await markThreadRunning(tmpDir, thread2, workflowHash);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); const uwfIdx = await createUwfStore(tmpDir);
const index = loadAllThreads(uwfIdx.varStore);
const thread3Head = index[thread3]!.head; const thread3Head = index[thread3]!.head;
if (thread3Head === undefined) throw new Error("thread3 head not found"); if (thread3Head === undefined) throw new Error("thread3 head not found");
await completeThread(tmpDir, thread3, workflowHash, thread3Head); await completeThread(tmpDir, thread3, workflowHash, thread3Head);
@@ -158,7 +162,8 @@ describe("cmdThreadList status filter", () => {
const _thread2 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 2000); const _thread2 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 2000);
const thread3 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000); const thread3 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); const uwfIdx = await createUwfStore(tmpDir);
const index = loadAllThreads(uwfIdx.varStore);
const thread3Head = index[thread3]!.head; const thread3Head = index[thread3]!.head;
if (thread3Head === undefined) throw new Error("thread3 head not found"); if (thread3Head === undefined) throw new Error("thread3 head not found");
await completeThread(tmpDir, thread3, workflowHash, thread3Head); await completeThread(tmpDir, thread3, workflowHash, thread3Head);
@@ -180,7 +185,8 @@ describe("cmdThreadList status filter", () => {
await markThreadRunning(tmpDir, thread2, workflowHash); await markThreadRunning(tmpDir, thread2, workflowHash);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); const uwfIdx = await createUwfStore(tmpDir);
const index = loadAllThreads(uwfIdx.varStore);
const thread3Head = index[thread3]!.head; const thread3Head = index[thread3]!.head;
if (thread3Head === undefined) throw new Error("thread3 head not found"); if (thread3Head === undefined) throw new Error("thread3 head not found");
await completeThread(tmpDir, thread3, workflowHash, thread3Head); await completeThread(tmpDir, thread3, workflowHash, thread3Head);
@@ -352,7 +358,8 @@ describe("combined filters", () => {
await markThreadRunning(tmpDir, thread2, workflowHash); await markThreadRunning(tmpDir, thread2, workflowHash);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); const uwfIdx = await createUwfStore(tmpDir);
const index = loadAllThreads(uwfIdx.varStore);
const thread3Head = index[thread3]!.head; const thread3Head = index[thread3]!.head;
if (thread3Head === undefined) throw new Error("thread3 head not found"); if (thread3Head === undefined) throw new Error("thread3 head not found");
await completeThread(tmpDir, thread3, workflowHash, thread3Head); await completeThread(tmpDir, thread3, workflowHash, thread3Head);
@@ -376,7 +383,8 @@ describe("combined filters", () => {
for (let i = 9; i >= 0; i--) { for (let i = 9; i >= 0; i--) {
const thread = await createTestThread(uwf, tmpDir, workflowHash, Date.now() + i * 1000); const thread = await createTestThread(uwf, tmpDir, workflowHash, Date.now() + i * 1000);
threads.push(thread); threads.push(thread);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); const uwfIdx = await createUwfStore(tmpDir);
const index = loadAllThreads(uwfIdx.varStore);
const headHash = index[thread]!.head; const headHash = index[thread]!.head;
if (headHash === undefined) throw new Error("head not found"); if (headHash === undefined) throw new Error("head not found");
await completeThread(tmpDir, thread, workflowHash, headHash); await completeThread(tmpDir, thread, workflowHash, headHash);
@@ -425,7 +433,8 @@ describe("combined filters", () => {
threads.push(thread); threads.push(thread);
if (i % 2 === 0) { if (i % 2 === 0) {
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); const uwfIdx = await createUwfStore(tmpDir);
const index = loadAllThreads(uwfIdx.varStore);
const headHash = index[thread]!.head; const headHash = index[thread]!.head;
if (headHash === undefined) throw new Error("head not found"); if (headHash === undefined) throw new Error("head not found");
await completeThread(tmpDir, thread, workflowHash, headHash); await completeThread(tmpDir, thread, workflowHash, headHash);
@@ -483,13 +492,20 @@ describe("edge cases", () => {
const thread1 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 2000); const thread1 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 2000);
const thread2 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000); const thread2 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); const uwfIdx = await createUwfStore(tmpDir);
const index = loadAllThreads(uwfIdx.varStore);
const placeholderHead = (await uwfIdx.store.put(
uwfIdx.schemas.text,
"invalid-ulid-placeholder",
)) as CasRef;
index["INVALID_ULID_FORMAT_HERE" as ThreadId] = { index["INVALID_ULID_FORMAT_HERE" as ThreadId] = {
head: "01J6HMVRNQKJV2", head: placeholderHead,
suspendedRole: null, suspendedRole: null,
suspendMessage: null, suspendMessage: null,
}; };
await saveThreadsIndex(tmpDir, index); for (const [tid, ent] of Object.entries(index)) {
setThread(uwfIdx.varStore, tid as ThreadId, ent);
}
const afterMs = Date.now() - 3000; const afterMs = Date.now() - 3000;
const result = await cmdThreadList(tmpDir, null, afterMs, null, null, null); const result = await cmdThreadList(tmpDir, null, afterMs, null, null, null);
@@ -4,7 +4,7 @@ import { tmpdir } from "node:os";
import { join } from "node:path"; import { join } from "node:path";
import type { CasRef, StartNodePayload, ThreadId } from "@united-workforce/protocol"; import type { CasRef, StartNodePayload, ThreadId } from "@united-workforce/protocol";
import { cmdThreadStart } from "../commands/thread.js"; import { cmdThreadStart } from "../commands/thread.js";
import { createUwfStore } from "../store.js"; import { createUwfStore, getThread } from "../store.js";
describe("Thread and edge location integration", () => { describe("Thread and edge location integration", () => {
let tmpDir: string; let tmpDir: string;
@@ -79,8 +79,7 @@ graph:
// Verify StartNode has the cwd field // Verify StartNode has the cwd field
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot)); const headHash = getThread(uwf.varStore, result.thread as ThreadId)!.head;
const headHash = index[result.thread as ThreadId]!.head;
expect(headHash).toBeDefined(); expect(headHash).toBeDefined();
const startNode = uwf.store.get(headHash as CasRef); const startNode = uwf.store.get(headHash as CasRef);
@@ -174,8 +173,7 @@ graph:
const result = await cmdThreadStart(storageRoot, workflowPath, "test", tmpDir); const result = await cmdThreadStart(storageRoot, workflowPath, "test", tmpDir);
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot)); const headHash = getThread(uwf.varStore, result.thread as ThreadId)!.head;
const headHash = index[result.thread as ThreadId]!.head;
const startNode = uwf.store.get(headHash as CasRef); const startNode = uwf.store.get(headHash as CasRef);
const startPayload = startNode?.payload as StartNodePayload; const startPayload = startNode?.payload as StartNodePayload;
@@ -7,7 +7,7 @@ import { createFsStore } from "@ocas/fs";
import type { CasRef, ThreadId } from "@united-workforce/protocol"; import type { CasRef, ThreadId } from "@united-workforce/protocol";
import { cmdThreadRead } from "../commands/thread.js"; import { cmdThreadRead } from "../commands/thread.js";
import { registerUwfSchemas } from "../schemas.js"; import { registerUwfSchemas } from "../schemas.js";
import { saveThreadsIndex } from "../store.js"; import { seedThreads } from "./thread-test-helpers.js";
// ── schemas used in tests ──────────────────────────────────────────────────── // ── schemas used in tests ────────────────────────────────────────────────────
@@ -158,7 +158,7 @@ describe("thread read --quota flag", () => {
} }
const threadId = "01HX2Q3R4S5T6V7W8X9YZ0" as ThreadId; const threadId = "01HX2Q3R4S5T6V7W8X9YZ0" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: steps[2] as CasRef }); await seedThreads(tmpDir, { [threadId]: steps[2] as CasRef });
// Set quota to 800 chars - should only fit most recent steps // Set quota to 800 chars - should only fit most recent steps
const markdown = await cmdThreadRead(tmpDir, threadId, 800, null, false); const markdown = await cmdThreadRead(tmpDir, threadId, 800, null, false);
@@ -266,7 +266,7 @@ describe("thread read --quota flag", () => {
}); });
const threadId = "01HX2Q3R4S5T6V7W8X9YZ1" as ThreadId; const threadId = "01HX2Q3R4S5T6V7W8X9YZ1" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: step2Hash }); await seedThreads(tmpDir, { [threadId]: step2Hash });
// Set quota to 500 chars // Set quota to 500 chars
const markdown = await cmdThreadRead(tmpDir, threadId, 500, null, false); const markdown = await cmdThreadRead(tmpDir, threadId, 500, null, false);
@@ -354,7 +354,7 @@ describe("thread read --quota flag", () => {
} }
const threadId = "01HX2Q3R4S5T6V7W8X9YZ2" as ThreadId; const threadId = "01HX2Q3R4S5T6V7W8X9YZ2" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: steps[1] as CasRef }); await seedThreads(tmpDir, { [threadId]: steps[1] as CasRef });
// Set tight quota with --start flag // Set tight quota with --start flag
const markdown = await cmdThreadRead(tmpDir, threadId, 600, null, true); const markdown = await cmdThreadRead(tmpDir, threadId, 600, null, true);
@@ -432,7 +432,7 @@ describe("thread read --quota flag", () => {
}); });
const threadId = "01HX2Q3R4S5T6V7W8X9YZ4" as ThreadId; const threadId = "01HX2Q3R4S5T6V7W8X9YZ4" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHash }); await seedThreads(tmpDir, { [threadId]: stepHash });
// Minimal quota // Minimal quota
const markdown = await cmdThreadRead(tmpDir, threadId, 1, null, false); const markdown = await cmdThreadRead(tmpDir, threadId, 1, null, false);
@@ -512,7 +512,7 @@ describe("thread read --quota flag", () => {
} }
const threadId = "01HX2Q3R4S5T6V7W8X9YZ5" as ThreadId; const threadId = "01HX2Q3R4S5T6V7W8X9YZ5" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: steps[2] as CasRef }); await seedThreads(tmpDir, { [threadId]: steps[2] as CasRef });
// Very large quota // Very large quota
const markdown = await cmdThreadRead(tmpDir, threadId, 1000000, null, false); const markdown = await cmdThreadRead(tmpDir, threadId, 1000000, null, false);
@@ -594,7 +594,7 @@ describe("thread read --quota flag", () => {
} }
const threadId = "01HX2Q3R4S5T6V7W8X9YZ6" as ThreadId; const threadId = "01HX2Q3R4S5T6V7W8X9YZ6" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: steps[4] as CasRef }); await seedThreads(tmpDir, { [threadId]: steps[4] as CasRef });
// Use --before to limit to steps 1-2, then set quota that allows only 1 // Use --before to limit to steps 1-2, then set quota that allows only 1
const markdown = await cmdThreadRead(tmpDir, threadId, 500, steps[2] as CasRef, false); const markdown = await cmdThreadRead(tmpDir, threadId, 500, steps[2] as CasRef, false);
@@ -7,7 +7,8 @@ import type { createFsStore } from "@ocas/fs";
import type { CasRef, ThreadId } from "@united-workforce/protocol"; import type { CasRef, ThreadId } from "@united-workforce/protocol";
import { cmdThreadRead, THREAD_READ_DEFAULT_QUOTA } from "../commands/thread.js"; import { cmdThreadRead, THREAD_READ_DEFAULT_QUOTA } from "../commands/thread.js";
import type { UwfStore } from "../store.js"; import type { UwfStore } from "../store.js";
import { createUwfStore, saveThreadsIndex } from "../store.js"; import { createUwfStore } from "../store.js";
import { seedThreads } from "./thread-test-helpers.js";
// ── schemas used in tests ──────────────────────────────────────────────────── // ── schemas used in tests ────────────────────────────────────────────────────
@@ -143,7 +144,7 @@ describe("thread read XML tag isolation", () => {
}); });
const threadId = "01JTEST0000000000000001" as ThreadId; const threadId = "01JTEST0000000000000001" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHash }); await seedThreads(tmpDir, { [threadId]: stepHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
@@ -221,7 +222,7 @@ describe("thread read XML tag isolation", () => {
}); });
const threadId = "01JTEST0000000000000002" as ThreadId; const threadId = "01JTEST0000000000000002" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHash }); await seedThreads(tmpDir, { [threadId]: stepHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
@@ -296,7 +297,7 @@ describe("thread read XML tag isolation", () => {
}); });
const threadId = "01JTEST0000000000000003" as ThreadId; const threadId = "01JTEST0000000000000003" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: step2 }); await seedThreads(tmpDir, { [threadId]: step2 });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
@@ -351,7 +352,7 @@ describe("thread read XML tag isolation", () => {
}); });
const threadId = "01JTEST0000000000000004" as ThreadId; const threadId = "01JTEST0000000000000004" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHash }); await seedThreads(tmpDir, { [threadId]: stepHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
@@ -406,7 +407,7 @@ describe("thread read XML tag isolation", () => {
}); });
const threadId = "01JTEST0000000000000005" as ThreadId; const threadId = "01JTEST0000000000000005" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHash }); await seedThreads(tmpDir, { [threadId]: stepHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
@@ -461,7 +462,7 @@ describe("thread read XML tag isolation", () => {
}); });
const threadId = "01JTEST0000000000000006" as ThreadId; const threadId = "01JTEST0000000000000006" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHash }); await seedThreads(tmpDir, { [threadId]: stepHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, true); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, true);
@@ -560,7 +561,7 @@ describe("thread read XML tag isolation", () => {
}); });
const threadId = "01JTEST0000000000000007" as ThreadId; const threadId = "01JTEST0000000000000007" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: step3 }); await seedThreads(tmpDir, { [threadId]: step3 });
const markdown = await cmdThreadRead( const markdown = await cmdThreadRead(
tmpDir, tmpDir,
@@ -641,7 +642,7 @@ describe("thread read XML tag isolation", () => {
}); });
const threadId = "01JTEST0000000000000008" as ThreadId; const threadId = "01JTEST0000000000000008" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHash }); await seedThreads(tmpDir, { [threadId]: stepHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
@@ -701,7 +702,7 @@ describe("thread read XML tag isolation", () => {
} }
const threadId = "01JTEST0000000000000009" as ThreadId; const threadId = "01JTEST0000000000000009" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: steps[steps.length - 1]! }); await seedThreads(tmpDir, { [threadId]: steps[steps.length - 1]! });
// Use very small quota // Use very small quota
const markdown = await cmdThreadRead(tmpDir, threadId, 1, null, false); const markdown = await cmdThreadRead(tmpDir, threadId, 1, null, false);
@@ -6,10 +6,9 @@ import { join } from "node:path";
import { putSchema } from "@ocas/core"; import { putSchema } from "@ocas/core";
import { createFsStore } from "@ocas/fs"; import { createFsStore } from "@ocas/fs";
import type { CasRef, StepNodePayload, ThreadId } from "@united-workforce/protocol"; import type { CasRef, StepNodePayload, ThreadId } from "@united-workforce/protocol";
import { parse } from "yaml";
import { cmdThreadShow } from "../commands/thread.js"; import { cmdThreadShow } from "../commands/thread.js";
import { registerUwfSchemas } from "../schemas.js"; import { registerUwfSchemas } from "../schemas.js";
import { saveThreadsIndex } from "../store.js"; import { seedThreads } from "./thread-test-helpers.js";
const OUTPUT_SCHEMA = { const OUTPUT_SCHEMA = {
type: "object" as const, type: "object" as const,
@@ -89,7 +88,8 @@ async function setupSuspendedThread(mode: MockAgentMode): Promise<{
cwd: tmpDir, cwd: tmpDir,
}); });
await saveThreadsIndex(tmpDir, { [THREAD_ID]: startHash }); process.env.UNCAGED_CAS_DIR = casDir;
await seedThreads(tmpDir, { [THREAD_ID]: startHash });
const outputHash = await store.put(outputSchemaHash, { const outputHash = await store.put(outputSchemaHash, {
$status: "needs_input", $status: "needs_input",
@@ -114,7 +114,7 @@ async function setupSuspendedThread(mode: MockAgentMode): Promise<{
assembledPrompt: null, assembledPrompt: null,
}); });
await saveThreadsIndex(tmpDir, { await seedThreads(tmpDir, {
[THREAD_ID]: { [THREAD_ID]: {
head: stepHash, head: stepHash,
suspendedRole: "worker", suspendedRole: "worker",
@@ -241,7 +241,8 @@ describe("uwf thread resume", () => {
cwd: tmpDir, cwd: tmpDir,
}); });
await saveThreadsIndex(tmpDir, { [THREAD_ID]: startHash }); process.env.UNCAGED_CAS_DIR = casDir;
await seedThreads(tmpDir, { [THREAD_ID]: startHash });
const result = runUwf(["thread", "resume", THREAD_ID], casDir); const result = runUwf(["thread", "resume", THREAD_ID], casDir);
expect(result.status).not.toBe(0); expect(result.status).not.toBe(0);
@@ -264,9 +265,12 @@ describe("uwf thread resume", () => {
expect(cliOutput.suspendMessage).toBeNull(); expect(cliOutput.suspendMessage).toBeNull();
expect(cliOutput.done).toBe(false); expect(cliOutput.done).toBe(false);
const threadsYaml = await readFile(join(tmpDir, "threads.yaml"), "utf8"); const { createUwfStore, getThread } = await import("../store.js");
const threadsIndex = parse(threadsYaml) as Record<string, unknown>; const uwf = await createUwfStore(tmpDir);
expect(threadsIndex[THREAD_ID]).toBe(cliOutput.head); const entry = getThread(uwf.varStore, THREAD_ID);
expect(entry?.head).toBe(cliOutput.head);
expect(entry?.suspendedRole).toBeNull();
expect(entry?.suspendMessage).toBeNull();
const showResult = await cmdThreadShow(tmpDir, THREAD_ID); const showResult = await cmdThreadShow(tmpDir, THREAD_ID);
expect(showResult.status).toBe("idle"); expect(showResult.status).toBe("idle");
@@ -338,10 +342,9 @@ describe("uwf thread resume", () => {
expect(firstResume.suspendedRole).toBe("worker"); expect(firstResume.suspendedRole).toBe("worker");
expect(firstResume.suspendMessage).toBe(SUSPEND_MESSAGE); expect(firstResume.suspendMessage).toBe(SUSPEND_MESSAGE);
const threadsAfterFirst = parse( const { createUwfStore, getThread } = await import("../store.js");
await readFile(join(tmpDir, "threads.yaml"), "utf8"), const uwfAfterFirst = await createUwfStore(tmpDir);
) as Record<string, unknown>; expect(getThread(uwfAfterFirst.varStore, THREAD_ID)).toEqual({
expect(threadsAfterFirst[THREAD_ID]).toEqual({
head: firstResume.head, head: firstResume.head,
suspendedRole: "worker", suspendedRole: "worker",
suspendMessage: SUSPEND_MESSAGE, suspendMessage: SUSPEND_MESSAGE,
@@ -9,8 +9,9 @@ import { cmdThreadShow, cmdThreadStart } from "../commands/thread.js";
import { import {
appendThreadHistory, appendThreadHistory,
createUwfStore, createUwfStore,
loadThreadsIndex, deleteThread,
saveThreadsIndex, loadAllThreads,
setThread,
} from "../store.js"; } from "../store.js";
const OUTPUT_SCHEMA = { const OUTPUT_SCHEMA = {
@@ -89,7 +90,7 @@ async function insertStepNode(
outputPayload: Record<string, unknown>, outputPayload: Record<string, unknown>,
): Promise<void> { ): Promise<void> {
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const index = await loadThreadsIndex(storageRoot); const index = loadAllThreads(uwf.varStore);
const headEntry = index[threadId]; const headEntry = index[threadId];
if (headEntry === undefined) throw new Error(`thread ${threadId} not in index`); if (headEntry === undefined) throw new Error(`thread ${threadId} not in index`);
const head = headEntry.head; const head = headEntry.head;
@@ -117,8 +118,7 @@ async function insertStepNode(
assembledPrompt: null, assembledPrompt: null,
})) as CasRef; })) as CasRef;
index[threadId] = { head: stepHash, suspendedRole: null, suspendMessage: null }; setThread(uwf.varStore, threadId, { head: stepHash, suspendedRole: null, suspendMessage: null });
await saveThreadsIndex(storageRoot, index);
} }
describe("thread show status field", () => { describe("thread show status field", () => {
@@ -203,15 +203,12 @@ describe("thread show status field", () => {
const workflow = startResult.workflow; const workflow = startResult.workflow;
// Get the head hash before moving to history // Get the head hash before moving to history
const index = await loadThreadsIndex(storageRoot); const uwfForIndex = await createUwfStore(storageRoot);
const index = loadAllThreads(uwfForIndex.varStore);
const head = index[threadId]!.head; const head = index[threadId]!.head;
if (!head) throw new Error("Thread not found in index"); if (!head) throw new Error("Thread not found in index");
// Move thread to history with reason 'completed' deleteThread(uwfForIndex.varStore, threadId);
const { saveThreadsIndex } = await import("../store.js");
const newIndex = { ...index };
delete newIndex[threadId];
await saveThreadsIndex(storageRoot, newIndex);
await appendThreadHistory(storageRoot, { await appendThreadHistory(storageRoot, {
thread: threadId, thread: threadId,
@@ -243,15 +240,12 @@ describe("thread show status field", () => {
const workflow = startResult.workflow; const workflow = startResult.workflow;
// Get the head hash before moving to history // Get the head hash before moving to history
const index = await loadThreadsIndex(storageRoot); const uwfForIndex = await createUwfStore(storageRoot);
const index = loadAllThreads(uwfForIndex.varStore);
const head = index[threadId]!.head; const head = index[threadId]!.head;
if (!head) throw new Error("Thread not found in index"); if (!head) throw new Error("Thread not found in index");
// Move thread to history with reason 'cancelled' deleteThread(uwfForIndex.varStore, threadId);
const { saveThreadsIndex } = await import("../store.js");
const newIndex = { ...index };
delete newIndex[threadId];
await saveThreadsIndex(storageRoot, newIndex);
await appendThreadHistory(storageRoot, { await appendThreadHistory(storageRoot, {
thread: threadId, thread: threadId,
@@ -283,15 +277,12 @@ describe("thread show status field", () => {
const workflow = startResult.workflow; const workflow = startResult.workflow;
// Get the head hash before moving to history // Get the head hash before moving to history
const index = await loadThreadsIndex(storageRoot); const uwfForIndex = await createUwfStore(storageRoot);
const index = loadAllThreads(uwfForIndex.varStore);
const head = index[threadId]!.head; const head = index[threadId]!.head;
if (!head) throw new Error("Thread not found in index"); if (!head) throw new Error("Thread not found in index");
// Move thread to history with reason null (legacy format) deleteThread(uwfForIndex.varStore, threadId);
const { saveThreadsIndex } = await import("../store.js");
const newIndex = { ...index };
delete newIndex[threadId];
await saveThreadsIndex(storageRoot, newIndex);
await appendThreadHistory(storageRoot, { await appendThreadHistory(storageRoot, {
thread: threadId, thread: threadId,
@@ -5,7 +5,7 @@ import { tmpdir } from "node:os";
import { join } from "node:path"; import { join } from "node:path";
import type { CasRef, StartNodePayload, ThreadId } from "@united-workforce/protocol"; import type { CasRef, StartNodePayload, ThreadId } from "@united-workforce/protocol";
import { cmdThreadStart } from "../commands/thread.js"; import { cmdThreadStart } from "../commands/thread.js";
import { createUwfStore, loadThreadsIndex } from "../store.js"; import { createUwfStore, getThread } from "../store.js";
describe("thread start --cwd CLI option", () => { describe("thread start --cwd CLI option", () => {
let tmpDir: string; let tmpDir: string;
@@ -74,8 +74,8 @@ graph:
async function getStartNodeCwd(threadId: string): Promise<string> { async function getStartNodeCwd(threadId: string): Promise<string> {
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const index = await loadThreadsIndex(storageRoot); const entry = getThread(uwf.varStore, threadId as ThreadId);
const headHash = index[threadId as ThreadId]!.head; const headHash = entry!.head;
expect(headHash).toBeDefined(); expect(headHash).toBeDefined();
const startNode = uwf.store.get(headHash as CasRef); const startNode = uwf.store.get(headHash as CasRef);
@@ -1,15 +1,14 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test"; import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { execFileSync } from "node:child_process"; import { execFileSync } from "node:child_process";
import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os"; import { tmpdir } from "node:os";
import { join } from "node:path"; import { join } from "node:path";
import { putSchema } from "@ocas/core"; import { putSchema } from "@ocas/core";
import { createFsStore } from "@ocas/fs"; import { createFsStore } from "@ocas/fs";
import type { CasRef, StepNodePayload, ThreadId } from "@united-workforce/protocol"; import type { CasRef, StepNodePayload, ThreadId } from "@united-workforce/protocol";
import { parse } from "yaml";
import { cmdThreadShow } from "../commands/thread.js"; import { cmdThreadShow } from "../commands/thread.js";
import { registerUwfSchemas } from "../schemas.js"; import { registerUwfSchemas } from "../schemas.js";
import { saveThreadsIndex } from "../store.js"; import { seedThreads } from "./thread-test-helpers.js";
const OUTPUT_SCHEMA = { const OUTPUT_SCHEMA = {
type: "object" as const, type: "object" as const,
@@ -76,7 +75,7 @@ describe("suspend step CAS chain and threads.yaml metadata", () => {
}); });
const threadId = "01SUSPENDSTEPTEST0000000" as ThreadId; const threadId = "01SUSPENDSTEPTEST0000000" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: startHash }); await seedThreads(tmpDir, { [threadId]: startHash });
const outputHash = await store.put(outputSchemaHash, { const outputHash = await store.put(outputSchemaHash, {
$status: "needs_input", $status: "needs_input",
@@ -155,9 +154,9 @@ describe("suspend step CAS chain and threads.yaml metadata", () => {
question: "Which API?", question: "Which API?",
}); });
const threadsYaml = await readFile(join(tmpDir, "threads.yaml"), "utf8"); const { createUwfStore, getThread } = await import("../store.js");
const threadsIndex = parse(threadsYaml) as Record<string, unknown>; const uwf = await createUwfStore(tmpDir);
const threadEntry = threadsIndex[threadId]; const threadEntry = getThread(uwf.varStore, threadId);
expect(threadEntry).toEqual({ expect(threadEntry).toEqual({
head: stepHash, head: stepHash,
suspendedRole: "worker", suspendedRole: "worker",
@@ -6,7 +6,8 @@ import { putSchema } from "@ocas/core";
import type { ThreadId } from "@united-workforce/protocol"; import type { ThreadId } from "@united-workforce/protocol";
import { createThreadIndexEntry, markThreadSuspended } from "@united-workforce/protocol"; import { createThreadIndexEntry, markThreadSuspended } from "@united-workforce/protocol";
import { cmdThreadList, cmdThreadShow } from "../commands/thread.js"; import { cmdThreadList, cmdThreadShow } from "../commands/thread.js";
import { createUwfStore, saveThreadsIndex } from "../store.js"; import { createUwfStore } from "../store.js";
import { seedThreads } from "./thread-test-helpers.js";
const OUTPUT_SCHEMA = { const OUTPUT_SCHEMA = {
type: "object" as const, type: "object" as const,
@@ -109,7 +110,7 @@ describe("suspended thread display", () => {
}); });
const idleEntry = createThreadIndexEntry(idleStartHash); const idleEntry = createThreadIndexEntry(idleStartHash);
await saveThreadsIndex(tmpDir, { await seedThreads(tmpDir, {
[suspendedThreadId]: suspendedEntry, [suspendedThreadId]: suspendedEntry,
[idleThreadId]: idleEntry, [idleThreadId]: idleEntry,
}); });
@@ -205,7 +206,7 @@ describe("suspended thread display", () => {
"Need clarification: Which database to use?", "Need clarification: Which database to use?",
); );
await saveThreadsIndex(tmpDir, { [threadId]: suspendedEntry }); await seedThreads(tmpDir, { [threadId]: suspendedEntry });
// Test thread show // Test thread show
const showResult = await cmdThreadShow(tmpDir, threadId); const showResult = await cmdThreadShow(tmpDir, threadId);
@@ -258,7 +259,7 @@ describe("suspended thread display", () => {
}); });
const threadId = "01NORMALTHREAD000000000" as ThreadId; const threadId = "01NORMALTHREAD000000000" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: createThreadIndexEntry(startHash) }); await seedThreads(tmpDir, { [threadId]: createThreadIndexEntry(startHash) });
// Test thread show // Test thread show
const showResult = await cmdThreadShow(tmpDir, threadId); const showResult = await cmdThreadShow(tmpDir, threadId);
@@ -0,0 +1,37 @@
import type { CasRef, ThreadId, ThreadIndexEntry } from "@united-workforce/protocol";
import { createThreadIndexEntry } from "@united-workforce/protocol";
import { createUwfStore, setThread } from "../store.js";
async function ensureHeadInCas(
uwf: Awaited<ReturnType<typeof createUwfStore>>,
head: CasRef,
threadId: ThreadId,
): Promise<CasRef> {
if (uwf.store.get(head) !== null) {
return head;
}
return (await uwf.store.put(uwf.schemas.text, `thread-head:${threadId}:${head}`)) as CasRef;
}
export async function seedThread(
storageRoot: string,
threadId: ThreadId,
entry: ThreadIndexEntry | CasRef,
): Promise<void> {
const uwf = await createUwfStore(storageRoot);
const normalized = typeof entry === "string" ? createThreadIndexEntry(entry) : entry;
const head = await ensureHeadInCas(uwf, normalized.head, threadId);
setThread(uwf.varStore, threadId, { ...normalized, head });
}
export async function seedThreads(
storageRoot: string,
entries: Record<ThreadId, ThreadIndexEntry | CasRef>,
): Promise<void> {
const uwf = await createUwfStore(storageRoot);
for (const [threadId, entry] of Object.entries(entries)) {
const normalized = typeof entry === "string" ? createThreadIndexEntry(entry as CasRef) : entry;
const head = await ensureHeadInCas(uwf, normalized.head, threadId as ThreadId);
setThread(uwf.varStore, threadId as ThreadId, { ...normalized, head });
}
}
@@ -12,7 +12,8 @@ import {
THREAD_READ_DEFAULT_QUOTA, THREAD_READ_DEFAULT_QUOTA,
} from "../commands/thread.js"; } from "../commands/thread.js";
import type { UwfStore } from "../store.js"; import type { UwfStore } from "../store.js";
import { appendThreadHistory, createUwfStore, saveThreadsIndex } from "../store.js"; import { appendThreadHistory, createUwfStore } from "../store.js";
import { seedThreads } from "./thread-test-helpers.js";
// ── schemas used in tests ──────────────────────────────────────────────────── // ── schemas used in tests ────────────────────────────────────────────────────
@@ -258,7 +259,7 @@ describe("cmdThreadRead <output> section", () => {
}); });
const threadId = "01JTEST0000000000000000001" as ThreadId; const threadId = "01JTEST0000000000000000001" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHash }); await seedThreads(tmpDir, { [threadId]: stepHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
@@ -303,7 +304,7 @@ describe("cmdThreadRead <output> section", () => {
}); });
const threadId = "01JTEST0000000000000000002" as ThreadId; const threadId = "01JTEST0000000000000000002" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHash }); await seedThreads(tmpDir, { [threadId]: stepHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
@@ -438,7 +439,7 @@ describe("cmdThreadRead <prompt> deduplication", () => {
const uwf = await makeUwfStore(tmpDir); const uwf = await makeUwfStore(tmpDir);
const headHash = await makeThreadWithRoles(uwf, ["writer", "writer"]); const headHash = await makeThreadWithRoles(uwf, ["writer", "writer"]);
const threadId = "01JTEST0000000000000003" as ThreadId; const threadId = "01JTEST0000000000000003" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: headHash }); await seedThreads(tmpDir, { [threadId]: headHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
const count = (markdown.match(/<prompt>/g) ?? []).length; const count = (markdown.match(/<prompt>/g) ?? []).length;
@@ -449,7 +450,7 @@ describe("cmdThreadRead <prompt> deduplication", () => {
const uwf = await makeUwfStore(tmpDir); const uwf = await makeUwfStore(tmpDir);
const headHash = await makeThreadWithRoles(uwf, ["planner", "coder"]); const headHash = await makeThreadWithRoles(uwf, ["planner", "coder"]);
const threadId = "01JTEST0000000000000004" as ThreadId; const threadId = "01JTEST0000000000000004" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: headHash }); await seedThreads(tmpDir, { [threadId]: headHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
const count = (markdown.match(/<prompt>/g) ?? []).length; const count = (markdown.match(/<prompt>/g) ?? []).length;
@@ -460,7 +461,7 @@ describe("cmdThreadRead <prompt> deduplication", () => {
const uwf = await makeUwfStore(tmpDir); const uwf = await makeUwfStore(tmpDir);
const headHash = await makeThreadWithRoles(uwf, ["roleA", "roleB", "roleA"]); const headHash = await makeThreadWithRoles(uwf, ["roleA", "roleB", "roleA"]);
const threadId = "01JTEST0000000000000005" as ThreadId; const threadId = "01JTEST0000000000000005" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: headHash }); await seedThreads(tmpDir, { [threadId]: headHash });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
const count = (markdown.match(/<prompt>/g) ?? []).length; const count = (markdown.match(/<prompt>/g) ?? []).length;
@@ -528,7 +529,7 @@ describe("cmdThreadRead start section / before / quota", () => {
const uwf = await makeUwfStore(tmpDir); const uwf = await makeUwfStore(tmpDir);
const { stepHashes } = await makeSimpleThread(uwf, ["roleA"]); const { stepHashes } = await makeSimpleThread(uwf, ["roleA"]);
const threadId = "01JTEST0000000000000006" as ThreadId; const threadId = "01JTEST0000000000000006" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHashes[stepHashes.length - 1]! }); await seedThreads(tmpDir, { [threadId]: stepHashes[stepHashes.length - 1]! });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, true); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, true);
expect(markdown).toContain("# Thread"); expect(markdown).toContain("# Thread");
@@ -540,7 +541,7 @@ describe("cmdThreadRead start section / before / quota", () => {
const uwf = await makeUwfStore(tmpDir); const uwf = await makeUwfStore(tmpDir);
const { stepHashes } = await makeSimpleThread(uwf, ["roleA"]); const { stepHashes } = await makeSimpleThread(uwf, ["roleA"]);
const threadId = "01JTEST0000000000000007" as ThreadId; const threadId = "01JTEST0000000000000007" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHashes[stepHashes.length - 1]! }); await seedThreads(tmpDir, { [threadId]: stepHashes[stepHashes.length - 1]! });
// When before=null, the start section is always shown regardless of showStart // When before=null, the start section is always shown regardless of showStart
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
@@ -553,7 +554,7 @@ describe("cmdThreadRead start section / before / quota", () => {
const { stepHashes } = await makeSimpleThread(uwf, ["roleA", "roleB", "roleC"]); const { stepHashes } = await makeSimpleThread(uwf, ["roleA", "roleB", "roleC"]);
const [_hashA, hashB, hashC] = stepHashes as [CasRef, CasRef, CasRef]; const [_hashA, hashB, hashC] = stepHashes as [CasRef, CasRef, CasRef];
const threadId = "01JTEST0000000000000008" as ThreadId; const threadId = "01JTEST0000000000000008" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: hashC }); await seedThreads(tmpDir, { [threadId]: hashC });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, hashB, false); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, hashB, false);
expect(markdown).toContain("roleA"); expect(markdown).toContain("roleA");
@@ -565,7 +566,7 @@ describe("cmdThreadRead start section / before / quota", () => {
const uwf = await makeUwfStore(tmpDir); const uwf = await makeUwfStore(tmpDir);
const { stepHashes } = await makeSimpleThread(uwf, ["roleA", "roleB", "roleC"]); const { stepHashes } = await makeSimpleThread(uwf, ["roleA", "roleB", "roleC"]);
const threadId = "01JTEST000000000000000A" as ThreadId; const threadId = "01JTEST000000000000000A" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHashes[stepHashes.length - 1]! }); await seedThreads(tmpDir, { [threadId]: stepHashes[stepHashes.length - 1]! });
const markdown = await cmdThreadRead(tmpDir, threadId, 1, null, false); const markdown = await cmdThreadRead(tmpDir, threadId, 1, null, false);
expect(markdown).toContain("earlier step"); expect(markdown).toContain("earlier step");
@@ -575,7 +576,7 @@ describe("cmdThreadRead start section / before / quota", () => {
const uwf = await makeUwfStore(tmpDir); const uwf = await makeUwfStore(tmpDir);
const { stepHashes } = await makeSimpleThread(uwf, ["roleA"]); const { stepHashes } = await makeSimpleThread(uwf, ["roleA"]);
const threadId = "01JTEST000000000000000B" as ThreadId; const threadId = "01JTEST000000000000000B" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHashes[0]! }); await seedThreads(tmpDir, { [threadId]: stepHashes[0]! });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
expect(markdown).not.toContain("earlier step"); expect(markdown).not.toContain("earlier step");
@@ -627,7 +628,7 @@ describe("cmdStepShow (process.exit tests - must be last)", () => {
detail: null, detail: null,
agent: "uwf-test", agent: "uwf-test",
}); });
await saveThreadsIndex(tmpDir, { ["01JTEST000000000000000C" as ThreadId]: stepHash as CasRef }); await seedThreads(tmpDir, { ["01JTEST000000000000000C" as ThreadId]: stepHash as CasRef });
await expect( await expect(
cmdThreadRead( cmdThreadRead(
@@ -692,7 +693,7 @@ describe("cmdStepList with completed threads", () => {
}); });
const threadId = "01JTEST0000000000000000A1" as ThreadId; const threadId = "01JTEST0000000000000000A1" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: step3Hash }); await seedThreads(tmpDir, { [threadId]: step3Hash });
const result = await cmdStepList(tmpDir, threadId); const result = await cmdStepList(tmpDir, threadId);
@@ -744,7 +745,6 @@ describe("cmdStepList with completed threads", () => {
const threadId = "01JTEST0000000000000000A2" as ThreadId; const threadId = "01JTEST0000000000000000A2" as ThreadId;
// Thread is NOT in threads.yaml (simulating completed thread) // Thread is NOT in threads.yaml (simulating completed thread)
await saveThreadsIndex(tmpDir, {});
// But it IS in history.jsonl // But it IS in history.jsonl
await appendThreadHistory(tmpDir, { await appendThreadHistory(tmpDir, {
thread: threadId, thread: threadId,
@@ -812,7 +812,7 @@ describe("cmdStepShow with completed threads", () => {
}); });
const threadId = "01JTEST0000000000000000B1" as ThreadId; const threadId = "01JTEST0000000000000000B1" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHash }); await seedThreads(tmpDir, { [threadId]: stepHash });
const result = await cmdStepShow(tmpDir, stepHash); const result = await cmdStepShow(tmpDir, stepHash);
@@ -873,7 +873,6 @@ describe("cmdStepShow with completed threads", () => {
const threadId = "01JTEST0000000000000000B2" as ThreadId; const threadId = "01JTEST0000000000000000B2" as ThreadId;
// Thread is NOT in threads.yaml // Thread is NOT in threads.yaml
await saveThreadsIndex(tmpDir, {});
// But it IS in history.jsonl // But it IS in history.jsonl
await appendThreadHistory(tmpDir, { await appendThreadHistory(tmpDir, {
thread: threadId, thread: threadId,
@@ -937,7 +936,6 @@ describe("cmdThreadRead with completed threads", () => {
const threadId = "01JTEST0000000000000000C1" as ThreadId; const threadId = "01JTEST0000000000000000C1" as ThreadId;
// Thread is NOT in threads.yaml // Thread is NOT in threads.yaml
await saveThreadsIndex(tmpDir, {});
// But it IS in history.jsonl // But it IS in history.jsonl
await appendThreadHistory(tmpDir, { await appendThreadHistory(tmpDir, {
thread: threadId, thread: threadId,
@@ -1001,7 +999,6 @@ describe("cmdThreadRead with completed threads", () => {
}); });
const threadId = "01JTEST0000000000000000C2" as ThreadId; const threadId = "01JTEST0000000000000000C2" as ThreadId;
await saveThreadsIndex(tmpDir, {});
await appendThreadHistory(tmpDir, { await appendThreadHistory(tmpDir, {
thread: threadId, thread: threadId,
workflow: workflowHash, workflow: workflowHash,
+5 -5
View File
@@ -6,7 +6,7 @@ import type {
StepNodePayload, StepNodePayload,
ThreadId, ThreadId,
} from "@united-workforce/protocol"; } from "@united-workforce/protocol";
import { findThreadInHistory, loadThreadsIndex, type UwfStore } from "../store.js"; import { createUwfStore, findThreadInHistory, getThread, type UwfStore } from "../store.js";
type ChainState = { type ChainState = {
startHash: CasRef; startHash: CasRef;
@@ -202,10 +202,10 @@ function collectOrderedSteps(
} }
async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise<CasRef> { async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise<CasRef> {
const index = await loadThreadsIndex(storageRoot); const uwf = await createUwfStore(storageRoot);
const activeHead = index[threadId]?.head; const entry = getThread(uwf.varStore, threadId);
if (activeHead !== undefined) { if (entry !== null) {
return activeHead; return entry.head;
} }
const hist = await findThreadInHistory(storageRoot, threadId); const hist = await findThreadInHistory(storageRoot, threadId);
if (hist !== null) { if (hist !== null) {
+6 -4
View File
@@ -9,7 +9,7 @@ import type {
ThreadStepsOutput, ThreadStepsOutput,
} from "@united-workforce/protocol"; } from "@united-workforce/protocol";
import { generateUlid } from "@united-workforce/util"; import { generateUlid } from "@united-workforce/util";
import { createUwfStore, loadThreadsIndex, saveThreadsIndex } from "../store.js"; import { createUwfStore, setThread } from "../store.js";
import { import {
collectOrderedSteps, collectOrderedSteps,
expandDeep, expandDeep,
@@ -112,9 +112,11 @@ export async function cmdStepFork(
} }
const newThreadId = generateUlid(Date.now()) as ThreadId; const newThreadId = generateUlid(Date.now()) as ThreadId;
const index = await loadThreadsIndex(storageRoot); setThread(uwf.varStore, newThreadId, {
index[newThreadId] = { head: stepHash, suspendedRole: null, suspendMessage: null }; head: stepHash,
await saveThreadsIndex(storageRoot, index); suspendedRole: null,
suspendMessage: null,
});
return { return {
thread: newThreadId, thread: newThreadId,
+50 -60
View File
@@ -1,6 +1,7 @@
import { execFileSync, spawn } from "node:child_process"; import { execFileSync, spawn } from "node:child_process";
import { access, readFile } from "node:fs/promises"; import { access, readFile } from "node:fs/promises";
import { dirname, isAbsolute, resolve as resolvePath } from "node:path"; import { dirname, isAbsolute, resolve as resolvePath } from "node:path";
import type { VariableStore } from "@ocas/core";
import { validate } from "@ocas/core"; import { validate } from "@ocas/core";
import type { import type {
AgentAlias, AgentAlias,
@@ -39,12 +40,14 @@ import { evaluate, isSuspendResult } from "../moderator/index.js";
import { import {
appendThreadHistory, appendThreadHistory,
createUwfStore, createUwfStore,
deleteThread,
findThreadInHistory, findThreadInHistory,
getThread,
loadAllThreads,
loadThreadHistory, loadThreadHistory,
loadThreadsIndex,
loadWorkflowRegistry, loadWorkflowRegistry,
resolveWorkflowHash, resolveWorkflowHash,
saveThreadsIndex, setThread,
type ThreadHistoryLine, type ThreadHistoryLine,
type UwfStore, type UwfStore,
} from "../store.js"; } from "../store.js";
@@ -136,7 +139,7 @@ function resolveSuspendFieldsForShow(
} }
async function ensureThreadSuspendMetadata( async function ensureThreadSuspendMetadata(
storageRoot: string, varStore: VariableStore,
threadId: ThreadId, threadId: ThreadId,
entry: ThreadIndexEntry, entry: ThreadIndexEntry,
suspendedRole: string, suspendedRole: string,
@@ -146,9 +149,7 @@ async function ensureThreadSuspendMetadata(
return entry; return entry;
} }
const updated = markThreadSuspended(entry, suspendedRole, suspendMessage); const updated = markThreadSuspended(entry, suspendedRole, suspendMessage);
const index = await loadThreadsIndex(storageRoot); setThread(varStore, threadId, updated);
index[threadId] = updated;
await saveThreadsIndex(storageRoot, index);
return updated; return updated;
} }
@@ -467,9 +468,7 @@ export async function cmdThreadStart(
fail("stored StartNode failed schema validation"); fail("stored StartNode failed schema validation");
} }
const index = await loadThreadsIndex(storageRoot); setThread(uwf.varStore, threadId, createThreadIndexEntry(headHash));
index[threadId] = createThreadIndexEntry(headHash);
await saveThreadsIndex(storageRoot, index);
plog.log( plog.log(
PL_THREAD_START, PL_THREAD_START,
@@ -484,11 +483,10 @@ export async function cmdThreadShow(
storageRoot: string, storageRoot: string,
threadId: ThreadId, threadId: ThreadId,
): Promise<ThreadShowOutput> { ): Promise<ThreadShowOutput> {
const index = await loadThreadsIndex(storageRoot); const uwf = await createUwfStore(storageRoot);
const entry = index[threadId]; const entry = getThread(uwf.varStore, threadId);
if (entry !== undefined) { if (entry !== null) {
const activeHead = entry.head; const activeHead = entry.head;
const uwf = await createUwfStore(storageRoot);
const workflow = resolveWorkflowFromHead(uwf, activeHead); const workflow = resolveWorkflowFromHead(uwf, activeHead);
if (workflow === null) { if (workflow === null) {
fail(`failed to resolve workflow from head: ${activeHead}`); fail(`failed to resolve workflow from head: ${activeHead}`);
@@ -661,7 +659,7 @@ export async function cmdThreadList(
take: number | null, take: number | null,
): Promise<ThreadListItemWithStatus[]> { ): Promise<ThreadListItemWithStatus[]> {
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const index = await loadThreadsIndex(storageRoot); const index = loadAllThreads(uwf.varStore);
// Collect active threads // Collect active threads
let items = await collectActiveThreads(storageRoot, uwf, index); let items = await collectActiveThreads(storageRoot, uwf, index);
@@ -1038,14 +1036,13 @@ function spawnAgent(
} }
async function archiveThread( async function archiveThread(
uwf: UwfStore,
storageRoot: string, storageRoot: string,
threadId: ThreadId, threadId: ThreadId,
workflow: CasRef, workflow: CasRef,
head: CasRef, head: CasRef,
): Promise<void> { ): Promise<void> {
const index = await loadThreadsIndex(storageRoot); deleteThread(uwf.varStore, threadId);
delete index[threadId];
await saveThreadsIndex(storageRoot, index);
await appendThreadHistory(storageRoot, { await appendThreadHistory(storageRoot, {
thread: threadId, thread: threadId,
workflow, workflow,
@@ -1066,13 +1063,12 @@ export async function cmdThreadResume(
fail(`thread already executing in background (PID: ${runningMarker.pid})`); fail(`thread already executing in background (PID: ${runningMarker.pid})`);
} }
const index = await loadThreadsIndex(storageRoot); const uwf = await createUwfStore(storageRoot);
const entry = index[threadId]; const entry = getThread(uwf.varStore, threadId);
if (entry === undefined) { if (entry === null) {
fail(`thread not active: ${threadId}`); fail(`thread not active: ${threadId}`);
} }
const uwf = await createUwfStore(storageRoot);
const headHash = entry.head; const headHash = entry.head;
const chain = walkChain(uwf, headHash); const chain = walkChain(uwf, headHash);
const workflowHash = chain.start.workflow; const workflowHash = chain.start.workflow;
@@ -1179,12 +1175,11 @@ async function resolveActiveThreadWorkflowHash(
storageRoot: string, storageRoot: string,
threadId: ThreadId, threadId: ThreadId,
): Promise<CasRef> { ): Promise<CasRef> {
const index = await loadThreadsIndex(storageRoot); const uwf = await createUwfStore(storageRoot);
const entry = index[threadId]; const entry = getThread(uwf.varStore, threadId);
if (entry === undefined) { if (entry === null) {
fail(`thread not active: ${threadId}`); fail(`thread not active: ${threadId}`);
} }
const uwf = await createUwfStore(storageRoot);
const chain = walkChain(uwf, entry.head); const chain = walkChain(uwf, entry.head);
return chain.start.workflow; return chain.start.workflow;
} }
@@ -1197,16 +1192,13 @@ async function cmdThreadStepBackground(
plog: ProcessLogger, plog: ProcessLogger,
workflowHash: CasRef, workflowHash: CasRef,
): Promise<StepOutput[]> { ): Promise<StepOutput[]> {
// Get current head to return to caller const uwf = await createUwfStore(storageRoot);
const index = await loadThreadsIndex(storageRoot); const entry = getThread(uwf.varStore, threadId);
const entry = index[threadId]; if (entry === null) {
if (entry === undefined) {
failStep(plog, `thread not active: ${threadId}`); failStep(plog, `thread not active: ${threadId}`);
} }
const headHash = entry.head; const headHash = entry.head;
const uwf = await createUwfStore(storageRoot);
// Spawn detached background process // Spawn detached background process
const scriptPath = process.argv[1]; const scriptPath = process.argv[1];
if (scriptPath === undefined) { if (scriptPath === undefined) {
@@ -1292,7 +1284,7 @@ async function resolveModeratorStepTarget(
if (isSuspendResult(nextResult.value)) { if (isSuspendResult(nextResult.value)) {
await ensureThreadSuspendMetadata( await ensureThreadSuspendMetadata(
storageRoot, uwf.varStore,
threadId, threadId,
entry, entry,
nextResult.value.suspendedRole, nextResult.value.suspendedRole,
@@ -1310,7 +1302,7 @@ async function resolveModeratorStepTarget(
if (nextResult.value.role === END_ROLE) { if (nextResult.value.role === END_ROLE) {
plog.log(PL_THREAD_ARCHIVED, `thread archived head=${headHash}`, null); plog.log(PL_THREAD_ARCHIVED, `thread archived head=${headHash}`, null);
await archiveThread(storageRoot, threadId, workflowHash, headHash); await archiveThread(uwf, storageRoot, threadId, workflowHash, headHash);
return { return {
workflow: workflowHash, workflow: workflowHash,
thread: threadId, thread: threadId,
@@ -1340,10 +1332,8 @@ async function finalizeAgentStep(
uwfAfter: UwfStore, uwfAfter: UwfStore,
plog: ProcessLogger, plog: ProcessLogger,
): Promise<StepOutput> { ): Promise<StepOutput> {
const freshIndex = await loadThreadsIndex(storageRoot); const priorEntry = getThread(uwfAfter.varStore, threadId) ?? createThreadIndexEntry(newHead);
const priorEntry = freshIndex[threadId] ?? createThreadIndexEntry(newHead); setThread(uwfAfter.varStore, threadId, updateThreadHead(priorEntry, newHead));
freshIndex[threadId] = updateThreadHead(priorEntry, newHead);
await saveThreadsIndex(storageRoot, freshIndex);
const chainAfter = walkChain(uwfAfter, newHead); const chainAfter = walkChain(uwfAfter, newHead);
const { lastRole: lastRoleAfter, lastOutput: lastOutputAfter } = resolveEvaluateArgs( const { lastRole: lastRoleAfter, lastOutput: lastOutputAfter } = resolveEvaluateArgs(
@@ -1356,12 +1346,15 @@ async function finalizeAgentStep(
} }
if (isSuspendResult(afterResult.value)) { if (isSuspendResult(afterResult.value)) {
freshIndex[threadId] = markThreadSuspended( setThread(
freshIndex[threadId] ?? createThreadIndexEntry(newHead), uwfAfter.varStore,
afterResult.value.suspendedRole, threadId,
afterResult.value.prompt, markThreadSuspended(
getThread(uwfAfter.varStore, threadId) ?? createThreadIndexEntry(newHead),
afterResult.value.suspendedRole,
afterResult.value.prompt,
),
); );
await saveThreadsIndex(storageRoot, freshIndex);
return buildStepOutputFromEvaluation( return buildStepOutputFromEvaluation(
workflowHash, workflowHash,
threadId, threadId,
@@ -1375,7 +1368,7 @@ async function finalizeAgentStep(
const done = afterResult.value.role === END_ROLE; const done = afterResult.value.role === END_ROLE;
if (done) { if (done) {
plog.log(PL_THREAD_ARCHIVED, `thread archived head=${newHead}`, null); plog.log(PL_THREAD_ARCHIVED, `thread archived head=${newHead}`, null);
await archiveThread(storageRoot, threadId, workflowHash, newHead); await archiveThread(uwfAfter, storageRoot, threadId, workflowHash, newHead);
} }
const status: ThreadStatus = done ? "completed" : "idle"; const status: ThreadStatus = done ? "completed" : "idle";
@@ -1401,14 +1394,13 @@ async function cmdThreadStepOnce(
plog: ProcessLogger, plog: ProcessLogger,
resume: ResumeStepConfig | null = null, resume: ResumeStepConfig | null = null,
): Promise<StepOutput> { ): Promise<StepOutput> {
const index = await loadThreadsIndex(storageRoot); const uwf = await createUwfStore(storageRoot);
const entry = index[threadId]; const entry = getThread(uwf.varStore, threadId);
if (entry === undefined) { if (entry === null) {
failStep(plog, `thread not active: ${threadId}`); failStep(plog, `thread not active: ${threadId}`);
} }
const headHash = entry.head; const headHash = entry.head;
const uwf = await createUwfStore(storageRoot);
const chain = walkChain(uwf, headHash); const chain = walkChain(uwf, headHash);
const workflowHash = chain.start.workflow; const workflowHash = chain.start.workflow;
const workflow = loadWorkflowPayload(uwf, workflowHash); const workflow = loadWorkflowPayload(uwf, workflowHash);
@@ -1459,10 +1451,10 @@ async function cmdThreadStepOnce(
} }
async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise<CasRef> { async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise<CasRef> {
const index = await loadThreadsIndex(storageRoot); const uwf = await createUwfStore(storageRoot);
const activeHead = index[threadId]?.head; const entry = getThread(uwf.varStore, threadId);
if (activeHead !== undefined) { if (entry !== null) {
return activeHead; return entry.head;
} }
const hist = await findThreadInHistory(storageRoot, threadId); const hist = await findThreadInHistory(storageRoot, threadId);
if (hist !== null) { if (hist !== null) {
@@ -1512,9 +1504,9 @@ export type CancelOutput = {
* Stop background execution of a thread (but keep thread active) * Stop background execution of a thread (but keep thread active)
*/ */
export async function cmdThreadStop(storageRoot: string, threadId: ThreadId): Promise<StopOutput> { export async function cmdThreadStop(storageRoot: string, threadId: ThreadId): Promise<StopOutput> {
const index = await loadThreadsIndex(storageRoot); const uwf = await createUwfStore(storageRoot);
const entry = index[threadId]; const entry = getThread(uwf.varStore, threadId);
if (entry === undefined) { if (entry === null) {
fail(`thread not active: ${threadId}`); fail(`thread not active: ${threadId}`);
} }
@@ -1542,9 +1534,9 @@ export async function cmdThreadCancel(
storageRoot: string, storageRoot: string,
threadId: ThreadId, threadId: ThreadId,
): Promise<CancelOutput> { ): Promise<CancelOutput> {
const index = await loadThreadsIndex(storageRoot); const uwf = await createUwfStore(storageRoot);
const entry = index[threadId]; const entry = getThread(uwf.varStore, threadId);
if (entry === undefined) { if (entry === null) {
fail(`thread not active: ${threadId}`); fail(`thread not active: ${threadId}`);
} }
const head = entry.head; const head = entry.head;
@@ -1560,14 +1552,12 @@ export async function cmdThreadCancel(
await deleteMarker(storageRoot, threadId); await deleteMarker(storageRoot, threadId);
} }
const uwf = await createUwfStore(storageRoot);
const workflow = resolveWorkflowFromHead(uwf, head); const workflow = resolveWorkflowFromHead(uwf, head);
if (workflow === null) { if (workflow === null) {
fail(`failed to resolve workflow from head: ${head}`); fail(`failed to resolve workflow from head: ${head}`);
} }
delete index[threadId]; deleteThread(uwf.varStore, threadId);
await saveThreadsIndex(storageRoot, index);
const historyEntry: ThreadHistoryLine = { const historyEntry: ThreadHistoryLine = {
thread: threadId, thread: threadId,
+78 -25
View File
@@ -1,6 +1,6 @@
import type { Dirent } from "node:fs"; import type { Dirent } from "node:fs";
import { existsSync, symlinkSync } from "node:fs"; import { existsSync, symlinkSync } from "node:fs";
import { access, appendFile, mkdir, readdir, readFile, rename, writeFile } from "node:fs/promises"; import { access, appendFile, mkdir, readdir, readFile, rename } from "node:fs/promises";
import { homedir } from "node:os"; import { homedir } from "node:os";
import { join } from "node:path"; import { join } from "node:path";
@@ -14,12 +14,8 @@ import type {
ThreadListItem, ThreadListItem,
ThreadsIndex, ThreadsIndex,
} from "@united-workforce/protocol"; } from "@united-workforce/protocol";
import { import { parseThreadsIndex } from "@united-workforce/protocol";
createThreadIndexEntry, import { parse } from "yaml";
parseThreadsIndex,
serializeThreadsIndex,
} from "@united-workforce/protocol";
import { parse, stringify } from "yaml";
import { registerUwfSchemas, type UwfSchemaHashes } from "./schemas.js"; import { registerUwfSchemas, type UwfSchemaHashes } from "./schemas.js";
@@ -28,6 +24,9 @@ export type WorkflowRegistry = Record<string, CasRef>;
/** Variable name prefix for workflow registry entries (`@uwf/registry/<name>`). */ /** Variable name prefix for workflow registry entries (`@uwf/registry/<name>`). */
export const REGISTRY_VAR_PREFIX = "@uwf/registry/"; export const REGISTRY_VAR_PREFIX = "@uwf/registry/";
/** Variable name prefix for active thread entries (`@uwf/thread/<thread-id>`). */
export const THREAD_VAR_PREFIX = "@uwf/thread/";
/** A workflow entry discovered from the project-local .workflows/ directory. */ /** A workflow entry discovered from the project-local .workflows/ directory. */
export type ProjectWorkflowEntry = { export type ProjectWorkflowEntry = {
/** Workflow name (from YAML `name` field, equals filename stem). */ /** Workflow name (from YAML `name` field, equals filename stem). */
@@ -214,6 +213,7 @@ export async function createUwfStore(storageRoot: string): Promise<UwfStore> {
const schemas = await registerUwfSchemas(store); const schemas = await registerUwfSchemas(store);
const varStore = createVariableStore(join(casDir, "variables.db"), store); const varStore = createVariableStore(join(casDir, "variables.db"), store);
await migrateWorkflowRegistryIfNeeded(storageRoot, varStore); await migrateWorkflowRegistryIfNeeded(storageRoot, varStore);
await migrateThreadsIndexIfNeeded(storageRoot, varStore);
return { storageRoot, store, schemas, varStore }; return { storageRoot, store, schemas, varStore };
} }
@@ -294,7 +294,7 @@ export function findRegistryName(registry: WorkflowRegistry, hash: Hash): string
return null; return null;
} }
export async function loadThreadsIndex(storageRoot: string): Promise<ThreadsIndex> { async function loadThreadsIndexFromYaml(storageRoot: string): Promise<ThreadsIndex> {
const path = getThreadsPath(storageRoot); const path = getThreadsPath(storageRoot);
try { try {
const text = await readFile(path, "utf8"); const text = await readFile(path, "utf8");
@@ -309,26 +309,79 @@ export async function loadThreadsIndex(storageRoot: string): Promise<ThreadsInde
} }
} }
/** Accept legacy CasRef values for test convenience. */ /** One-time migration: `~/.uwf/threads.yaml` → `@uwf/thread/*` variables. */
export type ThreadsIndexInput = Record<ThreadId, ThreadIndexEntry | CasRef>; export async function migrateThreadsIndexIfNeeded(
function normalizeThreadsIndexInput(index: ThreadsIndexInput): ThreadsIndex {
const normalized: ThreadsIndex = {};
for (const [threadId, value] of Object.entries(index)) {
normalized[threadId as ThreadId] =
typeof value === "string" ? createThreadIndexEntry(value as CasRef) : value;
}
return normalized;
}
export async function saveThreadsIndex(
storageRoot: string, storageRoot: string,
index: ThreadsIndexInput, varStore: VariableStore,
): Promise<void> { ): Promise<void> {
const path = getThreadsPath(storageRoot); const path = getThreadsPath(storageRoot);
await mkdir(storageRoot, { recursive: true }); if (!existsSync(path)) {
const text = stringify(serializeThreadsIndex(normalizeThreadsIndexInput(index)), { indent: 2 }); return;
await writeFile(path, text, "utf8"); }
const index = await loadThreadsIndexFromYaml(storageRoot);
for (const [threadId, entry] of Object.entries(index)) {
setThread(varStore, threadId as ThreadId, entry);
}
await rename(path, `${path}.migrated`);
}
function threadVarName(threadId: ThreadId): string {
return `${THREAD_VAR_PREFIX}${threadId}`;
}
function entryFromVariable(v: { value: string; tags: Record<string, string> }): ThreadIndexEntry {
return {
head: v.value as CasRef,
suspendedRole: v.tags.suspendedRole ?? null,
suspendMessage: v.tags.suspendMessage ?? null,
};
}
/** Load all active threads (equivalent to legacy `loadThreadsIndex`). */
export function loadAllThreads(varStore: VariableStore): ThreadsIndex {
const vars = varStore.list({ namePrefix: THREAD_VAR_PREFIX });
const index: ThreadsIndex = {};
for (const v of vars) {
const threadId = v.name.slice(THREAD_VAR_PREFIX.length) as ThreadId;
index[threadId] = entryFromVariable(v);
}
return index;
}
/** Get a single active thread entry, or null if not found. */
export function getThread(varStore: VariableStore, threadId: ThreadId): ThreadIndexEntry | null {
const vars = varStore.list({ exactName: threadVarName(threadId) });
const v = vars[0];
if (v === undefined) {
return null;
}
return entryFromVariable(v);
}
/** Set or update a single active thread entry. */
export function setThread(
varStore: VariableStore,
threadId: ThreadId,
entry: ThreadIndexEntry,
): void {
const name = threadVarName(threadId);
// Head CAS nodes may use different schemas (StartNode vs StepNode) — clear all variants first.
varStore.remove(name);
const tags: Record<string, string> = {};
if (entry.suspendedRole !== null) {
tags.suspendedRole = entry.suspendedRole;
}
if (entry.suspendMessage !== null) {
tags.suspendMessage = entry.suspendMessage;
}
varStore.set(name, entry.head, { tags });
}
/** Remove an active thread entry (on complete/cancel). */
export function deleteThread(varStore: VariableStore, threadId: ThreadId): void {
varStore.remove(threadVarName(threadId));
} }
export async function loadThreadHistory(storageRoot: string): Promise<ThreadHistoryLine[]> { export async function loadThreadHistory(storageRoot: string): Promise<ThreadHistoryLine[]> {
+10 -12
View File
@@ -7,7 +7,7 @@ import type {
ThreadId, ThreadId,
} from "@united-workforce/protocol"; } from "@united-workforce/protocol";
import type { AgentStore } from "./storage.js"; import type { AgentStore } from "./storage.js";
import { createAgentStore, loadThreadsIndex, resolveStorageRoot } from "./storage.js"; import { createAgentStore, getActiveThreadEntry, resolveStorageRoot } from "./storage.js";
import type { AgentContext } from "./types.js"; import type { AgentContext } from "./types.js";
type ChainState = { type ChainState = {
@@ -162,13 +162,12 @@ export async function buildContext(
const agentStore = await createAgentStore(storageRoot); const agentStore = await createAgentStore(storageRoot);
const { store, schemas } = agentStore; const { store, schemas } = agentStore;
const index = await loadThreadsIndex(storageRoot); const entry = await getActiveThreadEntry(storageRoot, threadId);
const headHash = index[threadId]?.head; if (entry === null) {
if (headHash === undefined) { fail(`thread not found in active thread index: ${threadId}`);
fail(`thread not found in threads.yaml: ${threadId}`);
} }
const chain = walkChain(store, schemas, headHash); const chain = walkChain(store, schemas, entry.head);
const workflow = await loadWorkflow(store, schemas, chain.start.workflow); const workflow = await loadWorkflow(store, schemas, chain.start.workflow);
const roleDef = workflow.roles[role]; const roleDef = workflow.roles[role];
if (roleDef === undefined) { if (roleDef === undefined) {
@@ -211,13 +210,12 @@ export async function buildContextWithMeta(
const agentStore = await createAgentStore(storageRoot); const agentStore = await createAgentStore(storageRoot);
const { store, schemas } = agentStore; const { store, schemas } = agentStore;
const index = await loadThreadsIndex(storageRoot); const entry = await getActiveThreadEntry(storageRoot, threadId);
const headHash = index[threadId]?.head; if (entry === null) {
if (headHash === undefined) { fail(`thread not found in active thread index: ${threadId}`);
fail(`thread not found in threads.yaml: ${threadId}`);
} }
const chain = walkChain(store, schemas, headHash); const chain = walkChain(store, schemas, entry.head);
const workflow = await loadWorkflow(store, schemas, chain.start.workflow); const workflow = await loadWorkflow(store, schemas, chain.start.workflow);
const roleDef = workflow.roles[role]; const roleDef = workflow.roles[role];
if (roleDef === undefined) { if (roleDef === undefined) {
@@ -237,6 +235,6 @@ export async function buildContextWithMeta(
outputFormatInstruction: "", outputFormatInstruction: "",
edgePrompt, edgePrompt,
isFirstVisit, isFirstVisit,
meta: { storageRoot, store, schemas, headHash, chain }, meta: { storageRoot, store, schemas, headHash: entry.head, chain },
}; };
} }
+44 -20
View File
@@ -2,21 +2,22 @@ import { readFile } from "node:fs/promises";
import { homedir } from "node:os"; import { homedir } from "node:os";
import { join } from "node:path"; import { join } from "node:path";
import type { Store } from "@ocas/core"; import { createVariableStore, type Store } from "@ocas/core";
import { createFsStore } from "@ocas/fs"; import { createFsStore } from "@ocas/fs";
import type { import type {
AgentAlias, AgentAlias,
AgentConfig, AgentConfig,
CasRef,
ModelAlias, ModelAlias,
ModelConfig, ModelConfig,
ProviderAlias, ProviderAlias,
ProviderConfig, ProviderConfig,
Scenario, Scenario,
ThreadsIndex, ThreadId,
ThreadIndexEntry,
WorkflowConfig, WorkflowConfig,
WorkflowName, WorkflowName,
} from "@united-workforce/protocol"; } from "@united-workforce/protocol";
import { parseThreadsIndex } from "@united-workforce/protocol";
import { parse } from "yaml"; import { parse } from "yaml";
import { registerAgentSchemas } from "./schemas.js"; import { registerAgentSchemas } from "./schemas.js";
@@ -58,8 +59,46 @@ export function getEnvPath(storageRoot: string): string {
return join(storageRoot, ".env"); return join(storageRoot, ".env");
} }
export function getThreadsPath(storageRoot: string): string { const THREAD_VAR_PREFIX = "@uwf/thread/";
return join(storageRoot, "threads.yaml");
/**
* Global CAS directory (same as uwf CLI).
* Priority: `OCAS_DIR` → `UNCAGED_CAS_DIR` (legacy) → default ~/.ocas
*/
export function getGlobalCasDir(): string {
const primary = process.env.OCAS_DIR;
if (primary !== undefined && primary !== "") {
return primary;
}
const legacy = process.env.UNCAGED_CAS_DIR;
if (legacy !== undefined && legacy !== "") {
return legacy;
}
return join(homedir(), ".ocas");
}
function threadVarName(threadId: ThreadId): string {
return `${THREAD_VAR_PREFIX}${threadId}`;
}
/** Read active thread head + suspend metadata from ocas variable store. */
export async function getActiveThreadEntry(
_storageRoot: string,
threadId: ThreadId,
): Promise<ThreadIndexEntry | null> {
const casDir = getGlobalCasDir();
const store = createFsStore(casDir);
const varStore = createVariableStore(join(casDir, "variables.db"), store);
const vars = varStore.list({ exactName: threadVarName(threadId) });
const v = vars[0];
if (v === undefined) {
return null;
}
return {
head: v.value as CasRef,
suspendedRole: v.tags.suspendedRole ?? null,
suspendMessage: v.tags.suspendMessage ?? null,
};
} }
export type AgentStore = { export type AgentStore = {
@@ -205,18 +244,3 @@ export async function loadWorkflowConfig(storageRoot: string): Promise<WorkflowC
const raw = parse(text) as unknown; const raw = parse(text) as unknown;
return normalizeWorkflowConfig(raw); return normalizeWorkflowConfig(raw);
} }
export async function loadThreadsIndex(storageRoot: string): Promise<ThreadsIndex> {
const path = getThreadsPath(storageRoot);
try {
const text = await readFile(path, "utf8");
const raw = parse(text) as unknown;
return parseThreadsIndex(raw);
} catch (e) {
const err = e as NodeJS.ErrnoException;
if (err.code === "ENOENT") {
return {};
}
throw e;
}
}