refactor(store): rename LogEntry.ts → LogEntry.timestamp

- Rename logs table column ts → timestamp (no migration, breaking)
- Update all SQL, type definitions, and consumers
- Integration tests use mkdtempSync to avoid stale DB conflicts

Fixes #113
This commit is contained in:
2026-04-25 02:08:57 +00:00
parent 529cceba06
commit 4da2c87a77
19 changed files with 282 additions and 148 deletions
@@ -30,8 +30,8 @@ describe("LogStore — cold archive (RFC-001 §5.4)", () => {
it("exports one UTC day to JSONL, deletes rows, advances archived_up_to", () => {
const ts = Date.UTC(2026, 1, 1, 10, 0, 0);
store.append({ source: "system", type: "x", refId: null, payload: '{"a":1}', ts });
store.append({ source: "reflex", type: "y", refId: "z", payload: null, ts: ts + 1 });
store.append({ source: "system", type: "x", refId: null, payload: '{"a":1}', timestamp: ts });
store.append({ source: "reflex", type: "y", refId: "z", payload: null, timestamp: ts + 1 });
const now = nowForLastArchivableFeb1();
const result = store.archiveLogs({ now, retentionMs: 30 * DAY_MS });
@@ -61,7 +61,7 @@ describe("LogStore — cold archive (RFC-001 §5.4)", () => {
it("does nothing when all logs are inside the hot window", () => {
const now = Date.UTC(2026, 3, 23, 12, 0, 0);
const ts = now - 5 * DAY_MS;
store.append({ source: "system", type: "warm", refId: null, payload: null, ts });
store.append({ source: "system", type: "warm", refId: null, payload: null, timestamp: ts });
const r = store.archiveLogs({ now, retentionMs: 30 * DAY_MS });
expect(r.days).toHaveLength(0);
expect(store.query()).toHaveLength(1);
@@ -69,7 +69,7 @@ describe("LogStore — cold archive (RFC-001 §5.4)", () => {
it("second archive with same clock is a no-op (watermark already caught up)", () => {
const ts = Date.UTC(2026, 1, 1, 10, 0, 0);
store.append({ source: "system", type: "x", refId: null, payload: null, ts });
store.append({ source: "system", type: "x", refId: null, payload: null, timestamp: ts });
const now = nowForLastArchivableFeb1();
store.archiveLogs({ now, retentionMs: 30 * DAY_MS });
const path = join(tmpDir, "data", "archive", "logs", "2026-02-01.jsonl");
@@ -82,11 +82,11 @@ describe("LogStore — cold archive (RFC-001 §5.4)", () => {
it("overwrites JSONL when the same UTC day is archived again after watermark rewind", () => {
const ts = Date.UTC(2026, 1, 1, 10, 0, 0);
store.append({ source: "a", type: "1", refId: null, payload: null, ts });
store.append({ source: "a", type: "1", refId: null, payload: null, timestamp: ts });
const now = nowForLastArchivableFeb1();
store.archiveLogs({ now, retentionMs: 30 * DAY_MS });
store.setMeta(LOG_ARCHIVE_META_KEY, "2026-01-31");
store.append({ source: "b", type: "2", refId: null, payload: null, ts: ts + 100 });
store.append({ source: "b", type: "2", refId: null, payload: null, timestamp: ts + 100 });
store.archiveLogs({ now, retentionMs: 30 * DAY_MS });
const path = join(tmpDir, "data", "archive", "logs", "2026-02-01.jsonl");
@@ -98,8 +98,8 @@ describe("LogStore — cold archive (RFC-001 §5.4)", () => {
it("respects maxDays across invocations", () => {
const t1 = Date.UTC(2026, 1, 1, 10, 0, 0);
const t2 = Date.UTC(2026, 1, 2, 10, 0, 0);
store.append({ source: "system", type: "a", refId: null, payload: null, ts: t1 });
store.append({ source: "system", type: "b", refId: null, payload: null, ts: t2 });
store.append({ source: "system", type: "a", refId: null, payload: null, timestamp: t1 });
store.append({ source: "system", type: "b", refId: null, payload: null, timestamp: t2 });
const now = Date.UTC(2027, 0, 1, 12, 0, 0);
const r1 = store.archiveLogs({ now, retentionMs: 30 * DAY_MS, maxDays: 1 });
@@ -116,7 +116,7 @@ describe("LogStore — cold archive (RFC-001 §5.4)", () => {
it("starts from earliest log day when it is before watermark+1", () => {
store.setMeta(LOG_ARCHIVE_META_KEY, "2026-01-10");
const ts = Date.UTC(2026, 1, 1, 10, 0, 0);
store.append({ source: "x", type: "p", refId: null, payload: null, ts });
store.append({ source: "x", type: "p", refId: null, payload: null, timestamp: ts });
const result = store.archiveLogs({ now: nowForLastArchivableFeb1(), retentionMs: 30 * DAY_MS });
expect(result.days.map((d) => d.day)).toContain("2026-02-01");
});
@@ -128,7 +128,7 @@ describe("LogStore — cold archive (RFC-001 §5.4)", () => {
it("runs VACUUM when vacuum: true", () => {
const ts = Date.UTC(2026, 1, 1, 10, 0, 0);
store.append({ source: "system", type: "x", refId: null, payload: null, ts });
store.append({ source: "system", type: "x", refId: null, payload: null, timestamp: ts });
const r = store.archiveLogs({
now: nowForLastArchivableFeb1(),
retentionMs: 30 * DAY_MS,
@@ -39,7 +39,7 @@ describe("LogStore — crash recovery helpers (Phase 3)", () => {
type: "started",
refId: "run-1",
payload: JSON.stringify({ triggerPayload: payload }),
ts: 1000,
timestamp: 1000,
},
{ runId: "run-1", workflow: "my-wf", status: "started", ts: 1000 },
);
@@ -55,7 +55,7 @@ describe("LogStore — crash recovery helpers (Phase 3)", () => {
type: "started",
refId: "run-2",
payload: null,
ts: 1000,
timestamp: 1000,
},
{ runId: "run-2", workflow: "my-wf", status: "started", ts: 1000 },
);
@@ -72,14 +72,14 @@ describe("LogStore — crash recovery helpers (Phase 3)", () => {
type: "started",
refId: "run-3",
payload: JSON.stringify({ triggerPayload: payloadA }),
ts: 100,
timestamp: 100,
});
store.append({
source: "workflow",
type: "started",
refId: "run-3",
payload: JSON.stringify({ triggerPayload: payloadB }),
ts: 200,
timestamp: 200,
});
const result = store.getTriggerPayload("run-3");
@@ -106,7 +106,7 @@ describe("LogStore — crash recovery helpers (Phase 3)", () => {
type: "thread_command_event",
refId: "run-4",
payload: JSON.stringify(event),
ts: Date.now(),
timestamp: Date.now(),
});
}
@@ -123,14 +123,14 @@ describe("LogStore — crash recovery helpers (Phase 3)", () => {
type: "thread_command_event",
refId: "run-5",
payload: null,
ts: 1000,
timestamp: 1000,
});
store.append({
source: "workflow",
type: "thread_command_event",
refId: "run-5",
payload: JSON.stringify({ type: "valid_event" }),
ts: 1001,
timestamp: 1001,
});
const result = store.getThreadEvents("run-5");
@@ -146,7 +146,7 @@ describe("LogStore — crash recovery helpers (Phase 3)", () => {
type: "started",
refId: "run-6",
payload: JSON.stringify({ triggerPayload: {} }),
ts: 1000,
timestamp: 1000,
},
{ runId: "run-6", workflow: "my-wf", status: "started", ts: 1000 },
);
@@ -155,14 +155,14 @@ describe("LogStore — crash recovery helpers (Phase 3)", () => {
type: "thread_command_event",
refId: "run-6",
payload: JSON.stringify({ type: "step_one" }),
ts: 1001,
timestamp: 1001,
});
store.append({
source: "workflow",
type: "step_complete",
refId: "run-6",
payload: JSON.stringify({ message: "done step" }),
ts: 1002,
timestamp: 1002,
});
const result = store.getThreadEvents("run-6");
@@ -176,14 +176,14 @@ describe("LogStore — crash recovery helpers (Phase 3)", () => {
type: "thread_command_event",
refId: "run-7",
payload: JSON.stringify({ type: "event_for_7" }),
ts: 1000,
timestamp: 1000,
});
store.append({
source: "workflow",
type: "thread_command_event",
refId: "run-8",
payload: JSON.stringify({ type: "event_for_8" }),
ts: 1001,
timestamp: 1001,
});
const result7 = store.getThreadEvents("run-7");
@@ -203,7 +203,7 @@ describe("LogStore — crash recovery helpers (Phase 3)", () => {
type: "thread_command_event",
refId: "run-tr",
payload: JSON.stringify({ type: "thread_start", triggerPayload: { x: 1 } }),
ts: 100,
timestamp: 100,
});
store.append({
source: "workflow",
@@ -215,14 +215,14 @@ describe("LogStore — crash recovery helpers (Phase 3)", () => {
content: "hello",
meta: 1,
}),
ts: 101,
timestamp: 101,
});
store.append({
source: "workflow",
type: "thread_command_event",
refId: "run-tr",
payload: JSON.stringify({ type: "step_b", role: "beta", content: "world" }),
ts: 102,
timestamp: 102,
});
expect(store.getThreadRoundCount("run-tr")).toBe(2);
@@ -241,7 +241,7 @@ describe("LogStore — crash recovery helpers (Phase 3)", () => {
type: "thread_command_event",
refId: "run-b4",
payload: JSON.stringify({ type: `ev_${i}`, role: "r", content: String(i) }),
ts: 200 + i,
timestamp: 200 + i,
});
}
@@ -34,7 +34,7 @@ describe("LogStore — workflow_runs", () => {
};
const entry = store.upsertWorkflowRun(
{ source: "workflow", type: "started", refId: "run-1", payload: null, ts: 1000 },
{ source: "workflow", type: "started", refId: "run-1", payload: null, timestamp: 1000 },
run,
);
@@ -52,12 +52,12 @@ describe("LogStore — workflow_runs", () => {
it("updates existing workflow_runs row on upsert (status transition)", () => {
store.upsertWorkflowRun(
{ source: "workflow", type: "started", refId: "run-2", payload: null, ts: 1000 },
{ source: "workflow", type: "started", refId: "run-2", payload: null, timestamp: 1000 },
{ runId: "run-2", workflow: "cleanup", status: "started", ts: 1000 },
);
store.upsertWorkflowRun(
{ source: "workflow", type: "completed", refId: "run-2", payload: null, ts: 2000 },
{ source: "workflow", type: "completed", refId: "run-2", payload: null, timestamp: 2000 },
{ runId: "run-2", workflow: "cleanup", status: "completed", ts: 2000 },
);
@@ -78,7 +78,7 @@ describe("LogStore — workflow_runs", () => {
["completed", "completed", 1005],
] as const) {
store.upsertWorkflowRun(
{ source: "workflow", type, refId: "run-3", payload: null, ts },
{ source: "workflow", type, refId: "run-3", payload: null, timestamp: ts },
{ runId: "run-3", workflow: "cleanup", status, ts },
);
}
@@ -97,11 +97,11 @@ describe("LogStore — workflow_runs", () => {
it("returns the latest state after multiple upserts", () => {
store.upsertWorkflowRun(
{ source: "workflow", type: "queued", refId: "run-4", payload: null, ts: 100 },
{ source: "workflow", type: "queued", refId: "run-4", payload: null, timestamp: 100 },
{ runId: "run-4", workflow: "code-review", status: "queued", ts: 100 },
);
store.upsertWorkflowRun(
{ source: "workflow", type: "started", refId: "run-4", payload: null, ts: 200 },
{ source: "workflow", type: "started", refId: "run-4", payload: null, timestamp: 200 },
{ runId: "run-4", workflow: "code-review", status: "started", ts: 200 },
);
@@ -114,19 +114,19 @@ describe("LogStore — workflow_runs", () => {
describe("getActiveWorkflowRuns", () => {
beforeEach(() => {
store.upsertWorkflowRun(
{ source: "workflow", type: "queued", refId: "r1", payload: null, ts: 100 },
{ source: "workflow", type: "queued", refId: "r1", payload: null, timestamp: 100 },
{ runId: "r1", workflow: "cleanup", status: "queued", ts: 100 },
);
store.upsertWorkflowRun(
{ source: "workflow", type: "started", refId: "r2", payload: null, ts: 200 },
{ source: "workflow", type: "started", refId: "r2", payload: null, timestamp: 200 },
{ runId: "r2", workflow: "cleanup", status: "started", ts: 200 },
);
store.upsertWorkflowRun(
{ source: "workflow", type: "completed", refId: "r3", payload: null, ts: 300 },
{ source: "workflow", type: "completed", refId: "r3", payload: null, timestamp: 300 },
{ runId: "r3", workflow: "cleanup", status: "completed", ts: 300 },
);
store.upsertWorkflowRun(
{ source: "workflow", type: "failed", refId: "r4", payload: null, ts: 400 },
{ source: "workflow", type: "failed", refId: "r4", payload: null, timestamp: 400 },
{ runId: "r4", workflow: "deploy", status: "queued", ts: 400 },
);
});
@@ -176,7 +176,7 @@ describe("LogStore — workflow_runs", () => {
(status) => {
const runId = `run-${status}`;
store.upsertWorkflowRun(
{ source: "workflow", type: status, refId: runId, payload: null, ts: 1 },
{ source: "workflow", type: status, refId: runId, payload: null, timestamp: 1 },
{ runId, workflow: "test", status, ts: 1 },
);
expect(store.getWorkflowRun(runId)?.status).toBe(status);
+67 -19
View File
@@ -27,7 +27,7 @@ describe("LogStore", () => {
type: "start",
refId: null,
payload: null,
ts: 1000,
timestamp: 1000,
});
expect(entry.id).toBe(1);
@@ -41,28 +41,40 @@ describe("LogStore", () => {
type: "start",
refId: null,
payload: null,
ts: 1000,
timestamp: 1000,
});
const e2 = store.append({
source: "system",
type: "stop",
refId: null,
payload: null,
ts: 2000,
timestamp: 2000,
});
expect(e2.id).toBe((e1.id ?? 0) + 1);
});
it("returns all entries when queried with no filter", () => {
store.append({ source: "system", type: "start", refId: null, payload: null, ts: 1000 });
store.append({ source: "reflex", type: "run_start", refId: "cpu", payload: null, ts: 2000 });
store.append({
source: "system",
type: "start",
refId: null,
payload: null,
timestamp: 1000,
});
store.append({
source: "reflex",
type: "run_start",
refId: "cpu",
payload: null,
timestamp: 2000,
});
store.append({
source: "reflex",
type: "run_complete",
refId: "cpu",
payload: '{"v":42}',
ts: 3000,
timestamp: 3000,
});
const all = store.query();
@@ -72,23 +84,35 @@ describe("LogStore", () => {
describe("query filters", () => {
beforeEach(() => {
store.append({ source: "system", type: "start", refId: null, payload: null, ts: 1000 });
store.append({ source: "reflex", type: "run_start", refId: "cpu", payload: null, ts: 2000 });
store.append({
source: "system",
type: "start",
refId: null,
payload: null,
timestamp: 1000,
});
store.append({
source: "reflex",
type: "run_start",
refId: "cpu",
payload: null,
timestamp: 2000,
});
store.append({
source: "reflex",
type: "run_complete",
refId: "cpu",
payload: '{"v":42}',
ts: 3000,
timestamp: 3000,
});
store.append({
source: "system",
type: "error",
refId: "disk",
payload: '{"error":"fail"}',
ts: 4000,
timestamp: 4000,
});
store.append({ source: "system", type: "stop", refId: null, payload: null, ts: 5000 });
store.append({ source: "system", type: "stop", refId: null, payload: null, timestamp: 5000 });
});
it("filters by source", () => {
@@ -111,7 +135,7 @@ describe("LogStore", () => {
it("filters by since (inclusive)", () => {
const results = store.query({ since: 3000 });
expect(results).toHaveLength(3);
expect(results[0].ts).toBe(3000);
expect(results[0].timestamp).toBe(3000);
});
it("filters by until (inclusive)", () => {
@@ -146,12 +170,24 @@ describe("LogStore", () => {
describe("query ordering", () => {
it("returns entries in insertion order (ascending id)", () => {
store.append({ source: "system", type: "start", refId: null, payload: null, ts: 5000 });
store.append({ source: "reflex", type: "run_start", refId: "a", payload: null, ts: 1000 });
store.append({
source: "system",
type: "start",
refId: null,
payload: null,
timestamp: 5000,
});
store.append({
source: "reflex",
type: "run_start",
refId: "a",
payload: null,
timestamp: 1000,
});
const all = store.query();
expect(all[0].ts).toBe(5000);
expect(all[1].ts).toBe(1000);
expect(all[0].timestamp).toBe(5000);
expect(all[1].timestamp).toBe(1000);
});
});
@@ -182,7 +218,7 @@ describe("LogStore", () => {
describe("append-only semantics", () => {
it("ids are always increasing", () => {
const entries = Array.from({ length: 10 }, (_, i) =>
store.append({ source: "system", type: "test", refId: null, payload: null, ts: i }),
store.append({ source: "system", type: "test", refId: null, payload: null, timestamp: i }),
);
for (let i = 1; i < entries.length; i++) {
@@ -194,7 +230,13 @@ describe("LogStore", () => {
describe("payload JSON round-trip", () => {
it("preserves JSON payload", () => {
const payload = JSON.stringify({ cpu: 95, host: "node-1" });
store.append({ source: "reflex", type: "run_complete", refId: "cpu", payload, ts: 1000 });
store.append({
source: "reflex",
type: "run_complete",
refId: "cpu",
payload,
timestamp: 1000,
});
const results = store.query({ refId: "cpu" });
expect(results).toHaveLength(1);
@@ -206,7 +248,13 @@ describe("LogStore", () => {
it("creates nested directory structure for db path", () => {
const deepPath = join(tmpDir, "a", "b", "c", "test.db");
const deepStore = createLogStore(deepPath);
deepStore.append({ source: "system", type: "start", refId: null, payload: null, ts: 1000 });
deepStore.append({
source: "system",
type: "start",
refId: null,
payload: null,
timestamp: 1000,
});
expect(deepStore.query()).toHaveLength(1);
deepStore.close();
});
+26 -26
View File
@@ -35,7 +35,7 @@ export type LogEntry = {
type: string;
refId: string | null;
payload: string | null;
ts: number;
timestamp: number;
};
export type LogQuery = {
@@ -174,16 +174,16 @@ export type LogStore = {
const SCHEMA_SQL = `
CREATE TABLE IF NOT EXISTS logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source TEXT NOT NULL,
type TEXT NOT NULL,
ref_id TEXT,
payload TEXT,
ts INTEGER NOT NULL
id INTEGER PRIMARY KEY AUTOINCREMENT,
source TEXT NOT NULL,
type TEXT NOT NULL,
ref_id TEXT,
payload TEXT,
timestamp INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_logs_source_type ON logs(source, type);
CREATE INDEX IF NOT EXISTS idx_logs_ts ON logs(ts);
CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON logs(timestamp);
CREATE INDEX IF NOT EXISTS idx_logs_ref_id ON logs(ref_id);
CREATE TABLE IF NOT EXISTS meta (
@@ -208,7 +208,7 @@ type SqlLogRow = {
type: string;
ref_id: string | null;
payload: string | null;
ts: number;
timestamp: number;
};
function buildJsonlBody(rows: SqlLogRow[]): string {
@@ -220,7 +220,7 @@ function buildJsonlBody(rows: SqlLogRow[]): string {
type: r.type,
refId: r.ref_id,
payload: r.payload,
ts: r.ts,
timestamp: r.timestamp,
}),
);
return `${lines.join("\n")}\n`;
@@ -333,7 +333,7 @@ export function createLogStore(dbPath: string): LogStore {
sqlite.exec(SCHEMA_SQL);
const insertStmt = sqlite.prepare(
"INSERT INTO logs (source, type, ref_id, payload, ts) VALUES (@source, @type, @refId, @payload, @ts)",
"INSERT INTO logs (source, type, ref_id, payload, timestamp) VALUES (@source, @type, @refId, @payload, @timestamp)",
);
const getMetaStmt = sqlite.prepare("SELECT value FROM meta WHERE key = ?");
@@ -371,7 +371,7 @@ export function createLogStore(dbPath: string): LogStore {
const getThreadRoundsStmt = sqlite.prepare(
`WITH numbered AS (
SELECT id, ts, payload,
SELECT id, timestamp, payload,
ROW_NUMBER() OVER (ORDER BY id ASC) AS rn
FROM logs
WHERE source = 'workflow' AND type IN ('thread_command_event', 'thread_workflow_message') AND ref_id = @runId
@@ -379,7 +379,7 @@ export function createLogStore(dbPath: string): LogStore {
AND COALESCE(json_extract(payload, '$.type'), '') != 'thread_start'
AND COALESCE(json_extract(payload, '$.role'), '') != '__start__'
)
SELECT id, ts, payload, rn FROM numbered
SELECT id, timestamp, payload, rn FROM numbered
WHERE (@before = 0 OR rn < @before)
ORDER BY rn DESC
LIMIT @lim`,
@@ -401,12 +401,12 @@ export function createLogStore(dbPath: string): LogStore {
"SELECT run_id, workflow, status, ts FROM workflow_runs WHERE workflow = ? ORDER BY ts DESC",
);
const minLogTsStmt = sqlite.prepare("SELECT MIN(ts) AS m FROM logs");
const minLogTsStmt = sqlite.prepare("SELECT MIN(timestamp) AS m FROM logs");
const selectLogsForDayStmt = sqlite.prepare(
"SELECT id, source, type, ref_id, payload, ts FROM logs WHERE ts >= @start AND ts < @endExclusive ORDER BY id ASC",
"SELECT id, source, type, ref_id, payload, timestamp FROM logs WHERE timestamp >= @start AND timestamp < @endExclusive ORDER BY id ASC",
);
const deleteLogsForDayStmt = sqlite.prepare(
"DELETE FROM logs WHERE ts >= @start AND ts < @endExclusive",
"DELETE FROM logs WHERE timestamp >= @start AND timestamp < @endExclusive",
);
function upsertWorkflowRunTx(entry: Omit<LogEntry, "id">, run: WorkflowRun): LogEntry {
@@ -416,7 +416,7 @@ export function createLogStore(dbPath: string): LogStore {
type: entry.type,
refId: entry.refId,
payload: entry.payload,
ts: entry.ts,
timestamp: entry.timestamp,
});
upsertWorkflowRunStmt.run({
runId: run.runId,
@@ -434,7 +434,7 @@ export function createLogStore(dbPath: string): LogStore {
type: entry.type,
refId: entry.refId,
payload: entry.payload,
ts: entry.ts,
timestamp: entry.timestamp,
});
return { ...entry, id: Number(info.lastInsertRowid) };
}
@@ -456,17 +456,17 @@ export function createLogStore(dbPath: string): LogStore {
params.refId = filter.refId;
}
if (filter.since !== undefined) {
conditions.push("ts >= @since");
conditions.push("timestamp >= @since");
params.since = filter.since;
}
if (filter.until !== undefined) {
conditions.push("ts <= @until");
conditions.push("timestamp <= @until");
params.until = filter.until;
}
const where = conditions.length > 0 ? `WHERE ${conditions.join(" AND ")}` : "";
const limit = filter.limit !== undefined ? `LIMIT ${filter.limit}` : "";
const sql = `SELECT id, source, type, ref_id, payload, ts FROM logs ${where} ORDER BY id ASC ${limit}`;
const sql = `SELECT id, source, type, ref_id, payload, timestamp FROM logs ${where} ORDER BY id ASC ${limit}`;
const rows = sqlite.prepare(sql).all(params) as Array<{
id: number;
@@ -474,7 +474,7 @@ export function createLogStore(dbPath: string): LogStore {
type: string;
ref_id: string | null;
payload: string | null;
ts: number;
timestamp: number;
}>;
return rows.map((r) => ({
@@ -483,7 +483,7 @@ export function createLogStore(dbPath: string): LogStore {
type: r.type,
refId: r.ref_id,
payload: r.payload,
ts: r.ts,
timestamp: r.timestamp,
}));
}
@@ -650,14 +650,14 @@ export function createLogStore(dbPath: string): LogStore {
runId,
before: params.before,
lim: params.limit,
}) as Array<{ id: number; ts: number; payload: string | null; rn: number }>;
}) as Array<{ id: number; timestamp: number; payload: string | null; rn: number }>;
const out: ThreadRoundRow[] = [];
for (const row of rows) {
if (row.payload === null) continue;
const message = parseRoundPayload(row.payload, row.ts);
const message = parseRoundPayload(row.payload, row.timestamp);
if (message !== null) {
out.push({ round: row.rn, logId: row.id, ts: row.ts, message });
out.push({ round: row.rn, logId: row.id, ts: row.timestamp, message });
}
}
return out;