import { execFileSync, spawn } from "node:child_process";
import { access, readFile } from "node:fs/promises";
import { dirname, isAbsolute, resolve as resolvePath } from "node:path";
import { validate } from "@ocas/core";
import type {
  AgentAlias,
  AgentConfig,
  CasRef,
  StartNodePayload,
  StartOutput,
  StepNodePayload,
  StepOutput,
  ThreadId,
  ThreadListItem,
  ThreadStatus,
  ThreadsIndex,
  WorkflowConfig,
  WorkflowPayload,
} from "@uncaged/workflow-protocol";
import {
  createProcessLogger,
  extractUlidTimestamp,
  generateUlid,
  type ProcessLogger,
} from "@uncaged/workflow-util";
import type { AdapterOutput } from "@uncaged/workflow-util-agent";
import { getEnvPath, loadWorkflowConfig } from "@uncaged/workflow-util-agent";
import { config as loadDotenv } from "dotenv";
import { parse } from "yaml";
import { createMarker, deleteMarker, isThreadRunning } from "../background/index.js";
import { createIncludeTag } from "../include.js";
import { evaluate, isSuspendResult } from "../moderator/index.js";
import {
  appendThreadHistory,
  createUwfStore,
  findThreadInHistory,
  loadThreadHistory,
  loadThreadsIndex,
  loadWorkflowRegistry,
  resolveWorkflowHash,
  saveThreadsIndex,
  type ThreadHistoryLine,
  type UwfStore,
} from "../store.js";
import { checkWorkflowFilenameConsistency, isCasRef, parseWorkflowPayload } from "../validate.js";
import { validateWorkflow } from "../validate-semantic.js";
import {
  type ChainState,
  collectOrderedSteps,
  expandOutput,
  fail,
  type OrderedStepItem,
  walkChain,
} from "./shared.js";
import { materializeWorkflowPayload } from "./workflow.js";

/** Role name the moderator returns when the workflow has reached its end. */
const END_ROLE = "$END";
/** Pseudo-role passed to the moderator when the thread head is still the start node. */
const START_ROLE = "$START";

/** Default character quota used by {@link cmdThreadRead}. */
export const THREAD_READ_DEFAULT_QUOTA = 4000;

/**
 * Build a {@link StepOutput} from a moderator evaluation.
 *
 * `currentRole` is set only when the evaluation succeeded, is not a suspend
 * result, and does not point at `$END`; otherwise it is `null`.
 * `done` is true only for status `"completed"`.
 */
function buildStepOutputFromEvaluation(
  workflowHash: CasRef,
  threadId: ThreadId,
  head: CasRef,
  status: ThreadStatus,
  evaluation: ReturnType<typeof evaluate>,
  background: boolean | null,
): StepOutput {
  const done = status === "completed";
  let currentRole: string | null = null;
  if (evaluation.ok && !isSuspendResult(evaluation.value) && evaluation.value.role !== END_ROLE) {
    currentRole = evaluation.value.role;
  }
  return {
    workflow: workflowHash,
    thread: threadId,
    head,
    status,
    currentRole,
    done,
    background,
  };
}

/**
 * Derive the status of a thread that is still present in the threads index.
 *
 * Returns `"running"` when a background marker exists, `"suspended"` when the
 * moderator evaluation of the current chain yields a suspend result, and
 * `"idle"` in every other case (including a failed evaluation).
 */
async function resolveActiveThreadStatus(
  storageRoot: string,
  threadId: ThreadId,
  uwf: UwfStore,
  head: CasRef,
  workflowRef: CasRef,
): Promise<ThreadStatus> {
  const runningMarker = await isThreadRunning(storageRoot, threadId);
  if (runningMarker !== null) {
    return "running";
  }

  const chain = walkChain(uwf, head);
  const { lastRole, lastOutput } = resolveEvaluateArgs(uwf, chain);
  const workflow = loadWorkflowPayload(uwf, workflowRef);
  const result = evaluate(workflow.graph, lastRole, lastOutput);
  if (result.ok && isSuspendResult(result.value)) {
    return "suspended";
  }
  return "idle";
}

/**
 * Derive the current/next role from the workflow graph and chain state.
 * Returns null when the next role is $END, thread is suspended, or evaluation fails.
 */
function resolveCurrentRole(uwf: UwfStore, head: CasRef, workflowRef: CasRef): string | null {
  const chain = walkChain(uwf, head);
  const { lastRole, lastOutput } = resolveEvaluateArgs(uwf, chain);
  const workflow = loadWorkflowPayload(uwf, workflowRef);
  const result = evaluate(workflow.graph, lastRole, lastOutput);
  if (!result.ok) {
    return null;
  }
  if (isSuspendResult(result.value) || result.value.role === END_ROLE) {
    return null;
  }
  return result.value.role;
}

// Process-log event codes passed as the first argument of `plog.log`.
const PL_THREAD_START = "7HNQ4B2X";
const PL_MODERATOR = "M3K8V9T1";
const PL_AGENT_SPAWN = "R5J2W8N4";
const PL_AGENT_DONE = "C6P9E3H7";
const PL_THREAD_ARCHIVED = "F4D8Q2K5";
const PL_STEP_ERROR = "B8T5N1V6";
const PL_BACKGROUND_START = "X7Q4W9M2";

/** Record `message` in the process log under the step-error code, then abort via `fail`. */
function failStep(plog: ProcessLogger, message: string): never {
  plog.log(PL_STEP_ERROR, message, null);
  fail(message);
}

/**
 * Check if a string looks like a file path (contains path separators or has .yaml/.yml extension).
 */
function isFilePath(input: string): boolean {
  return (
    input.includes("/") ||
    input.includes("\\") ||
    input.endsWith(".yaml") ||
    input.endsWith(".yml")
  );
}

/**
 * Check if a workflow file exists at the given path.
 *
 * @returns the resolved path `dir/name + ext` when it is accessible, otherwise null.
 */
async function workflowFileExists(dir: string, name: string, ext: string): Promise<string | null> {
  const candidate = resolvePath(dir, `${name}${ext}`);
  try {
    await access(candidate);
    return candidate;
  } catch {
    return null;
  }
}

