refactor: migrate workflow registry from YAML to ocas variable store (Phase 4a)
CI / check (pull_request) Failing after 8m40s

- UwfStore gains varStore: VariableStore (SQLite at ~/.ocas/variables.db)
- loadWorkflowRegistry reads from @uwf/registry/* variables
- saveWorkflowRegistry writes individual @uwf/registry/<name> variables
- Auto-migration: workflows.yaml → variables on first run, renames to .migrated
- Updated callers in workflow.ts and thread.ts
- Tests updated and passing

Ref #11
This commit is contained in:
2026-06-02 21:58:58 +08:00
parent 323bbf4d13
commit 8052473728
7 changed files with 108 additions and 82 deletions
@@ -2,7 +2,14 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdir, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createUwfStore, getCasDir, getGlobalCasDir } from "../store.js";
import {
createUwfStore,
getCasDir,
getGlobalCasDir,
getRegistryPath,
loadWorkflowRegistry,
saveWorkflowRegistry,
} from "../store.js";
describe("Global CAS directory", () => {
let tmpDir: string;
@@ -85,6 +92,7 @@ describe("Global CAS directory", () => {
expect(uwf.storageRoot).toBe(storageRoot);
expect(uwf.store).toBeDefined();
expect(uwf.schemas).toBeDefined();
expect(uwf.varStore).toBeDefined();
// The global CAS directory should be created
const { stat } = await import("node:fs/promises");
@@ -137,29 +145,50 @@ describe("Global CAS directory", () => {
expect(files.length).toBeGreaterThan(0);
});
test("workflow metadata remains in storageRoot, not global CAS", async () => {
test("workflow registry is stored in global CAS variable store", async () => {
const globalCasDir = join(tmpDir, "global-cas");
process.env.UNCAGED_CAS_DIR = globalCasDir;
const storageRoot = join(tmpDir, "storage");
await mkdir(storageRoot, { recursive: true });
const _uwf = await createUwfStore(storageRoot);
const uwf = await createUwfStore(storageRoot);
const hash = await uwf.store.put(uwf.schemas.text, "registry-test");
saveWorkflowRegistry(uwf.varStore, "test-workflow", hash);
// Write workflow registry file
const { saveWorkflowRegistry } = await import("../store.js");
await saveWorkflowRegistry(storageRoot, { "test-workflow": "ABC123" });
const registry = loadWorkflowRegistry(uwf.varStore);
expect(registry["test-workflow"]).toBe(hash);
const { access } = await import("node:fs/promises");
await access(join(globalCasDir, "variables.db"));
// Verify registry is in storageRoot, not global CAS
const { readFile } = await import("node:fs/promises");
const registryPath = join(storageRoot, "workflows.yaml");
const content = await readFile(registryPath, "utf8");
expect(content).toContain("test-workflow");
expect(content).toContain("ABC123");
await expect(access(registryPath)).rejects.toThrow();
});
// Verify registry is NOT in global CAS directory
const globalRegistryPath = join(globalCasDir, "workflows.yaml");
await expect(readFile(globalRegistryPath, "utf8")).rejects.toThrow();
test("migrates workflows.yaml to variable store and renames file", async () => {
const globalCasDir = join(tmpDir, "global-cas");
process.env.UNCAGED_CAS_DIR = globalCasDir;
const storageRoot = join(tmpDir, "storage-migrate");
await mkdir(storageRoot, { recursive: true });
const uwfSeed = await createUwfStore(storageRoot);
const hash = await uwfSeed.store.put(uwfSeed.schemas.text, "migrated-workflow");
const registryPath = getRegistryPath(storageRoot);
const { writeFile, access, readFile } = await import("node:fs/promises");
await writeFile(registryPath, `migrated-workflow: ${hash}\n`, "utf8");
const uwf = await createUwfStore(storageRoot);
const registry = loadWorkflowRegistry(uwf.varStore);
expect(registry["migrated-workflow"]).toBe(hash);
await expect(access(registryPath)).rejects.toThrow();
const migratedPath = `${registryPath}.migrated`;
const migratedContent = await readFile(migratedPath, "utf8");
expect(migratedContent).toContain("migrated-workflow");
expect(migratedContent).toContain(hash);
});
test("thread metadata remains in storageRoot", async () => {
@@ -3,12 +3,11 @@ import { mkdir, mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { bootstrap, putSchema } from "@ocas/core";
import { createFsStore } from "@ocas/fs";
import type { createFsStore } from "@ocas/fs";
import type { CasRef, ThreadId } from "@united-workforce/protocol";
import { cmdThreadRead, THREAD_READ_DEFAULT_QUOTA } from "../commands/thread.js";
import { registerUwfSchemas } from "../schemas.js";
import type { UwfStore } from "../store.js";
import { saveThreadsIndex } from "../store.js";
import { createUwfStore, saveThreadsIndex } from "../store.js";
// ── schemas used in tests ────────────────────────────────────────────────────
@@ -53,11 +52,8 @@ const DETAIL_SCHEMA = {
async function makeUwfStore(storageRoot: string): Promise<UwfStore> {
const casDir = join(storageRoot, "cas");
await mkdir(casDir, { recursive: true });
// Set UNCAGED_CAS_DIR to use the test's CAS directory
process.env.UNCAGED_CAS_DIR = casDir;
const store = createFsStore(casDir);
const schemas = await registerUwfSchemas(store);
return { storageRoot, store, schemas };
return createUwfStore(storageRoot);
}
async function registerDetailSchemas(store: ReturnType<typeof createFsStore>) {
@@ -3,7 +3,7 @@ import { mkdir, mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { bootstrap, putSchema } from "@ocas/core";
import { createFsStore } from "@ocas/fs";
import type { createFsStore } from "@ocas/fs";
import type { CasRef, ThreadId } from "@united-workforce/protocol";
import { cmdStepList, cmdStepShow } from "../commands/step.js";
import {
@@ -11,9 +11,8 @@ import {
extractLastAssistantContent,
THREAD_READ_DEFAULT_QUOTA,
} from "../commands/thread.js";
import { registerUwfSchemas } from "../schemas.js";
import type { UwfStore } from "../store.js";
import { appendThreadHistory, saveThreadsIndex } from "../store.js";
import { appendThreadHistory, createUwfStore, saveThreadsIndex } from "../store.js";
// ── schemas used in tests ────────────────────────────────────────────────────
@@ -58,11 +57,8 @@ const DETAIL_SCHEMA = {
async function makeUwfStore(storageRoot: string): Promise<UwfStore> {
const casDir = join(storageRoot, "cas");
await mkdir(casDir, { recursive: true });
// Set UNCAGED_CAS_DIR to use the test's CAS directory
process.env.UNCAGED_CAS_DIR = casDir;
const store = createFsStore(casDir);
const schemas = await registerUwfSchemas(store);
return { storageRoot, store, schemas };
return createUwfStore(storageRoot);
}
async function registerDetailSchemas(store: ReturnType<typeof createFsStore>) {
@@ -594,12 +590,7 @@ describe("cmdStepShow (process.exit tests - must be last)", () => {
});
test("before with unknown hash rejects", async () => {
const _uwf = await makeUwfStore(tmpDir);
const casDir = join(tmpDir, "cas");
await mkdir(casDir, { recursive: true });
const store = createFsStore(casDir);
const schemas = await registerUwfSchemas(store);
const uwfStore: UwfStore = { storageRoot: tmpDir, store, schemas };
const uwfStore = await makeUwfStore(tmpDir);
const workflowHash = await uwfStore.store.put(uwfStore.schemas.workflow, {
name: "wf2",
@@ -2,24 +2,19 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createFsStore } from "@ocas/fs";
import type { CasRef, WorkflowPayload } from "@united-workforce/protocol";
import { stringify } from "yaml";
import { cmdThreadStart } from "../commands/thread.js";
import { registerUwfSchemas } from "../schemas.js";
import type { UwfStore } from "../store.js";
import { loadWorkflowRegistry, saveWorkflowRegistry } from "../store.js";
import { createUwfStore, saveWorkflowRegistry } from "../store.js";
// ── helpers ───────────────────────────────────────────────────────────────────
async function makeUwfStore(storageRoot: string): Promise<UwfStore> {
const casDir = join(storageRoot, "cas");
await mkdir(casDir, { recursive: true });
// Set UNCAGED_CAS_DIR to use the test's CAS directory
process.env.UNCAGED_CAS_DIR = casDir;
const store = createFsStore(casDir);
const schemas = await registerUwfSchemas(store);
return { storageRoot, store, schemas };
return createUwfStore(storageRoot);
}
function makeMinimalPayload(name: string, description: string): WorkflowPayload {
@@ -308,9 +303,7 @@ describe("Strategy 4: Global Registry Resolution", () => {
test("should resolve workflow from global registry when not found locally", async () => {
const uwf = await makeUwfStore(storageRoot);
const hash = await storeWorkflow(uwf, "deploy-pipeline");
const registry = await loadWorkflowRegistry(storageRoot);
registry["deploy-pipeline"] = hash;
await saveWorkflowRegistry(storageRoot, registry);
saveWorkflowRegistry(uwf.varStore, "deploy-pipeline", hash);
const isolatedRoot = join(tmpDir, "isolated");
await mkdir(isolatedRoot, { recursive: true });
@@ -360,9 +353,7 @@ describe("Resolution Priority", () => {
// Setup: Register globally
const globalHash = await storeWorkflow(uwf, "solve-issue");
const registry = await loadWorkflowRegistry(storageRoot);
registry["solve-issue"] = globalHash;
await saveWorkflowRegistry(storageRoot, registry);
saveWorkflowRegistry(uwf.varStore, "solve-issue", globalHash);
// Setup: Create local .workflow/
const workflowDir = join(projectRoot, ".workflow");
+2 -3
View File
@@ -362,7 +362,6 @@ async function materializeLocalWorkflow(uwf: UwfStore, filePath: string): Promis
async function resolveWorkflowCasRef(
uwf: UwfStore,
storageRoot: string,
workflowId: string,
projectRoot: string,
): Promise<CasRef> {
@@ -397,7 +396,7 @@ async function resolveWorkflowCasRef(
}
// Strategy 4: Global registry fallback
const registry = await loadWorkflowRegistry(storageRoot);
const registry = loadWorkflowRegistry(uwf.varStore);
const hash = resolveWorkflowHash(registry, trimmed);
if (!isCasRef(hash)) {
fail(`workflow not found: ${trimmed}`);
@@ -449,7 +448,7 @@ export async function cmdThreadStart(
}
const uwf = await createUwfStore(storageRoot);
const workflowHash = await resolveWorkflowCasRef(uwf, storageRoot, workflowId, projectRoot);
const workflowHash = await resolveWorkflowCasRef(uwf, workflowId, projectRoot);
const threadId = generateUlid(Date.now()) as ThreadId;
const plog = createProcessLogger({
@@ -156,9 +156,7 @@ export async function cmdWorkflowAdd(
fail("stored workflow failed schema validation");
}
const registry = await loadWorkflowRegistry(storageRoot);
registry[materialized.name] = hash;
await saveWorkflowRegistry(storageRoot, registry);
saveWorkflowRegistry(uwf.varStore, materialized.name, hash);
return { name: materialized.name, hash };
}
@@ -168,7 +166,7 @@ export async function cmdWorkflowShow(
id: string,
): Promise<WorkflowShowOutput> {
const uwf = await createUwfStore(storageRoot);
const registry = await loadWorkflowRegistry(storageRoot);
const registry = loadWorkflowRegistry(uwf.varStore);
const hash = resolveWorkflowHash(registry, id);
const node = uwf.store.get(hash);
@@ -193,8 +191,9 @@ export async function cmdWorkflowList(
storageRoot: string,
projectRoot: string,
): Promise<WorkflowListEntry[]> {
const uwf = await createUwfStore(storageRoot);
const localEntries = await discoverProjectWorkflows(projectRoot);
const registry = await loadWorkflowRegistry(storageRoot);
const registry = loadWorkflowRegistry(uwf.varStore);
const result: WorkflowListEntry[] = [];
const localNames = new Set<string>();
+48 -27
View File
@@ -1,10 +1,11 @@
import type { Dirent } from "node:fs";
import { existsSync, symlinkSync } from "node:fs";
import { access, appendFile, mkdir, readdir, readFile, writeFile } from "node:fs/promises";
import { access, appendFile, mkdir, readdir, readFile, rename, writeFile } from "node:fs/promises";
import { homedir } from "node:os";
import { join } from "node:path";
import type { BootstrapCapableStore, Hash } from "@ocas/core";
import { createVariableStore, type VariableStore } from "@ocas/core";
import { createFsStore } from "@ocas/fs";
import type {
CasRef,
@@ -24,6 +25,9 @@ import { registerUwfSchemas, type UwfSchemaHashes } from "./schemas.js";
export type WorkflowRegistry = Record<string, CasRef>;
/** Variable name prefix for workflow registry entries (`@uwf/registry/<name>`). */
export const REGISTRY_VAR_PREFIX = "@uwf/registry/";
/** A workflow entry discovered from the project-local .workflows/ directory. */
export type ProjectWorkflowEntry = {
/** Workflow name (from YAML `name` field, equals filename stem). */
@@ -200,6 +204,7 @@ export type UwfStore = {
storageRoot: string;
store: BootstrapCapableStore;
schemas: UwfSchemaHashes;
varStore: VariableStore;
};
export async function createUwfStore(storageRoot: string): Promise<UwfStore> {
@@ -207,41 +212,57 @@ export async function createUwfStore(storageRoot: string): Promise<UwfStore> {
await mkdir(casDir, { recursive: true });
const store = createFsStore(casDir);
const schemas = await registerUwfSchemas(store);
return { storageRoot, store, schemas };
const varStore = createVariableStore(join(casDir, "variables.db"), store);
await migrateWorkflowRegistryIfNeeded(storageRoot, varStore);
return { storageRoot, store, schemas, varStore };
}
export async function loadWorkflowRegistry(storageRoot: string): Promise<WorkflowRegistry> {
async function loadWorkflowRegistryFromYaml(storageRoot: string): Promise<WorkflowRegistry> {
const path = getRegistryPath(storageRoot);
try {
const text = await readFile(path, "utf8");
const raw = parse(text) as unknown;
if (raw === null || typeof raw !== "object" || Array.isArray(raw)) {
return {};
}
const registry: WorkflowRegistry = {};
for (const [name, hash] of Object.entries(raw as Record<string, unknown>)) {
if (typeof hash === "string") {
registry[name] = hash;
}
}
return registry;
} catch (e) {
const err = e as NodeJS.ErrnoException;
if (err.code === "ENOENT") {
return {};
}
throw e;
const text = await readFile(path, "utf8");
const raw = parse(text) as unknown;
if (raw === null || typeof raw !== "object" || Array.isArray(raw)) {
return {};
}
const registry: WorkflowRegistry = {};
for (const [name, hash] of Object.entries(raw as Record<string, unknown>)) {
if (typeof hash === "string") {
registry[name] = hash;
}
}
return registry;
}
export async function saveWorkflowRegistry(
/** One-time migration: `~/.uwf/workflows.yaml` → `@uwf/registry/*` variables. */
export async function migrateWorkflowRegistryIfNeeded(
storageRoot: string,
registry: WorkflowRegistry,
varStore: VariableStore,
): Promise<void> {
const path = getRegistryPath(storageRoot);
await mkdir(storageRoot, { recursive: true });
const text = stringify(registry, { indent: 2 });
await writeFile(path, text, "utf8");
if (!existsSync(path)) {
return;
}
const registry = await loadWorkflowRegistryFromYaml(storageRoot);
for (const [name, hash] of Object.entries(registry)) {
saveWorkflowRegistry(varStore, name, hash);
}
await rename(path, `${path}.migrated`);
}
export function loadWorkflowRegistry(varStore: VariableStore): WorkflowRegistry {
const vars = varStore.list({ namePrefix: REGISTRY_VAR_PREFIX });
const registry: WorkflowRegistry = {};
for (const v of vars) {
const name = v.name.slice(REGISTRY_VAR_PREFIX.length);
registry[name] = v.value;
}
return registry;
}
export function saveWorkflowRegistry(varStore: VariableStore, name: string, hash: CasRef): void {
varStore.set(`${REGISTRY_VAR_PREFIX}${name}`, hash);
}
export function resolveWorkflowHash(registry: WorkflowRegistry, id: string): CasRef {