fix(cli-workflow): resolve step/thread commands on completed threads #478

Merged
xiaoju merged 1 commits from fix/469-step-commands-completed-threads into main 2026-05-24 16:41:07 +00:00
3 changed files with 499 additions and 7 deletions
@@ -0,0 +1,108 @@
import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import type { CasRef, ThreadId } from "@uncaged/workflow-protocol";
import { afterEach, beforeEach, describe, expect, test } from "vitest";
import { resolveHeadHash } from "../commands/shared.js";
import { appendThreadHistory, saveThreadsIndex } from "../store.js";
let tmpDir: string;
beforeEach(async () => {
tmpDir = await mkdtemp(join(tmpdir(), "cli-uwf-resolve-head-"));
});
afterEach(async () => {
await rm(tmpDir, { recursive: true, force: true });
});
describe("resolveHeadHash", () => {
test("returns head hash from threads.yaml for active thread", async () => {
const threadId = "01JTEST0000000000000000001" as ThreadId;
const headHash = "active_hash_123" as CasRef;
await saveThreadsIndex(tmpDir, { [threadId]: headHash });
const result = await resolveHeadHash(tmpDir, threadId);
expect(result).toBe(headHash);
});
test("falls back to history.jsonl when thread not in threads.yaml", async () => {
const threadId = "01JTEST0000000000000000002" as ThreadId;
const headHash = "completed_hash_456" as CasRef;
const workflowHash = "workflow_hash_789" as CasRef;
// No entry in threads.yaml, only in history.jsonl
await saveThreadsIndex(tmpDir, {});
await appendThreadHistory(tmpDir, {
thread: threadId,
workflow: workflowHash,
head: headHash,
completedAt: Date.now(),
});
const result = await resolveHeadHash(tmpDir, threadId);
expect(result).toBe(headHash);
});
// Note: Testing the error case requires CLI-level testing because resolveHeadHash
// calls fail() which does process.exit(1), terminating the test runner.
// The error behavior is tested in integration tests below via CLI invocation.
test("prioritizes active thread over history when thread exists in both", async () => {
const threadId = "01JTEST0000000000000000004" as ThreadId;
const activeHash = "active_hash_v2" as CasRef;
const historicalHash = "historical_hash_v1" as CasRef;
const workflowHash = "workflow_hash_xyz" as CasRef;
// Thread exists in both locations (should not happen normally, but test the precedence)
await saveThreadsIndex(tmpDir, { [threadId]: activeHash });
await appendThreadHistory(tmpDir, {
thread: threadId,
workflow: workflowHash,
head: historicalHash,
completedAt: Date.now(),
});
const result = await resolveHeadHash(tmpDir, threadId);
// Should return the active head, not the historical one
expect(result).toBe(activeHash);
});
test("finds thread from multiple history entries", async () => {
const threadId1 = "01JTEST0000000000000000005" as ThreadId;
const threadId2 = "01JTEST0000000000000000006" as ThreadId;
const threadId3 = "01JTEST0000000000000000007" as ThreadId;
const hash1 = "hash_thread1" as CasRef;
const hash2 = "hash_thread2" as CasRef;
const hash3 = "hash_thread3" as CasRef;
const workflowHash = "workflow_hash_abc" as CasRef;
await saveThreadsIndex(tmpDir, {});
await appendThreadHistory(tmpDir, {
thread: threadId1,
workflow: workflowHash,
head: hash1,
completedAt: Date.now() - 2000,
});
await appendThreadHistory(tmpDir, {
thread: threadId2,
workflow: workflowHash,
head: hash2,
completedAt: Date.now() - 1000,
});
await appendThreadHistory(tmpDir, {
thread: threadId3,
workflow: workflowHash,
head: hash3,
completedAt: Date.now(),
});
const result = await resolveHeadHash(tmpDir, threadId2);
expect(result).toBe(hash2);
});
});
@@ -5,7 +5,7 @@ import { bootstrap, putSchema } from "@uncaged/json-cas";
import { createFsStore } from "@uncaged/json-cas-fs";
import type { CasRef, ThreadId } from "@uncaged/workflow-protocol";
import { afterEach, beforeEach, describe, expect, test } from "vitest";
import { cmdStepShow } from "../commands/step.js";
import { cmdStepList, cmdStepShow } from "../commands/step.js";
import {
cmdThreadRead,
extractLastAssistantContent,
@@ -13,7 +13,7 @@ import {
} from "../commands/thread.js";
import { registerUwfSchemas } from "../schemas.js";
import type { UwfStore } from "../store.js";
import { saveThreadsIndex } from "../store.js";
import { appendThreadHistory, saveThreadsIndex } from "../store.js";
// ── schemas used in tests ────────────────────────────────────────────────────
@@ -647,3 +647,383 @@ describe("cmdStepShow (process.exit tests - must be last)", () => {
).rejects.toThrow();
});
});
// ── cmdStepList / cmdStepShow: completed threads ──────────────────────────────
describe("cmdStepList with completed threads", () => {
test("lists steps from active thread", async () => {
const uwf = await makeUwfStore(tmpDir);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "test-wf-active",
description: "desc",
roles: {},
conditions: {},
graph: {},
});
const startHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "Start prompt",
});
const outputHash = await uwf.store.put(uwf.schemas.workflow, {
name: "out",
description: "",
roles: {},
conditions: {},
graph: {},
});
const step1Hash = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: null,
role: "role1",
output: outputHash,
detail: null,
agent: "uwf-test",
});
const step2Hash = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: step1Hash,
role: "role2",
output: outputHash,
detail: null,
agent: "uwf-test",
});
const step3Hash = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: step2Hash,
role: "role3",
output: outputHash,
detail: null,
agent: "uwf-test",
});
const threadId = "01JTEST0000000000000000A1" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: step3Hash });
const result = await cmdStepList(tmpDir, threadId);
expect(result.thread).toBe(threadId);
expect(result.steps).toHaveLength(4); // start + 3 steps
expect(result.steps[1].role).toBe("role1");
expect(result.steps[2].role).toBe("role2");
expect(result.steps[3].role).toBe("role3");
});
test("lists steps from completed thread", async () => {
const uwf = await makeUwfStore(tmpDir);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "test-wf-completed",
description: "desc",
roles: {},
conditions: {},
graph: {},
});
const startHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "Start prompt",
});
const outputHash = await uwf.store.put(uwf.schemas.workflow, {
name: "out",
description: "",
roles: {},
conditions: {},
graph: {},
});
const step1Hash = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: null,
role: "roleA",
output: outputHash,
detail: null,
agent: "uwf-test",
});
const step2Hash = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: step1Hash,
role: "roleB",
output: outputHash,
detail: null,
agent: "uwf-test",
});
const threadId = "01JTEST0000000000000000A2" as ThreadId;
// Thread is NOT in threads.yaml (simulating completed thread)
await saveThreadsIndex(tmpDir, {});
// But it IS in history.jsonl
await appendThreadHistory(tmpDir, {
thread: threadId,
workflow: workflowHash,
head: step2Hash,
completedAt: Date.now(),
});
const result = await cmdStepList(tmpDir, threadId);
expect(result.thread).toBe(threadId);
expect(result.steps).toHaveLength(3); // start + 2 steps
expect(result.steps[1].role).toBe("roleA");
expect(result.steps[2].role).toBe("roleB");
});
});
describe("cmdStepShow with completed threads", () => {
test("shows step detail from active thread", async () => {
const uwf = await makeUwfStore(tmpDir);
const detailSchemas = await registerDetailSchemas(uwf.store);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "test-wf-step-active",
description: "desc",
roles: {},
conditions: {},
graph: {},
});
const startHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "p",
});
const outputHash = await uwf.store.put(uwf.schemas.workflow, {
name: "out",
description: "",
roles: {},
conditions: {},
graph: {},
});
const turnHash = await uwf.store.put(detailSchemas.turn, {
index: 0,
role: "assistant",
content: "Active thread response",
toolCalls: null,
reasoning: null,
});
const detailHash = await uwf.store.put(detailSchemas.detail, {
sessionId: "sess-active",
model: "model-x",
duration: 1234,
turnCount: 1,
turns: [turnHash],
});
const stepHash = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: null,
role: "coder",
output: outputHash,
detail: detailHash,
agent: "uwf-hermes",
});
const threadId = "01JTEST0000000000000000B1" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHash });
const result = await cmdStepShow(tmpDir, stepHash);
expect(result).toMatchObject({
sessionId: "sess-active",
model: "model-x",
duration: 1234,
turnCount: 1,
});
});
test("shows step detail from completed thread", async () => {
const uwf = await makeUwfStore(tmpDir);
const detailSchemas = await registerDetailSchemas(uwf.store);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "test-wf-step-completed",
description: "desc",
roles: {},
conditions: {},
graph: {},
});
const startHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "p",
});
const outputHash = await uwf.store.put(uwf.schemas.workflow, {
name: "out",
description: "",
roles: {},
conditions: {},
graph: {},
});
const turnHash = await uwf.store.put(detailSchemas.turn, {
index: 0,
role: "assistant",
content: "Completed thread response",
toolCalls: null,
reasoning: null,
});
const detailHash = await uwf.store.put(detailSchemas.detail, {
sessionId: "sess-completed",
model: "model-y",
duration: 5678,
turnCount: 1,
turns: [turnHash],
});
const stepHash = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: null,
role: "reviewer",
output: outputHash,
detail: detailHash,
agent: "uwf-hermes",
});
const threadId = "01JTEST0000000000000000B2" as ThreadId;
// Thread is NOT in threads.yaml
await saveThreadsIndex(tmpDir, {});
// But it IS in history.jsonl
await appendThreadHistory(tmpDir, {
thread: threadId,
workflow: workflowHash,
head: stepHash,
completedAt: Date.now(),
});
const result = await cmdStepShow(tmpDir, stepHash);
expect(result).toMatchObject({
sessionId: "sess-completed",
model: "model-y",
duration: 5678,
turnCount: 1,
});
});
});
describe("cmdThreadRead with completed threads", () => {
test("reads completed thread context", async () => {
const uwf = await makeUwfStore(tmpDir);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "test-wf-read-completed",
description: "desc",
roles: {
writer: {
description: "Write",
goal: "You are a writer.",
capabilities: [],
procedure: "Write content.",
output: "Summary.",
meta: "placeholder00" as CasRef,
},
},
conditions: {},
graph: {},
});
const startHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "Write something",
});
const outputHash = await uwf.store.put(uwf.schemas.workflow, {
name: "out",
description: "",
roles: {},
conditions: {},
graph: {},
});
const stepHash = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: null,
role: "writer",
output: outputHash,
detail: null,
agent: "uwf-hermes",
});
const threadId = "01JTEST0000000000000000C1" as ThreadId;
// Thread is NOT in threads.yaml
await saveThreadsIndex(tmpDir, {});
// But it IS in history.jsonl
await appendThreadHistory(tmpDir, {
thread: threadId,
workflow: workflowHash,
head: stepHash,
completedAt: Date.now(),
});
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
expect(markdown).toContain("writer");
expect(markdown).toContain("Write something");
});
test("reads completed thread with before filter", async () => {
const uwf = await makeUwfStore(tmpDir);
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "test-wf-read-before",
description: "desc",
roles: {},
conditions: {},
graph: {},
});
const startHash = await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "Do task",
});
const outputHash = await uwf.store.put(uwf.schemas.workflow, {
name: "out",
description: "",
roles: {},
conditions: {},
graph: {},
});
const step1Hash = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: null,
role: "roleX",
output: outputHash,
detail: null,
agent: "uwf-test",
});
const step2Hash = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: step1Hash,
role: "roleY",
output: outputHash,
detail: null,
agent: "uwf-test",
});
const step3Hash = await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev: step2Hash,
role: "roleZ",
output: outputHash,
detail: null,
agent: "uwf-test",
});
const threadId = "01JTEST0000000000000000C2" as ThreadId;
await saveThreadsIndex(tmpDir, {});
await appendThreadHistory(tmpDir, {
thread: threadId,
workflow: workflowHash,
head: step3Hash,
completedAt: Date.now(),
});
const markdown = await cmdThreadRead(
tmpDir,
threadId,
THREAD_READ_DEFAULT_QUOTA,
step2Hash,
false,
);
// Should contain step1 (roleX) but not step2 (roleY) or step3 (roleZ)
expect(markdown).toContain("roleX");
expect(markdown).not.toContain("roleY");
expect(markdown).not.toContain("roleZ");
});
});
+9 -5
View File
@@ -6,7 +6,7 @@ import type {
StepNodePayload,
ThreadId,
} from "@uncaged/workflow-protocol";
import { loadThreadsIndex, type UwfStore } from "../store.js";
import { findThreadInHistory, loadThreadsIndex, type UwfStore } from "../store.js";
type ChainState = {
startHash: CasRef;
@@ -203,11 +203,15 @@ function collectOrderedSteps(
async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise<CasRef> {
const index = await loadThreadsIndex(storageRoot);
const head = index[threadId];
if (head === undefined) {
fail(`thread not active: ${threadId}`);
const activeHead = index[threadId];
if (activeHead !== undefined) {
return activeHead;
}
return head;
const hist = await findThreadInHistory(storageRoot, threadId);
if (hist !== null) {
return hist.head;
}
fail(`thread not found: ${threadId}`);
}
export {