From cdcaff15ab8bccdc7b7ade37bae84283518707d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=98=9F=E6=9C=88?= Date: Fri, 8 May 2026 16:07:02 +0800 Subject: [PATCH] feat(serve+dashboard): write endpoints, SSE live, run dialog MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Serve API: - POST /api/threads — run a new thread - POST /api/threads/:id/kill — kill thread - POST /api/threads/:id/pause — pause thread - POST /api/threads/:id/resume — resume thread - GET /api/threads/:id/live — SSE stream of thread records Dashboard: - Run Thread dialog (select workflow, enter prompt, set maxRounds) - Thread detail controls (pause/resume/kill buttons) - postJson API helper for write operations 262 tests pass. Refs: #118 --- .../cli-workflow/src/commands/serve/app.ts | 2 + .../src/commands/serve/routes-live.ts | 175 ++++++++++++++++++ .../src/commands/serve/routes-thread.ts | 52 ++++++ packages/dashboard/src/api.ts | 29 +++ packages/dashboard/src/app.tsx | 14 +- .../dashboard/src/components/run-dialog.tsx | 128 +++++++++++++ .../dashboard/src/components/status-bar.tsx | 17 +- .../src/components/thread-detail.tsx | 62 ++++++- 8 files changed, 467 insertions(+), 12 deletions(-) create mode 100644 packages/cli-workflow/src/commands/serve/routes-live.ts create mode 100644 packages/dashboard/src/components/run-dialog.tsx diff --git a/packages/cli-workflow/src/commands/serve/app.ts b/packages/cli-workflow/src/commands/serve/app.ts index 76cadfc..b544c8f 100644 --- a/packages/cli-workflow/src/commands/serve/app.ts +++ b/packages/cli-workflow/src/commands/serve/app.ts @@ -2,6 +2,7 @@ import { Hono } from "hono"; import { cors } from "hono/cors"; import { createCasRoutes } from "./routes-cas.js"; +import { createLiveRoutes } from "./routes-live.js"; import { createThreadRoutes } from "./routes-thread.js"; import { createWorkflowRoutes } from "./routes-workflow.js"; @@ -14,6 +15,7 @@ export function createApp(storageRoot: string): Hono { app.route("/api/workflows", createWorkflowRoutes(storageRoot)); app.route("/api/threads", createThreadRoutes(storageRoot)); + app.route("/api/threads", createLiveRoutes(storageRoot)); app.route("/api/cas", createCasRoutes(storageRoot)); return app; diff --git a/packages/cli-workflow/src/commands/serve/routes-live.ts b/packages/cli-workflow/src/commands/serve/routes-live.ts new file mode 100644 index 0000000..8301ea3 --- /dev/null +++ b/packages/cli-workflow/src/commands/serve/routes-live.ts @@ -0,0 +1,175 @@ +import { watch } from "node:fs"; +import { readFile } from "node:fs/promises"; +import { dirname, join } from "node:path"; +import { Hono } from "hono"; +import { streamSSE } from "hono/streaming"; + +import { resolveThreadDataPath } from "../../thread-scan.js"; + +type PumpState = { + contentOffset: number; + carry: string; +}; + +function parseJsonLine(line: string): unknown { + try { + return JSON.parse(line) as unknown; + } catch { + return { raw: line }; + } +} + +function isWorkflowResult(record: unknown): boolean { + return ( + record !== null && + typeof record === "object" && + "type" in (record as Record) && + (record as Record).type === "workflow-result" + ); +} + +function parseNewLines(text: string, state: PumpState): string[] { + if (text.length < state.contentOffset) { + state.contentOffset = 0; + state.carry = ""; + } + + const chunk = text.slice(state.contentOffset); + state.contentOffset = text.length; + state.carry += chunk; + + const parts = state.carry.split("\n"); + state.carry = parts.pop() ?? ""; + + const lines: string[] = []; + for (const line of parts) { + const trimmed = line.trim(); + if (trimmed !== "") { + lines.push(trimmed); + } + } + return lines; +} + +export function createLiveRoutes(storageRoot: string): Hono { + const app = new Hono(); + + app.get("/:threadId/live", async (c) => { + const threadId = c.req.param("threadId"); + const dataPath = await resolveThreadDataPath(storageRoot, threadId); + if (dataPath === null) { + return c.json({ error: `thread not found: ${threadId}` }, 404); + } + + const infoPath = join(dirname(dataPath), `${threadId}.info.jsonl`); + + return streamSSE(c, async (stream) => { + const dataState: PumpState = { contentOffset: 0, carry: "" }; + const infoState: PumpState = { contentOffset: 0, carry: "" }; + let eventId = 0; + + async function pumpData(): Promise { + let text: string; + try { + text = await readFile(dataPath, "utf8"); + } catch { + return false; + } + + const lines = parseNewLines(text, dataState); + for (const line of lines) { + const record = parseJsonLine(line); + eventId++; + await stream.writeSSE({ + event: "record", + data: JSON.stringify(record), + id: String(eventId), + }); + + if (isWorkflowResult(record)) { + return true; + } + } + return false; + } + + async function pumpInfo(): Promise { + let text: string; + try { + text = await readFile(infoPath, "utf8"); + } catch { + return; + } + + const lines = parseNewLines(text, infoState); + for (const line of lines) { + const record = parseJsonLine(line); + if ( + typeof record === "object" && + record !== null && + "raw" in (record as Record) + ) { + continue; + } + eventId++; + await stream.writeSSE({ + event: "info", + data: JSON.stringify(record), + id: String(eventId), + }); + } + } + + // Initial pump + const done = await pumpData(); + await pumpInfo(); + if (done) { + return; + } + + // Watch for changes + const controller = new AbortController(); + let completed = false; + + const dataWatcher = watch(dataPath, async () => { + if (completed) return; + const finished = await pumpData(); + if (finished) { + completed = true; + controller.abort(); + } + }); + + let infoWatcher: ReturnType | null = null; + try { + infoWatcher = watch(infoPath, async () => { + if (completed) return; + await pumpInfo(); + }); + } catch { + // info file may not exist + } + + stream.onAbort(() => { + completed = true; + dataWatcher.close(); + infoWatcher?.close(); + }); + + // Keep stream alive until completion or client disconnect + await new Promise((resolve) => { + if (completed) { + resolve(); + return; + } + controller.signal.addEventListener("abort", () => resolve(), { once: true }); + stream.onAbort(() => resolve()); + }); + + dataWatcher.close(); + infoWatcher?.close(); + }); + }); + + return app; +} diff --git a/packages/cli-workflow/src/commands/serve/routes-thread.ts b/packages/cli-workflow/src/commands/serve/routes-thread.ts index 573d1a9..32a9113 100644 --- a/packages/cli-workflow/src/commands/serve/routes-thread.ts +++ b/packages/cli-workflow/src/commands/serve/routes-thread.ts @@ -6,6 +6,8 @@ import { listRunningThreads, resolveThreadDataPath, } from "../../thread-scan.js"; +import { cmdKill, cmdPause, cmdResume } from "../thread/control.js"; +import { cmdRun } from "../thread/run.js"; export function createThreadRoutes(storageRoot: string): Hono { const app = new Hono(); @@ -42,5 +44,55 @@ export function createThreadRoutes(storageRoot: string): Hono { return c.json({ threadId, records }); }); + app.post("/", async (c) => { + let body: Record; + try { + body = (await c.req.json()) as Record; + } catch { + return c.json({ error: "invalid JSON body" }, 400); + } + + const name = body.workflow; + const prompt = body.prompt; + const maxRounds = typeof body.maxRounds === "number" ? body.maxRounds : 10; + + if (typeof name !== "string" || typeof prompt !== "string") { + return c.json({ error: "workflow (string) and prompt (string) are required" }, 400); + } + + const result = await cmdRun(storageRoot, name, prompt, maxRounds); + if (!result.ok) { + return c.json({ error: result.error }, 400); + } + return c.json({ threadId: result.value.threadId }, 201); + }); + + app.post("/:threadId/kill", async (c) => { + const threadId = c.req.param("threadId"); + const result = await cmdKill(storageRoot, threadId); + if (!result.ok) { + return c.json({ error: result.error }, 400); + } + return c.json({ ok: true }); + }); + + app.post("/:threadId/pause", async (c) => { + const threadId = c.req.param("threadId"); + const result = await cmdPause(storageRoot, threadId); + if (!result.ok) { + return c.json({ error: result.error }, 400); + } + return c.json({ ok: true }); + }); + + app.post("/:threadId/resume", async (c) => { + const threadId = c.req.param("threadId"); + const result = await cmdResume(storageRoot, threadId); + if (!result.ok) { + return c.json({ error: result.error }, 400); + } + return c.json({ ok: true }); + }); + return app; } diff --git a/packages/dashboard/src/api.ts b/packages/dashboard/src/api.ts index c7ef011..6bf0410 100644 --- a/packages/dashboard/src/api.ts +++ b/packages/dashboard/src/api.ts @@ -1,5 +1,18 @@ const BASE = "/api"; +async function postJson(path: string, body: unknown): Promise { + const res = await fetch(`${BASE}${path}`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(body), + }); + if (!res.ok) { + const err = await res.json().catch(() => ({ error: res.statusText })) as { error: string }; + throw new Error(err.error || `API ${res.status}`); + } + return res.json() as Promise; +} + async function fetchJson(path: string): Promise { const res = await fetch(`${BASE}${path}`); if (!res.ok) { @@ -46,6 +59,22 @@ export function getThread(id: string): Promise<{ records: ThreadRecord[] }> { return fetchJson(`/threads/${id}`); } +export function runThread(workflow: string, prompt: string, maxRounds: number = 10): Promise<{ threadId: string }> { + return postJson("/threads", { workflow, prompt, maxRounds }); +} + +export function killThread(threadId: string): Promise<{ ok: boolean }> { + return postJson(`/threads/${threadId}/kill`, {}); +} + +export function pauseThread(threadId: string): Promise<{ ok: boolean }> { + return postJson(`/threads/${threadId}/pause`, {}); +} + +export function resumeThread(threadId: string): Promise<{ ok: boolean }> { + return postJson(`/threads/${threadId}/resume`, {}); +} + export function getHealth(): Promise<{ ok: boolean }> { return fetchJson("/healthz"); } diff --git a/packages/dashboard/src/app.tsx b/packages/dashboard/src/app.tsx index 1da2601..6cf9b9d 100644 --- a/packages/dashboard/src/app.tsx +++ b/packages/dashboard/src/app.tsx @@ -4,18 +4,20 @@ import { ThreadList } from "./components/thread-list.tsx"; import { ThreadDetail } from "./components/thread-detail.tsx"; import { WorkflowList } from "./components/workflow-list.tsx"; import { StatusBar } from "./components/status-bar.tsx"; +import { RunDialog } from "./components/run-dialog.tsx"; type View = "threads" | "workflows"; export function App() { const [view, setView] = useState("threads"); const [selectedThread, setSelectedThread] = useState(null); + const [showRun, setShowRun] = useState(false); return (
- + setShowRun(true)} />
{view === "threads" && !selectedThread && ( @@ -26,6 +28,16 @@ export function App() { {view === "workflows" && }
+ {showRun && ( + setShowRun(false)} + onCreated={(id) => { + setShowRun(false); + setView("threads"); + setSelectedThread(id); + }} + /> + )}
); } diff --git a/packages/dashboard/src/components/run-dialog.tsx b/packages/dashboard/src/components/run-dialog.tsx new file mode 100644 index 0000000..d4362c3 --- /dev/null +++ b/packages/dashboard/src/components/run-dialog.tsx @@ -0,0 +1,128 @@ +import { useState } from "react"; +import { listWorkflows, runThread } from "../api.ts"; +import { useFetch } from "../hooks.ts"; + +type Props = { + onClose: () => void; + onCreated: (threadId: string) => void; +}; + +export function RunDialog({ onClose, onCreated }: Props) { + const workflows = useFetch(() => listWorkflows(), []); + const [workflow, setWorkflow] = useState(""); + const [prompt, setPrompt] = useState(""); + const [maxRounds, setMaxRounds] = useState(10); + const [submitting, setSubmitting] = useState(false); + const [error, setError] = useState(null); + + async function handleSubmit(e: React.FormEvent) { + e.preventDefault(); + if (!workflow || !prompt) return; + setSubmitting(true); + setError(null); + try { + const result = await runThread(workflow, prompt, maxRounds); + onCreated(result.threadId); + } catch (err) { + setError(err instanceof Error ? err.message : String(err)); + setSubmitting(false); + } + } + + return ( +
+
+

Run Thread

+
+
+ + +
+
+ +