Compare commits

...

18 Commits

Author SHA1 Message Date
xiaoju d0dc7b5a19 feat: add process-level debug logger (Phase 1)
- New ProcessLogger in workflow-util: process-scoped JSONL logger
- Entry schema: {ts, pid, tag, msg, thread, workflow}
- Storage: ~/.uncaged/workflow/logs/YYYY-MM-DD.jsonl
- Auto logs process init info (argv, node version, context)
- cli-workflow thread commands fully instrumented:
  - thread start/step, moderator evaluate, agent spawn/done
  - thread archived, error paths

Refs #411, #412, #410
2026-05-23 06:10:05 +00:00
xiaomo 187dd036e5 Merge pull request 'feat: replace edgePrompt null check with isFirstVisit (Phase 2)' (#409) from feat/405-phase2-find-last-role-index into main 2026-05-23 04:55:23 +00:00
xiaoju 4b45f4e6d1 feat: replace edgePrompt null check with isFirstVisit (Phase 2)
- Add isFirstVisit: boolean to AgentContext
- Compute from steps history: !steps.some(s => s.role === role)
- hermes.ts: use isFirstVisit for first-entry vs re-entry logic
- buildInitialPrompt: always append edgePrompt as Moderator Instruction
- edgePrompt is never blanked — always the real moderator instruction
- New tests for first-visit, re-entry, and fallback scenarios

Refs #405, #407, #404
2026-05-23 04:54:11 +00:00
xiaomo 2a6bce4918 Merge pull request 'feat: make edge prompt required (Phase 1)' (#408) from feat/405-edge-prompt-required into main 2026-05-23 04:36:53 +00:00
xiaoju 3d6399c0e3 feat: make edge prompt required (Phase 1)
- Transition.prompt: string | null → string
- EvaluateResult.prompt: string | null → string
- AgentContext.edgePrompt: string | null → string
- CLI YAML validation rejects missing prompt
- All tests updated

Phase 2 will replace edgePrompt === null checks with findLastRoleIndex.

Refs #405, #406, #404
2026-05-23 04:28:58 +00:00
xiaomo b9258f84a5 Merge pull request 'feat: edge prompt + session resume (#402)' (#403) from feat/402-edge-prompt-session-resume into main 2026-05-23 04:00:24 +00:00
xiaoju 638329a562 feat: edge prompt + session resume implementation (#402)
- buildContinuationPrompt: incremental prompt for role re-entry
- buildHermesPrompt: dual-mode (initial vs continuation)
- session-cache: thread:role → hermes sessionId mapping
- HermesAcpClient.resume(): session/resume JSON-RPC
- Fallback: cache miss or resume fail → initial prompt
- UWF_NO_RESUME env to skip cache
- solve-issue.yaml: reviewer→developer edge prompt
- Tests updated for EvaluateResult + continuation prompt

Refs #402
2026-05-23 03:57:04 +00:00
xiaoju 1a06e014f5 feat(protocol): add edge prompt to Transition + EvaluateResult (#402)
- Transition type gains prompt: string | null
- evaluate() returns EvaluateResult { role, prompt } instead of string
- normalizeGraph coerces prompt: undefined → null
- spawnAgent passes edge prompt via UWF_EDGE_PROMPT env
- AgentContext gains edgePrompt field

Refs #402
2026-05-23 03:49:15 +00:00
xiaoju d5d05334f5 fix: ACP client permission handling and process cleanup
Two bugs fixed:
1. request_permission messages (JSON-RPC requests with both id+method) were
   silently swallowed by the response handler, causing hermes to hang waiting
   for permission approval. Now properly distinguish responses (id only) from
   server requests (id+method).
2. uwf-hermes process never exited after completing because the hermes ACP
   subprocess was still alive. Now explicitly close the ACP client after
   agent completion so the subprocess terminates.

小橘 <xiaoju@shazhou.work>
2026-05-22 14:51:43 +00:00
xiaoju 844f5438fe fix: replace @agentclientprotocol/sdk with readline-based JSON-RPC
The official TS SDK's ndJsonStream hangs indefinitely on prompt()
for sessions with 20+ messages (solve-issue planner). Root cause
appears to be a stream backpressure issue in the SDK's ReadableStream
adapter.

Switch back to readline-based line parsing which reliably receives
all JSON-RPC responses. Also handle session/request_permission
inline (auto-approve, yolo mode equivalent).

Ref #398
2026-05-22 14:34:27 +00:00
xiaomo e329d74ec0 Merge pull request 'refactor: migrate hermes agent from stdout parsing to ACP protocol' (#401) from feat/398-hermes-acp-client into main 2026-05-22 13:16:46 +00:00
xiaoju f90614a622 feat: collect structured turns from ACP session updates
UwfAcpClient now tracks all session/update events:
- agent_message_chunk → assistant message content
- agent_thought_chunk → assistant reasoning
- tool_call → pending tool invocation (name + rawInput)
- tool_call_update (completed/failed) → assistant tool_call + tool result

Messages are accumulated across prompts (same session) and stored
via storeHermesSessionDetail, restoring the full structured detail
(turns with tool calls, reasoning) that was lost in the initial ACP
migration.

Ref #398
2026-05-22 13:13:02 +00:00
xiaoju 68af555313 fix: share ACP client across run/continue for session continuity
The client is now created once in createHermesAgent() and shared by
runHermes and continueHermes closures. This preserves conversation
context during frontmatter retry loops — continue() sends a follow-up
prompt on the same ACP session instead of starting a new one.

Client is cleaned up via process.on('exit').

Ref #398
2026-05-22 13:06:14 +00:00
xiaoju 025695dbe9 refactor: use @agentclientprotocol/sdk instead of hand-rolled JSON-RPC
Replace 250-line custom ACP client with official TypeScript SDK.
Uses ClientSideConnection + ndJsonStream for stdio transport.
Same public API (connect/prompt/close), 115 lines, zero custom protocol code.

Ref #398
2026-05-22 12:58:55 +00:00
xiaoju 96584e481f refactor: replace spawnHermes with HermesAcpClient
Remove spawnHermes, spawnHermesChat, spawnHermesResume, parseSessionId,
and buildResultFromSession. runHermes and continueHermes now use
HermesAcpClient for structured JSON-RPC communication.

Session ID comes directly from ACP protocol, eliminating #380 race
condition. Agent output collected via streaming chunks instead of
session file loading.

Phase 2 of RFC #398
Fixes #380
2026-05-22 12:18:14 +00:00
xiaoju 766ec7ddc2 feat: add HermesAcpClient for structured ACP communication
Implements JSON-RPC client that communicates with `hermes acp` via
stdin/stdout. Replaces fragile stdout/stderr parsing with structured
protocol: initialize → session/new → session/prompt → collect chunks.

Session ID comes directly from protocol response, eliminating the
race condition in #380.

Phase 1 of RFC #398
2026-05-22 12:15:09 +00:00
xiaoju aeb7180e9d chore: fix meta.plan → frontmatter.plan in workflow procedures
小橘 <xiaoju@shazhou.work>
2026-05-22 11:22:34 +00:00
xiaomo 9b56f7b75e Merge pull request 'fix: add git worktree hygiene to solve-issue workflow' (#397) from fix/395-worktree-hygiene into main 2026-05-22 11:20:58 +00:00
34 changed files with 1382 additions and 200 deletions
+20 -2
View File
@@ -44,7 +44,7 @@ roles:
- If bounced back from reviewer or tester, reuse the existing branch instead
Then implement TDD:
3. Read the test spec from CAS: `uwf cas get <plan hash>` (find the hash from the latest planner step's meta.plan)
3. Read the test spec from CAS: `uwf cas get <plan hash>` (find the hash from the latest planner step's frontmatter.plan)
4. If bounced back from reviewer or tester: read the previous role's output to understand what needs fixing
5. Write tests first based on the spec
6. Implement the code to make tests pass
@@ -99,7 +99,7 @@ roles:
- testing
procedure: |
1. Run `bun test` for automated test verification
2. Read the test spec from CAS: `uwf cas get <plan hash>` (find the hash from the latest planner step's meta.plan)
2. Read the test spec from CAS: `uwf cas get <plan hash>` (find the hash from the latest planner step's frontmatter.plan)
3. Verify each scenario in the spec is covered and passing
4. Determine outcome:
- passed: all scenarios verified, tests pass
@@ -154,25 +154,43 @@ conditions:
graph:
$START:
- role: "planner"
condition: null
prompt: "Analyze the issue and produce an implementation plan."
planner:
- role: "$END"
condition: "insufficientInfo"
prompt: "Insufficient information to proceed; end the workflow."
- role: "developer"
condition: null
prompt: "Implement the plan from the planner."
developer:
- role: "$END"
condition: "devFailed"
prompt: "Development failed; end the workflow."
- role: "reviewer"
condition: null
prompt: "Send the implementation to the reviewer."
reviewer:
- role: "developer"
condition: "rejected"
prompt: "Reviewer rejected the implementation; fix the issues."
- role: "tester"
condition: null
prompt: "Review passed; run tests on the implementation."
tester:
- role: "developer"
condition: "fixCode"
prompt: "Tests found code issues; return to developer."
- role: "planner"
condition: "fixSpec"
prompt: "Tests found spec issues; return to planner."
- role: "committer"
condition: null
prompt: "Tests passed; commit and push the changes."
committer:
- role: "developer"
condition: "hookFailed"
prompt: "Push hook failed; return to developer to fix."
- role: "$END"
condition: null
prompt: "Commit succeeded; complete the workflow."
+2
View File
@@ -36,6 +36,8 @@ graph:
$START:
- role: "analyst"
condition: null
prompt: "Analyze the topic in the task and produce a structured summary with key points."
analyst:
- role: "$END"
condition: null
prompt: "Analysis complete. Finish the workflow."
+5
View File
@@ -62,14 +62,19 @@ graph:
$START:
- role: "planner"
condition: null
prompt: "Analyze the issue described in the task and produce a detailed implementation plan."
planner:
- role: "developer"
condition: null
prompt: "Implement the plan from the planner. Write code, tests, and ensure existing tests pass."
developer:
- role: "reviewer"
condition: null
prompt: "Review the developer's implementation against the plan for correctness and quality."
reviewer:
- role: "developer"
condition: "notApproved"
prompt: "The reviewer rejected your implementation. Read their feedback and fix the issues."
- role: "$END"
condition: null
prompt: "The review passed. Complete the workflow."
+2
View File
@@ -15,10 +15,12 @@
"release": "bun run build && bun test && node scripts/publish-all.mjs"
},
"devDependencies": {
"@agentclientprotocol/sdk": "^0.22.1",
"@biomejs/biome": "^2.4.14",
"@changesets/cli": "^2.31.0",
"@types/node": "^25.7.0",
"@types/xxhashjs": "^0.2.4",
"@uncaged/workflow-agent-hermes": "workspace:*",
"bun-types": "^1.3.13"
}
}
+81 -16
View File
@@ -23,7 +23,7 @@ import type {
WorkflowConfig,
WorkflowPayload,
} from "@uncaged/workflow-protocol";
import { generateUlid } from "@uncaged/workflow-util";
import { createProcessLogger, generateUlid, type ProcessLogger } from "@uncaged/workflow-util";
import { config as loadDotenv } from "dotenv";
import { parse, stringify } from "yaml";
@@ -47,6 +47,18 @@ import { materializeWorkflowPayload } from "./workflow.js";
const END_ROLE = "$END";
export const THREAD_READ_DEFAULT_QUOTA = 4000;
const PL_THREAD_START = "7HNQ4B2X";
const PL_MODERATOR = "M3K8V9T1";
const PL_AGENT_SPAWN = "R5J2W8N4";
const PL_AGENT_DONE = "C6P9L3H7";
const PL_THREAD_ARCHIVED = "F4D8Q2K5";
const PL_STEP_ERROR = "B8T5N1V6";
function failStep(plog: ProcessLogger, message: string): never {
plog.log(PL_STEP_ERROR, message, null);
fail(message);
}
type ChainState = {
startHash: CasRef;
start: StartNodePayload;
@@ -168,6 +180,10 @@ export async function cmdThreadStart(
const workflowHash = await resolveWorkflowCasRef(uwf, storageRoot, workflowId, projectRoot);
const threadId = generateUlid(Date.now()) as ThreadId;
const plog = createProcessLogger({
storageRoot,
context: { thread: threadId, workflow: workflowHash },
});
const startPayload: StartNodePayload = {
workflow: workflowHash,
prompt,
@@ -183,6 +199,12 @@ export async function cmdThreadStart(
index[threadId] = headHash;
await saveThreadsIndex(storageRoot, index);
plog.log(
PL_THREAD_START,
`thread created workflow=${workflowHash} thread=${threadId} head=${headHash}`,
null,
);
return { workflow: workflowHash, thread: threadId };
}
@@ -624,13 +646,20 @@ function resolveAgentConfig(
return agentConfig;
}
function spawnAgent(agent: AgentConfig, threadId: ThreadId, role: string): CasRef {
function spawnAgent(
plog: ProcessLogger,
agent: AgentConfig,
threadId: ThreadId,
role: string,
edgePrompt: string,
): CasRef {
const argv = [...agent.args, threadId, role];
const env = { ...process.env, UWF_EDGE_PROMPT: edgePrompt };
let stdout: string;
try {
stdout = execFileSync(agent.command, argv, {
encoding: "utf8",
env: process.env,
env,
stdio: ["ignore", "pipe", "pipe"],
});
} catch (e) {
@@ -642,12 +671,12 @@ function spawnAgent(agent: AgentConfig, threadId: ThreadId, role: string): CasRe
? err.stderr
: err.stderr.toString("utf8");
const detail = stderr.trim() !== "" ? `: ${stderr.trim()}` : "";
fail(`agent command failed (${agent.command})${detail}`);
failStep(plog, `agent command failed (${agent.command})${detail}`);
}
const line = stdout.trim().split("\n").pop()?.trim() ?? "";
if (!isCasRef(line)) {
fail(`agent stdout is not a valid CAS hash: ${line || "(empty)"}`);
failStep(plog, `agent stdout is not a valid CAS hash: ${line || "(empty)"}`);
}
return line;
}
@@ -679,9 +708,15 @@ export async function cmdThreadStep(
fail(`--count must be a positive integer, got: ${count}`);
}
const workflowHash = await resolveActiveThreadWorkflowHash(storageRoot, threadId);
const plog = createProcessLogger({
storageRoot,
context: { thread: threadId, workflow: workflowHash },
});
const results: StepOutput[] = [];
for (let i = 0; i < count; i++) {
const result = await cmdThreadStepOnce(storageRoot, threadId, agentOverride);
const result = await cmdThreadStepOnce(storageRoot, threadId, agentOverride, plog);
results.push(result);
if (result.done) {
break;
@@ -690,16 +725,31 @@ export async function cmdThreadStep(
return results;
}
async function cmdThreadStepOnce(
async function resolveActiveThreadWorkflowHash(
storageRoot: string,
threadId: ThreadId,
agentOverride: string | null,
): Promise<StepOutput> {
): Promise<CasRef> {
const index = await loadThreadsIndex(storageRoot);
const headHash = index[threadId];
if (headHash === undefined) {
fail(`thread not active: ${threadId}`);
}
const uwf = await createUwfStore(storageRoot);
const chain = walkChain(uwf, headHash);
return chain.start.workflow;
}
async function cmdThreadStepOnce(
storageRoot: string,
threadId: ThreadId,
agentOverride: string | null,
plog: ProcessLogger,
): Promise<StepOutput> {
const index = await loadThreadsIndex(storageRoot);
const headHash = index[threadId];
if (headHash === undefined) {
failStep(plog, `thread not active: ${threadId}`);
}
const uwf = await createUwfStore(storageRoot);
const chain = walkChain(uwf, headHash);
@@ -709,10 +759,17 @@ async function cmdThreadStepOnce(
const nextResult = await evaluate(workflow, context);
if (!nextResult.ok) {
fail(nextResult.error.message);
failStep(plog, `moderator evaluate failed: ${nextResult.error.message}`);
}
if (nextResult.value === END_ROLE) {
plog.log(
PL_MODERATOR,
`moderator role=${nextResult.value.role} prompt=${nextResult.value.prompt}`,
null,
);
if (nextResult.value.role === END_ROLE) {
plog.log(PL_THREAD_ARCHIVED, `thread archived head=${headHash}`, null);
await archiveThread(storageRoot, threadId, workflowHash, headHash);
return {
workflow: workflowHash,
@@ -722,18 +779,25 @@ async function cmdThreadStepOnce(
};
}
const role = nextResult.value;
const role = nextResult.value.role;
const edgePrompt = nextResult.value.prompt;
const config = await loadWorkflowConfig(storageRoot);
const agent = resolveAgentConfig(config, workflow, role, agentOverride);
plog.log(PL_AGENT_SPAWN, `spawning agent command=${agent.command}`, {
args: [...agent.args, threadId, role].join(" "),
});
loadDotenv({ path: getEnvPath(storageRoot) });
const newHead = spawnAgent(agent, threadId, role);
const newHead = spawnAgent(plog, agent, threadId, role, edgePrompt);
plog.log(PL_AGENT_DONE, `agent returned head=${newHead}`, null);
// Re-create store to pick up nodes written by the agent subprocess
const uwfAfter = await createUwfStore(storageRoot);
const newNode = uwfAfter.store.get(newHead);
if (newNode === null || newNode.type !== uwfAfter.schemas.stepNode) {
fail(`agent returned hash that is not a StepNode: ${newHead}`);
failStep(plog, `agent returned hash that is not a StepNode: ${newHead}`);
}
// Reload threads index to avoid overwriting changes made by the agent subprocess
@@ -745,11 +809,12 @@ async function cmdThreadStepOnce(
const contextAfter = buildModeratorContext(uwfAfter, chainAfter);
const afterResult = await evaluate(workflow, contextAfter);
if (!afterResult.ok) {
fail(afterResult.error.message);
failStep(plog, `post-step moderator evaluate failed: ${afterResult.error.message}`);
}
const done = afterResult.value === END_ROLE;
const done = afterResult.value.role === END_ROLE;
if (done) {
plog.log(PL_THREAD_ARCHIVED, `thread archived head=${newHead}`, null);
await archiveThread(storageRoot, threadId, workflowHash, newHead);
}
+10 -4
View File
@@ -55,10 +55,16 @@ function isJsonSchema(value: unknown): value is JSONSchema {
function normalizeGraph(graph: Record<string, Transition[]>): Record<string, Transition[]> {
const result: Record<string, Transition[]> = {};
for (const [node, transitions] of Object.entries(graph)) {
result[node] = transitions.map((t) => ({
role: t.role,
condition: t.condition ?? null,
}));
result[node] = transitions.map((t) => {
if (typeof t.prompt !== "string" || t.prompt.trim() === "") {
fail(`graph[${node}] transition to "${t.role}": prompt is required (non-empty string)`);
}
return {
role: t.role,
condition: t.condition ?? null,
prompt: t.prompt,
};
});
}
return result;
}
+2
View File
@@ -44,6 +44,8 @@ function isTransition(value: unknown): boolean {
const condition = value.condition;
return (
typeof value.role === "string" &&
typeof value.prompt === "string" &&
value.prompt.trim() !== "" &&
(condition === null || condition === undefined || typeof condition === "string")
);
}
@@ -1,9 +1,13 @@
import { describe, expect, test } from "bun:test";
import type { AgentContext } from "@uncaged/workflow-agent-kit";
import type { ThreadId } from "@uncaged/workflow-protocol";
import { buildClaudeCodePrompt } from "../src/claude-code.js";
function makeCtx(overrides: Partial<AgentContext> = {}): AgentContext {
return {
threadId: "01JTEST0000000000000000000" as ThreadId,
edgePrompt: "Proceed with the assigned role.",
isFirstVisit: true,
workflow: {
roles: {
developer: {
@@ -0,0 +1,77 @@
import { afterEach, beforeEach, describe, expect, it } from "bun:test";
import { HermesAcpClient } from "../src/acp-client.js";
const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
describe("HermesAcpClient", () => {
let client: HermesAcpClient;
beforeEach(() => {
client = new HermesAcpClient();
});
afterEach(async () => {
await client.close();
});
it(
"connect() returns a UUID sessionId",
async () => {
const sessionId = await client.connect(process.cwd());
expect(typeof sessionId).toBe("string");
expect(sessionId).toMatch(UUID_RE);
},
{ timeout: 2 * 60 * 1000 },
);
it(
"prompt() returns a non-empty text response",
async () => {
await client.connect(process.cwd());
const result = await client.prompt("Reply with exactly the word: PONG");
expect(typeof result.text).toBe("string");
expect(result.text.length).toBeGreaterThan(0);
expect(typeof result.sessionId).toBe("string");
expect(result.sessionId).toMatch(UUID_RE);
},
{ timeout: 2 * 60 * 1000 },
);
it(
"prompt() can be called twice on the same session (resume)",
async () => {
await client.connect(process.cwd());
const first = await client.prompt("Say the word ALPHA and nothing else.");
expect(first.text.length).toBeGreaterThan(0);
const second = await client.prompt("Now say the word BETA and nothing else.");
expect(second.text.length).toBeGreaterThan(0);
expect(first.sessionId).toBe(second.sessionId);
},
{ timeout: 2 * 60 * 1000 },
);
it(
"prompt() collects structured messages including tool calls",
async () => {
await client.connect(process.cwd());
const result = await client.prompt("Run this command: echo TOOL_DETAIL_TEST");
expect(result.messages.length).toBeGreaterThan(0);
// Should have at least one tool message (the echo command)
const toolMessages = result.messages.filter((m) => m.role === "tool");
expect(toolMessages.length).toBeGreaterThan(0);
// Tool message should contain the output
const toolContent = toolMessages[0]?.content ?? "";
expect(toolContent).toContain("TOOL_DETAIL_TEST");
// Should have assistant messages with tool_calls
const assistantWithTools = result.messages.filter(
(m) => m.role === "assistant" && m.tool_calls !== null,
);
expect(assistantWithTools.length).toBeGreaterThan(0);
},
{ timeout: 2 * 60 * 1000 },
);
});
@@ -0,0 +1,78 @@
import { describe, expect, test } from "bun:test";
import type { AgentContext } from "@uncaged/workflow-agent-kit";
import type { ThreadId } from "@uncaged/workflow-protocol";
import { buildHermesPrompt } from "../src/hermes.js";
function makeCtx(overrides: Partial<AgentContext> = {}): AgentContext {
return {
threadId: "01JTEST0000000000000000000" as ThreadId,
edgePrompt: "Proceed with the assigned role.",
isFirstVisit: true,
workflow: {
roles: {
developer: {
description: "TDD implementation per test spec",
goal: "Write code",
capabilities: ["coding"],
procedure: "1. Read spec\n2. Write code",
output: "List files changed",
frontmatter: "",
},
},
conditions: {},
graph: {},
},
role: "developer",
start: { prompt: "Fix the bug", workflowHash: "abc123", threadId: "t1" },
steps: [],
store: {} as AgentContext["store"],
outputFormatInstruction: "Use YAML frontmatter",
...overrides,
};
}
describe("buildHermesPrompt", () => {
test("first visit uses full role prompt and includes moderator instruction", () => {
const result = buildHermesPrompt(
makeCtx({ edgePrompt: "Focus on the failing test.", isFirstVisit: true }),
);
expect(result).toMatch(/^Use YAML frontmatter/);
expect(result).toContain("Write code");
expect(result).toContain("## Task\nFix the bug");
expect(result).toContain("## Moderator Instruction");
expect(result).toContain("Focus on the failing test.");
});
test("re-entry uses continuation prompt with edge instruction", () => {
const ctx = makeCtx({
isFirstVisit: false,
edgePrompt: "The reviewer rejected your work. Fix the issues.",
steps: [
{ role: "developer", output: { summary: "Initial fix" }, agent: "uwf-hermes" },
{ role: "reviewer", output: { approved: false }, agent: "uwf-hermes" },
],
});
const result = buildHermesPrompt(ctx);
expect(result).not.toContain("## Task");
expect(result).toContain("## What Happened Since Your Last Turn");
expect(result).toContain("## Moderator Instruction");
expect(result).toContain("The reviewer rejected your work.");
});
test("forced first visit via isFirstVisit uses initial prompt even when role appears in history", () => {
const result = buildHermesPrompt(
makeCtx({
isFirstVisit: true,
steps: [{ role: "developer", output: { done: true }, agent: "uwf-hermes" }],
edgePrompt: "Retry with a fresh approach.",
}),
);
expect(result).toContain("## Task");
expect(result).toContain("Retry with a fresh approach.");
expect(result).not.toContain("## What Happened Since Your Last Turn");
});
});
+3 -1
View File
@@ -22,7 +22,9 @@
},
"dependencies": {
"@uncaged/json-cas": "^0.4.0",
"@uncaged/workflow-agent-kit": "workspace:^"
"@uncaged/workflow-agent-kit": "workspace:^",
"@uncaged/workflow-protocol": "workspace:^",
"@uncaged/workflow-util": "workspace:^"
},
"devDependencies": {
"typescript": "^5.8.3"
@@ -0,0 +1,393 @@
import type { ChildProcess } from "node:child_process";
import { spawn } from "node:child_process";
import { createInterface } from "node:readline";
import type { HermesSessionMessage } from "./types.js";
const HERMES_COMMAND = "hermes";
const PROTOCOL_VERSION = 1;
type JsonRpcResponse = {
jsonrpc: "2.0";
id: number;
result?: unknown;
error?: { code: number; message: string };
};
type PendingRequest = {
resolve: (value: JsonRpcResponse) => void;
reject: (reason: Error) => void;
};
/** Tracks in-flight tool calls so we can build complete messages when they finish. */
type PendingToolCall = {
name: string;
args: string;
};
export type AcpPromptResult = {
text: string;
sessionId: string;
messages: HermesSessionMessage[];
};
export class HermesAcpClient {
private process: ChildProcess | null = null;
private nextId = 1;
private sessionId: string | null = null;
private stderrBuffer = "";
private pending = new Map<number, PendingRequest>();
// Message collection state
private messageChunks: string[] = [];
private reasoningChunks: string[] = [];
private pendingTools = new Map<string, PendingToolCall>();
messages: HermesSessionMessage[] = [];
/** Spawn hermes acp, initialize, create session */
async connect(cwd: string): Promise<string> {
await this.ensureProcess();
await this.initialize();
const sessionResponse = (await this.sendRequest("session/new", {
cwd,
mcpServers: [],
})) as { result: { sessionId: string } };
const sessionId = sessionResponse.result?.sessionId;
if (typeof sessionId !== "string" || sessionId === "") {
throw new Error(`session/new did not return a sessionId: ${JSON.stringify(sessionResponse)}`);
}
this.sessionId = sessionId;
return sessionId;
}
/** Spawn hermes acp, initialize, resume an existing session */
async resume(sessionId: string, cwd: string): Promise<string> {
await this.ensureProcess();
await this.initialize();
const response = await this.sendRequest("session/resume", {
cwd,
sessionId,
mcpServers: [],
});
if ((response as { error?: unknown }).error !== undefined) {
throw new Error(
`session/resume failed: ${JSON.stringify((response as { error: unknown }).error)}`,
);
}
this.sessionId = sessionId;
return sessionId;
}
/** Send prompt and collect full response text + structured messages. */
async prompt(text: string): Promise<AcpPromptResult> {
if (this.sessionId === null) {
throw new Error("Not connected — call connect() first");
}
this.messageChunks = [];
this.reasoningChunks = [];
const response = await this.sendRequest("session/prompt", {
sessionId: this.sessionId,
prompt: [{ type: "text", text }],
});
if ((response as { error?: unknown }).error !== undefined) {
throw new Error(
`session/prompt failed: ${JSON.stringify((response as { error: unknown }).error)}`,
);
}
// Flush any trailing assistant text that wasn't followed by a tool call.
this.flushAssistantMessage();
// Extract the final assistant text from collected messages.
let finalText = "";
for (let i = this.messages.length - 1; i >= 0; i--) {
const msg = this.messages[i];
if (
msg !== undefined &&
msg.role === "assistant" &&
msg.content !== null &&
msg.content.trim() !== ""
) {
finalText = msg.content;
break;
}
}
return {
text: finalText,
sessionId: this.sessionId,
messages: this.messages,
};
}
/** Close the connection */
async close(): Promise<void> {
if (this.process === null) {
return;
}
this.sessionId = null;
this.process.stdin?.end();
const proc = this.process;
await new Promise<void>((resolve) => {
proc.on("close", () => resolve());
setTimeout(resolve, 5000);
});
this.process = null;
}
// ---- JSON-RPC transport ----
private sendRequest(
method: string,
params: Record<string, unknown>,
timeoutMs = 10 * 60 * 1000,
): Promise<JsonRpcResponse> {
const id = this.nextId++;
return new Promise<JsonRpcResponse>((resolve, reject) => {
const timer = setTimeout(() => {
this.pending.delete(id);
reject(new Error(`Timeout waiting for response to ${method} (id=${id})`));
}, timeoutMs);
this.pending.set(id, {
resolve: (value) => {
clearTimeout(timer);
resolve(value);
},
reject: (err) => {
clearTimeout(timer);
reject(err);
},
});
this.writeLine(JSON.stringify({ jsonrpc: "2.0", id, method, params }));
});
}
private sendNotification(method: string, params?: Record<string, unknown>): void {
const message: Record<string, unknown> = { jsonrpc: "2.0", method };
if (params !== undefined) {
message.params = params;
}
this.writeLine(JSON.stringify(message));
}
private writeLine(line: string): void {
if (this.process?.stdin === null || this.process?.stdin === undefined) {
throw new Error("Cannot write: hermes acp process stdin not available");
}
this.process.stdin.write(`${line}\n`);
}
private handleLine(line: string): void {
if (line === "") {
return;
}
let parsed: unknown;
try {
parsed = JSON.parse(line);
} catch {
return;
}
const msg = parsed as Record<string, unknown>;
const hasId = "id" in msg && msg.id !== undefined && msg.id !== null;
const hasMethod = typeof msg.method === "string";
// JSON-RPC response to one of our requests (has "id" but no "method")
if (hasId && !hasMethod) {
const response = msg as unknown as JsonRpcResponse;
const handler = this.pending.get(response.id);
if (handler !== undefined) {
this.pending.delete(response.id);
handler.resolve(response);
}
return;
}
// Server-initiated JSON-RPC request: session/request_permission (has "id" + "method")
if (msg.method === "session/request_permission" && hasId) {
const params = msg.params as Record<string, unknown> | undefined;
const options = (params?.options ?? []) as Array<{ optionId?: string }>;
const firstOptionId = options[0]?.optionId ?? "";
this.writeLine(
JSON.stringify({
jsonrpc: "2.0",
id: msg.id,
result: { outcome: { outcome: "selected", optionId: firstOptionId } },
}),
);
return;
}
// JSON-RPC notification — session/update (no "id")
if (msg.method === "session/update") {
const params = msg.params as Record<string, unknown> | undefined;
const update = params?.update as Record<string, unknown> | undefined;
if (update !== undefined) {
this.handleSessionUpdate(update);
}
return;
}
}
// ---- Session update → structured messages ----
private handleSessionUpdate(update: Record<string, unknown>): void {
const updateType = update.sessionUpdate as string;
switch (updateType) {
case "agent_message_chunk": {
const content = update.content as { type?: string; text?: string } | undefined;
if (content?.type === "text" && typeof content.text === "string") {
this.messageChunks.push(content.text);
}
break;
}
case "agent_thought_chunk": {
const content = update.content as { type?: string; text?: string } | undefined;
if (content?.type === "text" && typeof content.text === "string") {
this.reasoningChunks.push(content.text);
}
break;
}
case "tool_call": {
const title = (update.title as string) ?? "";
const rawInput = update.rawInput;
const args =
rawInput !== undefined && rawInput !== null ? JSON.stringify(rawInput) : "";
const toolCallId = update.toolCallId as string;
this.pendingTools.set(toolCallId, { name: title, args });
// Flush accumulated assistant text before tool call
this.flushAssistantMessage();
break;
}
case "tool_call_update": {
const status = update.status as string | undefined;
if (status === "completed" || status === "failed") {
const toolCallId = update.toolCallId as string;
const pending = this.pendingTools.get(toolCallId);
const toolName = pending?.name ?? toolCallId;
const rawOutput = update.rawOutput;
const outputStr =
rawOutput !== undefined && rawOutput !== null
? typeof rawOutput === "string"
? rawOutput
: JSON.stringify(rawOutput)
: "";
this.messages.push({
role: "assistant",
content: null,
reasoning: null,
tool_calls: [{ function: { name: toolName, arguments: pending?.args ?? "" } }],
});
this.messages.push({
role: "tool",
content: outputStr,
reasoning: null,
tool_calls: null,
});
this.pendingTools.delete(toolCallId);
}
break;
}
default:
break;
}
}
/** Flush any accumulated text/reasoning into an assistant message. */
private flushAssistantMessage(): void {
const text = this.messageChunks.join("");
const reasoning = this.reasoningChunks.join("");
if (text !== "" || reasoning !== "") {
this.messages.push({
role: "assistant",
content: text || null,
reasoning: reasoning || null,
tool_calls: null,
});
}
this.messageChunks = [];
this.reasoningChunks = [];
}
private rejectAll(err: Error): void {
for (const handler of this.pending.values()) {
handler.reject(err);
}
this.pending.clear();
}
private async ensureProcess(): Promise<void> {
if (this.process !== null) {
return;
}
const child = spawn(HERMES_COMMAND, ["acp"], {
env: process.env,
shell: false,
stdio: ["pipe", "pipe", "pipe"],
});
this.process = child;
child.stderr?.on("data", (chunk: Buffer) => {
this.stderrBuffer += chunk.toString();
});
child.on("error", (cause) => {
const message = cause instanceof Error ? cause.message : String(cause);
this.rejectAll(new Error(`hermes acp spawn failed: ${message}`));
});
child.on("close", (code) => {
if (code !== 0 && this.pending.size > 0) {
const detail = this.stderrBuffer.trim() !== "" ? ` stderr=${this.stderrBuffer.trim()}` : "";
this.rejectAll(
new Error(`hermes acp exited unexpectedly with code ${code ?? "null"}${detail}`),
);
}
});
if (child.stdout === null) {
throw new Error("hermes acp process stdout is not available");
}
const rl = createInterface({ input: child.stdout });
rl.on("line", (line) => {
this.handleLine(line.trim());
});
}
private async initialize(): Promise<void> {
const initResponse = await this.sendRequest("initialize", {
protocolVersion: PROTOCOL_VERSION,
clientInfo: { name: "uwf", version: "0.1.0" },
capabilities: {},
});
if ((initResponse as { error?: unknown }).error !== undefined) {
throw new Error(
`initialize failed: ${JSON.stringify((initResponse as { error: unknown }).error)}`,
);
}
this.sendNotification("initialized");
}
}
+135 -110
View File
@@ -1,21 +1,18 @@
import { spawn } from "node:child_process";
import type { Store } from "@uncaged/json-cas";
import {
type AgentContext,
type AgentRunResult,
buildContinuationPrompt,
buildRolePrompt,
createAgent,
} from "@uncaged/workflow-agent-kit";
import { createLogger } from "@uncaged/workflow-util";
import {
loadHermesSession,
parseSessionIdFromStdout,
storeHermesSessionDetail,
} from "./session-detail.js";
import { HermesAcpClient } from "./acp-client.js";
import { getCachedSessionId, isResumeDisabled, setCachedSessionId } from "./session-cache.js";
import { storeHermesSessionDetail } from "./session-detail.js";
const HERMES_COMMAND = "hermes";
const HERMES_MAX_TURNS = 90;
const log = createLogger({ sink: { kind: "stderr" } });
function buildHistorySummary(steps: AgentContext["steps"]): string {
if (steps.length === 0) {
@@ -36,12 +33,11 @@ function buildHistorySummary(steps: AgentContext["steps"]): string {
return lines.join("\n");
}
/** Assemble system prompt, task, and prior step outputs for Hermes. */
export function buildHermesPrompt(ctx: AgentContext): string {
function buildInitialPrompt(ctx: AgentContext): string {
const roleDef = ctx.workflow.roles[ctx.role];
const rolePrompt = roleDef !== undefined ? buildRolePrompt(roleDef) : "";
const parts: string[] = [];
if (ctx.outputFormatInstruction !== undefined && ctx.outputFormatInstruction !== "") {
if (ctx.outputFormatInstruction !== "") {
parts.push(ctx.outputFormatInstruction, "");
}
parts.push(rolePrompt, "", "## Task", ctx.start.prompt);
@@ -49,116 +45,145 @@ export function buildHermesPrompt(ctx: AgentContext): string {
if (historyBlock !== "") {
parts.push("", historyBlock);
}
parts.push("", "## Moderator Instruction", "", ctx.edgePrompt);
return parts.join("\n");
}
function spawnHermes(args: string[]): Promise<{ stdout: string; stderr: string }> {
return new Promise((resolve, reject) => {
const child = spawn(HERMES_COMMAND, args, {
env: process.env,
shell: false,
stdio: ["ignore", "pipe", "pipe"],
});
let stdout = "";
let stderr = "";
child.stdout?.on("data", (chunk: Buffer) => {
stdout += chunk.toString();
});
child.stderr?.on("data", (chunk: Buffer) => {
stderr += chunk.toString();
});
child.on("error", (cause) => {
const message = cause instanceof Error ? cause.message : String(cause);
reject(new Error(`hermes spawn failed: ${message}`));
});
child.on("close", (code) => {
if (code === 0) {
resolve({ stdout, stderr });
return;
}
const detail = stderr.trim() !== "" ? ` stderr=${stderr.trim()}` : "";
reject(new Error(`hermes exited with code ${code ?? "null"}${detail}`));
});
});
}
function spawnHermesChat(prompt: string): Promise<{ stdout: string; stderr: string }> {
return spawnHermes([
"chat",
"-q",
prompt,
"--yolo",
"--max-turns",
String(HERMES_MAX_TURNS),
"--quiet",
]);
}
function spawnHermesResume(
sessionId: string,
message: string,
): Promise<{ stdout: string; stderr: string }> {
return spawnHermes([
"chat",
"--resume",
sessionId,
"-q",
message,
"--yolo",
"--max-turns",
String(HERMES_MAX_TURNS),
"--quiet",
]);
}
function parseSessionId(stdout: string, stderr: string): string {
const sessionId = parseSessionIdFromStdout(stderr) ?? parseSessionIdFromStdout(stdout);
if (sessionId === null) {
throw new Error(
"Failed to parse session_id from hermes output.\n" +
`stderr (first 200 chars): ${stderr.slice(0, 200)}\n` +
`stdout (first 200 chars): ${stdout.slice(0, 200)}`,
);
/** Assemble system prompt, task, and prior step outputs for Hermes. */
export function buildHermesPrompt(ctx: AgentContext): string {
if (!ctx.isFirstVisit) {
const parts: string[] = [];
if (ctx.outputFormatInstruction !== "") {
parts.push(ctx.outputFormatInstruction, "");
}
parts.push(buildContinuationPrompt(ctx.steps, ctx.role, ctx.edgePrompt));
return parts.join("\n");
}
return sessionId;
return buildInitialPrompt(ctx);
}
async function buildResultFromSession(sessionId: string, store: Store): Promise<AgentRunResult> {
const session = await loadHermesSession(sessionId);
if (session === null) {
throw new Error(`Failed to load hermes session file for session_id: ${sessionId}`);
}
const { detailHash, output } = await storeHermesSessionDetail(store, session);
return { output, detailHash, sessionId };
}
async function runHermes(ctx: AgentContext): Promise<AgentRunResult> {
const fullPrompt = buildHermesPrompt(ctx);
const { stdout, stderr } = await spawnHermesChat(fullPrompt);
const sessionId = parseSessionId(stdout, stderr);
return buildResultFromSession(sessionId, ctx.store);
}
async function continueHermes(
sessionId: string,
message: string,
async function storePromptResult(
store: Store,
): Promise<AgentRunResult> {
const { stdout, stderr } = await spawnHermesResume(sessionId, message);
// Resume may return a new session_id
const newSessionId = parseSessionIdFromStdout(stderr) ?? parseSessionIdFromStdout(stdout);
const resolvedId = newSessionId ?? sessionId;
return buildResultFromSession(resolvedId, store);
sessionId: string,
messages: Awaited<ReturnType<HermesAcpClient["prompt"]>>["messages"],
): Promise<{ detailHash: string }> {
const session = {
session_id: sessionId,
model: "",
session_start: new Date().toISOString(),
messages,
};
return storeHermesSessionDetail(store, session);
}
/** Agent CLI factory: parses argv, runs Hermes, extracts output, writes StepNode. */
type PromptAttempt = {
useContinuation: boolean;
resumed: boolean;
};
async function prepareSession(
client: HermesAcpClient,
ctx: AgentContext,
cwd: string,
): Promise<PromptAttempt> {
if (ctx.isFirstVisit || isResumeDisabled()) {
await client.connect(cwd);
return { useContinuation: false, resumed: false };
}
const cachedSessionId = await getCachedSessionId(ctx.threadId, ctx.role);
if (cachedSessionId === null) {
log("6RWK3N8Q", `no cached session for ${ctx.threadId}:${ctx.role}, starting new session`);
await client.connect(cwd);
return { useContinuation: false, resumed: false };
}
try {
await client.resume(cachedSessionId, cwd);
log("9MHT4V2P", `resumed hermes session ${cachedSessionId} for ${ctx.threadId}:${ctx.role}`);
return { useContinuation: true, resumed: true };
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
log("3XPN7K4W", `session resume failed, falling back to new session: ${message}`);
await client.close();
await client.connect(cwd);
return { useContinuation: false, resumed: false };
}
}
/**
* Agent CLI factory: parses argv, runs Hermes, extracts output, writes StepNode.
*
* A single ACP client is shared across run() and continue() calls so that
* frontmatter retry loops keep the same Hermes session context. The client
* is closed once the agent process exits (via process.on("exit")).
*/
export function createHermesAgent(): () => Promise<void> {
return createAgent({
const client = new HermesAcpClient();
// Ensure cleanup regardless of how the process exits.
process.on("exit", () => {
void client.close();
});
async function runPrompt(ctx: AgentContext, useContinuation: boolean): Promise<AgentRunResult> {
const effectiveCtx = useContinuation ? ctx : { ...ctx, isFirstVisit: true };
const fullPrompt = buildHermesPrompt(effectiveCtx);
const { text, sessionId, messages } = await client.prompt(fullPrompt);
const { detailHash } = await storePromptResult(ctx.store, sessionId, messages);
if (!isResumeDisabled()) {
await setCachedSessionId(ctx.threadId, ctx.role, sessionId);
}
return { output: text, detailHash, sessionId };
}
async function runHermes(ctx: AgentContext): Promise<AgentRunResult> {
const cwd = process.cwd();
const attempt = await prepareSession(client, ctx, cwd);
try {
return await runPrompt(ctx, attempt.useContinuation);
} catch (error) {
if (!attempt.resumed) {
throw error;
}
const message = error instanceof Error ? error.message : String(error);
log("8FQW2R6N", `continuation prompt failed, retrying with initial prompt: ${message}`);
await client.close();
await client.connect(cwd);
return runPrompt(ctx, false);
}
}
async function continueHermes(
_sessionId: string,
message: string,
store: Store,
): Promise<AgentRunResult> {
// Client is already connected from runHermes — same ACP session,
// so the agent sees the full conversation history (crucial for retries).
const { text, sessionId, messages } = await client.prompt(message);
const { detailHash } = await storePromptResult(store, sessionId, messages);
return { output: text, detailHash, sessionId };
}
const agentMain = createAgent({
name: "hermes",
run: runHermes,
continue: continueHermes,
});
// Wrap to ensure ACP client is closed after agent completes,
// so the hermes subprocess exits and bun can terminate.
return async () => {
try {
await agentMain();
} finally {
await client.close();
}
};
}
@@ -1 +1,2 @@
export { HermesAcpClient } from "./acp-client.js";
export { buildHermesPrompt, createHermesAgent } from "./hermes.js";
@@ -0,0 +1,70 @@
import { mkdir, readFile, writeFile } from "node:fs/promises";
import { dirname, join } from "node:path";
import { resolveStorageRoot } from "@uncaged/workflow-agent-kit";
import type { ThreadId } from "@uncaged/workflow-protocol";
type HermesSessionCache = Record<string, string>;
function getCachePath(): string {
return join(resolveStorageRoot(), "cache", "hermes-sessions.json");
}
function cacheKey(threadId: ThreadId, role: string): string {
return `${threadId}:${role}`;
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
async function readCache(): Promise<HermesSessionCache> {
const path = getCachePath();
try {
const text = await readFile(path, "utf8");
const raw = JSON.parse(text) as unknown;
if (!isRecord(raw)) {
return {};
}
const cache: HermesSessionCache = {};
for (const [key, value] of Object.entries(raw)) {
if (typeof value === "string" && value !== "") {
cache[key] = value;
}
}
return cache;
} catch (e) {
const err = e as NodeJS.ErrnoException;
if (err.code === "ENOENT") {
return {};
}
throw e;
}
}
async function writeCache(cache: HermesSessionCache): Promise<void> {
const path = getCachePath();
await mkdir(dirname(path), { recursive: true });
await writeFile(path, `${JSON.stringify(cache, null, 2)}\n`, "utf8");
}
export function isResumeDisabled(): boolean {
const flag = process.env.UWF_NO_RESUME;
return flag !== undefined && flag !== "";
}
export async function getCachedSessionId(threadId: ThreadId, role: string): Promise<string | null> {
const cache = await readCache();
const sessionId = cache[cacheKey(threadId, role)];
return sessionId ?? null;
}
export async function setCachedSessionId(
threadId: ThreadId,
role: string,
sessionId: string,
): Promise<void> {
const cache = await readCache();
cache[cacheKey(threadId, role)] = sessionId;
await writeCache(cache);
}
@@ -0,0 +1,70 @@
import type { StepContext } from "@uncaged/workflow-protocol";
import { describe, expect, test } from "vitest";
import { buildContinuationPrompt } from "../src/build-continuation-prompt.js";
const reviewerStep: StepContext = {
role: "reviewer",
output: { approved: false, comments: "Missing tests" },
detail: "2MXBG6PN4A8JR",
agent: "uwf-hermes",
};
const developerStep: StepContext = {
role: "developer",
output: { filesChanged: ["src/app.ts"], summary: "Initial fix" },
detail: "1VPBG9SM5E7WK",
agent: "uwf-hermes",
};
describe("buildContinuationPrompt", () => {
test("includes steps after the last matching role and the edge prompt", () => {
const steps: StepContext[] = [
developerStep,
reviewerStep,
{
role: "planner",
output: { plan: "revise approach" },
detail: "7BQST3VW9F2MA",
agent: "uwf-hermes",
},
];
const result = buildContinuationPrompt(
steps,
"developer",
"The reviewer rejected your implementation. Read their feedback and fix the issues.",
);
expect(result).toContain("## What Happened Since Your Last Turn");
expect(result).toContain("### Step 2: reviewer");
expect(result).toContain("Missing tests");
expect(result).toContain("### Step 3: planner");
expect(result).toContain("## Moderator Instruction");
expect(result).toContain("The reviewer rejected your implementation.");
expect(result).not.toContain("Initial fix");
});
test("uses all steps when the role has not run before", () => {
const result = buildContinuationPrompt(
[developerStep, reviewerStep],
"planner",
"Continue from the reviewer feedback.",
);
expect(result).toContain("### Step 1: developer");
expect(result).toContain("### Step 2: reviewer");
expect(result).toContain("Continue from the reviewer feedback.");
});
test("still includes moderator instruction when there are no intervening steps", () => {
const result = buildContinuationPrompt(
[developerStep],
"developer",
"Please revise your work.",
);
expect(result).not.toContain("## What Happened Since Your Last Turn");
expect(result).toContain("## Moderator Instruction");
expect(result).toContain("Please revise your work.");
});
});
@@ -0,0 +1,53 @@
import type { StepContext } from "@uncaged/workflow-protocol";
function formatStep(step: StepContext, stepNumber: number): string {
return [
`### Step ${stepNumber}: ${step.role}`,
`Output: ${JSON.stringify(step.output)}`,
`Agent: ${step.agent}`,
].join("\n");
}
function findLastRoleIndex(steps: StepContext[], role: string): number {
for (let i = steps.length - 1; i >= 0; i--) {
const step = steps[i];
if (step !== undefined && step.role === role) {
return i;
}
}
return -1;
}
/**
* Build a continuation prompt for a role re-entry.
*
* Finds the most recent step for `role`, collects everything after it as context,
* and appends the moderator edge prompt as the instruction.
*/
export function buildContinuationPrompt(
steps: StepContext[],
role: string,
edgePrompt: string,
): string {
const lastIndex = findLastRoleIndex(steps, role);
const sinceSteps = lastIndex >= 0 ? steps.slice(lastIndex + 1) : steps;
const parts: string[] = [];
if (sinceSteps.length > 0) {
parts.push("## What Happened Since Your Last Turn");
const baseStepNumber = lastIndex >= 0 ? lastIndex + 2 : 1;
for (let i = 0; i < sinceSteps.length; i++) {
const step = sinceSteps[i];
if (step === undefined) {
continue;
}
parts.push("");
parts.push(formatStep(step, baseStepNumber + i));
}
parts.push("");
}
parts.push("## Moderator Instruction", "", edgePrompt);
return parts.join("\n");
}
@@ -21,6 +21,14 @@ function fail(message: string): never {
throw new Error(message);
}
function readEdgePrompt(): string {
const value = process.env.UWF_EDGE_PROMPT;
if (value === undefined || value === "") {
fail("UWF_EDGE_PROMPT environment variable is required");
}
return value;
}
function walkChain(store: Store, schemas: AgentStore["schemas"], headHash: CasRef): ChainState {
const headNode = store.get(headHash);
if (headNode === null) {
@@ -133,6 +141,8 @@ export async function buildContext(threadId: ThreadId, role: string): Promise<Ag
}
const steps = await buildHistory(store, chain.stepsNewestFirst);
const edgePrompt = readEdgePrompt();
const isFirstVisit = !steps.some((s) => s.role === role);
return {
threadId,
@@ -142,6 +152,8 @@ export async function buildContext(threadId: ThreadId, role: string): Promise<Ag
workflow,
store,
outputFormatInstruction: "",
edgePrompt,
isFirstVisit,
};
}
@@ -178,6 +190,8 @@ export async function buildContextWithMeta(
}
const steps = await buildHistory(store, chain.stepsNewestFirst);
const edgePrompt = readEdgePrompt();
const isFirstVisit = !steps.some((s) => s.role === role);
return {
threadId,
@@ -187,6 +201,8 @@ export async function buildContextWithMeta(
workflow,
store,
outputFormatInstruction: "",
edgePrompt,
isFirstVisit,
meta: { storageRoot, store, schemas, headHash, chain },
};
}
+2 -1
View File
@@ -1,3 +1,4 @@
export { buildContinuationPrompt } from "./build-continuation-prompt.js";
export { buildOutputFormatInstruction } from "./build-output-format-instruction.js";
export { buildRolePrompt } from "./build-role-prompt.js";
export type { BuildContextMeta } from "./context.js";
@@ -11,7 +12,7 @@ export {
export type { FrontmatterFastPathResult } from "./frontmatter.js";
export { tryFrontmatterFastPath } from "./frontmatter.js";
export { createAgent } from "./run.js";
export { getConfigPath, getEnvPath, loadWorkflowConfig } from "./storage.js";
export { getConfigPath, getEnvPath, loadWorkflowConfig, resolveStorageRoot } from "./storage.js";
export type {
AgentContext,
AgentContinueFn,
+9
View File
@@ -12,6 +12,15 @@ export type AgentContext = ModeratorContext & {
* role's output schema. Populated by `createAgent` at run time.
*/
outputFormatInstruction: string;
/**
* Edge prompt from the graph transition that led to this role (UWF_EDGE_PROMPT).
* Always the real moderator instruction for this step.
*/
edgePrompt: string;
/**
* True when the current role has not appeared in steps history before this invocation.
*/
isFirstVisit: boolean;
};
export type AgentRunResult = {
+11 -2
View File
@@ -77,9 +77,11 @@ function stepsToPayload(name: string, description: string, steps: WorkFlowSteps)
};
}
}
const targetRole = t.target === "END" ? "$END" : t.target;
return {
role: t.target === "END" ? "$END" : t.target,
role: targetRole,
condition: condName,
prompt: `Transition to ${targetRole}.`,
};
});
@@ -87,7 +89,14 @@ function stepsToPayload(name: string, description: string, steps: WorkFlowSteps)
}
if (steps.length > 0) {
graph["$START"] = [{ role: steps[0].role.name, condition: null }];
const firstRole = steps[0].role.name;
graph["$START"] = [
{
role: firstRole,
condition: null,
prompt: `Begin workflow at role ${firstRole}.`,
},
];
}
return { name, description, roles, conditions, graph };
@@ -9,27 +9,27 @@ const solveIssueWorkflow: WorkflowPayload = {
roles: {
planner: {
description: "Creates implementation plan",
identity: "You are a planning agent.",
prepare: "Review the issue context.",
execute: "Create a step-by-step plan.",
report: "Output the plan and steps.",
outputSchema: "5GWKR8TN1V3JA",
goal: "You are a planning agent.",
capabilities: ["planning"],
procedure: "Create a step-by-step plan.",
output: "Output the plan and steps.",
frontmatter: "5GWKR8TN1V3JA",
},
developer: {
description: "Implements code changes",
identity: "You are a developer agent.",
prepare: "Load coding tools.",
execute: "Implement the plan.",
report: "List files changed and summary.",
outputSchema: "8CNWT4KR6D1HV",
goal: "You are a developer agent.",
capabilities: ["coding"],
procedure: "Implement the plan.",
output: "List files changed and summary.",
frontmatter: "8CNWT4KR6D1HV",
},
reviewer: {
description: "Reviews code changes",
identity: "You are a code reviewer.",
prepare: "Review project conventions.",
execute: "Review the implementation.",
report: "Approve or reject with comments.",
outputSchema: "1VPBG9SM5E7WK",
goal: "You are a code reviewer.",
capabilities: ["code-review"],
procedure: "Review the implementation.",
output: "Approve or reject with comments.",
frontmatter: "1VPBG9SM5E7WK",
},
},
conditions: {
@@ -43,15 +43,35 @@ const solveIssueWorkflow: WorkflowPayload = {
},
},
graph: {
$START: [{ role: "planner", condition: null }],
planner: [
{ role: "developer", condition: "needsClarification" },
{ role: "$END", condition: null },
$START: [
{
role: "planner",
condition: null,
prompt: "Start planning from the issue in the task.",
},
],
planner: [
{
role: "developer",
condition: "needsClarification",
prompt: "Clarification is needed; hand off to developer.",
},
{ role: "$END", condition: null, prompt: "Planning complete; end workflow." },
],
developer: [
{
role: "reviewer",
condition: null,
prompt: "Implementation done; send to reviewer.",
},
],
developer: [{ role: "reviewer", condition: null }],
reviewer: [
{ role: "developer", condition: "rejected" },
{ role: "$END", condition: null },
{
role: "developer",
condition: "rejected",
prompt: "Reviewer rejected; return to developer.",
},
{ role: "$END", condition: null, prompt: "Review passed; end workflow." },
],
},
};
@@ -69,7 +89,10 @@ function makeContext(steps: ModeratorContext["steps"]): ModeratorContext {
describe("evaluate", () => {
test("$START → first role (fallback)", async () => {
const result = await evaluate(solveIssueWorkflow, makeContext([]));
expect(result).toEqual({ ok: true, value: "planner" });
expect(result).toEqual({
ok: true,
value: { role: "planner", prompt: "Start planning from the issue in the task." },
});
});
test("condition match (rejected → developer)", async () => {
@@ -82,7 +105,10 @@ describe("evaluate", () => {
},
]);
const result = await evaluate(solveIssueWorkflow, context);
expect(result).toEqual({ ok: true, value: "developer" });
expect(result).toEqual({
ok: true,
value: { role: "developer", prompt: "Reviewer rejected; return to developer." },
});
});
test("fallback when condition does not match → $END", async () => {
@@ -95,7 +121,10 @@ describe("evaluate", () => {
},
]);
const result = await evaluate(solveIssueWorkflow, context);
expect(result).toEqual({ ok: true, value: "$END" });
expect(result).toEqual({
ok: true,
value: { role: "$END", prompt: "Review passed; end workflow." },
});
});
test("missing role in graph → error", async () => {
@@ -124,7 +153,10 @@ describe("evaluate", () => {
},
]);
const result = await evaluate(solveIssueWorkflow, context);
expect(result).toEqual({ ok: true, value: "developer" });
expect(result).toEqual({
ok: true,
value: { role: "developer", prompt: "Clarification is needed; hand off to developer." },
});
});
test("$last returns most recent matching role's frontmatter", async () => {
@@ -137,10 +169,20 @@ describe("evaluate", () => {
},
},
graph: {
$START: [{ role: "developer", condition: null }],
$START: [
{
role: "developer",
condition: null,
prompt: "Begin development.",
},
],
developer: [
{ role: "$END", condition: "devFailed" },
{ role: "reviewer", condition: null },
{ role: "$END", condition: "devFailed", prompt: "Development failed; end." },
{
role: "reviewer",
condition: null,
prompt: "Development succeeded; review.",
},
],
},
};
@@ -165,7 +207,10 @@ describe("evaluate", () => {
},
]);
const result = await evaluate(workflow, context);
expect(result).toEqual({ ok: true, value: "$END" });
expect(result).toEqual({
ok: true,
value: { role: "$END", prompt: "Development failed; end." },
});
});
test("$first returns earliest matching role's frontmatter", async () => {
@@ -178,10 +223,20 @@ describe("evaluate", () => {
},
},
graph: {
$START: [{ role: "planner", condition: null }],
$START: [
{
role: "planner",
condition: null,
prompt: "Begin planning.",
},
],
planner: [
{ role: "$END", condition: "firstPlanReady" },
{ role: "developer", condition: null },
{ role: "$END", condition: "firstPlanReady", prompt: "First plan was ready; end." },
{
role: "developer",
condition: null,
prompt: "Plan not ready on first pass; implement.",
},
],
},
};
@@ -206,7 +261,10 @@ describe("evaluate", () => {
},
]);
const result = await evaluate(workflow, context);
expect(result).toEqual({ ok: true, value: "$END" });
expect(result).toEqual({
ok: true,
value: { role: "$END", prompt: "First plan was ready; end." },
});
});
test("$last returns undefined for unmatched role", async () => {
@@ -219,10 +277,20 @@ describe("evaluate", () => {
},
},
graph: {
$START: [{ role: "planner", condition: null }],
$START: [
{
role: "planner",
condition: null,
prompt: "Begin planning.",
},
],
planner: [
{ role: "$END", condition: "hasReviewer" },
{ role: "developer", condition: null },
{ role: "$END", condition: "hasReviewer", prompt: "Reviewer already ran; end." },
{
role: "developer",
condition: null,
prompt: "No reviewer yet; implement.",
},
],
},
};
@@ -236,6 +304,9 @@ describe("evaluate", () => {
]);
const result = await evaluate(workflow, context);
// no reviewer step → $exists returns false → fallback to developer
expect(result).toEqual({ ok: true, value: "developer" });
expect(result).toEqual({
ok: true,
value: { role: "developer", prompt: "No reviewer yet; implement." },
});
});
});
+4 -4
View File
@@ -1,7 +1,7 @@
import type { ModeratorContext, WorkflowPayload } from "@uncaged/workflow-protocol";
import jsonata from "jsonata";
import type { Result } from "./types.js";
import type { EvaluateResult, Result } from "./types.js";
const START_ROLE = "$START";
@@ -78,7 +78,7 @@ function currentRole(context: ModeratorContext): string {
export async function evaluate(
workflow: WorkflowPayload,
context: ModeratorContext,
): Promise<Result<string, Error>> {
): Promise<Result<EvaluateResult, Error>> {
const role = currentRole(context);
const transitions = workflow.graph[role];
if (transitions === undefined) {
@@ -90,7 +90,7 @@ export async function evaluate(
for (const transition of transitions) {
if (transition.condition === null) {
return { ok: true, value: transition.role };
return { ok: true, value: { role: transition.role, prompt: transition.prompt } };
}
const conditionDef = workflow.conditions[transition.condition];
@@ -106,7 +106,7 @@ export async function evaluate(
return evalResult;
}
if (isTruthy(evalResult.value)) {
return { ok: true, value: transition.role };
return { ok: true, value: { role: transition.role, prompt: transition.prompt } };
}
}
+1
View File
@@ -1 +1,2 @@
export { evaluate } from "./evaluate.js";
export type { EvaluateResult } from "./types.js";
+6
View File
@@ -1 +1,7 @@
export type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };
/** The result of moderator evaluation — which role to go to, and the edge prompt. */
export type EvaluateResult = {
role: string;
prompt: string;
};
+2 -1
View File
@@ -26,10 +26,11 @@ const CONDITION_DEFINITION: JSONSchema = {
const TRANSITION: JSONSchema = {
type: "object",
required: ["role", "condition"],
required: ["role", "condition", "prompt"],
properties: {
role: { type: "string" },
condition: { anyOf: [{ type: "string" }, { type: "null" }] },
prompt: { type: "string" },
},
additionalProperties: false,
};
+1
View File
@@ -28,6 +28,7 @@ export type RoleDefinition = {
export type Transition = {
role: string;
condition: string | null;
prompt: string;
};
export type ConditionDefinition = {
@@ -0,0 +1,81 @@
import { mkdirSync, mkdtempSync, readFileSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, describe, expect, test } from "bun:test";
import { createProcessLogger } from "../src/process-logger/index.js";
function logDateKey(date: Date): string {
return date.toISOString().slice(0, 10);
}
describe("createProcessLogger", () => {
let tmpDir: string;
afterEach(() => {
if (tmpDir !== undefined) {
rmSync(tmpDir, { recursive: true, force: true });
}
});
test("writes init and log lines to dated JSONL under storage root", () => {
tmpDir = mkdtempSync(join(tmpdir(), "uwf-process-log-"));
const plog = createProcessLogger({
storageRoot: tmpDir,
context: { thread: "THREAD01", workflow: "WORKFLOW01" },
});
expect(plog.pid).toMatch(/^\d+-\d+$/);
plog.log("7NQW4HBT", "moderator selected role=planner", null);
const logPath = join(tmpDir, "logs", `${logDateKey(new Date())}.jsonl`);
const lines = readFileSync(logPath, "utf8")
.trim()
.split("\n")
.map((line) => JSON.parse(line) as Record<string, string>);
expect(lines).toHaveLength(2);
expect(lines[0]?.tag).toBe("W9F3RK2M");
expect(lines[0]?.pid).toBe(plog.pid);
expect(lines[0]?.thread).toBe("THREAD01");
expect(lines[0]?.workflow).toBe("WORKFLOW01");
expect(lines[0]?.msg).toContain("process start");
expect(lines[0]?.msg).toContain("node=");
expect(lines[1]?.tag).toBe("7NQW4HBT");
expect(lines[1]?.msg).toBe("moderator selected role=planner");
expect(lines[1]?.thread).toBe("THREAD01");
expect(lines[1]?.workflow).toBe("WORKFLOW01");
});
test("creates logs directory when missing", () => {
tmpDir = mkdtempSync(join(tmpdir(), "uwf-process-log-"));
createProcessLogger({
storageRoot: tmpDir,
context: { thread: null, workflow: null },
});
mkdirSync(join(tmpDir, "logs"), { recursive: true });
expect(() =>
readFileSync(join(tmpDir, "logs", `${logDateKey(new Date())}.jsonl`), "utf8"),
).not.toThrow();
});
test("merges per-call context into the JSONL entry", () => {
tmpDir = mkdtempSync(join(tmpdir(), "uwf-process-log-"));
const plog = createProcessLogger({
storageRoot: tmpDir,
context: { thread: "T1", workflow: null },
});
plog.log("M3K8V9T1", "spawn agent", { command: "uwf-hermes", args: "tid role" });
const logPath = join(tmpDir, "logs", `${logDateKey(new Date())}.jsonl`);
const lines = readFileSync(logPath, "utf8")
.trim()
.split("\n")
.map((line) => JSON.parse(line) as Record<string, string>);
const last = lines[lines.length - 1];
expect(last?.command).toBe("uwf-hermes");
expect(last?.args).toBe("tid role");
});
});
+7
View File
@@ -13,6 +13,13 @@ export {
validateFrontmatter,
} from "./frontmatter-markdown/index.js";
export { createLogger } from "./logger.js";
export { createProcessLogger } from "./process-logger/index.js";
export type {
CreateProcessLoggerOptions,
ProcessLogFn,
ProcessLogger,
ProcessLoggerContext,
} from "./process-logger/index.js";
export { normalizeRefsField } from "./refs-field.js";
export { err, ok } from "./result.js";
export { getDefaultWorkflowStorageRoot, getGlobalCasDir } from "./storage-root.js";
+1 -21
View File
@@ -1,28 +1,8 @@
import { appendFileSync } from "node:fs";
import { CROCKFORD_BASE32_ALPHABET } from "./base32.js";
import { assertValidLogTag } from "./process-logger/log-tag.js";
import type { CreateLoggerOptions, LogFn } from "./types.js";
const TAG_LENGTH = 8;
const TAG_CHAR_SET: ReadonlySet<string> = new Set(CROCKFORD_BASE32_ALPHABET.split(""));
function assertValidLogTag(tag: string): void {
if (tag.length !== TAG_LENGTH) {
throw new Error(`log tag must be exactly ${TAG_LENGTH} characters`);
}
for (let i = 0; i < tag.length; i++) {
const ch = tag[i];
if (ch === undefined) {
throw new Error("log tag validation failed");
}
const upper = ch.toUpperCase();
if (!TAG_CHAR_SET.has(upper)) {
throw new Error(`invalid Crockford Base32 character in log tag: ${ch}`);
}
}
}
/** Append one JSONL log record: `{ tag, content, timestamp }` per RFC-001. */
export function createLogger(options: CreateLoggerOptions): LogFn {
if (options.sink.kind === "stderr") {
@@ -0,0 +1,7 @@
export { createProcessLogger } from "./process-logger.js";
export type {
CreateProcessLoggerOptions,
ProcessLogFn,
ProcessLogger,
ProcessLoggerContext,
} from "./types.js";
@@ -0,0 +1,21 @@
import { CROCKFORD_BASE32_ALPHABET } from "../base32.js";
const TAG_LENGTH = 8;
const TAG_CHAR_SET: ReadonlySet<string> = new Set(CROCKFORD_BASE32_ALPHABET.split(""));
export function assertValidLogTag(tag: string): void {
if (tag.length !== TAG_LENGTH) {
throw new Error(`log tag must be exactly ${TAG_LENGTH} characters`);
}
for (let i = 0; i < tag.length; i++) {
const ch = tag[i];
if (ch === undefined) {
throw new Error("log tag validation failed");
}
const upper = ch.toUpperCase();
if (!TAG_CHAR_SET.has(upper)) {
throw new Error(`invalid Crockford Base32 character in log tag: ${ch}`);
}
}
}
@@ -0,0 +1,78 @@
import { appendFileSync, mkdirSync } from "node:fs";
import { join } from "node:path";
import { getDefaultWorkflowStorageRoot } from "../storage-root.js";
import { assertValidLogTag } from "./log-tag.js";
import type { CreateProcessLoggerOptions, ProcessLogger, ProcessLoggerContext } from "./types.js";
const INIT_TAG = "W9F3RK2M";
function logDateKey(date: Date): string {
return date.toISOString().slice(0, 10);
}
function getProcessLogsDir(storageRoot: string): string {
return join(storageRoot, "logs");
}
function getProcessLogFilePath(storageRoot: string, date: Date): string {
return join(getProcessLogsDir(storageRoot), `${logDateKey(date)}.jsonl`);
}
function buildEntry(
processId: string,
tag: string,
msg: string,
baseContext: ProcessLoggerContext,
extra: Record<string, string> | null,
): Record<string, string> {
const entry: Record<string, string> = {
ts: new Date().toISOString(),
pid: processId,
tag: tag.toUpperCase(),
msg,
};
if (baseContext.thread !== null) {
entry.thread = baseContext.thread;
}
if (baseContext.workflow !== null) {
entry.workflow = baseContext.workflow;
}
if (extra !== null) {
for (const [key, value] of Object.entries(extra)) {
entry[key] = value;
}
}
return entry;
}
function appendEntry(filePath: string, entry: Record<string, string>): void {
appendFileSync(filePath, `${JSON.stringify(entry)}\n`, "utf8");
}
/** Process-scoped debug logger — append-only JSONL under `<storageRoot>/logs/YYYY-MM-DD.jsonl`. */
export function createProcessLogger(options: CreateProcessLoggerOptions): ProcessLogger {
const storageRoot = options.storageRoot ?? getDefaultWorkflowStorageRoot();
const processId = `${Date.now()}-${process.pid}`;
const baseContext = options.context;
const logFilePath = getProcessLogFilePath(storageRoot, new Date());
mkdirSync(getProcessLogsDir(storageRoot), { recursive: true });
const log: ProcessLogger["log"] = (tag, msg, context = null) => {
assertValidLogTag(tag);
appendEntry(logFilePath, buildEntry(processId, tag, msg, baseContext, context));
};
const argvSummary = JSON.stringify(process.argv);
const initParts = [`argv=${argvSummary}`, `node=${process.version}`];
if (baseContext.thread !== null) {
initParts.push(`thread=${baseContext.thread}`);
}
if (baseContext.workflow !== null) {
initParts.push(`workflow=${baseContext.workflow}`);
}
log(INIT_TAG, `process start ${initParts.join(" ")}`, null);
return { pid: processId, log };
}
@@ -0,0 +1,20 @@
export type ProcessLoggerContext = {
thread: string | null;
workflow: string | null;
};
export type CreateProcessLoggerOptions = {
storageRoot: string | null;
context: ProcessLoggerContext;
};
export type ProcessLogFn = (
tag: string,
msg: string,
context: Record<string, string> | null,
) => void;
export type ProcessLogger = {
pid: string;
log: ProcessLogFn;
};