From a44f1f34a889e80d60dd379428b3247729837d30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=98=9F=E6=9C=88?= Date: Fri, 8 May 2026 17:43:02 +0800 Subject: [PATCH] feat(dashboard): connect thread detail to SSE live stream Add useSSE hook that connects to /api/threads/:id/live via EventSource. Thread detail page now shows records in real-time with auto-scroll. - useSSE hook: EventSource connection, record accumulation, auto-reconnect with exponential backoff, cleanup on unmount - Thread detail: Live badge, SSE-first with fetch fallback, smooth scroll - Records clear on reconnect (server replays full file) Closes #131, testing verified per #133 Refs: #118 --- .../src/components/thread-detail.tsx | 46 ++++- packages/dashboard/src/use-sse.ts | 161 ++++++++++++++++++ 2 files changed, 199 insertions(+), 8 deletions(-) create mode 100644 packages/dashboard/src/use-sse.ts diff --git a/packages/dashboard/src/components/thread-detail.tsx b/packages/dashboard/src/components/thread-detail.tsx index 36da442..27ddd0c 100644 --- a/packages/dashboard/src/components/thread-detail.tsx +++ b/packages/dashboard/src/components/thread-detail.tsx @@ -1,6 +1,7 @@ -import { useState } from "react"; +import { useEffect, useRef, useState } from "react"; import { getThread, killThread, pauseThread, resumeThread } from "../api.ts"; import { useFetch } from "../hooks.ts"; +import { useSSE } from "../use-sse.ts"; type Props = { threadId: string; @@ -8,8 +9,22 @@ type Props = { }; export function ThreadDetail({ threadId, onBack }: Props) { + const sse = useSSE(threadId); const { status, data, error } = useFetch(() => getThread(threadId), [threadId]); const [actionStatus, setActionStatus] = useState(null); + const recordsEndRef = useRef(null); + + const liveActive = sse.connected && !sse.completed; + const records = liveActive + ? sse.records + : status === "ok" + ? data.records + : ([] as typeof sse.records); + + // biome-ignore lint/correctness/useExhaustiveDependencies: scroll when the rendered record list grows + useEffect(() => { + recordsEndRef.current?.scrollIntoView({ behavior: "smooth" }); + }, [records.length]); async function handleAction(action: "kill" | "pause" | "resume") { setActionStatus(`${action}ing...`); @@ -61,20 +76,34 @@ export function ThreadDetail({ threadId, onBack }: Props) { -

{threadId}

+

+ {threadId} + {sse.connected && ( + + Live + + )} +

{actionStatus && (

{actionStatus}

)} - {status === "loading" &&

Loading...

} - {status === "error" &&

Error: {error}

} - {status === "ok" && ( + {status === "loading" && !liveActive && records.length === 0 && ( +

Loading...

+ )} + {status === "error" && !liveActive && ( +

Error: {error}

+ )} + {(status === "ok" || liveActive || records.length > 0) && (
- {data.records.map((r) => ( + {records.map((r, i) => (
@@ -90,7 +119,7 @@ export function ThreadDetail({ threadId, onBack }: Props) { {r.role} )} - {r.timestamp && ( + {r.timestamp !== null && ( {new Date(r.timestamp).toLocaleTimeString()} @@ -106,6 +135,7 @@ export function ThreadDetail({ threadId, onBack }: Props) { )}
))} +
)}
diff --git a/packages/dashboard/src/use-sse.ts b/packages/dashboard/src/use-sse.ts new file mode 100644 index 0000000..591b625 --- /dev/null +++ b/packages/dashboard/src/use-sse.ts @@ -0,0 +1,161 @@ +import { + type Dispatch, + type MutableRefObject, + type SetStateAction, + useEffect, + useRef, + useState, +} from "react"; + +import type { ThreadRecord } from "./api.ts"; + +export type UseSSEReturn = { + records: ThreadRecord[]; + connected: boolean; + completed: boolean; +}; + +function isWorkflowResult(record: ThreadRecord): boolean { + return record.type === "workflow-result"; +} + +function parseRecord(data: string): ThreadRecord | null { + try { + return JSON.parse(data) as ThreadRecord; + } catch { + return null; + } +} + +type RecordEventContext = { + cancelled: boolean; + completedRef: MutableRefObject; + setRecords: Dispatch>; + setCompleted: (value: boolean) => void; + setConnected: (value: boolean) => void; + cleanupEs: () => void; +}; + +function handleRecordEvent(ev: Event, ctx: RecordEventContext): void { + if (ctx.cancelled) { + return; + } + const msg = ev as MessageEvent; + const raw = typeof msg.data === "string" ? msg.data : ""; + const parsed = parseRecord(raw); + if (parsed === null) { + return; + } + ctx.setRecords((prev) => [...prev, parsed]); + if (!isWorkflowResult(parsed)) { + return; + } + ctx.completedRef.current = true; + ctx.setCompleted(true); + ctx.setConnected(false); + ctx.cleanupEs(); +} + +export function useSSE(threadId: string | null): UseSSEReturn { + const [records, setRecords] = useState([]); + const [connected, setConnected] = useState(false); + const [completed, setCompleted] = useState(false); + + const completedRef = useRef(false); + const reconnectAttemptsRef = useRef(0); + + useEffect(() => { + if (threadId === null) { + completedRef.current = false; + reconnectAttemptsRef.current = 0; + setRecords([]); + setConnected(false); + setCompleted(false); + return; + } + + const tid = threadId; + + completedRef.current = false; + reconnectAttemptsRef.current = 0; + setRecords([]); + setConnected(false); + setCompleted(false); + + let es: EventSource | null = null; + let reconnectTimer: ReturnType | null = null; + let cancelled = false; + + function cleanupEs(): void { + if (es !== null) { + es.close(); + es = null; + } + } + + function scheduleReconnect(): void { + if (cancelled || completedRef.current) { + return; + } + const delayMs = Math.min(1000 * 2 ** reconnectAttemptsRef.current, 8000); + reconnectAttemptsRef.current += 1; + reconnectTimer = setTimeout(() => { + reconnectTimer = null; + if (!cancelled && !completedRef.current) { + connect(); + } + }, delayMs); + } + + function connect(): void { + if (cancelled || completedRef.current) { + return; + } + + cleanupEs(); + const url = `/api/threads/${encodeURIComponent(tid)}/live`; + es = new EventSource(url); + + es.onopen = () => { + if (cancelled) { + return; + } + reconnectAttemptsRef.current = 0; + setConnected(true); + setRecords([]); + }; + + es.addEventListener("record", (ev: Event) => + handleRecordEvent(ev, { + cancelled, + completedRef, + setRecords, + setCompleted, + setConnected, + cleanupEs, + }), + ); + + es.onerror = () => { + if (cancelled || completedRef.current) { + return; + } + setConnected(false); + cleanupEs(); + scheduleReconnect(); + }; + } + + connect(); + + return () => { + cancelled = true; + if (reconnectTimer !== null) { + clearTimeout(reconnectTimer); + } + cleanupEs(); + }; + }, [threadId]); + + return { records, connected, completed }; +}