/**
 * Search for a workflow file in a given directory (checks both .workflow/ and .workflows/).
 *
 * Probe order: `.workflow/` first (preferred), then `.workflows/` (legacy fallback).
 * Inside each, `<name>.yaml`, `<name>.yml`, `<name>/index.yaml`, `<name>/index.yml`.
 *
 * @returns the first existing candidate path, or null when none exists.
 */
async function findWorkflowInDir(dir: string, name: string): Promise<string | null> {
  const extensions = [".yaml", ".yml"];
  for (const containerName of [".workflow", ".workflows"]) {
    const container = resolvePath(dir, containerName);

    // Flat file: <container>/<name>.yaml|.yml
    for (const ext of extensions) {
      const flat = await workflowFileExists(container, name, ext);
      if (flat !== null) {
        return flat;
      }
    }

    // Directory form: <container>/<name>/index.yaml|.yml
    for (const ext of extensions) {
      const indexed = await workflowFileExists(resolvePath(container, name), "index", ext);
      if (indexed !== null) {
        return indexed;
      }
    }
  }
  return null;
}

/**
 * Walk from `startDir` up through its parent directories, running
 * {@link findWorkflowInDir} at each level.
 * Returns the absolute path of the first match, otherwise null.
 * Stops once the filesystem root has been searched.
 */
async function findWorkflowInParents(startDir: string, name: string): Promise<string | null> {
  let currentDir = resolvePath(startDir);
  const root = resolvePath("/");

  while (true) {
    const found = await findWorkflowInDir(currentDir, name);
    if (found !== null) {
      return found;
    }

    // Stop at filesystem root
    if (currentDir === root) {
      break;
    }

    // Move to parent directory
    const parentDir = dirname(currentDir);
    if (parentDir === currentDir) {
      // dirname() made no progress: reached a filesystem root
      break;
    }
    currentDir = parentDir;
  }
  return null;
}

/**
 * Read, parse, validate and store a workflow YAML file, returning its CAS hash.
 *
 * Aborts via `fail` when the file cannot be read, the YAML is invalid, the
 * payload shape is wrong, the filename consistency check reports an error,
 * semantic validation reports errors, or the stored node fails schema validation.
 */
async function materializeLocalWorkflow(uwf: UwfStore, filePath: string): Promise<CasRef> {
  let text: string;
  try {
    text = await readFile(filePath, "utf8");
  } catch {
    fail(`project workflow file not found: ${filePath}`);
  }

  let raw: unknown;
  try {
    // The include tag is created with the workflow file's own directory.
    raw = parse(text, { customTags: [createIncludeTag(dirname(filePath))] }) as unknown;
  } catch (e) {
    fail(`invalid YAML in ${filePath}: ${e instanceof Error ? e.message : String(e)}`);
  }

  const payload = parseWorkflowPayload(raw);
  if (payload === null) {
    fail(`invalid workflow YAML in ${filePath}: expected WorkflowPayload shape`);
  }

  const filenameError = checkWorkflowFilenameConsistency(filePath, payload);
  if (filenameError !== null) {
    fail(filenameError);
  }

  const semanticErrors = validateWorkflow(payload);
  if (semanticErrors.length > 0) {
    fail(`workflow validation failed:\n${semanticErrors.map((e) => ` - ${e}`).join("\n")}`);
  }

  const materialized = await materializeWorkflowPayload(uwf, payload);
  const hash = await uwf.store.put(uwf.schemas.workflow, materialized);
  const stored = uwf.store.get(hash);
  if (stored === null || !validate(uwf.store, stored)) {
    fail("stored local workflow failed schema validation");
  }
  return hash;
}

/** Abort via `fail` unless `ref` names an existing CAS node of the workflow schema type. */
function assertWorkflowNode(uwf: UwfStore, ref: CasRef): void {
  const node = uwf.store.get(ref);
  if (node === null) {
    fail(`CAS node not found: ${ref}`);
  }
  if (node.type !== uwf.schemas.workflow) {
    fail(`node ${ref} is not a Workflow (type ${node.type})`);
  }
}

/**
 * Resolve a user-supplied workflow identifier to the CAS hash of a Workflow node.
 *
 * Strategies, tried in order:
 * 1. the identifier is itself a CAS ref;
 * 2. the identifier looks like a file path (relative paths resolve against `projectRoot`);
 * 3. local discovery by walking up from `projectRoot`;
 * 4. lookup in the global workflow registry under `storageRoot`.
 *
 * Aborts via `fail` when the identifier is empty or nothing matches.
 */
async function resolveWorkflowCasRef(
  uwf: UwfStore,
  storageRoot: string,
  workflowId: string,
  projectRoot: string,
): Promise<CasRef> {
  // Validate input
  const trimmed = workflowId.trim();
  if (trimmed === "") {
    fail("workflow ID cannot be empty");
  }

  // Strategy 1: Direct CAS hash
  if (isCasRef(trimmed)) {
    assertWorkflowNode(uwf, trimmed);
    return trimmed;
  }

  // Strategy 2: Explicit file path (relative or absolute)
  if (isFilePath(trimmed)) {
    const absolutePath = isAbsolute(trimmed) ? trimmed : resolvePath(projectRoot, trimmed);
    return materializeLocalWorkflow(uwf, absolutePath);
  }

  // Strategy 3: Local discovery (parent directory traversal)
  const localPath = await findWorkflowInParents(projectRoot, trimmed);
  if (localPath !== null) {
    return materializeLocalWorkflow(uwf, localPath);
  }

  // Strategy 4: Global registry fallback
  const registry = await loadWorkflowRegistry(storageRoot);
  const hash = resolveWorkflowHash(registry, trimmed);
  if (!isCasRef(hash)) {
    fail(`workflow not found: ${trimmed}`);
  }
  assertWorkflowNode(uwf, hash);
  return hash;
}

/**
 * Find the workflow hash a thread head belongs to.
 *
 * A start node carries the workflow directly; any other node is read as a
 * step payload and followed through its `start` reference.
 *
 * @returns null when the head is missing, has no string `start` reference,
 *   or that reference is not a start node.
 */
