Merge pull request 'feat: thread steps + thread fork' (#345) from feat/342-thread-steps-fork into main

This commit is contained in:
2026-05-18 16:34:55 +00:00
4 changed files with 194 additions and 0 deletions
+27
View File
@@ -3,11 +3,13 @@
import { Command } from "commander";
import {
cmdThreadFork,
cmdThreadKill,
cmdThreadList,
cmdThreadShow,
cmdThreadStart,
cmdThreadStep,
cmdThreadSteps,
} from "./commands/thread.js";
import { cmdWorkflowList, cmdWorkflowPut, cmdWorkflowShow } from "./commands/workflow.js";
import { cmdSetup, cmdSetupInteractive } from "./commands/setup.js";
@@ -144,6 +146,31 @@ thread
});
});
thread
.command("steps")
.description("List all steps in a thread")
.argument("<thread-id>", "Thread ULID")
.action((threadId: string) => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const result = await cmdThreadSteps(storageRoot, threadId);
writeOutput(result);
});
});
thread
.command("fork")
.description("Fork a thread from a specific step")
.argument("<thread-id>", "Thread ULID")
.argument("<step-hash>", "CAS hash of the step to fork from")
.action((threadId: string, stepHash: string) => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const result = await cmdThreadFork(storageRoot, threadId, stepHash);
writeOutput(result);
});
});
program
.command("setup")
.description("Configure provider, model, and agent")
+129
View File
@@ -8,13 +8,17 @@ import type {
AgentConfig,
CasRef,
ModeratorContext,
StartEntry,
StartNodePayload,
StartOutput,
StepContext,
StepEntry,
StepNodePayload,
StepOutput,
ThreadForkOutput,
ThreadId,
ThreadListItem,
ThreadStepsOutput,
WorkflowConfig,
WorkflowPayload,
} from "@uncaged/uwf-protocol";
@@ -437,6 +441,131 @@ export async function cmdThreadStep(
};
}
async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise<CasRef> {
const index = await loadThreadsIndex(storageRoot);
const activeHead = index[threadId];
if (activeHead !== undefined) {
return activeHead;
}
const hist = await findThreadInHistory(storageRoot, threadId);
if (hist !== null) {
return hist.head;
}
fail(`thread not found: ${threadId}`);
}
export async function cmdThreadSteps(
storageRoot: string,
threadId: ThreadId,
): Promise<ThreadStepsOutput> {
const headHash = await resolveHeadHash(storageRoot, threadId);
const uwf = await createUwfStore(storageRoot);
const chain = walkChain(uwf, headHash);
const startNode = uwf.store.get(chain.startHash);
if (startNode === null) {
fail(`StartNode not found: ${chain.startHash}`);
}
const startEntry: StartEntry = {
hash: chain.startHash,
workflow: chain.start.workflow,
prompt: chain.start.prompt,
timestamp: startNode.timestamp,
};
const stepEntries: StepEntry[] = [];
// Walk again to get hashes for each step
let hash: CasRef | null = headHash;
const hashToNode = new Map<string, { payload: StepNodePayload; timestamp: number }>();
while (hash !== null) {
const node = uwf.store.get(hash);
if (node === null || node.type !== uwf.schemas.stepNode) {
break;
}
const payload = node.payload as StepNodePayload;
hashToNode.set(hash, { payload, timestamp: node.timestamp });
hash = payload.prev;
}
// Build chronological list with hashes
// Walk from start's next to head
let cur: CasRef | null = chain.headIsStart ? null : headHash;
const ordered: { hash: CasRef; payload: StepNodePayload; timestamp: number }[] = [];
while (cur !== null) {
const entry = hashToNode.get(cur);
if (entry === undefined) break;
ordered.push({ hash: cur, ...entry });
cur = entry.payload.prev;
}
ordered.reverse();
for (const item of ordered) {
stepEntries.push({
hash: item.hash,
role: item.payload.role,
output: expandOutput(uwf, item.payload.output),
detail: item.payload.detail,
agent: item.payload.agent,
timestamp: item.timestamp,
});
}
return {
thread: threadId,
workflow: chain.start.workflow,
steps: [startEntry, ...stepEntries],
};
}
export async function cmdThreadFork(
storageRoot: string,
threadId: ThreadId,
stepHash: CasRef,
): Promise<ThreadForkOutput> {
const headHash = await resolveHeadHash(storageRoot, threadId);
const uwf = await createUwfStore(storageRoot);
// Verify stepHash belongs to this thread by walking the chain
let found = false;
let cur: CasRef | null = headHash;
while (cur !== null) {
if (cur === stepHash) {
found = true;
break;
}
const node = uwf.store.get(cur);
if (node === null) break;
if (node.type === uwf.schemas.startNode) {
// startHash check
if (cur === stepHash) {
found = true;
}
break;
}
const payload = node.payload as StepNodePayload;
cur = payload.prev;
}
if (!found) {
fail(`step ${stepHash} not found in thread ${threadId}`);
}
const newThreadId = generateUlid(Date.now()) as ThreadId;
const index = await loadThreadsIndex(storageRoot);
index[newThreadId] = stepHash;
await saveThreadsIndex(storageRoot, index);
return {
thread: newThreadId,
forkedFrom: {
thread: threadId,
step: stepHash,
},
};
}
export async function cmdThreadKill(storageRoot: string, threadId: ThreadId): Promise<KillOutput> {
const index = await loadThreadsIndex(storageRoot);
const head = index[threadId];
+4
View File
@@ -16,14 +16,18 @@ export type {
RoleDefinition,
RoleName,
Scenario,
StartEntry,
StartNodePayload,
StartOutput,
StepContext,
StepEntry,
StepNodePayload,
StepOutput,
StepRecord,
ThreadForkOutput,
ThreadId,
ThreadListItem,
ThreadStepsOutput,
ThreadsIndex,
Transition,
WorkflowConfig,
+34
View File
@@ -80,6 +80,40 @@ export type StepOutput = {
done: boolean;
};
/** uwf thread steps — single step entry */
export type StepEntry = {
hash: CasRef;
role: string;
output: unknown;
detail: CasRef;
agent: string;
timestamp: number;
};
/** uwf thread steps — start entry */
export type StartEntry = {
hash: CasRef;
workflow: CasRef;
prompt: string;
timestamp: number;
};
/** uwf thread steps output */
export type ThreadStepsOutput = {
thread: ThreadId;
workflow: CasRef;
steps: [StartEntry, ...StepEntry[]];
};
/** uwf thread fork output */
export type ThreadForkOutput = {
thread: ThreadId;
forkedFrom: {
thread: ThreadId;
step: CasRef;
};
};
/** uwf thread list */
export type ThreadListItem = {
thread: ThreadId;