feat: add thread steps and thread fork commands
- uwf thread steps <thread-id>: walk CAS chain, list all steps chronologically - uwf thread fork <thread-id> <step-hash>: create new thread from history point - New types: StartEntry, StepEntry, ThreadStepsOutput, ThreadForkOutput - Supports both active and archived threads Refs #342
This commit is contained in:
@@ -3,11 +3,13 @@
|
||||
import { Command } from "commander";
|
||||
|
||||
import {
|
||||
cmdThreadFork,
|
||||
cmdThreadKill,
|
||||
cmdThreadList,
|
||||
cmdThreadShow,
|
||||
cmdThreadStart,
|
||||
cmdThreadStep,
|
||||
cmdThreadSteps,
|
||||
} from "./commands/thread.js";
|
||||
import { cmdWorkflowList, cmdWorkflowPut, cmdWorkflowShow } from "./commands/workflow.js";
|
||||
import { cmdSetup, cmdSetupInteractive } from "./commands/setup.js";
|
||||
@@ -144,6 +146,31 @@ thread
|
||||
});
|
||||
});
|
||||
|
||||
thread
|
||||
.command("steps")
|
||||
.description("List all steps in a thread")
|
||||
.argument("<thread-id>", "Thread ULID")
|
||||
.action((threadId: string) => {
|
||||
const storageRoot = resolveStorageRoot();
|
||||
runAction(async () => {
|
||||
const result = await cmdThreadSteps(storageRoot, threadId);
|
||||
writeOutput(result);
|
||||
});
|
||||
});
|
||||
|
||||
thread
|
||||
.command("fork")
|
||||
.description("Fork a thread from a specific step")
|
||||
.argument("<thread-id>", "Thread ULID")
|
||||
.argument("<step-hash>", "CAS hash of the step to fork from")
|
||||
.action((threadId: string, stepHash: string) => {
|
||||
const storageRoot = resolveStorageRoot();
|
||||
runAction(async () => {
|
||||
const result = await cmdThreadFork(storageRoot, threadId, stepHash);
|
||||
writeOutput(result);
|
||||
});
|
||||
});
|
||||
|
||||
program
|
||||
.command("setup")
|
||||
.description("Configure provider, model, and agent")
|
||||
|
||||
@@ -8,13 +8,17 @@ import type {
|
||||
AgentConfig,
|
||||
CasRef,
|
||||
ModeratorContext,
|
||||
StartEntry,
|
||||
StartNodePayload,
|
||||
StartOutput,
|
||||
StepContext,
|
||||
StepEntry,
|
||||
StepNodePayload,
|
||||
StepOutput,
|
||||
ThreadForkOutput,
|
||||
ThreadId,
|
||||
ThreadListItem,
|
||||
ThreadStepsOutput,
|
||||
WorkflowConfig,
|
||||
WorkflowPayload,
|
||||
} from "@uncaged/uwf-protocol";
|
||||
@@ -437,6 +441,131 @@ export async function cmdThreadStep(
|
||||
};
|
||||
}
|
||||
|
||||
async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise<CasRef> {
|
||||
const index = await loadThreadsIndex(storageRoot);
|
||||
const activeHead = index[threadId];
|
||||
if (activeHead !== undefined) {
|
||||
return activeHead;
|
||||
}
|
||||
const hist = await findThreadInHistory(storageRoot, threadId);
|
||||
if (hist !== null) {
|
||||
return hist.head;
|
||||
}
|
||||
fail(`thread not found: ${threadId}`);
|
||||
}
|
||||
|
||||
export async function cmdThreadSteps(
|
||||
storageRoot: string,
|
||||
threadId: ThreadId,
|
||||
): Promise<ThreadStepsOutput> {
|
||||
const headHash = await resolveHeadHash(storageRoot, threadId);
|
||||
const uwf = await createUwfStore(storageRoot);
|
||||
const chain = walkChain(uwf, headHash);
|
||||
|
||||
const startNode = uwf.store.get(chain.startHash);
|
||||
if (startNode === null) {
|
||||
fail(`StartNode not found: ${chain.startHash}`);
|
||||
}
|
||||
|
||||
const startEntry: StartEntry = {
|
||||
hash: chain.startHash,
|
||||
workflow: chain.start.workflow,
|
||||
prompt: chain.start.prompt,
|
||||
timestamp: startNode.timestamp,
|
||||
};
|
||||
|
||||
const stepEntries: StepEntry[] = [];
|
||||
|
||||
// Walk again to get hashes for each step
|
||||
let hash: CasRef | null = headHash;
|
||||
const hashToNode = new Map<string, { payload: StepNodePayload; timestamp: number }>();
|
||||
while (hash !== null) {
|
||||
const node = uwf.store.get(hash);
|
||||
if (node === null || node.type !== uwf.schemas.stepNode) {
|
||||
break;
|
||||
}
|
||||
const payload = node.payload as StepNodePayload;
|
||||
hashToNode.set(hash, { payload, timestamp: node.timestamp });
|
||||
hash = payload.prev;
|
||||
}
|
||||
|
||||
// Build chronological list with hashes
|
||||
// Walk from start's next to head
|
||||
let cur: CasRef | null = chain.headIsStart ? null : headHash;
|
||||
const ordered: { hash: CasRef; payload: StepNodePayload; timestamp: number }[] = [];
|
||||
while (cur !== null) {
|
||||
const entry = hashToNode.get(cur);
|
||||
if (entry === undefined) break;
|
||||
ordered.push({ hash: cur, ...entry });
|
||||
cur = entry.payload.prev;
|
||||
}
|
||||
ordered.reverse();
|
||||
|
||||
for (const item of ordered) {
|
||||
stepEntries.push({
|
||||
hash: item.hash,
|
||||
role: item.payload.role,
|
||||
output: expandOutput(uwf, item.payload.output),
|
||||
detail: item.payload.detail,
|
||||
agent: item.payload.agent,
|
||||
timestamp: item.timestamp,
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
thread: threadId,
|
||||
workflow: chain.start.workflow,
|
||||
steps: [startEntry, ...stepEntries],
|
||||
};
|
||||
}
|
||||
|
||||
export async function cmdThreadFork(
|
||||
storageRoot: string,
|
||||
threadId: ThreadId,
|
||||
stepHash: CasRef,
|
||||
): Promise<ThreadForkOutput> {
|
||||
const headHash = await resolveHeadHash(storageRoot, threadId);
|
||||
const uwf = await createUwfStore(storageRoot);
|
||||
|
||||
// Verify stepHash belongs to this thread by walking the chain
|
||||
let found = false;
|
||||
let cur: CasRef | null = headHash;
|
||||
while (cur !== null) {
|
||||
if (cur === stepHash) {
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
const node = uwf.store.get(cur);
|
||||
if (node === null) break;
|
||||
if (node.type === uwf.schemas.startNode) {
|
||||
// startHash check
|
||||
if (cur === stepHash) {
|
||||
found = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
const payload = node.payload as StepNodePayload;
|
||||
cur = payload.prev;
|
||||
}
|
||||
|
||||
if (!found) {
|
||||
fail(`step ${stepHash} not found in thread ${threadId}`);
|
||||
}
|
||||
|
||||
const newThreadId = generateUlid(Date.now()) as ThreadId;
|
||||
const index = await loadThreadsIndex(storageRoot);
|
||||
index[newThreadId] = stepHash;
|
||||
await saveThreadsIndex(storageRoot, index);
|
||||
|
||||
return {
|
||||
thread: newThreadId,
|
||||
forkedFrom: {
|
||||
thread: threadId,
|
||||
step: stepHash,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export async function cmdThreadKill(storageRoot: string, threadId: ThreadId): Promise<KillOutput> {
|
||||
const index = await loadThreadsIndex(storageRoot);
|
||||
const head = index[threadId];
|
||||
|
||||
@@ -16,14 +16,18 @@ export type {
|
||||
RoleDefinition,
|
||||
RoleName,
|
||||
Scenario,
|
||||
StartEntry,
|
||||
StartNodePayload,
|
||||
StartOutput,
|
||||
StepContext,
|
||||
StepEntry,
|
||||
StepNodePayload,
|
||||
StepOutput,
|
||||
StepRecord,
|
||||
ThreadForkOutput,
|
||||
ThreadId,
|
||||
ThreadListItem,
|
||||
ThreadStepsOutput,
|
||||
ThreadsIndex,
|
||||
Transition,
|
||||
WorkflowConfig,
|
||||
|
||||
@@ -80,6 +80,40 @@ export type StepOutput = {
|
||||
done: boolean;
|
||||
};
|
||||
|
||||
/** uwf thread steps — single step entry */
|
||||
export type StepEntry = {
|
||||
hash: CasRef;
|
||||
role: string;
|
||||
output: unknown;
|
||||
detail: CasRef;
|
||||
agent: string;
|
||||
timestamp: number;
|
||||
};
|
||||
|
||||
/** uwf thread steps — start entry */
|
||||
export type StartEntry = {
|
||||
hash: CasRef;
|
||||
workflow: CasRef;
|
||||
prompt: string;
|
||||
timestamp: number;
|
||||
};
|
||||
|
||||
/** uwf thread steps output */
|
||||
export type ThreadStepsOutput = {
|
||||
thread: ThreadId;
|
||||
workflow: CasRef;
|
||||
steps: [StartEntry, ...StepEntry[]];
|
||||
};
|
||||
|
||||
/** uwf thread fork output */
|
||||
export type ThreadForkOutput = {
|
||||
thread: ThreadId;
|
||||
forkedFrom: {
|
||||
thread: ThreadId;
|
||||
step: CasRef;
|
||||
};
|
||||
};
|
||||
|
||||
/** uwf thread list */
|
||||
export type ThreadListItem = {
|
||||
thread: ThreadId;
|
||||
|
||||
Reference in New Issue
Block a user