function resolveWorkflowFromHead(uwf: UwfStore, head: CasRef): CasRef | null {
  const node = uwf.store.get(head);
  if (node === null) {
    return null;
  }
  if (node.type === uwf.schemas.startNode) {
    const payload = node.payload as StartNodePayload;
    return payload.workflow;
  }

  const payload = node.payload as StepNodePayload;
  if (typeof payload.start !== "string") {
    return null;
  }
  const startNode = uwf.store.get(payload.start);
  if (startNode === null || startNode.type !== uwf.schemas.startNode) {
    return null;
  }
  return (startNode.payload as StartNodePayload).workflow;
}

/**
 * Create a new thread: resolve the workflow, store a start node and register
 * it as the thread's head in the threads index.
 *
 * @param cwd working directory stored in the start node; must be absolute.
 * @returns the resolved workflow hash and the new thread id.
 */
export async function cmdThreadStart(
  storageRoot: string,
  workflowId: string,
  prompt: string,
  projectRoot: string,
  cwd: string = process.cwd(),
): Promise<StartOutput> {
  // Validate cwd is an absolute path
  if (!isAbsolute(cwd)) {
    fail("cwd must be an absolute path");
  }

  const uwf = await createUwfStore(storageRoot);
  const workflowHash = await resolveWorkflowCasRef(uwf, storageRoot, workflowId, projectRoot);
  const threadId = generateUlid(Date.now()) as ThreadId;
  const plog = createProcessLogger({
    storageRoot,
    context: { thread: threadId, workflow: workflowHash },
  });

  const startPayload: StartNodePayload = {
    workflow: workflowHash,
    prompt,
    cwd,
  };
  const headHash = await uwf.store.put(uwf.schemas.startNode, startPayload);
  const node = uwf.store.get(headHash);
  if (node === null || !validate(uwf.store, node)) {
    fail("stored StartNode failed schema validation");
  }

  const index = await loadThreadsIndex(storageRoot);
  index[threadId] = headHash;
  await saveThreadsIndex(storageRoot, index);

  plog.log(
    PL_THREAD_START,
    `thread created workflow=${workflowHash} thread=${threadId} head=${headHash}`,
    null,
  );
  return { workflow: workflowHash, thread: threadId };
}

/**
 * Describe a single thread.
 *
 * Threads in the active index are reported with a derived status and current
 * role; otherwise the thread history is consulted and the thread is reported
 * as `"cancelled"` or `"completed"`. Aborts via `fail` when found in neither.
 */
export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Promise<StepOutput> {
  const index = await loadThreadsIndex(storageRoot);
  const activeHead = index[threadId];
  if (activeHead !== undefined) {
    const uwf = await createUwfStore(storageRoot);
    const workflow = resolveWorkflowFromHead(uwf, activeHead);
    if (workflow === null) {
      fail(`failed to resolve workflow from head: ${activeHead}`);
    }
    const status = await resolveActiveThreadStatus(
      storageRoot,
      threadId,
      uwf,
      activeHead,
      workflow,
    );
    const currentRole = resolveCurrentRole(uwf, activeHead, workflow);
    return {
      workflow,
      thread: threadId,
      head: activeHead,
      status,
      currentRole,
      done: false,
      background: null,
    };
  }

  const hist = await findThreadInHistory(storageRoot, threadId);
  if (hist !== null) {
    const status: ThreadStatus = hist.reason === "cancelled" ? "cancelled" : "completed";
    return {
      workflow: hist.workflow,
      thread: threadId,
      head: hist.head,
      status,
      currentRole: null,
      done: true,
      background: null,
    };
  }

  fail(`thread not found: ${threadId}`);
}

/** A thread list entry extended with its derived status and current role. */
export type ThreadListItemWithStatus = ThreadListItem & {
  status: ThreadStatus;
  currentRole: string | null;
};

/**
 * Build the list entry for one active thread.
 *
 * @returns null when the workflow cannot be resolved from the head.
 */
async function threadListItemFromActive(
  storageRoot: string,
  uwf: UwfStore,
  threadId: ThreadId,
  head: CasRef,
): Promise<ThreadListItemWithStatus | null> {
  const workflow = resolveWorkflowFromHead(uwf, head);
  if (workflow === null) {
    return null;
  }
  const status = await resolveActiveThreadStatus(storageRoot, threadId, uwf, head, workflow);
  return {
    thread: threadId,
    workflow,
    head,
    status,
    currentRole: resolveCurrentRole(uwf, head, workflow),
  };
}

/** Build list entries for every thread in the index, skipping heads whose workflow cannot be resolved. */
async function collectActiveThreads(
  storageRoot: string,
  uwf: UwfStore,
  index: ThreadsIndex,
): Promise<ThreadListItemWithStatus[]> {
  const items: ThreadListItemWithStatus[] = [];
  for (const [threadId, head] of Object.entries(index)) {
    const item = await threadListItemFromActive(
      storageRoot,
      uwf,
      threadId as ThreadId,
      head as CasRef,
    );
    if (item !== null) {
      items.push(item);
    }
  }
  return items;
}

/**
 * Build list entries from the thread history.
 *
 * Entries whose thread id is in `activeIds` are skipped, and only the first
 * history entry per thread id is kept.
 */
async function collectCompletedThreads(
  storageRoot: string,
  activeIds: ReadonlySet<string>,
): Promise<ThreadListItemWithStatus[]> {
  const items: ThreadListItemWithStatus[] = [];
  const history = await loadThreadHistory(storageRoot);
  const seen = new Set<string>(); // Deduplication (issue #470)
  for (const entry of history) {
    if (!activeIds.has(entry.thread) && !seen.has(entry.thread)) {
      seen.add(entry.thread);
      items.push({
        thread: entry.thread,
        workflow: entry.workflow,
        head: entry.head,
        status: entry.reason === "cancelled" ? "cancelled" : "completed",
        currentRole: null,
      });
    }
  }
  return items;
}

/**
 * Keep only items whose ULID timestamp lies strictly between `afterMs` and
 * `beforeMs` (each bound is optional). Items without an extractable timestamp
 * are dropped whenever at least one bound is given.
 */
