Compare commits

...

6 Commits

Author SHA1 Message Date
xiaoju 80e8efb05e test: E2E integration tests with uwf-mock agent (#33)
CI / check (pull_request) Failing after 2m30s
Three scenarios testing the full CLI pipeline:
1. Linear workflow (planner → worker → $END): CAS chain integrity
2. Loop workflow (developer ↔ reviewer): moderator routing through cycles
3. Role mismatch detection: agent catches routing bugs

Uses workflow add → thread start → thread exec with uwf-mock,
verifying CAS state, thread lifecycle, and error handling.

Updated assertions to use getThread().status === 'completed'
(aligned with PR #45 unified thread storage).

Refs #33
2026-06-04 08:06:22 +00:00
xiaoju 75fb752a82 feat: add agent-mock package for deterministic E2E testing (#33)
New package @united-workforce/agent-mock (uwf-mock CLI):
- Reads pre-scripted outputs from a YAML mock data file (--mock-data)
- Counts existing CAS chain steps to determine step index
- Validates expected role matches actual moderator routing
- Stores minimal detail node in CAS for valid step refs
- Zero LLM, instant execution, 100% deterministic

Usage in config.yaml:
  agents:
    mock:
      command: uwf-mock
      args: ["--mock-data", "./fixtures/scenario.yaml"]

Refs #33
2026-06-04 08:00:07 +00:00
xiaomo bbea89c067 Merge pull request 'refactor: unified thread storage + resume completed threads' (#45) from refactor/39-unified-thread-storage into main
CI / check (push) Failing after 1m26s
refactor: unified thread storage + resume completed threads (#45)
2026-06-04 07:25:56 +00:00
xingyue bda3e3a861 feat(cli): resume completed threads (衔尾蛇: end → start)
CI / check (pull_request) Failing after 3m45s
uwf thread resume now supports completed threads:
- Evaluates workflow graph from $START to find first role
- Clears completed state (status → idle, completedAt → null)
- Builds resume prompt with supplement context
- Full CAS chain preserved for rich context

Suspended resume behavior unchanged.
Cancelled/idle threads still rejected.

425 tests pass.

Part of #39, closes #43
2026-06-04 15:13:47 +08:00
xingyue ca7b68ca5f refactor(cli): unify thread storage, remove history prefix
- store.ts: all threads in @uwf/thread/* with status tag
- Remove HISTORY_VAR_PREFIX, ThreadHistoryLine, deleteThread
- Add loadActiveThreads, loadHistoryThreads, completeThread
- Add migrateHistoryVarsToThreadVars migration
- thread.ts: replace deleteThread+addHistoryEntry with completeThread
- shared.ts: remove findHistoryEntry fallback
- Update all tests for unified storage model

422 tests pass.

Part of #39, closes #41, closes #42
2026-06-04 15:01:20 +08:00
xingyue 23e2ae9eb4 refactor(protocol): add status + completedAt to ThreadIndexEntry
- ThreadIndexEntry gains status and completedAt fields
- createThreadIndexEntry defaults to idle/null
- normalizeThreadIndexEntry backward-compat defaults
- updateThreadHead resets to idle (衔尾蛇 resume prep)
- markThreadSuspended sets status=suspended
- New markThreadCompleted(entry, status, now) function
- serializeThreadIndexEntry includes new fields

Part of #39, closes #40
2026-06-04 14:42:14 +08:00
36 changed files with 1758 additions and 375 deletions
@@ -0,0 +1,18 @@
steps:
- role: planner
output: |
---
$status: ready
plan: test-plan-hash
repoPath: /tmp/test-repo
---
Plan: implement the feature.
- role: developer
output: |
---
$status: done
branch: fix/1-test
worktree: /tmp/worktree
---
Implemented the feature.
@@ -0,0 +1,48 @@
import { readFile } from "node:fs/promises";
import { join } from "node:path";
import { describe, expect, test } from "vitest";
import { parseScenario, selectMockStep } from "../src/mock-agent.js";
// Path of the YAML scenario fixture read by the parseScenario tests below.
// NOTE(review): `__dirname` is a CommonJS global; if this file is loaded as an ES module it
// presumably relies on the test runner supplying it — confirm.
const FIXTURE = join(__dirname, "fixtures", "simple-scenario.yaml");
// parseScenario: happy path against the on-disk fixture plus the two validation failures.
describe("parseScenario", () => {
  test("parses the 2-step fixture in order", async () => {
    const yamlText = await readFile(FIXTURE, "utf8");
    const { steps } = parseScenario(yamlText);

    expect(steps).toHaveLength(2);

    // Length was asserted above, so both entries exist.
    const [first, second] = steps;
    expect(first.role).toBe("planner");
    expect(second.role).toBe("developer");
    expect(first.output).toContain("$status: ready");
    expect(second.output).toContain("branch: fix/1-test");
  });

  test("rejects documents without a steps array", () => {
    const parseWithoutSteps = () => parseScenario("foo: bar");
    expect(parseWithoutSteps).toThrow(/steps/);
  });

  test("rejects steps missing role or output", () => {
    const parseIncompleteStep = () => parseScenario("steps:\n - role: planner");
    expect(parseIncompleteStep).toThrow(/role.*output/);
  });
});
// selectMockStep: index lookup, role validation and out-of-range handling on an in-memory scenario.
describe("selectMockStep", () => {
  const plannerStep = { role: "planner", output: "plan-output" };
  const developerStep = { role: "developer", output: "dev-output" };
  const scenario = { steps: [plannerStep, developerStep] };

  test("step index counts existing steps to pick the current step", () => {
    const atZero = selectMockStep(scenario, 0, "planner");
    expect(atZero.output).toBe("plan-output");

    const atOne = selectMockStep(scenario, 1, "developer");
    expect(atOne.output).toBe("dev-output");
  });

  test("throws when the moderator routes to an unexpected role", () => {
    const routeToWrongRole = () => selectMockStep(scenario, 0, "developer");
    expect(routeToWrongRole).toThrow(/expected role "planner"/);
  });

  test("throws when the step index runs past the scripted steps", () => {
    const pickPastEnd = () => selectMockStep(scenario, 2, "planner");
    expect(pickPastEnd).toThrow(/no step at index 2/);
  });
});
+47
View File
@@ -0,0 +1,47 @@
{
"name": "@united-workforce/agent-mock",
"version": "0.5.0",
"files": [
"src",
"dist",
"package.json"
],
"type": "module",
"bin": {
"uwf-mock": "./src/cli.ts"
},
"exports": {
".": {
"types": "./dist/index.d.ts",
"import": "./dist/index.js"
}
},
"scripts": {
"prepublishOnly": "echo 'Use pnpm run release from repo root' && exit 1",
"test": "vitest run __tests__/",
"test:ci": "vitest run __tests__/"
},
"dependencies": {
"@ocas/core": "^0.3.0",
"@united-workforce/protocol": "workspace:^",
"@united-workforce/util": "workspace:^",
"@united-workforce/util-agent": "workspace:^",
"yaml": "^2.9.0"
},
"devDependencies": {
"typescript": "^5.8.3"
},
"publishConfig": {
"access": "public"
},
"repository": {
"type": "git",
"url": "https://git.shazhou.work/shazhou/united-workforce.git",
"directory": "packages/agent-mock"
},
"homepage": "https://git.shazhou.work/shazhou/united-workforce#readme",
"bugs": {
"url": "https://git.shazhou.work/shazhou/united-workforce/issues"
},
"license": "MIT"
}
+18
View File
@@ -0,0 +1,18 @@
#!/usr/bin/env node
import { createMockAgent } from "./mock-agent.js";
/** One-line usage string appended to the missing-argument error. */
const USAGE = "usage: uwf-mock --mock-data <path> --thread <id> --role <role> --prompt <text>";

/**
 * Extract the value that follows `--mock-data` in an argv-style array.
 *
 * When the flag is absent, is the last element, or is followed by an empty
 * string, an error plus the usage line is written to stderr and the process
 * exits with code 1.
 *
 * @param argv - Argument vector to scan (typically `process.argv`).
 * @returns The element immediately after `--mock-data`.
 */
function getMockDataPath(argv: string[]): string {
  const flagPos = argv.indexOf("--mock-data");
  const valuePos = flagPos + 1;
  const hasValue = flagPos !== -1 && valuePos < argv.length && argv[valuePos] !== "";
  if (!hasValue) {
    process.stderr.write(`--mock-data is required. ${USAGE}\n`);
    process.exit(1);
  }
  return argv[valuePos];
}
// Resolve the scenario file from the command line (exits with usage when --mock-data has no value).
const mockDataPath = getMockDataPath(process.argv);
// createMockAgent returns the agent entry point as a `() => Promise<void>`.
const main = createMockAgent(mockDataPath);
// `void` marks the returned promise as intentionally not awaited at module top level.
void main();
+2
View File
@@ -0,0 +1,2 @@
export { createMockAgent, parseScenario, selectMockStep } from "./mock-agent.js";
export type { MockScenario, MockStep } from "./types.js";
+128
View File
@@ -0,0 +1,128 @@
import { readFile } from "node:fs/promises";
import { bootstrap, type JSONSchema, putSchema, type Store } from "@ocas/core";
import { createLogger } from "@united-workforce/util";
import { type AgentContext, type AgentRunResult, createAgent } from "@united-workforce/util-agent";
import { parse } from "yaml";
import type { MockScenario, MockStep } from "./types.js";
const log = createLogger({ sink: { kind: "stderr" } });
/**
 * JSON Schema registered for the detail node that storeMockDetail writes for each step:
 * an object with exactly `sessionId`, `role` and `stepIndex`, no other properties allowed.
 */
const MOCK_DETAIL_SCHEMA: JSONSchema = {
title: "mock-detail",
type: "object",
required: ["sessionId", "role", "stepIndex"],
properties: {
sessionId: { type: "string" },
role: { type: "string" },
stepIndex: { type: "integer" },
},
additionalProperties: false,
};
/** Type guard: true for any non-null object that is not an array. */
function isRecord(value: unknown): value is Record<string, unknown> {
  if (value === null || Array.isArray(value)) {
    return false;
  }
  return typeof value === "object";
}
/** Validate one raw `steps` entry and narrow it to a {@link MockStep}. */
function toMockStep(entry: unknown, i: number): MockStep {
  if (!isRecord(entry) || typeof entry.role !== "string" || typeof entry.output !== "string") {
    throw new Error(`mock step ${i} must have string 'role' and string 'output'`);
  }
  const { role, output } = entry;
  return { role, output };
}

/**
 * Turn the text of a YAML mock data document into a {@link MockScenario}.
 * Performs no I/O; only parses and validates.
 *
 * @param text - YAML source of the mock data file.
 * @returns The scenario with one entry per scripted step, in document order.
 * @throws Error when the document is not a mapping with a `steps` array, or
 *   when any step lacks a string `role` or a string `output`.
 */
export function parseScenario(text: string): MockScenario {
  const doc: unknown = parse(text);
  const rawSteps = isRecord(doc) ? doc.steps : undefined;
  if (!Array.isArray(rawSteps)) {
    throw new Error("mock data must be a mapping with a 'steps' array");
  }
  return { steps: rawSteps.map(toMockStep) };
}
/** Read the mock data file at `path` as UTF-8 and parse it into a scenario. */
async function loadScenario(path: string): Promise<MockScenario> {
  return parseScenario(await readFile(path, "utf8"));
}
/**
 * Look up the scripted step at `stepIndex` and confirm it was written for `role`.
 *
 * @param scenario - Parsed mock scenario.
 * @param stepIndex - Position of the step to replay.
 * @param role - Role the moderator actually routed to.
 * @returns The scripted step at `stepIndex`.
 * @throws Error when no step exists at `stepIndex`, or when the scripted role
 *   differs from `role`, so routing mistakes fail loudly in E2E runs.
 */
export function selectMockStep(scenario: MockScenario, stepIndex: number, role: string): MockStep {
  const { steps } = scenario;
  const scripted = steps[stepIndex];

  if (scripted === undefined) {
    const where = `mock scenario has no step at index ${stepIndex} (total ${steps.length}); `;
    const routed = `moderator routed to role "${role}"`;
    throw new Error(where + routed);
  }

  if (scripted.role === role) {
    return scripted;
  }

  throw new Error(
    `mock step ${stepIndex} expected role "${scripted.role}" but moderator routed to "${role}"`,
  );
}
/**
 * Write a `{ sessionId, role, stepIndex }` detail node to the store and return
 * its hash, so the step has a CAS reference to point at.
 *
 * Calls `bootstrap(store)` first, then registers {@link MOCK_DETAIL_SCHEMA}
 * and stores the node under the resulting schema hash.
 */
async function storeMockDetail(
  store: Store,
  sessionId: string,
  role: string,
  stepIndex: number,
): Promise<string> {
  await bootstrap(store);
  const detail = { sessionId, role, stepIndex };
  const schemaRef = await putSchema(store, MOCK_DETAIL_SCHEMA);
  return store.cas.put(schemaRef, detail);
}
/**
 * Build the entry point of a deterministic, LLM-free agent that replays
 * pre-scripted outputs from the YAML file at `mockDataPath`.
 *
 * On each run the scenario file is re-read, and the step to replay is chosen
 * by the number of entries in `ctx.steps`.
 *
 * @param mockDataPath - Path of the YAML mock data file.
 * @returns The function produced by `createAgent` for the "mock" agent.
 */
export function createMockAgent(mockDataPath: string): () => Promise<void> {
  // Result of the most recent `run` in this process; `continue` hands it back unchanged.
  let previous: AgentRunResult | null = null;

  const run = async (ctx: AgentContext): Promise<AgentRunResult> => {
    const scenario = await loadScenario(mockDataPath);
    const stepIndex = ctx.steps.length;
    const scriptedCount = scenario.steps.length;
    log("MK7X2QPV", `mock step ${stepIndex} for role "${ctx.role}" (${scriptedCount} scripted)`);

    // Throws on an out-of-range index or a role mismatch.
    const { output } = selectMockStep(scenario, stepIndex, ctx.role);

    const sessionId = `mock-${stepIndex}`;
    const detailHash = await storeMockDetail(ctx.store, sessionId, ctx.role, stepIndex);

    const result: AgentRunResult = { output, detailHash, sessionId, assembledPrompt: "" };
    previous = result;
    return result;
  };

  const continueRun = async (
    sessionId: string,
    _message: string,
    _store: Store,
  ): Promise<AgentRunResult> => {
    if (previous !== null) {
      log("MK3N8RTW", `mock continue for session ${sessionId}, replaying scripted output`);
      return previous;
    }
    throw new Error("mock continue called before run");
  };

  return createAgent({ name: "mock", run, continue: continueRun });
}
+12
View File
@@ -0,0 +1,12 @@
/** One pre-scripted step in a mock scenario. */
export type MockStep = {
/** Role this step is expected to run as. Validated against the actual `--role` argument. */
role: string;
/** Frontmatter markdown output the mock agent emits for this step. */
output: string;
};
/** Deterministic, pre-scripted agent script loaded from a YAML mock data file. */
export type MockScenario = {
/**
 * Scripted steps in execution order. The mock agent replays the entry whose
 * index equals the number of steps already present in the thread.
 */
steps: MockStep[];
};
+9
View File
@@ -0,0 +1,9 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"rootDir": "src",
"outDir": "dist"
},
"include": ["src"],
"references": [{ "path": "../util-agent" }, { "path": "../util" }, { "path": "../protocol" }]
}
+16 -32
View File
@@ -7,10 +7,9 @@ import { describe, expect, test } from "vitest";
import { createMarker, deleteMarker } from "../background/index.js";
import { cmdThreadList, cmdThreadShow, cmdThreadStart } from "../commands/thread.js";
import {
addHistoryEntry,
completeThread,
createUwfStore,
deleteThread,
loadAllThreads,
loadActiveThreads,
setThread,
} from "../store.js";
@@ -175,7 +174,7 @@ async function insertStepNode(
outputPayload: Record<string, unknown>,
): Promise<void> {
const uwf = await createUwfStore(storageRoot);
const index = loadAllThreads(uwf.varStore);
const index = loadActiveThreads(uwf.varStore);
const headEntry = index[threadId];
if (headEntry === undefined) throw new Error(`thread ${threadId} not in index`);
const head = headEntry.head;
@@ -206,7 +205,13 @@ async function insertStepNode(
assembledPrompt: null,
})) as CasRef;
setThread(uwf.varStore, threadId, { head: stepHash, suspendedRole: null, suspendMessage: null });
setThread(uwf.varStore, threadId, {
head: stepHash,
status: "idle",
suspendedRole: null,
suspendMessage: null,
completedAt: null,
});
}
describe("currentRole field", () => {
@@ -286,15 +291,8 @@ describe("currentRole field", () => {
const tid = thread as ThreadId;
const uwfForIndex = await createUwfStore(storageRoot);
const head = loadAllThreads(uwfForIndex.varStore)[tid]!.head;
deleteThread(uwfForIndex.varStore, tid);
addHistoryEntry(uwfForIndex.varStore, {
thread: tid,
workflow,
head,
completedAt: Date.now(),
reason: "completed",
});
const head = loadActiveThreads(uwfForIndex.varStore)[tid]!.head;
completeThread(uwfForIndex.varStore, tid, "completed");
const result = await cmdThreadShow(storageRoot, tid);
expect(result.status).toBe("completed");
@@ -314,15 +312,8 @@ describe("currentRole field", () => {
const tid = thread as ThreadId;
const uwfForIndex = await createUwfStore(storageRoot);
const head = loadAllThreads(uwfForIndex.varStore)[tid]!.head;
deleteThread(uwfForIndex.varStore, tid);
addHistoryEntry(uwfForIndex.varStore, {
thread: tid,
workflow,
head,
completedAt: Date.now(),
reason: "cancelled",
});
const head = loadActiveThreads(uwfForIndex.varStore)[tid]!.head;
completeThread(uwfForIndex.varStore, tid, "cancelled");
const result = await cmdThreadShow(storageRoot, tid);
expect(result.status).toBe("cancelled");
@@ -375,15 +366,8 @@ describe("currentRole field", () => {
const comp = await cmdThreadStart(storageRoot, wf, "completed", tmpDir);
const compId = comp.thread as ThreadId;
const uwfForIndex = await createUwfStore(storageRoot);
const compHead = loadAllThreads(uwfForIndex.varStore)[compId]!.head;
deleteThread(uwfForIndex.varStore, compId);
addHistoryEntry(uwfForIndex.varStore, {
thread: compId,
workflow: comp.workflow,
head: compHead,
completedAt: Date.now(),
reason: "completed",
});
const compHead = loadActiveThreads(uwfForIndex.varStore)[compId]!.head;
completeThread(uwfForIndex.varStore, compId, "completed");
const list = await cmdThreadList(storageRoot, null, null, null, 0, 100);
@@ -0,0 +1,296 @@
import { execFileSync } from "node:child_process";
import { existsSync } from "node:fs";
import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import { openStore } from "@ocas/fs";
import type { CasRef, StartNodePayload, StepNodePayload } from "@united-workforce/protocol";
import { afterEach, beforeAll, beforeEach, describe, expect, test } from "vitest";
import { stringify } from "yaml";
import { cmdThreadStart } from "../commands/thread.js";
import { cmdWorkflowAdd } from "../commands/workflow.js";
import { createUwfStore, getThread } from "../store.js";
// ── paths ──────────────────────────────────────────────────────────────────
// Directory containing this test file, derived from the module URL.
const TEST_DIR = dirname(fileURLToPath(import.meta.url));
// Workflow and mock-data YAML fixtures used by the scenarios.
const FIXTURES_DIR = join(TEST_DIR, "fixtures");
// CLI entry point executed by runExec: `dist/cli.js`, two directories above the tests.
const CLI_PATH = join(TEST_DIR, "..", "..", "dist", "cli.js");
// Four directories above the tests; used as cwd for the on-demand build and to locate typescript.
const REPO_ROOT = join(TEST_DIR, "..", "..", "..", "..");
// agent-mock package directory, passed to `tsc --build` in beforeAll.
const AGENT_MOCK_DIR = join(REPO_ROOT, "packages", "agent-mock");
// Built mock-agent CLI; beforeAll builds the package only when this file is missing.
const AGENT_MOCK_CLI = join(AGENT_MOCK_DIR, "dist", "cli.js");
// ── shared fixture state ─────────────────────────────────────────────────────
// Per-test temporary root: created in beforeEach, removed in afterEach.
let tmpDir: string;
// `<tmpDir>/uwf`, exported as UWF_HOME for the duration of a test.
let uwfHome: string;
// `<tmpDir>/ocas`, exported as OCAS_HOME for the duration of a test.
let casDir: string;
// UWF_HOME / OCAS_HOME as they were before the test overrode them (undefined when unset).
let savedEnv: { uwf: string | undefined; ocas: string | undefined };
/**
 * The scenarios launch the mock agent from its compiled `dist/cli.js`. If that
 * file is absent (suite run without a prior build), compile the agent-mock
 * package here so the E2E run does not depend on an external build step.
 */
beforeAll(() => {
  if (!existsSync(AGENT_MOCK_CLI)) {
    const tscBin = join(REPO_ROOT, "node_modules", "typescript", "bin", "tsc");
    const buildArgs = [tscBin, "--build", "--force", AGENT_MOCK_DIR];
    execFileSync(process.execPath, buildArgs, { cwd: REPO_ROOT, stdio: "ignore" });
  }
}, 120000);
// Give every test its own temp root with separate UWF and CAS directories.
beforeEach(async () => {
  tmpDir = await mkdtemp(join(tmpdir(), "cli-e2e-mock-"));
  uwfHome = join(tmpDir, "uwf");
  casDir = join(tmpDir, "ocas");
  for (const dir of [uwfHome, casDir]) {
    await mkdir(dir, { recursive: true });
  }

  // Programmatic CLI APIs (cmdWorkflowAdd, cmdThreadStart) read the global CAS
  // directory from OCAS_HOME and the storage root from UWF_HOME.
  const { UWF_HOME, OCAS_HOME } = process.env;
  savedEnv = { uwf: UWF_HOME, ocas: OCAS_HOME };
  process.env.UWF_HOME = uwfHome;
  process.env.OCAS_HOME = casDir;
});
// Undo the environment overrides from beforeEach and remove the temp root.
afterEach(async () => {
  // Assigning `undefined` to a process.env key stores the string "undefined",
  // so a variable that was unset before the test must be deleted, not assigned.
  const saved: Array<[string, string | undefined]> = [
    ["UWF_HOME", savedEnv.uwf],
    ["OCAS_HOME", savedEnv.ocas],
  ];
  for (const [name, value] of saved) {
    if (value === undefined) {
      delete process.env[name];
    } else {
      process.env[name] = value;
    }
  }
  await rm(tmpDir, { recursive: true, force: true });
});
// ── helpers ──────────────────────────────────────────────────────────────────
/**
 * Create `config.yaml` in UWF_HOME with "mock" as the default agent. The agent
 * is launched through the current Node binary with the built mock CLI and a
 * `--mock-data` path pointing at the given fixture, so `thread exec` picks it
 * up from config without an `--agent` override.
 *
 * @param mockDataFixture - File name of the mock data fixture inside the fixtures directory.
 */
async function writeMockConfig(mockDataFixture: string): Promise<void> {
  const mockDataPath = join(FIXTURES_DIR, mockDataFixture);
  const mockAgent = {
    command: process.execPath,
    args: [AGENT_MOCK_CLI, "--mock-data", mockDataPath],
  };
  const yamlText = stringify({
    defaultAgent: "mock",
    defaultModel: "test",
    providers: {},
    models: {},
    agentOverrides: null,
    agents: { mock: mockAgent },
  });
  await writeFile(join(uwfHome, "config.yaml"), yamlText);
}
/**
 * Register a workflow fixture and return its hash. Because `cmdWorkflowAdd`
 * requires the file name to match the workflow name, the fixture is first
 * copied into UWF_HOME as `<workflow-name>.yaml`.
 *
 * @param workflowFixture - File name of the workflow fixture inside the fixtures directory.
 * @param workflowName - Workflow name, used as the copied file's base name.
 */
async function addWorkflow(workflowFixture: string, workflowName: string): Promise<CasRef> {
  const source = join(FIXTURES_DIR, workflowFixture);
  const target = join(uwfHome, `${workflowName}.yaml`);
  await writeFile(target, await readFile(source, "utf8"));
  const { hash } = await cmdWorkflowAdd(uwfHome, target);
  return hash;
}
/** Captured outcome of one `thread exec` subprocess. */
type ExecResult = { stdout: string; stderr: string; exitCode: number };

/**
 * Run `thread exec <threadId>` through the built CLI in a child process and
 * capture its result instead of throwing.
 *
 * On success `stderr` is reported as an empty string. On failure the streams
 * and exit status are taken from the thrown error, with the exit code falling
 * back to 1 when the error carries no status.
 */
function runExec(threadId: string): ExecResult {
  const cliArgs = [CLI_PATH, "thread", "exec", threadId];
  const childEnv = { ...process.env, UWF_HOME: uwfHome, OCAS_HOME: casDir };
  try {
    const stdout = execFileSync(process.execPath, cliArgs, {
      encoding: "utf8",
      stdio: ["ignore", "pipe", "pipe"],
      env: childEnv,
      cwd: tmpDir,
      timeout: 30000,
    });
    return { stdout, stderr: "", exitCode: 0 };
  } catch (e: unknown) {
    const failure = e as NodeJS.ErrnoException & {
      stdout?: string;
      stderr?: string;
      status?: number;
    };
    const exitCode = failure.status ?? 1;
    return { stdout: failure.stdout ?? "", stderr: failure.stderr ?? "", exitCode };
  }
}
/** Shape of the JSON that a successful `thread exec` prints on stdout, as read by these tests. */
type StepOutputJson = {
  thread: string;
  head: string;
  status: string;
  currentRole: string | null;
  done: boolean;
};

/**
 * Execute one step and return the parsed JSON from stdout.
 *
 * @throws Error carrying the exit code and both output streams when the CLI exits non-zero.
 */
function execStep(threadId: string): StepOutputJson {
  const result = runExec(threadId);
  if (result.exitCode === 0) {
    return JSON.parse(result.stdout.trim()) as StepOutputJson;
  }
  throw new Error(
    `thread exec failed (code ${result.exitCode})\nstdout: ${result.stdout}\nstderr: ${result.stderr}`,
  );
}
/** Fetch the CAS node at `hash`, assert it exists, and return its payload as a step node. */
function getStepNode(store: Awaited<ReturnType<typeof openStore>>, hash: string): StepNodePayload {
  const stepNode = store.cas.get(hash as CasRef);
  expect(stepNode).not.toBeNull();
  const { payload } = stepNode!;
  return payload as StepNodePayload;
}
/** Fetch the output node at `outputRef`, assert it exists, and return its `$status` field. */
function getStatus(store: Awaited<ReturnType<typeof openStore>>, outputRef: CasRef): unknown {
  const outputNode = store.cas.get(outputRef);
  expect(outputNode).not.toBeNull();
  const fields = outputNode!.payload as Record<string, unknown>;
  return fields.$status;
}
// ── scenarios ─────────────────────────────────────────────────────────────────
describe("E2E mock-agent: full uwf pipeline", () => {
// Scenario 1: two scripted steps (planner, worker) drive the linear workflow to $END.
test("1. linear workflow runs planner then worker and reaches $END", async () => {
await writeMockConfig("e2e-linear.mock.yaml");
const workflowHash = await addWorkflow("e2e-linear.workflow.yaml", "test-linear");
const start = await cmdThreadStart(uwfHome, workflowHash, "Build the thing", uwfHome, tmpDir);
const threadId = start.thread;
// Capture the start node hash (thread head before any step).
const startHash = getThread((await createUwfStore(uwfHome)).varStore, threadId)?.head;
expect(startHash).toBeDefined();
// Step 1 → planner. `currentRole` reports the role routed to next, not the one that just ran.
const step1 = execStep(threadId);
expect(step1.thread).toBe(threadId);
expect(step1.done).toBe(false);
expect(step1.status).toBe("idle");
expect(step1.currentRole).toBe("worker");
// Step 2 → worker → $END (thread is marked completed).
const step2 = execStep(threadId);
expect(step2.done).toBe(true);
expect(step2.status).toBe("completed");
expect(step2.currentRole).toBeNull();
// Verify CAS chain integrity: start → step1 → step2.
const store = await openStore(casDir);
const s1 = getStepNode(store, step1.head);
const s2 = getStepNode(store, step2.head);
expect(s1.role).toBe("planner");
// The first step has no `prev`; it is tied to the start node through `start`.
expect(s1.prev).toBeNull();
expect(s1.start).toBe(startHash);
expect(s2.role).toBe("worker");
expect(s2.prev).toBe(step1.head);
expect(s2.start).toBe(s1.start);
// Output frontmatter statuses persisted correctly.
expect(getStatus(store, s1.output)).toBe("ready");
expect(getStatus(store, s2.output)).toBe("done");
// The start node points at the registered workflow.
const startNode = store.cas.get(startHash as CasRef);
expect((startNode!.payload as StartNodePayload).workflow).toBe(workflowHash);
// Thread is completed: status changed to "completed", head updated.
const uwf = await createUwfStore(uwfHome);
const finalEntry = getThread(uwf.varStore, threadId);
expect(finalEntry).not.toBeNull();
expect(finalEntry!.status).toBe("completed");
expect(finalEntry!.head).toBe(step2.head);
});
// Scenario 2: the reviewer rejects once, so the developer/reviewer pair runs twice before $END.
test("2. branching workflow loops developer→reviewer→developer→reviewer→$END", async () => {
await writeMockConfig("e2e-loop.mock.yaml");
const workflowHash = await addWorkflow("e2e-loop.workflow.yaml", "test-loop");
const start = await cmdThreadStart(uwfHome, workflowHash, "Implement feature", uwfHome, tmpDir);
const threadId = start.thread;
// 4 steps: developer, reviewer (rejected → loop), developer, reviewer (approved → $END).
const s1 = execStep(threadId);
expect(s1.status).toBe("idle");
expect(s1.currentRole).toBe("reviewer");
const s2 = execStep(threadId);
expect(s2.status).toBe("idle");
// reviewer rejected → loops back to developer.
expect(s2.currentRole).toBe("developer");
const s3 = execStep(threadId);
expect(s3.status).toBe("idle");
expect(s3.currentRole).toBe("reviewer");
const s4 = execStep(threadId);
expect(s4.done).toBe(true);
expect(s4.status).toBe("completed");
// Verify the chain order and roles.
const store = await openStore(casDir);
const n1 = getStepNode(store, s1.head);
const n2 = getStepNode(store, s2.head);
const n3 = getStepNode(store, s3.head);
const n4 = getStepNode(store, s4.head);
expect([n1.role, n2.role, n3.role, n4.role]).toEqual([
"developer",
"reviewer",
"developer",
"reviewer",
]);
// Each step links back to the head produced by the step before it; the first has no `prev`.
expect(n1.prev).toBeNull();
expect(n2.prev).toBe(s1.head);
expect(n3.prev).toBe(s2.head);
expect(n4.prev).toBe(s3.head);
// All steps share the same start node.
expect(new Set([n1.start, n2.start, n3.start, n4.start]).size).toBe(1);
// Statuses drove the loop routing.
expect(getStatus(store, n1.output)).toBe("review_needed");
expect(getStatus(store, n2.output)).toBe("rejected");
expect(getStatus(store, n3.output)).toBe("review_needed");
expect(getStatus(store, n4.output)).toBe("approved");
// The thread entry is still readable after completion and carries the completed status.
const uwf = await createUwfStore(uwfHome);
const finalEntry = getThread(uwf.varStore, threadId);
expect(finalEntry).not.toBeNull();
expect(finalEntry!.status).toBe("completed");
});
// Scenario 3: the second scripted step names the wrong role, so the second exec must fail.
test("3. role mismatch in mock data makes the agent exit with an error", async () => {
// Reuses the linear workflow but with a mock whose step[1].role is wrong.
await writeMockConfig("e2e-mismatch.mock.yaml");
const workflowHash = await addWorkflow("e2e-linear.workflow.yaml", "test-linear");
const start = await cmdThreadStart(uwfHome, workflowHash, "Build the thing", uwfHome, tmpDir);
const threadId = start.thread;
// Step 1 (planner) matches and succeeds.
const step1 = execStep(threadId);
expect(step1.status).toBe("idle");
expect(step1.currentRole).toBe("worker");
// Step 2: moderator routes to "worker" but mock step[1].role is "planner".
// runExec is used directly because execStep would throw on the non-zero exit.
const result = runExec(threadId);
expect(result.exitCode).not.toBe(0);
// The message may arrive on either stream, so both are searched.
expect(`${result.stdout}\n${result.stderr}`).toMatch(/expected role "planner"/);
// The thread remains active (no step node was written for the failed step).
const uwf = await createUwfStore(uwfHome);
const entry = getThread(uwf.varStore, threadId);
expect(entry).not.toBeNull();
expect(entry!.status).not.toBe("completed");
expect(entry!.head).toBe(step1.head);
});
});
@@ -0,0 +1,13 @@
steps:
- role: planner
output: |
---
$status: ready
---
Planning complete.
- role: worker
output: |
---
$status: done
---
Work complete.
@@ -0,0 +1,32 @@
name: test-linear
description: Simple 2-step linear test (planner -> worker -> $END)
roles:
planner:
description: Plans work
goal: Plan the task
capabilities: []
procedure: Plan it
output: Output a plan and set $status to ready
frontmatter:
oneOf:
- properties:
$status: { const: ready }
required: [$status]
worker:
description: Does work
goal: Do the work
capabilities: []
procedure: Do it
output: Output the result and set $status to done
frontmatter:
oneOf:
- properties:
$status: { const: done }
required: [$status]
graph:
$START:
_: { role: planner, prompt: 'Plan the task' }
planner:
ready: { role: worker, prompt: 'Do the work' }
worker:
done: { role: '$END', prompt: 'Done' }
@@ -0,0 +1,25 @@
steps:
- role: developer
output: |
---
$status: review_needed
---
First implementation.
- role: reviewer
output: |
---
$status: rejected
---
Needs changes, sending back.
- role: developer
output: |
---
$status: review_needed
---
Second implementation addressing feedback.
- role: reviewer
output: |
---
$status: approved
---
Looks good, approved.
@@ -0,0 +1,36 @@
name: test-loop
description: Branching test where the reviewer can reject and loop back to the developer
roles:
developer:
description: Implements changes
goal: Implement the change
capabilities: []
procedure: Write code
output: Summarize the change and set $status to review_needed
frontmatter:
oneOf:
- properties:
$status: { const: review_needed }
required: [$status]
reviewer:
description: Reviews changes
goal: Review the change
capabilities: []
procedure: Review code
output: Approve or reject; set $status to approved or rejected
frontmatter:
oneOf:
- properties:
$status: { const: rejected }
required: [$status]
- properties:
$status: { const: approved }
required: [$status]
graph:
$START:
_: { role: developer, prompt: 'Implement the change' }
developer:
review_needed: { role: reviewer, prompt: 'Review the change' }
reviewer:
rejected: { role: developer, prompt: 'Fix the issues and resubmit' }
approved: { role: '$END', prompt: 'Approved, done' }
@@ -0,0 +1,16 @@
# Reuses the test-linear workflow. The moderator routes step 0 -> planner and
# step 1 -> worker, but step[1].role below is "planner", so the mock agent must
# detect the role mismatch on the second step and exit with an error.
steps:
- role: planner
output: |
---
$status: ready
---
Planning complete.
- role: planner
output: |
---
$status: done
---
This step claims to be planner, but the moderator routes to worker.
@@ -4,7 +4,7 @@ import { join } from "node:path";
import { type CasRef, createThreadIndexEntry, type ThreadId } from "@united-workforce/protocol";
import { afterEach, beforeEach, describe, expect, test } from "vitest";
import { resolveHeadHash } from "../commands/shared.js";
import { addHistoryEntry, createUwfStore, setThread } from "../store.js";
import { completeThread, createUwfStore, setThread } from "../store.js";
let tmpDir: string;
@@ -31,19 +31,13 @@ describe("resolveHeadHash", () => {
expect(result).toBe(headHash);
});
test("falls back to history variable when thread not in active index", async () => {
test("finds completed thread", async () => {
const threadId = "01JTEST0000000000000000002" as ThreadId;
const workflowHash = "workflow_hash_789" as CasRef;
const uwf = await createUwfStore(tmpDir);
const headHash = (await uwf.store.cas.put(uwf.schemas.text, "completed-head")) as CasRef;
addHistoryEntry(uwf.varStore, {
thread: threadId,
workflow: workflowHash,
head: headHash,
completedAt: Date.now(),
reason: null,
});
setThread(uwf.varStore, threadId, createThreadIndexEntry(headHash));
completeThread(uwf.varStore, threadId, "completed");
const result = await resolveHeadHash(tmpDir, threadId);
@@ -54,58 +48,36 @@ describe("resolveHeadHash", () => {
// calls fail() which does process.exit(1), terminating the test runner.
// The error behavior is tested in integration tests below via CLI invocation.
test("prioritizes active thread over history when thread exists in both", async () => {
test("prioritizes active thread", async () => {
const threadId = "01JTEST0000000000000000004" as ThreadId;
const workflowHash = "workflow_hash_xyz" as CasRef;
const uwf = await createUwfStore(tmpDir);
const activeHead = (await uwf.store.cas.put(uwf.schemas.text, "active-v2")) as CasRef;
const historicalHash = (await uwf.store.cas.put(uwf.schemas.text, "historical-v1")) as CasRef;
setThread(uwf.varStore, threadId, createThreadIndexEntry(activeHead));
addHistoryEntry(uwf.varStore, {
thread: threadId,
workflow: workflowHash,
head: historicalHash,
completedAt: Date.now(),
reason: null,
});
const result = await resolveHeadHash(tmpDir, threadId);
// Should return the active head, not the historical one
// Should return the active head
expect(result).toBe(activeHead);
});
test("finds thread from multiple history entries", async () => {
test("finds thread from multiple completed threads", async () => {
const threadId1 = "01JTEST0000000000000000005" as ThreadId;
const threadId2 = "01JTEST0000000000000000006" as ThreadId;
const threadId3 = "01JTEST0000000000000000007" as ThreadId;
const workflowHash = "workflow_hash_abc" as CasRef;
const uwf = await createUwfStore(tmpDir);
const hash1 = (await uwf.store.cas.put(uwf.schemas.text, "hash-thread1")) as CasRef;
const hash2 = (await uwf.store.cas.put(uwf.schemas.text, "hash-thread2")) as CasRef;
const hash3 = (await uwf.store.cas.put(uwf.schemas.text, "hash-thread3")) as CasRef;
addHistoryEntry(uwf.varStore, {
thread: threadId1,
workflow: workflowHash,
head: hash1,
completedAt: Date.now() - 2000,
reason: null,
});
addHistoryEntry(uwf.varStore, {
thread: threadId2,
workflow: workflowHash,
head: hash2,
completedAt: Date.now() - 1000,
reason: null,
});
addHistoryEntry(uwf.varStore, {
thread: threadId3,
workflow: workflowHash,
head: hash3,
completedAt: Date.now(),
reason: null,
});
setThread(uwf.varStore, threadId1, createThreadIndexEntry(hash1));
completeThread(uwf.varStore, threadId1, "completed");
setThread(uwf.varStore, threadId2, createThreadIndexEntry(hash2));
completeThread(uwf.varStore, threadId2, "completed");
setThread(uwf.varStore, threadId3, createThreadIndexEntry(hash3));
completeThread(uwf.varStore, threadId3, "completed");
const result = await resolveHeadHash(tmpDir, threadId2);
@@ -226,19 +226,15 @@ describe("Global CAS directory", () => {
const uwf = await createUwfStore(storageRoot);
const threadId = "thread-123" as ThreadId;
const headHash = await uwf.store.cas.put(uwf.schemas.text, "history-head");
const { addHistoryEntry, findHistoryEntry } = await import("../store.js");
addHistoryEntry(uwf.varStore, {
thread: threadId,
workflow: "workflow-456",
head: headHash,
completedAt: Date.now(),
reason: "completed",
});
const { completeThread, setThread, getThread } = await import("../store.js");
const { createThreadIndexEntry } = await import("@united-workforce/protocol");
const entry = findHistoryEntry(uwf.varStore, threadId);
expect(entry?.thread).toBe(threadId);
expect(entry?.workflow).toBe("workflow-456");
setThread(uwf.varStore, threadId, createThreadIndexEntry(headHash));
completeThread(uwf.varStore, threadId, "completed");
const entry = getThread(uwf.varStore, threadId);
expect(entry?.head).toBe(headHash);
expect(entry?.status).toBe("completed");
const { access } = await import("node:fs/promises");
await access(join(globalCasDir, "vars"));
@@ -274,15 +270,12 @@ describe("Global CAS directory", () => {
);
const uwf = await createUwfStore(storageRoot);
const { findHistoryEntry } = await import("../store.js");
const entry = findHistoryEntry(uwf.varStore, threadId);
expect(entry).toEqual({
thread: threadId,
workflow: workflowHash,
head: headHash,
completedAt,
reason: "cancelled",
});
const { getThread } = await import("../store.js");
const entry = getThread(uwf.varStore, threadId);
expect(entry).not.toBeNull();
expect(entry?.head).toBe(headHash);
expect(entry?.status).toBe("cancelled");
expect(entry?.completedAt).toBe(completedAt);
await expect(access(historyPath)).rejects.toThrow();
const migratedContent = await readFile(`${historyPath}.migrated`, "utf8");
@@ -0,0 +1,235 @@
import { mkdir, mkdtemp } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import type { CasRef, ThreadId } from "@united-workforce/protocol";
import { describe, expect, test } from "vitest";
import {
completeThread,
createUwfStore,
getThread,
loadActiveThreads,
loadHistoryThreads,
setThread,
} from "../store.js";
/**
 * Prepares a `cas` directory beneath `storageRoot`, exports its path through
 * the `OCAS_DIR` environment variable, and opens the store on `storageRoot`.
 */
async function makeUwfStore(storageRoot: string) {
  const casPath = join(storageRoot, "cas");
  await mkdir(casPath, { recursive: true });
  // The env var is set before the store is opened, same as callers expect.
  process.env.OCAS_DIR = casPath;
  const opened = createUwfStore(storageRoot);
  return opened;
}
/**
 * Stores `label` as a text node in the store's CAS and returns the resulting
 * reference, for use as a thread head in tests.
 */
async function seedThreadHead(
  uwf: Awaited<ReturnType<typeof createUwfStore>>,
  label: string,
): Promise<CasRef> {
  const stored = await uwf.store.cas.put(uwf.schemas.text, label);
  return stored as CasRef;
}
describe("unified thread storage", () => {
  // Opens a store rooted in a brand-new temporary directory whose name starts with `prefix`.
  const openTempStore = async (prefix: string) => {
    const root = await mkdtemp(join(tmpdir(), prefix));
    return makeUwfStore(root);
  };

  test("loadActiveThreads excludes completed threads", async () => {
    const uwf = await openTempStore("uwf-active-test-");
    const idleId = "01JTEST000000000000ACTIVE1" as ThreadId;
    const completedId = "01JTEST000000000000ACTIVE2" as ThreadId;
    const idleHead = await seedThreadHead(uwf, "active-head");
    const completedHead = await seedThreadHead(uwf, "completed-head");

    // One idle thread and one already-completed thread.
    setThread(uwf.varStore, idleId, {
      head: idleHead,
      status: "idle",
      suspendedRole: null,
      suspendMessage: null,
      completedAt: null,
    });
    setThread(uwf.varStore, completedId, {
      head: completedHead,
      status: "completed",
      suspendedRole: null,
      suspendMessage: null,
      completedAt: Date.now(),
    });

    const active = loadActiveThreads(uwf.varStore);
    const activeIds = Object.keys(active);
    expect(activeIds).toHaveLength(1);
    expect(active[idleId]).toBeDefined();
    expect(active[completedId]).toBeUndefined();
  });

  test("loadActiveThreads excludes cancelled threads", async () => {
    const uwf = await openTempStore("uwf-active-test-");
    const idleId = "01JTEST000000000000ACTIVE3" as ThreadId;
    const cancelledId = "01JTEST000000000000ACTIVE4" as ThreadId;
    const idleHead = await seedThreadHead(uwf, "active-head");
    const cancelledHead = await seedThreadHead(uwf, "cancelled-head");

    // One idle thread and one already-cancelled thread.
    setThread(uwf.varStore, idleId, {
      head: idleHead,
      status: "idle",
      suspendedRole: null,
      suspendMessage: null,
      completedAt: null,
    });
    setThread(uwf.varStore, cancelledId, {
      head: cancelledHead,
      status: "cancelled",
      suspendedRole: null,
      suspendMessage: null,
      completedAt: Date.now(),
    });

    const active = loadActiveThreads(uwf.varStore);
    const activeIds = Object.keys(active);
    expect(activeIds).toHaveLength(1);
    expect(active[idleId]).toBeDefined();
    expect(active[cancelledId]).toBeUndefined();
  });

  test("loadHistoryThreads only returns completed and cancelled", async () => {
    const uwf = await openTempStore("uwf-history-test-");
    const idleId = "01JTEST000000000000HISTOR1" as ThreadId;
    const completedId = "01JTEST000000000000HISTOR2" as ThreadId;
    const cancelledId = "01JTEST000000000000HISTOR3" as ThreadId;
    const idleHead = await seedThreadHead(uwf, "active-head");
    const completedHead = await seedThreadHead(uwf, "completed-head");
    const cancelledHead = await seedThreadHead(uwf, "cancelled-head");

    // One thread per relevant status: idle, completed, cancelled.
    setThread(uwf.varStore, idleId, {
      head: idleHead,
      status: "idle",
      suspendedRole: null,
      suspendMessage: null,
      completedAt: null,
    });
    setThread(uwf.varStore, completedId, {
      head: completedHead,
      status: "completed",
      suspendedRole: null,
      suspendMessage: null,
      completedAt: Date.now(),
    });
    setThread(uwf.varStore, cancelledId, {
      head: cancelledHead,
      status: "cancelled",
      suspendedRole: null,
      suspendMessage: null,
      completedAt: Date.now(),
    });

    const history = loadHistoryThreads(uwf.varStore);
    const historyIds = Object.keys(history);
    expect(historyIds).toHaveLength(2);
    expect(history[idleId]).toBeUndefined();
    expect(history[completedId]).toBeDefined();
    expect(history[cancelledId]).toBeDefined();
  });

  test("completeThread marks thread as completed", async () => {
    const uwf = await openTempStore("uwf-complete-test-");
    const id = "01JTEST000000000000COMPLE1" as ThreadId;
    const headRef = await seedThreadHead(uwf, "active-head");

    setThread(uwf.varStore, id, {
      head: headRef,
      status: "idle",
      suspendedRole: null,
      suspendMessage: null,
      completedAt: null,
    });
    completeThread(uwf.varStore, id, "completed");

    const stored = getThread(uwf.varStore, id);
    expect(stored).not.toBeNull();
    expect(stored?.status).toBe("completed");
    expect(stored?.completedAt).toBeDefined();
    expect(stored?.completedAt).toBeGreaterThan(0);
  });

  test("completeThread marks thread as cancelled", async () => {
    const uwf = await openTempStore("uwf-complete-test-");
    const id = "01JTEST000000000000COMPLE2" as ThreadId;
    const headRef = await seedThreadHead(uwf, "active-head");

    setThread(uwf.varStore, id, {
      head: headRef,
      status: "idle",
      suspendedRole: null,
      suspendMessage: null,
      completedAt: null,
    });
    completeThread(uwf.varStore, id, "cancelled");

    const stored = getThread(uwf.varStore, id);
    expect(stored).not.toBeNull();
    expect(stored?.status).toBe("cancelled");
    expect(stored?.completedAt).toBeDefined();
    expect(stored?.completedAt).toBeGreaterThan(0);
  });

  test("completeThread clears suspend metadata", async () => {
    const uwf = await openTempStore("uwf-complete-test-");
    const id = "01JTEST000000000000COMPLE3" as ThreadId;
    const headRef = await seedThreadHead(uwf, "suspended-head");

    // Start from a suspended thread that carries role and message metadata.
    setThread(uwf.varStore, id, {
      head: headRef,
      status: "suspended",
      suspendedRole: "test-role",
      suspendMessage: "test message",
      completedAt: null,
    });
    completeThread(uwf.varStore, id, "completed");

    const stored = getThread(uwf.varStore, id);
    expect(stored).not.toBeNull();
    expect(stored?.status).toBe("completed");
    expect(stored?.suspendedRole).toBeNull();
    expect(stored?.suspendMessage).toBeNull();
  });

  test("completeThread handles non-existent thread gracefully", async () => {
    const uwf = await openTempStore("uwf-complete-test-");
    const missingId = "01JTEST000000000000NOEXIST" as ThreadId;

    // Completing an id that was never stored must not throw.
    completeThread(uwf.varStore, missingId, "completed");

    const stored = getThread(uwf.varStore, missingId);
    expect(stored).toBeNull();
  });

  test("status and completedAt tags are persisted and loaded", async () => {
    const uwf = await openTempStore("uwf-tags-test-");
    const id = "01JTEST000000000000TAGTEST" as ThreadId;
    const headRef = await seedThreadHead(uwf, "test-head");
    const now = Date.now();

    setThread(uwf.varStore, id, {
      head: headRef,
      status: "completed",
      suspendedRole: null,
      suspendMessage: null,
      completedAt: now,
    });

    // Read back and compare against exactly what was written.
    const stored = getThread(uwf.varStore, id);
    expect(stored).not.toBeNull();
    expect(stored?.status).toBe("completed");
    expect(stored?.completedAt).toBe(now);
  });
});
@@ -3,7 +3,13 @@ import { tmpdir } from "node:os";
import { join } from "node:path";
import type { CasRef, ThreadId } from "@united-workforce/protocol";
import { describe, expect, test } from "vitest";
import { addHistoryEntry, createUwfStore, loadAllHistory } from "../store.js";
import {
completeThread,
createUwfStore,
getThread,
loadHistoryThreads,
setThread,
} from "../store.js";
async function makeUwfStore(storageRoot: string) {
const casDir = join(storageRoot, "cas");
@@ -20,88 +26,113 @@ async function seedHistoryHead(
}
describe("thread cancel status", () => {
test("cancelled history entry has reason 'cancelled'", async () => {
test("cancelled thread has status 'cancelled'", async () => {
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-"));
const threadId = "01JTEST000000000000CANCEL1" as ThreadId;
const uwf = await makeUwfStore(tmpDir);
const head = await seedHistoryHead(uwf, "cancelled-head");
addHistoryEntry(uwf.varStore, {
thread: threadId,
workflow: "test-workflow",
setThread(uwf.varStore, threadId, {
head,
completedAt: Date.now(),
reason: "cancelled",
status: "idle",
suspendedRole: null,
suspendMessage: null,
completedAt: null,
});
const history = loadAllHistory(uwf.varStore);
expect(history).toHaveLength(1);
expect(history[0]?.reason).toBe("cancelled");
completeThread(uwf.varStore, threadId, "cancelled");
const entry = getThread(uwf.varStore, threadId);
expect(entry).not.toBeNull();
expect(entry?.status).toBe("cancelled");
});
test("completed history entry has reason 'completed'", async () => {
test("completed thread has status 'completed'", async () => {
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-"));
const threadId = "01JTEST000000000000CANCEL2" as ThreadId;
const uwf = await makeUwfStore(tmpDir);
const head = await seedHistoryHead(uwf, "completed-head");
addHistoryEntry(uwf.varStore, {
thread: threadId,
workflow: "test-workflow",
setThread(uwf.varStore, threadId, {
head,
completedAt: Date.now(),
reason: "completed",
status: "idle",
suspendedRole: null,
suspendMessage: null,
completedAt: null,
});
const history = loadAllHistory(uwf.varStore);
expect(history).toHaveLength(1);
expect(history[0]?.reason).toBe("completed");
completeThread(uwf.varStore, threadId, "completed");
const entry = getThread(uwf.varStore, threadId);
expect(entry).not.toBeNull();
expect(entry?.status).toBe("completed");
});
test("history entry with null reason is stored as completed", async () => {
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-"));
const threadId = "01JTEST000000000000CANCEL3" as ThreadId;
const uwf = await makeUwfStore(tmpDir);
const head = await seedHistoryHead(uwf, "legacy-head");
addHistoryEntry(uwf.varStore, {
thread: threadId,
workflow: "test-workflow",
head,
completedAt: Date.now(),
reason: null,
});
const history = loadAllHistory(uwf.varStore);
expect(history).toHaveLength(1);
expect(history[0]?.reason).toBe("completed");
});
test("mixed completed and cancelled entries preserve distinct reasons", async () => {
test("loadHistoryThreads returns completed and cancelled", async () => {
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-"));
const uwf = await makeUwfStore(tmpDir);
const head1 = await seedHistoryHead(uwf, "head1");
const head2 = await seedHistoryHead(uwf, "head2");
addHistoryEntry(uwf.varStore, {
thread: "01JTEST000000000000CANCEL4" as ThreadId,
workflow: "test-workflow",
const threadId1 = "01JTEST000000000000CANCEL4" as ThreadId;
setThread(uwf.varStore, threadId1, {
head: head1,
completedAt: Date.now(),
reason: "completed",
status: "idle",
suspendedRole: null,
suspendMessage: null,
completedAt: null,
});
completeThread(uwf.varStore, threadId1, "completed");
addHistoryEntry(uwf.varStore, {
thread: "01JTEST000000000000CANCEL5" as ThreadId,
workflow: "test-workflow",
const threadId2 = "01JTEST000000000000CANCEL5" as ThreadId;
setThread(uwf.varStore, threadId2, {
head: head2,
completedAt: Date.now(),
reason: "cancelled",
status: "idle",
suspendedRole: null,
suspendMessage: null,
completedAt: null,
});
completeThread(uwf.varStore, threadId2, "cancelled");
const history = loadHistoryThreads(uwf.varStore);
expect(Object.keys(history)).toHaveLength(2);
const statuses = Object.values(history)
.map((entry) => entry.status)
.sort();
expect(statuses).toEqual(["cancelled", "completed"]);
});
const history = loadAllHistory(uwf.varStore);
expect(history).toHaveLength(2);
const reasons = history.map((entry) => entry.reason).sort();
expect(reasons).toEqual(["cancelled", "completed"]);
test("mixed completed and cancelled entries preserve distinct statuses", async () => {
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-"));
const uwf = await makeUwfStore(tmpDir);
const head1 = await seedHistoryHead(uwf, "head1");
const head2 = await seedHistoryHead(uwf, "head2");
const threadId1 = "01JTEST000000000000CANCEL6" as ThreadId;
setThread(uwf.varStore, threadId1, {
head: head1,
status: "idle",
suspendedRole: null,
suspendMessage: null,
completedAt: null,
});
completeThread(uwf.varStore, threadId1, "completed");
const threadId2 = "01JTEST000000000000CANCEL7" as ThreadId;
setThread(uwf.varStore, threadId2, {
head: head2,
status: "idle",
suspendedRole: null,
suspendMessage: null,
completedAt: null,
});
completeThread(uwf.varStore, threadId2, "cancelled");
const history = loadHistoryThreads(uwf.varStore);
expect(Object.keys(history)).toHaveLength(2);
const statuses = Object.values(history)
.map((entry) => entry.status)
.sort();
expect(statuses).toEqual(["cancelled", "completed"]);
});
});
@@ -10,9 +10,8 @@ import { cmdThreadList } from "../commands/thread.js";
import { parseTimeInput } from "../commands/thread-time-parser.js";
import type { UwfStore } from "../store.js";
import {
addHistoryEntry,
completeThread as completeThreadInStore,
createUwfStore,
deleteThread,
loadAllThreads,
setThread,
} from "../store.js";
@@ -77,14 +76,7 @@ async function completeThread(
headHash: CasRef,
) {
const uwfIdx = await createUwfStore(storageRoot);
deleteThread(uwfIdx.varStore, threadId);
addHistoryEntry(uwfIdx.varStore, {
thread: threadId,
workflow: workflowHash,
head: headHash,
completedAt: Date.now(),
reason: null,
});
completeThreadInStore(uwfIdx.varStore, threadId, "completed");
}
// ── test setup ────────────────────────────────────────────────────────────────
@@ -500,8 +492,10 @@ describe("edge cases", () => {
)) as CasRef;
index["INVALID_ULID_FORMAT_HERE" as ThreadId] = {
head: placeholderHead,
status: "idle",
suspendedRole: null,
suspendMessage: null,
completedAt: null,
};
for (const [tid, ent] of Object.entries(index)) {
setThread(uwfIdx.varStore, tid as ThreadId, ent);
@@ -118,8 +118,10 @@ async function setupSuspendedThread(mode: MockAgentMode): Promise<{
await seedThreads(tmpDir, {
[THREAD_ID]: {
head: stepHash,
status: "suspended",
suspendedRole: "worker",
suspendMessage: SUSPEND_MESSAGE,
completedAt: null,
},
});
@@ -247,7 +249,7 @@ describe("uwf thread resume", () => {
const result = runUwf(["thread", "resume", THREAD_ID], casDir);
expect(result.status).not.toBe(0);
expect(result.stderr).toContain("thread is not suspended");
expect(result.stderr).toContain("thread cannot be resumed");
});
test("resume suspended thread executes step and becomes idle", async () => {
@@ -347,8 +349,10 @@ describe("uwf thread resume", () => {
const uwfAfterFirst = await createUwfStore(tmpDir);
expect(getThread(uwfAfterFirst.varStore, THREAD_ID)).toEqual({
head: firstResume.head,
status: "suspended",
suspendedRole: "worker",
suspendMessage: SUSPEND_MESSAGE,
completedAt: null,
});
const { mockAgentPath: okMockAgentPath } = await setupOkMockAgent(
@@ -444,3 +448,266 @@ echo '${adapterJson}'
return { mockAgentPath };
}
describe("uwf thread resume - completed threads", () => {
test("resume completed thread starts from $START role", async () => {
  // Scenario: a thread whose worker → reviewer chain already reached $END is
  // resumed. The CLI is expected to run the first role after $START (worker),
  // report the thread as idle with reviewer next, and clear the completed state.

  // ── CAS fixtures: workflow definition and start node ──
  const casDir = join(tmpDir, "cas");
  await mkdir(casDir, { recursive: true });
  const store = await openStore(casDir);
  const schemas = await registerUwfSchemas(store);
  const outputSchemaHash = await putSchema(store, OUTPUT_SCHEMA);
  const workflowHash = await store.cas.put(schemas.workflow, {
    name: "test-completed-resume",
    description: "completed thread resume test",
    roles: {
      worker: {
        description: "Worker role",
        goal: "Work",
        capabilities: [],
        procedure: "work",
        output: "result",
        frontmatter: outputSchemaHash,
      },
      reviewer: {
        description: "Reviewer role",
        goal: "Review",
        capabilities: [],
        procedure: "review",
        output: "result",
        frontmatter: outputSchemaHash,
      },
    },
    graph: {
      $START: { _: { role: "worker", prompt: "Start work", location: null } },
      worker: { _: { role: "reviewer", prompt: "Review the work", location: null } },
      reviewer: { _: { role: "$END", prompt: "Done", location: null } },
    },
  });
  const startHash = await store.cas.put(schemas.startNode, {
    workflow: workflowHash,
    prompt: "Initial task",
    cwd: tmpDir,
  });
  process.env.OCAS_DIR = casDir;

  // ── Existing chain: worker step followed by reviewer step ──
  const workerOutputHash = await store.cas.put(outputSchemaHash, { $status: "_" });
  const reviewerOutputHash = await store.cas.put(outputSchemaHash, { $status: "_" });
  const detailHash = await store.cas.put(schemas.text, "mock detail");
  const workerStepHash = await store.cas.put(schemas.stepNode, {
    start: startHash,
    prev: null,
    role: "worker",
    output: workerOutputHash,
    detail: detailHash,
    agent: "uwf-mock",
    edgePrompt: "Start work",
    startedAtMs: 1716600000000,
    completedAtMs: 1716600001000,
    cwd: tmpDir,
    assembledPrompt: null,
  });
  const reviewerStepHash = await store.cas.put(schemas.stepNode, {
    start: startHash,
    prev: workerStepHash,
    role: "reviewer",
    output: reviewerOutputHash,
    detail: detailHash,
    agent: "uwf-mock",
    edgePrompt: "Review the work",
    startedAtMs: 1716600001000,
    completedAtMs: 1716600002000,
    cwd: tmpDir,
    assembledPrompt: null,
  });
  await seedThreads(tmpDir, {
    [THREAD_ID]: {
      head: reviewerStepHash,
      status: "completed",
      suspendedRole: null,
      suspendMessage: null,
      completedAt: 1716600002000,
    },
  });

  // Precondition: the seeded thread really is stored as completed. This is an
  // assertion rather than debug output so a broken fixture fails here, not later.
  const storeModule = await import("../store.js");
  const seededUwf = await storeModule.createUwfStore(tmpDir);
  const seededEntry = storeModule.getThread(seededUwf.varStore, THREAD_ID);
  expect(seededEntry?.status).toBe("completed");

  // ── Mock agent: records the prompt it receives and returns a canned worker step ──
  const promptCapturePath = join(tmpDir, "captured-prompt-completed.txt");
  const mockAgentPath = join(tmpDir, "mock-agent-completed.sh");
  const newWorkerStepHash = await store.cas.put(schemas.stepNode, {
    start: startHash,
    prev: reviewerStepHash,
    role: "worker",
    output: workerOutputHash,
    detail: detailHash,
    agent: "uwf-mock",
    edgePrompt: "Start work",
    startedAtMs: 1716600003000,
    completedAtMs: 1716600004000,
    cwd: tmpDir,
    assembledPrompt: null,
  });
  const adapterJson = JSON.stringify({
    stepHash: newWorkerStepHash,
    detailHash,
    role: "worker",
    frontmatter: { $status: "_" },
    body: "",
    startedAtMs: 1716600003000,
    completedAtMs: 1716600004000,
  });
  await writeFile(
    mockAgentPath,
    `#!/bin/sh
prompt=""
while [ $# -gt 0 ]; do
if [ "$1" = "--prompt" ]; then
prompt="$2"
shift 2
else
shift
fi
done
printf '%s' "$prompt" > '${promptCapturePath}'
echo '${adapterJson}'
`,
    { mode: 0o755 },
  );
  const configPath = join(tmpDir, "config.yaml");
  await writeFile(
    configPath,
    `defaultAgent: uwf-hermes\ndefaultModel: test-model\nagentOverrides: null\nagents: {}\nproviders: {}\nmodels: {}\n`,
  );

  // ── Act: resume the completed thread with supplementary context ──
  const result = runUwf(
    ["thread", "resume", THREAD_ID, "-p", "Additional context", "--agent", mockAgentPath],
    casDir,
  );
  // The CLI's stderr goes into the assertion message, so a failing run explains itself.
  expect(result.status, `uwf thread resume failed: ${result.stderr}`).toBe(0);

  // ── Assert: CLI output, the prompt handed to the agent, and persisted state ──
  const cliOutput = JSON.parse(result.stdout.trim());
  expect(cliOutput.status).toBe("idle");
  expect(cliOutput.currentRole).toBe("reviewer");
  expect(cliOutput.done).toBe(false);

  const capturedPrompt = await readFile(promptCapturePath, "utf8");
  expect(capturedPrompt).toContain("Previous run completed");
  expect(capturedPrompt).toContain("Additional context");

  const resumedUwf = await storeModule.createUwfStore(tmpDir);
  const resumedEntry = storeModule.getThread(resumedUwf.varStore, THREAD_ID);
  expect(resumedEntry?.status).toBe("idle");
  expect(resumedEntry?.completedAt).toBeNull();
});
test("resume cancelled thread returns error", async () => {
  // Fixture: a one-role workflow and a thread stored with status "cancelled".
  const casDir = join(tmpDir, "cas");
  await mkdir(casDir, { recursive: true });
  const store = await openStore(casDir);
  const schemas = await registerUwfSchemas(store);
  const workerFrontmatter = await putSchema(store, OUTPUT_SCHEMA);
  const workflowHash = await store.cas.put(schemas.workflow, {
    name: "cancelled-workflow",
    description: "cancelled thread",
    roles: {
      worker: {
        description: "Worker",
        goal: "Work",
        capabilities: [],
        procedure: "work",
        output: "result",
        frontmatter: workerFrontmatter,
      },
    },
    graph: {
      $START: { _: { role: "worker", prompt: "Start", location: null } },
      worker: { _: { role: "$END", prompt: "Done", location: null } },
    },
  });
  const startRef = await store.cas.put(schemas.startNode, {
    workflow: workflowHash,
    prompt: "task",
    cwd: tmpDir,
  });
  process.env.OCAS_DIR = casDir;

  const cancelledEntry = {
    head: startRef,
    status: "cancelled",
    suspendedRole: null,
    suspendMessage: null,
    completedAt: null,
  };
  await seedThreads(tmpDir, { [THREAD_ID]: cancelledEntry });

  // Resuming must fail and name the offending status on stderr.
  const outcome = runUwf(["thread", "resume", THREAD_ID], casDir);
  expect(outcome.status).not.toBe(0);
  expect(outcome.stderr).toContain("thread cannot be resumed");
  expect(outcome.stderr).toContain("cancelled");
});
test("resume idle thread returns error", async () => {
  // Fixture: a one-role workflow and a thread seeded directly from its start node.
  const casDir = join(tmpDir, "cas");
  await mkdir(casDir, { recursive: true });
  const store = await openStore(casDir);
  const schemas = await registerUwfSchemas(store);
  const workerFrontmatter = await putSchema(store, OUTPUT_SCHEMA);
  const workflowHash = await store.cas.put(schemas.workflow, {
    name: "idle-workflow",
    description: "idle thread",
    roles: {
      worker: {
        description: "Worker",
        goal: "Work",
        capabilities: [],
        procedure: "work",
        output: "result",
        frontmatter: workerFrontmatter,
      },
    },
    graph: {
      $START: { _: { role: "worker", prompt: "Start", location: null } },
      worker: { _: { role: "$END", prompt: "Done", location: null } },
    },
  });
  const startRef = await store.cas.put(schemas.startNode, {
    workflow: workflowHash,
    prompt: "task",
    cwd: tmpDir,
  });
  process.env.OCAS_DIR = casDir;
  await seedThreads(tmpDir, { [THREAD_ID]: startRef });

  // Resuming must fail and name the offending status on stderr.
  const outcome = runUwf(["thread", "resume", THREAD_ID], casDir);
  expect(outcome.status).not.toBe(0);
  expect(outcome.stderr).toContain("thread cannot be resumed");
  expect(outcome.stderr).toContain("idle");
});
});
@@ -7,9 +7,8 @@ import { describe, expect, test } from "vitest";
import { createMarker, deleteMarker } from "../background/index.js";
import { cmdThreadShow, cmdThreadStart } from "../commands/thread.js";
import {
addHistoryEntry,
completeThread,
createUwfStore,
deleteThread,
loadAllThreads,
setThread,
} from "../store.js";
@@ -118,7 +117,13 @@ async function insertStepNode(
assembledPrompt: null,
})) as CasRef;
setThread(uwf.varStore, threadId, { head: stepHash, suspendedRole: null, suspendMessage: null });
setThread(uwf.varStore, threadId, {
head: stepHash,
status: "idle",
suspendedRole: null,
suspendMessage: null,
completedAt: null,
});
}
describe("thread show status field", () => {
@@ -208,15 +213,7 @@ describe("thread show status field", () => {
const head = index[threadId]!.head;
if (!head) throw new Error("Thread not found in index");
deleteThread(uwfForIndex.varStore, threadId);
addHistoryEntry(uwfForIndex.varStore, {
thread: threadId,
workflow,
head,
completedAt: Date.now(),
reason: "completed",
});
completeThread(uwfForIndex.varStore, threadId, "completed");
const result = await cmdThreadShow(storageRoot, threadId);
@@ -245,15 +242,7 @@ describe("thread show status field", () => {
const head = index[threadId]!.head;
if (!head) throw new Error("Thread not found in index");
deleteThread(uwfForIndex.varStore, threadId);
addHistoryEntry(uwfForIndex.varStore, {
thread: threadId,
workflow,
head,
completedAt: Date.now(),
reason: "cancelled",
});
completeThread(uwfForIndex.varStore, threadId, "cancelled");
const result = await cmdThreadShow(storageRoot, threadId);
@@ -282,15 +271,7 @@ describe("thread show status field", () => {
const head = index[threadId]!.head;
if (!head) throw new Error("Thread not found in index");
deleteThread(uwfForIndex.varStore, threadId);
addHistoryEntry(uwfForIndex.varStore, {
thread: threadId,
workflow,
head,
completedAt: Date.now(),
reason: null,
});
completeThread(uwfForIndex.varStore, threadId, "completed");
const result = await cmdThreadShow(storageRoot, threadId);
@@ -160,8 +160,10 @@ describe("suspend step CAS chain and threads.yaml metadata", () => {
const threadEntry = getThread(uwf.varStore, threadId);
expect(threadEntry).toEqual({
head: stepHash,
status: "suspended",
suspendedRole: "worker",
suspendMessage: "Please clarify: Which API?",
completedAt: null,
});
const showResult = await cmdThreadShow(tmpDir, threadId);
+27 -24
View File
@@ -11,7 +11,7 @@ import {
THREAD_READ_DEFAULT_QUOTA,
} from "../commands/thread.js";
import type { UwfStore } from "../store.js";
import { addHistoryEntry, createUwfStore } from "../store.js";
import { completeThread, createUwfStore, setThread } from "../store.js";
import { seedThreads } from "./thread-test-helpers.js";
// ── schemas used in tests ────────────────────────────────────────────────────
@@ -745,13 +745,14 @@ describe("cmdStepList with completed threads", () => {
const threadId = "01JTEST0000000000000000A2" as ThreadId;
// Thread is NOT in active index (simulating completed thread)
// But it IS in history variable store
addHistoryEntry(uwf.varStore, {
thread: threadId,
workflow: workflowHash,
setThread(uwf.varStore, threadId, {
head: step2Hash,
completedAt: Date.now(),
reason: null,
status: "idle",
suspendedRole: null,
suspendMessage: null,
completedAt: null,
});
completeThread(uwf.varStore, threadId, "completed");
const result = await cmdStepList(tmpDir, threadId);
@@ -872,14 +873,15 @@ describe("cmdStepShow with completed threads", () => {
const threadId = "01JTEST0000000000000000B2" as ThreadId;
// Thread is NOT in active index
// But it IS in history variable store
addHistoryEntry(uwf.varStore, {
thread: threadId,
workflow: workflowHash,
// But it IS in the unified store with completed status
setThread(uwf.varStore, threadId, {
head: stepHash,
completedAt: Date.now(),
reason: null,
status: "idle",
suspendedRole: null,
suspendMessage: null,
completedAt: null,
});
completeThread(uwf.varStore, threadId, "completed");
const result = await cmdStepShow(tmpDir, stepHash);
@@ -934,15 +936,15 @@ describe("cmdThreadRead with completed threads", () => {
});
const threadId = "01JTEST0000000000000000C1" as ThreadId;
// Thread is NOT in active index
// But it IS in history variable store
addHistoryEntry(uwf.varStore, {
thread: threadId,
workflow: workflowHash,
// Thread is in store with completed status
setThread(uwf.varStore, threadId, {
head: stepHash,
completedAt: Date.now(),
reason: null,
status: "idle",
suspendedRole: null,
suspendMessage: null,
completedAt: null,
});
completeThread(uwf.varStore, threadId, "completed");
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
@@ -998,13 +1000,14 @@ describe("cmdThreadRead with completed threads", () => {
});
const threadId = "01JTEST0000000000000000C2" as ThreadId;
addHistoryEntry(uwf.varStore, {
thread: threadId,
workflow: workflowHash,
setThread(uwf.varStore, threadId, {
head: step3Hash,
completedAt: Date.now(),
reason: null,
status: "idle",
suspendedRole: null,
suspendMessage: null,
completedAt: null,
});
completeThread(uwf.varStore, threadId, "completed");
const markdown = await cmdThreadRead(
tmpDir,
+1 -5
View File
@@ -6,7 +6,7 @@ import type {
StepNodePayload,
ThreadId,
} from "@united-workforce/protocol";
import { createUwfStore, findHistoryEntry, getThread, type UwfStore } from "../store.js";
import { createUwfStore, getThread, type UwfStore } from "../store.js";
type ChainState = {
startHash: CasRef;
@@ -207,10 +207,6 @@ async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise
if (entry !== null) {
return entry.head;
}
const hist = findHistoryEntry(uwf.varStore, threadId);
if (hist !== null) {
return hist.head;
}
fail(`thread not found: ${threadId}`);
}
+2
View File
@@ -114,8 +114,10 @@ export async function cmdStepFork(
const newThreadId = generateUlid(Date.now()) as ThreadId;
setThread(uwf.varStore, newThreadId, {
head: stepHash,
status: "idle",
suspendedRole: null,
suspendMessage: null,
completedAt: null,
});
return {
+87 -78
View File
@@ -38,17 +38,14 @@ import { createMarker, deleteMarker, isThreadRunning } from "../background/index
import { createIncludeTag } from "../include.js";
import { evaluate, isSuspendResult } from "../moderator/index.js";
import {
addHistoryEntry,
completeThread,
createUwfStore,
deleteThread,
findHistoryEntry,
getThread,
loadAllHistory,
loadAllThreads,
loadActiveThreads,
loadHistoryThreads,
loadWorkflowRegistry,
resolveWorkflowHash,
setThread,
type ThreadHistoryLine,
type UwfStore,
} from "../store.js";
import { checkWorkflowFilenameConsistency, isCasRef, parseWorkflowPayload } from "../validate.js";
@@ -485,20 +482,35 @@ export async function cmdThreadShow(
): Promise<ThreadShowOutput> {
const uwf = await createUwfStore(storageRoot);
const entry = getThread(uwf.varStore, threadId);
if (entry !== null) {
if (entry === null) {
fail(`thread not found: ${threadId}`);
}
const activeHead = entry.head;
const workflow = resolveWorkflowFromHead(uwf, activeHead);
if (workflow === null) {
fail(`failed to resolve workflow from head: ${activeHead}`);
}
const status = await resolveActiveThreadStatus(
storageRoot,
threadId,
uwf,
activeHead,
// Determine if this is a completed/cancelled thread
if (entry.status === "completed" || entry.status === "cancelled") {
const hint = null;
return {
workflow,
);
thread: threadId,
head: activeHead,
status: entry.status,
currentRole: null,
suspendedRole: null,
suspendMessage: null,
done: true,
background: null,
hint,
};
}
// Active thread
const status = await resolveActiveThreadStatus(storageRoot, threadId, uwf, activeHead, workflow);
const currentRole = resolveCurrentRole(uwf, activeHead, workflow);
const suspendFields = resolveSuspendFieldsForShow(entry, status, uwf, activeHead, workflow);
@@ -519,27 +531,6 @@ export async function cmdThreadShow(
background: null,
hint,
};
}
const hist = findHistoryEntry(uwf.varStore, threadId);
if (hist !== null) {
const status: ThreadStatus = hist.reason === "cancelled" ? "cancelled" : "completed";
return {
workflow: hist.workflow,
thread: threadId,
head: hist.head,
status,
currentRole: null,
suspendedRole: null,
suspendMessage: null,
done: true,
background: null,
hint: null,
};
}
fail(`thread not found: ${threadId}`);
}
export type ThreadListItemWithStatus = ThreadListItem & {
@@ -598,15 +589,15 @@ function collectCompletedThreads(
activeIds: Set<ThreadId>,
): ThreadListItemWithStatus[] {
const items: ThreadListItemWithStatus[] = [];
const history = loadAllHistory(varStore);
const history = loadHistoryThreads(varStore);
const seen = new Set<ThreadId>(); // Deduplication (issue #470)
for (const entry of history) {
if (!activeIds.has(entry.thread) && !seen.has(entry.thread)) {
seen.add(entry.thread);
const status = entry.reason === "cancelled" ? "cancelled" : "completed";
for (const [threadId, entry] of Object.entries(history)) {
if (!activeIds.has(threadId as ThreadId) && !seen.has(threadId as ThreadId)) {
seen.add(threadId as ThreadId);
const status = entry.status;
items.push({
thread: entry.thread,
workflow: entry.workflow,
thread: threadId as ThreadId,
workflow: "", // Will be resolved later if needed
head: entry.head,
status,
currentRole: null,
@@ -659,7 +650,7 @@ export async function cmdThreadList(
take: number | null,
): Promise<ThreadListItemWithStatus[]> {
const uwf = await createUwfStore(storageRoot);
const index = loadAllThreads(uwf.varStore);
const index = loadActiveThreads(uwf.varStore);
// Collect active threads
let items = await collectActiveThreads(storageRoot, uwf, index);
@@ -1035,15 +1026,8 @@ function spawnAgent(
return obj as unknown as AdapterOutput;
}
function archiveThread(uwf: UwfStore, threadId: ThreadId, workflow: CasRef, head: CasRef): void {
deleteThread(uwf.varStore, threadId);
addHistoryEntry(uwf.varStore, {
thread: threadId,
workflow,
head,
completedAt: Date.now(),
reason: "completed",
});
/**
 * Archive a finished thread by marking its entry as "completed" in the variable store.
 *
 * Only `uwf.varStore` and `threadId` are read here; `_workflow` and `_head` are
 * accepted but not used by this body (hence the underscore prefix).
 */
function archiveThread(uwf: UwfStore, threadId: ThreadId, _workflow: CasRef, _head: CasRef): void {
completeThread(uwf.varStore, threadId, "completed");
}
export async function cmdThreadResume(
@@ -1067,17 +1051,30 @@ export async function cmdThreadResume(
const chain = walkChain(uwf, headHash);
const workflowHash = chain.start.workflow;
const status = await resolveActiveThreadStatus(
// Check entry.status first for completed/cancelled (like in cmdThreadShow)
let status: ThreadStatus;
if (entry.status === "completed" || entry.status === "cancelled") {
status = entry.status;
} else {
status = await resolveActiveThreadStatus(
storageRoot,
threadId,
uwf,
headHash,
workflowHash,
);
if (status !== "suspended") {
fail(`thread is not suspended: ${threadId} (status: ${status})`);
}
if (status !== "suspended" && status !== "completed") {
fail(`thread cannot be resumed: ${threadId} (status: ${status})`);
}
const plog = createProcessLogger({
storageRoot,
context: { thread: threadId, workflow: workflowHash },
});
if (status === "suspended") {
const suspendFields = resolveSuspendFieldsForShow(entry, status, uwf, headHash, workflowHash);
if (suspendFields.suspendedRole === null) {
fail(`thread is suspended but suspendedRole is missing: ${threadId}`);
@@ -1087,10 +1084,6 @@ export async function cmdThreadResume(
}
const resumePrompt = buildResumePrompt(suspendFields.suspendMessage, supplement);
const plog = createProcessLogger({
storageRoot,
context: { thread: threadId, workflow: workflowHash },
});
plog.log(
PL_THREAD_RESUME,
@@ -1102,6 +1095,41 @@ export async function cmdThreadResume(
role: suspendFields.suspendedRole,
prompt: resumePrompt,
});
}
// status === "completed"
const workflow = loadWorkflowPayload(uwf, workflowHash);
const startResult = evaluate(workflow.graph, START_ROLE, {});
if (!startResult.ok) {
fail(`failed to evaluate $START: ${startResult.error.message}`);
}
if (isSuspendResult(startResult.value)) {
fail("workflow cannot start with $SUSPEND");
}
if (startResult.value.role === END_ROLE) {
fail("workflow cannot start with $END");
}
const startRole = startResult.value.role;
const completedPromptPrefix = "Previous run completed. Resuming with additional context.";
const completedResumePrompt =
supplement !== null && supplement !== ""
? `${completedPromptPrefix}\n\n${supplement}`
: completedPromptPrefix;
const updatedEntry = { ...entry, status: "idle" as const, completedAt: null };
setThread(uwf.varStore, threadId, updatedEntry);
plog.log(
PL_THREAD_RESUME,
`resume completed role=${startRole} supplement=${supplement !== null}`,
null,
);
return cmdThreadStepOnce(storageRoot, threadId, agentOverride, plog, {
role: startRole,
prompt: completedResumePrompt,
});
}
export async function cmdThreadExec(
@@ -1450,10 +1478,6 @@ async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise
if (entry !== null) {
return entry.head;
}
const hist = findHistoryEntry(uwf.varStore, threadId);
if (hist !== null) {
return hist.head;
}
fail(`thread not found: ${threadId}`);
}
@@ -1533,7 +1557,6 @@ export async function cmdThreadCancel(
if (entry === null) {
fail(`thread not active: ${threadId}`);
}
const head = entry.head;
// Check if thread is running in background and terminate it
const runningMarker = await isThreadRunning(storageRoot, threadId);
@@ -1546,21 +1569,7 @@ export async function cmdThreadCancel(
await deleteMarker(storageRoot, threadId);
}
const workflow = resolveWorkflowFromHead(uwf, head);
if (workflow === null) {
fail(`failed to resolve workflow from head: ${head}`);
}
deleteThread(uwf.varStore, threadId);
const historyEntry: ThreadHistoryLine = {
thread: threadId,
workflow,
head,
completedAt: Date.now(),
reason: "cancelled",
};
addHistoryEntry(uwf.varStore, historyEntry);
completeThread(uwf.varStore, threadId, "cancelled");
return { thread: threadId, cancelled: true };
}
+90 -43
View File
@@ -26,8 +26,6 @@ export const REGISTRY_VAR_PREFIX = "@uwf/registry/";
/** Variable name prefix for active thread entries (`@uwf/thread/<thread-id>`). */
export const THREAD_VAR_PREFIX = "@uwf/thread/";
/** Variable name prefix for completed/cancelled thread history (`@uwf/history/<thread-id>`). */
export const HISTORY_VAR_PREFIX = "@uwf/history/";
/** A workflow entry discovered from the project-local .workflows/ directory. */
export type ProjectWorkflowEntry = {
@@ -156,10 +154,6 @@ export function getThreadsPath(storageRoot: string): string {
return join(storageRoot, "threads.yaml");
}
export type ThreadHistoryLine = ThreadListItem & {
completedAt: number;
reason: "completed" | "cancelled" | null;
};
export type UwfStore = {
storageRoot: string;
@@ -179,6 +173,7 @@ export async function createUwfStore(storageRoot: string): Promise<UwfStore> {
await migrateWorkflowRegistryIfNeeded(storageRoot, varStore);
await migrateThreadsIndexIfNeeded(storageRoot, varStore);
await migrateHistoryIfNeeded(storageRoot, varStore);
migrateHistoryVarsToThreadVars(varStore);
return { storageRoot, store, schemas, varStore };
}
@@ -299,8 +294,10 @@ function threadVarName(threadId: ThreadId): string {
/**
 * Build a `ThreadIndexEntry` from a stored variable.
 *
 * The variable's value becomes the head ref; status, suspend metadata and the
 * completion timestamp are read from its tags.
 *
 * @param v - Stored variable: `value` is the head ref, `tags` carry the metadata.
 * @returns The entry. A missing `status` tag yields `"idle"`; missing
 *   `suspendedRole` / `suspendMessage` / `completedAt` tags yield `null`.
 */
function entryFromVariable(v: { value: string; tags: Record<string, string> }): ThreadIndexEntry {
  const rawCompletedAt = v.tags.completedAt;
  // A tag that does not parse to a finite number (e.g. corrupted data) must not
  // leak NaN into a `number | null` field — treat it the same as "absent".
  const parsedCompletedAt = rawCompletedAt !== undefined ? Number(rawCompletedAt) : Number.NaN;
  return {
    head: v.value as CasRef,
    status: (v.tags.status ?? "idle") as ThreadIndexEntry["status"],
    suspendedRole: v.tags.suspendedRole ?? null,
    suspendMessage: v.tags.suspendMessage ?? null,
    completedAt: Number.isFinite(parsedCompletedAt) ? parsedCompletedAt : null,
  };
}
@@ -331,21 +328,75 @@ export function setThread(varStore: VarStore, threadId: ThreadId, entry: ThreadI
// Head CAS nodes may use different schemas (StartNode vs StepNode) — clear all variants first.
varStore.remove(name);
const tags: Record<string, string> = {};
if (entry.status !== "idle") {
tags.status = entry.status;
}
if (entry.suspendedRole !== null) {
tags.suspendedRole = entry.suspendedRole;
}
if (entry.suspendMessage !== null) {
tags.suspendMessage = entry.suspendMessage;
}
if (entry.completedAt !== null) {
tags.completedAt = String(entry.completedAt);
}
varStore.set(name, entry.head, { tags });
}
/** Remove an active thread entry (on complete/cancel). */
export function deleteThread(varStore: VarStore, threadId: ThreadId): void {
varStore.remove(threadVarName(threadId));
/**
 * Return the subset of thread entries that are still live.
 *
 * An entry is dropped when its status is `"completed"` or `"cancelled"`;
 * every other entry is kept, in the order `loadAllThreads` produced it.
 */
export function loadActiveThreads(varStore: VarStore): ThreadsIndex {
  const result: ThreadsIndex = {};
  for (const [id, candidate] of Object.entries(loadAllThreads(varStore))) {
    const isFinished = candidate.status === "completed" || candidate.status === "cancelled";
    if (isFinished) {
      continue;
    }
    result[id as ThreadId] = candidate;
  }
  return result;
}
function parseHistoryJsonlLine(trimmed: string): ThreadHistoryLine | null {
/**
 * Return the subset of thread entries that have reached a terminal state.
 *
 * An entry is kept only when its status is `"completed"` or `"cancelled"`;
 * order follows what `loadAllThreads` produced.
 */
export function loadHistoryThreads(varStore: VarStore): ThreadsIndex {
  const finished: ThreadsIndex = {};
  const terminalEntries = Object.entries(loadAllThreads(varStore)).filter(
    ([, item]) => item.status === "completed" || item.status === "cancelled",
  );
  for (const [id, item] of terminalEntries) {
    finished[id as ThreadId] = item;
  }
  return finished;
}
/**
 * Mark a thread as finished in place, keeping its variable and head.
 *
 * The stored entry is rewritten with the given terminal status, suspend
 * metadata cleared, and `completedAt` set to the current time.
 *
 * @param varStore - Variable store holding the thread entries.
 * @param threadId - Thread to finish.
 * @param reason - Terminal status to record.
 *
 * If no entry exists for `threadId` this is a no-op (nothing is written).
 */
export function completeThread(
  varStore: VarStore,
  threadId: ThreadId,
  reason: "completed" | "cancelled",
): void {
  const entry = getThread(varStore, threadId);
  if (entry === null) {
    return;
  }
  // Declared (not asserted) as ThreadIndexEntry so the compiler reports any
  // field that is missing or misspelled if the entry type changes.
  const completed: ThreadIndexEntry = {
    head: entry.head,
    status: reason,
    suspendedRole: null,
    suspendMessage: null,
    completedAt: Date.now(),
  };
  setThread(varStore, threadId, completed);
}
/**
 * One record of the legacy history format, as returned by
 * `parseLegacyHistoryJsonlLine`. Used only while migrating old history into
 * thread entries.
 */
type LegacyHistoryEntry = {
thread: ThreadId;
workflow: CasRef;
head: CasRef;
completedAt: number;
// Migration maps "cancelled" to status "cancelled"; any other value (including null) becomes "completed".
reason: "completed" | "cancelled" | null;
};
function parseLegacyHistoryJsonlLine(trimmed: string): LegacyHistoryEntry | null {
let raw: unknown;
try {
raw = JSON.parse(trimmed) as unknown;
@@ -379,7 +430,7 @@ function parseHistoryJsonlLine(trimmed: string): ThreadHistoryLine | null {
return null;
}
/** One-time migration: `~/.uwf/history.jsonl` → `@uwf/history/*` variables. */
/** One-time migration: `~/.uwf/history.jsonl` → `@uwf/thread/*` variables with status tags. */
export async function migrateHistoryIfNeeded(
storageRoot: string,
varStore: VarStore,
@@ -395,47 +446,43 @@ export async function migrateHistoryIfNeeded(
if (trimmed === "") {
continue;
}
const entry = parseHistoryJsonlLine(trimmed);
const entry = parseLegacyHistoryJsonlLine(trimmed);
if (entry !== null) {
addHistoryEntry(varStore, entry);
const status = entry.reason === "cancelled" ? "cancelled" : "completed";
const threadEntry: ThreadIndexEntry = {
head: entry.head,
status: status as ThreadIndexEntry["status"],
suspendedRole: null,
suspendMessage: null,
completedAt: entry.completedAt,
};
setThread(varStore, entry.thread, threadEntry);
}
}
await rename(path, `${path}.migrated`);
}
export function loadAllHistory(varStore: VarStore): ThreadHistoryLine[] {
const vars = varStore.list({ namePrefix: HISTORY_VAR_PREFIX });
return vars.map((v) => ({
thread: v.name.slice(HISTORY_VAR_PREFIX.length) as ThreadId,
workflow: v.tags.workflow ?? "",
head: v.value as CasRef,
completedAt: Number(v.tags.completedAt ?? "0"),
reason: v.tags.reason === "completed" || v.tags.reason === "cancelled" ? v.tags.reason : null,
}));
}
/** Migrate `@uwf/history/*` variables to `@uwf/thread/*` with status tags. */
export function migrateHistoryVarsToThreadVars(varStore: VarStore): void {
const LEGACY_HISTORY_VAR_PREFIX = "@uwf/history/";
const vars = varStore.list({ namePrefix: LEGACY_HISTORY_VAR_PREFIX });
export function findHistoryEntry(varStore: VarStore, threadId: ThreadId): ThreadHistoryLine | null {
const vars = varStore.list({ namePrefix: `${HISTORY_VAR_PREFIX}${threadId}` });
const v = vars.find((entry) => entry.name === `${HISTORY_VAR_PREFIX}${threadId}`);
if (v === undefined) {
return null;
}
return {
thread: threadId,
workflow: v.tags.workflow ?? "",
for (const v of vars) {
const threadId = v.name.slice(LEGACY_HISTORY_VAR_PREFIX.length) as ThreadId;
const reason = v.tags.reason;
const status = reason === "cancelled" ? "cancelled" : "completed";
const completedAt = Number(v.tags.completedAt ?? Date.now());
const threadEntry: ThreadIndexEntry = {
head: v.value as CasRef,
completedAt: Number(v.tags.completedAt ?? "0"),
reason: v.tags.reason === "completed" || v.tags.reason === "cancelled" ? v.tags.reason : null,
status: status as ThreadIndexEntry["status"],
suspendedRole: null,
suspendMessage: null,
completedAt,
};
}
export function addHistoryEntry(varStore: VarStore, entry: ThreadHistoryLine): void {
varStore.set(`${HISTORY_VAR_PREFIX}${entry.thread}`, entry.head, {
tags: {
workflow: entry.workflow,
completedAt: String(entry.completedAt),
reason: entry.reason ?? "completed",
},
});
setThread(varStore, threadId, threadEntry);
varStore.remove(v.name);
}
}
@@ -1,6 +1,7 @@
import { describe, expect, test } from "vitest";
import {
createThreadIndexEntry,
markThreadCompleted,
markThreadSuspended,
normalizeThreadIndexEntry,
parseThreadsIndex,
@@ -16,6 +17,8 @@ describe("thread-index", () => {
head: "0123456789ABC",
suspendedRole: null,
suspendMessage: null,
status: "idle",
completedAt: null,
});
});
@@ -29,6 +32,40 @@ describe("thread-index", () => {
head: "0123456789ABC",
suspendedRole: "worker",
suspendMessage: "Please clarify: Which API?",
status: "idle",
completedAt: null,
});
});
// Explicit status/completedAt in the raw input must pass through normalization unchanged.
test("normalizeThreadIndexEntry preserves status and completedAt from new data", () => {
const entry = normalizeThreadIndexEntry({
head: "0123456789ABC",
suspendedRole: null,
suspendMessage: null,
status: "completed",
completedAt: 1234567890,
});
expect(entry).toEqual({
head: "0123456789ABC",
suspendedRole: null,
suspendMessage: null,
status: "completed",
completedAt: 1234567890,
});
});
// Raw input without status/completedAt fields (the older shape) is expected to
// normalize to status "idle" and completedAt null.
test("normalizeThreadIndexEntry defaults status=idle, completedAt=null for old data", () => {
const entry = normalizeThreadIndexEntry({
head: "0123456789ABC",
suspendedRole: null,
suspendMessage: null,
});
expect(entry).toEqual({
head: "0123456789ABC",
suspendedRole: null,
suspendMessage: null,
status: "idle",
completedAt: null,
});
});
@@ -47,10 +84,24 @@ describe("thread-index", () => {
head: "0123456789ABC",
suspendedRole: "worker",
suspendMessage: "Please clarify: Which API?",
status: "suspended",
});
});
test("updateThreadHead clears suspend metadata", () => {
// A completed entry must serialize to an object (not the compact head string)
// carrying head, status and completedAt; the expected object has no suspend fields.
test("serialize completed entry as object", () => {
const entry = markThreadCompleted(
createThreadIndexEntry("0123456789ABC"),
"completed",
1234567890,
);
expect(serializeThreadIndexEntry(entry)).toEqual({
head: "0123456789ABC",
status: "completed",
completedAt: 1234567890,
});
});
test("updateThreadHead clears suspend metadata and resets status to idle", () => {
const suspended = markThreadSuspended(
createThreadIndexEntry("OLDHEAD0123456"),
"worker",
@@ -61,6 +112,44 @@ describe("thread-index", () => {
head: "NEWHEAD01234567",
suspendedRole: null,
suspendMessage: null,
status: "idle",
completedAt: null,
});
});
// Suspending a fresh entry keeps its head, records role and message, sets
// status "suspended" and leaves completedAt null.
test("markThreadSuspended sets status to suspended", () => {
const entry = createThreadIndexEntry("0123456789ABC");
const suspended = markThreadSuspended(entry, "worker", "Waiting for input");
expect(suspended).toEqual({
head: "0123456789ABC",
suspendedRole: "worker",
suspendMessage: "Waiting for input",
status: "suspended",
completedAt: null,
});
});
// Completing an entry keeps its head, nulls the suspend fields, and records the
// supplied status and timestamp verbatim.
test("markThreadCompleted sets status and completedAt", () => {
const entry = createThreadIndexEntry("0123456789ABC");
const completed = markThreadCompleted(entry, "completed", 1234567890);
expect(completed).toEqual({
head: "0123456789ABC",
suspendedRole: null,
suspendMessage: null,
status: "completed",
completedAt: 1234567890,
});
});
// Same helper, "cancelled" variant: the status argument is stored as given.
test("markThreadCompleted with cancelled status", () => {
const entry = createThreadIndexEntry("0123456789ABC");
const cancelled = markThreadCompleted(entry, "cancelled", 9876543210);
expect(cancelled).toEqual({
head: "0123456789ABC",
suspendedRole: null,
suspendMessage: null,
status: "cancelled",
completedAt: 9876543210,
});
});
@@ -71,6 +160,7 @@ describe("thread-index", () => {
head: "HEAD00000000002",
suspendedRole: "reviewer",
suspendMessage: "Need input",
status: "suspended",
},
};
const parsed = parseThreadsIndex(raw);
+1
View File
@@ -5,6 +5,7 @@ export {
} from "./schemas.js";
export {
createThreadIndexEntry,
markThreadCompleted,
markThreadSuspended,
normalizeThreadIndexEntry,
parseThreadsIndex,
+52 -7
View File
@@ -15,10 +15,14 @@ export function normalizeThreadIndexEntry(raw: unknown): ThreadIndexEntry | null
}
const suspendedRole = rec.suspendedRole;
const suspendMessage = rec.suspendMessage;
const status = rec.status;
const completedAt = rec.completedAt;
return {
head: head as CasRef,
suspendedRole: typeof suspendedRole === "string" ? suspendedRole : null,
suspendMessage: typeof suspendMessage === "string" ? suspendMessage : null,
status: typeof status === "string" ? (status as "idle" | "running" | "suspended" | "completed" | "cancelled") : "idle",
completedAt: typeof completedAt === "number" ? completedAt : null,
};
}
@@ -27,6 +31,8 @@ export function createThreadIndexEntry(head: CasRef): ThreadIndexEntry {
head,
suspendedRole: null,
suspendMessage: null,
status: "idle",
completedAt: null,
};
}
@@ -35,6 +41,8 @@ export function updateThreadHead(_entry: ThreadIndexEntry, head: CasRef): Thread
head,
suspendedRole: null,
suspendMessage: null,
status: "idle",
completedAt: null,
};
}
@@ -47,21 +55,58 @@ export function markThreadSuspended(
head: entry.head,
suspendedRole,
suspendMessage,
status: "suspended",
completedAt: null,
};
}
/**
 * Derive the terminal form of a thread entry without mutating the input.
 *
 * @param entry - Source entry; only its `head` is carried over.
 * @param status - Terminal status to record.
 * @param now - Timestamp stored as `completedAt`.
 * @returns A new entry with suspend metadata cleared.
 */
export function markThreadCompleted(
  entry: ThreadIndexEntry,
  status: "completed" | "cancelled",
  now: number,
): ThreadIndexEntry {
  const { head } = entry;
  const finished: ThreadIndexEntry = {
    head,
    suspendedRole: null,
    suspendMessage: null,
    status,
    completedAt: now,
  };
  return finished;
}
/** Serialize for variable store — compact head string only for an idle entry with no suspend metadata and no completedAt; otherwise an object. */
export function serializeThreadIndexEntry(
entry: ThreadIndexEntry,
): string | Record<string, string> {
if (entry.suspendedRole === null || entry.suspendMessage === null) {
): string | Record<string, string | number> {
// Compact string only for idle status with no suspend metadata
if (entry.status === "idle" && entry.suspendedRole === null && entry.suspendMessage === null && entry.completedAt === null) {
return entry.head;
}
return {
// Build object representation
const obj: Record<string, string | number> = {
head: entry.head,
suspendedRole: entry.suspendedRole,
suspendMessage: entry.suspendMessage,
};
// Include suspend metadata if present
if (entry.suspendedRole !== null) {
obj.suspendedRole = entry.suspendedRole;
}
if (entry.suspendMessage !== null) {
obj.suspendMessage = entry.suspendMessage;
}
// Always include status if not idle
if (entry.status !== "idle") {
obj.status = entry.status;
}
// Include completedAt if present
if (entry.completedAt !== null) {
obj.completedAt = entry.completedAt;
}
return obj;
}
export function parseThreadsIndex(raw: unknown): ThreadsIndex {
@@ -80,8 +125,8 @@ export function parseThreadsIndex(raw: unknown): ThreadsIndex {
export function serializeThreadsIndex(
index: ThreadsIndex,
): Record<string, string | Record<string, string>> {
const out: Record<string, string | Record<string, string>> = {};
): Record<string, string | Record<string, string | number>> {
const out: Record<string, string | Record<string, string | number>> = {};
for (const [threadId, entry] of Object.entries(index)) {
out[threadId] = serializeThreadIndexEntry(entry);
}
+2
View File
@@ -118,6 +118,8 @@ export type ThreadIndexEntry = {
head: CasRef;
suspendedRole: string | null;
suspendMessage: string | null;
status: ThreadStatus;
completedAt: number | null;
};
/** uwf thread steps — single step entry */
+2
View File
@@ -82,8 +82,10 @@ export async function getActiveThreadEntry(
}
return {
head: v.value as CasRef,
status: (v.tags.status ?? "idle") as ThreadIndexEntry["status"],
suspendedRole: v.tags.suspendedRole ?? null,
suspendMessage: v.tags.suspendMessage ?? null,
completedAt: v.tags.completedAt !== undefined ? Number(v.tags.completedAt) : null,
};
}
+22
View File
@@ -93,6 +93,28 @@ importers:
specifier: ^5.8.3
version: 5.9.3
packages/agent-mock:
dependencies:
'@ocas/core':
specifier: ^0.3.0
version: 0.3.0
'@united-workforce/protocol':
specifier: workspace:^
version: link:../protocol
'@united-workforce/util':
specifier: workspace:^
version: link:../util
'@united-workforce/util-agent':
specifier: workspace:^
version: link:../util-agent
yaml:
specifier: ^2.9.0
version: 2.9.0
devDependencies:
typescript:
specifier: ^5.8.3
version: 5.9.3
packages/cli:
dependencies:
'@ocas/core':
+4
View File
@@ -23,6 +23,10 @@ packages:
path: packages/agent-builtin
type: cli
- name: "@united-workforce/agent-mock"
path: packages/agent-mock
type: cli
- name: "@united-workforce/cli"
path: packages/cli
type: cli
+1
View File
@@ -23,6 +23,7 @@
{ "path": "packages/util-agent" },
{ "path": "packages/agent-hermes" },
{ "path": "packages/agent-builtin" },
{ "path": "packages/agent-mock" },
{ "path": "packages/agent-claude-code" },
{ "path": "packages/cli" }
]