Compare commits

..

14 Commits

Author SHA1 Message Date
xingyue aab846498b fix: fix 7 failing tests (OCAS_DIR → OCAS_HOME + restore workflow destructure)
CI / check (pull_request) Failing after 55s
Root cause: tests used OCAS_DIR env var but store.ts reads OCAS_HOME,
causing tests to hit the global ~/.ocas vars instead of temp dirs.

- store-unified-threads.test.ts: OCAS_DIR → OCAS_HOME (3 tests)
- thread-resume.test.ts: OCAS_DIR → OCAS_HOME (3 tests)
- current-role.test.ts: restore { thread, workflow } destructure
  that was incorrectly removed by biome unsafe fix (1 test)

Result: 745 passed, 0 failed, 1 skipped

Closes #49
2026-06-04 17:51:21 +08:00
xiaomo f56e24cf82 Merge pull request 'test: expand E2E coverage — suspend, count, mustache, completed resume' (#51) from test/33-more-e2e into main
CI / check (push) Failing after 1m28s
test: expand E2E coverage — suspend, count, mustache, completed resume (#51)
2026-06-04 09:04:09 +00:00
xiaoju 974c2b8f1b test: add E2E tests for suspend/resume, --count, mustache, and completed resume (#33)
CI / check (pull_request) Failing after 1m40s
4 new E2E scenarios:
4. $SUSPEND → resume lifecycle (suspendedRole/suspendMessage metadata)
5. --count 3 runs entire pipeline in one invocation
6. mustache template variables rendered into edgePrompt
7. completed thread resume (衔尾蛇: end → start, CAS chain preserved)

Total: 7 E2E scenarios, all passing.
2026-06-04 09:03:01 +00:00
xiaomo 6e7276425d Merge pull request 'chore: fix biome check errors (40 → 0)' (#50) from chore/fix-biome-check into main
CI / check (push) Failing after 1m16s
chore: fix biome check errors (40 → 0) (#50)
2026-06-04 09:01:49 +00:00
xingyue dbb7885ffd chore: fix biome check errors (40 → 0)
CI / check (pull_request) Failing after 1m39s
- Auto-fix: import sorting, formatting (17 files)
- Unsafe auto-fix: unused vars, template literals (7 files)
- Manual: nursery/noConsole → suspicious/noConsole suppression
- Manual: suppress noExcessiveCognitiveComplexity for cmdThreadResume and parseWorkflowPayload
- Manual: remove unused destructured vars in current-role tests

Closes #48
2026-06-04 16:45:45 +08:00
xiaomo cd7e4e77ff Merge pull request 'feat: agent-mock package for deterministic E2E testing (#33)' (#44) from test/33-mock-agent into main
CI / check (push) Failing after 1m38s
feat: agent-mock package for deterministic E2E testing (#44)
2026-06-04 08:38:51 +00:00
xiaomo 64a8bab5ce Merge pull request 'fix: resolve workflow from CAS chain in collectCompletedThreads' (#47) from fix/completed-thread-workflow into main
CI / check (push) Failing after 1m33s
fix: resolve workflow from CAS chain in collectCompletedThreads (#47)
2026-06-04 08:38:06 +00:00
xiaoju 80e8efb05e test: E2E integration tests with uwf-mock agent (#33)
CI / check (pull_request) Failing after 2m30s
Three scenarios testing the full CLI pipeline:
1. Linear workflow (planner → worker → $END): CAS chain integrity
2. Loop workflow (developer ↔ reviewer): moderator routing through cycles
3. Role mismatch detection: agent catches routing bugs

Uses workflow add → thread start → thread exec with uwf-mock,
verifying CAS state, thread lifecycle, and error handling.

Updated assertions to use getThread().status === 'completed'
(aligned with PR #45 unified thread storage).

Refs #33
2026-06-04 08:06:22 +00:00
xiaoju 75fb752a82 feat: add agent-mock package for deterministic E2E testing (#33)
New package @united-workforce/agent-mock (uwf-mock CLI):
- Reads pre-scripted outputs from a YAML mock data file (--mock-data)
- Counts existing CAS chain steps to determine step index
- Validates expected role matches actual moderator routing
- Stores minimal detail node in CAS for valid step refs
- Zero LLM, instant execution, 100% deterministic

Usage in config.yaml:
  agents:
    mock:
      command: uwf-mock
      args: ["--mock-data", "./fixtures/scenario.yaml"]

Refs #33
2026-06-04 08:00:07 +00:00
xingyue 06af1dc668 fix: resolve workflow from CAS chain in collectCompletedThreads
CI / check (pull_request) Failing after 1m28s
Instead of hardcoding workflow as empty string for completed/cancelled
threads, use resolveWorkflowFromHead to get the actual workflow hash
from the CAS chain, consistent with active thread handling.

Closes #46
2026-06-04 15:35:08 +08:00
xiaomo bbea89c067 Merge pull request 'refactor: unified thread storage + resume completed threads' (#45) from refactor/39-unified-thread-storage into main
CI / check (push) Failing after 1m26s
refactor: unified thread storage + resume completed threads (#45)
2026-06-04 07:25:56 +00:00
xingyue bda3e3a861 feat(cli): resume completed threads (衔尾蛇: end → start)
CI / check (pull_request) Failing after 3m45s
uwf thread resume now supports completed threads:
- Evaluates workflow graph from $START to find first role
- Clears completed state (status → idle, completedAt → null)
- Builds resume prompt with supplement context
- Full CAS chain preserved for rich context

Suspended resume behavior unchanged.
Cancelled/idle threads still rejected.

425 tests pass.

Part of #39, closes #43
2026-06-04 15:13:47 +08:00
xingyue ca7b68ca5f refactor(cli): unify thread storage, remove history prefix
- store.ts: all threads in @uwf/thread/* with status tag
- Remove HISTORY_VAR_PREFIX, ThreadHistoryLine, deleteThread
- Add loadActiveThreads, loadHistoryThreads, completeThread
- Add migrateHistoryVarsToThreadVars migration
- thread.ts: replace deleteThread+addHistoryEntry with completeThread
- shared.ts: remove findHistoryEntry fallback
- Update all tests for unified storage model

422 tests pass.

Part of #39, closes #41, closes #42
2026-06-04 15:01:20 +08:00
xingyue 23e2ae9eb4 refactor(protocol): add status + completedAt to ThreadIndexEntry
- ThreadIndexEntry gains status and completedAt fields
- createThreadIndexEntry defaults to idle/null
- normalizeThreadIndexEntry backward-compat defaults
- updateThreadHead resets to idle (衔尾蛇 resume prep)
- markThreadSuspended sets status=suspended
- New markThreadCompleted(entry, status, now) function
- serializeThreadIndexEntry includes new fields

Part of #39, closes #40
2026-06-04 14:42:14 +08:00
40 changed files with 1625 additions and 604 deletions
@@ -1,8 +1,8 @@
import { mkdtemp, rm } from "node:fs/promises"; import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os"; import { tmpdir } from "node:os";
import { join } from "node:path"; import { join } from "node:path";
import { afterEach, beforeEach, describe, expect, test } from "vitest";
import { createMemoryStore } from "@ocas/core"; import { createMemoryStore } from "@ocas/core";
import { afterEach, beforeEach, describe, expect, test } from "vitest";
import { storeBuiltinDetail } from "../src/detail.js"; import { storeBuiltinDetail } from "../src/detail.js";
import { appendSessionTurn, initSessionDir } from "../src/session.js"; import { appendSessionTurn, initSessionDir } from "../src/session.js";
import type { BuiltinTurnPayload } from "../src/types.js"; import type { BuiltinTurnPayload } from "../src/types.js";
@@ -1,51 +1,51 @@
import { describe, it, expect, beforeAll, afterAll } from "vitest"; import { mkdir, rm, writeFile } from "node:fs/promises";
import { readFileTool } from "../src/tools/read-file.js";
import { writeFile, mkdir, rm } from "node:fs/promises";
import { join } from "node:path";
import { tmpdir } from "node:os"; import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterAll, beforeAll, describe, expect, it } from "vitest";
import { readFileTool } from "../src/tools/read-file.js";
const testDir = join(tmpdir(), `read-file-test-${Date.now()}`); const testDir = join(tmpdir(), `read-file-test-${Date.now()}`);
const ctx = { cwd: testDir, storageRoot: testDir }; const ctx = { cwd: testDir, storageRoot: testDir };
beforeAll(async () => { beforeAll(async () => {
await mkdir(testDir, { recursive: true }); await mkdir(testDir, { recursive: true });
await writeFile(join(testDir, "hello.txt"), "hello world", "utf8"); await writeFile(join(testDir, "hello.txt"), "hello world", "utf8");
}); });
afterAll(async () => { afterAll(async () => {
await rm(testDir, { recursive: true, force: true }); await rm(testDir, { recursive: true, force: true });
}); });
describe("readFileTool", () => { describe("readFileTool", () => {
it("reads a file successfully", async () => { it("reads a file successfully", async () => {
const result = await readFileTool.execute({ path: "hello.txt" }, ctx); const result = await readFileTool.execute({ path: "hello.txt" }, ctx);
expect(result).toBe("hello world"); expect(result).toBe("hello world");
}); });
it("returns error for non-existent file", async () => { it("returns error for non-existent file", async () => {
const result = await readFileTool.execute({ path: "nope.txt" }, ctx); const result = await readFileTool.execute({ path: "nope.txt" }, ctx);
expect(result).toMatch(/^Error:/); expect(result).toMatch(/^Error:/);
}); });
it("returns error for directory", async () => { it("returns error for directory", async () => {
const result = await readFileTool.execute({ path: "." }, ctx); const result = await readFileTool.execute({ path: "." }, ctx);
expect(result).toBe("Error: not a file"); expect(result).toBe("Error: not a file");
}); });
it("returns error when path is not a string", async () => { it("returns error when path is not a string", async () => {
const result = await readFileTool.execute({ path: 123 }, ctx); const result = await readFileTool.execute({ path: 123 }, ctx);
expect(result).toBe("Error: path must be a string"); expect(result).toBe("Error: path must be a string");
}); });
it("returns error when args is null", async () => { it("returns error when args is null", async () => {
const result = await readFileTool.execute(null, ctx); const result = await readFileTool.execute(null, ctx);
expect(result).toBe("Error: path must be a string"); expect(result).toBe("Error: path must be a string");
}); });
it("returns error for file exceeding 512KB limit", async () => { it("returns error for file exceeding 512KB limit", async () => {
const bigFile = join(testDir, "big.txt"); const bigFile = join(testDir, "big.txt");
await writeFile(bigFile, Buffer.alloc(512 * 1024 + 1, 65)); await writeFile(bigFile, Buffer.alloc(512 * 1024 + 1, 65));
const result = await readFileTool.execute({ path: "big.txt" }, ctx); const result = await readFileTool.execute({ path: "big.txt" }, ctx);
expect(result).toMatch(/Error:.*limit/); expect(result).toMatch(/Error:.*limit/);
}); });
}); });
@@ -1,38 +1,38 @@
import { describe, it, expect } from "vitest";
import { runCommandTool } from "../src/tools/run-command.js";
import { tmpdir } from "node:os"; import { tmpdir } from "node:os";
import { describe, expect, it } from "vitest";
import { runCommandTool } from "../src/tools/run-command.js";
const ctx = { cwd: tmpdir(), storageRoot: tmpdir() }; const ctx = { cwd: tmpdir(), storageRoot: tmpdir() };
describe("runCommandTool", () => { describe("runCommandTool", () => {
it("runs echo command and checks stdout", async () => { it("runs echo command and checks stdout", async () => {
const result = await runCommandTool.execute({ command: "echo hello" }, ctx); const result = await runCommandTool.execute({ command: "echo hello" }, ctx);
expect(result).toContain("hello"); expect(result).toContain("hello");
expect(result).toContain("stdout"); expect(result).toContain("stdout");
}); });
it("returns exit code", async () => { it("returns exit code", async () => {
const result = await runCommandTool.execute({ command: "exit 0" }, ctx); const result = await runCommandTool.execute({ command: "exit 0" }, ctx);
expect(result).toContain("exit_code: 0"); expect(result).toContain("exit_code: 0");
}); });
it("returns non-zero exit code", async () => { it("returns non-zero exit code", async () => {
const result = await runCommandTool.execute({ command: "exit 42" }, ctx); const result = await runCommandTool.execute({ command: "exit 42" }, ctx);
expect(result).toContain("exit_code: 42"); expect(result).toContain("exit_code: 42");
}); });
it("returns error when command is not a string", async () => { it("returns error when command is not a string", async () => {
const result = await runCommandTool.execute({ command: 123 }, ctx); const result = await runCommandTool.execute({ command: 123 }, ctx);
expect(result).toBe("Error: command must be a string"); expect(result).toBe("Error: command must be a string");
}); });
it("returns error when args is null", async () => { it("returns error when args is null", async () => {
const result = await runCommandTool.execute(null, ctx); const result = await runCommandTool.execute(null, ctx);
expect(result).toBe("Error: command must be a string"); expect(result).toBe("Error: command must be a string");
}); });
it("custom cwd works", async () => { it("custom cwd works", async () => {
const result = await runCommandTool.execute({ command: "pwd", cwd: "/tmp" }, ctx); const result = await runCommandTool.execute({ command: "pwd", cwd: "/tmp" }, ctx);
expect(result).toContain("/tmp"); expect(result).toContain("/tmp");
}); });
}); });
@@ -3,13 +3,13 @@ import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os"; import { tmpdir } from "node:os";
import { join } from "node:path"; import { join } from "node:path";
import { afterEach, beforeEach, describe, expect, test } from "vitest"; import { afterEach, beforeEach, describe, expect, test } from "vitest";
import type { BuiltinTurnPayload } from "../src/types.js";
import { import {
appendSessionTurn, appendSessionTurn,
initSessionDir, initSessionDir,
readSessionTurns, readSessionTurns,
removeSession, removeSession,
} from "../src/session.js"; } from "../src/session.js";
import type { BuiltinTurnPayload } from "../src/types.js";
describe("session", () => { describe("session", () => {
let storageRoot: string; let storageRoot: string;
@@ -1,43 +1,43 @@
import { describe, it, expect, afterAll } from "vitest";
import { writeFileTool } from "../src/tools/write-file.js";
import { readFile, rm } from "node:fs/promises"; import { readFile, rm } from "node:fs/promises";
import { join } from "node:path";
import { tmpdir } from "node:os"; import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterAll, describe, expect, it } from "vitest";
import { writeFileTool } from "../src/tools/write-file.js";
const testDir = join(tmpdir(), `write-file-test-${Date.now()}`); const testDir = join(tmpdir(), `write-file-test-${Date.now()}`);
const ctx = { cwd: testDir, storageRoot: testDir }; const ctx = { cwd: testDir, storageRoot: testDir };
afterAll(async () => { afterAll(async () => {
await rm(testDir, { recursive: true, force: true }); await rm(testDir, { recursive: true, force: true });
}); });
describe("writeFileTool", () => { describe("writeFileTool", () => {
it("writes file successfully", async () => { it("writes file successfully", async () => {
const result = await writeFileTool.execute({ path: "out.txt", content: "hi" }, ctx); const result = await writeFileTool.execute({ path: "out.txt", content: "hi" }, ctx);
expect(result).toMatch(/Wrote 2 bytes/); expect(result).toMatch(/Wrote 2 bytes/);
const content = await readFile(join(testDir, "out.txt"), "utf8"); const content = await readFile(join(testDir, "out.txt"), "utf8");
expect(content).toBe("hi"); expect(content).toBe("hi");
}); });
it("creates parent directories", async () => { it("creates parent directories", async () => {
const result = await writeFileTool.execute({ path: "a/b/c.txt", content: "nested" }, ctx); const result = await writeFileTool.execute({ path: "a/b/c.txt", content: "nested" }, ctx);
expect(result).toMatch(/Wrote/); expect(result).toMatch(/Wrote/);
const content = await readFile(join(testDir, "a/b/c.txt"), "utf8"); const content = await readFile(join(testDir, "a/b/c.txt"), "utf8");
expect(content).toBe("nested"); expect(content).toBe("nested");
}); });
it("returns error when path is not a string", async () => { it("returns error when path is not a string", async () => {
const result = await writeFileTool.execute({ path: 123, content: "x" }, ctx); const result = await writeFileTool.execute({ path: 123, content: "x" }, ctx);
expect(result).toBe("Error: path and content must be strings"); expect(result).toBe("Error: path and content must be strings");
}); });
it("returns error when content is not a string", async () => { it("returns error when content is not a string", async () => {
const result = await writeFileTool.execute({ path: "x.txt", content: 42 }, ctx); const result = await writeFileTool.execute({ path: "x.txt", content: 42 }, ctx);
expect(result).toBe("Error: path and content must be strings"); expect(result).toBe("Error: path and content must be strings");
}); });
it("returns error when args is null", async () => { it("returns error when args is null", async () => {
const result = await writeFileTool.execute(null, ctx); const result = await writeFileTool.execute(null, ctx);
expect(result).toBe("Error: path and content must be strings"); expect(result).toBe("Error: path and content must be strings");
}); });
}); });
+17 -38
View File
@@ -6,13 +6,7 @@ import type { CasRef, ThreadId } from "@united-workforce/protocol";
import { describe, expect, test } from "vitest"; import { describe, expect, test } from "vitest";
import { createMarker, deleteMarker } from "../background/index.js"; import { createMarker, deleteMarker } from "../background/index.js";
import { cmdThreadList, cmdThreadShow, cmdThreadStart } from "../commands/thread.js"; import { cmdThreadList, cmdThreadShow, cmdThreadStart } from "../commands/thread.js";
import { import { completeThread, createUwfStore, loadActiveThreads, setThread } from "../store.js";
addHistoryEntry,
createUwfStore,
deleteThread,
loadAllThreads,
setThread,
} from "../store.js";
const OUTPUT_SCHEMA = { const OUTPUT_SCHEMA = {
type: "object" as const, type: "object" as const,
@@ -175,7 +169,7 @@ async function insertStepNode(
outputPayload: Record<string, unknown>, outputPayload: Record<string, unknown>,
): Promise<void> { ): Promise<void> {
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const index = loadAllThreads(uwf.varStore); const index = loadActiveThreads(uwf.varStore);
const headEntry = index[threadId]; const headEntry = index[threadId];
if (headEntry === undefined) throw new Error(`thread ${threadId} not in index`); if (headEntry === undefined) throw new Error(`thread ${threadId} not in index`);
const head = headEntry.head; const head = headEntry.head;
@@ -206,7 +200,13 @@ async function insertStepNode(
assembledPrompt: null, assembledPrompt: null,
})) as CasRef; })) as CasRef;
setThread(uwf.varStore, threadId, { head: stepHash, suspendedRole: null, suspendMessage: null }); setThread(uwf.varStore, threadId, {
head: stepHash,
status: "idle",
suspendedRole: null,
suspendMessage: null,
completedAt: null,
});
} }
describe("currentRole field", () => { describe("currentRole field", () => {
@@ -282,19 +282,12 @@ describe("currentRole field", () => {
try { try {
const wf = join(tmpDir, "test-current-role.yaml"); const wf = join(tmpDir, "test-current-role.yaml");
await writeFile(wf, SIMPLE_WORKFLOW_YAML, "utf8"); await writeFile(wf, SIMPLE_WORKFLOW_YAML, "utf8");
const { thread, workflow } = await cmdThreadStart(storageRoot, wf, "test", tmpDir); const { thread } = await cmdThreadStart(storageRoot, wf, "test", tmpDir);
const tid = thread as ThreadId; const tid = thread as ThreadId;
const uwfForIndex = await createUwfStore(storageRoot); const uwfForIndex = await createUwfStore(storageRoot);
const head = loadAllThreads(uwfForIndex.varStore)[tid]!.head; loadActiveThreads(uwfForIndex.varStore)[tid]!.head;
deleteThread(uwfForIndex.varStore, tid); completeThread(uwfForIndex.varStore, tid, "completed");
addHistoryEntry(uwfForIndex.varStore, {
thread: tid,
workflow,
head,
completedAt: Date.now(),
reason: "completed",
});
const result = await cmdThreadShow(storageRoot, tid); const result = await cmdThreadShow(storageRoot, tid);
expect(result.status).toBe("completed"); expect(result.status).toBe("completed");
@@ -310,19 +303,12 @@ describe("currentRole field", () => {
try { try {
const wf = join(tmpDir, "test-current-role.yaml"); const wf = join(tmpDir, "test-current-role.yaml");
await writeFile(wf, SIMPLE_WORKFLOW_YAML, "utf8"); await writeFile(wf, SIMPLE_WORKFLOW_YAML, "utf8");
const { thread, workflow } = await cmdThreadStart(storageRoot, wf, "test", tmpDir); const { thread } = await cmdThreadStart(storageRoot, wf, "test", tmpDir);
const tid = thread as ThreadId; const tid = thread as ThreadId;
const uwfForIndex = await createUwfStore(storageRoot); const uwfForIndex = await createUwfStore(storageRoot);
const head = loadAllThreads(uwfForIndex.varStore)[tid]!.head; loadActiveThreads(uwfForIndex.varStore)[tid]!.head;
deleteThread(uwfForIndex.varStore, tid); completeThread(uwfForIndex.varStore, tid, "cancelled");
addHistoryEntry(uwfForIndex.varStore, {
thread: tid,
workflow,
head,
completedAt: Date.now(),
reason: "cancelled",
});
const result = await cmdThreadShow(storageRoot, tid); const result = await cmdThreadShow(storageRoot, tid);
expect(result.status).toBe("cancelled"); expect(result.status).toBe("cancelled");
@@ -375,15 +361,8 @@ describe("currentRole field", () => {
const comp = await cmdThreadStart(storageRoot, wf, "completed", tmpDir); const comp = await cmdThreadStart(storageRoot, wf, "completed", tmpDir);
const compId = comp.thread as ThreadId; const compId = comp.thread as ThreadId;
const uwfForIndex = await createUwfStore(storageRoot); const uwfForIndex = await createUwfStore(storageRoot);
const compHead = loadAllThreads(uwfForIndex.varStore)[compId]!.head; const _compHead = loadActiveThreads(uwfForIndex.varStore)[compId]!.head;
deleteThread(uwfForIndex.varStore, compId); completeThread(uwfForIndex.varStore, compId, "completed");
addHistoryEntry(uwfForIndex.varStore, {
thread: compId,
workflow: comp.workflow,
head: compHead,
completedAt: Date.now(),
reason: "completed",
});
const list = await cmdThreadList(storageRoot, null, null, null, 0, 100); const list = await cmdThreadList(storageRoot, null, null, null, 0, 100);
+201 -12
View File
@@ -10,7 +10,7 @@ import { afterEach, beforeAll, beforeEach, describe, expect, test } from "vitest
import { stringify } from "yaml"; import { stringify } from "yaml";
import { cmdThreadStart } from "../commands/thread.js"; import { cmdThreadStart } from "../commands/thread.js";
import { cmdWorkflowAdd } from "../commands/workflow.js"; import { cmdWorkflowAdd } from "../commands/workflow.js";
import { createUwfStore, findHistoryEntry, getThread } from "../store.js"; import { createUwfStore, getThread } from "../store.js";
// ── paths ────────────────────────────────────────────────────────────────── // ── paths ──────────────────────────────────────────────────────────────────
@@ -106,9 +106,13 @@ async function addWorkflow(workflowFixture: string, workflowName: string): Promi
type ExecResult = { stdout: string; stderr: string; exitCode: number }; type ExecResult = { stdout: string; stderr: string; exitCode: number };
function runExec(threadId: string): ExecResult { function runExec(threadId: string, count: number | null = null): ExecResult {
const args = [CLI_PATH, "thread", "exec", threadId];
if (count !== null) {
args.push("--count", String(count));
}
try { try {
const stdout = execFileSync(process.execPath, [CLI_PATH, "thread", "exec", threadId], { const stdout = execFileSync(process.execPath, args, {
encoding: "utf8", encoding: "utf8",
stdio: ["ignore", "pipe", "pipe"], stdio: ["ignore", "pipe", "pipe"],
env: { ...process.env, UWF_HOME: uwfHome, OCAS_HOME: casDir }, env: { ...process.env, UWF_HOME: uwfHome, OCAS_HOME: casDir },
@@ -126,11 +130,38 @@ function runExec(threadId: string): ExecResult {
} }
} }
/** Invoke `uwf thread resume <threadId> -p <prompt>` through the built CLI. */
function runResume(threadId: string, prompt: string): ExecResult {
try {
const stdout = execFileSync(
process.execPath,
[CLI_PATH, "thread", "resume", threadId, "-p", prompt],
{
encoding: "utf8",
stdio: ["ignore", "pipe", "pipe"],
env: { ...process.env, UWF_HOME: uwfHome, OCAS_HOME: casDir },
cwd: tmpDir,
timeout: 30000,
},
);
return { stdout, stderr: "", exitCode: 0 };
} catch (e: unknown) {
const err = e as NodeJS.ErrnoException & {
stdout?: string;
stderr?: string;
status?: number;
};
return { stdout: err.stdout ?? "", stderr: err.stderr ?? "", exitCode: err.status ?? 1 };
}
}
type StepOutputJson = { type StepOutputJson = {
thread: string; thread: string;
head: string; head: string;
status: string; status: string;
currentRole: string | null; currentRole: string | null;
suspendedRole: string | null;
suspendMessage: string | null;
done: boolean; done: boolean;
}; };
@@ -202,12 +233,12 @@ describe("E2E mock-agent: full uwf pipeline", () => {
const startNode = store.cas.get(startHash as CasRef); const startNode = store.cas.get(startHash as CasRef);
expect((startNode!.payload as StartNodePayload).workflow).toBe(workflowHash); expect((startNode!.payload as StartNodePayload).workflow).toBe(workflowHash);
// Thread is completed: removed from active index, present in history. // Thread is completed: status changed to "completed", head updated.
const uwf = await createUwfStore(uwfHome); const uwf = await createUwfStore(uwfHome);
expect(getThread(uwf.varStore, threadId)).toBeNull(); const finalEntry = getThread(uwf.varStore, threadId);
const hist = findHistoryEntry(uwf.varStore, threadId); expect(finalEntry).not.toBeNull();
expect(hist).not.toBeNull(); expect(finalEntry!.status).toBe("completed");
expect(hist!.head).toBe(step2.head); expect(finalEntry!.head).toBe(step2.head);
}); });
test("2. branching workflow loops developer→reviewer→developer→reviewer→$END", async () => { test("2. branching workflow loops developer→reviewer→developer→reviewer→$END", async () => {
@@ -263,8 +294,9 @@ describe("E2E mock-agent: full uwf pipeline", () => {
expect(getStatus(store, n4.output)).toBe("approved"); expect(getStatus(store, n4.output)).toBe("approved");
const uwf = await createUwfStore(uwfHome); const uwf = await createUwfStore(uwfHome);
expect(getThread(uwf.varStore, threadId)).toBeNull(); const finalEntry = getThread(uwf.varStore, threadId);
expect(findHistoryEntry(uwf.varStore, threadId)).not.toBeNull(); expect(finalEntry).not.toBeNull();
expect(finalEntry!.status).toBe("completed");
}); });
test("3. role mismatch in mock data makes the agent exit with an error", async () => { test("3. role mismatch in mock data makes the agent exit with an error", async () => {
@@ -287,7 +319,164 @@ describe("E2E mock-agent: full uwf pipeline", () => {
// The thread remains active (no step node was written for the failed step). // The thread remains active (no step node was written for the failed step).
const uwf = await createUwfStore(uwfHome); const uwf = await createUwfStore(uwfHome);
expect(getThread(uwf.varStore, threadId)).not.toBeNull(); const entry = getThread(uwf.varStore, threadId);
expect(getThread(uwf.varStore, threadId)!.head).toBe(step1.head); expect(entry).not.toBeNull();
expect(entry!.status).not.toBe("completed");
expect(entry!.head).toBe(step1.head);
});
test("4. planner $SUSPEND then resume re-runs planner and reaches $END", async () => {
await writeMockConfig("e2e-suspend.mock.yaml");
const workflowHash = await addWorkflow("e2e-suspend.workflow.yaml", "test-suspend");
const start = await cmdThreadStart(uwfHome, workflowHash, "Analyze the task", uwfHome, tmpDir);
const threadId = start.thread;
// Step 1 → planner emits insufficient_info → thread suspends.
const step1 = execStep(threadId);
expect(step1.status).toBe("suspended");
expect(step1.done).toBe(false);
expect(step1.currentRole).toBeNull();
expect(step1.suspendedRole).toBe("planner");
expect(step1.suspendMessage).toBe("Need more info: missing requirements");
// Thread index entry reflects the suspension with rendered metadata.
const suspendedEntry = getThread((await createUwfStore(uwfHome)).varStore, threadId);
expect(suspendedEntry).not.toBeNull();
expect(suspendedEntry!.status).toBe("suspended");
expect(suspendedEntry!.suspendedRole).toBe("planner");
expect(suspendedEntry!.suspendMessage).toBe("Need more info: missing requirements");
// Resume re-runs the planner role; the second scripted step is `ready` → $END.
const resume = runResume(threadId, "Here are the requirements");
expect(resume.exitCode).toBe(0);
const resumeOut = JSON.parse(resume.stdout.trim()) as StepOutputJson;
expect(resumeOut.status).toBe("completed");
expect(resumeOut.done).toBe(true);
expect(resumeOut.currentRole).toBeNull();
expect(resumeOut.suspendedRole).toBeNull();
// CAS chain: suspended planner step → resumed planner step.
const store = await openStore(casDir);
const s1 = getStepNode(store, step1.head);
const s2 = getStepNode(store, resumeOut.head);
expect(s1.role).toBe("planner");
expect(s2.role).toBe("planner");
expect(s2.prev).toBe(step1.head);
expect(getStatus(store, s1.output)).toBe("insufficient_info");
expect(getStatus(store, s2.output)).toBe("ready");
const finalEntry = getThread((await createUwfStore(uwfHome)).varStore, threadId);
expect(finalEntry).not.toBeNull();
expect(finalEntry!.status).toBe("completed");
expect(finalEntry!.head).toBe(resumeOut.head);
});
test("5. --count 3 runs the whole linear pipeline in one invocation", async () => {
await writeMockConfig("e2e-count.mock.yaml");
const workflowHash = await addWorkflow("e2e-count.workflow.yaml", "test-count");
const start = await cmdThreadStart(uwfHome, workflowHash, "Ship the feature", uwfHome, tmpDir);
const threadId = start.thread;
// Single invocation with --count 3 → moderator drives analyst → developer → reviewer → $END.
const { stdout, stderr, exitCode } = runExec(threadId, 3);
expect(exitCode, `stderr: ${stderr}`).toBe(0);
// Multi-step exec emits a JSON array (one entry per executed step).
const results = JSON.parse(stdout.trim()) as StepOutputJson[];
expect(Array.isArray(results)).toBe(true);
expect(results).toHaveLength(3);
expect(results[0].status).toBe("idle");
expect(results[0].currentRole).toBe("developer");
expect(results[1].status).toBe("idle");
expect(results[1].currentRole).toBe("reviewer");
expect(results[2].status).toBe("completed");
expect(results[2].done).toBe(true);
// Verify the CAS chain holds 3 step nodes in the correct order.
const store = await openStore(casDir);
const n1 = getStepNode(store, results[0].head);
const n2 = getStepNode(store, results[1].head);
const n3 = getStepNode(store, results[2].head);
expect([n1.role, n2.role, n3.role]).toEqual(["analyst", "developer", "reviewer"]);
expect(n1.prev).toBeNull();
expect(n2.prev).toBe(results[0].head);
expect(n3.prev).toBe(results[1].head);
expect(new Set([n1.start, n2.start, n3.start]).size).toBe(1);
const finalEntry = getThread((await createUwfStore(uwfHome)).varStore, threadId);
expect(finalEntry).not.toBeNull();
expect(finalEntry!.status).toBe("completed");
expect(finalEntry!.head).toBe(results[2].head);
});
test("6. mustache edge prompt renders planner variables into the worker step", async () => {
await writeMockConfig("e2e-mustache.mock.yaml");
const workflowHash = await addWorkflow("e2e-mustache.workflow.yaml", "test-mustache");
const start = await cmdThreadStart(uwfHome, workflowHash, "Plan the task", uwfHome, tmpDir);
const threadId = start.thread;
// Step 1 → planner emits branch + repoPath.
const step1 = execStep(threadId);
expect(step1.status).toBe("idle");
expect(step1.currentRole).toBe("worker");
// Step 2 → worker; the moderator renders the templated edge prompt before spawning it.
const step2 = execStep(threadId);
expect(step2.done).toBe(true);
expect(step2.status).toBe("completed");
const store = await openStore(casDir);
const plannerStep = getStepNode(store, step1.head);
expect(getStatus(store, plannerStep.output)).toBe("ready");
// The worker step's edgePrompt is the mustache-rendered template.
const workerStep = getStepNode(store, step2.head);
expect(workerStep.role).toBe("worker");
expect(workerStep.edgePrompt).toContain("fix/42-auth");
expect(workerStep.edgePrompt).toContain("/tmp/my-repo");
expect(workerStep.edgePrompt).toBe("Work on branch fix/42-auth in /tmp/my-repo");
});
test("7. completed thread can be resumed (衔尾蛇: end → start)", async () => {
// Reuse the suspend workflow (planner with ready → $END), but mock data
// goes straight to ready on first run, then ready again after resume.
await writeMockConfig("e2e-completed-resume.mock.yaml");
const workflowHash = await addWorkflow("e2e-suspend.workflow.yaml", "test-suspend");
const start = await cmdThreadStart(uwfHome, workflowHash, "Do the work", uwfHome, tmpDir);
const threadId = start.thread;
// Step 1: planner outputs ready → $END → thread completed.
const step1 = execStep(threadId);
expect(step1.done).toBe(true);
expect(step1.status).toBe("completed");
const uwf1 = await createUwfStore(uwfHome);
const entry1 = getThread(uwf1.varStore, threadId);
expect(entry1).not.toBeNull();
expect(entry1!.status).toBe("completed");
// Resume the completed thread — should re-evaluate $START → planner.
const resumeResult = runResume(threadId, "Additional context for round 2");
expect(resumeResult.exitCode).toBe(0);
// After resume step, planner ran again (step index 1 in mock) → ready → $END.
const uwf2 = await createUwfStore(uwfHome);
const entry2 = getThread(uwf2.varStore, threadId);
expect(entry2).not.toBeNull();
expect(entry2!.status).toBe("completed");
// Head should have advanced (not the same as step1).
expect(entry2!.head).not.toBe(step1.head);
// CAS chain: step2.prev === step1 head (chain is preserved across resume).
const store = await openStore(casDir);
const resumeOutput = JSON.parse(resumeResult.stdout.trim());
const step2Node = getStepNode(store, resumeOutput.head);
expect(step2Node.role).toBe("planner");
expect(step2Node.prev).toBe(step1.head);
}); });
}); });
@@ -0,0 +1,15 @@
steps:
# Step 0: planner → ready → $END (thread completes)
- role: planner
output: |
---
$status: ready
---
Initial plan complete.
# Step 1: after resume, planner runs again from $START → ready → $END again
- role: planner
output: |
---
$status: ready
---
Revised plan after resume.
@@ -0,0 +1,19 @@
steps:
- role: analyst
output: |
---
$status: analyzed
---
Analysis complete.
- role: developer
output: |
---
$status: implemented
---
Implementation complete.
- role: reviewer
output: |
---
$status: approved
---
Approved.
@@ -0,0 +1,45 @@
name: test-count
description: 3-step linear pipeline (analyst -> developer -> reviewer -> $END)
roles:
analyst:
description: Analyzes the task
goal: Analyze the task
capabilities: []
procedure: Analyze it
output: Output the analysis and set $status to analyzed
frontmatter:
oneOf:
- properties:
$status: { const: analyzed }
required: [$status]
developer:
description: Implements the change
goal: Implement the change
capabilities: []
procedure: Write code
output: Output the implementation and set $status to implemented
frontmatter:
oneOf:
- properties:
$status: { const: implemented }
required: [$status]
reviewer:
description: Reviews the change
goal: Review the change
capabilities: []
procedure: Review code
output: Approve and set $status to approved
frontmatter:
oneOf:
- properties:
$status: { const: approved }
required: [$status]
graph:
$START:
_: { role: analyst, prompt: 'Analyze the task' }
analyst:
analyzed: { role: developer, prompt: 'Implement the change' }
developer:
implemented: { role: reviewer, prompt: 'Review the change' }
reviewer:
approved: { role: '$END', prompt: 'Done' }
@@ -0,0 +1,15 @@
steps:
- role: planner
output: |
---
$status: ready
branch: fix/42-auth
repoPath: /tmp/my-repo
---
Planned the work.
- role: worker
output: |
---
$status: done
---
Work complete.
@@ -0,0 +1,34 @@
name: test-mustache
description: Planner emits template variables consumed by the worker edge prompt
roles:
planner:
description: Plans work and emits branch + repo path
goal: Plan the task
capabilities: []
procedure: Decide the branch and repo path
output: Set $status to ready and emit branch and repoPath
frontmatter:
oneOf:
- properties:
$status: { const: ready }
branch: { type: string }
repoPath: { type: string }
required: [$status, branch, repoPath]
worker:
description: Works on the planned branch
goal: Do the work
capabilities: []
procedure: Do it
output: Output the result and set $status to done
frontmatter:
oneOf:
- properties:
$status: { const: done }
required: [$status]
graph:
$START:
_: { role: planner, prompt: 'Plan the task' }
planner:
ready: { role: worker, prompt: 'Work on branch {{{branch}}} in {{{repoPath}}}' }
worker:
done: { role: '$END', prompt: 'Complete' }
@@ -0,0 +1,14 @@
steps:
- role: planner
output: |
---
$status: insufficient_info
reason: missing requirements
---
I need more information before I can plan this.
- role: planner
output: |
---
$status: ready
---
I now have what I need. Ready to proceed.
@@ -0,0 +1,24 @@
name: test-suspend
description: Planner can suspend for more info or finish when ready
roles:
planner:
description: Plans work and may request more info
goal: Analyze the task
capabilities: []
procedure: Analyze the task and decide if more info is needed
output: Set $status to insufficient_info (with reason) or ready
frontmatter:
oneOf:
- properties:
$status: { const: insufficient_info }
reason: { type: string }
required: [$status, reason]
- properties:
$status: { const: ready }
required: [$status]
graph:
$START:
_: { role: planner, prompt: 'Analyze the task' }
planner:
insufficient_info: { role: '$SUSPEND', prompt: 'Need more info: {{{reason}}}' }
ready: { role: '$END', prompt: 'Done' }
@@ -4,7 +4,7 @@ import { join } from "node:path";
import { type CasRef, createThreadIndexEntry, type ThreadId } from "@united-workforce/protocol"; import { type CasRef, createThreadIndexEntry, type ThreadId } from "@united-workforce/protocol";
import { afterEach, beforeEach, describe, expect, test } from "vitest"; import { afterEach, beforeEach, describe, expect, test } from "vitest";
import { resolveHeadHash } from "../commands/shared.js"; import { resolveHeadHash } from "../commands/shared.js";
import { addHistoryEntry, createUwfStore, setThread } from "../store.js"; import { completeThread, createUwfStore, setThread } from "../store.js";
let tmpDir: string; let tmpDir: string;
@@ -31,19 +31,13 @@ describe("resolveHeadHash", () => {
expect(result).toBe(headHash); expect(result).toBe(headHash);
}); });
test("falls back to history variable when thread not in active index", async () => { test("finds completed thread", async () => {
const threadId = "01JTEST0000000000000000002" as ThreadId; const threadId = "01JTEST0000000000000000002" as ThreadId;
const workflowHash = "workflow_hash_789" as CasRef;
const uwf = await createUwfStore(tmpDir); const uwf = await createUwfStore(tmpDir);
const headHash = (await uwf.store.cas.put(uwf.schemas.text, "completed-head")) as CasRef; const headHash = (await uwf.store.cas.put(uwf.schemas.text, "completed-head")) as CasRef;
addHistoryEntry(uwf.varStore, { setThread(uwf.varStore, threadId, createThreadIndexEntry(headHash));
thread: threadId, completeThread(uwf.varStore, threadId, "completed");
workflow: workflowHash,
head: headHash,
completedAt: Date.now(),
reason: null,
});
const result = await resolveHeadHash(tmpDir, threadId); const result = await resolveHeadHash(tmpDir, threadId);
@@ -54,58 +48,36 @@ describe("resolveHeadHash", () => {
// calls fail() which does process.exit(1), terminating the test runner. // calls fail() which does process.exit(1), terminating the test runner.
// The error behavior is tested in integration tests below via CLI invocation. // The error behavior is tested in integration tests below via CLI invocation.
test("prioritizes active thread over history when thread exists in both", async () => { test("prioritizes active thread", async () => {
const threadId = "01JTEST0000000000000000004" as ThreadId; const threadId = "01JTEST0000000000000000004" as ThreadId;
const workflowHash = "workflow_hash_xyz" as CasRef;
const uwf = await createUwfStore(tmpDir); const uwf = await createUwfStore(tmpDir);
const activeHead = (await uwf.store.cas.put(uwf.schemas.text, "active-v2")) as CasRef; const activeHead = (await uwf.store.cas.put(uwf.schemas.text, "active-v2")) as CasRef;
const historicalHash = (await uwf.store.cas.put(uwf.schemas.text, "historical-v1")) as CasRef;
setThread(uwf.varStore, threadId, createThreadIndexEntry(activeHead)); setThread(uwf.varStore, threadId, createThreadIndexEntry(activeHead));
addHistoryEntry(uwf.varStore, {
thread: threadId,
workflow: workflowHash,
head: historicalHash,
completedAt: Date.now(),
reason: null,
});
const result = await resolveHeadHash(tmpDir, threadId); const result = await resolveHeadHash(tmpDir, threadId);
// Should return the active head, not the historical one // Should return the active head
expect(result).toBe(activeHead); expect(result).toBe(activeHead);
}); });
test("finds thread from multiple history entries", async () => { test("finds thread from multiple completed threads", async () => {
const threadId1 = "01JTEST0000000000000000005" as ThreadId; const threadId1 = "01JTEST0000000000000000005" as ThreadId;
const threadId2 = "01JTEST0000000000000000006" as ThreadId; const threadId2 = "01JTEST0000000000000000006" as ThreadId;
const threadId3 = "01JTEST0000000000000000007" as ThreadId; const threadId3 = "01JTEST0000000000000000007" as ThreadId;
const workflowHash = "workflow_hash_abc" as CasRef;
const uwf = await createUwfStore(tmpDir); const uwf = await createUwfStore(tmpDir);
const hash1 = (await uwf.store.cas.put(uwf.schemas.text, "hash-thread1")) as CasRef; const hash1 = (await uwf.store.cas.put(uwf.schemas.text, "hash-thread1")) as CasRef;
const hash2 = (await uwf.store.cas.put(uwf.schemas.text, "hash-thread2")) as CasRef; const hash2 = (await uwf.store.cas.put(uwf.schemas.text, "hash-thread2")) as CasRef;
const hash3 = (await uwf.store.cas.put(uwf.schemas.text, "hash-thread3")) as CasRef; const hash3 = (await uwf.store.cas.put(uwf.schemas.text, "hash-thread3")) as CasRef;
addHistoryEntry(uwf.varStore, {
thread: threadId1, setThread(uwf.varStore, threadId1, createThreadIndexEntry(hash1));
workflow: workflowHash, completeThread(uwf.varStore, threadId1, "completed");
head: hash1,
completedAt: Date.now() - 2000, setThread(uwf.varStore, threadId2, createThreadIndexEntry(hash2));
reason: null, completeThread(uwf.varStore, threadId2, "completed");
});
addHistoryEntry(uwf.varStore, { setThread(uwf.varStore, threadId3, createThreadIndexEntry(hash3));
thread: threadId2, completeThread(uwf.varStore, threadId3, "completed");
workflow: workflowHash,
head: hash2,
completedAt: Date.now() - 1000,
reason: null,
});
addHistoryEntry(uwf.varStore, {
thread: threadId3,
workflow: workflowHash,
head: hash3,
completedAt: Date.now(),
reason: null,
});
const result = await resolveHeadHash(tmpDir, threadId2); const result = await resolveHeadHash(tmpDir, threadId2);
@@ -226,19 +226,15 @@ describe("Global CAS directory", () => {
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const threadId = "thread-123" as ThreadId; const threadId = "thread-123" as ThreadId;
const headHash = await uwf.store.cas.put(uwf.schemas.text, "history-head"); const headHash = await uwf.store.cas.put(uwf.schemas.text, "history-head");
const { addHistoryEntry, findHistoryEntry } = await import("../store.js"); const { completeThread, setThread, getThread } = await import("../store.js");
addHistoryEntry(uwf.varStore, { const { createThreadIndexEntry } = await import("@united-workforce/protocol");
thread: threadId,
workflow: "workflow-456",
head: headHash,
completedAt: Date.now(),
reason: "completed",
});
const entry = findHistoryEntry(uwf.varStore, threadId); setThread(uwf.varStore, threadId, createThreadIndexEntry(headHash));
expect(entry?.thread).toBe(threadId); completeThread(uwf.varStore, threadId, "completed");
expect(entry?.workflow).toBe("workflow-456");
const entry = getThread(uwf.varStore, threadId);
expect(entry?.head).toBe(headHash); expect(entry?.head).toBe(headHash);
expect(entry?.status).toBe("completed");
const { access } = await import("node:fs/promises"); const { access } = await import("node:fs/promises");
await access(join(globalCasDir, "vars")); await access(join(globalCasDir, "vars"));
@@ -274,15 +270,12 @@ describe("Global CAS directory", () => {
); );
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const { findHistoryEntry } = await import("../store.js"); const { getThread } = await import("../store.js");
const entry = findHistoryEntry(uwf.varStore, threadId); const entry = getThread(uwf.varStore, threadId);
expect(entry).toEqual({ expect(entry).not.toBeNull();
thread: threadId, expect(entry?.head).toBe(headHash);
workflow: workflowHash, expect(entry?.status).toBe("cancelled");
head: headHash, expect(entry?.completedAt).toBe(completedAt);
completedAt,
reason: "cancelled",
});
await expect(access(historyPath)).rejects.toThrow(); await expect(access(historyPath)).rejects.toThrow();
const migratedContent = await readFile(`${historyPath}.migrated`, "utf8"); const migratedContent = await readFile(`${historyPath}.migrated`, "utf8");
@@ -0,0 +1,235 @@
import { mkdir, mkdtemp } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import type { CasRef, ThreadId } from "@united-workforce/protocol";
import { describe, expect, test } from "vitest";
import {
completeThread,
createUwfStore,
getThread,
loadActiveThreads,
loadHistoryThreads,
setThread,
} from "../store.js";
async function makeUwfStore(storageRoot: string) {
const casDir = join(storageRoot, "cas");
await mkdir(casDir, { recursive: true });
process.env.OCAS_HOME = casDir;
return createUwfStore(storageRoot);
}
async function seedThreadHead(
uwf: Awaited<ReturnType<typeof createUwfStore>>,
label: string,
): Promise<CasRef> {
return (await uwf.store.cas.put(uwf.schemas.text, label)) as CasRef;
}
describe("unified thread storage", () => {
test("loadActiveThreads excludes completed threads", async () => {
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-active-test-"));
const uwf = await makeUwfStore(tmpDir);
const threadId1 = "01JTEST000000000000ACTIVE1" as ThreadId;
const threadId2 = "01JTEST000000000000ACTIVE2" as ThreadId;
const head1 = await seedThreadHead(uwf, "active-head");
const head2 = await seedThreadHead(uwf, "completed-head");
setThread(uwf.varStore, threadId1, {
head: head1,
status: "idle",
suspendedRole: null,
suspendMessage: null,
completedAt: null,
});
setThread(uwf.varStore, threadId2, {
head: head2,
status: "completed",
suspendedRole: null,
suspendMessage: null,
completedAt: Date.now(),
});
const active = loadActiveThreads(uwf.varStore);
expect(Object.keys(active)).toHaveLength(1);
expect(active[threadId1]).toBeDefined();
expect(active[threadId2]).toBeUndefined();
});
test("loadActiveThreads excludes cancelled threads", async () => {
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-active-test-"));
const uwf = await makeUwfStore(tmpDir);
const threadId1 = "01JTEST000000000000ACTIVE3" as ThreadId;
const threadId2 = "01JTEST000000000000ACTIVE4" as ThreadId;
const head1 = await seedThreadHead(uwf, "active-head");
const head2 = await seedThreadHead(uwf, "cancelled-head");
setThread(uwf.varStore, threadId1, {
head: head1,
status: "idle",
suspendedRole: null,
suspendMessage: null,
completedAt: null,
});
setThread(uwf.varStore, threadId2, {
head: head2,
status: "cancelled",
suspendedRole: null,
suspendMessage: null,
completedAt: Date.now(),
});
const active = loadActiveThreads(uwf.varStore);
expect(Object.keys(active)).toHaveLength(1);
expect(active[threadId1]).toBeDefined();
expect(active[threadId2]).toBeUndefined();
});
test("loadHistoryThreads only returns completed and cancelled", async () => {
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-history-test-"));
const uwf = await makeUwfStore(tmpDir);
const threadId1 = "01JTEST000000000000HISTOR1" as ThreadId;
const threadId2 = "01JTEST000000000000HISTOR2" as ThreadId;
const threadId3 = "01JTEST000000000000HISTOR3" as ThreadId;
const head1 = await seedThreadHead(uwf, "active-head");
const head2 = await seedThreadHead(uwf, "completed-head");
const head3 = await seedThreadHead(uwf, "cancelled-head");
setThread(uwf.varStore, threadId1, {
head: head1,
status: "idle",
suspendedRole: null,
suspendMessage: null,
completedAt: null,
});
setThread(uwf.varStore, threadId2, {
head: head2,
status: "completed",
suspendedRole: null,
suspendMessage: null,
completedAt: Date.now(),
});
setThread(uwf.varStore, threadId3, {
head: head3,
status: "cancelled",
suspendedRole: null,
suspendMessage: null,
completedAt: Date.now(),
});
const history = loadHistoryThreads(uwf.varStore);
expect(Object.keys(history)).toHaveLength(2);
expect(history[threadId1]).toBeUndefined();
expect(history[threadId2]).toBeDefined();
expect(history[threadId3]).toBeDefined();
});
test("completeThread marks thread as completed", async () => {
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-complete-test-"));
const uwf = await makeUwfStore(tmpDir);
const threadId = "01JTEST000000000000COMPLE1" as ThreadId;
const head = await seedThreadHead(uwf, "active-head");
setThread(uwf.varStore, threadId, {
head,
status: "idle",
suspendedRole: null,
suspendMessage: null,
completedAt: null,
});
completeThread(uwf.varStore, threadId, "completed");
const entry = getThread(uwf.varStore, threadId);
expect(entry).not.toBeNull();
expect(entry?.status).toBe("completed");
expect(entry?.completedAt).toBeDefined();
expect(entry?.completedAt).toBeGreaterThan(0);
});
test("completeThread marks thread as cancelled", async () => {
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-complete-test-"));
const uwf = await makeUwfStore(tmpDir);
const threadId = "01JTEST000000000000COMPLE2" as ThreadId;
const head = await seedThreadHead(uwf, "active-head");
setThread(uwf.varStore, threadId, {
head,
status: "idle",
suspendedRole: null,
suspendMessage: null,
completedAt: null,
});
completeThread(uwf.varStore, threadId, "cancelled");
const entry = getThread(uwf.varStore, threadId);
expect(entry).not.toBeNull();
expect(entry?.status).toBe("cancelled");
expect(entry?.completedAt).toBeDefined();
expect(entry?.completedAt).toBeGreaterThan(0);
});
test("completeThread clears suspend metadata", async () => {
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-complete-test-"));
const uwf = await makeUwfStore(tmpDir);
const threadId = "01JTEST000000000000COMPLE3" as ThreadId;
const head = await seedThreadHead(uwf, "suspended-head");
setThread(uwf.varStore, threadId, {
head,
status: "suspended",
suspendedRole: "test-role",
suspendMessage: "test message",
completedAt: null,
});
completeThread(uwf.varStore, threadId, "completed");
const entry = getThread(uwf.varStore, threadId);
expect(entry).not.toBeNull();
expect(entry?.status).toBe("completed");
expect(entry?.suspendedRole).toBeNull();
expect(entry?.suspendMessage).toBeNull();
});
test("completeThread handles non-existent thread gracefully", async () => {
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-complete-test-"));
const uwf = await makeUwfStore(tmpDir);
const threadId = "01JTEST000000000000NOEXIST" as ThreadId;
// Should not throw
completeThread(uwf.varStore, threadId, "completed");
const entry = getThread(uwf.varStore, threadId);
expect(entry).toBeNull();
});
test("status and completedAt tags are persisted and loaded", async () => {
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-tags-test-"));
const uwf = await makeUwfStore(tmpDir);
const threadId = "01JTEST000000000000TAGTEST" as ThreadId;
const head = await seedThreadHead(uwf, "test-head");
const now = Date.now();
setThread(uwf.varStore, threadId, {
head,
status: "completed",
suspendedRole: null,
suspendMessage: null,
completedAt: now,
});
const entry = getThread(uwf.varStore, threadId);
expect(entry).not.toBeNull();
expect(entry?.status).toBe("completed");
expect(entry?.completedAt).toBe(now);
});
});
@@ -3,7 +3,13 @@ import { tmpdir } from "node:os";
import { join } from "node:path"; import { join } from "node:path";
import type { CasRef, ThreadId } from "@united-workforce/protocol"; import type { CasRef, ThreadId } from "@united-workforce/protocol";
import { describe, expect, test } from "vitest"; import { describe, expect, test } from "vitest";
import { addHistoryEntry, createUwfStore, loadAllHistory } from "../store.js"; import {
completeThread,
createUwfStore,
getThread,
loadHistoryThreads,
setThread,
} from "../store.js";
async function makeUwfStore(storageRoot: string) { async function makeUwfStore(storageRoot: string) {
const casDir = join(storageRoot, "cas"); const casDir = join(storageRoot, "cas");
@@ -20,88 +26,113 @@ async function seedHistoryHead(
} }
describe("thread cancel status", () => { describe("thread cancel status", () => {
test("cancelled history entry has reason 'cancelled'", async () => { test("cancelled thread has status 'cancelled'", async () => {
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-")); const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-"));
const threadId = "01JTEST000000000000CANCEL1" as ThreadId; const threadId = "01JTEST000000000000CANCEL1" as ThreadId;
const uwf = await makeUwfStore(tmpDir); const uwf = await makeUwfStore(tmpDir);
const head = await seedHistoryHead(uwf, "cancelled-head"); const head = await seedHistoryHead(uwf, "cancelled-head");
addHistoryEntry(uwf.varStore, { setThread(uwf.varStore, threadId, {
thread: threadId,
workflow: "test-workflow",
head, head,
completedAt: Date.now(), status: "idle",
reason: "cancelled", suspendedRole: null,
suspendMessage: null,
completedAt: null,
}); });
const history = loadAllHistory(uwf.varStore); completeThread(uwf.varStore, threadId, "cancelled");
expect(history).toHaveLength(1);
expect(history[0]?.reason).toBe("cancelled"); const entry = getThread(uwf.varStore, threadId);
expect(entry).not.toBeNull();
expect(entry?.status).toBe("cancelled");
}); });
test("completed history entry has reason 'completed'", async () => { test("completed thread has status 'completed'", async () => {
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-")); const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-"));
const threadId = "01JTEST000000000000CANCEL2" as ThreadId; const threadId = "01JTEST000000000000CANCEL2" as ThreadId;
const uwf = await makeUwfStore(tmpDir); const uwf = await makeUwfStore(tmpDir);
const head = await seedHistoryHead(uwf, "completed-head"); const head = await seedHistoryHead(uwf, "completed-head");
addHistoryEntry(uwf.varStore, { setThread(uwf.varStore, threadId, {
thread: threadId,
workflow: "test-workflow",
head, head,
completedAt: Date.now(), status: "idle",
reason: "completed", suspendedRole: null,
suspendMessage: null,
completedAt: null,
}); });
const history = loadAllHistory(uwf.varStore); completeThread(uwf.varStore, threadId, "completed");
expect(history).toHaveLength(1);
expect(history[0]?.reason).toBe("completed"); const entry = getThread(uwf.varStore, threadId);
expect(entry).not.toBeNull();
expect(entry?.status).toBe("completed");
}); });
test("history entry with null reason is stored as completed", async () => { test("loadHistoryThreads returns completed and cancelled", async () => {
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-"));
const threadId = "01JTEST000000000000CANCEL3" as ThreadId;
const uwf = await makeUwfStore(tmpDir);
const head = await seedHistoryHead(uwf, "legacy-head");
addHistoryEntry(uwf.varStore, {
thread: threadId,
workflow: "test-workflow",
head,
completedAt: Date.now(),
reason: null,
});
const history = loadAllHistory(uwf.varStore);
expect(history).toHaveLength(1);
expect(history[0]?.reason).toBe("completed");
});
test("mixed completed and cancelled entries preserve distinct reasons", async () => {
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-")); const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-"));
const uwf = await makeUwfStore(tmpDir); const uwf = await makeUwfStore(tmpDir);
const head1 = await seedHistoryHead(uwf, "head1"); const head1 = await seedHistoryHead(uwf, "head1");
const head2 = await seedHistoryHead(uwf, "head2"); const head2 = await seedHistoryHead(uwf, "head2");
addHistoryEntry(uwf.varStore, { const threadId1 = "01JTEST000000000000CANCEL4" as ThreadId;
thread: "01JTEST000000000000CANCEL4" as ThreadId, setThread(uwf.varStore, threadId1, {
workflow: "test-workflow",
head: head1, head: head1,
completedAt: Date.now(), status: "idle",
reason: "completed", suspendedRole: null,
suspendMessage: null,
completedAt: null,
}); });
completeThread(uwf.varStore, threadId1, "completed");
addHistoryEntry(uwf.varStore, { const threadId2 = "01JTEST000000000000CANCEL5" as ThreadId;
thread: "01JTEST000000000000CANCEL5" as ThreadId, setThread(uwf.varStore, threadId2, {
workflow: "test-workflow",
head: head2, head: head2,
completedAt: Date.now(), status: "idle",
reason: "cancelled", suspendedRole: null,
suspendMessage: null,
completedAt: null,
}); });
completeThread(uwf.varStore, threadId2, "cancelled");
const history = loadAllHistory(uwf.varStore); const history = loadHistoryThreads(uwf.varStore);
expect(history).toHaveLength(2); expect(Object.keys(history)).toHaveLength(2);
const reasons = history.map((entry) => entry.reason).sort(); const statuses = Object.values(history)
expect(reasons).toEqual(["cancelled", "completed"]); .map((entry) => entry.status)
.sort();
expect(statuses).toEqual(["cancelled", "completed"]);
});
test("mixed completed and cancelled entries preserve distinct statuses", async () => {
const tmpDir = await mkdtemp(join(tmpdir(), "uwf-cancel-test-"));
const uwf = await makeUwfStore(tmpDir);
const head1 = await seedHistoryHead(uwf, "head1");
const head2 = await seedHistoryHead(uwf, "head2");
const threadId1 = "01JTEST000000000000CANCEL6" as ThreadId;
setThread(uwf.varStore, threadId1, {
head: head1,
status: "idle",
suspendedRole: null,
suspendMessage: null,
completedAt: null,
});
completeThread(uwf.varStore, threadId1, "completed");
const threadId2 = "01JTEST000000000000CANCEL7" as ThreadId;
setThread(uwf.varStore, threadId2, {
head: head2,
status: "idle",
suspendedRole: null,
suspendMessage: null,
completedAt: null,
});
completeThread(uwf.varStore, threadId2, "cancelled");
const history = loadHistoryThreads(uwf.varStore);
expect(Object.keys(history)).toHaveLength(2);
const statuses = Object.values(history)
.map((entry) => entry.status)
.sort();
expect(statuses).toEqual(["cancelled", "completed"]);
}); });
}); });
@@ -10,9 +10,8 @@ import { cmdThreadList } from "../commands/thread.js";
import { parseTimeInput } from "../commands/thread-time-parser.js"; import { parseTimeInput } from "../commands/thread-time-parser.js";
import type { UwfStore } from "../store.js"; import type { UwfStore } from "../store.js";
import { import {
addHistoryEntry, completeThread as completeThreadInStore,
createUwfStore, createUwfStore,
deleteThread,
loadAllThreads, loadAllThreads,
setThread, setThread,
} from "../store.js"; } from "../store.js";
@@ -73,18 +72,11 @@ async function markThreadRunning(storageRoot: string, threadId: ThreadId, workfl
async function completeThread( async function completeThread(
storageRoot: string, storageRoot: string,
threadId: ThreadId, threadId: ThreadId,
workflowHash: CasRef, _workflowHash: CasRef,
headHash: CasRef, _headHash: CasRef,
) { ) {
const uwfIdx = await createUwfStore(storageRoot); const uwfIdx = await createUwfStore(storageRoot);
deleteThread(uwfIdx.varStore, threadId); completeThreadInStore(uwfIdx.varStore, threadId, "completed");
addHistoryEntry(uwfIdx.varStore, {
thread: threadId,
workflow: workflowHash,
head: headHash,
completedAt: Date.now(),
reason: null,
});
} }
// ── test setup ──────────────────────────────────────────────────────────────── // ── test setup ────────────────────────────────────────────────────────────────
@@ -500,8 +492,10 @@ describe("edge cases", () => {
)) as CasRef; )) as CasRef;
index["INVALID_ULID_FORMAT_HERE" as ThreadId] = { index["INVALID_ULID_FORMAT_HERE" as ThreadId] = {
head: placeholderHead, head: placeholderHead,
status: "idle",
suspendedRole: null, suspendedRole: null,
suspendMessage: null, suspendMessage: null,
completedAt: null,
}; };
for (const [tid, ent] of Object.entries(index)) { for (const [tid, ent] of Object.entries(index)) {
setThread(uwfIdx.varStore, tid as ThreadId, ent); setThread(uwfIdx.varStore, tid as ThreadId, ent);
@@ -118,8 +118,10 @@ async function setupSuspendedThread(mode: MockAgentMode): Promise<{
await seedThreads(tmpDir, { await seedThreads(tmpDir, {
[THREAD_ID]: { [THREAD_ID]: {
head: stepHash, head: stepHash,
status: "suspended",
suspendedRole: "worker", suspendedRole: "worker",
suspendMessage: SUSPEND_MESSAGE, suspendMessage: SUSPEND_MESSAGE,
completedAt: null,
}, },
}); });
@@ -247,7 +249,7 @@ describe("uwf thread resume", () => {
const result = runUwf(["thread", "resume", THREAD_ID], casDir); const result = runUwf(["thread", "resume", THREAD_ID], casDir);
expect(result.status).not.toBe(0); expect(result.status).not.toBe(0);
expect(result.stderr).toContain("thread is not suspended"); expect(result.stderr).toContain("thread cannot be resumed");
}); });
test("resume suspended thread executes step and becomes idle", async () => { test("resume suspended thread executes step and becomes idle", async () => {
@@ -347,8 +349,10 @@ describe("uwf thread resume", () => {
const uwfAfterFirst = await createUwfStore(tmpDir); const uwfAfterFirst = await createUwfStore(tmpDir);
expect(getThread(uwfAfterFirst.varStore, THREAD_ID)).toEqual({ expect(getThread(uwfAfterFirst.varStore, THREAD_ID)).toEqual({
head: firstResume.head, head: firstResume.head,
status: "suspended",
suspendedRole: "worker", suspendedRole: "worker",
suspendMessage: SUSPEND_MESSAGE, suspendMessage: SUSPEND_MESSAGE,
completedAt: null,
}); });
const { mockAgentPath: okMockAgentPath } = await setupOkMockAgent( const { mockAgentPath: okMockAgentPath } = await setupOkMockAgent(
@@ -444,3 +448,266 @@ echo '${adapterJson}'
return { mockAgentPath }; return { mockAgentPath };
} }
describe("uwf thread resume - completed threads", () => {
test("resume completed thread starts from $START role", async () => {
const casDir = join(tmpDir, "cas");
await mkdir(casDir, { recursive: true });
const store = await openStore(casDir);
const schemas = await registerUwfSchemas(store);
const outputSchemaHash = await putSchema(store, OUTPUT_SCHEMA);
const workflowHash = await store.cas.put(schemas.workflow, {
name: "test-completed-resume",
description: "completed thread resume test",
roles: {
worker: {
description: "Worker role",
goal: "Work",
capabilities: [],
procedure: "work",
output: "result",
frontmatter: outputSchemaHash,
},
reviewer: {
description: "Reviewer role",
goal: "Review",
capabilities: [],
procedure: "review",
output: "result",
frontmatter: outputSchemaHash,
},
},
graph: {
$START: { _: { role: "worker", prompt: "Start work", location: null } },
worker: { _: { role: "reviewer", prompt: "Review the work", location: null } },
reviewer: { _: { role: "$END", prompt: "Done", location: null } },
},
});
const startHash = await store.cas.put(schemas.startNode, {
workflow: workflowHash,
prompt: "Initial task",
cwd: tmpDir,
});
process.env.OCAS_HOME = casDir;
const workerOutputHash = await store.cas.put(outputSchemaHash, { $status: "_" });
const reviewerOutputHash = await store.cas.put(outputSchemaHash, { $status: "_" });
const detailHash = await store.cas.put(schemas.text, "mock detail");
const workerStepHash = await store.cas.put(schemas.stepNode, {
start: startHash,
prev: null,
role: "worker",
output: workerOutputHash,
detail: detailHash,
agent: "uwf-mock",
edgePrompt: "Start work",
startedAtMs: 1716600000000,
completedAtMs: 1716600001000,
cwd: tmpDir,
assembledPrompt: null,
});
const reviewerStepHash = await store.cas.put(schemas.stepNode, {
start: startHash,
prev: workerStepHash,
role: "reviewer",
output: reviewerOutputHash,
detail: detailHash,
agent: "uwf-mock",
edgePrompt: "Review the work",
startedAtMs: 1716600001000,
completedAtMs: 1716600002000,
cwd: tmpDir,
assembledPrompt: null,
});
await seedThreads(tmpDir, {
[THREAD_ID]: {
head: reviewerStepHash,
status: "completed",
suspendedRole: null,
suspendMessage: null,
completedAt: 1716600002000,
},
});
// Verify the status was actually set
const { createUwfStore, getThread } = await import("../store.js");
const verifyUwf = await createUwfStore(tmpDir);
const verifyEntry = getThread(verifyUwf.varStore, THREAD_ID);
// biome-ignore lint/suspicious/noConsole: test debugging
console.log("Seeded entry status:", verifyEntry?.status);
// biome-ignore lint/suspicious/noConsole: test debugging
console.log("Seeded entry:", JSON.stringify(verifyEntry, null, 2));
const promptCapturePath = join(tmpDir, "captured-prompt-completed.txt");
const mockAgentPath = join(tmpDir, "mock-agent-completed.sh");
const newWorkerStepHash = await store.cas.put(schemas.stepNode, {
start: startHash,
prev: reviewerStepHash,
role: "worker",
output: workerOutputHash,
detail: detailHash,
agent: "uwf-mock",
edgePrompt: "Start work",
startedAtMs: 1716600003000,
completedAtMs: 1716600004000,
cwd: tmpDir,
assembledPrompt: null,
});
const adapterJson = JSON.stringify({
stepHash: newWorkerStepHash,
detailHash,
role: "worker",
frontmatter: { $status: "_" },
body: "",
startedAtMs: 1716600003000,
completedAtMs: 1716600004000,
});
await writeFile(
mockAgentPath,
`#!/bin/sh
prompt=""
while [ $# -gt 0 ]; do
if [ "$1" = "--prompt" ]; then
prompt="$2"
shift 2
else
shift
fi
done
printf '%s' "$prompt" > '${promptCapturePath}'
echo '${adapterJson}'
`,
{ mode: 0o755 },
);
const configPath = join(tmpDir, "config.yaml");
await writeFile(
configPath,
`defaultAgent: uwf-hermes\ndefaultModel: test-model\nagentOverrides: null\nagents: {}\nproviders: {}\nmodels: {}\n`,
);
const result = runUwf(
["thread", "resume", THREAD_ID, "-p", "Additional context", "--agent", mockAgentPath],
casDir,
);
if (result.status !== 0) {
// biome-ignore lint/suspicious/noConsole: test debugging
console.error("Command failed:", result.stderr);
}
expect(result.status).toBe(0);
const cliOutput = JSON.parse(result.stdout.trim());
expect(cliOutput.status).toBe("idle");
expect(cliOutput.currentRole).toBe("reviewer");
expect(cliOutput.done).toBe(false);
const capturedPrompt = await readFile(promptCapturePath, "utf8");
expect(capturedPrompt).toContain("Previous run completed");
expect(capturedPrompt).toContain("Additional context");
const storeModule = await import("../store.js");
const uwf2 = await storeModule.createUwfStore(tmpDir);
const entry2 = storeModule.getThread(uwf2.varStore, THREAD_ID);
expect(entry2?.status).toBe("idle");
expect(entry2?.completedAt).toBeNull();
});
test("resume cancelled thread returns error", async () => {
const casDir = join(tmpDir, "cas");
await mkdir(casDir, { recursive: true });
const store = await openStore(casDir);
const schemas = await registerUwfSchemas(store);
const workflowHash = await store.cas.put(schemas.workflow, {
name: "cancelled-workflow",
description: "cancelled thread",
roles: {
worker: {
description: "Worker",
goal: "Work",
capabilities: [],
procedure: "work",
output: "result",
frontmatter: await putSchema(store, OUTPUT_SCHEMA),
},
},
graph: {
$START: { _: { role: "worker", prompt: "Start", location: null } },
worker: { _: { role: "$END", prompt: "Done", location: null } },
},
});
const startHash = await store.cas.put(schemas.startNode, {
workflow: workflowHash,
prompt: "task",
cwd: tmpDir,
});
process.env.OCAS_HOME = casDir;
await seedThreads(tmpDir, {
[THREAD_ID]: {
head: startHash,
status: "cancelled",
suspendedRole: null,
suspendMessage: null,
completedAt: null,
},
});
const result = runUwf(["thread", "resume", THREAD_ID], casDir);
expect(result.status).not.toBe(0);
expect(result.stderr).toContain("thread cannot be resumed");
expect(result.stderr).toContain("cancelled");
});
test("resume idle thread returns error", async () => {
const casDir = join(tmpDir, "cas");
await mkdir(casDir, { recursive: true });
const store = await openStore(casDir);
const schemas = await registerUwfSchemas(store);
const workflowHash = await store.cas.put(schemas.workflow, {
name: "idle-workflow",
description: "idle thread",
roles: {
worker: {
description: "Worker",
goal: "Work",
capabilities: [],
procedure: "work",
output: "result",
frontmatter: await putSchema(store, OUTPUT_SCHEMA),
},
},
graph: {
$START: { _: { role: "worker", prompt: "Start", location: null } },
worker: { _: { role: "$END", prompt: "Done", location: null } },
},
});
const startHash = await store.cas.put(schemas.startNode, {
workflow: workflowHash,
prompt: "task",
cwd: tmpDir,
});
process.env.OCAS_HOME = casDir;
await seedThreads(tmpDir, { [THREAD_ID]: startHash });
const result = runUwf(["thread", "resume", THREAD_ID], casDir);
expect(result.status).not.toBe(0);
expect(result.stderr).toContain("thread cannot be resumed");
expect(result.stderr).toContain("idle");
});
});
@@ -6,13 +6,7 @@ import type { CasRef, ThreadId } from "@united-workforce/protocol";
import { describe, expect, test } from "vitest"; import { describe, expect, test } from "vitest";
import { createMarker, deleteMarker } from "../background/index.js"; import { createMarker, deleteMarker } from "../background/index.js";
import { cmdThreadShow, cmdThreadStart } from "../commands/thread.js"; import { cmdThreadShow, cmdThreadStart } from "../commands/thread.js";
import { import { completeThread, createUwfStore, loadAllThreads, setThread } from "../store.js";
addHistoryEntry,
createUwfStore,
deleteThread,
loadAllThreads,
setThread,
} from "../store.js";
const OUTPUT_SCHEMA = { const OUTPUT_SCHEMA = {
type: "object" as const, type: "object" as const,
@@ -118,7 +112,13 @@ async function insertStepNode(
assembledPrompt: null, assembledPrompt: null,
})) as CasRef; })) as CasRef;
setThread(uwf.varStore, threadId, { head: stepHash, suspendedRole: null, suspendMessage: null }); setThread(uwf.varStore, threadId, {
head: stepHash,
status: "idle",
suspendedRole: null,
suspendMessage: null,
completedAt: null,
});
} }
describe("thread show status field", () => { describe("thread show status field", () => {
@@ -200,7 +200,7 @@ describe("thread show status field", () => {
// Create a thread // Create a thread
const startResult = await cmdThreadStart(storageRoot, workflowPath, "test prompt", tmpDir); const startResult = await cmdThreadStart(storageRoot, workflowPath, "test prompt", tmpDir);
const threadId = startResult.thread as ThreadId; const threadId = startResult.thread as ThreadId;
const workflow = startResult.workflow; const _workflow = startResult.workflow;
// Get the head hash before moving to history // Get the head hash before moving to history
const uwfForIndex = await createUwfStore(storageRoot); const uwfForIndex = await createUwfStore(storageRoot);
@@ -208,15 +208,7 @@ describe("thread show status field", () => {
const head = index[threadId]!.head; const head = index[threadId]!.head;
if (!head) throw new Error("Thread not found in index"); if (!head) throw new Error("Thread not found in index");
deleteThread(uwfForIndex.varStore, threadId); completeThread(uwfForIndex.varStore, threadId, "completed");
addHistoryEntry(uwfForIndex.varStore, {
thread: threadId,
workflow,
head,
completedAt: Date.now(),
reason: "completed",
});
const result = await cmdThreadShow(storageRoot, threadId); const result = await cmdThreadShow(storageRoot, threadId);
@@ -237,7 +229,7 @@ describe("thread show status field", () => {
// Create a thread // Create a thread
const startResult = await cmdThreadStart(storageRoot, workflowPath, "test prompt", tmpDir); const startResult = await cmdThreadStart(storageRoot, workflowPath, "test prompt", tmpDir);
const threadId = startResult.thread as ThreadId; const threadId = startResult.thread as ThreadId;
const workflow = startResult.workflow; const _workflow = startResult.workflow;
// Get the head hash before moving to history // Get the head hash before moving to history
const uwfForIndex = await createUwfStore(storageRoot); const uwfForIndex = await createUwfStore(storageRoot);
@@ -245,15 +237,7 @@ describe("thread show status field", () => {
const head = index[threadId]!.head; const head = index[threadId]!.head;
if (!head) throw new Error("Thread not found in index"); if (!head) throw new Error("Thread not found in index");
deleteThread(uwfForIndex.varStore, threadId); completeThread(uwfForIndex.varStore, threadId, "cancelled");
addHistoryEntry(uwfForIndex.varStore, {
thread: threadId,
workflow,
head,
completedAt: Date.now(),
reason: "cancelled",
});
const result = await cmdThreadShow(storageRoot, threadId); const result = await cmdThreadShow(storageRoot, threadId);
@@ -274,7 +258,7 @@ describe("thread show status field", () => {
// Create a thread // Create a thread
const startResult = await cmdThreadStart(storageRoot, workflowPath, "test prompt", tmpDir); const startResult = await cmdThreadStart(storageRoot, workflowPath, "test prompt", tmpDir);
const threadId = startResult.thread as ThreadId; const threadId = startResult.thread as ThreadId;
const workflow = startResult.workflow; const _workflow = startResult.workflow;
// Get the head hash before moving to history // Get the head hash before moving to history
const uwfForIndex = await createUwfStore(storageRoot); const uwfForIndex = await createUwfStore(storageRoot);
@@ -282,15 +266,7 @@ describe("thread show status field", () => {
const head = index[threadId]!.head; const head = index[threadId]!.head;
if (!head) throw new Error("Thread not found in index"); if (!head) throw new Error("Thread not found in index");
deleteThread(uwfForIndex.varStore, threadId); completeThread(uwfForIndex.varStore, threadId, "completed");
addHistoryEntry(uwfForIndex.varStore, {
thread: threadId,
workflow,
head,
completedAt: Date.now(),
reason: null,
});
const result = await cmdThreadShow(storageRoot, threadId); const result = await cmdThreadShow(storageRoot, threadId);
@@ -160,8 +160,10 @@ describe("suspend step CAS chain and threads.yaml metadata", () => {
const threadEntry = getThread(uwf.varStore, threadId); const threadEntry = getThread(uwf.varStore, threadId);
expect(threadEntry).toEqual({ expect(threadEntry).toEqual({
head: stepHash, head: stepHash,
status: "suspended",
suspendedRole: "worker", suspendedRole: "worker",
suspendMessage: "Please clarify: Which API?", suspendMessage: "Please clarify: Which API?",
completedAt: null,
}); });
const showResult = await cmdThreadShow(tmpDir, threadId); const showResult = await cmdThreadShow(tmpDir, threadId);
+27 -24
View File
@@ -11,7 +11,7 @@ import {
THREAD_READ_DEFAULT_QUOTA, THREAD_READ_DEFAULT_QUOTA,
} from "../commands/thread.js"; } from "../commands/thread.js";
import type { UwfStore } from "../store.js"; import type { UwfStore } from "../store.js";
import { addHistoryEntry, createUwfStore } from "../store.js"; import { completeThread, createUwfStore, setThread } from "../store.js";
import { seedThreads } from "./thread-test-helpers.js"; import { seedThreads } from "./thread-test-helpers.js";
// ── schemas used in tests ──────────────────────────────────────────────────── // ── schemas used in tests ────────────────────────────────────────────────────
@@ -745,13 +745,14 @@ describe("cmdStepList with completed threads", () => {
const threadId = "01JTEST0000000000000000A2" as ThreadId; const threadId = "01JTEST0000000000000000A2" as ThreadId;
// Thread is NOT in active index (simulating completed thread) // Thread is NOT in active index (simulating completed thread)
// But it IS in history variable store // But it IS in history variable store
addHistoryEntry(uwf.varStore, { setThread(uwf.varStore, threadId, {
thread: threadId,
workflow: workflowHash,
head: step2Hash, head: step2Hash,
completedAt: Date.now(), status: "idle",
reason: null, suspendedRole: null,
suspendMessage: null,
completedAt: null,
}); });
completeThread(uwf.varStore, threadId, "completed");
const result = await cmdStepList(tmpDir, threadId); const result = await cmdStepList(tmpDir, threadId);
@@ -872,14 +873,15 @@ describe("cmdStepShow with completed threads", () => {
const threadId = "01JTEST0000000000000000B2" as ThreadId; const threadId = "01JTEST0000000000000000B2" as ThreadId;
// Thread is NOT in active index // Thread is NOT in active index
// But it IS in history variable store // But it IS in the unified store with completed status
addHistoryEntry(uwf.varStore, { setThread(uwf.varStore, threadId, {
thread: threadId,
workflow: workflowHash,
head: stepHash, head: stepHash,
completedAt: Date.now(), status: "idle",
reason: null, suspendedRole: null,
suspendMessage: null,
completedAt: null,
}); });
completeThread(uwf.varStore, threadId, "completed");
const result = await cmdStepShow(tmpDir, stepHash); const result = await cmdStepShow(tmpDir, stepHash);
@@ -934,15 +936,15 @@ describe("cmdThreadRead with completed threads", () => {
}); });
const threadId = "01JTEST0000000000000000C1" as ThreadId; const threadId = "01JTEST0000000000000000C1" as ThreadId;
// Thread is NOT in active index // Thread is in store with completed status
// But it IS in history variable store setThread(uwf.varStore, threadId, {
addHistoryEntry(uwf.varStore, {
thread: threadId,
workflow: workflowHash,
head: stepHash, head: stepHash,
completedAt: Date.now(), status: "idle",
reason: null, suspendedRole: null,
suspendMessage: null,
completedAt: null,
}); });
completeThread(uwf.varStore, threadId, "completed");
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false); const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
@@ -998,13 +1000,14 @@ describe("cmdThreadRead with completed threads", () => {
}); });
const threadId = "01JTEST0000000000000000C2" as ThreadId; const threadId = "01JTEST0000000000000000C2" as ThreadId;
addHistoryEntry(uwf.varStore, { setThread(uwf.varStore, threadId, {
thread: threadId,
workflow: workflowHash,
head: step3Hash, head: step3Hash,
completedAt: Date.now(), status: "idle",
reason: null, suspendedRole: null,
suspendMessage: null,
completedAt: null,
}); });
completeThread(uwf.varStore, threadId, "completed");
const markdown = await cmdThreadRead( const markdown = await cmdThreadRead(
tmpDir, tmpDir,
+1 -5
View File
@@ -6,7 +6,7 @@ import type {
StepNodePayload, StepNodePayload,
ThreadId, ThreadId,
} from "@united-workforce/protocol"; } from "@united-workforce/protocol";
import { createUwfStore, findHistoryEntry, getThread, type UwfStore } from "../store.js"; import { createUwfStore, getThread, type UwfStore } from "../store.js";
type ChainState = { type ChainState = {
startHash: CasRef; startHash: CasRef;
@@ -207,10 +207,6 @@ async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise
if (entry !== null) { if (entry !== null) {
return entry.head; return entry.head;
} }
const hist = findHistoryEntry(uwf.varStore, threadId);
if (hist !== null) {
return hist.head;
}
fail(`thread not found: ${threadId}`); fail(`thread not found: ${threadId}`);
} }
+2
View File
@@ -114,8 +114,10 @@ export async function cmdStepFork(
const newThreadId = generateUlid(Date.now()) as ThreadId; const newThreadId = generateUlid(Date.now()) as ThreadId;
setThread(uwf.varStore, newThreadId, { setThread(uwf.varStore, newThreadId, {
head: stepHash, head: stepHash,
status: "idle",
suspendedRole: null, suspendedRole: null,
suspendMessage: null, suspendMessage: null,
completedAt: null,
}); });
return { return {
+112 -107
View File
@@ -38,17 +38,14 @@ import { createMarker, deleteMarker, isThreadRunning } from "../background/index
import { createIncludeTag } from "../include.js"; import { createIncludeTag } from "../include.js";
import { evaluate, isSuspendResult } from "../moderator/index.js"; import { evaluate, isSuspendResult } from "../moderator/index.js";
import { import {
addHistoryEntry, completeThread,
createUwfStore, createUwfStore,
deleteThread,
findHistoryEntry,
getThread, getThread,
loadAllHistory, loadActiveThreads,
loadAllThreads, loadHistoryThreads,
loadWorkflowRegistry, loadWorkflowRegistry,
resolveWorkflowHash, resolveWorkflowHash,
setThread, setThread,
type ThreadHistoryLine,
type UwfStore, type UwfStore,
} from "../store.js"; } from "../store.js";
import { checkWorkflowFilenameConsistency, isCasRef, parseWorkflowPayload } from "../validate.js"; import { checkWorkflowFilenameConsistency, isCasRef, parseWorkflowPayload } from "../validate.js";
@@ -485,61 +482,55 @@ export async function cmdThreadShow(
): Promise<ThreadShowOutput> { ): Promise<ThreadShowOutput> {
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const entry = getThread(uwf.varStore, threadId); const entry = getThread(uwf.varStore, threadId);
if (entry !== null) { if (entry === null) {
const activeHead = entry.head; fail(`thread not found: ${threadId}`);
const workflow = resolveWorkflowFromHead(uwf, activeHead); }
if (workflow === null) {
fail(`failed to resolve workflow from head: ${activeHead}`);
}
const status = await resolveActiveThreadStatus( const activeHead = entry.head;
storageRoot, const workflow = resolveWorkflowFromHead(uwf, activeHead);
threadId, if (workflow === null) {
uwf, fail(`failed to resolve workflow from head: ${activeHead}`);
activeHead, }
workflow,
);
const currentRole = resolveCurrentRole(uwf, activeHead, workflow);
const suspendFields = resolveSuspendFieldsForShow(entry, status, uwf, activeHead, workflow);
const hint =
status === "suspended"
? `Thread is suspended. Resume with: uwf thread resume ${threadId}`
: null;
// Determine if this is a completed/cancelled thread
if (entry.status === "completed" || entry.status === "cancelled") {
const hint = null;
return { return {
workflow, workflow,
thread: threadId, thread: threadId,
head: activeHead, head: activeHead,
status, status: entry.status,
currentRole,
suspendedRole: suspendFields.suspendedRole,
suspendMessage: suspendFields.suspendMessage,
done: false,
background: null,
hint,
};
}
const hist = findHistoryEntry(uwf.varStore, threadId);
if (hist !== null) {
const status: ThreadStatus = hist.reason === "cancelled" ? "cancelled" : "completed";
return {
workflow: hist.workflow,
thread: threadId,
head: hist.head,
status,
currentRole: null, currentRole: null,
suspendedRole: null, suspendedRole: null,
suspendMessage: null, suspendMessage: null,
done: true, done: true,
background: null, background: null,
hint: null, hint,
}; };
} }
fail(`thread not found: ${threadId}`); // Active thread
const status = await resolveActiveThreadStatus(storageRoot, threadId, uwf, activeHead, workflow);
const currentRole = resolveCurrentRole(uwf, activeHead, workflow);
const suspendFields = resolveSuspendFieldsForShow(entry, status, uwf, activeHead, workflow);
const hint =
status === "suspended"
? `Thread is suspended. Resume with: uwf thread resume ${threadId}`
: null;
return {
workflow,
thread: threadId,
head: activeHead,
status,
currentRole,
suspendedRole: suspendFields.suspendedRole,
suspendMessage: suspendFields.suspendMessage,
done: false,
background: null,
hint,
};
} }
export type ThreadListItemWithStatus = ThreadListItem & { export type ThreadListItemWithStatus = ThreadListItem & {
@@ -594,19 +585,20 @@ async function collectActiveThreads(
} }
function collectCompletedThreads( function collectCompletedThreads(
varStore: VarStore, uwf: UwfStore,
activeIds: Set<ThreadId>, activeIds: Set<ThreadId>,
): ThreadListItemWithStatus[] { ): ThreadListItemWithStatus[] {
const items: ThreadListItemWithStatus[] = []; const items: ThreadListItemWithStatus[] = [];
const history = loadAllHistory(varStore); const history = loadHistoryThreads(uwf.varStore);
const seen = new Set<ThreadId>(); // Deduplication (issue #470) const seen = new Set<ThreadId>(); // Deduplication (issue #470)
for (const entry of history) { for (const [threadId, entry] of Object.entries(history)) {
if (!activeIds.has(entry.thread) && !seen.has(entry.thread)) { if (!activeIds.has(threadId as ThreadId) && !seen.has(threadId as ThreadId)) {
seen.add(entry.thread); seen.add(threadId as ThreadId);
const status = entry.reason === "cancelled" ? "cancelled" : "completed"; const status = entry.status;
const workflow = resolveWorkflowFromHead(uwf, entry.head);
items.push({ items.push({
thread: entry.thread, thread: threadId as ThreadId,
workflow: entry.workflow, workflow: workflow ?? "",
head: entry.head, head: entry.head,
status, status,
currentRole: null, currentRole: null,
@@ -659,7 +651,7 @@ export async function cmdThreadList(
take: number | null, take: number | null,
): Promise<ThreadListItemWithStatus[]> { ): Promise<ThreadListItemWithStatus[]> {
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const index = loadAllThreads(uwf.varStore); const index = loadActiveThreads(uwf.varStore);
// Collect active threads // Collect active threads
let items = await collectActiveThreads(storageRoot, uwf, index); let items = await collectActiveThreads(storageRoot, uwf, index);
@@ -671,7 +663,7 @@ export async function cmdThreadList(
statusFilter.includes("cancelled"); statusFilter.includes("cancelled");
if (includeCompleted) { if (includeCompleted) {
const activeIds = new Set(items.map((i) => i.thread)); const activeIds = new Set(items.map((i) => i.thread));
const completedItems = collectCompletedThreads(uwf.varStore, activeIds); const completedItems = collectCompletedThreads(uwf, activeIds);
items = items.concat(completedItems); items = items.concat(completedItems);
} }
@@ -1035,17 +1027,11 @@ function spawnAgent(
return obj as unknown as AdapterOutput; return obj as unknown as AdapterOutput;
} }
function archiveThread(uwf: UwfStore, threadId: ThreadId, workflow: CasRef, head: CasRef): void { function archiveThread(uwf: UwfStore, threadId: ThreadId, _workflow: CasRef, _head: CasRef): void {
deleteThread(uwf.varStore, threadId); completeThread(uwf.varStore, threadId, "completed");
addHistoryEntry(uwf.varStore, {
thread: threadId,
workflow,
head,
completedAt: Date.now(),
reason: "completed",
});
} }
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: orchestration function with inherent branching
export async function cmdThreadResume( export async function cmdThreadResume(
storageRoot: string, storageRoot: string,
threadId: ThreadId, threadId: ThreadId,
@@ -1067,40 +1053,78 @@ export async function cmdThreadResume(
const chain = walkChain(uwf, headHash); const chain = walkChain(uwf, headHash);
const workflowHash = chain.start.workflow; const workflowHash = chain.start.workflow;
const status = await resolveActiveThreadStatus( // Check entry.status first for completed/cancelled (like in cmdThreadShow)
storageRoot, let status: ThreadStatus;
threadId, if (entry.status === "completed" || entry.status === "cancelled") {
uwf, status = entry.status;
headHash, } else {
workflowHash, status = await resolveActiveThreadStatus(storageRoot, threadId, uwf, headHash, workflowHash);
);
if (status !== "suspended") {
fail(`thread is not suspended: ${threadId} (status: ${status})`);
} }
const suspendFields = resolveSuspendFieldsForShow(entry, status, uwf, headHash, workflowHash); if (status !== "suspended" && status !== "completed") {
if (suspendFields.suspendedRole === null) { fail(`thread cannot be resumed: ${threadId} (status: ${status})`);
fail(`thread is suspended but suspendedRole is missing: ${threadId}`);
}
if (suspendFields.suspendMessage === null) {
fail(`thread is suspended but suspendMessage is missing: ${threadId}`);
} }
const resumePrompt = buildResumePrompt(suspendFields.suspendMessage, supplement);
const plog = createProcessLogger({ const plog = createProcessLogger({
storageRoot, storageRoot,
context: { thread: threadId, workflow: workflowHash }, context: { thread: threadId, workflow: workflowHash },
}); });
if (status === "suspended") {
const suspendFields = resolveSuspendFieldsForShow(entry, status, uwf, headHash, workflowHash);
if (suspendFields.suspendedRole === null) {
fail(`thread is suspended but suspendedRole is missing: ${threadId}`);
}
if (suspendFields.suspendMessage === null) {
fail(`thread is suspended but suspendMessage is missing: ${threadId}`);
}
const resumePrompt = buildResumePrompt(suspendFields.suspendMessage, supplement);
plog.log(
PL_THREAD_RESUME,
`resume role=${suspendFields.suspendedRole} supplement=${supplement !== null}`,
null,
);
return cmdThreadStepOnce(storageRoot, threadId, agentOverride, plog, {
role: suspendFields.suspendedRole,
prompt: resumePrompt,
});
}
// status === "completed"
const workflow = loadWorkflowPayload(uwf, workflowHash);
const startResult = evaluate(workflow.graph, START_ROLE, {});
if (!startResult.ok) {
fail(`failed to evaluate $START: ${startResult.error.message}`);
}
if (isSuspendResult(startResult.value)) {
fail("workflow cannot start with $SUSPEND");
}
if (startResult.value.role === END_ROLE) {
fail("workflow cannot start with $END");
}
const startRole = startResult.value.role;
const completedPromptPrefix = "Previous run completed. Resuming with additional context.";
const completedResumePrompt =
supplement !== null && supplement !== ""
? `${completedPromptPrefix}\n\n${supplement}`
: completedPromptPrefix;
const updatedEntry = { ...entry, status: "idle" as const, completedAt: null };
setThread(uwf.varStore, threadId, updatedEntry);
plog.log( plog.log(
PL_THREAD_RESUME, PL_THREAD_RESUME,
`resume role=${suspendFields.suspendedRole} supplement=${supplement !== null}`, `resume completed role=${startRole} supplement=${supplement !== null}`,
null, null,
); );
return cmdThreadStepOnce(storageRoot, threadId, agentOverride, plog, { return cmdThreadStepOnce(storageRoot, threadId, agentOverride, plog, {
role: suspendFields.suspendedRole, role: startRole,
prompt: resumePrompt, prompt: completedResumePrompt,
}); });
} }
@@ -1249,7 +1273,7 @@ function resolveResumeStepTarget(
} }
async function resolveModeratorStepTarget( async function resolveModeratorStepTarget(
storageRoot: string, _storageRoot: string,
threadId: ThreadId, threadId: ThreadId,
entry: ThreadIndexEntry, entry: ThreadIndexEntry,
headHash: CasRef, headHash: CasRef,
@@ -1318,7 +1342,7 @@ async function resolveModeratorStepTarget(
} }
async function finalizeAgentStep( async function finalizeAgentStep(
storageRoot: string, _storageRoot: string,
threadId: ThreadId, threadId: ThreadId,
workflowHash: CasRef, workflowHash: CasRef,
workflow: WorkflowPayload, workflow: WorkflowPayload,
@@ -1450,10 +1474,6 @@ async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise
if (entry !== null) { if (entry !== null) {
return entry.head; return entry.head;
} }
const hist = findHistoryEntry(uwf.varStore, threadId);
if (hist !== null) {
return hist.head;
}
fail(`thread not found: ${threadId}`); fail(`thread not found: ${threadId}`);
} }
@@ -1533,7 +1553,6 @@ export async function cmdThreadCancel(
if (entry === null) { if (entry === null) {
fail(`thread not active: ${threadId}`); fail(`thread not active: ${threadId}`);
} }
const head = entry.head;
// Check if thread is running in background and terminate it // Check if thread is running in background and terminate it
const runningMarker = await isThreadRunning(storageRoot, threadId); const runningMarker = await isThreadRunning(storageRoot, threadId);
@@ -1546,21 +1565,7 @@ export async function cmdThreadCancel(
await deleteMarker(storageRoot, threadId); await deleteMarker(storageRoot, threadId);
} }
const workflow = resolveWorkflowFromHead(uwf, head); completeThread(uwf.varStore, threadId, "cancelled");
if (workflow === null) {
fail(`failed to resolve workflow from head: ${head}`);
}
deleteThread(uwf.varStore, threadId);
const historyEntry: ThreadHistoryLine = {
thread: threadId,
workflow,
head,
completedAt: Date.now(),
reason: "cancelled",
};
addHistoryEntry(uwf.varStore, historyEntry);
return { thread: threadId, cancelled: true }; return { thread: threadId, cancelled: true };
} }
+92 -54
View File
@@ -6,13 +6,7 @@ import { join } from "node:path";
import { bootstrap, type Hash, type Store, type VarStore } from "@ocas/core"; import { bootstrap, type Hash, type Store, type VarStore } from "@ocas/core";
import { createFsStore, createSqliteVarStore } from "@ocas/fs"; import { createFsStore, createSqliteVarStore } from "@ocas/fs";
import type { import type { CasRef, ThreadId, ThreadIndexEntry, ThreadsIndex } from "@united-workforce/protocol";
CasRef,
ThreadId,
ThreadIndexEntry,
ThreadListItem,
ThreadsIndex,
} from "@united-workforce/protocol";
import { parseThreadsIndex } from "@united-workforce/protocol"; import { parseThreadsIndex } from "@united-workforce/protocol";
import { parse } from "yaml"; import { parse } from "yaml";
@@ -26,9 +20,6 @@ export const REGISTRY_VAR_PREFIX = "@uwf/registry/";
/** Variable name prefix for active thread entries (`@uwf/thread/<thread-id>`). */ /** Variable name prefix for active thread entries (`@uwf/thread/<thread-id>`). */
export const THREAD_VAR_PREFIX = "@uwf/thread/"; export const THREAD_VAR_PREFIX = "@uwf/thread/";
/** Variable name prefix for completed/cancelled thread history (`@uwf/history/<thread-id>`). */
export const HISTORY_VAR_PREFIX = "@uwf/history/";
/** A workflow entry discovered from the project-local .workflows/ directory. */ /** A workflow entry discovered from the project-local .workflows/ directory. */
export type ProjectWorkflowEntry = { export type ProjectWorkflowEntry = {
/** Workflow name (from YAML `name` field, equals filename stem). */ /** Workflow name (from YAML `name` field, equals filename stem). */
@@ -156,11 +147,6 @@ export function getThreadsPath(storageRoot: string): string {
return join(storageRoot, "threads.yaml"); return join(storageRoot, "threads.yaml");
} }
export type ThreadHistoryLine = ThreadListItem & {
completedAt: number;
reason: "completed" | "cancelled" | null;
};
export type UwfStore = { export type UwfStore = {
storageRoot: string; storageRoot: string;
store: Store; store: Store;
@@ -179,6 +165,7 @@ export async function createUwfStore(storageRoot: string): Promise<UwfStore> {
await migrateWorkflowRegistryIfNeeded(storageRoot, varStore); await migrateWorkflowRegistryIfNeeded(storageRoot, varStore);
await migrateThreadsIndexIfNeeded(storageRoot, varStore); await migrateThreadsIndexIfNeeded(storageRoot, varStore);
await migrateHistoryIfNeeded(storageRoot, varStore); await migrateHistoryIfNeeded(storageRoot, varStore);
migrateHistoryVarsToThreadVars(varStore);
return { storageRoot, store, schemas, varStore }; return { storageRoot, store, schemas, varStore };
} }
@@ -299,8 +286,10 @@ function threadVarName(threadId: ThreadId): string {
function entryFromVariable(v: { value: string; tags: Record<string, string> }): ThreadIndexEntry { function entryFromVariable(v: { value: string; tags: Record<string, string> }): ThreadIndexEntry {
return { return {
head: v.value as CasRef, head: v.value as CasRef,
status: (v.tags.status ?? "idle") as ThreadIndexEntry["status"],
suspendedRole: v.tags.suspendedRole ?? null, suspendedRole: v.tags.suspendedRole ?? null,
suspendMessage: v.tags.suspendMessage ?? null, suspendMessage: v.tags.suspendMessage ?? null,
completedAt: v.tags.completedAt !== undefined ? Number(v.tags.completedAt) : null,
}; };
} }
@@ -331,21 +320,74 @@ export function setThread(varStore: VarStore, threadId: ThreadId, entry: ThreadI
// Head CAS nodes may use different schemas (StartNode vs StepNode) — clear all variants first. // Head CAS nodes may use different schemas (StartNode vs StepNode) — clear all variants first.
varStore.remove(name); varStore.remove(name);
const tags: Record<string, string> = {}; const tags: Record<string, string> = {};
if (entry.status !== "idle") {
tags.status = entry.status;
}
if (entry.suspendedRole !== null) { if (entry.suspendedRole !== null) {
tags.suspendedRole = entry.suspendedRole; tags.suspendedRole = entry.suspendedRole;
} }
if (entry.suspendMessage !== null) { if (entry.suspendMessage !== null) {
tags.suspendMessage = entry.suspendMessage; tags.suspendMessage = entry.suspendMessage;
} }
if (entry.completedAt !== null) {
tags.completedAt = String(entry.completedAt);
}
varStore.set(name, entry.head, { tags }); varStore.set(name, entry.head, { tags });
} }
/** Remove an active thread entry (on complete/cancel). */ /** Load only active threads (status not in completed/cancelled). */
export function deleteThread(varStore: VarStore, threadId: ThreadId): void { export function loadActiveThreads(varStore: VarStore): ThreadsIndex {
varStore.remove(threadVarName(threadId)); const all = loadAllThreads(varStore);
const active: ThreadsIndex = {};
for (const [threadId, entry] of Object.entries(all)) {
if (entry.status !== "completed" && entry.status !== "cancelled") {
active[threadId as ThreadId] = entry;
}
}
return active;
} }
function parseHistoryJsonlLine(trimmed: string): ThreadHistoryLine | null { /** Load only completed/cancelled threads (history). */
export function loadHistoryThreads(varStore: VarStore): ThreadsIndex {
const all = loadAllThreads(varStore);
const history: ThreadsIndex = {};
for (const [threadId, entry] of Object.entries(all)) {
if (entry.status === "completed" || entry.status === "cancelled") {
history[threadId as ThreadId] = entry;
}
}
return history;
}
/** Complete a thread by marking it completed or cancelled. */
export function completeThread(
varStore: VarStore,
threadId: ThreadId,
reason: "completed" | "cancelled",
): void {
const entry = getThread(varStore, threadId);
if (entry === null) {
return;
}
const completed = {
head: entry.head,
status: reason,
suspendedRole: null,
suspendMessage: null,
completedAt: Date.now(),
} as ThreadIndexEntry;
setThread(varStore, threadId, completed);
}
type LegacyHistoryEntry = {
thread: ThreadId;
workflow: CasRef;
head: CasRef;
completedAt: number;
reason: "completed" | "cancelled" | null;
};
function parseLegacyHistoryJsonlLine(trimmed: string): LegacyHistoryEntry | null {
let raw: unknown; let raw: unknown;
try { try {
raw = JSON.parse(trimmed) as unknown; raw = JSON.parse(trimmed) as unknown;
@@ -379,7 +421,7 @@ function parseHistoryJsonlLine(trimmed: string): ThreadHistoryLine | null {
return null; return null;
} }
/** One-time migration: `~/.uwf/history.jsonl` → `@uwf/history/*` variables. */ /** One-time migration: `~/.uwf/history.jsonl` → `@uwf/thread/*` variables with status tags. */
export async function migrateHistoryIfNeeded( export async function migrateHistoryIfNeeded(
storageRoot: string, storageRoot: string,
varStore: VarStore, varStore: VarStore,
@@ -395,47 +437,43 @@ export async function migrateHistoryIfNeeded(
if (trimmed === "") { if (trimmed === "") {
continue; continue;
} }
const entry = parseHistoryJsonlLine(trimmed); const entry = parseLegacyHistoryJsonlLine(trimmed);
if (entry !== null) { if (entry !== null) {
addHistoryEntry(varStore, entry); const status = entry.reason === "cancelled" ? "cancelled" : "completed";
const threadEntry: ThreadIndexEntry = {
head: entry.head,
status: status as ThreadIndexEntry["status"],
suspendedRole: null,
suspendMessage: null,
completedAt: entry.completedAt,
};
setThread(varStore, entry.thread, threadEntry);
} }
} }
await rename(path, `${path}.migrated`); await rename(path, `${path}.migrated`);
} }
export function loadAllHistory(varStore: VarStore): ThreadHistoryLine[] { /** Migrate `@uwf/history/*` variables to `@uwf/thread/*` with status tags. */
const vars = varStore.list({ namePrefix: HISTORY_VAR_PREFIX }); export function migrateHistoryVarsToThreadVars(varStore: VarStore): void {
return vars.map((v) => ({ const LEGACY_HISTORY_VAR_PREFIX = "@uwf/history/";
thread: v.name.slice(HISTORY_VAR_PREFIX.length) as ThreadId, const vars = varStore.list({ namePrefix: LEGACY_HISTORY_VAR_PREFIX });
workflow: v.tags.workflow ?? "",
head: v.value as CasRef,
completedAt: Number(v.tags.completedAt ?? "0"),
reason: v.tags.reason === "completed" || v.tags.reason === "cancelled" ? v.tags.reason : null,
}));
}
export function findHistoryEntry(varStore: VarStore, threadId: ThreadId): ThreadHistoryLine | null { for (const v of vars) {
const vars = varStore.list({ namePrefix: `${HISTORY_VAR_PREFIX}${threadId}` }); const threadId = v.name.slice(LEGACY_HISTORY_VAR_PREFIX.length) as ThreadId;
const v = vars.find((entry) => entry.name === `${HISTORY_VAR_PREFIX}${threadId}`); const reason = v.tags.reason;
if (v === undefined) { const status = reason === "cancelled" ? "cancelled" : "completed";
return null; const completedAt = Number(v.tags.completedAt ?? Date.now());
const threadEntry: ThreadIndexEntry = {
head: v.value as CasRef,
status: status as ThreadIndexEntry["status"],
suspendedRole: null,
suspendMessage: null,
completedAt,
};
setThread(varStore, threadId, threadEntry);
varStore.remove(v.name);
} }
return {
thread: threadId,
workflow: v.tags.workflow ?? "",
head: v.value as CasRef,
completedAt: Number(v.tags.completedAt ?? "0"),
reason: v.tags.reason === "completed" || v.tags.reason === "cancelled" ? v.tags.reason : null,
};
}
export function addHistoryEntry(varStore: VarStore, entry: ThreadHistoryLine): void {
varStore.set(`${HISTORY_VAR_PREFIX}${entry.thread}`, entry.head, {
tags: {
workflow: entry.workflow,
completedAt: String(entry.completedAt),
reason: entry.reason ?? "completed",
},
});
} }
+1
View File
@@ -96,6 +96,7 @@ export function checkWorkflowFilenameConsistency(
} }
/** Validate YAML-parsed workflow document shape (outputSchema may be inline JSON Schema). */ /** Validate YAML-parsed workflow document shape (outputSchema may be inline JSON Schema). */
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: validation function with many field checks
export function parseWorkflowPayload(raw: unknown): WorkflowPayload | null { export function parseWorkflowPayload(raw: unknown): WorkflowPayload | null {
if (!isRecord(raw)) { if (!isRecord(raw)) {
return null; return null;
@@ -1,6 +1,7 @@
import { describe, expect, test } from "vitest"; import { describe, expect, test } from "vitest";
import { import {
createThreadIndexEntry, createThreadIndexEntry,
markThreadCompleted,
markThreadSuspended, markThreadSuspended,
normalizeThreadIndexEntry, normalizeThreadIndexEntry,
parseThreadsIndex, parseThreadsIndex,
@@ -16,6 +17,8 @@ describe("thread-index", () => {
head: "0123456789ABC", head: "0123456789ABC",
suspendedRole: null, suspendedRole: null,
suspendMessage: null, suspendMessage: null,
status: "idle",
completedAt: null,
}); });
}); });
@@ -29,6 +32,40 @@ describe("thread-index", () => {
head: "0123456789ABC", head: "0123456789ABC",
suspendedRole: "worker", suspendedRole: "worker",
suspendMessage: "Please clarify: Which API?", suspendMessage: "Please clarify: Which API?",
status: "idle",
completedAt: null,
});
});
test("normalizeThreadIndexEntry preserves status and completedAt from new data", () => {
const entry = normalizeThreadIndexEntry({
head: "0123456789ABC",
suspendedRole: null,
suspendMessage: null,
status: "completed",
completedAt: 1234567890,
});
expect(entry).toEqual({
head: "0123456789ABC",
suspendedRole: null,
suspendMessage: null,
status: "completed",
completedAt: 1234567890,
});
});
test("normalizeThreadIndexEntry defaults status=idle, completedAt=null for old data", () => {
const entry = normalizeThreadIndexEntry({
head: "0123456789ABC",
suspendedRole: null,
suspendMessage: null,
});
expect(entry).toEqual({
head: "0123456789ABC",
suspendedRole: null,
suspendMessage: null,
status: "idle",
completedAt: null,
}); });
}); });
@@ -47,10 +84,24 @@ describe("thread-index", () => {
head: "0123456789ABC", head: "0123456789ABC",
suspendedRole: "worker", suspendedRole: "worker",
suspendMessage: "Please clarify: Which API?", suspendMessage: "Please clarify: Which API?",
status: "suspended",
}); });
}); });
test("updateThreadHead clears suspend metadata", () => { test("serialize completed entry as object", () => {
const entry = markThreadCompleted(
createThreadIndexEntry("0123456789ABC"),
"completed",
1234567890,
);
expect(serializeThreadIndexEntry(entry)).toEqual({
head: "0123456789ABC",
status: "completed",
completedAt: 1234567890,
});
});
test("updateThreadHead clears suspend metadata and resets status to idle", () => {
const suspended = markThreadSuspended( const suspended = markThreadSuspended(
createThreadIndexEntry("OLDHEAD0123456"), createThreadIndexEntry("OLDHEAD0123456"),
"worker", "worker",
@@ -61,6 +112,44 @@ describe("thread-index", () => {
head: "NEWHEAD01234567", head: "NEWHEAD01234567",
suspendedRole: null, suspendedRole: null,
suspendMessage: null, suspendMessage: null,
status: "idle",
completedAt: null,
});
});
test("markThreadSuspended sets status to suspended", () => {
const entry = createThreadIndexEntry("0123456789ABC");
const suspended = markThreadSuspended(entry, "worker", "Waiting for input");
expect(suspended).toEqual({
head: "0123456789ABC",
suspendedRole: "worker",
suspendMessage: "Waiting for input",
status: "suspended",
completedAt: null,
});
});
test("markThreadCompleted sets status and completedAt", () => {
const entry = createThreadIndexEntry("0123456789ABC");
const completed = markThreadCompleted(entry, "completed", 1234567890);
expect(completed).toEqual({
head: "0123456789ABC",
suspendedRole: null,
suspendMessage: null,
status: "completed",
completedAt: 1234567890,
});
});
test("markThreadCompleted with cancelled status", () => {
const entry = createThreadIndexEntry("0123456789ABC");
const cancelled = markThreadCompleted(entry, "cancelled", 9876543210);
expect(cancelled).toEqual({
head: "0123456789ABC",
suspendedRole: null,
suspendMessage: null,
status: "cancelled",
completedAt: 9876543210,
}); });
}); });
@@ -71,6 +160,7 @@ describe("thread-index", () => {
head: "HEAD00000000002", head: "HEAD00000000002",
suspendedRole: "reviewer", suspendedRole: "reviewer",
suspendMessage: "Need input", suspendMessage: "Need input",
status: "suspended",
}, },
}; };
const parsed = parseThreadsIndex(raw); const parsed = parseThreadsIndex(raw);
+1
View File
@@ -5,6 +5,7 @@ export {
} from "./schemas.js"; } from "./schemas.js";
export { export {
createThreadIndexEntry, createThreadIndexEntry,
markThreadCompleted,
markThreadSuspended, markThreadSuspended,
normalizeThreadIndexEntry, normalizeThreadIndexEntry,
parseThreadsIndex, parseThreadsIndex,
+60 -7
View File
@@ -15,10 +15,17 @@ export function normalizeThreadIndexEntry(raw: unknown): ThreadIndexEntry | null
} }
const suspendedRole = rec.suspendedRole; const suspendedRole = rec.suspendedRole;
const suspendMessage = rec.suspendMessage; const suspendMessage = rec.suspendMessage;
const status = rec.status;
const completedAt = rec.completedAt;
return { return {
head: head as CasRef, head: head as CasRef,
suspendedRole: typeof suspendedRole === "string" ? suspendedRole : null, suspendedRole: typeof suspendedRole === "string" ? suspendedRole : null,
suspendMessage: typeof suspendMessage === "string" ? suspendMessage : null, suspendMessage: typeof suspendMessage === "string" ? suspendMessage : null,
status:
typeof status === "string"
? (status as "idle" | "running" | "suspended" | "completed" | "cancelled")
: "idle",
completedAt: typeof completedAt === "number" ? completedAt : null,
}; };
} }
@@ -27,6 +34,8 @@ export function createThreadIndexEntry(head: CasRef): ThreadIndexEntry {
head, head,
suspendedRole: null, suspendedRole: null,
suspendMessage: null, suspendMessage: null,
status: "idle",
completedAt: null,
}; };
} }
@@ -35,6 +44,8 @@ export function updateThreadHead(_entry: ThreadIndexEntry, head: CasRef): Thread
head, head,
suspendedRole: null, suspendedRole: null,
suspendMessage: null, suspendMessage: null,
status: "idle",
completedAt: null,
}; };
} }
@@ -47,21 +58,63 @@ export function markThreadSuspended(
head: entry.head, head: entry.head,
suspendedRole, suspendedRole,
suspendMessage, suspendMessage,
status: "suspended",
completedAt: null,
};
}
export function markThreadCompleted(
entry: ThreadIndexEntry,
status: "completed" | "cancelled",
now: number,
): ThreadIndexEntry {
return {
head: entry.head,
suspendedRole: null,
suspendMessage: null,
status,
completedAt: now,
}; };
} }
/** Serialize for variable store — compact string when not suspended. */ /** Serialize for variable store — compact string when not suspended. */
export function serializeThreadIndexEntry( export function serializeThreadIndexEntry(
entry: ThreadIndexEntry, entry: ThreadIndexEntry,
): string | Record<string, string> { ): string | Record<string, string | number> {
if (entry.suspendedRole === null || entry.suspendMessage === null) { // Compact string only for idle status with no suspend metadata
if (
entry.status === "idle" &&
entry.suspendedRole === null &&
entry.suspendMessage === null &&
entry.completedAt === null
) {
return entry.head; return entry.head;
} }
return {
// Build object representation
const obj: Record<string, string | number> = {
head: entry.head, head: entry.head,
suspendedRole: entry.suspendedRole,
suspendMessage: entry.suspendMessage,
}; };
// Include suspend metadata if present
if (entry.suspendedRole !== null) {
obj.suspendedRole = entry.suspendedRole;
}
if (entry.suspendMessage !== null) {
obj.suspendMessage = entry.suspendMessage;
}
// Always include status if not idle
if (entry.status !== "idle") {
obj.status = entry.status;
}
// Include completedAt if present
if (entry.completedAt !== null) {
obj.completedAt = entry.completedAt;
}
return obj;
} }
export function parseThreadsIndex(raw: unknown): ThreadsIndex { export function parseThreadsIndex(raw: unknown): ThreadsIndex {
@@ -80,8 +133,8 @@ export function parseThreadsIndex(raw: unknown): ThreadsIndex {
export function serializeThreadsIndex( export function serializeThreadsIndex(
index: ThreadsIndex, index: ThreadsIndex,
): Record<string, string | Record<string, string>> { ): Record<string, string | Record<string, string | number>> {
const out: Record<string, string | Record<string, string>> = {}; const out: Record<string, string | Record<string, string | number>> = {};
for (const [threadId, entry] of Object.entries(index)) { for (const [threadId, entry] of Object.entries(index)) {
out[threadId] = serializeThreadIndexEntry(entry); out[threadId] = serializeThreadIndexEntry(entry);
} }
+2
View File
@@ -118,6 +118,8 @@ export type ThreadIndexEntry = {
head: CasRef; head: CasRef;
suspendedRole: string | null; suspendedRole: string | null;
suspendMessage: string | null; suspendMessage: string | null;
status: ThreadStatus;
completedAt: number | null;
}; };
/** uwf thread steps — single step entry */ /** uwf thread steps — single step entry */
@@ -1,15 +1,15 @@
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { parseArgv } from "../src/run.js"; import { parseArgv } from "../src/run.js";
describe("parseArgv", () => { describe("parseArgv", () => {
let exitSpy: ReturnType<typeof vi.spyOn>; let exitSpy: ReturnType<typeof vi.spyOn>;
let stderrSpy: ReturnType<typeof vi.spyOn>; let _stderrSpy: ReturnType<typeof vi.spyOn>;
beforeEach(() => { beforeEach(() => {
exitSpy = vi.spyOn(process, "exit").mockImplementation((() => { exitSpy = vi.spyOn(process, "exit").mockImplementation((() => {
throw new Error("process.exit"); throw new Error("process.exit");
}) as never); }) as never);
stderrSpy = vi.spyOn(process.stderr, "write").mockImplementation((() => true) as never); _stderrSpy = vi.spyOn(process.stderr, "write").mockImplementation((() => true) as never);
}); });
afterEach(() => { afterEach(() => {
@@ -17,22 +17,37 @@ describe("parseArgv", () => {
}); });
it("returns threadId, role, prompt for valid argv", () => { it("returns threadId, role, prompt for valid argv", () => {
const result = parseArgv(["node", "script", "--thread", "abc123", "--role", "developer", "--prompt", "do stuff"]); const result = parseArgv([
"node",
"script",
"--thread",
"abc123",
"--role",
"developer",
"--prompt",
"do stuff",
]);
expect(result).toEqual({ threadId: "abc123", role: "developer", prompt: "do stuff" }); expect(result).toEqual({ threadId: "abc123", role: "developer", prompt: "do stuff" });
}); });
it("exits when --thread is missing", () => { it("exits when --thread is missing", () => {
expect(() => parseArgv(["node", "script", "--role", "dev", "--prompt", "x"])).toThrow("process.exit"); expect(() => parseArgv(["node", "script", "--role", "dev", "--prompt", "x"])).toThrow(
"process.exit",
);
expect(exitSpy).toHaveBeenCalledWith(1); expect(exitSpy).toHaveBeenCalledWith(1);
}); });
it("exits when --role is missing", () => { it("exits when --role is missing", () => {
expect(() => parseArgv(["node", "script", "--thread", "t1", "--prompt", "x"])).toThrow("process.exit"); expect(() => parseArgv(["node", "script", "--thread", "t1", "--prompt", "x"])).toThrow(
"process.exit",
);
expect(exitSpy).toHaveBeenCalledWith(1); expect(exitSpy).toHaveBeenCalledWith(1);
}); });
it("exits when --prompt is missing", () => { it("exits when --prompt is missing", () => {
expect(() => parseArgv(["node", "script", "--thread", "t1", "--role", "dev"])).toThrow("process.exit"); expect(() => parseArgv(["node", "script", "--thread", "t1", "--role", "dev"])).toThrow(
"process.exit",
);
expect(exitSpy).toHaveBeenCalledWith(1); expect(exitSpy).toHaveBeenCalledWith(1);
}); });
}); });
+18 -13
View File
@@ -1,14 +1,14 @@
import { homedir } from "node:os"; import { homedir } from "node:os";
import { join } from "node:path"; import { join } from "node:path";
import { describe, it, expect } from "vitest"; import { describe, expect, it } from "vitest";
import { import {
resolveStorageRoot,
getDefaultStorageRoot,
getCasDir, getCasDir,
getConfigPath, getConfigPath,
getDefaultStorageRoot,
getEnvPath, getEnvPath,
getGlobalCasDir, getGlobalCasDir,
normalizeWorkflowConfig, normalizeWorkflowConfig,
resolveStorageRoot,
} from "../src/storage.js"; } from "../src/storage.js";
const VALID_CONFIG = { const VALID_CONFIG = {
@@ -79,28 +79,33 @@ describe("normalizeWorkflowConfig", () => {
}); });
it("throws when defaultAgent missing", () => { it("throws when defaultAgent missing", () => {
expect(() => normalizeWorkflowConfig({ ...VALID_CONFIG, defaultAgent: undefined })) expect(() => normalizeWorkflowConfig({ ...VALID_CONFIG, defaultAgent: undefined })).toThrow(
.toThrow("defaultAgent and defaultModel"); "defaultAgent and defaultModel",
);
}); });
it("throws when defaultModel missing", () => { it("throws when defaultModel missing", () => {
expect(() => normalizeWorkflowConfig({ ...VALID_CONFIG, defaultModel: 42 })) expect(() => normalizeWorkflowConfig({ ...VALID_CONFIG, defaultModel: 42 })).toThrow(
.toThrow("defaultAgent and defaultModel"); "defaultAgent and defaultModel",
);
}); });
it("throws on invalid providers entry", () => { it("throws on invalid providers entry", () => {
expect(() => normalizeWorkflowConfig({ ...VALID_CONFIG, providers: { bad: "string" } })) expect(() =>
.toThrow("config.providers.bad must be a mapping"); normalizeWorkflowConfig({ ...VALID_CONFIG, providers: { bad: "string" } }),
).toThrow("config.providers.bad must be a mapping");
}); });
it("throws on invalid models entry", () => { it("throws on invalid models entry", () => {
expect(() => normalizeWorkflowConfig({ ...VALID_CONFIG, models: { m: { provider: 123, name: "x" } } })) expect(() =>
.toThrow("config.models.m requires provider and name"); normalizeWorkflowConfig({ ...VALID_CONFIG, models: { m: { provider: 123, name: "x" } } }),
).toThrow("config.models.m requires provider and name");
}); });
it("throws on invalid agents entry", () => { it("throws on invalid agents entry", () => {
expect(() => normalizeWorkflowConfig({ ...VALID_CONFIG, agents: "bad" })) expect(() => normalizeWorkflowConfig({ ...VALID_CONFIG, agents: "bad" })).toThrow(
.toThrow("config.agents must be a mapping"); "config.agents must be a mapping",
);
}); });
it("returns null for undefined modelOverrides", () => { it("returns null for undefined modelOverrides", () => {
+2
View File
@@ -82,8 +82,10 @@ export async function getActiveThreadEntry(
} }
return { return {
head: v.value as CasRef, head: v.value as CasRef,
status: (v.tags.status ?? "idle") as ThreadIndexEntry["status"],
suspendedRole: v.tags.suspendedRole ?? null, suspendedRole: v.tags.suspendedRole ?? null,
suspendMessage: v.tags.suspendMessage ?? null, suspendMessage: v.tags.suspendMessage ?? null,
completedAt: v.tags.completedAt !== undefined ? Number(v.tags.completedAt) : null,
}; };
} }
+3 -3
View File
@@ -1,10 +1,10 @@
import { describe, expect, it } from "vitest"; import { describe, expect, it } from "vitest";
import { import {
CROCKFORD_BASE32_ALPHABET, CROCKFORD_BASE32_ALPHABET,
encodeCrockfordBase32Bits,
decodeCrockfordBase32Bits, decodeCrockfordBase32Bits,
encodeUint64AsCrockford,
decodeCrockfordToUint64, decodeCrockfordToUint64,
encodeCrockfordBase32Bits,
encodeUint64AsCrockford,
} from "../src/base32.js"; } from "../src/base32.js";
describe("CROCKFORD_BASE32_ALPHABET", () => { describe("CROCKFORD_BASE32_ALPHABET", () => {
@@ -105,7 +105,7 @@ describe("encodeUint64AsCrockford / decodeCrockfordToUint64", () => {
}); });
it("roundtrips arbitrary value", () => { it("roundtrips arbitrary value", () => {
const value = 0xDEAD_BEEF_CAFE_BABEn; const value = 0xdead_beef_cafe_baben;
const encoded = encodeUint64AsCrockford(value); const encoded = encodeUint64AsCrockford(value);
const decoded = decodeCrockfordToUint64(encoded); const decoded = decodeCrockfordToUint64(encoded);
expect(decoded).toEqual({ ok: true, value }); expect(decoded).toEqual({ ok: true, value });
+25 -25
View File
@@ -1,38 +1,38 @@
import { describe, it, expect } from 'vitest'; import { describe, expect, it } from "vitest";
import { assertValidLogTag } from '../src/process-logger/log-tag.js'; import { assertValidLogTag } from "../src/process-logger/log-tag.js";
describe('assertValidLogTag', () => { describe("assertValidLogTag", () => {
it('accepts valid 8-char Crockford Base32 tags', () => { it("accepts valid 8-char Crockford Base32 tags", () => {
expect(() => assertValidLogTag('0123ABCD')).not.toThrow(); expect(() => assertValidLogTag("0123ABCD")).not.toThrow();
expect(() => assertValidLogTag('VWXYZ789')).not.toThrow(); expect(() => assertValidLogTag("VWXYZ789")).not.toThrow();
expect(() => assertValidLogTag('00000000')).not.toThrow(); expect(() => assertValidLogTag("00000000")).not.toThrow();
expect(() => assertValidLogTag('ZZZZZZZZ')).not.toThrow(); expect(() => assertValidLogTag("ZZZZZZZZ")).not.toThrow();
}); });
it('accepts lowercase (converted via toUpperCase)', () => { it("accepts lowercase (converted via toUpperCase)", () => {
expect(() => assertValidLogTag('abcdefgh')).not.toThrow(); expect(() => assertValidLogTag("abcdefgh")).not.toThrow();
expect(() => assertValidLogTag('0a1b2c3d')).not.toThrow(); expect(() => assertValidLogTag("0a1b2c3d")).not.toThrow();
}); });
it('throws on too short', () => { it("throws on too short", () => {
expect(() => assertValidLogTag('1234567')).toThrow(); expect(() => assertValidLogTag("1234567")).toThrow();
expect(() => assertValidLogTag('')).toThrow(); expect(() => assertValidLogTag("")).toThrow();
}); });
it('throws on too long', () => { it("throws on too long", () => {
expect(() => assertValidLogTag('123456789')).toThrow(); expect(() => assertValidLogTag("123456789")).toThrow();
}); });
it('throws on invalid chars I, L, O, U', () => { it("throws on invalid chars I, L, O, U", () => {
expect(() => assertValidLogTag('IIIIIIII')).toThrow(); expect(() => assertValidLogTag("IIIIIIII")).toThrow();
expect(() => assertValidLogTag('LLLLLLLL')).toThrow(); expect(() => assertValidLogTag("LLLLLLLL")).toThrow();
expect(() => assertValidLogTag('OOOOOOOO')).toThrow(); expect(() => assertValidLogTag("OOOOOOOO")).toThrow();
expect(() => assertValidLogTag('UUUUUUUU')).toThrow(); expect(() => assertValidLogTag("UUUUUUUU")).toThrow();
}); });
it('throws on special characters', () => { it("throws on special characters", () => {
expect(() => assertValidLogTag('1234567!')).toThrow(); expect(() => assertValidLogTag("1234567!")).toThrow();
expect(() => assertValidLogTag('ABCD-EFG')).toThrow(); expect(() => assertValidLogTag("ABCD-EFG")).toThrow();
expect(() => assertValidLogTag('ABCD EFG')).toThrow(); expect(() => assertValidLogTag("ABCD EFG")).toThrow();
}); });
}); });
+18 -18
View File
@@ -1,40 +1,40 @@
import { describe, it, expect } from 'vitest'; import { describe, expect, it } from "vitest";
import { mergeRefsWithContentHash, normalizeRefsField } from '../src/refs-field.js'; import { mergeRefsWithContentHash, normalizeRefsField } from "../src/refs-field.js";
describe('mergeRefsWithContentHash', () => { describe("mergeRefsWithContentHash", () => {
it('appends a new content hash', () => { it("appends a new content hash", () => {
expect(mergeRefsWithContentHash(['a', 'b'], 'c')).toEqual(['a', 'b', 'c']); expect(mergeRefsWithContentHash(["a", "b"], "c")).toEqual(["a", "b", "c"]);
}); });
it('skips duplicate content hash', () => { it("skips duplicate content hash", () => {
expect(mergeRefsWithContentHash(['a', 'b'], 'b')).toEqual(['a', 'b']); expect(mergeRefsWithContentHash(["a", "b"], "b")).toEqual(["a", "b"]);
}); });
it('preserves order', () => { it("preserves order", () => {
expect(mergeRefsWithContentHash(['x', 'y'], 'z')).toEqual(['x', 'y', 'z']); expect(mergeRefsWithContentHash(["x", "y"], "z")).toEqual(["x", "y", "z"]);
}); });
it('handles empty refs', () => { it("handles empty refs", () => {
expect(mergeRefsWithContentHash([], 'a')).toEqual(['a']); expect(mergeRefsWithContentHash([], "a")).toEqual(["a"]);
}); });
}); });
describe('normalizeRefsField', () => { describe("normalizeRefsField", () => {
it('returns empty array for non-array', () => { it("returns empty array for non-array", () => {
expect(normalizeRefsField(null)).toEqual([]); expect(normalizeRefsField(null)).toEqual([]);
expect(normalizeRefsField(undefined)).toEqual([]); expect(normalizeRefsField(undefined)).toEqual([]);
expect(normalizeRefsField(42)).toEqual([]); expect(normalizeRefsField(42)).toEqual([]);
}); });
it('passes through string array', () => { it("passes through string array", () => {
expect(normalizeRefsField(['a', 'b'])).toEqual(['a', 'b']); expect(normalizeRefsField(["a", "b"])).toEqual(["a", "b"]);
}); });
it('filters non-strings from mixed array', () => { it("filters non-strings from mixed array", () => {
expect(normalizeRefsField(['a', 1, 'b', null])).toEqual(['a', 'b']); expect(normalizeRefsField(["a", 1, "b", null])).toEqual(["a", "b"]);
}); });
it('handles empty array', () => { it("handles empty array", () => {
expect(normalizeRefsField([])).toEqual([]); expect(normalizeRefsField([])).toEqual([]);
}); });
}); });
+19 -19
View File
@@ -1,36 +1,36 @@
import { describe, it, expect } from 'vitest'; import { describe, expect, it } from "vitest";
import { ok, err } from '../src/result.js'; import { err, ok } from "../src/result.js";
describe('result', () => { describe("result", () => {
describe('ok', () => { describe("ok", () => {
it('wraps a value', () => { it("wraps a value", () => {
const r = ok(42); const r = ok(42);
expect(r).toEqual({ ok: true, value: 42 }); expect(r).toEqual({ ok: true, value: 42 });
}); });
it('wraps a string value', () => { it("wraps a string value", () => {
const r = ok('hello'); const r = ok("hello");
expect(r.ok).toBe(true); expect(r.ok).toBe(true);
if (r.ok) expect(r.value).toBe('hello'); if (r.ok) expect(r.value).toBe("hello");
}); });
}); });
describe('err', () => { describe("err", () => {
it('wraps an error', () => { it("wraps an error", () => {
const r = err('fail'); const r = err("fail");
expect(r).toEqual({ ok: false, error: 'fail' }); expect(r).toEqual({ ok: false, error: "fail" });
}); });
it('wraps an Error object', () => { it("wraps an Error object", () => {
const e = new Error('boom'); const e = new Error("boom");
const r = err(e); const r = err(e);
expect(r.ok).toBe(false); expect(r.ok).toBe(false);
if (!r.ok) expect(r.error).toBe(e); if (!r.ok) expect(r.error).toBe(e);
}); });
}); });
describe('type narrowing', () => { describe("type narrowing", () => {
it('narrows ok result', () => { it("narrows ok result", () => {
const r = ok(10) as ReturnType<typeof ok<number>> | ReturnType<typeof err<string>>; const r = ok(10) as ReturnType<typeof ok<number>> | ReturnType<typeof err<string>>;
if (r.ok) { if (r.ok) {
expect(r.value).toBe(10); expect(r.value).toBe(10);
@@ -39,10 +39,10 @@ describe('result', () => {
} }
}); });
it('narrows err result', () => { it("narrows err result", () => {
const r = err('bad') as ReturnType<typeof ok<number>> | ReturnType<typeof err<string>>; const r = err("bad") as ReturnType<typeof ok<number>> | ReturnType<typeof err<string>>;
if (!r.ok) { if (!r.ok) {
expect(r.error).toBe('bad'); expect(r.error).toBe("bad");
} else { } else {
expect.unreachable(); expect.unreachable();
} }
+17 -13
View File
@@ -1,25 +1,29 @@
import { describe, it, expect } from 'vitest'; import { homedir } from "node:os";
import { homedir } from 'node:os'; import { describe, expect, it } from "vitest";
import { getDefaultStorageRoot, getDefaultWorkflowStorageRoot, getGlobalCasDir } from '../src/storage-root.js'; import {
getDefaultStorageRoot,
getDefaultWorkflowStorageRoot,
getGlobalCasDir,
} from "../src/storage-root.js";
describe('getDefaultStorageRoot', () => { describe("getDefaultStorageRoot", () => {
it('returns homedir + /.uwf', () => { it("returns homedir + /.uwf", () => {
expect(getDefaultStorageRoot()).toBe(homedir() + '/.uwf'); expect(getDefaultStorageRoot()).toBe(`${homedir()}/.uwf`);
}); });
}); });
describe('getDefaultWorkflowStorageRoot', () => { describe("getDefaultWorkflowStorageRoot", () => {
it('returns same as getDefaultStorageRoot (deprecated alias)', () => { it("returns same as getDefaultStorageRoot (deprecated alias)", () => {
expect(getDefaultWorkflowStorageRoot()).toBe(getDefaultStorageRoot()); expect(getDefaultWorkflowStorageRoot()).toBe(getDefaultStorageRoot());
}); });
}); });
describe('getGlobalCasDir', () => { describe("getGlobalCasDir", () => {
it('appends /cas to given storage root', () => { it("appends /cas to given storage root", () => {
expect(getGlobalCasDir('/tmp/test')).toBe('/tmp/test/cas'); expect(getGlobalCasDir("/tmp/test")).toBe("/tmp/test/cas");
}); });
it('falls back to default when undefined', () => { it("falls back to default when undefined", () => {
expect(getGlobalCasDir(undefined)).toBe(homedir() + '/.uwf/cas'); expect(getGlobalCasDir(undefined)).toBe(`${homedir()}/.uwf/cas`);
}); });
}); });