function applyTimeFilters(
  items: ThreadListItemWithStatus[],
  afterMs: number | null,
  beforeMs: number | null,
): ThreadListItemWithStatus[] {
  if (afterMs === null && beforeMs === null) return items;
  return items.filter((item) => {
    const ts = extractUlidTimestamp(item.thread);
    if (ts === null) return false;
    if (afterMs !== null && ts <= afterMs) return false;
    if (beforeMs !== null && ts >= beforeMs) return false;
    return true;
  });
}

/**
 * Return a new array ordered by ULID timestamp, newest first.
 * Items without an extractable timestamp sort as timestamp 0.
 * The input array is left untouched.
 */
function sortByNewestFirst(items: ThreadListItemWithStatus[]): ThreadListItemWithStatus[] {
  // Copy first: Array.prototype.sort sorts in place.
  return [...items].sort((a, b) => {
    const tsA = extractUlidTimestamp(a.thread) ?? 0;
    const tsB = extractUlidTimestamp(b.thread) ?? 0;
    return tsB - tsA;
  });
}

/**
 * Return the page `[skip, skip + take)` of `items`.
 * `skip` defaults to 0 and `take` to the full length; negative values are
 * clamped to 0 so they cannot select from the end of the array.
 */
function applyPagination(
  items: ThreadListItemWithStatus[],
  skip: number | null,
  take: number | null,
): ThreadListItemWithStatus[] {
  const skipCount = Math.max(0, skip ?? 0);
  const takeCount = Math.max(0, take ?? items.length);
  return items.slice(skipCount, skipCount + takeCount);
}

/**
 * List threads, newest first.
 *
 * Active threads are always collected; history is read only when the status
 * filter is absent or asks for `"completed"` / `"cancelled"`. Status filter,
 * time range and pagination are then applied in that order.
 */
export async function cmdThreadList(
  storageRoot: string,
  statusFilter: ThreadStatus[] | null,
  afterMs: number | null,
  beforeMs: number | null,
  skip: number | null,
  take: number | null,
): Promise<ThreadListItemWithStatus[]> {
  const uwf = await createUwfStore(storageRoot);
  const index = await loadThreadsIndex(storageRoot);

  // Collect active threads
  let items = await collectActiveThreads(storageRoot, uwf, index);

  // Collect completed threads (if relevant for status filter)
  const includeCompleted =
    statusFilter === null ||
    statusFilter.includes("completed") ||
    statusFilter.includes("cancelled");
  if (includeCompleted) {
    const activeIds = new Set(items.map((i) => i.thread));
    const completedItems = await collectCompletedThreads(storageRoot, activeIds);
    items = items.concat(completedItems);
  }

  // Apply status filter
  if (statusFilter !== null) {
    items = items.filter((item) => statusFilter.includes(item.status));
  }

  // Apply time range filters
  items = applyTimeFilters(items, afterMs, beforeMs);

  // Sort by timestamp descending (newest first)
  items = sortByNewestFirst(items);

  // Apply pagination
  return applyPagination(items, skip, take);
}

/**
 * Return the content of the last assistant turn referenced by a detail node.
 *
 * Turns are scanned from newest to oldest; entries that are not string refs,
 * cannot be loaded, are not assistant turns, or have blank content are skipped.
 *
 * @returns null when the detail node is missing, has no turns, or no turn qualifies.
 */
export function extractLastAssistantContent(uwf: UwfStore, detailRef: CasRef): string | null {
  const detailNode = uwf.store.get(detailRef);
  if (detailNode === null) {
    return null;
  }
  const detail = detailNode.payload as Record<string, unknown>;
  const turns = detail.turns;
  if (!Array.isArray(turns) || turns.length === 0) {
    return null;
  }

  for (let i = turns.length - 1; i >= 0; i--) {
    const turnRef = turns[i];
    if (typeof turnRef !== "string") {
      continue;
    }
    const turnNode = uwf.store.get(turnRef as CasRef);
    if (turnNode === null) {
      continue;
    }
    const turn = turnNode.payload as Record<string, unknown>;
    if (
      turn.role === "assistant" &&
      typeof turn.content === "string" &&
      turn.content.trim() !== ""
    ) {
      return turn.content;
    }
  }
  return null;
}

/** Return the steps that precede the step with hash `before`; aborts via `fail` when it is absent. */
function sliceBeforeHash(
  candidates: OrderedStepItem[],
  before: CasRef,
  threadId: ThreadId,
): OrderedStepItem[] {
  const idx = candidates.findIndex((s) => s.hash === before);
  if (idx === -1) {
    fail(`step ${before} not found in thread ${threadId}`);
  }
  return candidates.slice(0, idx);
}

/**
 * Length in characters of one step rendered the way the final output renders it.
 * The role prompt is always included here because a fresh role set is used per call.
 */
function calculateFormattedStepLength(
  stepNum: number,
  item: OrderedStepItem,
  uwf: UwfStore,
  workflow: WorkflowPayload,
): number {
  // Calculate using the same format as formatStepHeader, formatStepPrompt, formatStepContent
  // Use a temporary set to avoid mutating the actual shownPromptRoles during calculation
  const tempShownRoles = new Set<string>();
  const header = formatStepHeader(stepNum, item);
  const roleDef = workflow.roles[item.payload.role];
  const prompt = formatStepPrompt(roleDef, item.payload.role, tempShownRoles);
  const content = formatStepContent(uwf, item);
  const stepBlock = [header, prompt, content].filter((s) => s !== "").join("");
  // Don't add separator here - it will be counted when we know the final structure
  return stepBlock.length;
}

/**
 * Pick the newest steps that fit into `quota` characters.
 *
 * Walks `candidates` from last to first, stopping at the first step that
 * would exceed the quota. At least one step is always selected when any
 * candidate exists, even if it alone exceeds the quota.
 *
 * @returns the selected steps in original order and the number of skipped earlier steps.
 */
