Merge pull request 'refactor(cli): reduce cognitive complexity in thread.ts' (#452) from fix/446-reduce-thread-complexity into main

This commit is contained in:
2026-05-23 15:55:03 +00:00
3 changed files with 339 additions and 105 deletions
@@ -382,10 +382,6 @@ describe("cmdThreadStepDetails", () => {
content: "done", content: "done",
}); });
}); });
test("throws when step hash does not exist", async () => {
await expect(cmdThreadStepDetails(tmpDir, "nonexistenth0" as CasRef)).rejects.toThrow();
});
}); });
// ── cmdThreadRead: ### Prompt deduplication ─────────────────────────────────── // ── cmdThreadRead: ### Prompt deduplication ───────────────────────────────────
@@ -471,3 +467,181 @@ describe("cmdThreadRead ### Prompt deduplication", () => {
expect(count).toBe(2); expect(count).toBe(2);
}); });
}); });
// ── cmdThreadRead: showStart / before / quota ─────────────────────────────────
describe("cmdThreadRead start section / before / quota", () => {
async function makeSimpleThread(
uwf: UwfStore,
roles: string[],
): Promise<{ startHash: CasRef; stepHashes: CasRef[] }> {
const uniqueRoles = [...new Set(roles)];
const workflowHash = await uwf.store.put(uwf.schemas.workflow, {
name: "simple-wf",
description: "desc",
roles: Object.fromEntries(
uniqueRoles.map((r) => [
r,
{
description: r,
goal: `Goal for ${r}`,
capabilities: [],
procedure: "Do stuff.",
output: "Output.",
meta: "placeholder00" as CasRef,
},
]),
),
conditions: {},
graph: {},
});
const startHash = (await uwf.store.put(uwf.schemas.startNode, {
workflow: workflowHash,
prompt: "Initial prompt",
})) as CasRef;
const outputHash = await uwf.store.put(uwf.schemas.workflow, {
name: "out",
description: "",
roles: {},
conditions: {},
graph: {},
});
const stepHashes: CasRef[] = [];
let prev: CasRef | null = null;
for (const role of roles) {
const stepHash = (await uwf.store.put(uwf.schemas.stepNode, {
start: startHash,
prev,
role,
output: outputHash,
detail: null,
agent: "uwf-test",
})) as CasRef;
stepHashes.push(stepHash);
prev = stepHash;
}
return { startHash, stepHashes };
}
test("showStart=true includes # Thread header and ## Task section", async () => {
const uwf = await makeUwfStore(tmpDir);
const { stepHashes } = await makeSimpleThread(uwf, ["roleA"]);
const threadId = "01JTEST0000000000000006" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHashes[stepHashes.length - 1]! });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, true);
expect(markdown).toContain("# Thread");
expect(markdown).toContain("## Task");
expect(markdown).toContain("Initial prompt");
});
test("showStart=false with before=null still shows # Thread header (default behavior)", async () => {
const uwf = await makeUwfStore(tmpDir);
const { stepHashes } = await makeSimpleThread(uwf, ["roleA"]);
const threadId = "01JTEST0000000000000007" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHashes[stepHashes.length - 1]! });
// When before=null, the start section is always shown regardless of showStart
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
expect(markdown).toContain("# Thread");
expect(markdown).toContain("## Task");
});
test("before filter: only steps before the given hash appear", async () => {
const uwf = await makeUwfStore(tmpDir);
const { stepHashes } = await makeSimpleThread(uwf, ["roleA", "roleB", "roleC"]);
const [_hashA, hashB, hashC] = stepHashes as [CasRef, CasRef, CasRef];
const threadId = "01JTEST0000000000000008" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: hashC });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, hashB, false);
expect(markdown).toContain("roleA");
expect(markdown).not.toContain("roleB");
expect(markdown).not.toContain("roleC");
});
test("quota=1 limits output and includes skip hint", async () => {
const uwf = await makeUwfStore(tmpDir);
const { stepHashes } = await makeSimpleThread(uwf, ["roleA", "roleB", "roleC"]);
const threadId = "01JTEST000000000000000A" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHashes[stepHashes.length - 1]! });
const markdown = await cmdThreadRead(tmpDir, threadId, 1, null, false);
expect(markdown).toContain("earlier step");
});
test("all steps fit in quota: no skip hint", async () => {
const uwf = await makeUwfStore(tmpDir);
const { stepHashes } = await makeSimpleThread(uwf, ["roleA"]);
const threadId = "01JTEST000000000000000B" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: stepHashes[0]! });
const markdown = await cmdThreadRead(tmpDir, threadId, THREAD_READ_DEFAULT_QUOTA, null, false);
expect(markdown).not.toContain("earlier step");
});
});
// ── Tests that call process.exit must be last ─────────────────────────────────
describe("cmdThreadStepDetails (process.exit tests - must be last)", () => {
test("throws when step hash does not exist", async () => {
await expect(cmdThreadStepDetails(tmpDir, "nonexistenth0" as CasRef)).rejects.toThrow();
});
test("before with unknown hash rejects", async () => {
const _uwf = await makeUwfStore(tmpDir);
const casDir = join(tmpDir, "cas");
await mkdir(casDir, { recursive: true });
const store = createFsStore(casDir);
const schemas = await registerUwfSchemas(store);
const uwfStore: UwfStore = { storageRoot: tmpDir, store, schemas };
const workflowHash = await uwfStore.store.put(uwfStore.schemas.workflow, {
name: "wf2",
description: "",
roles: {
roleA: {
description: "r",
goal: "g",
capabilities: [],
procedure: "p",
output: "o",
meta: "placeholder00" as CasRef,
},
},
conditions: {},
graph: {},
});
const startHash = await uwfStore.store.put(uwfStore.schemas.startNode, {
workflow: workflowHash,
prompt: "p",
});
const outputHash = await uwfStore.store.put(uwfStore.schemas.workflow, {
name: "out",
description: "",
roles: {},
conditions: {},
graph: {},
});
const stepHash = await uwfStore.store.put(uwfStore.schemas.stepNode, {
start: startHash,
prev: null,
role: "roleA",
output: outputHash,
detail: null,
agent: "uwf-test",
});
await saveThreadsIndex(tmpDir, { ["01JTEST000000000000000C" as ThreadId]: stepHash as CasRef });
await expect(
cmdThreadRead(
tmpDir,
"01JTEST000000000000000C" as ThreadId,
THREAD_READ_DEFAULT_QUOTA,
"unknownhash0" as CasRef,
false,
),
).rejects.toThrow();
});
});
+148 -97
View File
@@ -462,49 +462,68 @@ function expandDeep(store: CasStore, hash: CasRef, visited?: Set<string>): unkno
return expandValue(store, schema, node.payload, seen); return expandValue(store, schema, node.payload, seen);
} }
function expandCasRefField(store: CasStore, value: unknown, visited: Set<string>): unknown {
if (typeof value === "string") {
return expandDeep(store, value as CasRef, visited);
}
return value;
}
function expandAnyOfField(
store: CasStore,
schema: JSONSchema,
value: unknown,
visited: Set<string>,
): unknown {
if (!Array.isArray(schema.anyOf)) return value;
for (const sub of schema.anyOf as JSONSchema[]) {
if (sub.format === "cas_ref" && typeof value === "string") {
return expandDeep(store, value as CasRef, visited);
}
}
return value;
}
function expandArrayField(
store: CasStore,
schema: JSONSchema,
value: unknown,
visited: Set<string>,
): unknown {
if (!schema.items || !Array.isArray(value)) return value;
const itemSchema = schema.items as JSONSchema;
return (value as unknown[]).map((item) => expandValue(store, itemSchema, item, visited));
}
function expandObjectField(
store: CasStore,
schema: JSONSchema,
value: unknown,
visited: Set<string>,
): unknown {
if (value === null || typeof value !== "object" || Array.isArray(value) || !schema.properties) {
return value;
}
const props = schema.properties as Record<string, JSONSchema>;
const obj = value as Record<string, unknown>;
const result: Record<string, unknown> = {};
for (const [key, val] of Object.entries(obj)) {
const propSchema = props[key];
result[key] = propSchema ? expandValue(store, propSchema, val, visited) : val;
}
return result;
}
function expandValue( function expandValue(
store: CasStore, store: CasStore,
schema: JSONSchema, schema: JSONSchema,
value: unknown, value: unknown,
visited: Set<string>, visited: Set<string>,
): unknown { ): unknown {
// If this field is a cas_ref, expand it if (schema.format === "cas_ref") return expandCasRefField(store, value, visited);
if (schema.format === "cas_ref") { if (Array.isArray(schema.anyOf)) return expandAnyOfField(store, schema, value, visited);
if (typeof value === "string") { if (schema.type === "array") return expandArrayField(store, schema, value, visited);
return expandDeep(store, value as CasRef, visited); return expandObjectField(store, schema, value, visited);
}
return value;
}
// anyOf (nullable refs)
if (Array.isArray(schema.anyOf)) {
for (const sub of schema.anyOf as JSONSchema[]) {
if (sub.format === "cas_ref" && typeof value === "string") {
return expandDeep(store, value as CasRef, visited);
}
}
return value;
}
// Array of cas_ref items
if (schema.type === "array" && schema.items && Array.isArray(value)) {
const itemSchema = schema.items as JSONSchema;
return (value as unknown[]).map((item) => expandValue(store, itemSchema, item, visited));
}
// Object with properties
if (value !== null && typeof value === "object" && !Array.isArray(value) && schema.properties) {
const props = schema.properties as Record<string, JSONSchema>;
const obj = value as Record<string, unknown>;
const result: Record<string, unknown> = {};
for (const [key, val] of Object.entries(obj)) {
const propSchema = props[key];
result[key] = propSchema ? expandValue(store, propSchema, val, visited) : val;
}
return result;
}
return value;
} }
function collectOrderedSteps( function collectOrderedSteps(
@@ -588,6 +607,85 @@ export function extractLastAssistantContent(uwf: UwfStore, detailRef: CasRef): s
return null; return null;
} }
function sliceBeforeHash(
candidates: OrderedStepItem[],
before: CasRef,
threadId: ThreadId,
): OrderedStepItem[] {
const idx = candidates.findIndex((s) => s.hash === before);
if (idx === -1) {
fail(`step ${before} not found in thread ${threadId}`);
}
return candidates.slice(0, idx);
}
function selectByQuota(
candidates: OrderedStepItem[],
uwf: UwfStore,
quota: number,
): { selected: OrderedStepItem[]; skippedCount: number } {
const selected: OrderedStepItem[] = [];
let totalChars = 0;
for (let i = candidates.length - 1; i >= 0; i--) {
const item = candidates[i];
if (item === undefined) continue;
const outputYaml = formatYaml(expandOutput(uwf, item.payload.output));
const blockLen = formatCompactStep(i + 1, item, outputYaml).length;
selected.unshift(item);
totalChars += blockLen;
if (totalChars > quota) break;
}
return { selected, skippedCount: candidates.length - selected.length };
}
function formatStepHeader(stepNum: number, item: OrderedStepItem): string {
const ts = new Date(item.timestamp)
.toISOString()
.replace("T", " ")
.replace(/\.\d+Z$/, "");
return [
`## Step ${stepNum}: ${item.payload.role} \`${item.hash}\``,
`**Agent:** ${item.payload.agent} | **Time:** ${ts}`,
].join("\n");
}
function formatStepPrompt(
roleDef: WorkflowPayload["roles"][string] | undefined,
role: string,
shownPromptRoles: Set<string>,
): string {
if (!roleDef || shownPromptRoles.has(role)) return "";
shownPromptRoles.add(role);
return ["", "", "### Prompt", "", roleDef.goal].join("\n");
}
function formatStepContent(uwf: UwfStore, item: OrderedStepItem): string {
if (!item.payload.detail) return "";
const content = extractLastAssistantContent(uwf, item.payload.detail);
if (content === null) return "";
return ["", "", "### Content", "", content].join("\n");
}
function formatStartSection(options: {
threadId: ThreadId;
workflowName: string;
workflowHash: CasRef;
prompt: string;
before: CasRef | null;
showStart: boolean;
}): string {
if (options.before !== null && !options.showStart) return "";
return [
`# Thread \`${options.threadId}\``,
"",
`**Workflow:** ${options.workflowName} (\`${options.workflowHash}\`)`,
"",
"## Task",
"",
options.prompt,
].join("\n");
}
function formatThreadReadMarkdown(options: { function formatThreadReadMarkdown(options: {
threadId: ThreadId; threadId: ThreadId;
workflowName: string; workflowName: string;
@@ -600,50 +698,16 @@ function formatThreadReadMarkdown(options: {
before: CasRef | null; before: CasRef | null;
showStart: boolean; showStart: boolean;
}): string { }): string {
const { ordered, uwf, workflow, quota, before, showStart } = options; const { ordered, uwf, workflow, quota, before } = options;
// Determine which steps to consider const candidates = before !== null ? sliceBeforeHash(ordered, before, options.threadId) : ordered;
let candidates = ordered; const { selected, skippedCount } = selectByQuota(candidates, uwf, quota);
if (before !== null) {
const idx = candidates.findIndex((s) => s.hash === before);
if (idx === -1) {
fail(`step ${before} not found in thread ${options.threadId}`);
}
candidates = candidates.slice(0, idx);
}
// Walk backward from newest, accumulating chars until quota exceeded
const selected: OrderedStepItem[] = [];
let totalChars = 0;
for (let i = candidates.length - 1; i >= 0; i--) {
const item = candidates[i];
if (item === undefined) continue;
const outputYaml = formatYaml(expandOutput(uwf, item.payload.output));
const blockLen = formatCompactStep(i + 1, item, outputYaml).length;
selected.unshift(item);
totalChars += blockLen;
if (totalChars > quota) break;
}
const skippedCount = candidates.length - selected.length;
const parts: string[] = []; const parts: string[] = [];
// Start section const startSection = formatStartSection(options);
if (before === null || showStart) { if (startSection !== "") parts.push(startSection);
parts.push(
[
`# Thread \`${options.threadId}\``,
"",
`**Workflow:** ${options.workflowName} (\`${options.workflowHash}\`)`,
"",
"## Task",
"",
options.prompt,
].join("\n"),
);
}
// Skip hint
if (skippedCount > 0 && selected.length > 0) { if (skippedCount > 0 && selected.length > 0) {
const firstSelected = selected[0]; const firstSelected = selected[0];
if (firstSelected !== undefined) { if (firstSelected !== undefined) {
@@ -653,34 +717,21 @@ function formatThreadReadMarkdown(options: {
} }
} }
// Step blocks
const startIndex = candidates.length - selected.length; const startIndex = candidates.length - selected.length;
const shownPromptRoles = new Set<string>(); const shownPromptRoles = new Set<string>();
for (let i = 0; i < selected.length; i++) { for (let i = 0; i < selected.length; i++) {
const item = selected[i]; const item = selected[i];
if (item === undefined) continue; if (item === undefined) continue;
const stepNum = startIndex + i + 1; const stepNum = startIndex + i + 1;
const ts = new Date(item.timestamp)
.toISOString()
.replace("T", " ")
.replace(/\.\d+Z$/, "");
const stepLines = [
`## Step ${stepNum}: ${item.payload.role} \`${item.hash}\``,
`**Agent:** ${item.payload.agent} | **Time:** ${ts}`,
];
const roleDef = workflow.roles[item.payload.role]; const roleDef = workflow.roles[item.payload.role];
if (roleDef && !shownPromptRoles.has(item.payload.role)) { const stepBlock = [
const prompt = roleDef.goal; formatStepHeader(stepNum, item),
stepLines.push("", "### Prompt", "", prompt); formatStepPrompt(roleDef, item.payload.role, shownPromptRoles),
shownPromptRoles.add(item.payload.role); formatStepContent(uwf, item),
} ]
if (item.payload.detail) { .filter((s) => s !== "")
const content = extractLastAssistantContent(uwf, item.payload.detail); .join("");
if (content !== null) { parts.push(stepBlock);
stepLines.push("", "### Content", "", content);
}
}
parts.push(stepLines.join("\n"));
} }
return parts.join("\n\n---\n\n"); return parts.join("\n\n---\n\n");
@@ -16,6 +16,7 @@ const log = createLogger({ sink: { kind: "stderr" } });
const CLAUDE_COMMAND = "claude"; const CLAUDE_COMMAND = "claude";
const CLAUDE_MAX_TURNS = 90; const CLAUDE_MAX_TURNS = 90;
const CLAUDE_MODEL = process.env["CLAUDE_MODEL"] ?? null;
function buildHistorySummary(steps: AgentContext["steps"]): string { function buildHistorySummary(steps: AgentContext["steps"]): string {
if (steps.length === 0) { if (steps.length === 0) {
@@ -87,7 +88,7 @@ function spawnClaude(args: string[]): Promise<{ stdout: string; stderr: string }
} }
function spawnClaudeRun(prompt: string): Promise<{ stdout: string; stderr: string }> { function spawnClaudeRun(prompt: string): Promise<{ stdout: string; stderr: string }> {
return spawnClaude([ const args = [
"-p", "-p",
prompt, prompt,
"--output-format", "--output-format",
@@ -96,14 +97,18 @@ function spawnClaudeRun(prompt: string): Promise<{ stdout: string; stderr: strin
"--dangerously-skip-permissions", "--dangerously-skip-permissions",
"--max-turns", "--max-turns",
String(CLAUDE_MAX_TURNS), String(CLAUDE_MAX_TURNS),
]); ];
if (CLAUDE_MODEL !== null) {
args.push("--model", CLAUDE_MODEL);
}
return spawnClaude(args);
} }
function spawnClaudeResume( function spawnClaudeResume(
sessionId: string, sessionId: string,
message: string, message: string,
): Promise<{ stdout: string; stderr: string }> { ): Promise<{ stdout: string; stderr: string }> {
return spawnClaude([ const args = [
"-p", "-p",
message, message,
"--resume", "--resume",
@@ -114,7 +119,11 @@ function spawnClaudeResume(
"--dangerously-skip-permissions", "--dangerously-skip-permissions",
"--max-turns", "--max-turns",
String(CLAUDE_MAX_TURNS), String(CLAUDE_MAX_TURNS),
]); ];
if (CLAUDE_MODEL !== null) {
args.push("--model", CLAUDE_MODEL);
}
return spawnClaude(args);
} }
async function processClaudeOutput(stdout: string, store: Store): Promise<AgentRunResult> { async function processClaudeOutput(stdout: string, store: Store): Promise<AgentRunResult> {