diff --git a/packages/cli-uwf/src/cli.ts b/packages/cli-uwf/src/cli.ts index bbcf5ba..7272ff9 100755 --- a/packages/cli-uwf/src/cli.ts +++ b/packages/cli-uwf/src/cli.ts @@ -3,11 +3,13 @@ import { Command } from "commander"; import { + cmdThreadFork, cmdThreadKill, cmdThreadList, cmdThreadShow, cmdThreadStart, cmdThreadStep, + cmdThreadSteps, } from "./commands/thread.js"; import { cmdWorkflowList, cmdWorkflowPut, cmdWorkflowShow } from "./commands/workflow.js"; import { cmdSetup, cmdSetupInteractive } from "./commands/setup.js"; @@ -144,6 +146,31 @@ thread }); }); +thread + .command("steps") + .description("List all steps in a thread") + .argument("", "Thread ULID") + .action((threadId: string) => { + const storageRoot = resolveStorageRoot(); + runAction(async () => { + const result = await cmdThreadSteps(storageRoot, threadId); + writeOutput(result); + }); + }); + +thread + .command("fork") + .description("Fork a thread from a specific step") + .argument("", "Thread ULID") + .argument("", "CAS hash of the step to fork from") + .action((threadId: string, stepHash: string) => { + const storageRoot = resolveStorageRoot(); + runAction(async () => { + const result = await cmdThreadFork(storageRoot, threadId, stepHash); + writeOutput(result); + }); + }); + program .command("setup") .description("Configure provider, model, and agent") diff --git a/packages/cli-uwf/src/commands/thread.ts b/packages/cli-uwf/src/commands/thread.ts index e2b058c..a7b8eae 100644 --- a/packages/cli-uwf/src/commands/thread.ts +++ b/packages/cli-uwf/src/commands/thread.ts @@ -8,13 +8,17 @@ import type { AgentConfig, CasRef, ModeratorContext, + StartEntry, StartNodePayload, StartOutput, StepContext, + StepEntry, StepNodePayload, StepOutput, + ThreadForkOutput, ThreadId, ThreadListItem, + ThreadStepsOutput, WorkflowConfig, WorkflowPayload, } from "@uncaged/uwf-protocol"; @@ -437,6 +441,131 @@ export async function cmdThreadStep( }; } +async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise { + const index = await loadThreadsIndex(storageRoot); + const activeHead = index[threadId]; + if (activeHead !== undefined) { + return activeHead; + } + const hist = await findThreadInHistory(storageRoot, threadId); + if (hist !== null) { + return hist.head; + } + fail(`thread not found: ${threadId}`); +} + +export async function cmdThreadSteps( + storageRoot: string, + threadId: ThreadId, +): Promise { + const headHash = await resolveHeadHash(storageRoot, threadId); + const uwf = await createUwfStore(storageRoot); + const chain = walkChain(uwf, headHash); + + const startNode = uwf.store.get(chain.startHash); + if (startNode === null) { + fail(`StartNode not found: ${chain.startHash}`); + } + + const startEntry: StartEntry = { + hash: chain.startHash, + workflow: chain.start.workflow, + prompt: chain.start.prompt, + timestamp: startNode.timestamp, + }; + + const stepEntries: StepEntry[] = []; + + // Walk again to get hashes for each step + let hash: CasRef | null = headHash; + const hashToNode = new Map(); + while (hash !== null) { + const node = uwf.store.get(hash); + if (node === null || node.type !== uwf.schemas.stepNode) { + break; + } + const payload = node.payload as StepNodePayload; + hashToNode.set(hash, { payload, timestamp: node.timestamp }); + hash = payload.prev; + } + + // Build chronological list with hashes + // Walk from start's next to head + let cur: CasRef | null = chain.headIsStart ? null : headHash; + const ordered: { hash: CasRef; payload: StepNodePayload; timestamp: number }[] = []; + while (cur !== null) { + const entry = hashToNode.get(cur); + if (entry === undefined) break; + ordered.push({ hash: cur, ...entry }); + cur = entry.payload.prev; + } + ordered.reverse(); + + for (const item of ordered) { + stepEntries.push({ + hash: item.hash, + role: item.payload.role, + output: expandOutput(uwf, item.payload.output), + detail: item.payload.detail, + agent: item.payload.agent, + timestamp: item.timestamp, + }); + } + + return { + thread: threadId, + workflow: chain.start.workflow, + steps: [startEntry, ...stepEntries], + }; +} + +export async function cmdThreadFork( + storageRoot: string, + threadId: ThreadId, + stepHash: CasRef, +): Promise { + const headHash = await resolveHeadHash(storageRoot, threadId); + const uwf = await createUwfStore(storageRoot); + + // Verify stepHash belongs to this thread by walking the chain + let found = false; + let cur: CasRef | null = headHash; + while (cur !== null) { + if (cur === stepHash) { + found = true; + break; + } + const node = uwf.store.get(cur); + if (node === null) break; + if (node.type === uwf.schemas.startNode) { + // startHash check + if (cur === stepHash) { + found = true; + } + break; + } + const payload = node.payload as StepNodePayload; + cur = payload.prev; + } + + if (!found) { + fail(`step ${stepHash} not found in thread ${threadId}`); + } + + const newThreadId = generateUlid(Date.now()) as ThreadId; + const index = await loadThreadsIndex(storageRoot); + index[newThreadId] = stepHash; + await saveThreadsIndex(storageRoot, index); + + return { + thread: newThreadId, + forkedFrom: { + thread: threadId, + step: stepHash, + }, + }; +} + export async function cmdThreadKill(storageRoot: string, threadId: ThreadId): Promise { const index = await loadThreadsIndex(storageRoot); const head = index[threadId]; diff --git a/packages/uwf-protocol/src/index.ts b/packages/uwf-protocol/src/index.ts index 84f315e..fe4a019 100644 --- a/packages/uwf-protocol/src/index.ts +++ b/packages/uwf-protocol/src/index.ts @@ -16,14 +16,18 @@ export type { RoleDefinition, RoleName, Scenario, + StartEntry, StartNodePayload, StartOutput, StepContext, + StepEntry, StepNodePayload, StepOutput, StepRecord, + ThreadForkOutput, ThreadId, ThreadListItem, + ThreadStepsOutput, ThreadsIndex, Transition, WorkflowConfig, diff --git a/packages/uwf-protocol/src/types.ts b/packages/uwf-protocol/src/types.ts index 85095da..deccd79 100644 --- a/packages/uwf-protocol/src/types.ts +++ b/packages/uwf-protocol/src/types.ts @@ -80,6 +80,40 @@ export type StepOutput = { done: boolean; }; +/** uwf thread steps — single step entry */ +export type StepEntry = { + hash: CasRef; + role: string; + output: unknown; + detail: CasRef; + agent: string; + timestamp: number; +}; + +/** uwf thread steps — start entry */ +export type StartEntry = { + hash: CasRef; + workflow: CasRef; + prompt: string; + timestamp: number; +}; + +/** uwf thread steps output */ +export type ThreadStepsOutput = { + thread: ThreadId; + workflow: CasRef; + steps: [StartEntry, ...StepEntry[]]; +}; + +/** uwf thread fork output */ +export type ThreadForkOutput = { + thread: ThreadId; + forkedFrom: { + thread: ThreadId; + step: CasRef; + }; +}; + /** uwf thread list */ export type ThreadListItem = { thread: ThreadId;