Compare commits

..

3 Commits

Author SHA1 Message Date
xingyue 61fc1cfe1b perf(serve): SSE live pump reads incrementally instead of full file
Use Bun.file().slice() to read only new bytes from the last known
offset instead of re-reading the entire JSONL file on every fs watch.

- readNewBytes() helper with byte-offset tracking
- Handles file truncation (resets offset)
- Early return when no new data

Closes #130
2026-05-08 18:14:04 +08:00
xiaomo dedab62c49 Merge pull request 'feat(dashboard): connect thread detail to SSE live stream' (#134) from feat/131-dashboard-sse into main 2026-05-08 09:53:10 +00:00
xingyue a44f1f34a8 feat(dashboard): connect thread detail to SSE live stream
Add useSSE hook that connects to /api/threads/:id/live via EventSource.
Thread detail page now shows records in real-time with auto-scroll.

- useSSE hook: EventSource connection, record accumulation, auto-reconnect
  with exponential backoff, cleanup on unmount
- Thread detail: Live badge, SSE-first with fetch fallback, smooth scroll
- Records clear on reconnect (server replays full file)

Closes #131, testing verified per #133
Refs: #118
2026-05-08 17:52:10 +08:00
19 changed files with 343 additions and 113 deletions
@@ -1,5 +1,4 @@
import { watch } from "node:fs";
import { readFile } from "node:fs/promises";
import { statSync, watch } from "node:fs";
import { dirname, join } from "node:path";
import { Hono } from "hono";
import { streamSSE } from "hono/streaming";
@@ -11,6 +10,30 @@ type PumpState = {
carry: string;
};
function fileSize(path: string): number {
try {
return statSync(path).size;
} catch {
return 0;
}
}
async function readNewBytes(path: string, state: PumpState): Promise<string | null> {
const size = fileSize(path);
if (size < state.contentOffset) {
// File was truncated — reset
state.contentOffset = 0;
state.carry = "";
}
if (size <= state.contentOffset) {
return null;
}
const blob = Bun.file(path).slice(state.contentOffset, size);
const chunk = await blob.text();
state.contentOffset = size;
return chunk;
}
function parseJsonLine(line: string): unknown {
try {
return JSON.parse(line) as unknown;
@@ -28,14 +51,7 @@ function isWorkflowResult(record: unknown): boolean {
);
}
function parseNewLines(text: string, state: PumpState): string[] {
if (text.length < state.contentOffset) {
state.contentOffset = 0;
state.carry = "";
}
const chunk = text.slice(state.contentOffset);
state.contentOffset = text.length;
function parseNewLines(chunk: string, state: PumpState): string[] {
state.carry += chunk;
const parts = state.carry.split("\n");
@@ -70,14 +86,17 @@ export function createLiveRoutes(storageRoot: string): Hono {
let eventId = 0;
async function pumpData(): Promise<boolean> {
let text: string;
let chunk: string | null;
try {
text = await readFile(resolvedDataPath, "utf8");
chunk = await readNewBytes(resolvedDataPath, dataState);
} catch {
return false;
}
if (chunk === null) {
return false;
}
const lines = parseNewLines(text, dataState);
const lines = parseNewLines(chunk, dataState);
for (const line of lines) {
const record = parseJsonLine(line);
eventId++;
@@ -95,14 +114,17 @@ export function createLiveRoutes(storageRoot: string): Hono {
}
async function pumpInfo(): Promise<void> {
let text: string;
let chunk: string | null;
try {
text = await readFile(infoPath, "utf8");
chunk = await readNewBytes(infoPath, infoState);
} catch {
return;
}
if (chunk === null) {
return;
}
const lines = parseNewLines(text, infoState);
const lines = parseNewLines(chunk, infoState);
for (const line of lines) {
const record = parseJsonLine(line);
if (
@@ -1,6 +1,7 @@
import { useState } from "react";
import { useEffect, useRef, useState } from "react";
import { getThread, killThread, pauseThread, resumeThread } from "../api.ts";
import { useFetch } from "../hooks.ts";
import { useSSE } from "../use-sse.ts";
type Props = {
threadId: string;
@@ -8,8 +9,22 @@ type Props = {
};
export function ThreadDetail({ threadId, onBack }: Props) {
const sse = useSSE(threadId);
const { status, data, error } = useFetch(() => getThread(threadId), [threadId]);
const [actionStatus, setActionStatus] = useState<string | null>(null);
const recordsEndRef = useRef<HTMLDivElement>(null);
const liveActive = sse.connected && !sse.completed;
const records = liveActive
? sse.records
: status === "ok"
? data.records
: ([] as typeof sse.records);
// biome-ignore lint/correctness/useExhaustiveDependencies: scroll when the rendered record list grows
useEffect(() => {
recordsEndRef.current?.scrollIntoView({ behavior: "smooth" });
}, [records.length]);
async function handleAction(action: "kill" | "pause" | "resume") {
setActionStatus(`${action}ing...`);
@@ -61,20 +76,34 @@ export function ThreadDetail({ threadId, onBack }: Props) {
</div>
</div>
<h2 className="text-xl font-semibold mb-2 font-mono">{threadId}</h2>
<h2 className="text-xl font-semibold mb-2 font-mono flex items-center gap-2 flex-wrap">
<span>{threadId}</span>
{sse.connected && (
<span
className="text-xs font-medium px-2 py-0.5 rounded"
style={{ background: "var(--color-success)", color: "var(--color-bg)" }}
>
Live
</span>
)}
</h2>
{actionStatus && (
<p className="text-xs mb-4" style={{ color: "var(--color-text-muted)" }}>
{actionStatus}
</p>
)}
{status === "loading" && <p style={{ color: "var(--color-text-muted)" }}>Loading...</p>}
{status === "error" && <p style={{ color: "var(--color-error)" }}>Error: {error}</p>}
{status === "ok" && (
{status === "loading" && !liveActive && records.length === 0 && (
<p style={{ color: "var(--color-text-muted)" }}>Loading...</p>
)}
{status === "error" && !liveActive && (
<p style={{ color: "var(--color-error)" }}>Error: {error}</p>
)}
{(status === "ok" || liveActive || records.length > 0) && (
<div className="space-y-3">
{data.records.map((r) => (
{records.map((r, i) => (
<div
key={`${r.type}:${r.role ?? ""}:${r.timestamp ?? 0}:${String(r.content ?? "")}`}
key={i}
className="p-3 rounded border text-sm"
style={{ background: "var(--color-surface)", borderColor: "var(--color-border)" }}
>
@@ -90,7 +119,7 @@ export function ThreadDetail({ threadId, onBack }: Props) {
{r.role}
</span>
)}
{r.timestamp && (
{r.timestamp !== null && (
<span className="text-xs ml-auto" style={{ color: "var(--color-text-muted)" }}>
{new Date(r.timestamp).toLocaleTimeString()}
</span>
@@ -106,6 +135,7 @@ export function ThreadDetail({ threadId, onBack }: Props) {
)}
</div>
))}
<div ref={recordsEndRef} aria-hidden />
</div>
)}
</div>
+161
View File
@@ -0,0 +1,161 @@
import {
type Dispatch,
type MutableRefObject,
type SetStateAction,
useEffect,
useRef,
useState,
} from "react";
import type { ThreadRecord } from "./api.ts";
export type UseSSEReturn = {
records: ThreadRecord[];
connected: boolean;
completed: boolean;
};
function isWorkflowResult(record: ThreadRecord): boolean {
return record.type === "workflow-result";
}
function parseRecord(data: string): ThreadRecord | null {
try {
return JSON.parse(data) as ThreadRecord;
} catch {
return null;
}
}
type RecordEventContext = {
cancelled: boolean;
completedRef: MutableRefObject<boolean>;
setRecords: Dispatch<SetStateAction<ThreadRecord[]>>;
setCompleted: (value: boolean) => void;
setConnected: (value: boolean) => void;
cleanupEs: () => void;
};
function handleRecordEvent(ev: Event, ctx: RecordEventContext): void {
if (ctx.cancelled) {
return;
}
const msg = ev as MessageEvent;
const raw = typeof msg.data === "string" ? msg.data : "";
const parsed = parseRecord(raw);
if (parsed === null) {
return;
}
ctx.setRecords((prev) => [...prev, parsed]);
if (!isWorkflowResult(parsed)) {
return;
}
ctx.completedRef.current = true;
ctx.setCompleted(true);
ctx.setConnected(false);
ctx.cleanupEs();
}
export function useSSE(threadId: string | null): UseSSEReturn {
const [records, setRecords] = useState<ThreadRecord[]>([]);
const [connected, setConnected] = useState(false);
const [completed, setCompleted] = useState(false);
const completedRef = useRef(false);
const reconnectAttemptsRef = useRef(0);
useEffect(() => {
if (threadId === null) {
completedRef.current = false;
reconnectAttemptsRef.current = 0;
setRecords([]);
setConnected(false);
setCompleted(false);
return;
}
const tid = threadId;
completedRef.current = false;
reconnectAttemptsRef.current = 0;
setRecords([]);
setConnected(false);
setCompleted(false);
let es: EventSource | null = null;
let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
let cancelled = false;
function cleanupEs(): void {
if (es !== null) {
es.close();
es = null;
}
}
function scheduleReconnect(): void {
if (cancelled || completedRef.current) {
return;
}
const delayMs = Math.min(1000 * 2 ** reconnectAttemptsRef.current, 8000);
reconnectAttemptsRef.current += 1;
reconnectTimer = setTimeout(() => {
reconnectTimer = null;
if (!cancelled && !completedRef.current) {
connect();
}
}, delayMs);
}
function connect(): void {
if (cancelled || completedRef.current) {
return;
}
cleanupEs();
const url = `/api/threads/${encodeURIComponent(tid)}/live`;
es = new EventSource(url);
es.onopen = () => {
if (cancelled) {
return;
}
reconnectAttemptsRef.current = 0;
setConnected(true);
setRecords([]);
};
es.addEventListener("record", (ev: Event) =>
handleRecordEvent(ev, {
cancelled,
completedRef,
setRecords,
setCompleted,
setConnected,
cleanupEs,
}),
);
es.onerror = () => {
if (cancelled || completedRef.current) {
return;
}
setConnected(false);
cleanupEs();
scheduleReconnect();
};
}
connect();
return () => {
cancelled = true;
if (reconnectTimer !== null) {
clearTimeout(reconnectTimer);
}
cleanupEs();
};
}, [threadId]);
return { records, connected, completed };
}
@@ -0,0 +1,2 @@
export type { WorkflowDescriptor, WorkflowRoleDescriptor, WorkflowRoleSchema } from "./types.js";
export { validateWorkflowDescriptor } from "./workflow-descriptor.js";
@@ -0,0 +1,13 @@
/** JSON Schema fragment describing one role's `meta` shape (subset supported by code generation). */
export type WorkflowRoleSchema = Record<string, unknown>;
export type WorkflowRoleDescriptor = {
description: string;
schema: WorkflowRoleSchema;
};
/** Workflow metadata exported as `export const descriptor` from `.esm.js` bundles. */
export type WorkflowDescriptor = {
description: string;
roles: Record<string, WorkflowRoleDescriptor>;
};
@@ -0,0 +1,40 @@
import { err, ok, type Result } from "../util/index.js";
import type { WorkflowDescriptor, WorkflowRoleDescriptor, WorkflowRoleSchema } from "./types.js";
export function validateWorkflowDescriptor(value: unknown): Result<WorkflowDescriptor, string> {
if (value === null || typeof value !== "object" || Array.isArray(value)) {
return err("descriptor must be a non-array object");
}
const root = value as Record<string, unknown>;
const description = root.description;
if (typeof description !== "string") {
return err("descriptor.description must be a string");
}
const rolesRaw = root.roles;
if (rolesRaw === null || typeof rolesRaw !== "object" || Array.isArray(rolesRaw)) {
return err("descriptor.roles must be a non-array object");
}
const roles: Record<string, WorkflowRoleDescriptor> = {};
for (const [roleName, specUnknown] of Object.entries(rolesRaw)) {
if (specUnknown === null || typeof specUnknown !== "object" || Array.isArray(specUnknown)) {
return err(`descriptor.roles.${roleName} must be a non-array object`);
}
const spec = specUnknown as Record<string, unknown>;
const roleDesc = spec.description;
if (typeof roleDesc !== "string") {
return err(`descriptor.roles.${roleName}.description must be a string`);
}
const schema = spec.schema;
if (schema === null || typeof schema !== "object" || Array.isArray(schema)) {
return err(`descriptor.roles.${roleName}.schema must be a non-array object`);
}
roles[roleName] = {
description: roleDesc,
schema: schema as WorkflowRoleSchema,
};
}
return ok({ description, roles });
}
@@ -0,0 +1 @@
export type { CasStore } from "./types.js";
@@ -0,0 +1,6 @@
export type CasStore = {
put(content: string): Promise<string>;
get(hash: string): Promise<string | null>;
delete(hash: string): Promise<void>;
list(): Promise<string[]>;
};
@@ -1,10 +1,10 @@
import type * as z from "zod/v4";
import type { CasStore } from "../cas/types.js";
import {
type AgentBinding,
type AgentContext,
type AgentFn,
type CasStore,
END,
type ExtractContext,
type ModeratorContext,
@@ -18,7 +18,8 @@ import {
type WorkflowDefinition,
type WorkflowFn,
type WorkflowRuntime,
} from "./types.js";
} from "../types.js";
import { mergeRefsWithContentHash } from "../util/index.js";
function isRoleNext<M extends RoleMeta>(
next: (keyof M & string) | typeof END,
@@ -96,11 +97,10 @@ async function advanceOneRound<M extends RoleMeta>(
);
const contentHash = await putContentBlob(runtime.cas, raw);
const refsFromMeta = resolveExtractedRefs(
roleDef as unknown as RoleDefinition<Record<string, unknown>>,
meta,
const refs = mergeRefsWithContentHash(
resolveExtractedRefs(roleDef as unknown as RoleDefinition<Record<string, unknown>>, meta),
contentHash,
);
const refs = refsFromMeta.includes(contentHash) ? refsFromMeta : [...refsFromMeta, contentHash];
const step = {
role: next,
@@ -0,0 +1 @@
export { createWorkflow } from "./create-workflow.js";
@@ -0,0 +1 @@
export type { ExtractFn } from "./types.js";
@@ -0,0 +1,9 @@
import type * as z from "zod/v4";
import type { ExtractContext } from "../types.js";
export type ExtractFn = <T extends Record<string, unknown>>(
schema: z.ZodType<T>,
prompt: string,
ctx: ExtractContext,
) => Promise<T>;
+11 -8
View File
@@ -1,16 +1,20 @@
export { createWorkflow } from "./create-workflow.js";
export { err, ok } from "./result.js";
export type {
WorkflowDescriptor,
WorkflowRoleDescriptor,
WorkflowRoleSchema,
} from "./bundle/types.js";
export { validateWorkflowDescriptor } from "./bundle/workflow-descriptor.js";
export type { CasStore } from "./cas/index.js";
export { createWorkflow } from "./engine/index.js";
export type { ExtractFn } from "./extract/index.js";
export type {
AgentBinding,
AgentContext,
AgentFn,
CasStore,
ExtractContext,
ExtractFn,
LlmProvider,
Moderator,
ModeratorContext,
Result,
RoleDefinition,
RoleMeta,
RoleOutput,
@@ -19,11 +23,10 @@ export type {
ThreadContext,
WorkflowCompletion,
WorkflowDefinition,
WorkflowDescriptor,
WorkflowFn,
WorkflowResult,
WorkflowRoleDescriptor,
WorkflowRoleSchema,
WorkflowRuntime,
} from "./types.js";
export { END, START } from "./types.js";
export type { Result } from "./util/index.js";
export { err, ok } from "./util/index.js";
+3 -35
View File
@@ -1,33 +1,12 @@
import type * as z from "zod/v4";
import type { CasStore } from "./cas/index.js";
import type { ExtractFn } from "./extract/types.js";
/** Sentinel values for automaton control flow. */
export const START = "__start__" as const;
export const END = "__end__" as const;
export type CasStore = {
put(content: string): Promise<string>;
get(hash: string): Promise<string | null>;
delete(hash: string): Promise<void>;
list(): Promise<string[]>;
};
/** JSON Schema fragment describing one role's `meta` shape (subset supported by code generation). */
export type WorkflowRoleSchema = Record<string, unknown>;
export type WorkflowRoleDescriptor = {
description: string;
schema: WorkflowRoleSchema;
};
/** Workflow metadata exported as `export const descriptor` from `.esm.js` bundles. */
export type WorkflowDescriptor = {
description: string;
roles: Record<string, WorkflowRoleDescriptor>;
};
/** Expected success/failure outcome without throwing for recoverable errors. */
export type Result<T, E = Error> = { ok: true; value: T } | { ok: false; error: E };
/** Maps role names → their meta types. Single generic drives all inference. */
export type RoleMeta = Record<string, Record<string, unknown>>;
@@ -117,12 +96,6 @@ export type ExtractContext<M extends RoleMeta = RoleMeta> = AgentContext<M> & {
agentContent: string;
};
export type ExtractFn = <T extends Record<string, unknown>>(
schema: z.ZodType<T>,
prompt: string,
ctx: ExtractContext,
) => Promise<T>;
/** Raw string output from an LLM/CLI adapter; meta is extracted by the engine. */
export type AgentFn = (ctx: AgentContext) => Promise<string>;
@@ -158,8 +131,3 @@ export type WorkflowDefinition<M extends RoleMeta> = {
roles: { [K in keyof M & string]: RoleDefinition<M[K]> };
moderator: Moderator<M>;
};
/** Internal outcome of advancing one moderator round inside {@link createWorkflow}. */
export type AdvanceOutcome<M extends RoleMeta> =
| { kind: "complete"; completion: WorkflowCompletion }
| { kind: "yield"; output: RoleOutput; step: RoleStep<M> };
@@ -0,0 +1,3 @@
export { mergeRefsWithContentHash } from "./refs-field.js";
export { err, ok } from "./result.js";
export type { Result } from "./types.js";
@@ -0,0 +1,8 @@
/** Append `contentHash` to `refs` when not already present (dedupe by first occurrence order). */
export function mergeRefsWithContentHash(refs: string[], contentHash: string): string[] {
const out = [...refs];
if (!out.includes(contentHash)) {
out.push(contentHash);
}
return out;
}
@@ -0,0 +1 @@
export type Result<T, E = Error> = { ok: true; value: T } | { ok: false; error: E };
@@ -1,40 +1 @@
import { err, ok, type Result } from "../util/index.js";
import type { WorkflowDescriptor, WorkflowRoleDescriptor, WorkflowRoleSchema } from "./types.js";
export function validateWorkflowDescriptor(value: unknown): Result<WorkflowDescriptor, string> {
if (value === null || typeof value !== "object" || Array.isArray(value)) {
return err("descriptor must be a non-array object");
}
const root = value as Record<string, unknown>;
const description = root.description;
if (typeof description !== "string") {
return err("descriptor.description must be a string");
}
const rolesRaw = root.roles;
if (rolesRaw === null || typeof rolesRaw !== "object" || Array.isArray(rolesRaw)) {
return err("descriptor.roles must be a non-array object");
}
const roles: Record<string, WorkflowRoleDescriptor> = {};
for (const [roleName, specUnknown] of Object.entries(rolesRaw)) {
if (specUnknown === null || typeof specUnknown !== "object" || Array.isArray(specUnknown)) {
return err(`descriptor.roles.${roleName} must be a non-array object`);
}
const spec = specUnknown as Record<string, unknown>;
const roleDesc = spec.description;
if (typeof roleDesc !== "string") {
return err(`descriptor.roles.${roleName}.description must be a string`);
}
const schema = spec.schema;
if (schema === null || typeof schema !== "object" || Array.isArray(schema)) {
return err(`descriptor.roles.${roleName}.schema must be a non-array object`);
}
roles[roleName] = {
description: roleDesc,
schema: schema as WorkflowRoleSchema,
};
}
return ok({ description, roles });
}
export { validateWorkflowDescriptor } from "@uncaged/workflow-runtime";