function selectByQuota(
  candidates: OrderedStepItem[],
  uwf: UwfStore,
  workflow: WorkflowPayload,
  quota: number,
  startSectionLength: number,
): { selected: OrderedStepItem[]; skippedCount: number } {
  const selected: OrderedStepItem[] = [];
  // Start with start section length
  let totalChars = startSectionLength;

  for (let i = candidates.length - 1; i >= 0; i--) {
    const item = candidates[i];
    if (item === undefined) continue;

    // Calculate the actual formatted length using the same format as final output
    const blockLen = calculateFormattedStepLength(i + 1, item, uwf, workflow);

    // Calculate cost of adding this step:
    // - blockLen: the step content
    // - 6: separator before this step (if there are already parts)
    const separatorCost = totalChars > 0 || selected.length > 0 ? 6 : 0;
    const addCost = blockLen + separatorCost;

    // Check quota BEFORE adding - but always include at least one step
    if (totalChars + addCost > quota && selected.length > 0) {
      break;
    }
    selected.unshift(item);
    totalChars += addCost;
  }

  return { selected, skippedCount: candidates.length - selected.length };
}

/**
 * Format a millisecond duration as `"<n>ms"`, `"<s.s>s"` or `"<m>m<s>s"`.
 *
 * Seconds are rounded before being split into minutes and seconds, so the
 * seconds part can never read 60.
 */
function formatDuration(ms: number): string {
  if (ms < 1000) return `${ms}ms`;
  const seconds = ms / 1000;
  // From 59.95 s upwards toFixed(1) would print "60.0s"; switch to minutes instead.
  if (seconds < 59.95) return `${seconds.toFixed(1)}s`;
  const totalSeconds = Math.round(seconds);
  const minutes = Math.floor(totalSeconds / 60);
  const remainingSec = totalSeconds % 60;
  return `${minutes}m${remainingSec}s`;
}

/** Render the two header lines of a step: title with role and hash, then agent, time and duration. */
function formatStepHeader(stepNum: number, item: OrderedStepItem): string {
  // ISO timestamp with the "T" replaced by a space and the fractional seconds + "Z" removed.
  const ts = new Date(item.timestamp)
    .toISOString()
    .replace("T", " ")
    .replace(/\.\d+Z$/, "");
  const durationMs = item.payload.completedAtMs - item.payload.startedAtMs;
  const duration = formatDuration(durationMs);
  return [
    `## Step ${stepNum}: ${item.payload.role} \`${item.hash}\``,
    `**Agent:** ${item.payload.agent} | **Time:** ${ts} | **Duration:** ${duration}`,
  ].join("\n");
}

/**
 * Render the goal of a role the first time that role is shown.
 *
 * Returns `""` when the role has no definition or is already in
 * `shownPromptRoles`; otherwise adds the role to the set (mutating it).
 */
function formatStepPrompt(
  roleDef: WorkflowPayload["roles"][string] | undefined,
  role: string,
  shownPromptRoles: Set<string>,
): string {
  if (!roleDef || shownPromptRoles.has(role)) return "";
  shownPromptRoles.add(role);
  // NOTE(review): the empty strings look like placeholders where markup may have been lost — confirm.
  return ["", "", "", roleDef.goal, ""].join("\n");
}

/** Render the last assistant content of a step, or `""` when the step has no detail or no such content. */
function formatStepContent(uwf: UwfStore, item: OrderedStepItem): string {
  if (!item.payload.detail) return "";
  const content = extractLastAssistantContent(uwf, item.payload.detail);
  if (content === null) return "";
  // NOTE(review): the empty strings look like placeholders where markup may have been lost — confirm.
  return ["", "", "", content, ""].join("\n");
}

/**
 * Render the thread header (id, workflow, task prompt).
 * Returns `""` when paging with `before` unless `showStart` is set.
 */
function formatStartSection(options: {
  threadId: ThreadId;
  workflowName: string;
  workflowHash: CasRef;
  prompt: string;
  before: CasRef | null;
  showStart: boolean;
}): string {
  if (options.before !== null && !options.showStart) return "";
  return [
    `# Thread \`${options.threadId}\``,
    "",
    `**Workflow:** ${options.workflowName} (\`${options.workflowHash}\`)`,
    "",
    "## Task",
    "",
    options.prompt,
  ].join("\n");
}

/**
 * Render a thread as Markdown: optional start section, an optional
 * "earlier steps" hint, then the quota-selected steps, joined by `---` rules.
 */
function formatThreadReadMarkdown(options: {
  threadId: ThreadId;
  workflowName: string;
  workflowHash: CasRef;
  prompt: string;
  ordered: OrderedStepItem[];
  uwf: UwfStore;
  workflow: WorkflowPayload;
  quota: number;
  before: CasRef | null;
  showStart: boolean;
}): string {
  const { ordered, uwf, workflow, quota, before } = options;
  const candidates = before !== null ? sliceBeforeHash(ordered, before, options.threadId) : ordered;

  // Calculate start section length for quota accounting
  const startSection = formatStartSection(options);
  const startSectionLength = startSection !== "" ? startSection.length : 0;

  const { selected, skippedCount } = selectByQuota(
    candidates,
    uwf,
    workflow,
    quota,
    startSectionLength,
  );

  const parts: string[] = [];
  if (startSection !== "") parts.push(startSection);

  if (skippedCount > 0 && selected.length > 0) {
    const firstSelected = selected[0];
    if (firstSelected !== undefined) {
      parts.push(
        `*(${skippedCount} earlier step${skippedCount > 1 ? "s" : ""}, load with \`uwf thread read ${options.threadId} --before ${firstSelected.hash}\`)*`,
      );
    }
  }

  // Step numbers refer to positions in `candidates`, not in `selected`.
  const startIndex = candidates.length - selected.length;
  const shownPromptRoles = new Set<string>();
  for (let i = 0; i < selected.length; i++) {
    const item = selected[i];
    if (item === undefined) continue;
    const stepNum = startIndex + i + 1;
    const roleDef = workflow.roles[item.payload.role];
    const stepBlock = [
      formatStepHeader(stepNum, item),
      formatStepPrompt(roleDef, item.payload.role, shownPromptRoles),
      formatStepContent(uwf, item),
    ]
      .filter((s) => s !== "")
      .join("");
    parts.push(stepBlock);
  }

  return parts.join("\n\n---\n\n");
}

/** Shape of the "last output" object handed to the moderator. */
type EvaluateLastOutput = Record<string, unknown>;

const STATUS_KEY = "$status";

/**
 * Compute the `(lastRole, lastOutput)` pair to hand to the moderator.
 *
 * For a thread whose head is the start node this is `$START` with a
 * placeholder status; otherwise it is the newest step's role and its expanded
 * output (replaced by `{}` when the output is not a plain object).
 */
