Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 61fc1cfe1b | |||
| dedab62c49 | |||
| a44f1f34a8 |
@@ -1,5 +1,4 @@
|
||||
import { watch } from "node:fs";
|
||||
import { readFile } from "node:fs/promises";
|
||||
import { statSync, watch } from "node:fs";
|
||||
import { dirname, join } from "node:path";
|
||||
import { Hono } from "hono";
|
||||
import { streamSSE } from "hono/streaming";
|
||||
@@ -11,6 +10,30 @@ type PumpState = {
|
||||
carry: string;
|
||||
};
|
||||
|
||||
function fileSize(path: string): number {
|
||||
try {
|
||||
return statSync(path).size;
|
||||
} catch {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
async function readNewBytes(path: string, state: PumpState): Promise<string | null> {
|
||||
const size = fileSize(path);
|
||||
if (size < state.contentOffset) {
|
||||
// File was truncated — reset
|
||||
state.contentOffset = 0;
|
||||
state.carry = "";
|
||||
}
|
||||
if (size <= state.contentOffset) {
|
||||
return null;
|
||||
}
|
||||
const blob = Bun.file(path).slice(state.contentOffset, size);
|
||||
const chunk = await blob.text();
|
||||
state.contentOffset = size;
|
||||
return chunk;
|
||||
}
|
||||
|
||||
function parseJsonLine(line: string): unknown {
|
||||
try {
|
||||
return JSON.parse(line) as unknown;
|
||||
@@ -28,14 +51,7 @@ function isWorkflowResult(record: unknown): boolean {
|
||||
);
|
||||
}
|
||||
|
||||
function parseNewLines(text: string, state: PumpState): string[] {
|
||||
if (text.length < state.contentOffset) {
|
||||
state.contentOffset = 0;
|
||||
state.carry = "";
|
||||
}
|
||||
|
||||
const chunk = text.slice(state.contentOffset);
|
||||
state.contentOffset = text.length;
|
||||
function parseNewLines(chunk: string, state: PumpState): string[] {
|
||||
state.carry += chunk;
|
||||
|
||||
const parts = state.carry.split("\n");
|
||||
@@ -70,14 +86,17 @@ export function createLiveRoutes(storageRoot: string): Hono {
|
||||
let eventId = 0;
|
||||
|
||||
async function pumpData(): Promise<boolean> {
|
||||
let text: string;
|
||||
let chunk: string | null;
|
||||
try {
|
||||
text = await readFile(resolvedDataPath, "utf8");
|
||||
chunk = await readNewBytes(resolvedDataPath, dataState);
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
if (chunk === null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const lines = parseNewLines(text, dataState);
|
||||
const lines = parseNewLines(chunk, dataState);
|
||||
for (const line of lines) {
|
||||
const record = parseJsonLine(line);
|
||||
eventId++;
|
||||
@@ -95,14 +114,17 @@ export function createLiveRoutes(storageRoot: string): Hono {
|
||||
}
|
||||
|
||||
async function pumpInfo(): Promise<void> {
|
||||
let text: string;
|
||||
let chunk: string | null;
|
||||
try {
|
||||
text = await readFile(infoPath, "utf8");
|
||||
chunk = await readNewBytes(infoPath, infoState);
|
||||
} catch {
|
||||
return;
|
||||
}
|
||||
if (chunk === null) {
|
||||
return;
|
||||
}
|
||||
|
||||
const lines = parseNewLines(text, infoState);
|
||||
const lines = parseNewLines(chunk, infoState);
|
||||
for (const line of lines) {
|
||||
const record = parseJsonLine(line);
|
||||
if (
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { useState } from "react";
|
||||
import { useEffect, useRef, useState } from "react";
|
||||
import { getThread, killThread, pauseThread, resumeThread } from "../api.ts";
|
||||
import { useFetch } from "../hooks.ts";
|
||||
import { useSSE } from "../use-sse.ts";
|
||||
|
||||
type Props = {
|
||||
threadId: string;
|
||||
@@ -8,8 +9,22 @@ type Props = {
|
||||
};
|
||||
|
||||
export function ThreadDetail({ threadId, onBack }: Props) {
|
||||
const sse = useSSE(threadId);
|
||||
const { status, data, error } = useFetch(() => getThread(threadId), [threadId]);
|
||||
const [actionStatus, setActionStatus] = useState<string | null>(null);
|
||||
const recordsEndRef = useRef<HTMLDivElement>(null);
|
||||
|
||||
const liveActive = sse.connected && !sse.completed;
|
||||
const records = liveActive
|
||||
? sse.records
|
||||
: status === "ok"
|
||||
? data.records
|
||||
: ([] as typeof sse.records);
|
||||
|
||||
// biome-ignore lint/correctness/useExhaustiveDependencies: scroll when the rendered record list grows
|
||||
useEffect(() => {
|
||||
recordsEndRef.current?.scrollIntoView({ behavior: "smooth" });
|
||||
}, [records.length]);
|
||||
|
||||
async function handleAction(action: "kill" | "pause" | "resume") {
|
||||
setActionStatus(`${action}ing...`);
|
||||
@@ -61,20 +76,34 @@ export function ThreadDetail({ threadId, onBack }: Props) {
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<h2 className="text-xl font-semibold mb-2 font-mono">{threadId}</h2>
|
||||
<h2 className="text-xl font-semibold mb-2 font-mono flex items-center gap-2 flex-wrap">
|
||||
<span>{threadId}</span>
|
||||
{sse.connected && (
|
||||
<span
|
||||
className="text-xs font-medium px-2 py-0.5 rounded"
|
||||
style={{ background: "var(--color-success)", color: "var(--color-bg)" }}
|
||||
>
|
||||
Live
|
||||
</span>
|
||||
)}
|
||||
</h2>
|
||||
{actionStatus && (
|
||||
<p className="text-xs mb-4" style={{ color: "var(--color-text-muted)" }}>
|
||||
{actionStatus}
|
||||
</p>
|
||||
)}
|
||||
|
||||
{status === "loading" && <p style={{ color: "var(--color-text-muted)" }}>Loading...</p>}
|
||||
{status === "error" && <p style={{ color: "var(--color-error)" }}>Error: {error}</p>}
|
||||
{status === "ok" && (
|
||||
{status === "loading" && !liveActive && records.length === 0 && (
|
||||
<p style={{ color: "var(--color-text-muted)" }}>Loading...</p>
|
||||
)}
|
||||
{status === "error" && !liveActive && (
|
||||
<p style={{ color: "var(--color-error)" }}>Error: {error}</p>
|
||||
)}
|
||||
{(status === "ok" || liveActive || records.length > 0) && (
|
||||
<div className="space-y-3">
|
||||
{data.records.map((r) => (
|
||||
{records.map((r, i) => (
|
||||
<div
|
||||
key={`${r.type}:${r.role ?? ""}:${r.timestamp ?? 0}:${String(r.content ?? "")}`}
|
||||
key={i}
|
||||
className="p-3 rounded border text-sm"
|
||||
style={{ background: "var(--color-surface)", borderColor: "var(--color-border)" }}
|
||||
>
|
||||
@@ -90,7 +119,7 @@ export function ThreadDetail({ threadId, onBack }: Props) {
|
||||
{r.role}
|
||||
</span>
|
||||
)}
|
||||
{r.timestamp && (
|
||||
{r.timestamp !== null && (
|
||||
<span className="text-xs ml-auto" style={{ color: "var(--color-text-muted)" }}>
|
||||
{new Date(r.timestamp).toLocaleTimeString()}
|
||||
</span>
|
||||
@@ -106,6 +135,7 @@ export function ThreadDetail({ threadId, onBack }: Props) {
|
||||
)}
|
||||
</div>
|
||||
))}
|
||||
<div ref={recordsEndRef} aria-hidden />
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
|
||||
@@ -0,0 +1,161 @@
|
||||
import {
|
||||
type Dispatch,
|
||||
type MutableRefObject,
|
||||
type SetStateAction,
|
||||
useEffect,
|
||||
useRef,
|
||||
useState,
|
||||
} from "react";
|
||||
|
||||
import type { ThreadRecord } from "./api.ts";
|
||||
|
||||
export type UseSSEReturn = {
|
||||
records: ThreadRecord[];
|
||||
connected: boolean;
|
||||
completed: boolean;
|
||||
};
|
||||
|
||||
function isWorkflowResult(record: ThreadRecord): boolean {
|
||||
return record.type === "workflow-result";
|
||||
}
|
||||
|
||||
function parseRecord(data: string): ThreadRecord | null {
|
||||
try {
|
||||
return JSON.parse(data) as ThreadRecord;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
type RecordEventContext = {
|
||||
cancelled: boolean;
|
||||
completedRef: MutableRefObject<boolean>;
|
||||
setRecords: Dispatch<SetStateAction<ThreadRecord[]>>;
|
||||
setCompleted: (value: boolean) => void;
|
||||
setConnected: (value: boolean) => void;
|
||||
cleanupEs: () => void;
|
||||
};
|
||||
|
||||
function handleRecordEvent(ev: Event, ctx: RecordEventContext): void {
|
||||
if (ctx.cancelled) {
|
||||
return;
|
||||
}
|
||||
const msg = ev as MessageEvent;
|
||||
const raw = typeof msg.data === "string" ? msg.data : "";
|
||||
const parsed = parseRecord(raw);
|
||||
if (parsed === null) {
|
||||
return;
|
||||
}
|
||||
ctx.setRecords((prev) => [...prev, parsed]);
|
||||
if (!isWorkflowResult(parsed)) {
|
||||
return;
|
||||
}
|
||||
ctx.completedRef.current = true;
|
||||
ctx.setCompleted(true);
|
||||
ctx.setConnected(false);
|
||||
ctx.cleanupEs();
|
||||
}
|
||||
|
||||
export function useSSE(threadId: string | null): UseSSEReturn {
|
||||
const [records, setRecords] = useState<ThreadRecord[]>([]);
|
||||
const [connected, setConnected] = useState(false);
|
||||
const [completed, setCompleted] = useState(false);
|
||||
|
||||
const completedRef = useRef(false);
|
||||
const reconnectAttemptsRef = useRef(0);
|
||||
|
||||
useEffect(() => {
|
||||
if (threadId === null) {
|
||||
completedRef.current = false;
|
||||
reconnectAttemptsRef.current = 0;
|
||||
setRecords([]);
|
||||
setConnected(false);
|
||||
setCompleted(false);
|
||||
return;
|
||||
}
|
||||
|
||||
const tid = threadId;
|
||||
|
||||
completedRef.current = false;
|
||||
reconnectAttemptsRef.current = 0;
|
||||
setRecords([]);
|
||||
setConnected(false);
|
||||
setCompleted(false);
|
||||
|
||||
let es: EventSource | null = null;
|
||||
let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
let cancelled = false;
|
||||
|
||||
function cleanupEs(): void {
|
||||
if (es !== null) {
|
||||
es.close();
|
||||
es = null;
|
||||
}
|
||||
}
|
||||
|
||||
function scheduleReconnect(): void {
|
||||
if (cancelled || completedRef.current) {
|
||||
return;
|
||||
}
|
||||
const delayMs = Math.min(1000 * 2 ** reconnectAttemptsRef.current, 8000);
|
||||
reconnectAttemptsRef.current += 1;
|
||||
reconnectTimer = setTimeout(() => {
|
||||
reconnectTimer = null;
|
||||
if (!cancelled && !completedRef.current) {
|
||||
connect();
|
||||
}
|
||||
}, delayMs);
|
||||
}
|
||||
|
||||
function connect(): void {
|
||||
if (cancelled || completedRef.current) {
|
||||
return;
|
||||
}
|
||||
|
||||
cleanupEs();
|
||||
const url = `/api/threads/${encodeURIComponent(tid)}/live`;
|
||||
es = new EventSource(url);
|
||||
|
||||
es.onopen = () => {
|
||||
if (cancelled) {
|
||||
return;
|
||||
}
|
||||
reconnectAttemptsRef.current = 0;
|
||||
setConnected(true);
|
||||
setRecords([]);
|
||||
};
|
||||
|
||||
es.addEventListener("record", (ev: Event) =>
|
||||
handleRecordEvent(ev, {
|
||||
cancelled,
|
||||
completedRef,
|
||||
setRecords,
|
||||
setCompleted,
|
||||
setConnected,
|
||||
cleanupEs,
|
||||
}),
|
||||
);
|
||||
|
||||
es.onerror = () => {
|
||||
if (cancelled || completedRef.current) {
|
||||
return;
|
||||
}
|
||||
setConnected(false);
|
||||
cleanupEs();
|
||||
scheduleReconnect();
|
||||
};
|
||||
}
|
||||
|
||||
connect();
|
||||
|
||||
return () => {
|
||||
cancelled = true;
|
||||
if (reconnectTimer !== null) {
|
||||
clearTimeout(reconnectTimer);
|
||||
}
|
||||
cleanupEs();
|
||||
};
|
||||
}, [threadId]);
|
||||
|
||||
return { records, connected, completed };
|
||||
}
|
||||
Reference in New Issue
Block a user