feat(protocol): add step-level timing (startedAtMs / completedAtMs) (#489)

BREAKING CHANGE: StepRecord now requires startedAtMs and completedAtMs fields.
StepEntry now requires durationMs field. Old CAS data without these fields is invalid.

- Add startedAtMs/completedAtMs to StepRecord and StepNodePayload
- Add durationMs to StepEntry (computed: completedAtMs - startedAtMs)
- Update STEP_NODE_SCHEMA to require timing fields as integers
- Record Date.now() before/after agent execution in createAgent
- Show duration in thread read headers (formatStepHeader)
- Update existing test fixtures with timing fields
This commit is contained in:
2026-05-25 08:01:50 +00:00
parent 3fca67e443
commit 45f479e60f
10 changed files with 465 additions and 5 deletions
@@ -144,6 +144,8 @@ describe("step read", () => {
output: outputHash,
detail: detailHash,
agent: "uwf-test",
startedAtMs: 1000000000000,
completedAtMs: 1000000005000,
});
// Read step with large quota
@@ -227,6 +229,8 @@ describe("step read", () => {
output: outputHash,
detail: detailHash,
agent: "uwf-test",
startedAtMs: 1000000000000,
completedAtMs: 1000000005000,
});
// Read step with limited quota (700 chars)
@@ -304,6 +308,8 @@ describe("step read", () => {
output: outputHash,
detail: detailHash,
agent: "uwf-test",
startedAtMs: 1000000000000,
completedAtMs: 1000000005000,
});
// Read step with minimal quota (1 char)
@@ -357,6 +363,8 @@ describe("step read", () => {
output: outputHash,
detail: null,
agent: "uwf-test",
startedAtMs: 1000000000000,
completedAtMs: 1000000005000,
});
// Read step - should return metadata only (no error)
@@ -431,6 +439,8 @@ describe("step read", () => {
output: outputHash,
detail: detailHash,
agent: "uwf-test",
startedAtMs: 1000000000000,
completedAtMs: 1000000005000,
});
// Read step - should return metadata only (no error)
@@ -505,6 +515,8 @@ describe("step read", () => {
output: outputHash,
detail: detailHash,
agent: "uwf-test",
startedAtMs: 1000000000000,
completedAtMs: 1000000005000,
});
// Read step
@@ -0,0 +1,378 @@
import { mkdir, mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { bootstrap, putSchema } from "@uncaged/json-cas";
import { createFsStore } from "@uncaged/json-cas-fs";
import type { CasRef, ThreadId } from "@uncaged/workflow-protocol";
import { STEP_NODE_SCHEMA } from "@uncaged/workflow-protocol";
import { afterEach, beforeEach, describe, expect, test } from "vitest";
import { cmdStepList } from "../commands/step.js";
import { cmdThreadRead } from "../commands/thread.js";
import { registerUwfSchemas } from "../schemas.js";
import { saveThreadsIndex } from "../store.js";
// ── schemas ──────────────────────────────────────────────────────────────────
/**
 * JSON schema (titled "hermes-turn") for the turn nodes these tests store.
 *
 * `index`, `role` and `content` are required. `toolCalls` (array of objects)
 * and `reasoning` (string) are optional and may each be null. No additional
 * properties are permitted.
 */
const TURN_SCHEMA = {
title: "hermes-turn",
type: "object" as const,
required: ["index", "role", "content"],
properties: {
index: { type: "integer" as const },
role: { type: "string" as const },
content: { type: "string" as const },
toolCalls: {
anyOf: [
{ type: "array" as const, items: { type: "object" as const } },
{ type: "null" as const },
],
},
reasoning: { anyOf: [{ type: "string" as const }, { type: "null" as const }] },
},
additionalProperties: false,
};
/**
 * JSON schema (titled "hermes-detail") for the detail nodes these tests store.
 *
 * All five properties are required and no others are permitted. `turns` is an
 * array of strings with format "cas_ref"; the tests in this file fill it with
 * hashes of nodes written under TURN_SCHEMA.
 */
const DETAIL_SCHEMA = {
title: "hermes-detail",
type: "object" as const,
required: ["sessionId", "model", "duration", "turnCount", "turns"],
properties: {
sessionId: { type: "string" as const },
model: { type: "string" as const },
duration: { type: "integer" as const },
turnCount: { type: "integer" as const },
turns: {
type: "array" as const,
items: { type: "string" as const, format: "cas_ref" },
},
},
additionalProperties: false,
};
// ── helpers ──────────────────────────────────────────────────────────────────
/**
 * Bootstraps `store`, then registers the turn and detail schemas in it.
 *
 * @param store - Store instance created by `createFsStore`.
 * @returns The two values produced by `putSchema`, keyed as `turn` and `detail`.
 */
async function registerDetailSchemas(store: ReturnType<typeof createFsStore>) {
  await bootstrap(store);
  // Start both registrations before awaiting either so they proceed together.
  const pendingTurn = putSchema(store, TURN_SCHEMA);
  const pendingDetail = putSchema(store, DETAIL_SCHEMA);
  const [turn, detail] = await Promise.all([pendingTurn, pendingDetail]);
  return { turn, detail };
}
// ── fixture ──────────────────────────────────────────────────────────────────
// Per-test scratch directory; every test builds its CAS store underneath it.
let tmpDir: string;

beforeEach(async () => {
  const prefix = join(tmpdir(), "cli-uwf-step-timing-test-");
  tmpDir = await mkdtemp(prefix);
});

afterEach(async () => {
  // `force` keeps cleanup from failing if the directory is already gone.
  const removal = { recursive: true, force: true };
  await rm(tmpDir, removal);
});
// ── 1. Protocol types (compile-time) ─────────────────────────────────────────
describe("protocol types", () => {
  // Local aliases for the protocol types under test (erased at compile time).
  type StepRecord = import("@uncaged/workflow-protocol").StepRecord;
  type StepEntry = import("@uncaged/workflow-protocol").StepEntry;

  test("StepRecord has startedAtMs and completedAtMs as required fields", () => {
    // Compile-time check: the literal type-checks only if both timing fields
    // exist on StepRecord and accept numbers.
    const stepRecord: StepRecord = {
      role: "test",
      output: "hash1" as CasRef,
      detail: "hash2" as CasRef,
      agent: "uwf-test",
      edgePrompt: "",
      startedAtMs: 1000,
      completedAtMs: 2000,
    };

    const { startedAtMs, completedAtMs } = stepRecord;
    expect(startedAtMs).toBe(1000);
    expect(completedAtMs).toBe(2000);
  });

  test("StepEntry has durationMs as required field", () => {
    // Compile-time check: StepEntry must accept a numeric durationMs.
    const stepEntry: StepEntry = {
      hash: "hash" as CasRef,
      role: "test",
      output: {},
      detail: "hash2" as CasRef,
      agent: "uwf-test",
      timestamp: 123,
      durationMs: 5000,
    };

    expect(stepEntry.durationMs).toBe(5000);
  });
});
// ── 2. JSON Schema ───────────────────────────────────────────────────────────
describe("StepNode JSON schema", () => {
  test("schema requires startedAtMs and completedAtMs", () => {
    const requiredFields = STEP_NODE_SCHEMA.required as string[];
    for (const field of ["startedAtMs", "completedAtMs"]) {
      expect(requiredFields).toContain(field);
    }
  });

  test("schema defines timing fields as integer", () => {
    const { startedAtMs, completedAtMs } = STEP_NODE_SCHEMA.properties as Record<
      string,
      { type: string }
    >;
    expect(startedAtMs.type).toBe("integer");
    expect(completedAtMs.type).toBe("integer");
  });

  test("StepNode with timing fields passes CAS validation", async () => {
    // Fresh store under the per-test temp directory.
    const casRoot = join(tmpDir, "cas");
    await mkdir(casRoot, { recursive: true });
    const cas = createFsStore(casRoot);
    const uwfSchemas = await registerUwfSchemas(cas);

    // Nodes the step node refers to.
    const startRef = await cas.put(uwfSchemas.startNode, {
      workflow: "placeholder0000" as CasRef,
      prompt: "test",
    });
    const outputRef = await cas.put(uwfSchemas.text, "output text");
    const hermesSchemas = await registerDetailSchemas(cas);
    const detailRef = await cas.put(hermesSchemas.detail, {
      sessionId: "s1",
      model: "m1",
      duration: 100,
      turnCount: 0,
      turns: [],
    });

    // A step node carrying both timing fields is expected to be accepted.
    const stepRef = await cas.put(uwfSchemas.stepNode, {
      start: startRef,
      prev: null,
      role: "worker",
      output: outputRef,
      detail: detailRef,
      agent: "uwf-test",
      edgePrompt: "",
      startedAtMs: 1000000000000,
      completedAtMs: 1000000005000,
    });
    expect(stepRef).toBeTruthy();
  });
});
// ── 3. step list — durationMs computed ───────────────────────────────────────
describe("step list timing", () => {
  test("step list includes durationMs = completedAtMs - startedAtMs", async () => {
    // Fresh store with the UWF schemas and the detail schemas registered.
    const casRoot = join(tmpDir, "cas");
    await mkdir(casRoot, { recursive: true });
    const cas = createFsStore(casRoot);
    const uwfSchemas = await registerUwfSchemas(cas);
    const hermesSchemas = await registerDetailSchemas(cas);

    // Minimal workflow -> start node -> single step chain.
    const workflowRef = await cas.put(uwfSchemas.workflow, {
      name: "test-wf",
      description: "desc",
      roles: {},
      graph: {},
    });
    const startRef = await cas.put(uwfSchemas.startNode, {
      workflow: workflowRef,
      prompt: "test",
    });
    const outputRef = await cas.put(uwfSchemas.text, "output");
    const detailRef = await cas.put(hermesSchemas.detail, {
      sessionId: "s1",
      model: "m1",
      duration: 100,
      turnCount: 0,
      turns: [],
    });

    // The step runs for exactly 3.5 seconds.
    const elapsedMs = 3500;
    const startedAt = 1716600000000;
    const completedAt = startedAt + elapsedMs;
    const stepRef = await cas.put(uwfSchemas.stepNode, {
      start: startRef,
      prev: null,
      role: "worker",
      output: outputRef,
      detail: detailRef,
      agent: "uwf-test",
      edgePrompt: "",
      startedAtMs: startedAt,
      completedAtMs: completedAt,
    });

    const threadId = "01HX2Q3R4S5T6V7W8X9YZ1" as ThreadId;
    await saveThreadsIndex(tmpDir, { [threadId]: stepRef });

    const listing = await cmdStepList(tmpDir, threadId);
    // The first entry is the start entry; everything after it is a step.
    const stepEntries = listing.steps.slice(1);
    expect(stepEntries).toHaveLength(1);

    const onlyStep = stepEntries[0] as import("@uncaged/workflow-protocol").StepEntry;
    expect(onlyStep.durationMs).toBe(elapsedMs);
  });
});
// ── 4. thread read — duration in header ──────────────────────────────────────
describe("thread read timing", () => {
  /**
   * Builds a single-step thread in a fresh store under `tmpDir` and returns
   * the markdown that `cmdThreadRead` produces for it.
   *
   * Both tests below need the same workflow/start/turn/detail/step chain and
   * differ only in the thread id, the start prompt and the step's timing, so
   * the shared setup lives here.
   */
  async function readSingleStepThread(opts: {
    threadId: ThreadId;
    prompt: string;
    startedAtMs: number;
    completedAtMs: number;
  }) {
    const casDir = join(tmpDir, "cas");
    await mkdir(casDir, { recursive: true });
    const store = createFsStore(casDir);
    const schemas = await registerUwfSchemas(store);
    const detailSchemas = await registerDetailSchemas(store);

    const workflowHash = await store.put(schemas.workflow, {
      name: "test-wf",
      description: "desc",
      roles: {
        worker: {
          description: "Worker",
          goal: "Do work",
          capabilities: [],
          procedure: "work",
          output: "result",
          frontmatter: "placeholder0000" as CasRef,
        },
      },
      graph: {
        $START: { _: { role: "worker", prompt: "go" } },
        worker: { _: { role: "$END", prompt: "" } },
      },
    });
    const startHash = await store.put(schemas.startNode, {
      workflow: workflowHash,
      prompt: opts.prompt,
    });
    const turnHash = await store.put(detailSchemas.turn, {
      index: 0,
      role: "assistant",
      content: "Done.",
      toolCalls: null,
      reasoning: null,
    });
    const detailHash = await store.put(detailSchemas.detail, {
      sessionId: "s1",
      model: "m1",
      duration: 100,
      turnCount: 1,
      turns: [turnHash],
    });
    const outputHash = await store.put(schemas.text, "output");
    const stepHash = await store.put(schemas.stepNode, {
      start: startHash,
      prev: null,
      role: "worker",
      output: outputHash,
      detail: detailHash,
      agent: "uwf-test",
      edgePrompt: "",
      startedAtMs: opts.startedAtMs,
      completedAtMs: opts.completedAtMs,
    });

    await saveThreadsIndex(tmpDir, { [opts.threadId]: stepHash });
    return cmdThreadRead(tmpDir, opts.threadId, 10000, null, false);
  }

  test("thread read header includes Duration", async () => {
    // 42 000 ms between start and completion.
    const markdown = await readSingleStepThread({
      threadId: "01HX2Q3R4S5T6V7W8X9YZ3" as ThreadId,
      prompt: "test task",
      startedAtMs: 1716600000000,
      completedAtMs: 1716600042000,
    });
    expect(markdown).toContain("**Duration:** 42.0s");
  });

  test("thread read shows sub-second duration as ms", async () => {
    // 350 ms between start and completion.
    const markdown = await readSingleStepThread({
      threadId: "01HX2Q3R4S5T6V7W8X9YZ4" as ThreadId,
      prompt: "test",
      startedAtMs: 1716600000000,
      completedAtMs: 1716600000350,
    });
    expect(markdown).toContain("**Duration:** 350ms");
  });
});
// ── 5. Breaking change — old data without timing fails ───────────────────────
describe("breaking change", () => {
  test("StepNode schema rejects payload without timing fields", async () => {
    // The schema itself must list both timing fields as required.
    const required = STEP_NODE_SCHEMA.required as string[];
    expect(required).toContain("startedAtMs");
    expect(required).toContain("completedAtMs");

    // Build the same otherwise-valid step node as the "passes CAS validation"
    // test, so the missing timing fields are the only reason to refuse it.
    const casDir = join(tmpDir, "cas");
    await mkdir(casDir, { recursive: true });
    const store = createFsStore(casDir);
    const schemas = await registerUwfSchemas(store);
    const startHash = await store.put(schemas.startNode, {
      workflow: "placeholder0000" as CasRef,
      prompt: "test",
    });
    const outputHash = await store.put(schemas.text, "output text");
    const detailSchemas = await registerDetailSchemas(store);
    const detailHash = await store.put(detailSchemas.detail, {
      sessionId: "s1",
      model: "m1",
      duration: 100,
      turnCount: 0,
      turns: [],
    });

    const payloadWithoutTiming = {
      start: startHash,
      prev: null,
      role: "worker",
      output: outputHash,
      detail: detailHash,
      agent: "uwf-test",
      edgePrompt: "",
    };

    // NOTE(review): assumes store.put validates the payload against the schema
    // on write — confirm against the json-cas store implementation.
    // try/catch covers both a synchronous throw and a rejected promise.
    let rejected = false;
    try {
      await store.put(schemas.stepNode, payloadWithoutTiming);
    } catch {
      rejected = true;
    }
    expect(rejected).toBe(true);
  });
});
@@ -141,6 +141,8 @@ describe("thread read --quota flag", () => {
output: outputHash,
detail: detailHash,
agent: "uwf-test",
startedAtMs: 1000000000000,
completedAtMs: 1000000005000,
});
steps.push(stepHash);
}
@@ -221,6 +223,8 @@ describe("thread read --quota flag", () => {
output: outputHash,
detail: step1DetailHash,
agent: "uwf-test",
startedAtMs: 1000000000000,
completedAtMs: 1000000005000,
});
const step2Content = generateContent(600, "Second");
@@ -245,6 +249,8 @@ describe("thread read --quota flag", () => {
output: outputHash,
detail: step2DetailHash,
agent: "uwf-test",
startedAtMs: 1000000000000,
completedAtMs: 1000000005000,
});
const threadId = "01HX2Q3R4S5T6V7W8X9YZ1" as ThreadId;
@@ -328,6 +334,8 @@ describe("thread read --quota flag", () => {
output: outputHash,
detail: detailHash,
agent: "uwf-test",
startedAtMs: 1000000000000,
completedAtMs: 1000000005000,
});
steps.push(stepHash);
}
@@ -338,8 +346,8 @@ describe("thread read --quota flag", () => {
// Set tight quota with --start flag
const markdown = await cmdThreadRead(tmpDir, threadId, 600, null, true);
// Quota must be reasonably enforced (allow ~210 char tolerance for structure)
expect(markdown.length).toBeLessThanOrEqual(810);
// Quota must be reasonably enforced (allow ~260 char tolerance for structure)
expect(markdown.length).toBeLessThanOrEqual(860);
// Should contain thread header
expect(markdown).toMatch(/# Thread/);
@@ -405,6 +413,8 @@ describe("thread read --quota flag", () => {
output: outputHash,
detail: detailHash,
agent: "uwf-test",
startedAtMs: 1000000000000,
completedAtMs: 1000000005000,
});
const threadId = "01HX2Q3R4S5T6V7W8X9YZ4" as ThreadId;
@@ -480,6 +490,8 @@ describe("thread read --quota flag", () => {
output: outputHash,
detail: detailHash,
agent: "uwf-test",
startedAtMs: 1000000000000,
completedAtMs: 1000000005000,
});
steps.push(stepHash);
}
@@ -559,6 +571,8 @@ describe("thread read --quota flag", () => {
output: outputHash,
detail: detailHash,
agent: "uwf-test",
startedAtMs: 1000000000000,
completedAtMs: 1000000005000,
});
steps.push(stepHash);
}
@@ -139,6 +139,8 @@ describe("thread read XML tag isolation", () => {
output: outputHash,
detail: detailHash,
agent: "uwf-claude-code",
startedAtMs: 1000000000000,
completedAtMs: 1000000005000,
});
const threadId = "01JTEST0000000000000001" as ThreadId;
@@ -214,6 +216,8 @@ describe("thread read XML tag isolation", () => {
output: outputHash,
detail: detailHash,
agent: "uwf-claude-code",
startedAtMs: 1000000000000,
completedAtMs: 1000000005000,
});
const threadId = "01JTEST0000000000000002" as ThreadId;
@@ -274,6 +278,8 @@ describe("thread read XML tag isolation", () => {
output: outputHash,
detail: null,
agent: "uwf-test",
startedAtMs: 1000000000000,
completedAtMs: 1000000005000,
});
const step2 = await uwf.store.put(uwf.schemas.stepNode, {
@@ -283,6 +289,8 @@ describe("thread read XML tag isolation", () => {
output: outputHash,
detail: null,
agent: "uwf-test",
startedAtMs: 1000000000000,
completedAtMs: 1000000005000,
});
const threadId = "01JTEST0000000000000003" as ThreadId;
@@ -335,6 +343,8 @@ describe("thread read XML tag isolation", () => {
output: outputHash,
detail: null,
agent: "uwf-test",
startedAtMs: 1000000000000,
completedAtMs: 1000000005000,
});
const threadId = "01JTEST0000000000000004" as ThreadId;
@@ -387,6 +397,8 @@ describe("thread read XML tag isolation", () => {
output: outputHash,
detail: missingDetailRef,
agent: "uwf-test",
startedAtMs: 1000000000000,
completedAtMs: 1000000005000,
});
const threadId = "01JTEST0000000000000005" as ThreadId;
@@ -439,6 +451,8 @@ describe("thread read XML tag isolation", () => {
output: outputHash,
detail: null,
agent: "uwf-test",
startedAtMs: 1000000000000,
completedAtMs: 1000000005000,
});
const threadId = "01JTEST0000000000000006" as ThreadId;
@@ -511,6 +525,8 @@ describe("thread read XML tag isolation", () => {
output: outputHash,
detail: null,
agent: "uwf-test",
startedAtMs: 1000000000000,
completedAtMs: 1000000005000,
});
const step2 = await uwf.store.put(uwf.schemas.stepNode, {
@@ -520,6 +536,8 @@ describe("thread read XML tag isolation", () => {
output: outputHash,
detail: null,
agent: "uwf-test",
startedAtMs: 1000000000000,
completedAtMs: 1000000005000,
});
const step3 = await uwf.store.put(uwf.schemas.stepNode, {
@@ -529,6 +547,8 @@ describe("thread read XML tag isolation", () => {
output: outputHash,
detail: null,
agent: "uwf-test",
startedAtMs: 1000000000000,
completedAtMs: 1000000005000,
});
const threadId = "01JTEST0000000000000007" as ThreadId;
@@ -607,6 +627,8 @@ describe("thread read XML tag isolation", () => {
output: outputHash,
detail: detailHash,
agent: "uwf-test",
startedAtMs: 1000000000000,
completedAtMs: 1000000005000,
});
const threadId = "01JTEST0000000000000008" as ThreadId;
@@ -661,6 +683,8 @@ describe("thread read XML tag isolation", () => {
output: outputHash,
detail: null,
agent: "uwf-test",
startedAtMs: 1000000000000,
completedAtMs: 1000000005000,
})) as CasRef;
steps.push(step);
prev = step;
@@ -58,6 +58,7 @@ export async function cmdStepList(
detail: item.payload.detail ?? null,
agent: item.payload.agent,
timestamp: item.timestamp,
durationMs: item.payload.completedAtMs - item.payload.startedAtMs,
});
}
+12 -1
View File
@@ -566,14 +566,25 @@ function selectByQuota(
return { selected, skippedCount: candidates.length - selected.length };
}
/**
 * Formats a millisecond duration for display in step headers.
 *
 * - below 1000 ms: whole milliseconds, e.g. `350ms`
 * - below one minute: seconds with one decimal, e.g. `42.0s`
 * - otherwise: whole minutes and seconds, e.g. `2m5s`
 *
 * @param ms - Duration in milliseconds.
 * @returns The formatted duration string.
 */
function formatDuration(ms: number): string {
  if (ms < 1000) return `${ms}ms`;
  const secondsText = (ms / 1000).toFixed(1);
  // Compare the *rounded* text so values such as 59.96 s roll over to minutes
  // instead of being printed as "60.0s".
  if (Number(secondsText) < 60) return `${secondsText}s`;
  // Round to whole seconds before splitting; rounding the remainder alone
  // could produce "1m60s" (e.g. 119.6 s).
  const totalSeconds = Math.round(ms / 1000);
  const minutes = Math.floor(totalSeconds / 60);
  const remainingSec = totalSeconds % 60;
  return `${minutes}m${remainingSec}s`;
}
/**
 * Renders the two-line markdown header for one step.
 *
 * @param stepNum - Step number shown in the heading.
 * @param item - Step whose role, hash, agent, timestamp and timing fields are rendered.
 * @returns The heading line and the agent/time/duration line, joined by a newline.
 */
function formatStepHeader(stepNum: number, item: OrderedStepItem): string {
  // ISO timestamp with "T" replaced by a space and the fractional seconds + "Z" dropped.
  const ts = new Date(item.timestamp)
    .toISOString()
    .replace("T", " ")
    .replace(/\.\d+Z$/, "");
  const durationMs = item.payload.completedAtMs - item.payload.startedAtMs;
  const duration = formatDuration(durationMs);
  // Only one metadata line: the older variant without Duration must not be
  // emitted alongside it, or the Agent/Time line appears twice.
  return [
    `## Step ${stepNum}: ${item.payload.role} \`${item.hash}\``,
    `**Agent:** ${item.payload.agent} | **Time:** ${ts} | **Duration:** ${duration}`,
  ].join("\n");
}