refactor: migrate registry from YAML to ocas variable (Phase 4a) #16

Merged
xiaomo merged 1 commits from refactor/registry-to-ocas-variable into main 2026-06-02 14:06:52 +00:00
7 changed files with 108 additions and 82 deletions
@@ -2,7 +2,14 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdir, rm } from "node:fs/promises"; import { mkdir, rm } from "node:fs/promises";
import { tmpdir } from "node:os"; import { tmpdir } from "node:os";
import { join } from "node:path"; import { join } from "node:path";
import { createUwfStore, getCasDir, getGlobalCasDir } from "../store.js"; import {
createUwfStore,
getCasDir,
getGlobalCasDir,
getRegistryPath,
loadWorkflowRegistry,
saveWorkflowRegistry,
} from "../store.js";
describe("Global CAS directory", () => { describe("Global CAS directory", () => {
let tmpDir: string; let tmpDir: string;
@@ -85,6 +92,7 @@ describe("Global CAS directory", () => {
expect(uwf.storageRoot).toBe(storageRoot); expect(uwf.storageRoot).toBe(storageRoot);
expect(uwf.store).toBeDefined(); expect(uwf.store).toBeDefined();
expect(uwf.schemas).toBeDefined(); expect(uwf.schemas).toBeDefined();
expect(uwf.varStore).toBeDefined();
// The global CAS directory should be created // The global CAS directory should be created
const { stat } = await import("node:fs/promises"); const { stat } = await import("node:fs/promises");
@@ -137,29 +145,50 @@ describe("Global CAS directory", () => {
expect(files.length).toBeGreaterThan(0); expect(files.length).toBeGreaterThan(0);
}); });
test("workflow metadata remains in storageRoot, not global CAS", async () => { test("workflow registry is stored in global CAS variable store", async () => {
const globalCasDir = join(tmpDir, "global-cas"); const globalCasDir = join(tmpDir, "global-cas");
process.env.UNCAGED_CAS_DIR = globalCasDir; process.env.UNCAGED_CAS_DIR = globalCasDir;
const storageRoot = join(tmpDir, "storage"); const storageRoot = join(tmpDir, "storage");
await mkdir(storageRoot, { recursive: true }); await mkdir(storageRoot, { recursive: true });
const _uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const hash = await uwf.store.put(uwf.schemas.text, "registry-test");
saveWorkflowRegistry(uwf.varStore, "test-workflow", hash);
// Write workflow registry file const registry = loadWorkflowRegistry(uwf.varStore);
const { saveWorkflowRegistry } = await import("../store.js"); expect(registry["test-workflow"]).toBe(hash);
await saveWorkflowRegistry(storageRoot, { "test-workflow": "ABC123" });
const { access } = await import("node:fs/promises");
await access(join(globalCasDir, "variables.db"));
// Verify registry is in storageRoot, not global CAS
const { readFile } = await import("node:fs/promises");
const registryPath = join(storageRoot, "workflows.yaml"); const registryPath = join(storageRoot, "workflows.yaml");
const content = await readFile(registryPath, "utf8"); await expect(access(registryPath)).rejects.toThrow();
expect(content).toContain("test-workflow"); });
expect(content).toContain("ABC123");
// Verify registry is NOT in global CAS directory test("migrates workflows.yaml to variable store and renames file", async () => {
const globalRegistryPath = join(globalCasDir, "workflows.yaml"); const globalCasDir = join(tmpDir, "global-cas");
await expect(readFile(globalRegistryPath, "utf8")).rejects.toThrow(); process.env.UNCAGED_CAS_DIR = globalCasDir;
const storageRoot = join(tmpDir, "storage-migrate");
await mkdir(storageRoot, { recursive: true });
const uwfSeed = await createUwfStore(storageRoot);
const hash = await uwfSeed.store.put(uwfSeed.schemas.text, "migrated-workflow");
const registryPath = getRegistryPath(storageRoot);
const { writeFile, access, readFile } = await import("node:fs/promises");
await writeFile(registryPath, `migrated-workflow: ${hash}\n`, "utf8");
const uwf = await createUwfStore(storageRoot);
const registry = loadWorkflowRegistry(uwf.varStore);
expect(registry["migrated-workflow"]).toBe(hash);
await expect(access(registryPath)).rejects.toThrow();
const migratedPath = `${registryPath}.migrated`;
const migratedContent = await readFile(migratedPath, "utf8");
expect(migratedContent).toContain("migrated-workflow");
expect(migratedContent).toContain(hash);
}); });
test("thread metadata remains in storageRoot", async () => { test("thread metadata remains in storageRoot", async () => {
@@ -3,12 +3,11 @@ import { mkdir, mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os"; import { tmpdir } from "node:os";
import { join } from "node:path"; import { join } from "node:path";
import { bootstrap, putSchema } from "@ocas/core"; import { bootstrap, putSchema } from "@ocas/core";
import { createFsStore } from "@ocas/fs"; import type { createFsStore } from "@ocas/fs";
import type { CasRef, ThreadId } from "@united-workforce/protocol"; import type { CasRef, ThreadId } from "@united-workforce/protocol";
import { cmdThreadRead, THREAD_READ_DEFAULT_QUOTA } from "../commands/thread.js"; import { cmdThreadRead, THREAD_READ_DEFAULT_QUOTA } from "../commands/thread.js";
import { registerUwfSchemas } from "../schemas.js";
import type { UwfStore } from "../store.js"; import type { UwfStore } from "../store.js";
import { saveThreadsIndex } from "../store.js"; import { createUwfStore, saveThreadsIndex } from "../store.js";
// ── schemas used in tests ──────────────────────────────────────────────────── // ── schemas used in tests ────────────────────────────────────────────────────
@@ -53,11 +52,8 @@ const DETAIL_SCHEMA = {
async function makeUwfStore(storageRoot: string): Promise<UwfStore> { async function makeUwfStore(storageRoot: string): Promise<UwfStore> {
const casDir = join(storageRoot, "cas"); const casDir = join(storageRoot, "cas");
await mkdir(casDir, { recursive: true }); await mkdir(casDir, { recursive: true });
// Set UNCAGED_CAS_DIR to use the test's CAS directory
process.env.UNCAGED_CAS_DIR = casDir; process.env.UNCAGED_CAS_DIR = casDir;
const store = createFsStore(casDir); return createUwfStore(storageRoot);
const schemas = await registerUwfSchemas(store);
return { storageRoot, store, schemas };
} }
async function registerDetailSchemas(store: ReturnType<typeof createFsStore>) { async function registerDetailSchemas(store: ReturnType<typeof createFsStore>) {
@@ -3,7 +3,7 @@ import { mkdir, mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os"; import { tmpdir } from "node:os";
import { join } from "node:path"; import { join } from "node:path";
import { bootstrap, putSchema } from "@ocas/core"; import { bootstrap, putSchema } from "@ocas/core";
import { createFsStore } from "@ocas/fs"; import type { createFsStore } from "@ocas/fs";
import type { CasRef, ThreadId } from "@united-workforce/protocol"; import type { CasRef, ThreadId } from "@united-workforce/protocol";
import { cmdStepList, cmdStepShow } from "../commands/step.js"; import { cmdStepList, cmdStepShow } from "../commands/step.js";
import { import {
@@ -11,9 +11,8 @@ import {
extractLastAssistantContent, extractLastAssistantContent,
THREAD_READ_DEFAULT_QUOTA, THREAD_READ_DEFAULT_QUOTA,
} from "../commands/thread.js"; } from "../commands/thread.js";
import { registerUwfSchemas } from "../schemas.js";
import type { UwfStore } from "../store.js"; import type { UwfStore } from "../store.js";
import { appendThreadHistory, saveThreadsIndex } from "../store.js"; import { appendThreadHistory, createUwfStore, saveThreadsIndex } from "../store.js";
// ── schemas used in tests ──────────────────────────────────────────────────── // ── schemas used in tests ────────────────────────────────────────────────────
@@ -58,11 +57,8 @@ const DETAIL_SCHEMA = {
async function makeUwfStore(storageRoot: string): Promise<UwfStore> { async function makeUwfStore(storageRoot: string): Promise<UwfStore> {
const casDir = join(storageRoot, "cas"); const casDir = join(storageRoot, "cas");
await mkdir(casDir, { recursive: true }); await mkdir(casDir, { recursive: true });
// Set UNCAGED_CAS_DIR to use the test's CAS directory
process.env.UNCAGED_CAS_DIR = casDir; process.env.UNCAGED_CAS_DIR = casDir;
const store = createFsStore(casDir); return createUwfStore(storageRoot);
const schemas = await registerUwfSchemas(store);
return { storageRoot, store, schemas };
} }
async function registerDetailSchemas(store: ReturnType<typeof createFsStore>) { async function registerDetailSchemas(store: ReturnType<typeof createFsStore>) {
@@ -594,12 +590,7 @@ describe("cmdStepShow (process.exit tests - must be last)", () => {
}); });
test("before with unknown hash rejects", async () => { test("before with unknown hash rejects", async () => {
const _uwf = await makeUwfStore(tmpDir); const uwfStore = await makeUwfStore(tmpDir);
const casDir = join(tmpDir, "cas");
await mkdir(casDir, { recursive: true });
const store = createFsStore(casDir);
const schemas = await registerUwfSchemas(store);
const uwfStore: UwfStore = { storageRoot: tmpDir, store, schemas };
const workflowHash = await uwfStore.store.put(uwfStore.schemas.workflow, { const workflowHash = await uwfStore.store.put(uwfStore.schemas.workflow, {
name: "wf2", name: "wf2",
@@ -2,24 +2,19 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises"; import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os"; import { tmpdir } from "node:os";
import { join } from "node:path"; import { join } from "node:path";
import { createFsStore } from "@ocas/fs";
import type { CasRef, WorkflowPayload } from "@united-workforce/protocol"; import type { CasRef, WorkflowPayload } from "@united-workforce/protocol";
import { stringify } from "yaml"; import { stringify } from "yaml";
import { cmdThreadStart } from "../commands/thread.js"; import { cmdThreadStart } from "../commands/thread.js";
import { registerUwfSchemas } from "../schemas.js";
import type { UwfStore } from "../store.js"; import type { UwfStore } from "../store.js";
import { loadWorkflowRegistry, saveWorkflowRegistry } from "../store.js"; import { createUwfStore, saveWorkflowRegistry } from "../store.js";
// ── helpers ─────────────────────────────────────────────────────────────────── // ── helpers ───────────────────────────────────────────────────────────────────
async function makeUwfStore(storageRoot: string): Promise<UwfStore> { async function makeUwfStore(storageRoot: string): Promise<UwfStore> {
const casDir = join(storageRoot, "cas"); const casDir = join(storageRoot, "cas");
await mkdir(casDir, { recursive: true }); await mkdir(casDir, { recursive: true });
// Set UNCAGED_CAS_DIR to use the test's CAS directory
process.env.UNCAGED_CAS_DIR = casDir; process.env.UNCAGED_CAS_DIR = casDir;
const store = createFsStore(casDir); return createUwfStore(storageRoot);
const schemas = await registerUwfSchemas(store);
return { storageRoot, store, schemas };
} }
function makeMinimalPayload(name: string, description: string): WorkflowPayload { function makeMinimalPayload(name: string, description: string): WorkflowPayload {
@@ -308,9 +303,7 @@ describe("Strategy 4: Global Registry Resolution", () => {
test("should resolve workflow from global registry when not found locally", async () => { test("should resolve workflow from global registry when not found locally", async () => {
const uwf = await makeUwfStore(storageRoot); const uwf = await makeUwfStore(storageRoot);
const hash = await storeWorkflow(uwf, "deploy-pipeline"); const hash = await storeWorkflow(uwf, "deploy-pipeline");
const registry = await loadWorkflowRegistry(storageRoot); saveWorkflowRegistry(uwf.varStore, "deploy-pipeline", hash);
registry["deploy-pipeline"] = hash;
await saveWorkflowRegistry(storageRoot, registry);
const isolatedRoot = join(tmpDir, "isolated"); const isolatedRoot = join(tmpDir, "isolated");
await mkdir(isolatedRoot, { recursive: true }); await mkdir(isolatedRoot, { recursive: true });
@@ -360,9 +353,7 @@ describe("Resolution Priority", () => {
// Setup: Register globally // Setup: Register globally
const globalHash = await storeWorkflow(uwf, "solve-issue"); const globalHash = await storeWorkflow(uwf, "solve-issue");
const registry = await loadWorkflowRegistry(storageRoot); saveWorkflowRegistry(uwf.varStore, "solve-issue", globalHash);
registry["solve-issue"] = globalHash;
await saveWorkflowRegistry(storageRoot, registry);
// Setup: Create local .workflow/ // Setup: Create local .workflow/
const workflowDir = join(projectRoot, ".workflow"); const workflowDir = join(projectRoot, ".workflow");
+2 -3
View File
@@ -362,7 +362,6 @@ async function materializeLocalWorkflow(uwf: UwfStore, filePath: string): Promis
async function resolveWorkflowCasRef( async function resolveWorkflowCasRef(
uwf: UwfStore, uwf: UwfStore,
storageRoot: string,
workflowId: string, workflowId: string,
projectRoot: string, projectRoot: string,
): Promise<CasRef> { ): Promise<CasRef> {
@@ -397,7 +396,7 @@ async function resolveWorkflowCasRef(
} }
// Strategy 4: Global registry fallback // Strategy 4: Global registry fallback
const registry = await loadWorkflowRegistry(storageRoot); const registry = loadWorkflowRegistry(uwf.varStore);
const hash = resolveWorkflowHash(registry, trimmed); const hash = resolveWorkflowHash(registry, trimmed);
if (!isCasRef(hash)) { if (!isCasRef(hash)) {
fail(`workflow not found: ${trimmed}`); fail(`workflow not found: ${trimmed}`);
@@ -449,7 +448,7 @@ export async function cmdThreadStart(
} }
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const workflowHash = await resolveWorkflowCasRef(uwf, storageRoot, workflowId, projectRoot); const workflowHash = await resolveWorkflowCasRef(uwf, workflowId, projectRoot);
const threadId = generateUlid(Date.now()) as ThreadId; const threadId = generateUlid(Date.now()) as ThreadId;
const plog = createProcessLogger({ const plog = createProcessLogger({
@@ -156,9 +156,7 @@ export async function cmdWorkflowAdd(
fail("stored workflow failed schema validation"); fail("stored workflow failed schema validation");
} }
const registry = await loadWorkflowRegistry(storageRoot); saveWorkflowRegistry(uwf.varStore, materialized.name, hash);
registry[materialized.name] = hash;
await saveWorkflowRegistry(storageRoot, registry);
return { name: materialized.name, hash }; return { name: materialized.name, hash };
} }
@@ -168,7 +166,7 @@ export async function cmdWorkflowShow(
id: string, id: string,
): Promise<WorkflowShowOutput> { ): Promise<WorkflowShowOutput> {
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const registry = await loadWorkflowRegistry(storageRoot); const registry = loadWorkflowRegistry(uwf.varStore);
const hash = resolveWorkflowHash(registry, id); const hash = resolveWorkflowHash(registry, id);
const node = uwf.store.get(hash); const node = uwf.store.get(hash);
@@ -193,8 +191,9 @@ export async function cmdWorkflowList(
storageRoot: string, storageRoot: string,
projectRoot: string, projectRoot: string,
): Promise<WorkflowListEntry[]> { ): Promise<WorkflowListEntry[]> {
const uwf = await createUwfStore(storageRoot);
const localEntries = await discoverProjectWorkflows(projectRoot); const localEntries = await discoverProjectWorkflows(projectRoot);
const registry = await loadWorkflowRegistry(storageRoot); const registry = loadWorkflowRegistry(uwf.varStore);
const result: WorkflowListEntry[] = []; const result: WorkflowListEntry[] = [];
const localNames = new Set<string>(); const localNames = new Set<string>();
+37 -16
View File
@@ -1,10 +1,11 @@
import type { Dirent } from "node:fs"; import type { Dirent } from "node:fs";
import { existsSync, symlinkSync } from "node:fs"; import { existsSync, symlinkSync } from "node:fs";
import { access, appendFile, mkdir, readdir, readFile, writeFile } from "node:fs/promises"; import { access, appendFile, mkdir, readdir, readFile, rename, writeFile } from "node:fs/promises";
import { homedir } from "node:os"; import { homedir } from "node:os";
import { join } from "node:path"; import { join } from "node:path";
import type { BootstrapCapableStore, Hash } from "@ocas/core"; import type { BootstrapCapableStore, Hash } from "@ocas/core";
import { createVariableStore, type VariableStore } from "@ocas/core";
import { createFsStore } from "@ocas/fs"; import { createFsStore } from "@ocas/fs";
import type { import type {
CasRef, CasRef,
@@ -24,6 +25,9 @@ import { registerUwfSchemas, type UwfSchemaHashes } from "./schemas.js";
export type WorkflowRegistry = Record<string, CasRef>; export type WorkflowRegistry = Record<string, CasRef>;
/** Variable name prefix for workflow registry entries (`@uwf/registry/<name>`). */
export const REGISTRY_VAR_PREFIX = "@uwf/registry/";
/** A workflow entry discovered from the project-local .workflows/ directory. */ /** A workflow entry discovered from the project-local .workflows/ directory. */
export type ProjectWorkflowEntry = { export type ProjectWorkflowEntry = {
/** Workflow name (from YAML `name` field, equals filename stem). */ /** Workflow name (from YAML `name` field, equals filename stem). */
@@ -200,6 +204,7 @@ export type UwfStore = {
storageRoot: string; storageRoot: string;
store: BootstrapCapableStore; store: BootstrapCapableStore;
schemas: UwfSchemaHashes; schemas: UwfSchemaHashes;
varStore: VariableStore;
}; };
export async function createUwfStore(storageRoot: string): Promise<UwfStore> { export async function createUwfStore(storageRoot: string): Promise<UwfStore> {
@@ -207,12 +212,13 @@ export async function createUwfStore(storageRoot: string): Promise<UwfStore> {
await mkdir(casDir, { recursive: true }); await mkdir(casDir, { recursive: true });
const store = createFsStore(casDir); const store = createFsStore(casDir);
const schemas = await registerUwfSchemas(store); const schemas = await registerUwfSchemas(store);
return { storageRoot, store, schemas }; const varStore = createVariableStore(join(casDir, "variables.db"), store);
await migrateWorkflowRegistryIfNeeded(storageRoot, varStore);
return { storageRoot, store, schemas, varStore };
} }
export async function loadWorkflowRegistry(storageRoot: string): Promise<WorkflowRegistry> { async function loadWorkflowRegistryFromYaml(storageRoot: string): Promise<WorkflowRegistry> {
const path = getRegistryPath(storageRoot); const path = getRegistryPath(storageRoot);
try {
const text = await readFile(path, "utf8"); const text = await readFile(path, "utf8");
const raw = parse(text) as unknown; const raw = parse(text) as unknown;
if (raw === null || typeof raw !== "object" || Array.isArray(raw)) { if (raw === null || typeof raw !== "object" || Array.isArray(raw)) {
@@ -225,23 +231,38 @@ export async function loadWorkflowRegistry(storageRoot: string): Promise<Workflo
} }
} }
return registry; return registry;
} catch (e) {
const err = e as NodeJS.ErrnoException;
if (err.code === "ENOENT") {
return {};
}
throw e;
}
} }
export async function saveWorkflowRegistry( /** One-time migration: `~/.uwf/workflows.yaml` → `@uwf/registry/*` variables. */
export async function migrateWorkflowRegistryIfNeeded(
storageRoot: string, storageRoot: string,
registry: WorkflowRegistry, varStore: VariableStore,
): Promise<void> { ): Promise<void> {
const path = getRegistryPath(storageRoot); const path = getRegistryPath(storageRoot);
await mkdir(storageRoot, { recursive: true }); if (!existsSync(path)) {
const text = stringify(registry, { indent: 2 }); return;
await writeFile(path, text, "utf8"); }
const registry = await loadWorkflowRegistryFromYaml(storageRoot);
for (const [name, hash] of Object.entries(registry)) {
saveWorkflowRegistry(varStore, name, hash);
}
await rename(path, `${path}.migrated`);
}
export function loadWorkflowRegistry(varStore: VariableStore): WorkflowRegistry {
const vars = varStore.list({ namePrefix: REGISTRY_VAR_PREFIX });
const registry: WorkflowRegistry = {};
for (const v of vars) {
const name = v.name.slice(REGISTRY_VAR_PREFIX.length);
registry[name] = v.value;
}
return registry;
}
export function saveWorkflowRegistry(varStore: VariableStore, name: string, hash: CasRef): void {
varStore.set(`${REGISTRY_VAR_PREFIX}${name}`, hash);
} }
export function resolveWorkflowHash(registry: WorkflowRegistry, id: string): CasRef { export function resolveWorkflowHash(registry: WorkflowRegistry, id: string): CasRef {