function resolveEvaluateArgs(
  uwf: UwfStore,
  chain: ChainState,
): { lastRole: string; lastOutput: EvaluateLastOutput } {
  if (chain.headIsStart) {
    return { lastRole: START_ROLE, lastOutput: { [STATUS_KEY]: "_" } };
  }
  const lastStep = chain.stepsNewestFirst[0];
  if (lastStep === undefined) {
    fail("empty step chain");
  }
  const raw = expandOutput(uwf, lastStep.output);
  const base =
    typeof raw === "object" && raw !== null && !Array.isArray(raw)
      ? (raw as Record<string, unknown>)
      : {};
  return {
    lastRole: lastStep.role,
    lastOutput: base,
  };
}

/** Load a workflow payload from the store; aborts via `fail` when the node is missing or not a Workflow. */
function loadWorkflowPayload(uwf: UwfStore, workflowRef: CasRef): WorkflowPayload {
  const node = uwf.store.get(workflowRef);
  if (node === null) {
    fail(`workflow CAS node not found: ${workflowRef}`);
  }
  if (node.type !== uwf.schemas.workflow) {
    fail(`node ${workflowRef} is not a Workflow`);
  }
  return node.payload as WorkflowPayload;
}

/**
 * Parse an agent override string into a command and its arguments.
 * The string is split on whitespace only; quoting is not interpreted.
 */
function parseAgentOverride(override: string): AgentConfig {
  const parts = override
    .trim()
    .split(/\s+/)
    .filter((p) => p.length > 0);
  const command = parts[0];
  if (command === undefined) {
    fail("agent override must not be empty");
  }
  return { command, args: parts.slice(1) };
}

/**
 * Choose the agent for a role.
 *
 * Precedence: explicit `agentOverride` string, then a per-workflow/per-role
 * alias from `config.agentOverrides`, then `config.defaultAgent`.
 * Aborts via `fail` when the chosen alias is not defined in `config.agents`.
 */
function resolveAgentConfig(
  config: WorkflowConfig,
  workflow: WorkflowPayload,
  role: string,
  agentOverride: string | null,
): AgentConfig {
  if (agentOverride !== null) {
    return parseAgentOverride(agentOverride);
  }

  let alias: AgentAlias = config.defaultAgent;
  if (config.agentOverrides !== null) {
    const roleOverrides = config.agentOverrides[workflow.name];
    if (roleOverrides !== undefined && roleOverrides[role] !== undefined) {
      alias = roleOverrides[role];
    }
  }

  const agentConfig = config.agents[alias];
  if (agentConfig === undefined) {
    fail(`unknown agent alias in config: ${alias}`);
  }
  return agentConfig;
}

/**
 * Run the agent command synchronously and parse its result.
 *
 * The agent is invoked with its configured args followed by
 * `--thread`, `--role` and `--prompt`. The last line of its stdout must be a
 * JSON object carrying a valid CAS ref in `stepHash`; anything else, or a
 * failing command, aborts via `failStep`.
 */
function spawnAgent(
  plog: ProcessLogger,
  agent: AgentConfig,
  threadId: ThreadId,
  role: string,
  edgePrompt: string,
  cwd: string,
): AdapterOutput {
  const argv = [...agent.args, "--thread", threadId, "--role", role, "--prompt", edgePrompt];

  let stdout: string;
  try {
    stdout = execFileSync(agent.command, argv, {
      encoding: "utf8",
      stdio: ["ignore", "pipe", "pipe"],
      maxBuffer: 50 * 1024 * 1024, // 50 MB — stream-json output can be large
      cwd,
    });
  } catch (e) {
    const err = e as NodeJS.ErrnoException & { stderr?: Buffer | string | null };
    const stderr =
      err.stderr == null
        ? ""
        : typeof err.stderr === "string"
          ? err.stderr
          : err.stderr.toString("utf8");
    const detail = stderr.trim() !== "" ? `: ${stderr.trim()}` : "";
    failStep(plog, `agent command failed (${agent.command})${detail}`);
  }

  // Only the final stdout line is treated as the result.
  const line = stdout.trim().split("\n").pop()?.trim() ?? "";
  let parsed: unknown;
  try {
    parsed = JSON.parse(line);
  } catch {
    failStep(plog, `agent stdout last line is not valid JSON: ${line || "(empty)"}`);
  }

  const obj = parsed as Record<string, unknown>;
  if (
    typeof obj !== "object" ||
    obj === null ||
    typeof obj.stepHash !== "string" ||
    !isCasRef(obj.stepHash as string)
  ) {
    failStep(plog, `agent stdout JSON missing valid stepHash: ${line}`);
  }
  return obj as unknown as AdapterOutput;
}

/** Remove a thread from the active index and append a `"completed"` entry to the thread history. */
async function archiveThread(
  storageRoot: string,
  threadId: ThreadId,
  workflow: CasRef,
  head: CasRef,
): Promise<void> {
  const index = await loadThreadsIndex(storageRoot);
  delete index[threadId];
  await saveThreadsIndex(storageRoot, index);
  await appendThreadHistory(storageRoot, {
    thread: threadId,
    workflow,
    head,
    completedAt: Date.now(),
    reason: "completed",
  });
}

/**
 * Execute up to `count` steps of a thread.
 *
 * - With `background` set (and not already the worker) a detached worker is
 *   spawned and a single `"running"` result is returned immediately.
 * - With `backgroundWorker` set, a running marker is created before the steps
 *   and removed afterwards, even when a step throws.
 *
 * Stepping stops early once a step reports `done` or status `"suspended"`.
 */
