diff --git a/.gitignore b/.gitignore index 1375393..265c0c2 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ node_modules dist .turbo *.tsbuildinfo +*.tgz diff --git a/packages/cli/package.json b/packages/cli/package.json index ca1a142..332bd91 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -23,7 +23,8 @@ }, "dependencies": { "@uncaged/nerve-core": "workspace:*", - "citty": "^0.1.6" + "citty": "^0.1.6", + "yaml": "^2.8.3" }, "devDependencies": { "@rslib/core": "^0.21.3", diff --git a/packages/cli/src/__tests__/workflow.test.ts b/packages/cli/src/__tests__/workflow.test.ts index aca698d..485d613 100644 --- a/packages/cli/src/__tests__/workflow.test.ts +++ b/packages/cli/src/__tests__/workflow.test.ts @@ -18,13 +18,17 @@ import { afterEach, beforeEach, describe, expect, it } from "vitest"; import { buildInspectOutput, buildListOutput, + buildThreadCommandOutput, + DEFAULT_THREAD_BUDGET_CHARS, + formatThreadRoundBlock, formatTs, getAllWorkflowRuns, + partitionCommandEvent, parseIntArg, statusIcon, } from "../commands/workflow.js"; import { triggerWorkflowViaDaemon } from "../daemon-client.js"; -import type { LogStore, WorkflowRun } from "../daemon-types.js"; +import type { LogStore, ThreadRoundRow, WorkflowRun } from "../daemon-types.js"; // --------------------------------------------------------------------------- // Test helpers @@ -322,6 +326,97 @@ describe("workflow list — integration with real store", () => { }); }); +// --------------------------------------------------------------------------- +// nerve workflow thread — formatting helpers +// --------------------------------------------------------------------------- + +describe("partitionCommandEvent", () => { + it("splits reserved type, role, content from rest", () => { + const p = partitionCommandEvent({ + type: "scan_done", + role: "scanner", + content: "ok", + items: [1, 2], + }); + expect(p.typeStr).toBe("scan_done"); + expect(p.roleStr).toBe("scanner"); + expect(p.contentBody).toBe("ok"); + expect(p.rest).toEqual({ items: [1, 2] }); + }); + + it("uses fallback role and stringifies non-string content", () => { + const p = partitionCommandEvent({ type: "x", content: { n: 1 } }); + expect(p.roleStr).toBe("?"); + expect(p.contentBody).toBe('{"n":1}'); + }); +}); + +describe("formatThreadRoundBlock", () => { + const row: ThreadRoundRow = { + round: 2, + logId: 99, + ts: new Date("2026-01-02T03:04:05.006Z").getTime(), + event: { type: "reply", role: "bot", content: "hi", score: 0.5 }, + }; + + it("includes header, YAML frontmatter for rest, and body", () => { + const text = formatThreadRoundBlock(row); + expect(text).toContain("[#2 bot]"); + expect(text).toContain("type=reply"); + expect(text).toContain("---\n"); + expect(text).toContain("score: 0.5"); + expect(text).toContain("hi"); + expect(text).not.toContain("role:"); + }); +}); + +describe("buildThreadCommandOutput", () => { + function row(n: number, content: string): ThreadRoundRow { + return { + round: n, + logId: 10 + n, + ts: 1000 + n, + event: { type: "ev", role: "r", content, extra: n }, + }; + } + + it("orders rounds chronologically (oldest first in output)", () => { + const desc = [row(3, "ccc"), row(2, "bbb"), row(1, "aaa")]; + const prefix = ["HEADER\n"]; + const { lines, paginationHint } = buildThreadCommandOutput(prefix, desc, 50_000, "run-x"); + const text = lines.join(""); + const idxA = text.indexOf("\naaa\n"); + const idxB = text.indexOf("\nbbb\n"); + const idxC = text.indexOf("\nccc\n"); + expect(idxA).toBeGreaterThan(-1); + expect(idxB).toBeGreaterThan(idxA); + expect(idxC).toBeGreaterThan(idxB); + expect(paginationHint).toBeNull(); + }); + + it("emits pagination hint with --before when oldest shown round is still > 1", () => { + const desc = [row(4, "d"), row(3, "c")]; + const { paginationHint } = buildThreadCommandOutput([], desc, 50_000, "run-y"); + expect(paginationHint).toContain("--before 3"); + expect(paginationHint).toContain("run-y"); + }); + + it("respects budget and hints with non-default --budget in command", () => { + const big = "y".repeat(500); + const desc = [row(2, big), row(1, "a")]; + const { lines, paginationHint } = buildThreadCommandOutput([], desc, 400, "run-z"); + const text = lines.join(""); + expect(text).toContain("[#2"); + expect(text).not.toContain("[#1"); + expect(paginationHint).toContain("--before 2"); + expect(paginationHint).toContain("--budget 400"); + }); + + it("default budget constant matches workflow command default", () => { + expect(DEFAULT_THREAD_BUDGET_CHARS).toBe(8000); + }); +}); + // --------------------------------------------------------------------------- // parseIntArg // --------------------------------------------------------------------------- diff --git a/packages/cli/src/commands/workflow.ts b/packages/cli/src/commands/workflow.ts index 81c8d40..e7386e9 100644 --- a/packages/cli/src/commands/workflow.ts +++ b/packages/cli/src/commands/workflow.ts @@ -2,14 +2,21 @@ import { existsSync } from "node:fs"; import { join } from "node:path"; import { defineCommand } from "citty"; +import { stringify } from "yaml"; import { triggerWorkflowViaDaemon } from "../daemon-client.js"; -import type { LogStore, WorkflowRun } from "../daemon-types.js"; +import type { LogStore, ThreadRoundRow, WorkflowRun } from "../daemon-types.js"; import { loadDaemonModule } from "../workspace-daemon.js"; import { getNerveRoot, getSocketPath, isRunning } from "../workspace.js"; export const DEFAULT_PAGE_SIZE = 20; +/** Default max characters for `nerve workflow thread` output (including run header). */ +export const DEFAULT_THREAD_BUDGET_CHARS = 8000; + +/** Max role-round rows read from SQLite per invocation (DESC by round). */ +export const THREAD_ROUNDS_FETCH_LIMIT = 8192; + export function parseIntArg(raw: string, fallback: number): number { const v = Number.parseInt(raw, 10); return Number.isNaN(v) ? fallback : v; @@ -172,6 +179,123 @@ export function buildInspectOutput( return { header, eventLines, paginationHint }; } +// --------------------------------------------------------------------------- +// nerve workflow thread — agent-oriented role rounds +// --------------------------------------------------------------------------- + +export type PartitionedEvent = { + typeStr: string; + roleStr: string; + contentBody: string; + rest: Record; +}; + +/** + * Split a CommandEvent: `type`, `role`, and `content` are reserved for the + * header / body; all other fields are serialized as YAML frontmatter. + */ +export function partitionCommandEvent(event: Record): PartitionedEvent { + const typeStr = + typeof event.type === "string" ? event.type : String(event.type === undefined ? "?" : event.type); + const roleStr = typeof event.role === "string" ? event.role : "?"; + const contentRaw = event.content; + const contentBody = + contentRaw === undefined || contentRaw === null + ? "" + : typeof contentRaw === "string" + ? contentRaw + : JSON.stringify(contentRaw); + const rest: Record = {}; + for (const key of Object.keys(event)) { + if (key === "type" || key === "role" || key === "content") continue; + rest[key] = event[key]; + } + return { typeStr, roleStr, contentBody, rest }; +} + +/** + * One role round as plain text: header line, YAML frontmatter (`rest` only), body (`content`). + */ +export function formatThreadRoundBlock(row: ThreadRoundRow): string { + const { typeStr, roleStr, contentBody, rest } = partitionCommandEvent(row.event); + const yamlBlock = + Object.keys(rest).length === 0 ? "{}\n" : `${stringify(rest, { lineWidth: 100 })}\n`; + return ( + `[#${row.round} ${roleStr}] ${formatTs(row.ts)} type=${typeStr}\n` + + `---\n` + + yamlBlock + + `---\n` + + `${contentBody}\n\n` + ); +} + +export type ThreadCommandOutput = { + lines: string[]; + paginationHint: string | null; +}; + +/** + * Build stdout lines for `nerve workflow thread`: newest-first selection from + * `descRows` until `budgetChars` (including `prefixLines`), then chronological order. + */ +export function buildThreadCommandOutput( + prefixLines: string[], + descRows: ThreadRoundRow[], + budgetChars: number, + runId: string, +): ThreadCommandOutput { + const prefixText = prefixLines.join(""); + let remaining = Math.max(0, budgetChars - prefixText.length); + const picked: ThreadRoundRow[] = []; + + const budgetFlag = + budgetChars === DEFAULT_THREAD_BUDGET_CHARS ? "" : ` --budget ${String(budgetChars)}`; + + for (const row of descRows) { + const block = formatThreadRoundBlock(row); + if (block.length <= remaining) { + picked.push(row); + remaining -= block.length; + continue; + } + if (picked.length === 0) { + const { typeStr, roleStr, contentBody, rest } = partitionCommandEvent(row.event); + const yamlBlock = + Object.keys(rest).length === 0 + ? "{}\n" + : `${stringify(rest, { lineWidth: 100 })}\n`; + const header = + `[#${row.round} ${roleStr}] ${formatTs(row.ts)} type=${typeStr}\n` + `---\n` + yamlBlock + `---\n`; + const maxBody = Math.max(0, remaining - header.length - `[truncated]\n`.length); + const truncated = + maxBody > 0 && contentBody.length > maxBody + ? `${contentBody.slice(0, maxBody)}\n[truncated]\n` + : `${contentBody}\n[truncated]\n`; + const single = header + truncated + "\n"; + const hintRound = row.round; + return { + lines: [...prefixLines, single], + paginationHint: + hintRound > 1 + ? `\n⏩ Older rounds exist. Fetch with:\n nerve workflow thread ${runId} --before ${String(hintRound)}${budgetFlag}\n` + : null, + }; + } + break; + } + + const blocksAsc = picked.map(formatThreadRoundBlock).reverse(); + const shownMinRound = picked.length === 0 ? null : Math.min(...picked.map((r) => r.round)); + let paginationHint: string | null = null; + if (shownMinRound !== null && shownMinRound > 1) { + paginationHint = + `\n⏩ Older rounds not shown. Fetch with:\n` + + ` nerve workflow thread ${runId} --before ${String(shownMinRound)}${budgetFlag}\n`; + } + + return { lines: [...prefixLines, ...blocksAsc], paginationHint }; +} + // --------------------------------------------------------------------------- // nerve workflow list // --------------------------------------------------------------------------- @@ -293,6 +417,92 @@ const workflowInspectCommand = defineCommand({ }, }); +// --------------------------------------------------------------------------- +// nerve workflow thread +// --------------------------------------------------------------------------- + +const workflowThreadCommand = defineCommand({ + meta: { + name: "thread", + description: "Print role rounds for a workflow run (agent-oriented, budget-limited)", + }, + args: { + runId: { + type: "positional", + description: "The run ID to dump role rounds for", + }, + before: { + type: "string", + description: + "Exclusive upper bound on 1-based round index (use with hint from prior output to load older rounds)", + default: "0", + }, + budget: { + type: "string", + description: `Max output characters including header (default: ${String(DEFAULT_THREAD_BUDGET_CHARS)})`, + default: String(DEFAULT_THREAD_BUDGET_CHARS), + }, + }, + async run({ args }) { + const store = await openStore(); + + try { + const before = Math.max(0, parseIntArg(args.before, 0)); + const budgetChars = Math.max(1, parseIntArg(args.budget, DEFAULT_THREAD_BUDGET_CHARS)); + + const run = store.getWorkflowRun(args.runId); + if (run === null) { + process.stderr.write(`❌ No workflow run found with runId: ${args.runId}\n`); + process.exit(1); + } + + const totalRoleRounds = store.getThreadRoundCount(args.runId); + if (totalRoleRounds === 0) { + process.stdout.write( + `🧵 Workflow thread: ${run.runId}\n` + + ` workflow: ${run.workflow}\n` + + ` status: ${run.status}\n\n` + + `📭 No role rounds recorded for this run.\n`, + ); + return; + } + + const descRows = store.getThreadRounds(args.runId, { + before, + limit: THREAD_ROUNDS_FETCH_LIMIT, + }); + + const prefixLines = [ + `🧵 Role rounds (workflow thread)\n`, + ` runId: ${run.runId}\n`, + ` workflow: ${run.workflow}\n`, + ` status: ${run.status}\n`, + ` rounds: ${String(totalRoleRounds)} role event(s) total\n\n`, + ]; + + const { lines, paginationHint } = buildThreadCommandOutput( + prefixLines, + descRows, + budgetChars, + args.runId, + ); + + for (const line of lines) { + process.stdout.write(line); + } + if (paginationHint !== null) { + process.stdout.write(paginationHint); + } + + if (descRows.length === 0 && before > 0) { + process.stdout.write(`\n📭 No rounds with index < ${String(before)}.\n`); + } + } finally { + store.close(); + } + }, +}); + // --------------------------------------------------------------------------- // nerve workflow trigger // --------------------------------------------------------------------------- @@ -359,6 +569,7 @@ export const workflowCommand = defineCommand({ subCommands: { list: workflowListCommand, inspect: workflowInspectCommand, + thread: workflowThreadCommand, trigger: workflowTriggerCommand, }, }); diff --git a/packages/cli/src/daemon-types.ts b/packages/cli/src/daemon-types.ts index f03854a..edb2fc2 100644 --- a/packages/cli/src/daemon-types.ts +++ b/packages/cli/src/daemon-types.ts @@ -58,6 +58,20 @@ export type ArchiveLogsResult = { vacuumed: boolean; }; +/** One role round row — keep in sync with daemon `log-store` `ThreadRoundRow`. */ +export type ThreadRoundRow = { + round: number; + logId: number; + ts: number; + event: { type: string; [key: string]: unknown }; +}; + +/** Keep in sync with daemon `log-store` `GetThreadRoundsParams`. */ +export type GetThreadRoundsParams = { + before: number; + limit: number; +}; + /** Subset of daemon LogStore used by the CLI workflow commands. */ export type LogStore = { query: (filter?: LogQuery) => LogEntry[]; @@ -65,6 +79,8 @@ export type LogStore = { getActiveWorkflowRuns: (workflowName?: string) => WorkflowRun[]; getAllWorkflowRuns: (workflowName: string | null) => WorkflowRun[]; upsertWorkflowRun: (entry: Omit, run: WorkflowRun) => LogEntry; + getThreadRoundCount: (runId: string) => number; + getThreadRounds: (runId: string, params: GetThreadRoundsParams) => ThreadRoundRow[]; archiveLogs: (options?: ArchiveLogsOptions) => ArchiveLogsResult; close: () => void; }; diff --git a/packages/daemon/rslib.config.ts b/packages/daemon/rslib.config.ts index 216ba20..7a26194 100644 --- a/packages/daemon/rslib.config.ts +++ b/packages/daemon/rslib.config.ts @@ -11,6 +11,7 @@ export default defineConfig({ entry: { index: "src/index.ts", "sense-worker": "src/sense-worker.ts", + "workflow-worker": "src/workflow-worker.ts", }, }, output: { diff --git a/packages/daemon/src/__tests__/crash-recovery.test.ts b/packages/daemon/src/__tests__/crash-recovery.test.ts index b4eea4c..52f99f5 100644 --- a/packages/daemon/src/__tests__/crash-recovery.test.ts +++ b/packages/daemon/src/__tests__/crash-recovery.test.ts @@ -91,6 +91,8 @@ function makeLogStore( }), getTriggerPayload: vi.fn((): unknown => ({ value: 42 })), getThreadEvents: vi.fn((): Array<{ type: string; [key: string]: unknown }> => [{ type: "thread_start", triggerPayload: {} }]), + getThreadRoundCount: vi.fn(() => 0), + getThreadRounds: vi.fn(() => []), archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })), close: vi.fn(), getAllWorkflowRuns: vi.fn(() => []), diff --git a/packages/daemon/src/__tests__/hot-reload.test.ts b/packages/daemon/src/__tests__/hot-reload.test.ts index 0d56a24..35e5071 100644 --- a/packages/daemon/src/__tests__/hot-reload.test.ts +++ b/packages/daemon/src/__tests__/hot-reload.test.ts @@ -77,6 +77,8 @@ function makeLogStore() { getActiveWorkflowRuns: vi.fn(() => []), getTriggerPayload: vi.fn(() => null), getThreadEvents: vi.fn(() => []), + getThreadRoundCount: vi.fn(() => 0), + getThreadRounds: vi.fn(() => []), archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })), close: vi.fn(), getAllWorkflowRuns: vi.fn(() => []), diff --git a/packages/daemon/src/__tests__/kernel-trigger-sense.test.ts b/packages/daemon/src/__tests__/kernel-trigger-sense.test.ts index 0b4bc19..b87be6a 100644 --- a/packages/daemon/src/__tests__/kernel-trigger-sense.test.ts +++ b/packages/daemon/src/__tests__/kernel-trigger-sense.test.ts @@ -74,6 +74,8 @@ function makeMockLogStore() { getAllWorkflowRuns: vi.fn(() => []), getTriggerPayload: vi.fn(() => null), getThreadEvents: vi.fn(() => []), + getThreadRoundCount: vi.fn(() => 0), + getThreadRounds: vi.fn(() => []), archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })), close: vi.fn(), }; diff --git a/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts b/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts index da6c09c..4a8b19a 100644 --- a/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts +++ b/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts @@ -81,6 +81,8 @@ function makeLogStore() { getAllWorkflowRuns: vi.fn(() => []), getTriggerPayload: vi.fn(() => null), getThreadEvents: vi.fn(() => []), + getThreadRoundCount: vi.fn(() => 0), + getThreadRounds: vi.fn(() => []), archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })), close: vi.fn(), }; diff --git a/packages/daemon/src/__tests__/log-store-crash-recovery.test.ts b/packages/daemon/src/__tests__/log-store-crash-recovery.test.ts index 46e4de5..7145b98 100644 --- a/packages/daemon/src/__tests__/log-store-crash-recovery.test.ts +++ b/packages/daemon/src/__tests__/log-store-crash-recovery.test.ts @@ -195,4 +195,65 @@ describe("LogStore — crash recovery helpers (Phase 3)", () => { expect(result8[0].type).toBe("event_for_8"); }); }); + + describe("getThreadRoundCount / getThreadRounds", () => { + it("excludes thread_start from rounds and assigns ROW_NUMBER in chronological order", () => { + store.append({ + source: "workflow", + type: "thread_command_event", + refId: "run-tr", + payload: JSON.stringify({ type: "thread_start", triggerPayload: { x: 1 } }), + ts: 100, + }); + store.append({ + source: "workflow", + type: "thread_command_event", + refId: "run-tr", + payload: JSON.stringify({ + type: "step_a", + role: "alpha", + content: "hello", + meta: 1, + }), + ts: 101, + }); + store.append({ + source: "workflow", + type: "thread_command_event", + refId: "run-tr", + payload: JSON.stringify({ type: "step_b", role: "beta", content: "world" }), + ts: 102, + }); + + expect(store.getThreadRoundCount("run-tr")).toBe(2); + + const all = store.getThreadRounds("run-tr", { before: 0, limit: 50 }); + expect(all).toHaveLength(2); + expect(all.map((r) => r.round)).toEqual([2, 1]); + expect(all[0].event.type).toBe("step_b"); + expect(all[1].event.type).toBe("step_a"); + }); + + it("getThreadRounds respects exclusive before bound", () => { + for (let i = 0; i < 3; i++) { + store.append({ + source: "workflow", + type: "thread_command_event", + refId: "run-b4", + payload: JSON.stringify({ type: `ev_${i}`, role: "r", content: String(i) }), + ts: 200 + i, + }); + } + + expect(store.getThreadRoundCount("run-b4")).toBe(3); + + const page = store.getThreadRounds("run-b4", { before: 3, limit: 50 }); + expect(page.map((r) => r.round)).toEqual([2, 1]); + }); + + it("returns empty when no role rounds for runId", () => { + expect(store.getThreadRoundCount("missing")).toBe(0); + expect(store.getThreadRounds("missing", { before: 0, limit: 10 })).toHaveLength(0); + }); + }); }); diff --git a/packages/daemon/src/__tests__/workflow-manager.test.ts b/packages/daemon/src/__tests__/workflow-manager.test.ts index 8fb3a50..7227f57 100644 --- a/packages/daemon/src/__tests__/workflow-manager.test.ts +++ b/packages/daemon/src/__tests__/workflow-manager.test.ts @@ -74,6 +74,8 @@ function makeLogStore() { getActiveWorkflowRuns: vi.fn(() => []), getTriggerPayload: vi.fn(() => null), getThreadEvents: vi.fn(() => []), + getThreadRoundCount: vi.fn(() => 0), + getThreadRounds: vi.fn(() => []), archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })), close: vi.fn(), getAllWorkflowRuns: vi.fn(() => []), diff --git a/packages/daemon/src/index.ts b/packages/daemon/src/index.ts index 4d67c94..be51c7b 100644 --- a/packages/daemon/src/index.ts +++ b/packages/daemon/src/index.ts @@ -47,6 +47,8 @@ export type { ArchiveLogsDayResult, ArchiveLogsOptions, ArchiveLogsResult, + ThreadRoundRow, + GetThreadRoundsParams, } from "./log-store.js"; export { createWorkflowManager } from "./workflow-manager.js"; diff --git a/packages/daemon/src/log-store.ts b/packages/daemon/src/log-store.ts index 229f5b0..e197085 100644 --- a/packages/daemon/src/log-store.ts +++ b/packages/daemon/src/log-store.ts @@ -83,6 +83,25 @@ export type WorkflowRun = { ts: number; }; +/** One role-produced command-event row with 1-based round index (ROW_NUMBER over role events only). */ +export type ThreadRoundRow = { + round: number; + logId: number; + ts: number; + event: { type: string; [key: string]: unknown }; +}; + +/** Parameters for {@link LogStore.getThreadRounds}. */ +export type GetThreadRoundsParams = { + /** + * Exclusive upper bound on round index (1-based among role events). + * Use `0` to include all rounds (subject to `limit`). + */ + before: number; + /** Maximum rows returned from the DB (DESC by round). */ + limit: number; +}; + export type LogStore = { append: (entry: Omit) => LogEntry; query: (filter?: LogQuery) => LogEntry[]; @@ -120,6 +139,17 @@ export type LogStore = { * Used for crash recovery to rebuild ThreadState. */ getThreadEvents: (runId: string) => Array<{ type: string; [key: string]: unknown }>; + /** + * Count role command events for a run (excludes `thread_start` and invalid payloads). + * Round indices for {@link getThreadRounds} are 1..count in chronological order. + */ + getThreadRoundCount: (runId: string) => number; + /** + * Role rounds for agent-oriented retrieval: each row is one `thread_command_event` + * whose JSON `type` is not `thread_start`, with `round` from ROW_NUMBER() OVER (ORDER BY id ASC). + * No schema migration — numbering is computed in SQL. + */ + getThreadRounds: (runId: string, params: GetThreadRoundsParams) => ThreadRoundRow[]; /** * Export logs older than the retention window to `data/archive/logs/YYYY-MM-DD.jsonl`, * then delete those rows and advance `meta.archived_up_to` in one transaction per day @@ -279,6 +309,28 @@ export function createLogStore(dbPath: string): LogStore { "SELECT payload FROM logs WHERE source = 'workflow' AND type = 'thread_command_event' AND ref_id = ? ORDER BY id ASC", ); + const getThreadRoundCountStmt = sqlite.prepare( + `SELECT COUNT(*) AS c FROM logs + WHERE source = 'workflow' AND type = 'thread_command_event' AND ref_id = ? + AND payload IS NOT NULL AND json_valid(payload) = 1 + AND COALESCE(json_extract(payload, '$.type'), '') != 'thread_start'`, + ); + + const getThreadRoundsStmt = sqlite.prepare( + `WITH numbered AS ( + SELECT id, ts, payload, + ROW_NUMBER() OVER (ORDER BY id ASC) AS rn + FROM logs + WHERE source = 'workflow' AND type = 'thread_command_event' AND ref_id = @runId + AND payload IS NOT NULL AND json_valid(payload) = 1 + AND COALESCE(json_extract(payload, '$.type'), '') != 'thread_start' + ) + SELECT id, ts, payload, rn FROM numbered + WHERE (@before = 0 OR rn < @before) + ORDER BY rn DESC + LIMIT @lim`, + ); + const getActiveWorkflowRunsStmt = sqlite.prepare( "SELECT run_id, workflow, status, ts FROM workflow_runs WHERE status IN ('queued', 'started') ORDER BY ts ASC", ); @@ -475,6 +527,48 @@ export function createLogStore(dbPath: string): LogStore { return result; } + function getThreadRoundCount(runId: string): number { + const row = getThreadRoundCountStmt.get(runId) as { c: number } | undefined; + const c = row?.c; + if (c === null || c === undefined) return 0; + return Number(c); + } + + function getThreadRounds(runId: string, params: GetThreadRoundsParams): ThreadRoundRow[] { + const before = params.before; + const lim = params.limit; + if (lim < 1) return []; + + const rows = getThreadRoundsStmt.all({ + runId, + before, + lim, + }) as Array<{ id: number; ts: number; payload: string | null; rn: number }>; + + const out: ThreadRoundRow[] = []; + for (const row of rows) { + if (row.payload === null) continue; + try { + const parsed = JSON.parse(row.payload) as unknown; + if ( + parsed !== null && + typeof parsed === "object" && + typeof (parsed as Record).type === "string" + ) { + out.push({ + round: row.rn, + logId: row.id, + ts: row.ts, + event: parsed as { type: string; [key: string]: unknown }, + }); + } + } catch { + // skip malformed payloads + } + } + return out; + } + function archiveDayTx(day: string, start: number, endExclusive: number): void { runInTransaction(sqlite, () => { deleteLogsForDayStmt.run({ start, endExclusive }); @@ -539,6 +633,8 @@ export function createLogStore(dbPath: string): LogStore { getAllWorkflowRuns, getTriggerPayload, getThreadEvents, + getThreadRoundCount, + getThreadRounds, archiveLogs, close, }; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index eae9647..b83ffc4 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -26,6 +26,9 @@ importers: citty: specifier: ^0.1.6 version: 0.1.6 + yaml: + specifier: ^2.8.3 + version: 2.8.3 devDependencies: '@rslib/core': specifier: ^0.21.3