diff --git a/packages/cli-workflow/src/commands/thread.ts b/packages/cli-workflow/src/commands/thread.ts index e7146be..f695e30 100644 --- a/packages/cli-workflow/src/commands/thread.ts +++ b/packages/cli-workflow/src/commands/thread.ts @@ -200,6 +200,7 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr thread: threadId, head: activeHead, done: false, + sessionId: null, }; } @@ -210,6 +211,7 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr thread: threadId, head: hist.head, done: true, + sessionId: null, }; } @@ -624,7 +626,12 @@ function resolveAgentConfig( return agentConfig; } -function spawnAgent(agent: AgentConfig, threadId: ThreadId, role: string): CasRef { +type SpawnAgentResult = { + stepHash: CasRef; + sessionId: string | null; +}; + +function spawnAgent(agent: AgentConfig, threadId: ThreadId, role: string): SpawnAgentResult { const argv = [...agent.args, threadId, role]; let stdout: string; try { @@ -646,10 +653,24 @@ function spawnAgent(agent: AgentConfig, threadId: ThreadId, role: string): CasRe } const line = stdout.trim().split("\n").pop()?.trim() ?? ""; - if (!isCasRef(line)) { - fail(`agent stdout is not a valid CAS hash: ${line || "(empty)"}`); + + // Try JSON output first (new protocol) + try { + const parsed = JSON.parse(line) as Record; + const stepHash = parsed.stepHash; + const sessionId = parsed.sessionId; + if (typeof stepHash === "string" && isCasRef(stepHash) && typeof sessionId === "string") { + return { stepHash, sessionId }; + } + } catch { + // Not JSON — fall through to legacy CAS hash parsing } - return line; + + // Legacy: plain CAS hash on stdout + if (!isCasRef(line)) { + fail(`agent stdout is not a valid CAS hash or JSON: ${line || "(empty)"}`); + } + return { stepHash: line, sessionId: null }; } async function archiveThread( @@ -698,6 +719,7 @@ export async function cmdThreadStep( thread: threadId, head: headHash, done: true, + sessionId: null, }; } @@ -706,7 +728,7 @@ export async function cmdThreadStep( const agent = resolveAgentConfig(config, workflow, role, agentOverride); loadDotenv({ path: getEnvPath(storageRoot) }); - const newHead = spawnAgent(agent, threadId, role); + const { stepHash: newHead, sessionId } = spawnAgent(agent, threadId, role); // Re-create store to pick up nodes written by the agent subprocess const uwfAfter = await createUwfStore(storageRoot); @@ -737,6 +759,7 @@ export async function cmdThreadStep( thread: threadId, head: newHead, done, + sessionId, }; } diff --git a/packages/workflow-agent-hermes/src/hermes.ts b/packages/workflow-agent-hermes/src/hermes.ts index 3311da8..5264934 100644 --- a/packages/workflow-agent-hermes/src/hermes.ts +++ b/packages/workflow-agent-hermes/src/hermes.ts @@ -1,4 +1,5 @@ import { spawn } from "node:child_process"; +import type { Store } from "@uncaged/json-cas"; import { type AgentContext, @@ -10,7 +11,6 @@ import { import { loadHermesSession, parseSessionIdFromStdout, - storeHermesRawOutput, storeHermesSessionDetail, } from "./session-detail.js"; @@ -52,17 +52,8 @@ export function buildHermesPrompt(ctx: AgentContext): string { return parts.join("\n"); } -function spawnHermesChat(prompt: string): Promise<{ stdout: string; stderr: string }> { +function spawnHermes(args: string[]): Promise<{ stdout: string; stderr: string }> { return new Promise((resolve, reject) => { - const args = [ - "chat", - "-q", - prompt, - "--yolo", - "--max-turns", - String(HERMES_MAX_TURNS), - "--quiet", - ]; const child = spawn(HERMES_COMMAND, args, { env: process.env, shell: false, @@ -94,23 +85,73 @@ function spawnHermesChat(prompt: string): Promise<{ stdout: string; stderr: stri }); } +function spawnHermesChat(prompt: string): Promise<{ stdout: string; stderr: string }> { + return spawnHermes([ + "chat", + "-q", + prompt, + "--yolo", + "--max-turns", + String(HERMES_MAX_TURNS), + "--quiet", + ]); +} + +function spawnHermesResume( + sessionId: string, + message: string, +): Promise<{ stdout: string; stderr: string }> { + return spawnHermes([ + "chat", + "--resume", + sessionId, + "-q", + message, + "--yolo", + "--max-turns", + String(HERMES_MAX_TURNS), + "--quiet", + ]); +} + +function parseSessionId(stdout: string, stderr: string): string { + const sessionId = parseSessionIdFromStdout(stderr) ?? parseSessionIdFromStdout(stdout); + if (sessionId === null) { + throw new Error( + "Failed to parse session_id from hermes output.\n" + + `stderr (first 200 chars): ${stderr.slice(0, 200)}\n` + + `stdout (first 200 chars): ${stdout.slice(0, 200)}`, + ); + } + return sessionId; +} + +async function buildResultFromSession(sessionId: string, store: Store): Promise { + const session = await loadHermesSession(sessionId); + if (session === null) { + throw new Error(`Failed to load hermes session file for session_id: ${sessionId}`); + } + const { detailHash, output } = await storeHermesSessionDetail(store, session); + return { output, detailHash, sessionId }; +} + async function runHermes(ctx: AgentContext): Promise { const fullPrompt = buildHermesPrompt(ctx); const { stdout, stderr } = await spawnHermesChat(fullPrompt); - const { store } = ctx; + const sessionId = parseSessionId(stdout, stderr); + return buildResultFromSession(sessionId, ctx.store); +} - // --quiet mode: session_id may be on stdout or stderr - const sessionId = parseSessionIdFromStdout(stderr) ?? parseSessionIdFromStdout(stdout); - if (sessionId !== null) { - const session = await loadHermesSession(sessionId); - if (session !== null) { - const { detailHash, output } = await storeHermesSessionDetail(store, session); - return { output, detailHash }; - } - } - - const detailHash = await storeHermesRawOutput(store, stdout); - return { output: stdout, detailHash }; +async function continueHermes( + sessionId: string, + message: string, + store: Store, +): Promise { + const { stdout, stderr } = await spawnHermesResume(sessionId, message); + // Resume may return a new session_id + const newSessionId = parseSessionIdFromStdout(stderr) ?? parseSessionIdFromStdout(stdout); + const resolvedId = newSessionId ?? sessionId; + return buildResultFromSession(resolvedId, store); } /** Agent CLI factory: parses argv, runs Hermes, extracts output, writes StepNode. */ @@ -118,5 +159,6 @@ export function createHermesAgent(): () => Promise { return createAgent({ name: "hermes", run: runHermes, + continue: continueHermes, }); } diff --git a/packages/workflow-agent-kit/src/index.ts b/packages/workflow-agent-kit/src/index.ts index 778d3f7..97fa132 100644 --- a/packages/workflow-agent-kit/src/index.ts +++ b/packages/workflow-agent-kit/src/index.ts @@ -12,4 +12,10 @@ export type { FrontmatterFastPathResult } from "./frontmatter.js"; export { tryFrontmatterFastPath } from "./frontmatter.js"; export { createAgent } from "./run.js"; export { getConfigPath, getEnvPath, loadWorkflowConfig } from "./storage.js"; -export type { AgentContext, AgentOptions, AgentRunFn, AgentRunResult } from "./types.js"; +export type { + AgentContext, + AgentContinueFn, + AgentOptions, + AgentRunFn, + AgentRunResult, +} from "./types.js"; diff --git a/packages/workflow-agent-kit/src/run.ts b/packages/workflow-agent-kit/src/run.ts index f6ba1ab..0083d21 100644 --- a/packages/workflow-agent-kit/src/run.ts +++ b/packages/workflow-agent-kit/src/run.ts @@ -3,11 +3,12 @@ import type { CasRef, StepNodePayload, ThreadId } from "@uncaged/workflow-protoc import { config as loadDotenv } from "dotenv"; import { buildOutputFormatInstruction } from "./build-output-format-instruction.js"; import { buildContextWithMeta } from "./context.js"; -import { extract } from "./extract.js"; import { tryFrontmatterFastPath } from "./frontmatter.js"; import type { AgentStore } from "./storage.js"; -import { getEnvPath, loadWorkflowConfig, resolveStorageRoot } from "./storage.js"; -import type { AgentContext, AgentOptions, AgentRunResult } from "./types.js"; +import { getEnvPath, resolveStorageRoot } from "./storage.js"; +import type { AgentOptions } from "./types.js"; + +const MAX_FRONTMATTER_RETRIES = 2; function fail(message: string): never { process.stderr.write(`${message}\n`); @@ -66,31 +67,16 @@ async function writeStepNode(options: { return hash; } -async function runAgent(options: AgentOptions, ctx: AgentContext): Promise { - return runWithMessage("agent run failed", () => options.run(ctx)); -} - -async function extractOutput( +async function tryExtractOutput( rawOutput: string, outputSchema: CasRef, - storageRoot: string, ctx: Awaited>, -): Promise { - const fastPath = await runWithMessage("frontmatter fast path", () => - tryFrontmatterFastPath(rawOutput, outputSchema, ctx.meta.store), - ).catch(() => null); - +): Promise { + const fastPath = await tryFrontmatterFastPath(rawOutput, outputSchema, ctx.meta.store); if (fastPath !== null) { return fastPath.outputHash; } - - const config = await runWithMessage("failed to load config", () => - loadWorkflowConfig(storageRoot), - ); - const extracted = await runWithMessage("extract failed", () => - extract(rawOutput, outputSchema, config), - ); - return extracted.hash; + return null; } async function persistStep(options: { @@ -112,10 +98,18 @@ async function persistStep(options: { }); } +export type AgentCliOutput = { + stepHash: CasRef; + sessionId: string; +}; + /** * Create an agent CLI entrypoint. * Parses argv (` `), runs the agent, extracts structured output, - * writes StepNode to CAS, and prints the new node hash to stdout. + * writes StepNode to CAS, and prints JSON result to stdout. + * + * If frontmatter extraction fails, retries up to MAX_FRONTMATTER_RETRIES times + * by calling agent.continue() with a correction message. */ export function createAgent(options: AgentOptions): () => Promise { return async function main(): Promise { @@ -135,13 +129,31 @@ export function createAgent(options: AgentOptions): () => Promise { ctx.outputFormatInstruction = buildOutputFormatInstruction(frontmatterSchema); } - const agentResult = await runAgent(options, ctx); - const outputHash = await extractOutput( - agentResult.output, - roleDef.frontmatter, - storageRoot, - ctx, - ); + let agentResult = await runWithMessage("agent run failed", () => options.run(ctx)); + + // Try to extract frontmatter; retry via continue if it fails + let outputHash = await tryExtractOutput(agentResult.output, roleDef.frontmatter, ctx); + + for (let retry = 0; retry < MAX_FRONTMATTER_RETRIES && outputHash === null; retry++) { + const correctionMessage = + "Your previous response did not contain valid YAML frontmatter matching the role schema.\n" + + "You MUST begin your response with a YAML frontmatter block (--- delimited).\n" + + "Please output ONLY the corrected frontmatter block followed by your work."; + + agentResult = await runWithMessage("agent continue failed", () => + options.continue(agentResult.sessionId, correctionMessage, ctx.meta.store), + ); + outputHash = await tryExtractOutput(agentResult.output, roleDef.frontmatter, ctx); + } + + if (outputHash === null) { + fail( + "Agent output does not contain valid YAML frontmatter matching the role schema " + + `after ${MAX_FRONTMATTER_RETRIES} retries.\n` + + `Raw output (first 500 chars): ${agentResult.output.slice(0, 500)}`, + ); + } + const stepHash = await persistStep({ ctx, outputHash, @@ -149,6 +161,7 @@ export function createAgent(options: AgentOptions): () => Promise { agentName: agentLabel(options.name), }); - process.stdout.write(`${stepHash}\n`); + const result: AgentCliOutput = { stepHash, sessionId: agentResult.sessionId }; + process.stdout.write(`${JSON.stringify(result)}\n`); }; } diff --git a/packages/workflow-agent-kit/src/types.ts b/packages/workflow-agent-kit/src/types.ts index 8c57d0b..c959524 100644 --- a/packages/workflow-agent-kit/src/types.ts +++ b/packages/workflow-agent-kit/src/types.ts @@ -17,11 +17,19 @@ export type AgentContext = ModeratorContext & { export type AgentRunResult = { output: string; detailHash: string; + sessionId: string; }; +export type AgentContinueFn = ( + sessionId: string, + message: string, + store: AgentContext["store"], +) => Promise; + export type AgentRunFn = (ctx: AgentContext) => Promise; export type AgentOptions = { name: string; run: AgentRunFn; + continue: AgentContinueFn; }; diff --git a/packages/workflow-protocol/src/types.ts b/packages/workflow-protocol/src/types.ts index 8960f0c..ec43431 100644 --- a/packages/workflow-protocol/src/types.ts +++ b/packages/workflow-protocol/src/types.ts @@ -81,6 +81,7 @@ export type StepOutput = { thread: ThreadId; head: CasRef; done: boolean; + sessionId: string | null; }; /** uwf thread steps — single step entry */