export async function cmdThreadExec(
  storageRoot: string,
  threadId: ThreadId,
  agentOverride: string | null,
  count: number,
  background: boolean,
  backgroundWorker: boolean,
): Promise<StepOutput[]> {
  if (count < 1 || !Number.isInteger(count)) {
    fail(`--count must be a positive integer, got: ${count}`);
  }

  // Check if thread is already running in background (unless we ARE the background worker)
  if (!backgroundWorker) {
    const runningMarker = await isThreadRunning(storageRoot, threadId);
    if (runningMarker !== null) {
      fail(`thread already executing in background (PID: ${runningMarker.pid})`);
    }
  }

  const workflowHash = await resolveActiveThreadWorkflowHash(storageRoot, threadId);
  const plog = createProcessLogger({
    storageRoot,
    context: { thread: threadId, workflow: workflowHash },
  });

  if (background && !backgroundWorker) {
    // Spawn background process
    return cmdThreadStepBackground(storageRoot, threadId, agentOverride, count, plog, workflowHash);
  }

  // If we're the background worker, create marker before execution
  let markerCreated = false;
  if (backgroundWorker) {
    await createMarker(storageRoot, {
      thread: threadId,
      workflow: workflowHash,
      pid: process.pid,
      startedAt: Date.now(),
    });
    markerCreated = true;
  }

  try {
    const results: StepOutput[] = [];
    for (let i = 0; i < count; i++) {
      const result = await cmdThreadStepOnce(storageRoot, threadId, agentOverride, plog);
      results.push(result);
      if (result.done || result.status === "suspended") {
        break;
      }
    }
    return results;
  } finally {
    // Cleanup marker if we created one
    if (markerCreated) {
      await deleteMarker(storageRoot, threadId);
    }
  }
}

/** Return the workflow hash of an active thread; aborts via `fail` when the thread is not in the index. */
async function resolveActiveThreadWorkflowHash(
  storageRoot: string,
  threadId: ThreadId,
): Promise<CasRef> {
  const index = await loadThreadsIndex(storageRoot);
  const headHash = index[threadId];
  if (headHash === undefined) {
    fail(`thread not active: ${threadId}`);
  }
  const uwf = await createUwfStore(storageRoot);
  const chain = walkChain(uwf, headHash);
  return chain.start.workflow;
}

/**
 * Spawn a detached worker that re-runs `thread exec` with the internal
 * `--_background-worker` flag, and return the thread's current state marked
 * as `"running"` with `background: true`.
 */
async function cmdThreadStepBackground(
  storageRoot: string,
  threadId: ThreadId,
  agentOverride: string | null,
  count: number,
  plog: ProcessLogger,
  workflowHash: CasRef,
): Promise<StepOutput[]> {
  // Get current head to return to caller
  const index = await loadThreadsIndex(storageRoot);
  const headHash = index[threadId];
  if (headHash === undefined) {
    failStep(plog, `thread not active: ${threadId}`);
  }
  const uwf = await createUwfStore(storageRoot);

  // Spawn detached background process
  const scriptPath = process.argv[1];
  if (scriptPath === undefined) {
    failStep(plog, "unable to determine script path for background execution");
  }

  const args = ["thread", "exec", threadId, "--count", String(count)];
  if (agentOverride !== null) {
    args.push("--agent", agentOverride);
  }
  // Internal flag to signal the background worker to create/cleanup markers
  args.push("--_background-worker");

  plog.log(PL_BACKGROUND_START, `spawning background process count=${count}`, null);

  const child = spawn(scriptPath, args, {
    detached: true,
    stdio: "ignore",
  });
  // Do not keep this process alive waiting for the worker.
  child.unref();

  // Return immediately with current state and background flag
  return [
    {
      workflow: workflowHash,
      thread: threadId,
      head: headHash,
      status: "running",
      currentRole: resolveCurrentRole(uwf, headHash, workflowHash),
      done: false,
      background: true,
    },
  ];
}

/**
 * Execute exactly one step of an active thread.
 *
 * Asks the moderator for the next role. A suspend result returns
 * `"suspended"` without running anything; `$END` archives the thread.
 * Otherwise the role's agent is run, the returned step becomes the new head,
 * and the moderator is consulted again to report the resulting state
 * (archiving the thread when it now points at `$END`).
 */
async function cmdThreadStepOnce(
  storageRoot: string,
  threadId: ThreadId,
  agentOverride: string | null,
  plog: ProcessLogger,
): Promise<StepOutput> {
  const index = await loadThreadsIndex(storageRoot);
  const headHash = index[threadId];
  if (headHash === undefined) {
    failStep(plog, `thread not active: ${threadId}`);
  }

  const uwf = await createUwfStore(storageRoot);
  const chain = walkChain(uwf, headHash);
  const workflowHash = chain.start.workflow;
  const workflow = loadWorkflowPayload(uwf, workflowHash);

  const { lastRole, lastOutput } = resolveEvaluateArgs(uwf, chain);
  const nextResult = evaluate(workflow.graph, lastRole, lastOutput);
  if (!nextResult.ok) {
    failStep(plog, `moderator evaluate failed: ${nextResult.error.message}`);
  }

  plog.log(
    PL_MODERATOR,
    `moderator ${
      isSuspendResult(nextResult.value)
        ? `action=suspend suspendedRole=${nextResult.value.suspendedRole}`
        : `role=${nextResult.value.role}`
    } prompt=${nextResult.value.prompt}`,
    null,
  );

  if (isSuspendResult(nextResult.value)) {
    return buildStepOutputFromEvaluation(
      workflowHash,
      threadId,
      headHash,
      "suspended",
      nextResult,
      null,
    );
  }

  if (nextResult.value.role === END_ROLE) {
    plog.log(PL_THREAD_ARCHIVED, `thread archived head=${headHash}`, null);
    await archiveThread(storageRoot, threadId, workflowHash, headHash);
    return {
      workflow: workflowHash,
      thread: threadId,
      head: headHash,
      status: "completed",
      currentRole: null,
      done: true,
      background: null,
    };
  }

  const role = nextResult.value.role;
  const edgePrompt = nextResult.value.prompt;

  // Resolve cwd: use edge location if provided, otherwise inherit thread.cwd
  const threadCwd = chain.start.cwd;
  const effectiveCwd = nextResult.value.location !== null ? nextResult.value.location : threadCwd;

  const config = await loadWorkflowConfig(storageRoot);
  const agent = resolveAgentConfig(config, workflow, role, agentOverride);

  plog.log(PL_AGENT_SPAWN, `spawning agent command=${agent.command}`, {
    args: [...agent.args, threadId, role].join(" "),
  });

  loadDotenv({ path: getEnvPath(storageRoot) });
  const agentResult = spawnAgent(plog, agent, threadId, role, edgePrompt, effectiveCwd);
  const newHead = agentResult.stepHash as CasRef;
  plog.log(PL_AGENT_DONE, `agent returned head=${newHead}`, null);

  // Re-create store to pick up nodes written by the agent subprocess
  const uwfAfter = await createUwfStore(storageRoot);
  const newNode = uwfAfter.store.get(newHead);
  if (newNode === null || newNode.type !== uwfAfter.schemas.stepNode) {
    failStep(plog, `agent returned hash that is not a StepNode: ${newHead}`);
  }

  // Reload threads index to avoid overwriting changes made by the agent subprocess
  const freshIndex = await loadThreadsIndex(storageRoot);
  freshIndex[threadId] = newHead;
  await saveThreadsIndex(storageRoot, freshIndex);

  const chainAfter = walkChain(uwfAfter, newHead);
  const { lastRole: lastRoleAfter, lastOutput: lastOutputAfter } = resolveEvaluateArgs(
    uwfAfter,
    chainAfter,
  );
  const afterResult = evaluate(workflow.graph, lastRoleAfter, lastOutputAfter);
  if (!afterResult.ok) {
    failStep(plog, `post-step moderator evaluate failed: ${afterResult.error.message}`);
  }

  if (isSuspendResult(afterResult.value)) {
    return buildStepOutputFromEvaluation(
      workflowHash,
      threadId,
      newHead,
      "suspended",
      afterResult,
      null,
    );
  }

  const done = afterResult.value.role === END_ROLE;
  if (done) {
    plog.log(PL_THREAD_ARCHIVED, `thread archived head=${newHead}`, null);
    await archiveThread(storageRoot, threadId, workflowHash, newHead);
  }

  // Determine status based on whether thread is done and running state
  const status: ThreadStatus = done ? "completed" : "idle";
  const currentRole = done ? null : afterResult.value.role;
  return {
    workflow: workflowHash,
    thread: threadId,
    head: newHead,
    status,
    currentRole,
    done,
    background: null,
  };
}

