Compare commits

..

3 Commits

Author SHA1 Message Date
xingyue 6b1e728700 fix(serve): error handling, CORS, body limit, CAS store reuse
- Global error handler (app.onError → 500 JSON)
- JSON parse validation on POST routes (400)
- CORS restricted to localhost origins
- 1MB body size limit on POST (413)
- CAS store created once per route group, not per-request
- 6 new tests covering all changes

Closes #120
2026-05-08 18:11:59 +08:00
xiaomo dedab62c49 Merge pull request 'feat(dashboard): connect thread detail to SSE live stream' (#134) from feat/131-dashboard-sse into main 2026-05-08 09:53:10 +00:00
xingyue a44f1f34a8 feat(dashboard): connect thread detail to SSE live stream
Add useSSE hook that connects to /api/threads/:id/live via EventSource.
Thread detail page now shows records in real-time with auto-scroll.

- useSSE hook: EventSource connection, record accumulation, auto-reconnect
  with exponential backoff, cleanup on unmount
- Thread detail: Live badge, SSE-first with fetch fallback, smooth scroll
- Records clear on reconnect (server replays full file)

Closes #131, testing verified per #133
Refs: #118
2026-05-08 17:52:10 +08:00
21 changed files with 417 additions and 107 deletions
@@ -77,6 +77,83 @@ describe("serve /api/cas", () => {
});
});
describe("serve error handling", () => {
test("POST /api/threads with invalid JSON body → 400", async () => {
const { fetch } = buildApp("/tmp/uncaged-serve-test-nonexistent");
const res = await fetch("/api/threads", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: "not json",
});
expect(res.status).toBe(400);
const body = (await res.json()) as { error: string };
expect(body.error).toBe("invalid JSON body");
});
test("POST /api/cas with invalid JSON body → 400", async () => {
const { fetch } = buildApp("/tmp/uncaged-serve-test-nonexistent");
const res = await fetch("/api/cas", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: "not json",
});
expect(res.status).toBe(400);
const body = (await res.json()) as { error: string };
expect(body.error).toBe("invalid JSON body");
});
test("POST /api/threads with missing required fields → 400", async () => {
const { fetch } = buildApp("/tmp/uncaged-serve-test-nonexistent");
const res = await fetch("/api/threads", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ foo: "bar" }),
});
expect(res.status).toBe(400);
const body = (await res.json()) as { error: string };
expect(body.error).toContain("required");
});
test("global error handler returns 500 with JSON", async () => {
const app = createApp("/tmp/uncaged-serve-test-nonexistent");
app.get("/test-error", () => {
throw new Error("boom");
});
const res = await app.fetch(new Request("http://localhost/test-error"));
expect(res.status).toBe(500);
const body = (await res.json()) as { error: string };
expect(body.error).toBe("Internal server error");
});
});
describe("serve security", () => {
test("CORS headers present on responses", async () => {
const app = createApp("/tmp/uncaged-serve-test-nonexistent");
const res2 = await app.fetch(
new Request("http://localhost/healthz", {
headers: { Origin: "http://localhost:5173" },
}),
);
expect(res2.headers.get("Access-Control-Allow-Origin")).toBe("http://localhost:5173");
});
test("POST with body > 1MB → 413", async () => {
const { fetch } = buildApp("/tmp/uncaged-serve-test-nonexistent");
const largeBody = "x".repeat(1_048_577);
const res = await fetch("/api/cas", {
method: "POST",
headers: {
"Content-Type": "application/json",
"Content-Length": String(largeBody.length),
},
body: largeBody,
});
expect(res.status).toBe(413);
const body = (await res.json()) as { error: string };
expect(body.error).toBe("Payload too large");
});
});
describe("serve CAS round-trip", () => {
const tmpDir = `/tmp/uncaged-serve-cas-test-${Date.now()}`;
@@ -6,10 +6,36 @@ import { createLiveRoutes } from "./routes-live.js";
import { createThreadRoutes } from "./routes-thread.js";
import { createWorkflowRoutes } from "./routes-workflow.js";
const MAX_BODY_SIZE = 1_048_576; // 1 MB
export function createApp(storageRoot: string): Hono {
const app = new Hono();
app.use("*", cors());
app.onError((_err, c) => {
return c.json({ error: "Internal server error" }, 500);
});
app.use(
"*",
cors({
origin: [
"http://localhost:5173",
"http://127.0.0.1:5173",
"http://localhost:7860",
"http://127.0.0.1:7860",
],
}),
);
app.use("*", async (c, next) => {
if (c.req.method === "POST") {
const contentLength = c.req.header("content-length");
if (contentLength !== undefined && Number(contentLength) > MAX_BODY_SIZE) {
return c.json({ error: "Payload too large" }, 413);
}
}
await next();
});
app.get("/healthz", (c) => c.json({ ok: true }));
@@ -3,17 +3,15 @@ import { Hono } from "hono";
export function createCasRoutes(storageRoot: string): Hono {
const app = new Hono();
const casDir = getGlobalCasDir(storageRoot);
const cas = createCasStore(casDir);
app.get("/", async (c) => {
const casDir = getGlobalCasDir(storageRoot);
const cas = createCasStore(casDir);
const hashes = await cas.list();
return c.json({ hashes });
});
app.get("/:hash", async (c) => {
const casDir = getGlobalCasDir(storageRoot);
const cas = createCasStore(casDir);
const content = await cas.get(c.req.param("hash"));
if (content === null) {
return c.json({ error: "not found" }, 404);
@@ -22,19 +20,20 @@ export function createCasRoutes(storageRoot: string): Hono {
});
app.post("/", async (c) => {
const body = await c.req.json<{ content: string }>();
let body: { content: string };
try {
body = (await c.req.json()) as { content: string };
} catch {
return c.json({ error: "invalid JSON body" }, 400);
}
if (typeof body.content !== "string") {
return c.json({ error: "content field required" }, 400);
}
const casDir = getGlobalCasDir(storageRoot);
const cas = createCasStore(casDir);
const hash = await cas.put(body.content);
return c.json({ hash }, 201);
});
app.delete("/:hash", async (c) => {
const casDir = getGlobalCasDir(storageRoot);
const cas = createCasStore(casDir);
const hash = c.req.param("hash");
const content = await cas.get(hash);
if (content === null) {
@@ -1,6 +1,7 @@
import { useState } from "react";
import { useEffect, useRef, useState } from "react";
import { getThread, killThread, pauseThread, resumeThread } from "../api.ts";
import { useFetch } from "../hooks.ts";
import { useSSE } from "../use-sse.ts";
type Props = {
threadId: string;
@@ -8,8 +9,22 @@ type Props = {
};
export function ThreadDetail({ threadId, onBack }: Props) {
const sse = useSSE(threadId);
const { status, data, error } = useFetch(() => getThread(threadId), [threadId]);
const [actionStatus, setActionStatus] = useState<string | null>(null);
const recordsEndRef = useRef<HTMLDivElement>(null);
const liveActive = sse.connected && !sse.completed;
const records = liveActive
? sse.records
: status === "ok"
? data.records
: ([] as typeof sse.records);
// biome-ignore lint/correctness/useExhaustiveDependencies: scroll when the rendered record list grows
useEffect(() => {
recordsEndRef.current?.scrollIntoView({ behavior: "smooth" });
}, [records.length]);
async function handleAction(action: "kill" | "pause" | "resume") {
setActionStatus(`${action}ing...`);
@@ -61,20 +76,34 @@ export function ThreadDetail({ threadId, onBack }: Props) {
</div>
</div>
<h2 className="text-xl font-semibold mb-2 font-mono">{threadId}</h2>
<h2 className="text-xl font-semibold mb-2 font-mono flex items-center gap-2 flex-wrap">
<span>{threadId}</span>
{sse.connected && (
<span
className="text-xs font-medium px-2 py-0.5 rounded"
style={{ background: "var(--color-success)", color: "var(--color-bg)" }}
>
Live
</span>
)}
</h2>
{actionStatus && (
<p className="text-xs mb-4" style={{ color: "var(--color-text-muted)" }}>
{actionStatus}
</p>
)}
{status === "loading" && <p style={{ color: "var(--color-text-muted)" }}>Loading...</p>}
{status === "error" && <p style={{ color: "var(--color-error)" }}>Error: {error}</p>}
{status === "ok" && (
{status === "loading" && !liveActive && records.length === 0 && (
<p style={{ color: "var(--color-text-muted)" }}>Loading...</p>
)}
{status === "error" && !liveActive && (
<p style={{ color: "var(--color-error)" }}>Error: {error}</p>
)}
{(status === "ok" || liveActive || records.length > 0) && (
<div className="space-y-3">
{data.records.map((r) => (
{records.map((r, i) => (
<div
key={`${r.type}:${r.role ?? ""}:${r.timestamp ?? 0}:${String(r.content ?? "")}`}
key={i}
className="p-3 rounded border text-sm"
style={{ background: "var(--color-surface)", borderColor: "var(--color-border)" }}
>
@@ -90,7 +119,7 @@ export function ThreadDetail({ threadId, onBack }: Props) {
{r.role}
</span>
)}
{r.timestamp && (
{r.timestamp !== null && (
<span className="text-xs ml-auto" style={{ color: "var(--color-text-muted)" }}>
{new Date(r.timestamp).toLocaleTimeString()}
</span>
@@ -106,6 +135,7 @@ export function ThreadDetail({ threadId, onBack }: Props) {
)}
</div>
))}
<div ref={recordsEndRef} aria-hidden />
</div>
)}
</div>
+161
View File
@@ -0,0 +1,161 @@
import {
type Dispatch,
type MutableRefObject,
type SetStateAction,
useEffect,
useRef,
useState,
} from "react";
import type { ThreadRecord } from "./api.ts";
export type UseSSEReturn = {
records: ThreadRecord[];
connected: boolean;
completed: boolean;
};
function isWorkflowResult(record: ThreadRecord): boolean {
return record.type === "workflow-result";
}
function parseRecord(data: string): ThreadRecord | null {
try {
return JSON.parse(data) as ThreadRecord;
} catch {
return null;
}
}
type RecordEventContext = {
cancelled: boolean;
completedRef: MutableRefObject<boolean>;
setRecords: Dispatch<SetStateAction<ThreadRecord[]>>;
setCompleted: (value: boolean) => void;
setConnected: (value: boolean) => void;
cleanupEs: () => void;
};
function handleRecordEvent(ev: Event, ctx: RecordEventContext): void {
if (ctx.cancelled) {
return;
}
const msg = ev as MessageEvent;
const raw = typeof msg.data === "string" ? msg.data : "";
const parsed = parseRecord(raw);
if (parsed === null) {
return;
}
ctx.setRecords((prev) => [...prev, parsed]);
if (!isWorkflowResult(parsed)) {
return;
}
ctx.completedRef.current = true;
ctx.setCompleted(true);
ctx.setConnected(false);
ctx.cleanupEs();
}
export function useSSE(threadId: string | null): UseSSEReturn {
const [records, setRecords] = useState<ThreadRecord[]>([]);
const [connected, setConnected] = useState(false);
const [completed, setCompleted] = useState(false);
const completedRef = useRef(false);
const reconnectAttemptsRef = useRef(0);
useEffect(() => {
if (threadId === null) {
completedRef.current = false;
reconnectAttemptsRef.current = 0;
setRecords([]);
setConnected(false);
setCompleted(false);
return;
}
const tid = threadId;
completedRef.current = false;
reconnectAttemptsRef.current = 0;
setRecords([]);
setConnected(false);
setCompleted(false);
let es: EventSource | null = null;
let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
let cancelled = false;
function cleanupEs(): void {
if (es !== null) {
es.close();
es = null;
}
}
function scheduleReconnect(): void {
if (cancelled || completedRef.current) {
return;
}
const delayMs = Math.min(1000 * 2 ** reconnectAttemptsRef.current, 8000);
reconnectAttemptsRef.current += 1;
reconnectTimer = setTimeout(() => {
reconnectTimer = null;
if (!cancelled && !completedRef.current) {
connect();
}
}, delayMs);
}
function connect(): void {
if (cancelled || completedRef.current) {
return;
}
cleanupEs();
const url = `/api/threads/${encodeURIComponent(tid)}/live`;
es = new EventSource(url);
es.onopen = () => {
if (cancelled) {
return;
}
reconnectAttemptsRef.current = 0;
setConnected(true);
setRecords([]);
};
es.addEventListener("record", (ev: Event) =>
handleRecordEvent(ev, {
cancelled,
completedRef,
setRecords,
setCompleted,
setConnected,
cleanupEs,
}),
);
es.onerror = () => {
if (cancelled || completedRef.current) {
return;
}
setConnected(false);
cleanupEs();
scheduleReconnect();
};
}
connect();
return () => {
cancelled = true;
if (reconnectTimer !== null) {
clearTimeout(reconnectTimer);
}
cleanupEs();
};
}, [threadId]);
return { records, connected, completed };
}
@@ -0,0 +1,2 @@
export type { WorkflowDescriptor, WorkflowRoleDescriptor, WorkflowRoleSchema } from "./types.js";
export { validateWorkflowDescriptor } from "./workflow-descriptor.js";
@@ -0,0 +1,13 @@
/** JSON Schema fragment describing one role's `meta` shape (subset supported by code generation). */
export type WorkflowRoleSchema = Record<string, unknown>;
export type WorkflowRoleDescriptor = {
description: string;
schema: WorkflowRoleSchema;
};
/** Workflow metadata exported as `export const descriptor` from `.esm.js` bundles. */
export type WorkflowDescriptor = {
description: string;
roles: Record<string, WorkflowRoleDescriptor>;
};
@@ -0,0 +1,40 @@
import { err, ok, type Result } from "../util/index.js";
import type { WorkflowDescriptor, WorkflowRoleDescriptor, WorkflowRoleSchema } from "./types.js";
export function validateWorkflowDescriptor(value: unknown): Result<WorkflowDescriptor, string> {
if (value === null || typeof value !== "object" || Array.isArray(value)) {
return err("descriptor must be a non-array object");
}
const root = value as Record<string, unknown>;
const description = root.description;
if (typeof description !== "string") {
return err("descriptor.description must be a string");
}
const rolesRaw = root.roles;
if (rolesRaw === null || typeof rolesRaw !== "object" || Array.isArray(rolesRaw)) {
return err("descriptor.roles must be a non-array object");
}
const roles: Record<string, WorkflowRoleDescriptor> = {};
for (const [roleName, specUnknown] of Object.entries(rolesRaw)) {
if (specUnknown === null || typeof specUnknown !== "object" || Array.isArray(specUnknown)) {
return err(`descriptor.roles.${roleName} must be a non-array object`);
}
const spec = specUnknown as Record<string, unknown>;
const roleDesc = spec.description;
if (typeof roleDesc !== "string") {
return err(`descriptor.roles.${roleName}.description must be a string`);
}
const schema = spec.schema;
if (schema === null || typeof schema !== "object" || Array.isArray(schema)) {
return err(`descriptor.roles.${roleName}.schema must be a non-array object`);
}
roles[roleName] = {
description: roleDesc,
schema: schema as WorkflowRoleSchema,
};
}
return ok({ description, roles });
}
@@ -0,0 +1 @@
export type { CasStore } from "./types.js";
@@ -0,0 +1,6 @@
export type CasStore = {
put(content: string): Promise<string>;
get(hash: string): Promise<string | null>;
delete(hash: string): Promise<void>;
list(): Promise<string[]>;
};
@@ -1,10 +1,10 @@
import type * as z from "zod/v4";
import type { CasStore } from "../cas/types.js";
import {
type AgentBinding,
type AgentContext,
type AgentFn,
type CasStore,
END,
type ExtractContext,
type ModeratorContext,
@@ -18,7 +18,8 @@ import {
type WorkflowDefinition,
type WorkflowFn,
type WorkflowRuntime,
} from "./types.js";
} from "../types.js";
import { mergeRefsWithContentHash } from "../util/index.js";
function isRoleNext<M extends RoleMeta>(
next: (keyof M & string) | typeof END,
@@ -96,11 +97,10 @@ async function advanceOneRound<M extends RoleMeta>(
);
const contentHash = await putContentBlob(runtime.cas, raw);
const refsFromMeta = resolveExtractedRefs(
roleDef as unknown as RoleDefinition<Record<string, unknown>>,
meta,
const refs = mergeRefsWithContentHash(
resolveExtractedRefs(roleDef as unknown as RoleDefinition<Record<string, unknown>>, meta),
contentHash,
);
const refs = refsFromMeta.includes(contentHash) ? refsFromMeta : [...refsFromMeta, contentHash];
const step = {
role: next,
@@ -0,0 +1 @@
export { createWorkflow } from "./create-workflow.js";
@@ -0,0 +1 @@
export type { ExtractFn } from "./types.js";
@@ -0,0 +1,9 @@
import type * as z from "zod/v4";
import type { ExtractContext } from "../types.js";
export type ExtractFn = <T extends Record<string, unknown>>(
schema: z.ZodType<T>,
prompt: string,
ctx: ExtractContext,
) => Promise<T>;
+11 -8
View File
@@ -1,16 +1,20 @@
export { createWorkflow } from "./create-workflow.js";
export { err, ok } from "./result.js";
export type {
WorkflowDescriptor,
WorkflowRoleDescriptor,
WorkflowRoleSchema,
} from "./bundle/types.js";
export { validateWorkflowDescriptor } from "./bundle/workflow-descriptor.js";
export type { CasStore } from "./cas/index.js";
export { createWorkflow } from "./engine/index.js";
export type { ExtractFn } from "./extract/index.js";
export type {
AgentBinding,
AgentContext,
AgentFn,
CasStore,
ExtractContext,
ExtractFn,
LlmProvider,
Moderator,
ModeratorContext,
Result,
RoleDefinition,
RoleMeta,
RoleOutput,
@@ -19,11 +23,10 @@ export type {
ThreadContext,
WorkflowCompletion,
WorkflowDefinition,
WorkflowDescriptor,
WorkflowFn,
WorkflowResult,
WorkflowRoleDescriptor,
WorkflowRoleSchema,
WorkflowRuntime,
} from "./types.js";
export { END, START } from "./types.js";
export type { Result } from "./util/index.js";
export { err, ok } from "./util/index.js";
+3 -35
View File
@@ -1,33 +1,12 @@
import type * as z from "zod/v4";
import type { CasStore } from "./cas/index.js";
import type { ExtractFn } from "./extract/types.js";
/** Sentinel values for automaton control flow. */
export const START = "__start__" as const;
export const END = "__end__" as const;
export type CasStore = {
put(content: string): Promise<string>;
get(hash: string): Promise<string | null>;
delete(hash: string): Promise<void>;
list(): Promise<string[]>;
};
/** JSON Schema fragment describing one role's `meta` shape (subset supported by code generation). */
export type WorkflowRoleSchema = Record<string, unknown>;
export type WorkflowRoleDescriptor = {
description: string;
schema: WorkflowRoleSchema;
};
/** Workflow metadata exported as `export const descriptor` from `.esm.js` bundles. */
export type WorkflowDescriptor = {
description: string;
roles: Record<string, WorkflowRoleDescriptor>;
};
/** Expected success/failure outcome without throwing for recoverable errors. */
export type Result<T, E = Error> = { ok: true; value: T } | { ok: false; error: E };
/** Maps role names → their meta types. Single generic drives all inference. */
export type RoleMeta = Record<string, Record<string, unknown>>;
@@ -117,12 +96,6 @@ export type ExtractContext<M extends RoleMeta = RoleMeta> = AgentContext<M> & {
agentContent: string;
};
export type ExtractFn = <T extends Record<string, unknown>>(
schema: z.ZodType<T>,
prompt: string,
ctx: ExtractContext,
) => Promise<T>;
/** Raw string output from an LLM/CLI adapter; meta is extracted by the engine. */
export type AgentFn = (ctx: AgentContext) => Promise<string>;
@@ -158,8 +131,3 @@ export type WorkflowDefinition<M extends RoleMeta> = {
roles: { [K in keyof M & string]: RoleDefinition<M[K]> };
moderator: Moderator<M>;
};
/** Internal outcome of advancing one moderator round inside {@link createWorkflow}. */
export type AdvanceOutcome<M extends RoleMeta> =
| { kind: "complete"; completion: WorkflowCompletion }
| { kind: "yield"; output: RoleOutput; step: RoleStep<M> };
@@ -0,0 +1,3 @@
export { mergeRefsWithContentHash } from "./refs-field.js";
export { err, ok } from "./result.js";
export type { Result } from "./types.js";
@@ -0,0 +1,8 @@
/** Append `contentHash` to `refs` when not already present (dedupe by first occurrence order). */
export function mergeRefsWithContentHash(refs: string[], contentHash: string): string[] {
const out = [...refs];
if (!out.includes(contentHash)) {
out.push(contentHash);
}
return out;
}
@@ -0,0 +1 @@
export type Result<T, E = Error> = { ok: true; value: T } | { ok: false; error: E };
@@ -1,40 +1 @@
import { err, ok, type Result } from "../util/index.js";
import type { WorkflowDescriptor, WorkflowRoleDescriptor, WorkflowRoleSchema } from "./types.js";
export function validateWorkflowDescriptor(value: unknown): Result<WorkflowDescriptor, string> {
if (value === null || typeof value !== "object" || Array.isArray(value)) {
return err("descriptor must be a non-array object");
}
const root = value as Record<string, unknown>;
const description = root.description;
if (typeof description !== "string") {
return err("descriptor.description must be a string");
}
const rolesRaw = root.roles;
if (rolesRaw === null || typeof rolesRaw !== "object" || Array.isArray(rolesRaw)) {
return err("descriptor.roles must be a non-array object");
}
const roles: Record<string, WorkflowRoleDescriptor> = {};
for (const [roleName, specUnknown] of Object.entries(rolesRaw)) {
if (specUnknown === null || typeof specUnknown !== "object" || Array.isArray(specUnknown)) {
return err(`descriptor.roles.${roleName} must be a non-array object`);
}
const spec = specUnknown as Record<string, unknown>;
const roleDesc = spec.description;
if (typeof roleDesc !== "string") {
return err(`descriptor.roles.${roleName}.description must be a string`);
}
const schema = spec.schema;
if (schema === null || typeof schema !== "object" || Array.isArray(schema)) {
return err(`descriptor.roles.${roleName}.schema must be a non-array object`);
}
roles[roleName] = {
description: roleDesc,
schema: schema as WorkflowRoleSchema,
};
}
return ok({ description, roles });
}
export { validateWorkflowDescriptor } from "@uncaged/workflow-runtime";