Merge pull request 'perf(serve): SSE live pump reads incrementally' (#137) from fix/130-sse-incremental into main
This commit is contained in:
@@ -1,5 +1,4 @@
|
||||
import { watch } from "node:fs";
|
||||
import { readFile } from "node:fs/promises";
|
||||
import { statSync, watch } from "node:fs";
|
||||
import { dirname, join } from "node:path";
|
||||
import { Hono } from "hono";
|
||||
import { streamSSE } from "hono/streaming";
|
||||
@@ -11,6 +10,30 @@ type PumpState = {
|
||||
carry: string;
|
||||
};
|
||||
|
||||
function fileSize(path: string): number {
|
||||
try {
|
||||
return statSync(path).size;
|
||||
} catch {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
async function readNewBytes(path: string, state: PumpState): Promise<string | null> {
|
||||
const size = fileSize(path);
|
||||
if (size < state.contentOffset) {
|
||||
// File was truncated — reset
|
||||
state.contentOffset = 0;
|
||||
state.carry = "";
|
||||
}
|
||||
if (size <= state.contentOffset) {
|
||||
return null;
|
||||
}
|
||||
const blob = Bun.file(path).slice(state.contentOffset, size);
|
||||
const chunk = await blob.text();
|
||||
state.contentOffset = size;
|
||||
return chunk;
|
||||
}
|
||||
|
||||
function parseJsonLine(line: string): unknown {
|
||||
try {
|
||||
return JSON.parse(line) as unknown;
|
||||
@@ -28,14 +51,7 @@ function isWorkflowResult(record: unknown): boolean {
|
||||
);
|
||||
}
|
||||
|
||||
function parseNewLines(text: string, state: PumpState): string[] {
|
||||
if (text.length < state.contentOffset) {
|
||||
state.contentOffset = 0;
|
||||
state.carry = "";
|
||||
}
|
||||
|
||||
const chunk = text.slice(state.contentOffset);
|
||||
state.contentOffset = text.length;
|
||||
function parseNewLines(chunk: string, state: PumpState): string[] {
|
||||
state.carry += chunk;
|
||||
|
||||
const parts = state.carry.split("\n");
|
||||
@@ -70,14 +86,17 @@ export function createLiveRoutes(storageRoot: string): Hono {
|
||||
let eventId = 0;
|
||||
|
||||
async function pumpData(): Promise<boolean> {
|
||||
let text: string;
|
||||
let chunk: string | null;
|
||||
try {
|
||||
text = await readFile(resolvedDataPath, "utf8");
|
||||
chunk = await readNewBytes(resolvedDataPath, dataState);
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
if (chunk === null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const lines = parseNewLines(text, dataState);
|
||||
const lines = parseNewLines(chunk, dataState);
|
||||
for (const line of lines) {
|
||||
const record = parseJsonLine(line);
|
||||
eventId++;
|
||||
@@ -95,14 +114,17 @@ export function createLiveRoutes(storageRoot: string): Hono {
|
||||
}
|
||||
|
||||
async function pumpInfo(): Promise<void> {
|
||||
let text: string;
|
||||
let chunk: string | null;
|
||||
try {
|
||||
text = await readFile(infoPath, "utf8");
|
||||
chunk = await readNewBytes(infoPath, infoState);
|
||||
} catch {
|
||||
return;
|
||||
}
|
||||
if (chunk === null) {
|
||||
return;
|
||||
}
|
||||
|
||||
const lines = parseNewLines(text, infoState);
|
||||
const lines = parseNewLines(chunk, infoState);
|
||||
for (const line of lines) {
|
||||
const record = parseJsonLine(line);
|
||||
if (
|
||||
|
||||
Reference in New Issue
Block a user