/** Return a thread's head from the active index or, failing that, from history; aborts via `fail` otherwise. */
async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise<CasRef> {
  const index = await loadThreadsIndex(storageRoot);
  const activeHead = index[threadId];
  if (activeHead !== undefined) {
    return activeHead;
  }
  const hist = await findThreadInHistory(storageRoot, threadId);
  if (hist !== null) {
    return hist.head;
  }
  fail(`thread not found: ${threadId}`);
}

/**
 * Render a thread (active or archived) as Markdown.
 *
 * @param quota character budget used to select the newest steps.
 * @param before when set, only steps preceding this step hash are considered.
 * @param showStart when paging with `before`, still include the start section.
 */
export async function cmdThreadRead(
  storageRoot: string,
  threadId: ThreadId,
  quota: number = THREAD_READ_DEFAULT_QUOTA,
  before: CasRef | null = null,
  showStart: boolean = false,
): Promise<string> {
  const headHash = await resolveHeadHash(storageRoot, threadId);
  const uwf = await createUwfStore(storageRoot);
  const chain = walkChain(uwf, headHash);
  const workflow = loadWorkflowPayload(uwf, chain.start.workflow);
  const ordered = collectOrderedSteps(uwf, headHash, chain);

  return formatThreadReadMarkdown({
    threadId,
    workflowName: workflow.name,
    workflowHash: chain.start.workflow,
    prompt: chain.start.prompt,
    ordered,
    uwf,
    workflow,
    quota,
    before,
    showStart,
  });
}

/** Result of {@link cmdThreadStop}. */
export type StopOutput = {
  thread: ThreadId;
  stopped: boolean;
};

/** Result of {@link cmdThreadCancel}. */
export type CancelOutput = {
  thread: ThreadId;
  cancelled: boolean;
};

/**
 * Stop background execution of a thread (but keep thread active)
 *
 * Writes a warning to stderr and returns `stopped: false` when no running
 * marker exists. Otherwise sends SIGTERM to the recorded PID and deletes the marker.
 */
export async function cmdThreadStop(storageRoot: string, threadId: ThreadId): Promise<StopOutput> {
  const index = await loadThreadsIndex(storageRoot);
  const head = index[threadId];
  if (head === undefined) {
    fail(`thread not active: ${threadId}`);
  }

  // Check if thread is running in background and terminate it
  const runningMarker = await isThreadRunning(storageRoot, threadId);
  if (runningMarker === null) {
    process.stderr.write(`Warning: thread ${threadId} is not currently running\n`);
    return { thread: threadId, stopped: false };
  }

  try {
    process.kill(runningMarker.pid, "SIGTERM");
  } catch {
    // Process may have already exited, ignore error
  }
  await deleteMarker(storageRoot, threadId);

  return { thread: threadId, stopped: true };
}

/**
 * Cancel a thread (stop execution + move to history)
 *
 * Terminates a running background worker if there is one, removes the thread
 * from the active index and appends a `"cancelled"` history entry.
 */
export async function cmdThreadCancel(
  storageRoot: string,
  threadId: ThreadId,
): Promise<CancelOutput> {
  const index = await loadThreadsIndex(storageRoot);
  const head = index[threadId];
  if (head === undefined) {
    fail(`thread not active: ${threadId}`);
  }

  // Check if thread is running in background and terminate it
  const runningMarker = await isThreadRunning(storageRoot, threadId);
  if (runningMarker !== null) {
    try {
      process.kill(runningMarker.pid, "SIGTERM");
    } catch {
      // Process may have already exited, ignore error
    }
    await deleteMarker(storageRoot, threadId);
  }

  const uwf = await createUwfStore(storageRoot);
  const workflow = resolveWorkflowFromHead(uwf, head);
  if (workflow === null) {
    fail(`failed to resolve workflow from head: ${head}`);
  }

  delete index[threadId];
  await saveThreadsIndex(storageRoot, index);

  const historyEntry: ThreadHistoryLine = {
    thread: threadId,
    workflow,
    head,
    completedAt: Date.now(),
    reason: "cancelled",
  };
  await appendThreadHistory(storageRoot, historyEntry);

  return { thread: threadId, cancelled: true };
}