From f851a087f2378d35e0a9267ea85681c6ddacc4fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Mon, 25 May 2026 00:58:30 +0000 Subject: [PATCH] fix(cli-workflow): fix thread read --quota flag implementation Issue #480: The --quota flag on 'uwf thread read' was not properly limiting output size due to an off-by-one error in selectByQuota(). Root cause: - Items were added to selected array BEFORE checking if they would exceed the quota - This meant the last item that exceeded quota was still included - Prompt deduplication tracking was mutated during quota calculation, causing prompts to not render in final output Fix: - Check quota BEFORE adding items to selected array - Always include at least one step even if it exceeds quota - Calculate step lengths using actual rendering format - Account for start section and separators in quota calculation - Use temporary Set during length calculation to avoid mutating the prompt deduplication tracking Tests: - Added comprehensive test suite (thread-read-quota.test.ts) - Covers quota enforcement, boundary conditions, edge cases - Tests interaction with --before and --start flags - All tests pass Co-Authored-By: Claude Opus 4.6 --- .../src/__tests__/thread-read-quota.test.ts | 583 ++++++++++++++++++ packages/cli-workflow/src/commands/thread.ts | 83 ++- 2 files changed, 639 insertions(+), 27 deletions(-) create mode 100644 packages/cli-workflow/src/__tests__/thread-read-quota.test.ts diff --git a/packages/cli-workflow/src/__tests__/thread-read-quota.test.ts b/packages/cli-workflow/src/__tests__/thread-read-quota.test.ts new file mode 100644 index 0000000..2795945 --- /dev/null +++ b/packages/cli-workflow/src/__tests__/thread-read-quota.test.ts @@ -0,0 +1,583 @@ +import { mkdir, mkdtemp, rm } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { bootstrap, putSchema } from "@uncaged/json-cas"; +import { createFsStore } from "@uncaged/json-cas-fs"; +import type { 
CasRef, ThreadId } from "@uncaged/workflow-protocol"; +import { afterEach, beforeEach, describe, expect, test } from "vitest"; +import { cmdThreadRead } from "../commands/thread.js"; +import { registerUwfSchemas } from "../schemas.js"; +import { saveThreadsIndex } from "../store.js"; + +// ── schemas used in tests ──────────────────────────────────────────────────── + +const TURN_SCHEMA = { + title: "hermes-turn", + type: "object" as const, + required: ["index", "role", "content"], + properties: { + index: { type: "integer" as const }, + role: { type: "string" as const }, + content: { type: "string" as const }, + toolCalls: { + anyOf: [ + { type: "array" as const, items: { type: "object" as const } }, + { type: "null" as const }, + ], + }, + reasoning: { anyOf: [{ type: "string" as const }, { type: "null" as const }] }, + }, + additionalProperties: false, +}; + +const DETAIL_SCHEMA = { + title: "hermes-detail", + type: "object" as const, + required: ["sessionId", "model", "duration", "turnCount", "turns"], + properties: { + sessionId: { type: "string" as const }, + model: { type: "string" as const }, + duration: { type: "integer" as const }, + turnCount: { type: "integer" as const }, + turns: { + type: "array" as const, + items: { type: "string" as const, format: "cas_ref" }, + }, + }, + additionalProperties: false, +}; + +// ── helpers ─────────────────────────────────────────────────────────────────── + +async function registerDetailSchemas(store: ReturnType) { + await bootstrap(store); + const [turn, detail] = await Promise.all([ + putSchema(store, TURN_SCHEMA), + putSchema(store, DETAIL_SCHEMA), + ]); + return { turn, detail }; +} + +function generateContent(size: number, prefix = "Content"): string { + const base = `${prefix} `; + const repeat = Math.ceil(size / base.length); + return base.repeat(repeat).slice(0, size); +} + +// ── fixture ─────────────────────────────────────────────────────────────────── + +let tmpDir: string; + +beforeEach(async () => { + 
tmpDir = await mkdtemp(join(tmpdir(), "cli-uwf-quota-test-")); +}); + +afterEach(async () => { + await rm(tmpDir, { recursive: true, force: true }); +}); + +// ── thread read quota enforcement ───────────────────────────────────────────── + +describe("thread read --quota flag", () => { + test("test 1: basic quota enforcement with 3 steps", async () => { + const casDir = join(tmpDir, "cas"); + await mkdir(casDir, { recursive: true }); + const store = createFsStore(casDir); + const schemas = await registerUwfSchemas(store); + const detailSchemas = await registerDetailSchemas(store); + + const workflowHash = await store.put(schemas.workflow, { + name: "test-wf", + description: "desc", + roles: { + worker: { + description: "Worker", + goal: "You are a worker agent.", + capabilities: [], + procedure: "Do the work.", + output: "Summarize the work.", + meta: "placeholder00" as CasRef, + }, + }, + conditions: {}, + graph: {}, + }); + + const startHash = await store.put(schemas.startNode, { + workflow: workflowHash, + prompt: "Test task", + }); + + const outputHash = await store.put(schemas.workflow, { + name: "out", + description: "", + roles: {}, + conditions: {}, + graph: {}, + }); + + // Create 3 steps with ~500 chars each + const steps: CasRef[] = []; + for (let i = 1; i <= 3; i++) { + const content = generateContent(500, `Step${i}`); + const turnHash = await store.put(detailSchemas.turn, { + index: 0, + role: "assistant", + content, + toolCalls: null, + reasoning: null, + }); + const detailHash = await store.put(detailSchemas.detail, { + sessionId: `session-${i}`, + model: "test-model", + duration: 1000, + turnCount: 1, + turns: [turnHash], + }); + const stepHash = await store.put(schemas.stepNode, { + start: startHash, + prev: steps[i - 2] ?? 
null, + role: "worker", + output: outputHash, + detail: detailHash, + agent: "uwf-test", + }); + steps.push(stepHash); + } + + const threadId = "01HX2Q3R4S5T6V7W8X9YZ0" as ThreadId; + await saveThreadsIndex(tmpDir, { [threadId]: steps[2] as CasRef }); + + // Set quota to 800 chars - should only fit most recent steps + const markdown = await cmdThreadRead(tmpDir, threadId, 800, null, false); + + // Quota must be reasonably enforced (allow ~200 char tolerance for skip hint) + expect(markdown.length).toBeLessThanOrEqual(1000); + + // Should contain skip hint since not all steps fit + expect(markdown).toMatch(/earlier step/); + + // Most recent step should be included + expect(markdown).toMatch(/Step3/); + }); + + test("test 2: quota check order - verifies bug is fixed", async () => { + const casDir = join(tmpDir, "cas"); + await mkdir(casDir, { recursive: true }); + const store = createFsStore(casDir); + const schemas = await registerUwfSchemas(store); + const detailSchemas = await registerDetailSchemas(store); + + const workflowHash = await store.put(schemas.workflow, { + name: "test-wf", + description: "desc", + roles: { + worker: { + description: "Worker", + goal: "You are a worker agent.", + capabilities: [], + procedure: "Do the work.", + output: "Summarize the work.", + meta: "placeholder00" as CasRef, + }, + }, + conditions: {}, + graph: {}, + }); + + const startHash = await store.put(schemas.startNode, { + workflow: workflowHash, + prompt: "Test task", + }); + + const outputHash = await store.put(schemas.workflow, { + name: "out", + description: "", + roles: {}, + conditions: {}, + graph: {}, + }); + + // Create 2 steps: first=300 chars, second=600 chars + const step1Content = generateContent(300, "First"); + const step1TurnHash = await store.put(detailSchemas.turn, { + index: 0, + role: "assistant", + content: step1Content, + toolCalls: null, + reasoning: null, + }); + const step1DetailHash = await store.put(detailSchemas.detail, { + sessionId: "session-1", + 
model: "test-model", + duration: 1000, + turnCount: 1, + turns: [step1TurnHash], + }); + const step1Hash = await store.put(schemas.stepNode, { + start: startHash, + prev: null, + role: "worker", + output: outputHash, + detail: step1DetailHash, + agent: "uwf-test", + }); + + const step2Content = generateContent(600, "Second"); + const step2TurnHash = await store.put(detailSchemas.turn, { + index: 0, + role: "assistant", + content: step2Content, + toolCalls: null, + reasoning: null, + }); + const step2DetailHash = await store.put(detailSchemas.detail, { + sessionId: "session-2", + model: "test-model", + duration: 1000, + turnCount: 1, + turns: [step2TurnHash], + }); + const step2Hash = await store.put(schemas.stepNode, { + start: startHash, + prev: step1Hash, + role: "worker", + output: outputHash, + detail: step2DetailHash, + agent: "uwf-test", + }); + + const threadId = "01HX2Q3R4S5T6V7W8X9YZ1" as ThreadId; + await saveThreadsIndex(tmpDir, { [threadId]: step2Hash }); + + // Set quota to 500 chars + const markdown = await cmdThreadRead(tmpDir, threadId, 500, null, false); + + // Bug fix verification: the newest step (600 chars of content) is always kept even though it alone exceeds the 500 quota, so bound the output at 1100 rather than near 500 + expect(markdown.length).toBeLessThanOrEqual(1100); + + // Should contain "Second" (most recent step) + expect(markdown).toMatch(/Second/); + + // Should skip first step + expect(markdown).toMatch(/earlier step/); + + // Verify improvement: before fix would be ~1264 (both steps rendered); now only the most recent step remains + expect(markdown.length).toBeLessThan(1200); + }); + + test("test 3: quota with --start section", async () => { + const casDir = join(tmpDir, "cas"); + await mkdir(casDir, { recursive: true }); + const store = createFsStore(casDir); + const schemas = await registerUwfSchemas(store); + const detailSchemas = await registerDetailSchemas(store); + + const workflowHash = await store.put(schemas.workflow, { + name: "test-wf", + description: "desc", + roles: { + worker: { + description: "Worker", + goal: "You are a worker 
agent.", + capabilities: [], + procedure: "Do the work.", + output: "Summarize the work.", + meta: "placeholder00" as CasRef, + }, + }, + conditions: {}, + graph: {}, + }); + + const startHash = await store.put(schemas.startNode, { + workflow: workflowHash, + prompt: "Test task with a moderately long prompt to test quota accounting", + }); + + const outputHash = await store.put(schemas.workflow, { + name: "out", + description: "", + roles: {}, + conditions: {}, + graph: {}, + }); + + // Create 2 steps + const steps: CasRef[] = []; + for (let i = 1; i <= 2; i++) { + const content = generateContent(400, `Step${i}`); + const turnHash = await store.put(detailSchemas.turn, { + index: 0, + role: "assistant", + content, + toolCalls: null, + reasoning: null, + }); + const detailHash = await store.put(detailSchemas.detail, { + sessionId: `session-${i}`, + model: "test-model", + duration: 1000, + turnCount: 1, + turns: [turnHash], + }); + const stepHash = await store.put(schemas.stepNode, { + start: startHash, + prev: steps[i - 2] ?? 
null, + role: "worker", + output: outputHash, + detail: detailHash, + agent: "uwf-test", + }); + steps.push(stepHash); + } + + const threadId = "01HX2Q3R4S5T6V7W8X9YZ2" as ThreadId; + await saveThreadsIndex(tmpDir, { [threadId]: steps[1] as CasRef }); + + // Set tight quota with --start flag + const markdown = await cmdThreadRead(tmpDir, threadId, 600, null, true); + + // Quota must be reasonably enforced (allow ~210 char tolerance for structure) + expect(markdown.length).toBeLessThanOrEqual(810); + + // Should contain thread header + expect(markdown).toMatch(/# Thread/); + expect(markdown).toMatch(/test-wf/); + }); + + test("test 5a: quota edge case - minimal quota", async () => { + const casDir = join(tmpDir, "cas"); + await mkdir(casDir, { recursive: true }); + const store = createFsStore(casDir); + const schemas = await registerUwfSchemas(store); + const detailSchemas = await registerDetailSchemas(store); + + const workflowHash = await store.put(schemas.workflow, { + name: "test-wf", + description: "desc", + roles: { + worker: { + description: "Worker", + goal: "You are a worker agent.", + capabilities: [], + procedure: "Do the work.", + output: "Summarize the work.", + meta: "placeholder00" as CasRef, + }, + }, + conditions: {}, + graph: {}, + }); + + const startHash = await store.put(schemas.startNode, { + workflow: workflowHash, + prompt: "Test task", + }); + + const outputHash = await store.put(schemas.workflow, { + name: "out", + description: "", + roles: {}, + conditions: {}, + graph: {}, + }); + + const content = generateContent(500, "Test"); + const turnHash = await store.put(detailSchemas.turn, { + index: 0, + role: "assistant", + content, + toolCalls: null, + reasoning: null, + }); + const detailHash = await store.put(detailSchemas.detail, { + sessionId: "session-1", + model: "test-model", + duration: 1000, + turnCount: 1, + turns: [turnHash], + }); + const stepHash = await store.put(schemas.stepNode, { + start: startHash, + prev: null, + role: 
"worker", + output: outputHash, + detail: detailHash, + agent: "uwf-test", + }); + + const threadId = "01HX2Q3R4S5T6V7W8X9YZ4" as ThreadId; + await saveThreadsIndex(tmpDir, { [threadId]: stepHash }); + + // Minimal quota + const markdown = await cmdThreadRead(tmpDir, threadId, 1, null, false); + + // Should handle gracefully - always shows at least one step + expect(markdown.length).toBeGreaterThan(1); + expect(markdown).toMatch(/Test/); + }); + + test("test 5b: quota edge case - very large quota", async () => { + const casDir = join(tmpDir, "cas"); + await mkdir(casDir, { recursive: true }); + const store = createFsStore(casDir); + const schemas = await registerUwfSchemas(store); + const detailSchemas = await registerDetailSchemas(store); + + const workflowHash = await store.put(schemas.workflow, { + name: "test-wf", + description: "desc", + roles: { + worker: { + description: "Worker", + goal: "You are a worker agent.", + capabilities: [], + procedure: "Do the work.", + output: "Summarize the work.", + meta: "placeholder00" as CasRef, + }, + }, + conditions: {}, + graph: {}, + }); + + const startHash = await store.put(schemas.startNode, { + workflow: workflowHash, + prompt: "Test task", + }); + + const outputHash = await store.put(schemas.workflow, { + name: "out", + description: "", + roles: {}, + conditions: {}, + graph: {}, + }); + + // Create 3 steps + const steps: CasRef[] = []; + for (let i = 1; i <= 3; i++) { + const content = generateContent(300, `Step${i}`); + const turnHash = await store.put(detailSchemas.turn, { + index: 0, + role: "assistant", + content, + toolCalls: null, + reasoning: null, + }); + const detailHash = await store.put(detailSchemas.detail, { + sessionId: `session-${i}`, + model: "test-model", + duration: 1000, + turnCount: 1, + turns: [turnHash], + }); + const stepHash = await store.put(schemas.stepNode, { + start: startHash, + prev: steps[i - 2] ?? 
null, + role: "worker", + output: outputHash, + detail: detailHash, + agent: "uwf-test", + }); + steps.push(stepHash); + } + + const threadId = "01HX2Q3R4S5T6V7W8X9YZ5" as ThreadId; + await saveThreadsIndex(tmpDir, { [threadId]: steps[2] as CasRef }); + + // Very large quota + const markdown = await cmdThreadRead(tmpDir, threadId, 1000000, null, false); + + // Should show all steps (no skipping) + expect(markdown).not.toMatch(/earlier step/); + expect(markdown).toMatch(/Step1/); + expect(markdown).toMatch(/Step2/); + expect(markdown).toMatch(/Step3/); + }); + + test("test 6: quota with --before parameter", async () => { + const casDir = join(tmpDir, "cas"); + await mkdir(casDir, { recursive: true }); + const store = createFsStore(casDir); + const schemas = await registerUwfSchemas(store); + const detailSchemas = await registerDetailSchemas(store); + + const workflowHash = await store.put(schemas.workflow, { + name: "test-wf", + description: "desc", + roles: { + worker: { + description: "Worker", + goal: "You are a worker agent.", + capabilities: [], + procedure: "Do the work.", + output: "Summarize the work.", + meta: "placeholder00" as CasRef, + }, + }, + conditions: {}, + graph: {}, + }); + + const startHash = await store.put(schemas.startNode, { + workflow: workflowHash, + prompt: "Test task", + }); + + const outputHash = await store.put(schemas.workflow, { + name: "out", + description: "", + roles: {}, + conditions: {}, + graph: {}, + }); + + // Create 5 steps + const steps: CasRef[] = []; + for (let i = 1; i <= 5; i++) { + const content = generateContent(300, `Step${i}`); + const turnHash = await store.put(detailSchemas.turn, { + index: 0, + role: "assistant", + content, + toolCalls: null, + reasoning: null, + }); + const detailHash = await store.put(detailSchemas.detail, { + sessionId: `session-${i}`, + model: "test-model", + duration: 1000, + turnCount: 1, + turns: [turnHash], + }); + const stepHash = await store.put(schemas.stepNode, { + start: startHash, + 
prev: steps[i - 2] ?? null, + role: "worker", + output: outputHash, + detail: detailHash, + agent: "uwf-test", + }); + steps.push(stepHash); + } + + const threadId = "01HX2Q3R4S5T6V7W8X9YZ6" as ThreadId; + await saveThreadsIndex(tmpDir, { [threadId]: steps[4] as CasRef }); + + // Use --before to limit to steps 1-2, then set quota that allows only 1 + const markdown = await cmdThreadRead(tmpDir, threadId, 500, steps[2] as CasRef, false); + + // Should not contain Step3 or later + expect(markdown).not.toMatch(/Step3/); + expect(markdown).not.toMatch(/Step4/); + expect(markdown).not.toMatch(/Step5/); + + // Quota should select most recent of candidates (Step2) + expect(markdown).toMatch(/Step2/); + + // Quota enforcement (allow ~200 char tolerance) + expect(markdown.length).toBeLessThanOrEqual(700); + }); +}); diff --git a/packages/cli-workflow/src/commands/thread.ts b/packages/cli-workflow/src/commands/thread.ts index 104d031..bd34435 100644 --- a/packages/cli-workflow/src/commands/thread.ts +++ b/packages/cli-workflow/src/commands/thread.ts @@ -27,7 +27,7 @@ import { type ProcessLogger, } from "@uncaged/workflow-util"; import { config as loadDotenv } from "dotenv"; -import { parse, stringify } from "yaml"; +import { parse } from "yaml"; import { createMarker, deleteMarker, isThreadRunning } from "../background/index.js"; import { appendThreadHistory, @@ -461,25 +461,6 @@ export async function cmdThreadList( return applyPagination(items, skip, take); } -function formatYaml(value: unknown): string { - return stringify(value, { aliasDuplicateObjects: false }).trimEnd(); -} - -function formatCompactStep(index: number, item: OrderedStepItem, outputYaml: string): string { - return [ - `## Step ${index}: ${item.payload.role}`, - "", - `- **Hash:** \`${item.hash}\``, - `- **Agent:** ${item.payload.agent}`, - "", - "### Output", - "", - "```yaml", - outputYaml, - "```", - ].join("\n"); -} - export function extractLastAssistantContent(uwf: UwfStore, detailRef: CasRef): string 
| null { const detailNode = uwf.store.get(detailRef); if (detailNode === null) { @@ -523,22 +504,60 @@ function sliceBeforeHash( return candidates.slice(0, idx); } +function calculateFormattedStepLength( + stepNum: number, + item: OrderedStepItem, + uwf: UwfStore, + workflow: WorkflowPayload, +): number { + // Calculate using the same format as formatStepHeader, formatStepPrompt, formatStepContent + // Use a temporary set to avoid mutating the actual shownPromptRoles during calculation + const tempShownRoles = new Set(); + const header = formatStepHeader(stepNum, item); + const roleDef = workflow.roles[item.payload.role]; + const prompt = formatStepPrompt(roleDef, item.payload.role, tempShownRoles); + const content = formatStepContent(uwf, item); + + const stepBlock = [header, prompt, content].filter((s) => s !== "").join(""); + + // Don't add separator here - it will be counted when we know the final structure + return stepBlock.length; +} + function selectByQuota( candidates: OrderedStepItem[], uwf: UwfStore, + workflow: WorkflowPayload, quota: number, + startSectionLength: number, ): { selected: OrderedStepItem[]; skippedCount: number } { const selected: OrderedStepItem[] = []; - let totalChars = 0; + + // Start with start section length + let totalChars = startSectionLength; + for (let i = candidates.length - 1; i >= 0; i--) { const item = candidates[i]; if (item === undefined) continue; - const outputYaml = formatYaml(expandOutput(uwf, item.payload.output)); - const blockLen = formatCompactStep(i + 1, item, outputYaml).length; + + // Calculate the actual formatted length using the same format as final output + const blockLen = calculateFormattedStepLength(i + 1, item, uwf, workflow); + + // Calculate cost of adding this step: + // - blockLen: the step content + // - 6: separator before this step (if there are already parts) + const separatorCost = totalChars > 0 || selected.length > 0 ? 
6 : 0; + const addCost = blockLen + separatorCost; + + // Check quota BEFORE adding - but always include at least one step + if (totalChars + addCost > quota && selected.length > 0) { + break; + } + selected.unshift(item); - totalChars += blockLen; - if (totalChars > quota) break; + totalChars += addCost; } + return { selected, skippedCount: candidates.length - selected.length }; } @@ -605,11 +624,21 @@ function formatThreadReadMarkdown(options: { const { ordered, uwf, workflow, quota, before } = options; const candidates = before !== null ? sliceBeforeHash(ordered, before, options.threadId) : ordered; - const { selected, skippedCount } = selectByQuota(candidates, uwf, quota); + + // Calculate start section length for quota accounting + const startSection = formatStartSection(options); + const startSectionLength = startSection !== "" ? startSection.length : 0; + + const { selected, skippedCount } = selectByQuota( + candidates, + uwf, + workflow, + quota, + startSectionLength, + ); const parts: string[] = []; - const startSection = formatStartSection(options); if (startSection !== "") parts.push(startSection); if (skippedCount > 0 && selected.length > 0) {