diff --git a/packages/cli-workflow/src/commands/serve/routes-live.ts b/packages/cli-workflow/src/commands/serve/routes-live.ts index c28dd1c..23ae355 100644 --- a/packages/cli-workflow/src/commands/serve/routes-live.ts +++ b/packages/cli-workflow/src/commands/serve/routes-live.ts @@ -1,5 +1,4 @@ -import { watch } from "node:fs"; -import { readFile } from "node:fs/promises"; +import { statSync, watch } from "node:fs"; import { dirname, join } from "node:path"; import { Hono } from "hono"; import { streamSSE } from "hono/streaming"; @@ -11,6 +10,30 @@ type PumpState = { carry: string; }; +function fileSize(path: string): number { + try { + return statSync(path).size; + } catch { + return 0; + } +} + +async function readNewBytes(path: string, state: PumpState): Promise { + const size = fileSize(path); + if (size < state.contentOffset) { + // File was truncated — reset + state.contentOffset = 0; + state.carry = ""; + } + if (size <= state.contentOffset) { + return null; + } + const blob = Bun.file(path).slice(state.contentOffset, size); + const chunk = await blob.text(); + state.contentOffset = size; + return chunk; +} + function parseJsonLine(line: string): unknown { try { return JSON.parse(line) as unknown; @@ -28,14 +51,7 @@ function isWorkflowResult(record: unknown): boolean { ); } -function parseNewLines(text: string, state: PumpState): string[] { - if (text.length < state.contentOffset) { - state.contentOffset = 0; - state.carry = ""; - } - - const chunk = text.slice(state.contentOffset); - state.contentOffset = text.length; +function parseNewLines(chunk: string, state: PumpState): string[] { state.carry += chunk; const parts = state.carry.split("\n"); @@ -70,14 +86,17 @@ export function createLiveRoutes(storageRoot: string): Hono { let eventId = 0; async function pumpData(): Promise { - let text: string; + let chunk: string | null; try { - text = await readFile(resolvedDataPath, "utf8"); + chunk = await readNewBytes(resolvedDataPath, dataState); } catch { return false; } + if (chunk === null) { + return false; + } - const lines = parseNewLines(text, dataState); + const lines = parseNewLines(chunk, dataState); for (const line of lines) { const record = parseJsonLine(line); eventId++; @@ -95,14 +114,17 @@ export function createLiveRoutes(storageRoot: string): Hono { } async function pumpInfo(): Promise { - let text: string; + let chunk: string | null; try { - text = await readFile(infoPath, "utf8"); + chunk = await readNewBytes(infoPath, infoState); } catch { return; } + if (chunk === null) { + return; + } - const lines = parseNewLines(text, infoState); + const lines = parseNewLines(chunk, infoState); for (const line of lines) { const record = parseJsonLine(line); if (