diff --git a/.gitignore b/.gitignore index a514b35..4b95466 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ xiaoju/ solve-issue-entry.ts packages/workflow-template-develop/develop.esm.js .DS_Store +*.py diff --git a/packages/cli-uwf/src/cli.ts b/packages/cli-uwf/src/cli.ts index a8b8b22..5a93593 100755 --- a/packages/cli-uwf/src/cli.ts +++ b/packages/cli-uwf/src/cli.ts @@ -1,11 +1,14 @@ #!/usr/bin/env bun import { Command } from "commander"; +import type { ThreadId } from "@uncaged/uwf-protocol"; import { cmdThreadFork, cmdThreadKill, cmdThreadList, + cmdThreadRead, + THREAD_READ_DEFAULT_QUOTA, cmdThreadShow, cmdThreadStart, cmdThreadStep, @@ -158,6 +161,28 @@ thread }); }); +thread + .command("read") + .description("Read thread context as human-readable markdown") + .argument("", "Thread ULID") + .option("--quota ", "Max output characters", String(THREAD_READ_DEFAULT_QUOTA)) + .option("--before ", "Load steps before this hash (exclusive)") + .option("--start", "Include start step in output") + .option("--detail", "Expand detail content for each step") + .action((threadId: string, opts: { quota: string; before: string | undefined; start: boolean; detail: boolean }) => { + const storageRoot = resolveStorageRoot(); + runAction(async () => { + const quota = Number.parseInt(opts.quota, 10); + if (!Number.isFinite(quota) || quota < 1) { + process.stderr.write("invalid --quota: must be a positive integer\n"); + process.exit(1); + } + const before = opts.before ?? null; + const markdown = await cmdThreadRead(storageRoot, threadId as ThreadId, quota, before, opts.start ?? false, opts.detail ?? false); + process.stdout.write(markdown.endsWith("\n") ? markdown : `${markdown}\n`); + }); + }); + thread .command("fork") .description("Fork a thread from a specific step") diff --git a/packages/cli-uwf/src/commands/thread.ts b/packages/cli-uwf/src/commands/thread.ts index f718996..d5c9e3a 100644 --- a/packages/cli-uwf/src/commands/thread.ts +++ b/packages/cli-uwf/src/commands/thread.ts @@ -1,6 +1,8 @@ import { execFileSync } from "node:child_process"; -import { validate } from "@uncaged/json-cas"; +import { getSchema, validate } from "@uncaged/json-cas"; +import type { JSONSchema, Store as CasStore } from "@uncaged/json-cas"; +import { stringify } from "yaml"; import { getEnvPath, loadWorkflowConfig } from "@uncaged/uwf-agent-kit"; import { evaluate } from "@uncaged/uwf-moderator"; import type { @@ -40,6 +42,7 @@ import { import { isCasRef } from "../validate.js"; const END_ROLE = "$END"; +export const THREAD_READ_DEFAULT_QUOTA = 4000; type ChainState = { startHash: CasRef; @@ -48,6 +51,12 @@ type ChainState = { headIsStart: boolean; }; +type OrderedStepItem = { + hash: CasRef; + payload: StepNodePayload; + timestamp: number; +}; + export type KillOutput = { thread: ThreadId; archived: boolean; @@ -266,6 +275,215 @@ function expandOutput(uwf: UwfStore, outputRef: CasRef): unknown { return node.payload; } +/** + * Recursively expand all cas_ref fields in a CAS node's payload, + * replacing hash strings with the referenced node's expanded payload. + */ +function expandDeep(store: CasStore, hash: CasRef, visited?: Set): unknown { + const seen = visited ?? new Set(); + if (seen.has(hash)) return hash; // cycle guard + seen.add(hash); + + const node = store.get(hash); + if (node === null) return hash; + + const schema = getSchema(store, node.type); + if (schema === null) return node.payload; + + return expandValue(store, schema, node.payload, seen); +} + +function expandValue(store: CasStore, schema: JSONSchema, value: unknown, visited: Set): unknown { + // If this field is a cas_ref, expand it + if (schema.format === "cas_ref") { + if (typeof value === "string") { + return expandDeep(store, value as CasRef, visited); + } + return value; + } + + // anyOf (nullable refs) + if (Array.isArray(schema.anyOf)) { + for (const sub of schema.anyOf as JSONSchema[]) { + if (sub.format === "cas_ref" && typeof value === "string") { + return expandDeep(store, value as CasRef, visited); + } + } + return value; + } + + // Array of cas_ref items + if (schema.type === "array" && schema.items && Array.isArray(value)) { + const itemSchema = schema.items as JSONSchema; + return (value as unknown[]).map((item) => expandValue(store, itemSchema, item, visited)); + } + + // Object with properties + if (value !== null && typeof value === "object" && !Array.isArray(value) && schema.properties) { + const props = schema.properties as Record; + const obj = value as Record; + const result: Record = {}; + for (const [key, val] of Object.entries(obj)) { + const propSchema = props[key]; + result[key] = propSchema ? expandValue(store, propSchema, val, visited) : val; + } + return result; + } + + return value; +} + +function collectOrderedSteps( + uwf: UwfStore, + headHash: CasRef, + chain: ChainState, +): OrderedStepItem[] { + let hash: CasRef | null = headHash; + const hashToNode = new Map(); + while (hash !== null) { + const node = uwf.store.get(hash); + if (node === null || node.type !== uwf.schemas.stepNode) { + break; + } + const payload = node.payload as StepNodePayload; + hashToNode.set(hash, { payload, timestamp: node.timestamp }); + hash = payload.prev; + } + + let cur: CasRef | null = chain.headIsStart ? null : headHash; + const ordered: OrderedStepItem[] = []; + while (cur !== null) { + const entry = hashToNode.get(cur); + if (entry === undefined) { + break; + } + ordered.push({ hash: cur, ...entry }); + cur = entry.payload.prev; + } + ordered.reverse(); + return ordered; +} + +function formatYaml(value: unknown): string { + return stringify(value).trimEnd(); +} + +function formatCompactStep(index: number, item: OrderedStepItem, outputYaml: string): string { + return [ + `## Step ${index}: ${item.payload.role}`, + "", + `- **Hash:** \`${item.hash}\``, + `- **Agent:** ${item.payload.agent}`, + "", + "### Output", + "", + "```yaml", + outputYaml, + "```", + ].join("\n"); +} + +function formatThreadReadMarkdown(options: { + threadId: ThreadId; + workflowName: string; + workflowHash: CasRef; + prompt: string; + ordered: OrderedStepItem[]; + uwf: UwfStore; + workflow: WorkflowPayload; + quota: number; + before: CasRef | null; + showStart: boolean; + showDetail: boolean; +}): string { + const { ordered, uwf, workflow, quota, before, showStart, showDetail } = options; + + // Determine which steps to consider + let candidates = ordered; + if (before !== null) { + const idx = candidates.findIndex((s) => s.hash === before); + if (idx === -1) { + fail(`step ${before} not found in thread ${options.threadId}`); + } + candidates = candidates.slice(0, idx); + } + + // Walk backward from newest, accumulating chars until quota exceeded + const selected: OrderedStepItem[] = []; + let totalChars = 0; + for (let i = candidates.length - 1; i >= 0; i--) { + const item = candidates[i]; + if (item === undefined) continue; + const outputYaml = formatYaml(expandOutput(uwf, item.payload.output)); + const blockLen = formatCompactStep(i + 1, item, outputYaml).length; + selected.unshift(item); + totalChars += blockLen; + if (totalChars > quota) break; + } + + const skippedCount = candidates.length - selected.length; + const parts: string[] = []; + + // Start section + if (before === null || showStart) { + parts.push( + [ + `# Thread \`${options.threadId}\``, + "", + `**Workflow:** ${options.workflowName} (\`${options.workflowHash}\`)`, + "", + "## Task", + "", + options.prompt, + ].join("\n"), + ); + } + + // Skip hint + if (skippedCount > 0 && selected.length > 0) { + const firstSelected = selected[0]; + if (firstSelected !== undefined) { + parts.push( + `*(${skippedCount} earlier step${skippedCount > 1 ? "s" : ""}, load with \`uwf thread read ${options.threadId} --before ${firstSelected.hash}\`)*`, + ); + } + } + + // Step blocks + const startIndex = candidates.length - selected.length; + for (let i = 0; i < selected.length; i++) { + const item = selected[i]; + if (item === undefined) continue; + const stepNum = startIndex + i + 1; + const outputYaml = formatYaml(expandOutput(uwf, item.payload.output)); + const ts = new Date(item.timestamp).toISOString().replace("T", " ").replace(/\.\d+Z$/, ""); + const stepLines = [ + `## Step ${stepNum}: ${item.payload.role} \`${item.hash}\``, + `**Agent:** ${item.payload.agent} | **Time:** ${ts}`, + ]; + const roleDef = workflow.roles[item.payload.role]; + if (roleDef) { + stepLines.push("", "### Prompt", "", roleDef.systemPrompt); + } + stepLines.push( + "", + "### Output", + "", + "```yaml", + outputYaml, + "```", + ); + if (showDetail && item.payload.detail) { + const detailExpanded = expandDeep(uwf.store, item.payload.detail); + const detailYaml = formatYaml(detailExpanded); + stepLines.push("", "### Detail", "", "```yaml", detailYaml, "```"); + } + parts.push(stepLines.join("\n")); + } + + return parts.join("\n\n---\n\n"); +} + function buildModeratorContext(uwf: UwfStore, chain: ChainState): ModeratorContext { const chronological = [...chain.stepsNewestFirst].reverse(); const steps: StepContext[] = chronological.map((step) => ({ @@ -475,31 +693,7 @@ export async function cmdThreadSteps( }; const stepEntries: StepEntry[] = []; - - // Walk again to get hashes for each step - let hash: CasRef | null = headHash; - const hashToNode = new Map(); - while (hash !== null) { - const node = uwf.store.get(hash); - if (node === null || node.type !== uwf.schemas.stepNode) { - break; - } - const payload = node.payload as StepNodePayload; - hashToNode.set(hash, { payload, timestamp: node.timestamp }); - hash = payload.prev; - } - - // Build chronological list with hashes - // Walk from start's next to head - let cur: CasRef | null = chain.headIsStart ? null : headHash; - const ordered: { hash: CasRef; payload: StepNodePayload; timestamp: number }[] = []; - while (cur !== null) { - const entry = hashToNode.get(cur); - if (entry === undefined) break; - ordered.push({ hash: cur, ...entry }); - cur = entry.payload.prev; - } - ordered.reverse(); + const ordered = collectOrderedSteps(uwf, headHash, chain); for (const item of ordered) { stepEntries.push({ @@ -519,6 +713,35 @@ export async function cmdThreadSteps( }; } +export async function cmdThreadRead( + storageRoot: string, + threadId: ThreadId, + quota: number = THREAD_READ_DEFAULT_QUOTA, + before: CasRef | null = null, + showStart: boolean = false, + showDetail: boolean = false, +): Promise { + const headHash = await resolveHeadHash(storageRoot, threadId); + const uwf = await createUwfStore(storageRoot); + const chain = walkChain(uwf, headHash); + const workflow = loadWorkflowPayload(uwf, chain.start.workflow); + const ordered = collectOrderedSteps(uwf, headHash, chain); + + return formatThreadReadMarkdown({ + threadId, + workflowName: workflow.name, + workflowHash: chain.start.workflow, + prompt: chain.start.prompt, + ordered, + uwf, + workflow, + quota, + before, + showStart, + showDetail, + }); +} + export async function cmdThreadFork( storageRoot: string, stepHash: CasRef,