feat: add process-level debug logger (Phase 1)
- New ProcessLogger in workflow-util: process-scoped JSONL logger
- Entry schema: {ts, pid, tag, msg, thread, workflow}
- Storage: ~/.uncaged/workflow/logs/YYYY-MM-DD.jsonl
- Auto logs process init info (argv, node version, context)
- cli-workflow thread commands fully instrumented:
- thread start/step, moderator evaluate, agent spawn/done
- thread archived, error paths
Refs #411, #412, #410
This commit is contained in:
@@ -23,7 +23,7 @@ import type {
|
||||
WorkflowConfig,
|
||||
WorkflowPayload,
|
||||
} from "@uncaged/workflow-protocol";
|
||||
import { generateUlid } from "@uncaged/workflow-util";
|
||||
import { createProcessLogger, generateUlid, type ProcessLogger } from "@uncaged/workflow-util";
|
||||
import { config as loadDotenv } from "dotenv";
|
||||
import { parse, stringify } from "yaml";
|
||||
|
||||
@@ -47,6 +47,18 @@ import { materializeWorkflowPayload } from "./workflow.js";
|
||||
const END_ROLE = "$END";
|
||||
export const THREAD_READ_DEFAULT_QUOTA = 4000;
|
||||
|
||||
const PL_THREAD_START = "7HNQ4B2X";
|
||||
const PL_MODERATOR = "M3K8V9T1";
|
||||
const PL_AGENT_SPAWN = "R5J2W8N4";
|
||||
const PL_AGENT_DONE = "C6P9L3H7";
|
||||
const PL_THREAD_ARCHIVED = "F4D8Q2K5";
|
||||
const PL_STEP_ERROR = "B8T5N1V6";
|
||||
|
||||
function failStep(plog: ProcessLogger, message: string): never {
|
||||
plog.log(PL_STEP_ERROR, message, null);
|
||||
fail(message);
|
||||
}
|
||||
|
||||
type ChainState = {
|
||||
startHash: CasRef;
|
||||
start: StartNodePayload;
|
||||
@@ -168,6 +180,10 @@ export async function cmdThreadStart(
|
||||
const workflowHash = await resolveWorkflowCasRef(uwf, storageRoot, workflowId, projectRoot);
|
||||
|
||||
const threadId = generateUlid(Date.now()) as ThreadId;
|
||||
const plog = createProcessLogger({
|
||||
storageRoot,
|
||||
context: { thread: threadId, workflow: workflowHash },
|
||||
});
|
||||
const startPayload: StartNodePayload = {
|
||||
workflow: workflowHash,
|
||||
prompt,
|
||||
@@ -183,6 +199,12 @@ export async function cmdThreadStart(
|
||||
index[threadId] = headHash;
|
||||
await saveThreadsIndex(storageRoot, index);
|
||||
|
||||
plog.log(
|
||||
PL_THREAD_START,
|
||||
`thread created workflow=${workflowHash} thread=${threadId} head=${headHash}`,
|
||||
null,
|
||||
);
|
||||
|
||||
return { workflow: workflowHash, thread: threadId };
|
||||
}
|
||||
|
||||
@@ -625,6 +647,7 @@ function resolveAgentConfig(
|
||||
}
|
||||
|
||||
function spawnAgent(
|
||||
plog: ProcessLogger,
|
||||
agent: AgentConfig,
|
||||
threadId: ThreadId,
|
||||
role: string,
|
||||
@@ -648,12 +671,12 @@ function spawnAgent(
|
||||
? err.stderr
|
||||
: err.stderr.toString("utf8");
|
||||
const detail = stderr.trim() !== "" ? `: ${stderr.trim()}` : "";
|
||||
fail(`agent command failed (${agent.command})${detail}`);
|
||||
failStep(plog, `agent command failed (${agent.command})${detail}`);
|
||||
}
|
||||
|
||||
const line = stdout.trim().split("\n").pop()?.trim() ?? "";
|
||||
if (!isCasRef(line)) {
|
||||
fail(`agent stdout is not a valid CAS hash: ${line || "(empty)"}`);
|
||||
failStep(plog, `agent stdout is not a valid CAS hash: ${line || "(empty)"}`);
|
||||
}
|
||||
return line;
|
||||
}
|
||||
@@ -685,9 +708,15 @@ export async function cmdThreadStep(
|
||||
fail(`--count must be a positive integer, got: ${count}`);
|
||||
}
|
||||
|
||||
const workflowHash = await resolveActiveThreadWorkflowHash(storageRoot, threadId);
|
||||
const plog = createProcessLogger({
|
||||
storageRoot,
|
||||
context: { thread: threadId, workflow: workflowHash },
|
||||
});
|
||||
|
||||
const results: StepOutput[] = [];
|
||||
for (let i = 0; i < count; i++) {
|
||||
const result = await cmdThreadStepOnce(storageRoot, threadId, agentOverride);
|
||||
const result = await cmdThreadStepOnce(storageRoot, threadId, agentOverride, plog);
|
||||
results.push(result);
|
||||
if (result.done) {
|
||||
break;
|
||||
@@ -696,16 +725,31 @@ export async function cmdThreadStep(
|
||||
return results;
|
||||
}
|
||||
|
||||
async function cmdThreadStepOnce(
|
||||
async function resolveActiveThreadWorkflowHash(
|
||||
storageRoot: string,
|
||||
threadId: ThreadId,
|
||||
agentOverride: string | null,
|
||||
): Promise<StepOutput> {
|
||||
): Promise<CasRef> {
|
||||
const index = await loadThreadsIndex(storageRoot);
|
||||
const headHash = index[threadId];
|
||||
if (headHash === undefined) {
|
||||
fail(`thread not active: ${threadId}`);
|
||||
}
|
||||
const uwf = await createUwfStore(storageRoot);
|
||||
const chain = walkChain(uwf, headHash);
|
||||
return chain.start.workflow;
|
||||
}
|
||||
|
||||
async function cmdThreadStepOnce(
|
||||
storageRoot: string,
|
||||
threadId: ThreadId,
|
||||
agentOverride: string | null,
|
||||
plog: ProcessLogger,
|
||||
): Promise<StepOutput> {
|
||||
const index = await loadThreadsIndex(storageRoot);
|
||||
const headHash = index[threadId];
|
||||
if (headHash === undefined) {
|
||||
failStep(plog, `thread not active: ${threadId}`);
|
||||
}
|
||||
|
||||
const uwf = await createUwfStore(storageRoot);
|
||||
const chain = walkChain(uwf, headHash);
|
||||
@@ -715,10 +759,17 @@ async function cmdThreadStepOnce(
|
||||
|
||||
const nextResult = await evaluate(workflow, context);
|
||||
if (!nextResult.ok) {
|
||||
fail(nextResult.error.message);
|
||||
failStep(plog, `moderator evaluate failed: ${nextResult.error.message}`);
|
||||
}
|
||||
|
||||
plog.log(
|
||||
PL_MODERATOR,
|
||||
`moderator role=${nextResult.value.role} prompt=${nextResult.value.prompt}`,
|
||||
null,
|
||||
);
|
||||
|
||||
if (nextResult.value.role === END_ROLE) {
|
||||
plog.log(PL_THREAD_ARCHIVED, `thread archived head=${headHash}`, null);
|
||||
await archiveThread(storageRoot, threadId, workflowHash, headHash);
|
||||
return {
|
||||
workflow: workflowHash,
|
||||
@@ -733,14 +784,20 @@ async function cmdThreadStepOnce(
|
||||
const config = await loadWorkflowConfig(storageRoot);
|
||||
const agent = resolveAgentConfig(config, workflow, role, agentOverride);
|
||||
|
||||
plog.log(PL_AGENT_SPAWN, `spawning agent command=${agent.command}`, {
|
||||
args: [...agent.args, threadId, role].join(" "),
|
||||
});
|
||||
|
||||
loadDotenv({ path: getEnvPath(storageRoot) });
|
||||
const newHead = spawnAgent(agent, threadId, role, edgePrompt);
|
||||
const newHead = spawnAgent(plog, agent, threadId, role, edgePrompt);
|
||||
|
||||
plog.log(PL_AGENT_DONE, `agent returned head=${newHead}`, null);
|
||||
|
||||
// Re-create store to pick up nodes written by the agent subprocess
|
||||
const uwfAfter = await createUwfStore(storageRoot);
|
||||
const newNode = uwfAfter.store.get(newHead);
|
||||
if (newNode === null || newNode.type !== uwfAfter.schemas.stepNode) {
|
||||
fail(`agent returned hash that is not a StepNode: ${newHead}`);
|
||||
failStep(plog, `agent returned hash that is not a StepNode: ${newHead}`);
|
||||
}
|
||||
|
||||
// Reload threads index to avoid overwriting changes made by the agent subprocess
|
||||
@@ -752,11 +809,12 @@ async function cmdThreadStepOnce(
|
||||
const contextAfter = buildModeratorContext(uwfAfter, chainAfter);
|
||||
const afterResult = await evaluate(workflow, contextAfter);
|
||||
if (!afterResult.ok) {
|
||||
fail(afterResult.error.message);
|
||||
failStep(plog, `post-step moderator evaluate failed: ${afterResult.error.message}`);
|
||||
}
|
||||
|
||||
const done = afterResult.value.role === END_ROLE;
|
||||
if (done) {
|
||||
plog.log(PL_THREAD_ARCHIVED, `thread archived head=${newHead}`, null);
|
||||
await archiveThread(storageRoot, threadId, workflowHash, newHead);
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,81 @@
|
||||
import { mkdirSync, mkdtempSync, readFileSync, rmSync } from "node:fs";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import { afterEach, describe, expect, test } from "bun:test";
|
||||
|
||||
import { createProcessLogger } from "../src/process-logger/index.js";
|
||||
|
||||
function logDateKey(date: Date): string {
|
||||
return date.toISOString().slice(0, 10);
|
||||
}
|
||||
|
||||
describe("createProcessLogger", () => {
|
||||
let tmpDir: string;
|
||||
|
||||
afterEach(() => {
|
||||
if (tmpDir !== undefined) {
|
||||
rmSync(tmpDir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
test("writes init and log lines to dated JSONL under storage root", () => {
|
||||
tmpDir = mkdtempSync(join(tmpdir(), "uwf-process-log-"));
|
||||
const plog = createProcessLogger({
|
||||
storageRoot: tmpDir,
|
||||
context: { thread: "THREAD01", workflow: "WORKFLOW01" },
|
||||
});
|
||||
|
||||
expect(plog.pid).toMatch(/^\d+-\d+$/);
|
||||
|
||||
plog.log("7NQW4HBT", "moderator selected role=planner", null);
|
||||
|
||||
const logPath = join(tmpDir, "logs", `${logDateKey(new Date())}.jsonl`);
|
||||
const lines = readFileSync(logPath, "utf8")
|
||||
.trim()
|
||||
.split("\n")
|
||||
.map((line) => JSON.parse(line) as Record<string, string>);
|
||||
|
||||
expect(lines).toHaveLength(2);
|
||||
expect(lines[0]?.tag).toBe("W9F3RK2M");
|
||||
expect(lines[0]?.pid).toBe(plog.pid);
|
||||
expect(lines[0]?.thread).toBe("THREAD01");
|
||||
expect(lines[0]?.workflow).toBe("WORKFLOW01");
|
||||
expect(lines[0]?.msg).toContain("process start");
|
||||
expect(lines[0]?.msg).toContain("node=");
|
||||
|
||||
expect(lines[1]?.tag).toBe("7NQW4HBT");
|
||||
expect(lines[1]?.msg).toBe("moderator selected role=planner");
|
||||
expect(lines[1]?.thread).toBe("THREAD01");
|
||||
expect(lines[1]?.workflow).toBe("WORKFLOW01");
|
||||
});
|
||||
|
||||
test("creates logs directory when missing", () => {
|
||||
tmpDir = mkdtempSync(join(tmpdir(), "uwf-process-log-"));
|
||||
createProcessLogger({
|
||||
storageRoot: tmpDir,
|
||||
context: { thread: null, workflow: null },
|
||||
});
|
||||
mkdirSync(join(tmpDir, "logs"), { recursive: true });
|
||||
expect(() =>
|
||||
readFileSync(join(tmpDir, "logs", `${logDateKey(new Date())}.jsonl`), "utf8"),
|
||||
).not.toThrow();
|
||||
});
|
||||
|
||||
test("merges per-call context into the JSONL entry", () => {
|
||||
tmpDir = mkdtempSync(join(tmpdir(), "uwf-process-log-"));
|
||||
const plog = createProcessLogger({
|
||||
storageRoot: tmpDir,
|
||||
context: { thread: "T1", workflow: null },
|
||||
});
|
||||
plog.log("M3K8V9T1", "spawn agent", { command: "uwf-hermes", args: "tid role" });
|
||||
|
||||
const logPath = join(tmpDir, "logs", `${logDateKey(new Date())}.jsonl`);
|
||||
const lines = readFileSync(logPath, "utf8")
|
||||
.trim()
|
||||
.split("\n")
|
||||
.map((line) => JSON.parse(line) as Record<string, string>);
|
||||
const last = lines[lines.length - 1];
|
||||
expect(last?.command).toBe("uwf-hermes");
|
||||
expect(last?.args).toBe("tid role");
|
||||
});
|
||||
});
|
||||
@@ -13,6 +13,13 @@ export {
|
||||
validateFrontmatter,
|
||||
} from "./frontmatter-markdown/index.js";
|
||||
export { createLogger } from "./logger.js";
|
||||
export { createProcessLogger } from "./process-logger/index.js";
|
||||
export type {
|
||||
CreateProcessLoggerOptions,
|
||||
ProcessLogFn,
|
||||
ProcessLogger,
|
||||
ProcessLoggerContext,
|
||||
} from "./process-logger/index.js";
|
||||
export { normalizeRefsField } from "./refs-field.js";
|
||||
export { err, ok } from "./result.js";
|
||||
export { getDefaultWorkflowStorageRoot, getGlobalCasDir } from "./storage-root.js";
|
||||
|
||||
@@ -1,28 +1,8 @@
|
||||
import { appendFileSync } from "node:fs";
|
||||
|
||||
import { CROCKFORD_BASE32_ALPHABET } from "./base32.js";
|
||||
import { assertValidLogTag } from "./process-logger/log-tag.js";
|
||||
import type { CreateLoggerOptions, LogFn } from "./types.js";
|
||||
|
||||
const TAG_LENGTH = 8;
|
||||
|
||||
const TAG_CHAR_SET: ReadonlySet<string> = new Set(CROCKFORD_BASE32_ALPHABET.split(""));
|
||||
|
||||
function assertValidLogTag(tag: string): void {
|
||||
if (tag.length !== TAG_LENGTH) {
|
||||
throw new Error(`log tag must be exactly ${TAG_LENGTH} characters`);
|
||||
}
|
||||
for (let i = 0; i < tag.length; i++) {
|
||||
const ch = tag[i];
|
||||
if (ch === undefined) {
|
||||
throw new Error("log tag validation failed");
|
||||
}
|
||||
const upper = ch.toUpperCase();
|
||||
if (!TAG_CHAR_SET.has(upper)) {
|
||||
throw new Error(`invalid Crockford Base32 character in log tag: ${ch}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Append one JSONL log record: `{ tag, content, timestamp }` per RFC-001. */
|
||||
export function createLogger(options: CreateLoggerOptions): LogFn {
|
||||
if (options.sink.kind === "stderr") {
|
||||
|
||||
@@ -0,0 +1,7 @@
|
||||
export { createProcessLogger } from "./process-logger.js";
|
||||
export type {
|
||||
CreateProcessLoggerOptions,
|
||||
ProcessLogFn,
|
||||
ProcessLogger,
|
||||
ProcessLoggerContext,
|
||||
} from "./types.js";
|
||||
@@ -0,0 +1,21 @@
|
||||
import { CROCKFORD_BASE32_ALPHABET } from "../base32.js";
|
||||
|
||||
const TAG_LENGTH = 8;
|
||||
|
||||
const TAG_CHAR_SET: ReadonlySet<string> = new Set(CROCKFORD_BASE32_ALPHABET.split(""));
|
||||
|
||||
export function assertValidLogTag(tag: string): void {
|
||||
if (tag.length !== TAG_LENGTH) {
|
||||
throw new Error(`log tag must be exactly ${TAG_LENGTH} characters`);
|
||||
}
|
||||
for (let i = 0; i < tag.length; i++) {
|
||||
const ch = tag[i];
|
||||
if (ch === undefined) {
|
||||
throw new Error("log tag validation failed");
|
||||
}
|
||||
const upper = ch.toUpperCase();
|
||||
if (!TAG_CHAR_SET.has(upper)) {
|
||||
throw new Error(`invalid Crockford Base32 character in log tag: ${ch}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,78 @@
|
||||
import { appendFileSync, mkdirSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
|
||||
import { getDefaultWorkflowStorageRoot } from "../storage-root.js";
|
||||
import { assertValidLogTag } from "./log-tag.js";
|
||||
import type { CreateProcessLoggerOptions, ProcessLogger, ProcessLoggerContext } from "./types.js";
|
||||
|
||||
const INIT_TAG = "W9F3RK2M";
|
||||
|
||||
function logDateKey(date: Date): string {
|
||||
return date.toISOString().slice(0, 10);
|
||||
}
|
||||
|
||||
function getProcessLogsDir(storageRoot: string): string {
|
||||
return join(storageRoot, "logs");
|
||||
}
|
||||
|
||||
function getProcessLogFilePath(storageRoot: string, date: Date): string {
|
||||
return join(getProcessLogsDir(storageRoot), `${logDateKey(date)}.jsonl`);
|
||||
}
|
||||
|
||||
function buildEntry(
|
||||
processId: string,
|
||||
tag: string,
|
||||
msg: string,
|
||||
baseContext: ProcessLoggerContext,
|
||||
extra: Record<string, string> | null,
|
||||
): Record<string, string> {
|
||||
const entry: Record<string, string> = {
|
||||
ts: new Date().toISOString(),
|
||||
pid: processId,
|
||||
tag: tag.toUpperCase(),
|
||||
msg,
|
||||
};
|
||||
if (baseContext.thread !== null) {
|
||||
entry.thread = baseContext.thread;
|
||||
}
|
||||
if (baseContext.workflow !== null) {
|
||||
entry.workflow = baseContext.workflow;
|
||||
}
|
||||
if (extra !== null) {
|
||||
for (const [key, value] of Object.entries(extra)) {
|
||||
entry[key] = value;
|
||||
}
|
||||
}
|
||||
return entry;
|
||||
}
|
||||
|
||||
function appendEntry(filePath: string, entry: Record<string, string>): void {
|
||||
appendFileSync(filePath, `${JSON.stringify(entry)}\n`, "utf8");
|
||||
}
|
||||
|
||||
/** Process-scoped debug logger — append-only JSONL under `<storageRoot>/logs/YYYY-MM-DD.jsonl`. */
|
||||
export function createProcessLogger(options: CreateProcessLoggerOptions): ProcessLogger {
|
||||
const storageRoot = options.storageRoot ?? getDefaultWorkflowStorageRoot();
|
||||
const processId = `${Date.now()}-${process.pid}`;
|
||||
const baseContext = options.context;
|
||||
const logFilePath = getProcessLogFilePath(storageRoot, new Date());
|
||||
|
||||
mkdirSync(getProcessLogsDir(storageRoot), { recursive: true });
|
||||
|
||||
const log: ProcessLogger["log"] = (tag, msg, context = null) => {
|
||||
assertValidLogTag(tag);
|
||||
appendEntry(logFilePath, buildEntry(processId, tag, msg, baseContext, context));
|
||||
};
|
||||
|
||||
const argvSummary = JSON.stringify(process.argv);
|
||||
const initParts = [`argv=${argvSummary}`, `node=${process.version}`];
|
||||
if (baseContext.thread !== null) {
|
||||
initParts.push(`thread=${baseContext.thread}`);
|
||||
}
|
||||
if (baseContext.workflow !== null) {
|
||||
initParts.push(`workflow=${baseContext.workflow}`);
|
||||
}
|
||||
log(INIT_TAG, `process start ${initParts.join(" ")}`, null);
|
||||
|
||||
return { pid: processId, log };
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
export type ProcessLoggerContext = {
|
||||
thread: string | null;
|
||||
workflow: string | null;
|
||||
};
|
||||
|
||||
export type CreateProcessLoggerOptions = {
|
||||
storageRoot: string | null;
|
||||
context: ProcessLoggerContext;
|
||||
};
|
||||
|
||||
export type ProcessLogFn = (
|
||||
tag: string,
|
||||
msg: string,
|
||||
context: Record<string, string> | null,
|
||||
) => void;
|
||||
|
||||
export type ProcessLogger = {
|
||||
pid: string;
|
||||
log: ProcessLogFn;
|
||||
};
|
||||
Reference in New Issue
Block a user