feat(cli): add nerve workflow thread <runId> command — closes #77
Implements the workflow thread CLI command that retrieves workflow execution context (logs, events, state) for a given run. - Add 'nerve workflow thread <runId>' subcommand - Add log-store query API in daemon - Add tests for CLI and log-store - Export new daemon types for thread data 小橘 <xiaoju@shazhou.work>
This commit is contained in:
@@ -23,7 +23,8 @@
|
||||
},
|
||||
"dependencies": {
|
||||
"@uncaged/nerve-core": "workspace:*",
|
||||
"citty": "^0.1.6"
|
||||
"citty": "^0.1.6",
|
||||
"yaml": "^2.8.3"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@rslib/core": "^0.21.3",
|
||||
|
||||
@@ -18,13 +18,17 @@ import { afterEach, beforeEach, describe, expect, it } from "vitest";
|
||||
import {
|
||||
buildInspectOutput,
|
||||
buildListOutput,
|
||||
buildThreadCommandOutput,
|
||||
DEFAULT_THREAD_BUDGET_CHARS,
|
||||
formatThreadRoundBlock,
|
||||
formatTs,
|
||||
getAllWorkflowRuns,
|
||||
partitionCommandEvent,
|
||||
parseIntArg,
|
||||
statusIcon,
|
||||
} from "../commands/workflow.js";
|
||||
import { triggerWorkflowViaDaemon } from "../daemon-client.js";
|
||||
import type { LogStore, WorkflowRun } from "../daemon-types.js";
|
||||
import type { LogStore, ThreadRoundRow, WorkflowRun } from "../daemon-types.js";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Test helpers
|
||||
@@ -322,6 +326,97 @@ describe("workflow list — integration with real store", () => {
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// nerve workflow thread — formatting helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("partitionCommandEvent", () => {
|
||||
it("splits reserved type, role, content from rest", () => {
|
||||
const p = partitionCommandEvent({
|
||||
type: "scan_done",
|
||||
role: "scanner",
|
||||
content: "ok",
|
||||
items: [1, 2],
|
||||
});
|
||||
expect(p.typeStr).toBe("scan_done");
|
||||
expect(p.roleStr).toBe("scanner");
|
||||
expect(p.contentBody).toBe("ok");
|
||||
expect(p.rest).toEqual({ items: [1, 2] });
|
||||
});
|
||||
|
||||
it("uses fallback role and stringifies non-string content", () => {
|
||||
const p = partitionCommandEvent({ type: "x", content: { n: 1 } });
|
||||
expect(p.roleStr).toBe("?");
|
||||
expect(p.contentBody).toBe('{"n":1}');
|
||||
});
|
||||
});
|
||||
|
||||
describe("formatThreadRoundBlock", () => {
|
||||
const row: ThreadRoundRow = {
|
||||
round: 2,
|
||||
logId: 99,
|
||||
ts: new Date("2026-01-02T03:04:05.006Z").getTime(),
|
||||
event: { type: "reply", role: "bot", content: "hi", score: 0.5 },
|
||||
};
|
||||
|
||||
it("includes header, YAML frontmatter for rest, and body", () => {
|
||||
const text = formatThreadRoundBlock(row);
|
||||
expect(text).toContain("[#2 bot]");
|
||||
expect(text).toContain("type=reply");
|
||||
expect(text).toContain("---\n");
|
||||
expect(text).toContain("score: 0.5");
|
||||
expect(text).toContain("hi");
|
||||
expect(text).not.toContain("role:");
|
||||
});
|
||||
});
|
||||
|
||||
describe("buildThreadCommandOutput", () => {
|
||||
function row(n: number, content: string): ThreadRoundRow {
|
||||
return {
|
||||
round: n,
|
||||
logId: 10 + n,
|
||||
ts: 1000 + n,
|
||||
event: { type: "ev", role: "r", content, extra: n },
|
||||
};
|
||||
}
|
||||
|
||||
it("orders rounds chronologically (oldest first in output)", () => {
|
||||
const desc = [row(3, "ccc"), row(2, "bbb"), row(1, "aaa")];
|
||||
const prefix = ["HEADER\n"];
|
||||
const { lines, paginationHint } = buildThreadCommandOutput(prefix, desc, 50_000, "run-x");
|
||||
const text = lines.join("");
|
||||
const idxA = text.indexOf("\naaa\n");
|
||||
const idxB = text.indexOf("\nbbb\n");
|
||||
const idxC = text.indexOf("\nccc\n");
|
||||
expect(idxA).toBeGreaterThan(-1);
|
||||
expect(idxB).toBeGreaterThan(idxA);
|
||||
expect(idxC).toBeGreaterThan(idxB);
|
||||
expect(paginationHint).toBeNull();
|
||||
});
|
||||
|
||||
it("emits pagination hint with --before when oldest shown round is still > 1", () => {
|
||||
const desc = [row(4, "d"), row(3, "c")];
|
||||
const { paginationHint } = buildThreadCommandOutput([], desc, 50_000, "run-y");
|
||||
expect(paginationHint).toContain("--before 3");
|
||||
expect(paginationHint).toContain("run-y");
|
||||
});
|
||||
|
||||
it("respects budget and hints with non-default --budget in command", () => {
|
||||
const big = "y".repeat(500);
|
||||
const desc = [row(2, big), row(1, "a")];
|
||||
const { lines, paginationHint } = buildThreadCommandOutput([], desc, 400, "run-z");
|
||||
const text = lines.join("");
|
||||
expect(text).toContain("[#2");
|
||||
expect(text).not.toContain("[#1");
|
||||
expect(paginationHint).toContain("--before 2");
|
||||
expect(paginationHint).toContain("--budget 400");
|
||||
});
|
||||
|
||||
it("default budget constant matches workflow command default", () => {
|
||||
expect(DEFAULT_THREAD_BUDGET_CHARS).toBe(8000);
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// parseIntArg
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
@@ -2,14 +2,21 @@ import { existsSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
|
||||
import { defineCommand } from "citty";
|
||||
import { stringify } from "yaml";
|
||||
|
||||
import { triggerWorkflowViaDaemon } from "../daemon-client.js";
|
||||
import type { LogStore, WorkflowRun } from "../daemon-types.js";
|
||||
import type { LogStore, ThreadRoundRow, WorkflowRun } from "../daemon-types.js";
|
||||
import { loadDaemonModule } from "../workspace-daemon.js";
|
||||
import { getNerveRoot, getSocketPath, isRunning } from "../workspace.js";
|
||||
|
||||
export const DEFAULT_PAGE_SIZE = 20;
|
||||
|
||||
/** Default max characters for `nerve workflow thread` output (including run header). */
|
||||
export const DEFAULT_THREAD_BUDGET_CHARS = 8000;
|
||||
|
||||
/** Max role-round rows read from SQLite per invocation (DESC by round). */
|
||||
export const THREAD_ROUNDS_FETCH_LIMIT = 8192;
|
||||
|
||||
export function parseIntArg(raw: string, fallback: number): number {
|
||||
const v = Number.parseInt(raw, 10);
|
||||
return Number.isNaN(v) ? fallback : v;
|
||||
@@ -172,6 +179,123 @@ export function buildInspectOutput(
|
||||
return { header, eventLines, paginationHint };
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// nerve workflow thread <runId> — agent-oriented role rounds
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export type PartitionedEvent = {
|
||||
typeStr: string;
|
||||
roleStr: string;
|
||||
contentBody: string;
|
||||
rest: Record<string, unknown>;
|
||||
};
|
||||
|
||||
/**
|
||||
* Split a CommandEvent: `type`, `role`, and `content` are reserved for the
|
||||
* header / body; all other fields are serialized as YAML frontmatter.
|
||||
*/
|
||||
export function partitionCommandEvent(event: Record<string, unknown>): PartitionedEvent {
|
||||
const typeStr =
|
||||
typeof event.type === "string" ? event.type : String(event.type === undefined ? "?" : event.type);
|
||||
const roleStr = typeof event.role === "string" ? event.role : "?";
|
||||
const contentRaw = event.content;
|
||||
const contentBody =
|
||||
contentRaw === undefined || contentRaw === null
|
||||
? ""
|
||||
: typeof contentRaw === "string"
|
||||
? contentRaw
|
||||
: JSON.stringify(contentRaw);
|
||||
const rest: Record<string, unknown> = {};
|
||||
for (const key of Object.keys(event)) {
|
||||
if (key === "type" || key === "role" || key === "content") continue;
|
||||
rest[key] = event[key];
|
||||
}
|
||||
return { typeStr, roleStr, contentBody, rest };
|
||||
}
|
||||
|
||||
/**
|
||||
* One role round as plain text: header line, YAML frontmatter (`rest` only), body (`content`).
|
||||
*/
|
||||
export function formatThreadRoundBlock(row: ThreadRoundRow): string {
|
||||
const { typeStr, roleStr, contentBody, rest } = partitionCommandEvent(row.event);
|
||||
const yamlBlock =
|
||||
Object.keys(rest).length === 0 ? "{}\n" : `${stringify(rest, { lineWidth: 100 })}\n`;
|
||||
return (
|
||||
`[#${row.round} ${roleStr}] ${formatTs(row.ts)} type=${typeStr}\n` +
|
||||
`---\n` +
|
||||
yamlBlock +
|
||||
`---\n` +
|
||||
`${contentBody}\n\n`
|
||||
);
|
||||
}
|
||||
|
||||
export type ThreadCommandOutput = {
|
||||
lines: string[];
|
||||
paginationHint: string | null;
|
||||
};
|
||||
|
||||
/**
|
||||
* Build stdout lines for `nerve workflow thread`: newest-first selection from
|
||||
* `descRows` until `budgetChars` (including `prefixLines`), then chronological order.
|
||||
*/
|
||||
export function buildThreadCommandOutput(
|
||||
prefixLines: string[],
|
||||
descRows: ThreadRoundRow[],
|
||||
budgetChars: number,
|
||||
runId: string,
|
||||
): ThreadCommandOutput {
|
||||
const prefixText = prefixLines.join("");
|
||||
let remaining = Math.max(0, budgetChars - prefixText.length);
|
||||
const picked: ThreadRoundRow[] = [];
|
||||
|
||||
const budgetFlag =
|
||||
budgetChars === DEFAULT_THREAD_BUDGET_CHARS ? "" : ` --budget ${String(budgetChars)}`;
|
||||
|
||||
for (const row of descRows) {
|
||||
const block = formatThreadRoundBlock(row);
|
||||
if (block.length <= remaining) {
|
||||
picked.push(row);
|
||||
remaining -= block.length;
|
||||
continue;
|
||||
}
|
||||
if (picked.length === 0) {
|
||||
const { typeStr, roleStr, contentBody, rest } = partitionCommandEvent(row.event);
|
||||
const yamlBlock =
|
||||
Object.keys(rest).length === 0
|
||||
? "{}\n"
|
||||
: `${stringify(rest, { lineWidth: 100 })}\n`;
|
||||
const header =
|
||||
`[#${row.round} ${roleStr}] ${formatTs(row.ts)} type=${typeStr}\n` + `---\n` + yamlBlock + `---\n`;
|
||||
const maxBody = Math.max(0, remaining - header.length - `[truncated]\n`.length);
|
||||
const truncated =
|
||||
maxBody > 0 && contentBody.length > maxBody
|
||||
? `${contentBody.slice(0, maxBody)}\n[truncated]\n`
|
||||
: `${contentBody}\n[truncated]\n`;
|
||||
const single = header + truncated + "\n";
|
||||
const hintRound = row.round;
|
||||
return {
|
||||
lines: [...prefixLines, single],
|
||||
paginationHint:
|
||||
hintRound > 1
|
||||
? `\n⏩ Older rounds exist. Fetch with:\n nerve workflow thread ${runId} --before ${String(hintRound)}${budgetFlag}\n`
|
||||
: null,
|
||||
};
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
const blocksAsc = picked.map(formatThreadRoundBlock).reverse();
|
||||
const shownMinRound = picked.length === 0 ? null : Math.min(...picked.map((r) => r.round));
|
||||
let paginationHint: string | null = null;
|
||||
if (shownMinRound !== null && shownMinRound > 1) {
|
||||
paginationHint =
|
||||
`\n⏩ Older rounds not shown. Fetch with:\n` +
|
||||
` nerve workflow thread ${runId} --before ${String(shownMinRound)}${budgetFlag}\n`;
|
||||
}
|
||||
|
||||
return { lines: [...prefixLines, ...blocksAsc], paginationHint };
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// nerve workflow list
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -293,6 +417,92 @@ const workflowInspectCommand = defineCommand({
|
||||
},
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// nerve workflow thread <runId>
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const workflowThreadCommand = defineCommand({
|
||||
meta: {
|
||||
name: "thread",
|
||||
description: "Print role rounds for a workflow run (agent-oriented, budget-limited)",
|
||||
},
|
||||
args: {
|
||||
runId: {
|
||||
type: "positional",
|
||||
description: "The run ID to dump role rounds for",
|
||||
},
|
||||
before: {
|
||||
type: "string",
|
||||
description:
|
||||
"Exclusive upper bound on 1-based round index (use with hint from prior output to load older rounds)",
|
||||
default: "0",
|
||||
},
|
||||
budget: {
|
||||
type: "string",
|
||||
description: `Max output characters including header (default: ${String(DEFAULT_THREAD_BUDGET_CHARS)})`,
|
||||
default: String(DEFAULT_THREAD_BUDGET_CHARS),
|
||||
},
|
||||
},
|
||||
async run({ args }) {
|
||||
const store = await openStore();
|
||||
|
||||
try {
|
||||
const before = Math.max(0, parseIntArg(args.before, 0));
|
||||
const budgetChars = Math.max(1, parseIntArg(args.budget, DEFAULT_THREAD_BUDGET_CHARS));
|
||||
|
||||
const run = store.getWorkflowRun(args.runId);
|
||||
if (run === null) {
|
||||
process.stderr.write(`❌ No workflow run found with runId: ${args.runId}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const totalRoleRounds = store.getThreadRoundCount(args.runId);
|
||||
if (totalRoleRounds === 0) {
|
||||
process.stdout.write(
|
||||
`🧵 Workflow thread: ${run.runId}\n` +
|
||||
` workflow: ${run.workflow}\n` +
|
||||
` status: ${run.status}\n\n` +
|
||||
`📭 No role rounds recorded for this run.\n`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const descRows = store.getThreadRounds(args.runId, {
|
||||
before,
|
||||
limit: THREAD_ROUNDS_FETCH_LIMIT,
|
||||
});
|
||||
|
||||
const prefixLines = [
|
||||
`🧵 Role rounds (workflow thread)\n`,
|
||||
` runId: ${run.runId}\n`,
|
||||
` workflow: ${run.workflow}\n`,
|
||||
` status: ${run.status}\n`,
|
||||
` rounds: ${String(totalRoleRounds)} role event(s) total\n\n`,
|
||||
];
|
||||
|
||||
const { lines, paginationHint } = buildThreadCommandOutput(
|
||||
prefixLines,
|
||||
descRows,
|
||||
budgetChars,
|
||||
args.runId,
|
||||
);
|
||||
|
||||
for (const line of lines) {
|
||||
process.stdout.write(line);
|
||||
}
|
||||
if (paginationHint !== null) {
|
||||
process.stdout.write(paginationHint);
|
||||
}
|
||||
|
||||
if (descRows.length === 0 && before > 0) {
|
||||
process.stdout.write(`\n📭 No rounds with index < ${String(before)}.\n`);
|
||||
}
|
||||
} finally {
|
||||
store.close();
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// nerve workflow trigger <name>
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -359,6 +569,7 @@ export const workflowCommand = defineCommand({
|
||||
subCommands: {
|
||||
list: workflowListCommand,
|
||||
inspect: workflowInspectCommand,
|
||||
thread: workflowThreadCommand,
|
||||
trigger: workflowTriggerCommand,
|
||||
},
|
||||
});
|
||||
|
||||
@@ -58,6 +58,20 @@ export type ArchiveLogsResult = {
|
||||
vacuumed: boolean;
|
||||
};
|
||||
|
||||
/** One role round row — keep in sync with daemon `log-store` `ThreadRoundRow`. */
|
||||
export type ThreadRoundRow = {
|
||||
round: number;
|
||||
logId: number;
|
||||
ts: number;
|
||||
event: { type: string; [key: string]: unknown };
|
||||
};
|
||||
|
||||
/** Keep in sync with daemon `log-store` `GetThreadRoundsParams`. */
|
||||
export type GetThreadRoundsParams = {
|
||||
before: number;
|
||||
limit: number;
|
||||
};
|
||||
|
||||
/** Subset of daemon LogStore used by the CLI workflow commands. */
|
||||
export type LogStore = {
|
||||
query: (filter?: LogQuery) => LogEntry[];
|
||||
@@ -65,6 +79,8 @@ export type LogStore = {
|
||||
getActiveWorkflowRuns: (workflowName?: string) => WorkflowRun[];
|
||||
getAllWorkflowRuns: (workflowName: string | null) => WorkflowRun[];
|
||||
upsertWorkflowRun: (entry: Omit<LogEntry, "id">, run: WorkflowRun) => LogEntry;
|
||||
getThreadRoundCount: (runId: string) => number;
|
||||
getThreadRounds: (runId: string, params: GetThreadRoundsParams) => ThreadRoundRow[];
|
||||
archiveLogs: (options?: ArchiveLogsOptions) => ArchiveLogsResult;
|
||||
close: () => void;
|
||||
};
|
||||
|
||||
@@ -11,6 +11,7 @@ export default defineConfig({
|
||||
entry: {
|
||||
index: "src/index.ts",
|
||||
"sense-worker": "src/sense-worker.ts",
|
||||
"workflow-worker": "src/workflow-worker.ts",
|
||||
},
|
||||
},
|
||||
output: {
|
||||
|
||||
@@ -91,6 +91,8 @@ function makeLogStore(
|
||||
}),
|
||||
getTriggerPayload: vi.fn((): unknown => ({ value: 42 })),
|
||||
getThreadEvents: vi.fn((): Array<{ type: string; [key: string]: unknown }> => [{ type: "thread_start", triggerPayload: {} }]),
|
||||
getThreadRoundCount: vi.fn(() => 0),
|
||||
getThreadRounds: vi.fn(() => []),
|
||||
archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })),
|
||||
close: vi.fn(),
|
||||
getAllWorkflowRuns: vi.fn(() => []),
|
||||
|
||||
@@ -77,6 +77,8 @@ function makeLogStore() {
|
||||
getActiveWorkflowRuns: vi.fn(() => []),
|
||||
getTriggerPayload: vi.fn(() => null),
|
||||
getThreadEvents: vi.fn(() => []),
|
||||
getThreadRoundCount: vi.fn(() => 0),
|
||||
getThreadRounds: vi.fn(() => []),
|
||||
archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })),
|
||||
close: vi.fn(),
|
||||
getAllWorkflowRuns: vi.fn(() => []),
|
||||
|
||||
@@ -74,6 +74,8 @@ function makeMockLogStore() {
|
||||
getAllWorkflowRuns: vi.fn(() => []),
|
||||
getTriggerPayload: vi.fn(() => null),
|
||||
getThreadEvents: vi.fn(() => []),
|
||||
getThreadRoundCount: vi.fn(() => 0),
|
||||
getThreadRounds: vi.fn(() => []),
|
||||
archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })),
|
||||
close: vi.fn(),
|
||||
};
|
||||
|
||||
@@ -81,6 +81,8 @@ function makeLogStore() {
|
||||
getAllWorkflowRuns: vi.fn(() => []),
|
||||
getTriggerPayload: vi.fn(() => null),
|
||||
getThreadEvents: vi.fn(() => []),
|
||||
getThreadRoundCount: vi.fn(() => 0),
|
||||
getThreadRounds: vi.fn(() => []),
|
||||
archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })),
|
||||
close: vi.fn(),
|
||||
};
|
||||
|
||||
@@ -195,4 +195,65 @@ describe("LogStore — crash recovery helpers (Phase 3)", () => {
|
||||
expect(result8[0].type).toBe("event_for_8");
|
||||
});
|
||||
});
|
||||
|
||||
describe("getThreadRoundCount / getThreadRounds", () => {
|
||||
it("excludes thread_start from rounds and assigns ROW_NUMBER in chronological order", () => {
|
||||
store.append({
|
||||
source: "workflow",
|
||||
type: "thread_command_event",
|
||||
refId: "run-tr",
|
||||
payload: JSON.stringify({ type: "thread_start", triggerPayload: { x: 1 } }),
|
||||
ts: 100,
|
||||
});
|
||||
store.append({
|
||||
source: "workflow",
|
||||
type: "thread_command_event",
|
||||
refId: "run-tr",
|
||||
payload: JSON.stringify({
|
||||
type: "step_a",
|
||||
role: "alpha",
|
||||
content: "hello",
|
||||
meta: 1,
|
||||
}),
|
||||
ts: 101,
|
||||
});
|
||||
store.append({
|
||||
source: "workflow",
|
||||
type: "thread_command_event",
|
||||
refId: "run-tr",
|
||||
payload: JSON.stringify({ type: "step_b", role: "beta", content: "world" }),
|
||||
ts: 102,
|
||||
});
|
||||
|
||||
expect(store.getThreadRoundCount("run-tr")).toBe(2);
|
||||
|
||||
const all = store.getThreadRounds("run-tr", { before: 0, limit: 50 });
|
||||
expect(all).toHaveLength(2);
|
||||
expect(all.map((r) => r.round)).toEqual([2, 1]);
|
||||
expect(all[0].event.type).toBe("step_b");
|
||||
expect(all[1].event.type).toBe("step_a");
|
||||
});
|
||||
|
||||
it("getThreadRounds respects exclusive before bound", () => {
|
||||
for (let i = 0; i < 3; i++) {
|
||||
store.append({
|
||||
source: "workflow",
|
||||
type: "thread_command_event",
|
||||
refId: "run-b4",
|
||||
payload: JSON.stringify({ type: `ev_${i}`, role: "r", content: String(i) }),
|
||||
ts: 200 + i,
|
||||
});
|
||||
}
|
||||
|
||||
expect(store.getThreadRoundCount("run-b4")).toBe(3);
|
||||
|
||||
const page = store.getThreadRounds("run-b4", { before: 3, limit: 50 });
|
||||
expect(page.map((r) => r.round)).toEqual([2, 1]);
|
||||
});
|
||||
|
||||
it("returns empty when no role rounds for runId", () => {
|
||||
expect(store.getThreadRoundCount("missing")).toBe(0);
|
||||
expect(store.getThreadRounds("missing", { before: 0, limit: 10 })).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -74,6 +74,8 @@ function makeLogStore() {
|
||||
getActiveWorkflowRuns: vi.fn(() => []),
|
||||
getTriggerPayload: vi.fn(() => null),
|
||||
getThreadEvents: vi.fn(() => []),
|
||||
getThreadRoundCount: vi.fn(() => 0),
|
||||
getThreadRounds: vi.fn(() => []),
|
||||
archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })),
|
||||
close: vi.fn(),
|
||||
getAllWorkflowRuns: vi.fn(() => []),
|
||||
|
||||
@@ -47,6 +47,8 @@ export type {
|
||||
ArchiveLogsDayResult,
|
||||
ArchiveLogsOptions,
|
||||
ArchiveLogsResult,
|
||||
ThreadRoundRow,
|
||||
GetThreadRoundsParams,
|
||||
} from "./log-store.js";
|
||||
|
||||
export { createWorkflowManager } from "./workflow-manager.js";
|
||||
|
||||
@@ -83,6 +83,25 @@ export type WorkflowRun = {
|
||||
ts: number;
|
||||
};
|
||||
|
||||
/** One role-produced command-event row with 1-based round index (ROW_NUMBER over role events only). */
|
||||
export type ThreadRoundRow = {
|
||||
round: number;
|
||||
logId: number;
|
||||
ts: number;
|
||||
event: { type: string; [key: string]: unknown };
|
||||
};
|
||||
|
||||
/** Parameters for {@link LogStore.getThreadRounds}. */
|
||||
export type GetThreadRoundsParams = {
|
||||
/**
|
||||
* Exclusive upper bound on round index (1-based among role events).
|
||||
* Use `0` to include all rounds (subject to `limit`).
|
||||
*/
|
||||
before: number;
|
||||
/** Maximum rows returned from the DB (DESC by round). */
|
||||
limit: number;
|
||||
};
|
||||
|
||||
export type LogStore = {
|
||||
append: (entry: Omit<LogEntry, "id">) => LogEntry;
|
||||
query: (filter?: LogQuery) => LogEntry[];
|
||||
@@ -120,6 +139,17 @@ export type LogStore = {
|
||||
* Used for crash recovery to rebuild ThreadState.
|
||||
*/
|
||||
getThreadEvents: (runId: string) => Array<{ type: string; [key: string]: unknown }>;
|
||||
/**
|
||||
* Count role command events for a run (excludes `thread_start` and invalid payloads).
|
||||
* Round indices for {@link getThreadRounds} are 1..count in chronological order.
|
||||
*/
|
||||
getThreadRoundCount: (runId: string) => number;
|
||||
/**
|
||||
* Role rounds for agent-oriented retrieval: each row is one `thread_command_event`
|
||||
* whose JSON `type` is not `thread_start`, with `round` from ROW_NUMBER() OVER (ORDER BY id ASC).
|
||||
* No schema migration — numbering is computed in SQL.
|
||||
*/
|
||||
getThreadRounds: (runId: string, params: GetThreadRoundsParams) => ThreadRoundRow[];
|
||||
/**
|
||||
* Export logs older than the retention window to `data/archive/logs/YYYY-MM-DD.jsonl`,
|
||||
* then delete those rows and advance `meta.archived_up_to` in one transaction per day
|
||||
@@ -279,6 +309,28 @@ export function createLogStore(dbPath: string): LogStore {
|
||||
"SELECT payload FROM logs WHERE source = 'workflow' AND type = 'thread_command_event' AND ref_id = ? ORDER BY id ASC",
|
||||
);
|
||||
|
||||
const getThreadRoundCountStmt = sqlite.prepare(
|
||||
`SELECT COUNT(*) AS c FROM logs
|
||||
WHERE source = 'workflow' AND type = 'thread_command_event' AND ref_id = ?
|
||||
AND payload IS NOT NULL AND json_valid(payload) = 1
|
||||
AND COALESCE(json_extract(payload, '$.type'), '') != 'thread_start'`,
|
||||
);
|
||||
|
||||
const getThreadRoundsStmt = sqlite.prepare(
|
||||
`WITH numbered AS (
|
||||
SELECT id, ts, payload,
|
||||
ROW_NUMBER() OVER (ORDER BY id ASC) AS rn
|
||||
FROM logs
|
||||
WHERE source = 'workflow' AND type = 'thread_command_event' AND ref_id = @runId
|
||||
AND payload IS NOT NULL AND json_valid(payload) = 1
|
||||
AND COALESCE(json_extract(payload, '$.type'), '') != 'thread_start'
|
||||
)
|
||||
SELECT id, ts, payload, rn FROM numbered
|
||||
WHERE (@before = 0 OR rn < @before)
|
||||
ORDER BY rn DESC
|
||||
LIMIT @lim`,
|
||||
);
|
||||
|
||||
const getActiveWorkflowRunsStmt = sqlite.prepare(
|
||||
"SELECT run_id, workflow, status, ts FROM workflow_runs WHERE status IN ('queued', 'started') ORDER BY ts ASC",
|
||||
);
|
||||
@@ -475,6 +527,48 @@ export function createLogStore(dbPath: string): LogStore {
|
||||
return result;
|
||||
}
|
||||
|
||||
function getThreadRoundCount(runId: string): number {
|
||||
const row = getThreadRoundCountStmt.get(runId) as { c: number } | undefined;
|
||||
const c = row?.c;
|
||||
if (c === null || c === undefined) return 0;
|
||||
return Number(c);
|
||||
}
|
||||
|
||||
function getThreadRounds(runId: string, params: GetThreadRoundsParams): ThreadRoundRow[] {
|
||||
const before = params.before;
|
||||
const lim = params.limit;
|
||||
if (lim < 1) return [];
|
||||
|
||||
const rows = getThreadRoundsStmt.all({
|
||||
runId,
|
||||
before,
|
||||
lim,
|
||||
}) as Array<{ id: number; ts: number; payload: string | null; rn: number }>;
|
||||
|
||||
const out: ThreadRoundRow[] = [];
|
||||
for (const row of rows) {
|
||||
if (row.payload === null) continue;
|
||||
try {
|
||||
const parsed = JSON.parse(row.payload) as unknown;
|
||||
if (
|
||||
parsed !== null &&
|
||||
typeof parsed === "object" &&
|
||||
typeof (parsed as Record<string, unknown>).type === "string"
|
||||
) {
|
||||
out.push({
|
||||
round: row.rn,
|
||||
logId: row.id,
|
||||
ts: row.ts,
|
||||
event: parsed as { type: string; [key: string]: unknown },
|
||||
});
|
||||
}
|
||||
} catch {
|
||||
// skip malformed payloads
|
||||
}
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
function archiveDayTx(day: string, start: number, endExclusive: number): void {
|
||||
runInTransaction(sqlite, () => {
|
||||
deleteLogsForDayStmt.run({ start, endExclusive });
|
||||
@@ -539,6 +633,8 @@ export function createLogStore(dbPath: string): LogStore {
|
||||
getAllWorkflowRuns,
|
||||
getTriggerPayload,
|
||||
getThreadEvents,
|
||||
getThreadRoundCount,
|
||||
getThreadRounds,
|
||||
archiveLogs,
|
||||
close,
|
||||
};
|
||||
|
||||
Generated
+3
@@ -26,6 +26,9 @@ importers:
|
||||
citty:
|
||||
specifier: ^0.1.6
|
||||
version: 0.1.6
|
||||
yaml:
|
||||
specifier: ^2.8.3
|
||||
version: 2.8.3
|
||||
devDependencies:
|
||||
'@rslib/core':
|
||||
specifier: ^0.21.3
|
||||
|
||||
Binary file not shown.
Reference in New Issue
Block a user