Compare commits

...

20 Commits

Author SHA1 Message Date
xiaomo cd0a79d72b chore: remove accidental pnpm-lock.yaml 2026-05-23 06:47:25 +00:00
xiaomo 54631c43c7 docs: update cli-reference with log commands, --count flag, edge prompt concept 2026-05-23 06:32:27 +00:00
xiaomo 655b57c4b5 Merge pull request 'feat: add uwf log subcommands (list, show, clean)' (#415) from fix/413-log-subcommands into main 2026-05-23 06:27:15 +00:00
xiaoju 7faa8184ae feat: add uwf log subcommands (list, show, clean)
- uwf log list: list log files with sizes
- uwf log show --thread <id>: filter by thread ID
- uwf log show --process <pid>: filter by process ID
- uwf log clean --before <date>: delete old log files
- Tests: 12 new tests covering all subcommands

Implemented by solve-issue workflow, biome fixes applied manually.

Closes #413
Refs #411, #410
2026-05-23 06:23:56 +00:00
xiaoju 816137315e feat: add uwf log subcommands (list, show, clean)
- cmdLogList: list log files with sizes, sorted by date descending
- cmdLogShow: filter entries by thread, process, and/or date
- cmdLogClean: delete log files older than given date
- 12 tests covering all functions and edge cases

Fixes #413
2026-05-23 06:21:06 +00:00
xiaoju 9a111d16c7 fix: invalid Crockford Base32 char 'L' in log tag PL_AGENT_DONE
Fixes runtime crash on uwf thread step.
2026-05-23 06:13:29 +00:00
xiaoju ea6ceafe51 merge: resolve conflict in process-logger test (use null 3rd arg) 2026-05-23 06:10:53 +00:00
xiaoju d0dc7b5a19 feat: add process-level debug logger (Phase 1)
- New ProcessLogger in workflow-util: process-scoped JSONL logger
- Entry schema: {ts, pid, tag, msg, thread, workflow}
- Storage: ~/.uncaged/workflow/logs/YYYY-MM-DD.jsonl
- Auto logs process init info (argv, node version, context)
- cli-workflow thread commands fully instrumented:
  - thread start/step, moderator evaluate, agent spawn/done
  - thread archived, error paths

Refs #411, #412, #410
2026-05-23 06:10:05 +00:00
xiaomo 3b81521e9d Merge pull request 'feat: add process-level debug logger (Phase 1)' (#414) from feat/411-process-logger into main 2026-05-23 06:09:15 +00:00
xiaoju aa0a23293f feat: add process-level debug logger (Phase 1)
- New ProcessLogger in workflow-util: process-scoped JSONL logger
- Entry schema: {ts, pid, tag, msg, thread, workflow}
- Storage: ~/.uncaged/workflow/logs/YYYY-MM-DD.jsonl
- Auto logs process init info (argv, node version, context)
- cli-workflow thread commands fully instrumented:
  - thread start/step, moderator evaluate, agent spawn/done
  - thread archived, error paths

Refs #411, #412, #410
2026-05-23 06:07:45 +00:00
xiaomo 187dd036e5 Merge pull request 'feat: replace edgePrompt null check with isFirstVisit (Phase 2)' (#409) from feat/405-phase2-find-last-role-index into main 2026-05-23 04:55:23 +00:00
xiaoju 4b45f4e6d1 feat: replace edgePrompt null check with isFirstVisit (Phase 2)
- Add isFirstVisit: boolean to AgentContext
- Compute from steps history: !steps.some(s => s.role === role)
- hermes.ts: use isFirstVisit for first-entry vs re-entry logic
- buildInitialPrompt: always append edgePrompt as Moderator Instruction
- edgePrompt is never blanked — always the real moderator instruction
- New tests for first-visit, re-entry, and fallback scenarios

Refs #405, #407, #404
2026-05-23 04:54:11 +00:00
xiaomo 2a6bce4918 Merge pull request 'feat: make edge prompt required (Phase 1)' (#408) from feat/405-edge-prompt-required into main 2026-05-23 04:36:53 +00:00
xiaoju 3d6399c0e3 feat: make edge prompt required (Phase 1)
- Transition.prompt: string | null → string
- EvaluateResult.prompt: string | null → string
- AgentContext.edgePrompt: string | null → string
- CLI YAML validation rejects missing prompt
- All tests updated

Phase 2 will replace edgePrompt === null checks with findLastRoleIndex.

Refs #405, #406, #404
2026-05-23 04:28:58 +00:00
xiaomo b9258f84a5 Merge pull request 'feat: edge prompt + session resume (#402)' (#403) from feat/402-edge-prompt-session-resume into main 2026-05-23 04:00:24 +00:00
xiaoju 638329a562 feat: edge prompt + session resume implementation (#402)
- buildContinuationPrompt: incremental prompt for role re-entry
- buildHermesPrompt: dual-mode (initial vs continuation)
- session-cache: thread:role → hermes sessionId mapping
- HermesAcpClient.resume(): session/resume JSON-RPC
- Fallback: cache miss or resume fail → initial prompt
- UWF_NO_RESUME env to skip cache
- solve-issue.yaml: reviewer→developer edge prompt
- Tests updated for EvaluateResult + continuation prompt

Refs #402
2026-05-23 03:57:04 +00:00
xiaoju 1a06e014f5 feat(protocol): add edge prompt to Transition + EvaluateResult (#402)
- Transition type gains prompt: string | null
- evaluate() returns EvaluateResult { role, prompt } instead of string
- normalizeGraph coerces prompt: undefined → null
- spawnAgent passes edge prompt via UWF_EDGE_PROMPT env
- AgentContext gains edgePrompt field

Refs #402
2026-05-23 03:49:15 +00:00
xiaoju d5d05334f5 fix: ACP client permission handling and process cleanup
Two bugs fixed:
1. request_permission messages (JSON-RPC requests with both id+method) were
   silently swallowed by the response handler, causing hermes to hang waiting
   for permission approval. Now properly distinguish responses (id only) from
   server requests (id+method).
2. uwf-hermes process never exited after completing because the hermes ACP
   subprocess was still alive. Now explicitly close the ACP client after
   agent completion so the subprocess terminates.

小橘 <xiaoju@shazhou.work>
2026-05-22 14:51:43 +00:00
xiaoju 844f5438fe fix: replace @agentclientprotocol/sdk with readline-based JSON-RPC
The official TS SDK's ndJsonStream hangs indefinitely on prompt()
for sessions with 20+ messages (solve-issue planner). Root cause
appears to be a stream backpressure issue in the SDK's ReadableStream
adapter.

Switch back to readline-based line parsing which reliably receives
all JSON-RPC responses. Also handle session/request_permission
inline (auto-approve, yolo mode equivalent).

Ref #398
2026-05-22 14:34:27 +00:00
xiaomo e329d74ec0 Merge pull request 'refactor: migrate hermes agent from stdout parsing to ACP protocol' (#401) from feat/398-hermes-acp-client into main 2026-05-22 13:16:46 +00:00
35 changed files with 1558 additions and 256 deletions
+18
View File
@@ -154,25 +154,43 @@ conditions:
graph:
$START:
- role: "planner"
condition: null
prompt: "Analyze the issue and produce an implementation plan."
planner:
- role: "$END"
condition: "insufficientInfo"
prompt: "Insufficient information to proceed; end the workflow."
- role: "developer"
condition: null
prompt: "Implement the plan from the planner."
developer:
- role: "$END"
condition: "devFailed"
prompt: "Development failed; end the workflow."
- role: "reviewer"
condition: null
prompt: "Send the implementation to the reviewer."
reviewer:
- role: "developer"
condition: "rejected"
prompt: "Reviewer rejected the implementation; fix the issues."
- role: "tester"
condition: null
prompt: "Review passed; run tests on the implementation."
tester:
- role: "developer"
condition: "fixCode"
prompt: "Tests found code issues; return to developer."
- role: "planner"
condition: "fixSpec"
prompt: "Tests found spec issues; return to planner."
- role: "committer"
condition: null
prompt: "Tests passed; commit and push the changes."
committer:
- role: "developer"
condition: "hookFailed"
prompt: "Push hook failed; return to developer to fix."
- role: "$END"
condition: null
prompt: "Commit succeeded; complete the workflow."
+2
View File
@@ -36,6 +36,8 @@ graph:
$START:
- role: "analyst"
condition: null
prompt: "Analyze the topic in the task and produce a structured summary with key points."
analyst:
- role: "$END"
condition: null
prompt: "Analysis complete. Finish the workflow."
+5
View File
@@ -62,14 +62,19 @@ graph:
$START:
- role: "planner"
condition: null
prompt: "Analyze the issue described in the task and produce a detailed implementation plan."
planner:
- role: "developer"
condition: null
prompt: "Implement the plan from the planner. Write code, tests, and ensure existing tests pass."
developer:
- role: "reviewer"
condition: null
prompt: "Review the developer's implementation against the plan for correctness and quality."
reviewer:
- role: "developer"
condition: "notApproved"
prompt: "The reviewer rejected your implementation. Read their feedback and fix the issues."
- role: "$END"
condition: null
prompt: "The review passed. Complete the workflow."
@@ -0,0 +1,181 @@
import { mkdir, readdir, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, beforeEach, describe, expect, test } from "vitest";
import { cmdLogClean, cmdLogList, cmdLogShow } from "../commands/log.js";
let storageRoot: string;
beforeEach(async () => {
storageRoot = join(tmpdir(), `uwf-log-test-${Date.now()}-${Math.random().toString(36).slice(2)}`);
await mkdir(join(storageRoot, "logs"), { recursive: true });
});
afterEach(async () => {
await rm(storageRoot, { recursive: true, force: true });
});
const entry1 = JSON.stringify({
ts: "2026-05-20T10:00:00.000Z",
pid: "1716200000000-1234",
tag: "W9F3RK2M",
msg: "process start",
thread: "01J1234ABCDEF",
workflow: "solve-issue",
});
const entry2 = JSON.stringify({
ts: "2026-05-20T10:00:01.000Z",
pid: "1716200000000-1234",
tag: "ABC12345",
msg: "step executed",
thread: "01J1234ABCDEF",
workflow: "solve-issue",
});
const entry3 = JSON.stringify({
ts: "2026-05-20T10:00:02.000Z",
pid: "1716200000000-5678",
tag: "XYZ98765",
msg: "different process",
thread: "01JOTHER000000",
workflow: "review-code",
});
const oldEntry = JSON.stringify({
ts: "2026-05-19T08:00:00.000Z",
pid: "1716200000000-9999",
tag: "OLD1TAG1",
msg: "old entry",
thread: "01JOLD0000000",
workflow: "solve-issue",
});
const olderEntry = JSON.stringify({
ts: "2026-05-18T08:00:00.000Z",
pid: "1716200000000-0001",
tag: "OLD2TAG2",
msg: "older entry",
thread: "01JOLDER00000",
workflow: "review-code",
});
async function writeLogFiles(): Promise<void> {
const logsDir = join(storageRoot, "logs");
await writeFile(join(logsDir, "2026-05-20.jsonl"), [entry1, entry2, entry3].join("\n") + "\n");
await writeFile(join(logsDir, "2026-05-19.jsonl"), oldEntry + "\n");
await writeFile(join(logsDir, "2026-05-18.jsonl"), olderEntry + "\n");
}
describe("cmdLogList", () => {
test("lists log files with sizes sorted by date descending", async () => {
await writeLogFiles();
const result = await cmdLogList(storageRoot);
expect(result).toHaveLength(3);
expect(result[0].name).toBe("2026-05-20.jsonl");
expect(result[0].date).toBe("2026-05-20");
expect(result[0].size).toBeGreaterThan(0);
expect(result[1].name).toBe("2026-05-19.jsonl");
expect(result[2].name).toBe("2026-05-18.jsonl");
});
test("returns empty array when no log files exist", async () => {
const result = await cmdLogList(storageRoot);
expect(result).toEqual([]);
});
test("returns empty array when logs directory does not exist", async () => {
const noLogsRoot = join(storageRoot, "nonexistent");
await mkdir(noLogsRoot, { recursive: true });
const result = await cmdLogList(noLogsRoot);
expect(result).toEqual([]);
});
});
describe("cmdLogShow", () => {
test("filters by thread ID", async () => {
await writeLogFiles();
const result = await cmdLogShow(storageRoot, {
thread: "01J1234ABCDEF",
process: null,
date: null,
});
expect(result).toHaveLength(2);
expect(result.every((e) => e.thread === "01J1234ABCDEF")).toBe(true);
});
test("filters by process ID", async () => {
await writeLogFiles();
const result = await cmdLogShow(storageRoot, {
thread: null,
process: "1716200000000-1234",
date: null,
});
expect(result).toHaveLength(2);
expect(result.every((e) => e.pid === "1716200000000-1234")).toBe(true);
});
test("filters by date", async () => {
await writeLogFiles();
const result = await cmdLogShow(storageRoot, {
thread: null,
process: null,
date: "2026-05-19",
});
expect(result).toHaveLength(1);
expect(result[0].msg).toBe("old entry");
});
test("reads all files when no date filter", async () => {
await writeLogFiles();
const result = await cmdLogShow(storageRoot, { thread: null, process: null, date: null });
expect(result).toHaveLength(5);
// sorted by ts ascending
expect(result[0].ts).toBe("2026-05-18T08:00:00.000Z");
expect(result[4].ts).toBe("2026-05-20T10:00:02.000Z");
});
test("returns empty when no matches", async () => {
await writeLogFiles();
const result = await cmdLogShow(storageRoot, {
thread: "NONEXISTENT",
process: null,
date: null,
});
expect(result).toEqual([]);
});
test("combined thread + date filter", async () => {
await writeLogFiles();
const result = await cmdLogShow(storageRoot, {
thread: "01J1234ABCDEF",
process: null,
date: "2026-05-20",
});
expect(result).toHaveLength(2);
expect(result.every((e) => e.thread === "01J1234ABCDEF")).toBe(true);
});
});
describe("cmdLogClean", () => {
test("deletes files before given date", async () => {
await writeLogFiles();
const result = await cmdLogClean(storageRoot, "2026-05-20");
expect(result.deleted).toBe(2);
const remaining = await readdir(join(storageRoot, "logs"));
expect(remaining).toEqual(["2026-05-20.jsonl"]);
});
test("deletes nothing when all files are newer", async () => {
await writeLogFiles();
const result = await cmdLogClean(storageRoot, "2026-05-18");
expect(result.deleted).toBe(0);
});
test("handles missing logs directory gracefully", async () => {
const noLogsRoot = join(storageRoot, "nonexistent");
await mkdir(noLogsRoot, { recursive: true });
const result = await cmdLogClean(noLogsRoot, "2026-05-20");
expect(result).toEqual({ deleted: 0 });
});
});
+50
View File
@@ -14,6 +14,7 @@ import {
cmdCasSchemaList,
cmdCasWalk,
} from "./commands/cas.js";
import { cmdLogClean, cmdLogList, cmdLogShow } from "./commands/log.js";
import { cmdSetup, cmdSetupInteractive } from "./commands/setup.js";
import { cmdSkillCli } from "./commands/skill.js";
import {
@@ -379,6 +380,55 @@ casSchema
});
});
const log = program.command("log").description("Process-level debug logs");
log
.command("list")
.description("List log files with sizes")
.action(() => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const result = await cmdLogList(storageRoot);
writeOutput(result);
});
});
log
.command("show")
.description("Show and filter log entries")
.option("--thread <thread-id>", "Filter by thread ID")
.option("--process <pid>", "Filter by process ID")
.option("--date <date>", "Filter by date (YYYY-MM-DD)")
.action(
(opts: {
thread: string | undefined;
process: string | undefined;
date: string | undefined;
}) => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const result = await cmdLogShow(storageRoot, {
thread: opts.thread ?? null,
process: opts.process ?? null,
date: opts.date ?? null,
});
writeOutput(result);
});
},
);
log
.command("clean")
.description("Delete log files older than given date")
.requiredOption("--before <date>", "Delete files before this date (YYYY-MM-DD)")
.action((opts: { before: string }) => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const result = await cmdLogClean(storageRoot, opts.before);
writeOutput(result);
});
});
program.parseAsync(process.argv).catch((e: unknown) => {
const message = e instanceof Error ? e.message : String(e);
process.stderr.write(`${message}\n`);
+116
View File
@@ -0,0 +1,116 @@
import { readdir, readFile, stat, unlink } from "node:fs/promises";
import { join } from "node:path";
type LogListItem = {
name: string;
size: number;
date: string;
};
type LogShowFilter = {
thread: string | null;
process: string | null;
date: string | null;
};
type LogEntry = {
ts: string;
pid: string;
tag: string;
msg: string;
thread: string | null;
workflow: string | null;
};
type LogCleanResult = {
deleted: number;
};
function logsDir(storageRoot: string): string {
return join(storageRoot, "logs");
}
async function listLogFiles(dir: string): Promise<Array<string>> {
try {
const files = await readdir(dir);
return files.filter((f) => f.endsWith(".jsonl")).sort();
} catch {
return [];
}
}
function dateFromFilename(name: string): string {
return name.replace(".jsonl", "");
}
async function parseJsonlFile(path: string): Promise<Array<LogEntry>> {
const content = await readFile(path, "utf-8");
const lines = content
.trim()
.split("\n")
.filter((l) => l.length > 0);
return lines.map((line) => JSON.parse(line) as LogEntry);
}
export async function cmdLogList(storageRoot: string): Promise<Array<LogListItem>> {
const dir = logsDir(storageRoot);
const files = await listLogFiles(dir);
const items: Array<LogListItem> = [];
for (const name of files) {
const s = await stat(join(dir, name));
items.push({ name, size: s.size, date: dateFromFilename(name) });
}
// sort by date descending
items.sort((a, b) => (a.date > b.date ? -1 : a.date < b.date ? 1 : 0));
return items;
}
export async function cmdLogShow(
storageRoot: string,
filter: LogShowFilter,
): Promise<Array<LogEntry>> {
const dir = logsDir(storageRoot);
let files: Array<string>;
if (filter.date !== null) {
files = [`${filter.date}.jsonl`];
} else {
files = await listLogFiles(dir);
}
let entries: Array<LogEntry> = [];
for (const file of files) {
try {
const parsed = await parseJsonlFile(join(dir, file));
entries = entries.concat(parsed);
} catch {
// file doesn't exist or is unreadable, skip
}
}
if (filter.thread !== null) {
entries = entries.filter((e) => e.thread === filter.thread);
}
if (filter.process !== null) {
entries = entries.filter((e) => e.pid === filter.process);
}
entries.sort((a, b) => (a.ts < b.ts ? -1 : a.ts > b.ts ? 1 : 0));
return entries;
}
export async function cmdLogClean(storageRoot: string, before: string): Promise<LogCleanResult> {
const dir = logsDir(storageRoot);
const files = await listLogFiles(dir);
let deleted = 0;
for (const name of files) {
const date = dateFromFilename(name);
if (date < before) {
await unlink(join(dir, name));
deleted++;
}
}
return { deleted };
}
+81 -16
View File
@@ -23,7 +23,7 @@ import type {
WorkflowConfig,
WorkflowPayload,
} from "@uncaged/workflow-protocol";
import { generateUlid } from "@uncaged/workflow-util";
import { createProcessLogger, generateUlid, type ProcessLogger } from "@uncaged/workflow-util";
import { config as loadDotenv } from "dotenv";
import { parse, stringify } from "yaml";
@@ -47,6 +47,18 @@ import { materializeWorkflowPayload } from "./workflow.js";
const END_ROLE = "$END";
export const THREAD_READ_DEFAULT_QUOTA = 4000;
const PL_THREAD_START = "7HNQ4B2X";
const PL_MODERATOR = "M3K8V9T1";
const PL_AGENT_SPAWN = "R5J2W8N4";
const PL_AGENT_DONE = "C6P9E3H7";
const PL_THREAD_ARCHIVED = "F4D8Q2K5";
const PL_STEP_ERROR = "B8T5N1V6";
function failStep(plog: ProcessLogger, message: string): never {
plog.log(PL_STEP_ERROR, message, null);
fail(message);
}
type ChainState = {
startHash: CasRef;
start: StartNodePayload;
@@ -168,6 +180,10 @@ export async function cmdThreadStart(
const workflowHash = await resolveWorkflowCasRef(uwf, storageRoot, workflowId, projectRoot);
const threadId = generateUlid(Date.now()) as ThreadId;
const plog = createProcessLogger({
storageRoot,
context: { thread: threadId, workflow: workflowHash },
});
const startPayload: StartNodePayload = {
workflow: workflowHash,
prompt,
@@ -183,6 +199,12 @@ export async function cmdThreadStart(
index[threadId] = headHash;
await saveThreadsIndex(storageRoot, index);
plog.log(
PL_THREAD_START,
`thread created workflow=${workflowHash} thread=${threadId} head=${headHash}`,
null,
);
return { workflow: workflowHash, thread: threadId };
}
@@ -624,13 +646,20 @@ function resolveAgentConfig(
return agentConfig;
}
function spawnAgent(agent: AgentConfig, threadId: ThreadId, role: string): CasRef {
function spawnAgent(
plog: ProcessLogger,
agent: AgentConfig,
threadId: ThreadId,
role: string,
edgePrompt: string,
): CasRef {
const argv = [...agent.args, threadId, role];
const env = { ...process.env, UWF_EDGE_PROMPT: edgePrompt };
let stdout: string;
try {
stdout = execFileSync(agent.command, argv, {
encoding: "utf8",
env: process.env,
env,
stdio: ["ignore", "pipe", "pipe"],
});
} catch (e) {
@@ -642,12 +671,12 @@ function spawnAgent(agent: AgentConfig, threadId: ThreadId, role: string): CasRe
? err.stderr
: err.stderr.toString("utf8");
const detail = stderr.trim() !== "" ? `: ${stderr.trim()}` : "";
fail(`agent command failed (${agent.command})${detail}`);
failStep(plog, `agent command failed (${agent.command})${detail}`);
}
const line = stdout.trim().split("\n").pop()?.trim() ?? "";
if (!isCasRef(line)) {
fail(`agent stdout is not a valid CAS hash: ${line || "(empty)"}`);
failStep(plog, `agent stdout is not a valid CAS hash: ${line || "(empty)"}`);
}
return line;
}
@@ -679,9 +708,15 @@ export async function cmdThreadStep(
fail(`--count must be a positive integer, got: ${count}`);
}
const workflowHash = await resolveActiveThreadWorkflowHash(storageRoot, threadId);
const plog = createProcessLogger({
storageRoot,
context: { thread: threadId, workflow: workflowHash },
});
const results: StepOutput[] = [];
for (let i = 0; i < count; i++) {
const result = await cmdThreadStepOnce(storageRoot, threadId, agentOverride);
const result = await cmdThreadStepOnce(storageRoot, threadId, agentOverride, plog);
results.push(result);
if (result.done) {
break;
@@ -690,16 +725,31 @@ export async function cmdThreadStep(
return results;
}
async function cmdThreadStepOnce(
async function resolveActiveThreadWorkflowHash(
storageRoot: string,
threadId: ThreadId,
agentOverride: string | null,
): Promise<StepOutput> {
): Promise<CasRef> {
const index = await loadThreadsIndex(storageRoot);
const headHash = index[threadId];
if (headHash === undefined) {
fail(`thread not active: ${threadId}`);
}
const uwf = await createUwfStore(storageRoot);
const chain = walkChain(uwf, headHash);
return chain.start.workflow;
}
async function cmdThreadStepOnce(
storageRoot: string,
threadId: ThreadId,
agentOverride: string | null,
plog: ProcessLogger,
): Promise<StepOutput> {
const index = await loadThreadsIndex(storageRoot);
const headHash = index[threadId];
if (headHash === undefined) {
failStep(plog, `thread not active: ${threadId}`);
}
const uwf = await createUwfStore(storageRoot);
const chain = walkChain(uwf, headHash);
@@ -709,10 +759,17 @@ async function cmdThreadStepOnce(
const nextResult = await evaluate(workflow, context);
if (!nextResult.ok) {
fail(nextResult.error.message);
failStep(plog, `moderator evaluate failed: ${nextResult.error.message}`);
}
if (nextResult.value === END_ROLE) {
plog.log(
PL_MODERATOR,
`moderator role=${nextResult.value.role} prompt=${nextResult.value.prompt}`,
null,
);
if (nextResult.value.role === END_ROLE) {
plog.log(PL_THREAD_ARCHIVED, `thread archived head=${headHash}`, null);
await archiveThread(storageRoot, threadId, workflowHash, headHash);
return {
workflow: workflowHash,
@@ -722,18 +779,25 @@ async function cmdThreadStepOnce(
};
}
const role = nextResult.value;
const role = nextResult.value.role;
const edgePrompt = nextResult.value.prompt;
const config = await loadWorkflowConfig(storageRoot);
const agent = resolveAgentConfig(config, workflow, role, agentOverride);
plog.log(PL_AGENT_SPAWN, `spawning agent command=${agent.command}`, {
args: [...agent.args, threadId, role].join(" "),
});
loadDotenv({ path: getEnvPath(storageRoot) });
const newHead = spawnAgent(agent, threadId, role);
const newHead = spawnAgent(plog, agent, threadId, role, edgePrompt);
plog.log(PL_AGENT_DONE, `agent returned head=${newHead}`, null);
// Re-create store to pick up nodes written by the agent subprocess
const uwfAfter = await createUwfStore(storageRoot);
const newNode = uwfAfter.store.get(newHead);
if (newNode === null || newNode.type !== uwfAfter.schemas.stepNode) {
fail(`agent returned hash that is not a StepNode: ${newHead}`);
failStep(plog, `agent returned hash that is not a StepNode: ${newHead}`);
}
// Reload threads index to avoid overwriting changes made by the agent subprocess
@@ -745,11 +809,12 @@ async function cmdThreadStepOnce(
const contextAfter = buildModeratorContext(uwfAfter, chainAfter);
const afterResult = await evaluate(workflow, contextAfter);
if (!afterResult.ok) {
fail(afterResult.error.message);
failStep(plog, `post-step moderator evaluate failed: ${afterResult.error.message}`);
}
const done = afterResult.value === END_ROLE;
const done = afterResult.value.role === END_ROLE;
if (done) {
plog.log(PL_THREAD_ARCHIVED, `thread archived head=${newHead}`, null);
await archiveThread(storageRoot, threadId, workflowHash, newHead);
}
+10 -4
View File
@@ -55,10 +55,16 @@ function isJsonSchema(value: unknown): value is JSONSchema {
function normalizeGraph(graph: Record<string, Transition[]>): Record<string, Transition[]> {
const result: Record<string, Transition[]> = {};
for (const [node, transitions] of Object.entries(graph)) {
result[node] = transitions.map((t) => ({
role: t.role,
condition: t.condition ?? null,
}));
result[node] = transitions.map((t) => {
if (typeof t.prompt !== "string" || t.prompt.trim() === "") {
fail(`graph[${node}] transition to "${t.role}": prompt is required (non-empty string)`);
}
return {
role: t.role,
condition: t.condition ?? null,
prompt: t.prompt,
};
});
}
return result;
}
+2
View File
@@ -44,6 +44,8 @@ function isTransition(value: unknown): boolean {
const condition = value.condition;
return (
typeof value.role === "string" &&
typeof value.prompt === "string" &&
value.prompt.trim() !== "" &&
(condition === null || condition === undefined || typeof condition === "string")
);
}
@@ -1,9 +1,13 @@
import { describe, expect, test } from "bun:test";
import type { AgentContext } from "@uncaged/workflow-agent-kit";
import type { ThreadId } from "@uncaged/workflow-protocol";
import { buildClaudeCodePrompt } from "../src/claude-code.js";
function makeCtx(overrides: Partial<AgentContext> = {}): AgentContext {
return {
threadId: "01JTEST0000000000000000000" as ThreadId,
edgePrompt: "Proceed with the assigned role.",
isFirstVisit: true,
workflow: {
roles: {
developer: {
@@ -0,0 +1,78 @@
import { describe, expect, test } from "bun:test";
import type { AgentContext } from "@uncaged/workflow-agent-kit";
import type { ThreadId } from "@uncaged/workflow-protocol";
import { buildHermesPrompt } from "../src/hermes.js";
function makeCtx(overrides: Partial<AgentContext> = {}): AgentContext {
return {
threadId: "01JTEST0000000000000000000" as ThreadId,
edgePrompt: "Proceed with the assigned role.",
isFirstVisit: true,
workflow: {
roles: {
developer: {
description: "TDD implementation per test spec",
goal: "Write code",
capabilities: ["coding"],
procedure: "1. Read spec\n2. Write code",
output: "List files changed",
frontmatter: "",
},
},
conditions: {},
graph: {},
},
role: "developer",
start: { prompt: "Fix the bug", workflowHash: "abc123", threadId: "t1" },
steps: [],
store: {} as AgentContext["store"],
outputFormatInstruction: "Use YAML frontmatter",
...overrides,
};
}
describe("buildHermesPrompt", () => {
test("first visit uses full role prompt and includes moderator instruction", () => {
const result = buildHermesPrompt(
makeCtx({ edgePrompt: "Focus on the failing test.", isFirstVisit: true }),
);
expect(result).toMatch(/^Use YAML frontmatter/);
expect(result).toContain("Write code");
expect(result).toContain("## Task\nFix the bug");
expect(result).toContain("## Moderator Instruction");
expect(result).toContain("Focus on the failing test.");
});
test("re-entry uses continuation prompt with edge instruction", () => {
const ctx = makeCtx({
isFirstVisit: false,
edgePrompt: "The reviewer rejected your work. Fix the issues.",
steps: [
{ role: "developer", output: { summary: "Initial fix" }, agent: "uwf-hermes" },
{ role: "reviewer", output: { approved: false }, agent: "uwf-hermes" },
],
});
const result = buildHermesPrompt(ctx);
expect(result).not.toContain("## Task");
expect(result).toContain("## What Happened Since Your Last Turn");
expect(result).toContain("## Moderator Instruction");
expect(result).toContain("The reviewer rejected your work.");
});
test("forced first visit via isFirstVisit uses initial prompt even when role appears in history", () => {
const result = buildHermesPrompt(
makeCtx({
isFirstVisit: true,
steps: [{ role: "developer", output: { done: true }, agent: "uwf-hermes" }],
edgePrompt: "Retry with a fresh approach.",
}),
);
expect(result).toContain("## Task");
expect(result).toContain("Retry with a fresh approach.");
expect(result).not.toContain("## What Happened Since Your Last Turn");
});
});
+3 -2
View File
@@ -21,9 +21,10 @@
"test": "bun test"
},
"dependencies": {
"@agentclientprotocol/sdk": "^0.22.1",
"@uncaged/json-cas": "^0.4.0",
"@uncaged/workflow-agent-kit": "workspace:^"
"@uncaged/workflow-agent-kit": "workspace:^",
"@uncaged/workflow-protocol": "workspace:^",
"@uncaged/workflow-util": "workspace:^"
},
"devDependencies": {
"typescript": "^5.8.3"
+314 -154
View File
@@ -1,17 +1,23 @@
import type { ChildProcess } from "node:child_process";
import { spawn } from "node:child_process";
import { Readable, Writable } from "node:stream";
import type {
Client,
RequestPermissionRequest,
RequestPermissionResponse,
SessionNotification,
} from "@agentclientprotocol/sdk";
import { ClientSideConnection, ndJsonStream, PROTOCOL_VERSION } from "@agentclientprotocol/sdk";
import { createInterface } from "node:readline";
import type { HermesSessionMessage } from "./types.js";
const HERMES_COMMAND = "hermes";
const PROTOCOL_VERSION = 1;
type JsonRpcResponse = {
jsonrpc: "2.0";
id: number;
result?: unknown;
error?: { code: number; message: string };
};
type PendingRequest = {
resolve: (value: JsonRpcResponse) => void;
reject: (reason: Error) => void;
};
/** Tracks in-flight tool calls so we can build complete messages when they finish. */
type PendingToolCall = {
@@ -19,110 +25,6 @@ type PendingToolCall = {
args: string;
};
/**
* Collects ACP session/update events into a list of {@link HermesSessionMessage}
* that mirrors what Hermes writes to its session JSONL files.
*/
class UwfAcpClient implements Client {
private messageChunks: string[] = [];
private reasoningChunks: string[] = [];
private pendingTools = new Map<string, PendingToolCall>();
messages: HermesSessionMessage[] = [];
resetPerPrompt(): void {
this.messageChunks = [];
this.reasoningChunks = [];
}
async sessionUpdate(params: SessionNotification): Promise<void> {
const { update } = params;
switch (update.sessionUpdate) {
case "agent_message_chunk":
if (update.content.type === "text") {
this.messageChunks.push(update.content.text);
}
break;
case "agent_thought_chunk":
if (update.content.type === "text") {
this.reasoningChunks.push(update.content.text);
}
break;
case "tool_call": {
// Agent is invoking a tool — record the call.
const title = update.title ?? "";
const rawInput =
update.rawInput !== undefined && update.rawInput !== null
? JSON.stringify(update.rawInput)
: "";
this.pendingTools.set(update.toolCallId, { name: title, args: rawInput });
// Flush accumulated assistant text + reasoning as an assistant message
// (the agent "spoke" before calling the tool).
this.flushAssistantMessage();
break;
}
case "tool_call_update": {
if (update.status === "completed" || update.status === "failed") {
const pending = this.pendingTools.get(update.toolCallId);
const toolName = pending?.name ?? update.toolCallId;
const rawOutput =
update.rawOutput !== undefined && update.rawOutput !== null
? typeof update.rawOutput === "string"
? update.rawOutput
: JSON.stringify(update.rawOutput)
: "";
this.messages.push({
role: "assistant",
content: null,
reasoning: null,
tool_calls: [{ function: { name: toolName, arguments: pending?.args ?? "" } }],
});
this.messages.push({
role: "tool",
content: rawOutput,
reasoning: null,
tool_calls: null,
});
this.pendingTools.delete(update.toolCallId);
}
break;
}
default:
break;
}
}
/** Flush any accumulated text/reasoning into an assistant message. */
flushAssistantMessage(): void {
const text = this.messageChunks.join("");
const reasoning = this.reasoningChunks.join("");
if (text !== "" || reasoning !== "") {
this.messages.push({
role: "assistant",
content: text || null,
reasoning: reasoning || null,
tool_calls: null,
});
}
this.messageChunks = [];
this.reasoningChunks = [];
}
async requestPermission(params: RequestPermissionRequest): Promise<RequestPermissionResponse> {
const firstOption = params.options[0];
return {
outcome: {
outcome: "selected",
optionId: firstOption?.optionId ?? "",
},
};
}
}
export type AcpPromptResult = {
text: string;
sessionId: string;
@@ -131,51 +33,52 @@ export type AcpPromptResult = {
export class HermesAcpClient {
private process: ChildProcess | null = null;
private connection: ClientSideConnection | null = null;
private nextId = 1;
private sessionId: string | null = null;
private stderrBuffer = "";
private client = new UwfAcpClient();
private pending = new Map<number, PendingRequest>();
// Message collection state
private messageChunks: string[] = [];
private reasoningChunks: string[] = [];
private pendingTools = new Map<string, PendingToolCall>();
messages: HermesSessionMessage[] = [];
/** Spawn hermes acp, initialize, create session */
async connect(cwd: string): Promise<string> {
const child = spawn(HERMES_COMMAND, ["acp"], {
env: process.env,
shell: false,
stdio: ["pipe", "pipe", "pipe"],
});
await this.ensureProcess();
await this.initialize();
this.process = child;
const sessionResponse = (await this.sendRequest("session/new", {
cwd,
mcpServers: [],
})) as { result: { sessionId: string } };
child.stderr?.on("data", (chunk: Buffer) => {
this.stderrBuffer += chunk.toString();
});
if (child.stdin === null || child.stdout === null) {
throw new Error("hermes acp process stdio is not available");
const sessionId = sessionResponse.result?.sessionId;
if (typeof sessionId !== "string" || sessionId === "") {
throw new Error(`session/new did not return a sessionId: ${JSON.stringify(sessionResponse)}`);
}
const input = Writable.toWeb(child.stdin);
const output = Readable.toWeb(child.stdout);
const stream = ndJsonStream(input, output);
this.sessionId = sessionId;
return sessionId;
}
const clientRef = this.client;
const connection = new ClientSideConnection((_agent) => clientRef, stream);
this.connection = connection;
/** Spawn hermes acp, initialize, resume an existing session */
async resume(sessionId: string, cwd: string): Promise<string> {
await this.ensureProcess();
await this.initialize();
connection.signal.addEventListener("abort", () => {
if (this.sessionId !== null) {
const detail = this.stderrBuffer.trim() !== "" ? ` stderr=${this.stderrBuffer.trim()}` : "";
throw new Error(`hermes acp connection closed unexpectedly${detail}`);
}
const response = await this.sendRequest("session/resume", {
cwd,
sessionId,
mcpServers: [],
});
await connection.initialize({
protocolVersion: PROTOCOL_VERSION,
clientCapabilities: {},
});
const sessionResult = await connection.newSession({ cwd, mcpServers: [] });
const { sessionId } = sessionResult;
if ((response as { error?: unknown }).error !== undefined) {
throw new Error(
`session/resume failed: ${JSON.stringify((response as { error: unknown }).error)}`,
);
}
this.sessionId = sessionId;
return sessionId;
@@ -183,26 +86,37 @@ export class HermesAcpClient {
/** Send prompt and collect full response text + structured messages. */
async prompt(text: string): Promise<AcpPromptResult> {
if (this.connection === null || this.sessionId === null) {
if (this.sessionId === null) {
throw new Error("Not connected — call connect() first");
}
this.client.resetPerPrompt();
this.messageChunks = [];
this.reasoningChunks = [];
await this.connection.prompt({
const response = await this.sendRequest("session/prompt", {
sessionId: this.sessionId,
prompt: [{ type: "text", text }],
});
if ((response as { error?: unknown }).error !== undefined) {
throw new Error(
`session/prompt failed: ${JSON.stringify((response as { error: unknown }).error)}`,
);
}
// Flush any trailing assistant text that wasn't followed by a tool call.
this.client.flushAssistantMessage();
this.flushAssistantMessage();
// Extract the final assistant text from collected messages.
const messages = this.client.messages;
let finalText = "";
for (let i = messages.length - 1; i >= 0; i--) {
const msg = messages[i];
if (msg !== undefined && msg.role === "assistant" && msg.content !== null && msg.content.trim() !== "") {
for (let i = this.messages.length - 1; i >= 0; i--) {
const msg = this.messages[i];
if (
msg !== undefined &&
msg.role === "assistant" &&
msg.content !== null &&
msg.content.trim() !== ""
) {
finalText = msg.content;
break;
}
@@ -211,7 +125,7 @@ export class HermesAcpClient {
return {
text: finalText,
sessionId: this.sessionId,
messages,
messages: this.messages,
};
}
@@ -228,6 +142,252 @@ export class HermesAcpClient {
setTimeout(resolve, 5000);
});
this.process = null;
this.connection = null;
}
// ---- JSON-RPC transport ----
private sendRequest(
method: string,
params: Record<string, unknown>,
timeoutMs = 10 * 60 * 1000,
): Promise<JsonRpcResponse> {
const id = this.nextId++;
return new Promise<JsonRpcResponse>((resolve, reject) => {
const timer = setTimeout(() => {
this.pending.delete(id);
reject(new Error(`Timeout waiting for response to ${method} (id=${id})`));
}, timeoutMs);
this.pending.set(id, {
resolve: (value) => {
clearTimeout(timer);
resolve(value);
},
reject: (err) => {
clearTimeout(timer);
reject(err);
},
});
this.writeLine(JSON.stringify({ jsonrpc: "2.0", id, method, params }));
});
}
private sendNotification(method: string, params?: Record<string, unknown>): void {
const message: Record<string, unknown> = { jsonrpc: "2.0", method };
if (params !== undefined) {
message.params = params;
}
this.writeLine(JSON.stringify(message));
}
private writeLine(line: string): void {
if (this.process?.stdin === null || this.process?.stdin === undefined) {
throw new Error("Cannot write: hermes acp process stdin not available");
}
this.process.stdin.write(`${line}\n`);
}
private handleLine(line: string): void {
if (line === "") {
return;
}
let parsed: unknown;
try {
parsed = JSON.parse(line);
} catch {
return;
}
const msg = parsed as Record<string, unknown>;
const hasId = "id" in msg && msg.id !== undefined && msg.id !== null;
const hasMethod = typeof msg.method === "string";
// JSON-RPC response to one of our requests (has "id" but no "method")
if (hasId && !hasMethod) {
const response = msg as unknown as JsonRpcResponse;
const handler = this.pending.get(response.id);
if (handler !== undefined) {
this.pending.delete(response.id);
handler.resolve(response);
}
return;
}
// Server-initiated JSON-RPC request: session/request_permission (has "id" + "method")
if (msg.method === "session/request_permission" && hasId) {
const params = msg.params as Record<string, unknown> | undefined;
const options = (params?.options ?? []) as Array<{ optionId?: string }>;
const firstOptionId = options[0]?.optionId ?? "";
this.writeLine(
JSON.stringify({
jsonrpc: "2.0",
id: msg.id,
result: { outcome: { outcome: "selected", optionId: firstOptionId } },
}),
);
return;
}
// JSON-RPC notification — session/update (no "id")
if (msg.method === "session/update") {
const params = msg.params as Record<string, unknown> | undefined;
const update = params?.update as Record<string, unknown> | undefined;
if (update !== undefined) {
this.handleSessionUpdate(update);
}
return;
}
}
// ---- Session update → structured messages ----
private handleSessionUpdate(update: Record<string, unknown>): void {
const updateType = update.sessionUpdate as string;
switch (updateType) {
case "agent_message_chunk": {
const content = update.content as { type?: string; text?: string } | undefined;
if (content?.type === "text" && typeof content.text === "string") {
this.messageChunks.push(content.text);
}
break;
}
case "agent_thought_chunk": {
const content = update.content as { type?: string; text?: string } | undefined;
if (content?.type === "text" && typeof content.text === "string") {
this.reasoningChunks.push(content.text);
}
break;
}
case "tool_call": {
const title = (update.title as string) ?? "";
const rawInput = update.rawInput;
const args =
rawInput !== undefined && rawInput !== null ? JSON.stringify(rawInput) : "";
const toolCallId = update.toolCallId as string;
this.pendingTools.set(toolCallId, { name: title, args });
// Flush accumulated assistant text before tool call
this.flushAssistantMessage();
break;
}
case "tool_call_update": {
const status = update.status as string | undefined;
if (status === "completed" || status === "failed") {
const toolCallId = update.toolCallId as string;
const pending = this.pendingTools.get(toolCallId);
const toolName = pending?.name ?? toolCallId;
const rawOutput = update.rawOutput;
const outputStr =
rawOutput !== undefined && rawOutput !== null
? typeof rawOutput === "string"
? rawOutput
: JSON.stringify(rawOutput)
: "";
this.messages.push({
role: "assistant",
content: null,
reasoning: null,
tool_calls: [{ function: { name: toolName, arguments: pending?.args ?? "" } }],
});
this.messages.push({
role: "tool",
content: outputStr,
reasoning: null,
tool_calls: null,
});
this.pendingTools.delete(toolCallId);
}
break;
}
default:
break;
}
}
/** Flush any accumulated text/reasoning into an assistant message. */
private flushAssistantMessage(): void {
const text = this.messageChunks.join("");
const reasoning = this.reasoningChunks.join("");
if (text !== "" || reasoning !== "") {
this.messages.push({
role: "assistant",
content: text || null,
reasoning: reasoning || null,
tool_calls: null,
});
}
this.messageChunks = [];
this.reasoningChunks = [];
}
private rejectAll(err: Error): void {
for (const handler of this.pending.values()) {
handler.reject(err);
}
this.pending.clear();
}
private async ensureProcess(): Promise<void> {
if (this.process !== null) {
return;
}
const child = spawn(HERMES_COMMAND, ["acp"], {
env: process.env,
shell: false,
stdio: ["pipe", "pipe", "pipe"],
});
this.process = child;
child.stderr?.on("data", (chunk: Buffer) => {
this.stderrBuffer += chunk.toString();
});
child.on("error", (cause) => {
const message = cause instanceof Error ? cause.message : String(cause);
this.rejectAll(new Error(`hermes acp spawn failed: ${message}`));
});
child.on("close", (code) => {
if (code !== 0 && this.pending.size > 0) {
const detail = this.stderrBuffer.trim() !== "" ? ` stderr=${this.stderrBuffer.trim()}` : "";
this.rejectAll(
new Error(`hermes acp exited unexpectedly with code ${code ?? "null"}${detail}`),
);
}
});
if (child.stdout === null) {
throw new Error("hermes acp process stdout is not available");
}
const rl = createInterface({ input: child.stdout });
rl.on("line", (line) => {
this.handleLine(line.trim());
});
}
private async initialize(): Promise<void> {
const initResponse = await this.sendRequest("initialize", {
protocolVersion: PROTOCOL_VERSION,
clientInfo: { name: "uwf", version: "0.1.0" },
capabilities: {},
});
if ((initResponse as { error?: unknown }).error !== undefined) {
throw new Error(
`initialize failed: ${JSON.stringify((initResponse as { error: unknown }).error)}`,
);
}
this.sendNotification("initialized");
}
}
+111 -12
View File
@@ -1,15 +1,19 @@
import type { Store } from "@uncaged/json-cas";
import {
type AgentContext,
type AgentRunResult,
buildContinuationPrompt,
buildRolePrompt,
createAgent,
} from "@uncaged/workflow-agent-kit";
import { createLogger } from "@uncaged/workflow-util";
import { HermesAcpClient } from "./acp-client.js";
import { getCachedSessionId, isResumeDisabled, setCachedSessionId } from "./session-cache.js";
import { storeHermesSessionDetail } from "./session-detail.js";
const log = createLogger({ sink: { kind: "stderr" } });
function buildHistorySummary(steps: AgentContext["steps"]): string {
if (steps.length === 0) {
return "";
@@ -29,12 +33,11 @@ function buildHistorySummary(steps: AgentContext["steps"]): string {
return lines.join("\n");
}
/** Assemble system prompt, task, and prior step outputs for Hermes. */
export function buildHermesPrompt(ctx: AgentContext): string {
function buildInitialPrompt(ctx: AgentContext): string {
const roleDef = ctx.workflow.roles[ctx.role];
const rolePrompt = roleDef !== undefined ? buildRolePrompt(roleDef) : "";
const parts: string[] = [];
if (ctx.outputFormatInstruction !== undefined && ctx.outputFormatInstruction !== "") {
if (ctx.outputFormatInstruction !== "") {
parts.push(ctx.outputFormatInstruction, "");
}
parts.push(rolePrompt, "", "## Task", ctx.start.prompt);
@@ -42,9 +45,73 @@ export function buildHermesPrompt(ctx: AgentContext): string {
if (historyBlock !== "") {
parts.push("", historyBlock);
}
parts.push("", "## Moderator Instruction", "", ctx.edgePrompt);
return parts.join("\n");
}
/** Assemble system prompt, task, and prior step outputs for Hermes. */
export function buildHermesPrompt(ctx: AgentContext): string {
if (!ctx.isFirstVisit) {
const parts: string[] = [];
if (ctx.outputFormatInstruction !== "") {
parts.push(ctx.outputFormatInstruction, "");
}
parts.push(buildContinuationPrompt(ctx.steps, ctx.role, ctx.edgePrompt));
return parts.join("\n");
}
return buildInitialPrompt(ctx);
}
async function storePromptResult(
store: Store,
sessionId: string,
messages: Awaited<ReturnType<HermesAcpClient["prompt"]>>["messages"],
): Promise<{ detailHash: string }> {
const session = {
session_id: sessionId,
model: "",
session_start: new Date().toISOString(),
messages,
};
return storeHermesSessionDetail(store, session);
}
type PromptAttempt = {
useContinuation: boolean;
resumed: boolean;
};
async function prepareSession(
client: HermesAcpClient,
ctx: AgentContext,
cwd: string,
): Promise<PromptAttempt> {
if (ctx.isFirstVisit || isResumeDisabled()) {
await client.connect(cwd);
return { useContinuation: false, resumed: false };
}
const cachedSessionId = await getCachedSessionId(ctx.threadId, ctx.role);
if (cachedSessionId === null) {
log("6RWK3N8Q", `no cached session for ${ctx.threadId}:${ctx.role}, starting new session`);
await client.connect(cwd);
return { useContinuation: false, resumed: false };
}
try {
await client.resume(cachedSessionId, cwd);
log("9MHT4V2P", `resumed hermes session ${cachedSessionId} for ${ctx.threadId}:${ctx.role}`);
return { useContinuation: true, resumed: true };
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
log("3XPN7K4W", `session resume failed, falling back to new session: ${message}`);
await client.close();
await client.connect(cwd);
return { useContinuation: false, resumed: false };
}
}
/**
* Agent CLI factory: parses argv, runs Hermes, extracts output, writes StepNode.
*
@@ -60,15 +127,38 @@ export function createHermesAgent(): () => Promise<void> {
void client.close();
});
async function runHermes(ctx: AgentContext): Promise<AgentRunResult> {
const fullPrompt = buildHermesPrompt(ctx);
await client.connect(process.cwd());
async function runPrompt(ctx: AgentContext, useContinuation: boolean): Promise<AgentRunResult> {
const effectiveCtx = useContinuation ? ctx : { ...ctx, isFirstVisit: true };
const fullPrompt = buildHermesPrompt(effectiveCtx);
const { text, sessionId, messages } = await client.prompt(fullPrompt);
const session = { session_id: sessionId, model: "", session_start: new Date().toISOString(), messages };
const { detailHash } = await storeHermesSessionDetail(ctx.store, session);
const { detailHash } = await storePromptResult(ctx.store, sessionId, messages);
if (!isResumeDisabled()) {
await setCachedSessionId(ctx.threadId, ctx.role, sessionId);
}
return { output: text, detailHash, sessionId };
}
async function runHermes(ctx: AgentContext): Promise<AgentRunResult> {
const cwd = process.cwd();
const attempt = await prepareSession(client, ctx, cwd);
try {
return await runPrompt(ctx, attempt.useContinuation);
} catch (error) {
if (!attempt.resumed) {
throw error;
}
const message = error instanceof Error ? error.message : String(error);
log("8FQW2R6N", `continuation prompt failed, retrying with initial prompt: ${message}`);
await client.close();
await client.connect(cwd);
return runPrompt(ctx, false);
}
}
async function continueHermes(
_sessionId: string,
message: string,
@@ -77,14 +167,23 @@ export function createHermesAgent(): () => Promise<void> {
// Client is already connected from runHermes — same ACP session,
// so the agent sees the full conversation history (crucial for retries).
const { text, sessionId, messages } = await client.prompt(message);
const session = { session_id: sessionId, model: "", session_start: new Date().toISOString(), messages };
const { detailHash } = await storeHermesSessionDetail(store, session);
const { detailHash } = await storePromptResult(store, sessionId, messages);
return { output: text, detailHash, sessionId };
}
return createAgent({
const agentMain = createAgent({
name: "hermes",
run: runHermes,
continue: continueHermes,
});
// Wrap to ensure ACP client is closed after agent completes,
// so the hermes subprocess exits and bun can terminate.
return async () => {
try {
await agentMain();
} finally {
await client.close();
}
};
}
@@ -0,0 +1,70 @@
import { mkdir, readFile, writeFile } from "node:fs/promises";
import { dirname, join } from "node:path";
import { resolveStorageRoot } from "@uncaged/workflow-agent-kit";
import type { ThreadId } from "@uncaged/workflow-protocol";
type HermesSessionCache = Record<string, string>;
function getCachePath(): string {
return join(resolveStorageRoot(), "cache", "hermes-sessions.json");
}
function cacheKey(threadId: ThreadId, role: string): string {
return `${threadId}:${role}`;
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
async function readCache(): Promise<HermesSessionCache> {
const path = getCachePath();
try {
const text = await readFile(path, "utf8");
const raw = JSON.parse(text) as unknown;
if (!isRecord(raw)) {
return {};
}
const cache: HermesSessionCache = {};
for (const [key, value] of Object.entries(raw)) {
if (typeof value === "string" && value !== "") {
cache[key] = value;
}
}
return cache;
} catch (e) {
const err = e as NodeJS.ErrnoException;
if (err.code === "ENOENT") {
return {};
}
throw e;
}
}
async function writeCache(cache: HermesSessionCache): Promise<void> {
const path = getCachePath();
await mkdir(dirname(path), { recursive: true });
await writeFile(path, `${JSON.stringify(cache, null, 2)}\n`, "utf8");
}
export function isResumeDisabled(): boolean {
const flag = process.env.UWF_NO_RESUME;
return flag !== undefined && flag !== "";
}
export async function getCachedSessionId(threadId: ThreadId, role: string): Promise<string | null> {
const cache = await readCache();
const sessionId = cache[cacheKey(threadId, role)];
return sessionId ?? null;
}
export async function setCachedSessionId(
threadId: ThreadId,
role: string,
sessionId: string,
): Promise<void> {
const cache = await readCache();
cache[cacheKey(threadId, role)] = sessionId;
await writeCache(cache);
}
@@ -0,0 +1,70 @@
import type { StepContext } from "@uncaged/workflow-protocol";
import { describe, expect, test } from "vitest";
import { buildContinuationPrompt } from "../src/build-continuation-prompt.js";
const reviewerStep: StepContext = {
role: "reviewer",
output: { approved: false, comments: "Missing tests" },
detail: "2MXBG6PN4A8JR",
agent: "uwf-hermes",
};
const developerStep: StepContext = {
role: "developer",
output: { filesChanged: ["src/app.ts"], summary: "Initial fix" },
detail: "1VPBG9SM5E7WK",
agent: "uwf-hermes",
};
describe("buildContinuationPrompt", () => {
test("includes steps after the last matching role and the edge prompt", () => {
const steps: StepContext[] = [
developerStep,
reviewerStep,
{
role: "planner",
output: { plan: "revise approach" },
detail: "7BQST3VW9F2MA",
agent: "uwf-hermes",
},
];
const result = buildContinuationPrompt(
steps,
"developer",
"The reviewer rejected your implementation. Read their feedback and fix the issues.",
);
expect(result).toContain("## What Happened Since Your Last Turn");
expect(result).toContain("### Step 2: reviewer");
expect(result).toContain("Missing tests");
expect(result).toContain("### Step 3: planner");
expect(result).toContain("## Moderator Instruction");
expect(result).toContain("The reviewer rejected your implementation.");
expect(result).not.toContain("Initial fix");
});
test("uses all steps when the role has not run before", () => {
const result = buildContinuationPrompt(
[developerStep, reviewerStep],
"planner",
"Continue from the reviewer feedback.",
);
expect(result).toContain("### Step 1: developer");
expect(result).toContain("### Step 2: reviewer");
expect(result).toContain("Continue from the reviewer feedback.");
});
test("still includes moderator instruction when there are no intervening steps", () => {
const result = buildContinuationPrompt(
[developerStep],
"developer",
"Please revise your work.",
);
expect(result).not.toContain("## What Happened Since Your Last Turn");
expect(result).toContain("## Moderator Instruction");
expect(result).toContain("Please revise your work.");
});
});
@@ -0,0 +1,53 @@
import type { StepContext } from "@uncaged/workflow-protocol";
function formatStep(step: StepContext, stepNumber: number): string {
return [
`### Step ${stepNumber}: ${step.role}`,
`Output: ${JSON.stringify(step.output)}`,
`Agent: ${step.agent}`,
].join("\n");
}
function findLastRoleIndex(steps: StepContext[], role: string): number {
for (let i = steps.length - 1; i >= 0; i--) {
const step = steps[i];
if (step !== undefined && step.role === role) {
return i;
}
}
return -1;
}
/**
* Build a continuation prompt for a role re-entry.
*
* Finds the most recent step for `role`, collects everything after it as context,
* and appends the moderator edge prompt as the instruction.
*/
export function buildContinuationPrompt(
steps: StepContext[],
role: string,
edgePrompt: string,
): string {
const lastIndex = findLastRoleIndex(steps, role);
const sinceSteps = lastIndex >= 0 ? steps.slice(lastIndex + 1) : steps;
const parts: string[] = [];
if (sinceSteps.length > 0) {
parts.push("## What Happened Since Your Last Turn");
const baseStepNumber = lastIndex >= 0 ? lastIndex + 2 : 1;
for (let i = 0; i < sinceSteps.length; i++) {
const step = sinceSteps[i];
if (step === undefined) {
continue;
}
parts.push("");
parts.push(formatStep(step, baseStepNumber + i));
}
parts.push("");
}
parts.push("## Moderator Instruction", "", edgePrompt);
return parts.join("\n");
}
@@ -21,6 +21,14 @@ function fail(message: string): never {
throw new Error(message);
}
function readEdgePrompt(): string {
const value = process.env.UWF_EDGE_PROMPT;
if (value === undefined || value === "") {
fail("UWF_EDGE_PROMPT environment variable is required");
}
return value;
}
function walkChain(store: Store, schemas: AgentStore["schemas"], headHash: CasRef): ChainState {
const headNode = store.get(headHash);
if (headNode === null) {
@@ -133,6 +141,8 @@ export async function buildContext(threadId: ThreadId, role: string): Promise<Ag
}
const steps = await buildHistory(store, chain.stepsNewestFirst);
const edgePrompt = readEdgePrompt();
const isFirstVisit = !steps.some((s) => s.role === role);
return {
threadId,
@@ -142,6 +152,8 @@ export async function buildContext(threadId: ThreadId, role: string): Promise<Ag
workflow,
store,
outputFormatInstruction: "",
edgePrompt,
isFirstVisit,
};
}
@@ -178,6 +190,8 @@ export async function buildContextWithMeta(
}
const steps = await buildHistory(store, chain.stepsNewestFirst);
const edgePrompt = readEdgePrompt();
const isFirstVisit = !steps.some((s) => s.role === role);
return {
threadId,
@@ -187,6 +201,8 @@ export async function buildContextWithMeta(
workflow,
store,
outputFormatInstruction: "",
edgePrompt,
isFirstVisit,
meta: { storageRoot, store, schemas, headHash, chain },
};
}
+2 -1
View File
@@ -1,3 +1,4 @@
export { buildContinuationPrompt } from "./build-continuation-prompt.js";
export { buildOutputFormatInstruction } from "./build-output-format-instruction.js";
export { buildRolePrompt } from "./build-role-prompt.js";
export type { BuildContextMeta } from "./context.js";
@@ -11,7 +12,7 @@ export {
export type { FrontmatterFastPathResult } from "./frontmatter.js";
export { tryFrontmatterFastPath } from "./frontmatter.js";
export { createAgent } from "./run.js";
export { getConfigPath, getEnvPath, loadWorkflowConfig } from "./storage.js";
export { getConfigPath, getEnvPath, loadWorkflowConfig, resolveStorageRoot } from "./storage.js";
export type {
AgentContext,
AgentContinueFn,
+9
View File
@@ -12,6 +12,15 @@ export type AgentContext = ModeratorContext & {
* role's output schema. Populated by `createAgent` at run time.
*/
outputFormatInstruction: string;
/**
* Edge prompt from the graph transition that led to this role (UWF_EDGE_PROMPT).
* Always the real moderator instruction for this step.
*/
edgePrompt: string;
/**
* True when the current role has not appeared in steps history before this invocation.
*/
isFirstVisit: boolean;
};
export type AgentRunResult = {
+11 -2
View File
@@ -77,9 +77,11 @@ function stepsToPayload(name: string, description: string, steps: WorkFlowSteps)
};
}
}
const targetRole = t.target === "END" ? "$END" : t.target;
return {
role: t.target === "END" ? "$END" : t.target,
role: targetRole,
condition: condName,
prompt: `Transition to ${targetRole}.`,
};
});
@@ -87,7 +89,14 @@ function stepsToPayload(name: string, description: string, steps: WorkFlowSteps)
}
if (steps.length > 0) {
graph["$START"] = [{ role: steps[0].role.name, condition: null }];
const firstRole = steps[0].role.name;
graph["$START"] = [
{
role: firstRole,
condition: null,
prompt: `Begin workflow at role ${firstRole}.`,
},
];
}
return { name, description, roles, conditions, graph };
@@ -9,27 +9,27 @@ const solveIssueWorkflow: WorkflowPayload = {
roles: {
planner: {
description: "Creates implementation plan",
identity: "You are a planning agent.",
prepare: "Review the issue context.",
execute: "Create a step-by-step plan.",
report: "Output the plan and steps.",
outputSchema: "5GWKR8TN1V3JA",
goal: "You are a planning agent.",
capabilities: ["planning"],
procedure: "Create a step-by-step plan.",
output: "Output the plan and steps.",
frontmatter: "5GWKR8TN1V3JA",
},
developer: {
description: "Implements code changes",
identity: "You are a developer agent.",
prepare: "Load coding tools.",
execute: "Implement the plan.",
report: "List files changed and summary.",
outputSchema: "8CNWT4KR6D1HV",
goal: "You are a developer agent.",
capabilities: ["coding"],
procedure: "Implement the plan.",
output: "List files changed and summary.",
frontmatter: "8CNWT4KR6D1HV",
},
reviewer: {
description: "Reviews code changes",
identity: "You are a code reviewer.",
prepare: "Review project conventions.",
execute: "Review the implementation.",
report: "Approve or reject with comments.",
outputSchema: "1VPBG9SM5E7WK",
goal: "You are a code reviewer.",
capabilities: ["code-review"],
procedure: "Review the implementation.",
output: "Approve or reject with comments.",
frontmatter: "1VPBG9SM5E7WK",
},
},
conditions: {
@@ -43,15 +43,35 @@ const solveIssueWorkflow: WorkflowPayload = {
},
},
graph: {
$START: [{ role: "planner", condition: null }],
planner: [
{ role: "developer", condition: "needsClarification" },
{ role: "$END", condition: null },
$START: [
{
role: "planner",
condition: null,
prompt: "Start planning from the issue in the task.",
},
],
planner: [
{
role: "developer",
condition: "needsClarification",
prompt: "Clarification is needed; hand off to developer.",
},
{ role: "$END", condition: null, prompt: "Planning complete; end workflow." },
],
developer: [
{
role: "reviewer",
condition: null,
prompt: "Implementation done; send to reviewer.",
},
],
developer: [{ role: "reviewer", condition: null }],
reviewer: [
{ role: "developer", condition: "rejected" },
{ role: "$END", condition: null },
{
role: "developer",
condition: "rejected",
prompt: "Reviewer rejected; return to developer.",
},
{ role: "$END", condition: null, prompt: "Review passed; end workflow." },
],
},
};
@@ -69,7 +89,10 @@ function makeContext(steps: ModeratorContext["steps"]): ModeratorContext {
describe("evaluate", () => {
test("$START → first role (fallback)", async () => {
const result = await evaluate(solveIssueWorkflow, makeContext([]));
expect(result).toEqual({ ok: true, value: "planner" });
expect(result).toEqual({
ok: true,
value: { role: "planner", prompt: "Start planning from the issue in the task." },
});
});
test("condition match (rejected → developer)", async () => {
@@ -82,7 +105,10 @@ describe("evaluate", () => {
},
]);
const result = await evaluate(solveIssueWorkflow, context);
expect(result).toEqual({ ok: true, value: "developer" });
expect(result).toEqual({
ok: true,
value: { role: "developer", prompt: "Reviewer rejected; return to developer." },
});
});
test("fallback when condition does not match → $END", async () => {
@@ -95,7 +121,10 @@ describe("evaluate", () => {
},
]);
const result = await evaluate(solveIssueWorkflow, context);
expect(result).toEqual({ ok: true, value: "$END" });
expect(result).toEqual({
ok: true,
value: { role: "$END", prompt: "Review passed; end workflow." },
});
});
test("missing role in graph → error", async () => {
@@ -124,7 +153,10 @@ describe("evaluate", () => {
},
]);
const result = await evaluate(solveIssueWorkflow, context);
expect(result).toEqual({ ok: true, value: "developer" });
expect(result).toEqual({
ok: true,
value: { role: "developer", prompt: "Clarification is needed; hand off to developer." },
});
});
test("$last returns most recent matching role's frontmatter", async () => {
@@ -137,10 +169,20 @@ describe("evaluate", () => {
},
},
graph: {
$START: [{ role: "developer", condition: null }],
$START: [
{
role: "developer",
condition: null,
prompt: "Begin development.",
},
],
developer: [
{ role: "$END", condition: "devFailed" },
{ role: "reviewer", condition: null },
{ role: "$END", condition: "devFailed", prompt: "Development failed; end." },
{
role: "reviewer",
condition: null,
prompt: "Development succeeded; review.",
},
],
},
};
@@ -165,7 +207,10 @@ describe("evaluate", () => {
},
]);
const result = await evaluate(workflow, context);
expect(result).toEqual({ ok: true, value: "$END" });
expect(result).toEqual({
ok: true,
value: { role: "$END", prompt: "Development failed; end." },
});
});
test("$first returns earliest matching role's frontmatter", async () => {
@@ -178,10 +223,20 @@ describe("evaluate", () => {
},
},
graph: {
$START: [{ role: "planner", condition: null }],
$START: [
{
role: "planner",
condition: null,
prompt: "Begin planning.",
},
],
planner: [
{ role: "$END", condition: "firstPlanReady" },
{ role: "developer", condition: null },
{ role: "$END", condition: "firstPlanReady", prompt: "First plan was ready; end." },
{
role: "developer",
condition: null,
prompt: "Plan not ready on first pass; implement.",
},
],
},
};
@@ -206,7 +261,10 @@ describe("evaluate", () => {
},
]);
const result = await evaluate(workflow, context);
expect(result).toEqual({ ok: true, value: "$END" });
expect(result).toEqual({
ok: true,
value: { role: "$END", prompt: "First plan was ready; end." },
});
});
test("$last returns undefined for unmatched role", async () => {
@@ -219,10 +277,20 @@ describe("evaluate", () => {
},
},
graph: {
$START: [{ role: "planner", condition: null }],
$START: [
{
role: "planner",
condition: null,
prompt: "Begin planning.",
},
],
planner: [
{ role: "$END", condition: "hasReviewer" },
{ role: "developer", condition: null },
{ role: "$END", condition: "hasReviewer", prompt: "Reviewer already ran; end." },
{
role: "developer",
condition: null,
prompt: "No reviewer yet; implement.",
},
],
},
};
@@ -236,6 +304,9 @@ describe("evaluate", () => {
]);
const result = await evaluate(workflow, context);
// no reviewer step → $exists returns false → fallback to developer
expect(result).toEqual({ ok: true, value: "developer" });
expect(result).toEqual({
ok: true,
value: { role: "developer", prompt: "No reviewer yet; implement." },
});
});
});
+4 -4
View File
@@ -1,7 +1,7 @@
import type { ModeratorContext, WorkflowPayload } from "@uncaged/workflow-protocol";
import jsonata from "jsonata";
import type { Result } from "./types.js";
import type { EvaluateResult, Result } from "./types.js";
const START_ROLE = "$START";
@@ -78,7 +78,7 @@ function currentRole(context: ModeratorContext): string {
export async function evaluate(
workflow: WorkflowPayload,
context: ModeratorContext,
): Promise<Result<string, Error>> {
): Promise<Result<EvaluateResult, Error>> {
const role = currentRole(context);
const transitions = workflow.graph[role];
if (transitions === undefined) {
@@ -90,7 +90,7 @@ export async function evaluate(
for (const transition of transitions) {
if (transition.condition === null) {
return { ok: true, value: transition.role };
return { ok: true, value: { role: transition.role, prompt: transition.prompt } };
}
const conditionDef = workflow.conditions[transition.condition];
@@ -106,7 +106,7 @@ export async function evaluate(
return evalResult;
}
if (isTruthy(evalResult.value)) {
return { ok: true, value: transition.role };
return { ok: true, value: { role: transition.role, prompt: transition.prompt } };
}
}
+1
View File
@@ -1 +1,2 @@
export { evaluate } from "./evaluate.js";
export type { EvaluateResult } from "./types.js";
+6
View File
@@ -1 +1,7 @@
export type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };
/** The result of moderator evaluation — which role to go to, and the edge prompt. */
export type EvaluateResult = {
role: string;
prompt: string;
};
+2 -1
View File
@@ -26,10 +26,11 @@ const CONDITION_DEFINITION: JSONSchema = {
const TRANSITION: JSONSchema = {
type: "object",
required: ["role", "condition"],
required: ["role", "condition", "prompt"],
properties: {
role: { type: "string" },
condition: { anyOf: [{ type: "string" }, { type: "null" }] },
prompt: { type: "string" },
},
additionalProperties: false,
};
+1
View File
@@ -28,6 +28,7 @@ export type RoleDefinition = {
export type Transition = {
role: string;
condition: string | null;
prompt: string;
};
export type ConditionDefinition = {
@@ -0,0 +1,81 @@
import { mkdirSync, mkdtempSync, readFileSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, describe, expect, test } from "bun:test";
import { createProcessLogger } from "../src/process-logger/index.js";
function logDateKey(date: Date): string {
return date.toISOString().slice(0, 10);
}
describe("createProcessLogger", () => {
let tmpDir: string;
afterEach(() => {
if (tmpDir !== undefined) {
rmSync(tmpDir, { recursive: true, force: true });
}
});
test("writes init and log lines to dated JSONL under storage root", () => {
tmpDir = mkdtempSync(join(tmpdir(), "uwf-process-log-"));
const plog = createProcessLogger({
storageRoot: tmpDir,
context: { thread: "THREAD01", workflow: "WORKFLOW01" },
});
expect(plog.pid).toMatch(/^\d+-\d+$/);
plog.log("7NQW4HBT", "moderator selected role=planner", null);
const logPath = join(tmpDir, "logs", `${logDateKey(new Date())}.jsonl`);
const lines = readFileSync(logPath, "utf8")
.trim()
.split("\n")
.map((line) => JSON.parse(line) as Record<string, string>);
expect(lines).toHaveLength(2);
expect(lines[0]?.tag).toBe("W9F3RK2M");
expect(lines[0]?.pid).toBe(plog.pid);
expect(lines[0]?.thread).toBe("THREAD01");
expect(lines[0]?.workflow).toBe("WORKFLOW01");
expect(lines[0]?.msg).toContain("process start");
expect(lines[0]?.msg).toContain("node=");
expect(lines[1]?.tag).toBe("7NQW4HBT");
expect(lines[1]?.msg).toBe("moderator selected role=planner");
expect(lines[1]?.thread).toBe("THREAD01");
expect(lines[1]?.workflow).toBe("WORKFLOW01");
});
test("creates logs directory when missing", () => {
tmpDir = mkdtempSync(join(tmpdir(), "uwf-process-log-"));
createProcessLogger({
storageRoot: tmpDir,
context: { thread: null, workflow: null },
});
mkdirSync(join(tmpDir, "logs"), { recursive: true });
expect(() =>
readFileSync(join(tmpDir, "logs", `${logDateKey(new Date())}.jsonl`), "utf8"),
).not.toThrow();
});
test("merges per-call context into the JSONL entry", () => {
tmpDir = mkdtempSync(join(tmpdir(), "uwf-process-log-"));
const plog = createProcessLogger({
storageRoot: tmpDir,
context: { thread: "T1", workflow: null },
});
plog.log("M3K8V9T1", "spawn agent", { command: "uwf-hermes", args: "tid role" });
const logPath = join(tmpDir, "logs", `${logDateKey(new Date())}.jsonl`);
const lines = readFileSync(logPath, "utf8")
.trim()
.split("\n")
.map((line) => JSON.parse(line) as Record<string, string>);
const last = lines[lines.length - 1];
expect(last?.command).toBe("uwf-hermes");
expect(last?.args).toBe("tid role");
});
});
+14 -1
View File
@@ -26,6 +26,7 @@ uwf workflow list # list all registered workflows
uwf thread start <workflow> -p <prompt> # create a thread (no execution)
uwf thread step <thread-id> # execute one moderator→agent→extract cycle
[--agent <cmd>] # override agent command
[-c, --count <number>] # run multiple steps (default: 1)
uwf thread show <thread-id> # show thread head pointer
uwf thread list # list active threads
[--all] # include archived threads
@@ -56,6 +57,17 @@ uwf cas schema list # list all registered schemas
uwf cas schema get <hash> # show a schema by its type hash
\`\`\`
## Log Commands
\`\`\`
uwf log list # list log files with sizes
uwf log show # show all log entries
[--thread <thread-id>] # filter by thread ID
[--process <pid>] # filter by process ID
[--date <YYYY-MM-DD>] # filter by date
uwf log clean --before <date> # delete log files before given date
\`\`\`
## Global Options
\`\`\`
@@ -69,6 +81,7 @@ uwf -V, --version # print version
- **Thread**: A single workflow execution (ULID). State is an immutable CAS chain; active threads are indexed in \`threads.yaml\`.
- **Step**: One moderator→agent→extract cycle. Run \`uwf thread step\` repeatedly until \`$END\`.
- **CAS**: Content-Addressed Storage — all nodes are immutable and identified by hash.
- **Role**: Named actor with goal, capabilities, procedure, output, and meta; the moderator routes between roles.
- **Role**: Named actor with goal, capabilities, procedure, output, and frontmatter schema; the moderator routes between roles.
- **Edge Prompt**: Required instruction on each graph edge — the moderator's dispatch message to the agent.
`;
}
+7
View File
@@ -13,6 +13,13 @@ export {
validateFrontmatter,
} from "./frontmatter-markdown/index.js";
export { createLogger } from "./logger.js";
export { createProcessLogger } from "./process-logger/index.js";
export type {
CreateProcessLoggerOptions,
ProcessLogFn,
ProcessLogger,
ProcessLoggerContext,
} from "./process-logger/index.js";
export { normalizeRefsField } from "./refs-field.js";
export { err, ok } from "./result.js";
export { getDefaultWorkflowStorageRoot, getGlobalCasDir } from "./storage-root.js";
+1 -21
View File
@@ -1,28 +1,8 @@
import { appendFileSync } from "node:fs";
import { CROCKFORD_BASE32_ALPHABET } from "./base32.js";
import { assertValidLogTag } from "./process-logger/log-tag.js";
import type { CreateLoggerOptions, LogFn } from "./types.js";
const TAG_LENGTH = 8;
const TAG_CHAR_SET: ReadonlySet<string> = new Set(CROCKFORD_BASE32_ALPHABET.split(""));
function assertValidLogTag(tag: string): void {
if (tag.length !== TAG_LENGTH) {
throw new Error(`log tag must be exactly ${TAG_LENGTH} characters`);
}
for (let i = 0; i < tag.length; i++) {
const ch = tag[i];
if (ch === undefined) {
throw new Error("log tag validation failed");
}
const upper = ch.toUpperCase();
if (!TAG_CHAR_SET.has(upper)) {
throw new Error(`invalid Crockford Base32 character in log tag: ${ch}`);
}
}
}
/** Append one JSONL log record: `{ tag, content, timestamp }` per RFC-001. */
export function createLogger(options: CreateLoggerOptions): LogFn {
if (options.sink.kind === "stderr") {
@@ -0,0 +1,7 @@
export { createProcessLogger } from "./process-logger.js";
export type {
CreateProcessLoggerOptions,
ProcessLogFn,
ProcessLogger,
ProcessLoggerContext,
} from "./types.js";
@@ -0,0 +1,21 @@
import { CROCKFORD_BASE32_ALPHABET } from "../base32.js";
const TAG_LENGTH = 8;
const TAG_CHAR_SET: ReadonlySet<string> = new Set(CROCKFORD_BASE32_ALPHABET.split(""));
export function assertValidLogTag(tag: string): void {
if (tag.length !== TAG_LENGTH) {
throw new Error(`log tag must be exactly ${TAG_LENGTH} characters`);
}
for (let i = 0; i < tag.length; i++) {
const ch = tag[i];
if (ch === undefined) {
throw new Error("log tag validation failed");
}
const upper = ch.toUpperCase();
if (!TAG_CHAR_SET.has(upper)) {
throw new Error(`invalid Crockford Base32 character in log tag: ${ch}`);
}
}
}
@@ -0,0 +1,78 @@
import { appendFileSync, mkdirSync } from "node:fs";
import { join } from "node:path";
import { getDefaultWorkflowStorageRoot } from "../storage-root.js";
import { assertValidLogTag } from "./log-tag.js";
import type { CreateProcessLoggerOptions, ProcessLogger, ProcessLoggerContext } from "./types.js";
const INIT_TAG = "W9F3RK2M";
function logDateKey(date: Date): string {
return date.toISOString().slice(0, 10);
}
function getProcessLogsDir(storageRoot: string): string {
return join(storageRoot, "logs");
}
function getProcessLogFilePath(storageRoot: string, date: Date): string {
return join(getProcessLogsDir(storageRoot), `${logDateKey(date)}.jsonl`);
}
function buildEntry(
processId: string,
tag: string,
msg: string,
baseContext: ProcessLoggerContext,
extra: Record<string, string> | null,
): Record<string, string> {
const entry: Record<string, string> = {
ts: new Date().toISOString(),
pid: processId,
tag: tag.toUpperCase(),
msg,
};
if (baseContext.thread !== null) {
entry.thread = baseContext.thread;
}
if (baseContext.workflow !== null) {
entry.workflow = baseContext.workflow;
}
if (extra !== null) {
for (const [key, value] of Object.entries(extra)) {
entry[key] = value;
}
}
return entry;
}
function appendEntry(filePath: string, entry: Record<string, string>): void {
appendFileSync(filePath, `${JSON.stringify(entry)}\n`, "utf8");
}
/** Process-scoped debug logger — append-only JSONL under `<storageRoot>/logs/YYYY-MM-DD.jsonl`. */
export function createProcessLogger(options: CreateProcessLoggerOptions): ProcessLogger {
const storageRoot = options.storageRoot ?? getDefaultWorkflowStorageRoot();
const processId = `${Date.now()}-${process.pid}`;
const baseContext = options.context;
const logFilePath = getProcessLogFilePath(storageRoot, new Date());
mkdirSync(getProcessLogsDir(storageRoot), { recursive: true });
const log: ProcessLogger["log"] = (tag, msg, context = null) => {
assertValidLogTag(tag);
appendEntry(logFilePath, buildEntry(processId, tag, msg, baseContext, context));
};
const argvSummary = JSON.stringify(process.argv);
const initParts = [`argv=${argvSummary}`, `node=${process.version}`];
if (baseContext.thread !== null) {
initParts.push(`thread=${baseContext.thread}`);
}
if (baseContext.workflow !== null) {
initParts.push(`workflow=${baseContext.workflow}`);
}
log(INIT_TAG, `process start ${initParts.join(" ")}`, null);
return { pid: processId, log };
}
@@ -0,0 +1,20 @@
export type ProcessLoggerContext = {
thread: string | null;
workflow: string | null;
};
export type CreateProcessLoggerOptions = {
storageRoot: string | null;
context: ProcessLoggerContext;
};
export type ProcessLogFn = (
tag: string,
msg: string,
context: Record<string, string> | null,
) => void;
export type ProcessLogger = {
pid: string;
log: ProcessLogFn;
};