feat(agent): adapter stdout JSON with full metadata (#566) (#569)

Co-authored-by: 小橘 <xiaoju@shazhou.work>
Co-committed-by: 小橘 <xiaoju@shazhou.work>
This commit is contained in:
2026-05-28 00:18:57 +00:00
committed by xiaoju
parent 7935b73374
commit 080b37c2be
9 changed files with 511 additions and 15 deletions
@@ -0,0 +1,174 @@
import { execFileSync } from "node:child_process";
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { putSchema } from "@uncaged/json-cas";
import { createFsStore } from "@uncaged/json-cas-fs";
import type { CasRef, StepNodePayload, ThreadId } from "@uncaged/workflow-protocol";
import { afterEach, beforeEach, describe, expect, test } from "vitest";
import { registerUwfSchemas } from "../schemas.js";
import { saveThreadsIndex } from "../store.js";
// ── schemas ──────────────────────────────────────────────────────────────────
const OUTPUT_SCHEMA = {
type: "object" as const,
properties: {
$status: { type: "string" as const, enum: ["done", "failed"] },
result: { type: "string" as const },
},
required: ["$status"],
additionalProperties: false,
};
// ── fixture ──────────────────────────────────────────────────────────────────
let tmpDir: string;
beforeEach(async () => {
tmpDir = await mkdtemp(join(tmpdir(), "cli-uwf-roundtrip-test-"));
});
afterEach(async () => {
await rm(tmpDir, { recursive: true, force: true });
});
describe("C1: adapter JSON round-trip integration", () => {
test("mock agent outputs JSON, CLI parses it and updates thread head in CAS", async () => {
// 1. Set up CAS store with workflow, start node, and output schema
const casDir = join(tmpDir, "cas");
await mkdir(casDir, { recursive: true });
const store = createFsStore(casDir);
const schemas = await registerUwfSchemas(store);
const outputSchemaHash = await putSchema(store, OUTPUT_SCHEMA);
const workflowHash = await store.put(schemas.workflow, {
name: "test-roundtrip",
description: "roundtrip integration test",
roles: {
worker: {
description: "Worker role",
goal: "Do work",
capabilities: [],
procedure: "work",
output: "result",
frontmatter: outputSchemaHash,
},
},
graph: {
$START: { _: { role: "worker", prompt: "Do the work", location: null } },
worker: { done: { role: "$END", prompt: "completed", location: null } },
},
});
const startHash = await store.put(schemas.startNode, {
workflow: workflowHash,
prompt: "Test round-trip task",
});
const threadId = "01ROUNDTRIPTEST0000000000" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: startHash });
// 2. Pre-create CAS nodes that the mock agent would produce
const outputHash = await store.put(outputSchemaHash, {
$status: "done",
result: "test-ok",
});
// Use text schema for detail (simple placeholder)
const detailHash = await store.put(schemas.text, "mock detail");
const startedAtMs = 1716600000000;
const completedAtMs = 1716600001500;
const stepHash = await store.put(schemas.stepNode, {
start: startHash,
prev: null,
role: "worker",
output: outputHash,
detail: detailHash,
agent: "uwf-mock",
edgePrompt: "Do the work",
startedAtMs,
completedAtMs,
cwd: tmpDir,
});
// 3. Create a minimal mock agent shell script that just outputs JSON
// The step node is already in CAS — the agent just needs to print the JSON line
const mockAgentPath = join(tmpDir, "mock-agent.sh");
const adapterJson = JSON.stringify({
stepHash,
detailHash,
role: "worker",
frontmatter: { $status: "done", result: "test-ok" },
body: "",
startedAtMs,
completedAtMs,
});
await writeFile(mockAgentPath, `#!/bin/sh\necho '${adapterJson}'\n`, { mode: 0o755 });
// 4. Write config.yaml
const configPath = join(tmpDir, "config.yaml");
await writeFile(
configPath,
`defaultAgent: uwf-hermes\ndefaultModel: test-model\nagentOverrides: null\nagents: {}\nproviders: {}\nmodels: {}\n`,
);
// 5. Run CLI with agent override pointing to our mock
const cliPath = join(import.meta.dirname, "..", "cli.js");
let stdout: string;
let stderr: string;
let exitCode: number;
try {
stdout = execFileSync(
"bun",
["run", cliPath, "thread", "exec", threadId, "--agent", mockAgentPath],
{
encoding: "utf8",
stdio: ["ignore", "pipe", "pipe"],
env: { ...process.env, WORKFLOW_STORAGE_ROOT: tmpDir },
cwd: tmpDir,
timeout: 30000,
},
);
stderr = "";
exitCode = 0;
} catch (e: unknown) {
const err = e as NodeJS.ErrnoException & {
stdout?: string;
stderr?: string;
status?: number;
};
stdout = err.stdout ?? "";
stderr = err.stderr ?? "";
exitCode = err.status ?? 1;
}
// 6. Verify
if (exitCode !== 0) {
throw new Error(`CLI exited with code ${exitCode}\nstdout: ${stdout}\nstderr: ${stderr}`);
}
// Parse CLI output
const cliOutput = JSON.parse(stdout.trim());
expect(cliOutput).toHaveProperty("thread", threadId);
expect(cliOutput).toHaveProperty("head", stepHash);
expect(cliOutput.head).toMatch(/^[0-9A-HJ-NP-TV-Z]{13}$/);
// Verify the CAS step node exists and has correct metadata
const storeAfter = createFsStore(casDir);
const stepNode = storeAfter.get(cliOutput.head as CasRef);
expect(stepNode).not.toBeNull();
const payload = stepNode!.payload as StepNodePayload;
expect(payload.role).toBe("worker");
expect(payload.agent).toBe("uwf-mock");
expect(payload.startedAtMs).toBe(1716600000000);
expect(payload.completedAtMs).toBe(1716600001500);
expect(payload.output).toBe(outputHash);
expect(payload.detail).toBe(detailHash);
});
});
@@ -0,0 +1,100 @@
import { describe, expect, test } from "vitest";
/**
* B-group tests: validate JSON parsing logic used by spawnAgent.
*
* We test the parsing logic inline since spawnAgent is a private function.
* These tests verify the contract: last line of stdout must be valid JSON
* with a valid stepHash CasRef.
*/
const CASREF_PATTERN = /^[0-9A-HJ-NP-TV-Z]{13}$/;
function isCasRef(s: string): boolean {
return CASREF_PATTERN.test(s);
}
type AdapterOutput = {
stepHash: string;
detailHash: string;
role: string;
frontmatter: Record<string, unknown>;
body: string;
startedAtMs: number;
completedAtMs: number;
};
function parseAgentStdout(stdout: string): AdapterOutput {
const line = stdout.trim().split("\n").pop()?.trim() ?? "";
let parsed: unknown;
try {
parsed = JSON.parse(line);
} catch {
throw new Error(`agent stdout last line is not valid JSON: ${line || "(empty)"}`);
}
const obj = parsed as Record<string, unknown>;
if (
typeof obj !== "object" ||
obj === null ||
typeof obj.stepHash !== "string" ||
!isCasRef(obj.stepHash as string)
) {
throw new Error(`agent stdout JSON missing valid stepHash: ${line}`);
}
return obj as unknown as AdapterOutput;
}
const VALID_OUTPUT: AdapterOutput = {
stepHash: "0123456789ABC",
detailHash: "DEFGH12345678",
role: "planner",
frontmatter: { $status: "ready", plan: "somehash" },
body: "Plan body",
startedAtMs: 1000,
completedAtMs: 2000,
};
describe("spawnAgent JSON parsing", () => {
test("B1. parses valid JSON from agent stdout", () => {
const stdout = JSON.stringify(VALID_OUTPUT) + "\n";
const result = parseAgentStdout(stdout);
expect(result.stepHash).toBe("0123456789ABC");
expect(result.detailHash).toBe("DEFGH12345678");
expect(result.role).toBe("planner");
expect(result.frontmatter).toEqual({ $status: "ready", plan: "somehash" });
expect(result.body).toBe("Plan body");
expect(result.startedAtMs).toBe(1000);
expect(result.completedAtMs).toBe(2000);
});
test("B2. extracts stepHash for head pointer", () => {
const stdout = JSON.stringify(VALID_OUTPUT) + "\n";
const result = parseAgentStdout(stdout);
expect(result.stepHash).toBe("0123456789ABC");
expect(isCasRef(result.stepHash)).toBe(true);
});
test("B3. handles debug lines before JSON", () => {
const debugLines = "[debug] loading context...\n[debug] running agent...\n";
const stdout = debugLines + JSON.stringify(VALID_OUTPUT) + "\n";
const result = parseAgentStdout(stdout);
expect(result.stepHash).toBe("0123456789ABC");
});
test("B4. rejects non-JSON last line", () => {
const stdout = "not-json-at-all\n";
expect(() => parseAgentStdout(stdout)).toThrow("not valid JSON");
});
test("B5. rejects JSON missing stepHash", () => {
const incomplete = { detailHash: "DEFGH12345678", role: "planner" };
const stdout = JSON.stringify(incomplete) + "\n";
expect(() => parseAgentStdout(stdout)).toThrow("missing valid stepHash");
});
test("B6. rejects JSON with invalid stepHash", () => {
const bad = { ...VALID_OUTPUT, stepHash: "not-a-hash" };
const stdout = JSON.stringify(bad) + "\n";
expect(() => parseAgentStdout(stdout)).toThrow("missing valid stepHash");
});
});
+19 -5
View File
@@ -23,6 +23,7 @@ import {
generateUlid,
type ProcessLogger,
} from "@uncaged/workflow-util";
import type { AdapterOutput } from "@uncaged/workflow-util-agent";
import { getEnvPath, loadWorkflowConfig } from "@uncaged/workflow-util-agent";
import { config as loadDotenv } from "dotenv";
import { parse } from "yaml";
@@ -788,7 +789,7 @@ function spawnAgent(
role: string,
edgePrompt: string,
cwd: string,
): CasRef {
): AdapterOutput {
const argv = [...agent.args, "--thread", threadId, "--role", role, "--prompt", edgePrompt];
let stdout: string;
try {
@@ -811,10 +812,22 @@ function spawnAgent(
}
const line = stdout.trim().split("\n").pop()?.trim() ?? "";
if (!isCasRef(line)) {
failStep(plog, `agent stdout is not a valid CAS hash: ${line || "(empty)"}`);
let parsed: unknown;
try {
parsed = JSON.parse(line);
} catch {
failStep(plog, `agent stdout last line is not valid JSON: ${line || "(empty)"}`);
}
return line;
const obj = parsed as Record<string, unknown>;
if (
typeof obj !== "object" ||
obj === null ||
typeof obj.stepHash !== "string" ||
!isCasRef(obj.stepHash as string)
) {
failStep(plog, `agent stdout JSON missing valid stepHash: ${line}`);
}
return obj as unknown as AdapterOutput;
}
async function archiveThread(
@@ -1019,7 +1032,8 @@ async function cmdThreadStepOnce(
});
loadDotenv({ path: getEnvPath(storageRoot) });
const newHead = spawnAgent(plog, agent, threadId, role, edgePrompt, effectiveCwd);
const agentResult = spawnAgent(plog, agent, threadId, role, edgePrompt, effectiveCwd);
const newHead = agentResult.stepHash as CasRef;
plog.log(PL_AGENT_DONE, `agent returned head=${newHead}`, null);
@@ -0,0 +1,72 @@
import { createMemoryStore, putSchema } from "@uncaged/json-cas";
import { describe, expect, test } from "vitest";
import { tryFrontmatterFastPath } from "../src/frontmatter.js";
// ── Helpers ───────────────────────────────────────────────────────────────────
const PLANNER_SCHEMA = {
type: "object",
properties: {
$status: { type: "string", enum: ["ready", "failed"] },
plan: { type: "string" },
},
required: ["$status"],
additionalProperties: false,
};
describe("adapter-stdout: A4 retry loop survives JSON output", () => {
test("A4. first extraction fails, second succeeds — final result has correct data", async () => {
const store = createMemoryStore();
const schemaHash = await putSchema(store, PLANNER_SCHEMA);
// Simulate the retry loop from createAgent (run.ts lines 163-173):
// First attempt: agent outputs garbage (no frontmatter)
const badOutput = "Here is my response without frontmatter.\nJust plain text.";
const firstAttempt = await tryFrontmatterFastPath(badOutput, schemaHash, store);
expect(firstAttempt).toBeNull();
// Second attempt (after correction message): agent outputs valid frontmatter
const goodOutput = `---\n$status: ready\nplan: corrected-hash\n---\nCorrected body with valid frontmatter.`;
const secondAttempt = await tryFrontmatterFastPath(goodOutput, schemaHash, store);
expect(secondAttempt).not.toBeNull();
expect(secondAttempt!.outputHash).toMatch(/^[0-9A-Z]{13}$/);
expect(secondAttempt!.frontmatter).toEqual({ $status: "ready", plan: "corrected-hash" });
expect(secondAttempt!.body).toBe("Corrected body with valid frontmatter.");
// Verify the final AdapterOutput shape would be correct
const adapterOutput = {
stepHash: "MOCK_STEP_HASH",
detailHash: "MOCK_DETAIL_HA",
role: "planner",
frontmatter: secondAttempt!.frontmatter,
body: secondAttempt!.body,
startedAtMs: 1000,
completedAtMs: 2000,
};
const json = JSON.stringify(adapterOutput);
const parsed = JSON.parse(json);
expect(parsed.frontmatter).toEqual({ $status: "ready", plan: "corrected-hash" });
expect(parsed.body).toBe("Corrected body with valid frontmatter.");
expect(parsed.completedAtMs).toBeGreaterThanOrEqual(parsed.startedAtMs);
});
test("A4. all retries fail — extraction returns null on every attempt", async () => {
const store = createMemoryStore();
const schemaHash = await putSchema(store, PLANNER_SCHEMA);
const MAX_RETRIES = 2;
const badOutput = "No frontmatter here";
// Simulate MAX_FRONTMATTER_RETRIES iterations all failing
let extracted = await tryFrontmatterFastPath(badOutput, schemaHash, store);
for (let retry = 0; retry < MAX_RETRIES && extracted === null; retry++) {
// Each retry also gets bad output
extracted = await tryFrontmatterFastPath(badOutput, schemaHash, store);
}
expect(extracted).toBeNull();
});
});
@@ -0,0 +1,105 @@
import { createMemoryStore, putSchema } from "@uncaged/json-cas";
import { describe, expect, test } from "vitest";
import { tryFrontmatterFastPath } from "../src/frontmatter.js";
// ── Helpers ───────────────────────────────────────────────────────────────────
const PLANNER_SCHEMA = {
type: "object",
properties: {
$status: { type: "string", enum: ["ready", "failed"] },
plan: { type: "string" },
},
required: ["$status"],
additionalProperties: false,
};
const FRONTMATTER_SCHEMA = {
type: "object",
properties: {
status: { anyOf: [{ type: "string" }, { type: "null" }] },
next: { anyOf: [{ type: "string" }, { type: "null" }] },
confidence: { anyOf: [{ type: "number" }, { type: "null" }] },
artifacts: { type: "array", items: { type: "string" } },
scope: { type: "string" },
},
required: ["status", "next", "confidence", "artifacts", "scope"],
additionalProperties: false,
};
describe("adapter-stdout: FrontmatterFastPathResult includes frontmatter", () => {
test("A2. frontmatter field contains the parsed YAML frontmatter object", async () => {
const store = createMemoryStore();
const schemaHash = await putSchema(store, PLANNER_SCHEMA);
const raw = `---\n$status: ready\nplan: abc123\n---\nSome body text`;
const result = await tryFrontmatterFastPath(raw, schemaHash, store);
expect(result).not.toBeNull();
expect(result!.frontmatter).toEqual({ $status: "ready", plan: "abc123" });
});
test("A3. body field contains the markdown body after frontmatter", async () => {
const store = createMemoryStore();
const schemaHash = await putSchema(store, PLANNER_SCHEMA);
const raw = `---\n$status: ready\nplan: hash123\n---\nHere is the body.\n\nWith multiple paragraphs.`;
const result = await tryFrontmatterFastPath(raw, schemaHash, store);
expect(result).not.toBeNull();
expect(result!.body).toBe("Here is the body.\n\nWith multiple paragraphs.");
});
test("A1. result contains outputHash as valid CasRef", async () => {
const store = createMemoryStore();
const schemaHash = await putSchema(store, FRONTMATTER_SCHEMA);
const raw = `---\nstatus: done\nnext: null\nconfidence: 0.9\nartifacts: []\nscope: test\n---\nBody`;
const result = await tryFrontmatterFastPath(raw, schemaHash, store);
expect(result).not.toBeNull();
expect(result!.outputHash).toMatch(/^[0-9A-Z]{13}$/);
expect(result!.frontmatter).toBeDefined();
expect(result!.body).toBe("Body");
});
});
describe("adapter-stdout: AdapterOutput JSON shape", () => {
test("A5. JSON.stringify produces valid parseable JSON with all fields", () => {
const output = {
stepHash: "0123456789ABC",
detailHash: "DEFGH12345678",
role: "planner",
frontmatter: { $status: "ready", plan: "somehash" },
body: "Plan body text",
startedAtMs: 1000,
completedAtMs: 2000,
};
const json = JSON.stringify(output);
const parsed = JSON.parse(json);
expect(parsed.stepHash).toBe("0123456789ABC");
expect(parsed.detailHash).toBe("DEFGH12345678");
expect(parsed.role).toBe("planner");
expect(parsed.frontmatter).toEqual({ $status: "ready", plan: "somehash" });
expect(parsed.body).toBe("Plan body text");
expect(parsed.startedAtMs).toBe(1000);
expect(parsed.completedAtMs).toBe(2000);
});
test("completedAtMs >= startedAtMs", () => {
const output = {
stepHash: "0123456789ABC",
detailHash: "DEFGH12345678",
role: "planner",
frontmatter: {},
body: "",
startedAtMs: 1000,
completedAtMs: 2000,
};
expect(output.completedAtMs).toBeGreaterThanOrEqual(output.startedAtMs);
});
});
@@ -20,6 +20,7 @@ type StandardKey = (typeof STANDARD_KEYS)[number];
export type FrontmatterFastPathResult = {
body: string;
outputHash: CasRef;
frontmatter: Record<string, unknown>;
};
function extractYamlBlock(raw: string): string | null {
@@ -176,5 +177,5 @@ export async function tryFrontmatterFastPath(
return null;
}
return { body, outputHash };
return { body, outputHash, frontmatter: candidate };
}
@@ -15,6 +15,7 @@ export { createAgent, parseArgv } from "./run.js";
export { getCachedSessionId, getCachePath, setCachedSessionId } from "./session-cache.js";
export { getConfigPath, getEnvPath, loadWorkflowConfig, resolveStorageRoot } from "./storage.js";
export type {
AdapterOutput,
AgentContext,
AgentContinueFn,
AgentOptions,
+28 -9
View File
@@ -6,7 +6,7 @@ import { buildContextWithMeta } from "./context.js";
import { tryFrontmatterFastPath } from "./frontmatter.js";
import type { AgentStore } from "./storage.js";
import { getEnvPath, resolveStorageRoot } from "./storage.js";
import type { AgentOptions } from "./types.js";
import type { AdapterOutput, AgentOptions } from "./types.js";
const MAX_FRONTMATTER_RETRIES = 2;
@@ -85,14 +85,24 @@ async function writeStepNode(options: {
return hash;
}
type ExtractedOutput = {
outputHash: CasRef;
frontmatter: Record<string, unknown>;
body: string;
};
async function tryExtractOutput(
rawOutput: string,
outputSchema: CasRef,
ctx: Awaited<ReturnType<typeof buildContextWithMeta>>,
): Promise<CasRef | null> {
): Promise<ExtractedOutput | null> {
const fastPath = await tryFrontmatterFastPath(rawOutput, outputSchema, ctx.meta.store);
if (fastPath !== null) {
return fastPath.outputHash;
return {
outputHash: fastPath.outputHash,
frontmatter: fastPath.frontmatter,
body: fastPath.body,
};
}
return null;
}
@@ -148,9 +158,9 @@ export function createAgent(options: AgentOptions): () => Promise<void> {
const primaryDetailHash = agentResult.detailHash;
// Try to extract frontmatter; retry via continue if it fails
let outputHash = await tryExtractOutput(agentResult.output, roleDef.frontmatter, ctx);
let extracted = await tryExtractOutput(agentResult.output, roleDef.frontmatter, ctx);
for (let retry = 0; retry < MAX_FRONTMATTER_RETRIES && outputHash === null; retry++) {
for (let retry = 0; retry < MAX_FRONTMATTER_RETRIES && extracted === null; retry++) {
const correctionMessage =
"Your previous response did not contain valid YAML frontmatter matching the role schema.\n" +
"You MUST begin your response with a YAML frontmatter block (--- delimited).\n" +
@@ -159,10 +169,10 @@ export function createAgent(options: AgentOptions): () => Promise<void> {
agentResult = await runWithMessage("agent continue failed", () =>
options.continue(agentResult.sessionId, correctionMessage, ctx.meta.store),
);
outputHash = await tryExtractOutput(agentResult.output, roleDef.frontmatter, ctx);
extracted = await tryExtractOutput(agentResult.output, roleDef.frontmatter, ctx);
}
if (outputHash === null) {
if (extracted === null) {
fail(
"Agent output does not contain valid YAML frontmatter matching the role schema " +
`after ${MAX_FRONTMATTER_RETRIES} retries.\n` +
@@ -172,13 +182,22 @@ export function createAgent(options: AgentOptions): () => Promise<void> {
const completedAtMs = Date.now();
const stepHash = await persistStep({
ctx,
outputHash,
outputHash: extracted.outputHash,
detailHash: primaryDetailHash,
agentName: agentLabel(options.name),
startedAtMs,
completedAtMs,
});
process.stdout.write(`${stepHash}\n`);
const adapterOutput: AdapterOutput = {
stepHash,
detailHash: primaryDetailHash,
role,
frontmatter: extracted.frontmatter,
body: extracted.body,
startedAtMs,
completedAtMs,
};
process.stdout.write(`${JSON.stringify(adapterOutput)}\n`);
};
}
+10
View File
@@ -37,6 +37,16 @@ export type AgentContinueFn = (
export type AgentRunFn = (ctx: AgentContext) => Promise<AgentRunResult>;
export type AdapterOutput = {
stepHash: string;
detailHash: string;
role: string;
frontmatter: Record<string, unknown>;
body: string;
startedAtMs: number;
completedAtMs: number;
};
export type AgentOptions = {
name: string;
run: AgentRunFn;