Merge pull request 'refactor(serve): remove tunnel + eliminate HTTP round-trip in gateway mode' (#245) from refactor/serve-remove-http-tunnel into main

This commit is contained in:
2026-05-13 15:29:05 +00:00
29 changed files with 224 additions and 269 deletions
@@ -2,14 +2,14 @@ import { describe, expect, test } from "bun:test";
import { createContentMerkleNode, serializeMerkleNode } from "@uncaged/workflow-cas";
import { createApp } from "../src/commands/serve/app.js";
import { createApp } from "../src/commands/connect/app.js";
function casStoredForm(raw: string): string {
return serializeMerkleNode(createContentMerkleNode(raw));
}
function buildApp(storageRoot: string) {
const app = createApp(storageRoot);
const app = createApp(storageRoot, null);
return {
fetch: (path: string, init?: RequestInit) =>
app.fetch(new Request(`http://localhost${path}`, init)),
@@ -115,7 +115,7 @@ describe("serve error handling", () => {
});
test("global error handler returns 500 with JSON", async () => {
const app = createApp("/tmp/uncaged-serve-test-nonexistent");
const app = createApp("/tmp/uncaged-serve-test-nonexistent", null);
app.get("/test-error", () => {
throw new Error("boom");
});
@@ -128,7 +128,7 @@ describe("serve error handling", () => {
describe("serve security", () => {
test("CORS headers present on responses", async () => {
const app = createApp("/tmp/uncaged-serve-test-nonexistent");
const app = createApp("/tmp/uncaged-serve-test-nonexistent", null);
const res2 = await app.fetch(
new Request("http://localhost/healthz", {
headers: { Origin: "http://localhost:5173" },
+2 -2
View File
@@ -4,7 +4,7 @@ import { getCommandRegistry } from "./cli-registry.js";
import { formatCliUsage as formatCliUsageWithGroups } from "./cli-usage.js";
import { createCasDispatcher } from "./commands/cas/index.js";
import { createInitDispatcher } from "./commands/init/index.js";
import { dispatchServe } from "./commands/serve/index.js";
import { dispatchConnect } from "./commands/connect/index.js";
import { dispatchSetup } from "./commands/setup/index.js";
import { createThreadDispatcher, dispatchLive, dispatchRun } from "./commands/thread/index.js";
import { createWorkflowDispatcher } from "./commands/workflow/index.js";
@@ -71,7 +71,7 @@ const COMMAND_TABLE: Record<string, DispatchFn> = {
skill: dispatchSkill,
run: dispatchRun,
live: dispatchLive,
serve: dispatchServe,
connect: dispatchConnect,
};
export async function runCli(storageRoot: string, argv: string[]): Promise<number> {
+3 -3
View File
@@ -59,12 +59,12 @@ export function formatCliUsage(
);
lines.push("");
lines.push("Server:");
lines.push("Gateway:");
lines.push(
...formatUsageCommandLines([
{
prefix: "serve [--port N] [--host ADDR]",
description: "Start HTTP API server (default: 127.0.0.1:7860)",
prefix: "connect [--name NAME] [--gateway URL]",
description: "Connect to workflow gateway via WebSocket",
},
]),
);
@@ -8,7 +8,7 @@ import { createWorkflowRoutes } from "./routes-workflow.js";
const MAX_BODY_SIZE = 1_048_576; // 1 MB
export function createApp(storageRoot: string, agentToken: string | null): Hono {
export function createApp(storageRoot: string, clientToken: string | null): Hono {
const app = new Hono();
app.onError((_err, c) => {
@@ -37,11 +37,11 @@ export function createApp(storageRoot: string, agentToken: string | null): Hono
await next();
});
// ── Agent token auth (skip healthz) ───────────────────────────────
if (agentToken !== null) {
// ── Client token auth (skip healthz) ───────────────────────────────
if (clientToken !== null) {
app.use("/api/*", async (c, next) => {
const token = c.req.header("X-Agent-Token");
if (token !== agentToken) {
const token = c.req.header("X-Client-Token");
if (token !== clientToken) {
return c.json({ error: "unauthorized" }, 401);
}
await next();
@@ -1,63 +1,30 @@
import { randomUUID } from "node:crypto";
import { hostname as osHostname } from "node:os";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { ok, type Result } from "@uncaged/workflow-protocol";
import { createLogger } from "@uncaged/workflow-util";
import { serve } from "bun";
import { printCliLine } from "../../cli-output.js";
import { createApp } from "./app.js";
import { registerWithGateway, startHeartbeat, unregisterFromGateway } from "./gateway.js";
import type { ServeOptions } from "./types.js";
import type { ConnectOptions } from "./types.js";
import { startGatewayWsClient } from "./ws-client.js";
const DEFAULT_GATEWAY_URL = "https://workflow-gateway.shazhou.workers.dev";
const HEARTBEAT_INTERVAL_MS = 60_000;
export function startServer(
storageRoot: string,
options: ServeOptions,
agentToken: string | null,
): void {
const app = createApp(storageRoot, agentToken);
const server = serve({
fetch: app.fetch,
port: options.port,
hostname: options.hostname,
});
printCliLine(`uncaged-workflow API server listening on http://${server.hostname}:${server.port}`);
}
function parsePortValue(value: string | undefined): Result<number, string> {
if (value === undefined) {
return err("--port requires a value");
}
const parsed = Number.parseInt(value, 10);
if (!Number.isFinite(parsed) || parsed < 0 || parsed > 65535) {
return err(`invalid port: ${value}`);
}
return ok(parsed);
}
function requireNextArg(argv: string[], i: number, flag: string): Result<string, string> {
const next = argv[i + 1];
if (next === undefined) {
return err(`${flag} requires a value`);
return { ok: false, error: `${flag} requires a value` };
}
return ok(next);
}
function parseServeArgv(argv: string[]): Result<ServeOptions, string> {
let port = 7860;
let hostname = "127.0.0.1";
function parseConnectArgv(argv: string[]): Result<ConnectOptions, string> {
let name = osHostname().split(".")[0].toLowerCase();
let gatewayUrl = DEFAULT_GATEWAY_URL;
const gatewaySecret = process.env.WORKFLOW_GATEWAY_SECRET ?? "";
const stringFlags: Record<string, (v: string) => void> = {
"--host": (v) => {
hostname = v;
},
"--name": (v) => {
name = v;
},
@@ -68,12 +35,7 @@ function parseServeArgv(argv: string[]): Result<ServeOptions, string> {
for (let i = 0; i < argv.length; i++) {
const arg = argv[i];
if (arg === "--port" || arg === "-p") {
const portResult = parsePortValue(argv[i + 1]);
if (!portResult.ok) return portResult;
port = portResult.value;
i++;
} else if (arg in stringFlags) {
if (arg in stringFlags) {
const r = requireNextArg(argv, i, arg);
if (!r.ok) return r;
stringFlags[arg](r.value);
@@ -81,11 +43,11 @@ function parseServeArgv(argv: string[]): Result<ServeOptions, string> {
}
}
return ok({ port, hostname, name, gatewayUrl, gatewaySecret });
return ok({ name, gatewayUrl, gatewaySecret });
}
export async function dispatchServe(storageRoot: string, argv: string[]): Promise<number> {
const parsed = parseServeArgv(argv);
export async function dispatchConnect(storageRoot: string, argv: string[]): Promise<number> {
const parsed = parseConnectArgv(argv);
if (!parsed.ok) {
printCliLine(`error: ${parsed.error}`);
return 1;
@@ -94,36 +56,31 @@ export async function dispatchServe(storageRoot: string, argv: string[]): Promis
const options = parsed.value;
if (options.gatewaySecret === "") {
// No gateway — local-only mode
startServer(storageRoot, options, null);
printCliLine("no WORKFLOW_GATEWAY_SECRET — running in local-only mode");
await new Promise(() => {});
return 0;
printCliLine("error: WORKFLOW_GATEWAY_SECRET is required");
return 1;
}
const agentToken = randomUUID();
startServer(storageRoot, options, agentToken);
const clientToken = randomUUID();
const app = createApp(storageRoot, clientToken);
// Start WebSocket reverse connection to gateway
const log = createLogger({ sink: { kind: "stderr" } });
const stopWsClient = startGatewayWsClient({
gatewayUrl: options.gatewayUrl,
name: options.name,
secret: options.gatewaySecret,
localPort: options.port,
appFetch: app.fetch,
log,
});
printCliLine("connected to gateway via WebSocket");
// Register with gateway for discovery
const localUrl = `http://127.0.0.1:${options.port}`;
const registered = await registerWithGateway(
options.gatewayUrl,
options.name,
localUrl,
`ws://${options.name}`,
options.gatewaySecret,
agentToken,
clientToken,
);
if (registered) {
printCliLine(`registered with gateway as "${options.name}"`);
@@ -132,9 +89,9 @@ export async function dispatchServe(storageRoot: string, argv: string[]): Promis
const heartbeatTimer = startHeartbeat(
options.gatewayUrl,
options.name,
localUrl,
`ws://${options.name}`,
options.gatewaySecret,
agentToken,
clientToken,
HEARTBEAT_INTERVAL_MS,
);
@@ -5,13 +5,13 @@ export async function registerWithGateway(
name: string,
localUrl: string,
secret: string,
agentToken: string,
clientToken: string,
): Promise<boolean> {
try {
const resp = await fetch(`${gatewayUrl}/api/gateway/register`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ name, url: localUrl, secret, agentToken }),
body: JSON.stringify({ name, url: localUrl, secret, clientToken }),
});
if (!resp.ok) {
const body = await resp.text();
@@ -45,10 +45,10 @@ export function startHeartbeat(
name: string,
localUrl: string,
secret: string,
agentToken: string,
clientToken: string,
intervalMs: number,
): ReturnType<typeof setInterval> {
return setInterval(() => {
registerWithGateway(gatewayUrl, name, localUrl, secret, agentToken).catch(() => {});
registerWithGateway(gatewayUrl, name, localUrl, secret, clientToken).catch(() => {});
}, intervalMs);
}
@@ -0,0 +1,2 @@
export { dispatchConnect } from "./connect.js";
export type { ConnectOptions } from "./types.js";
@@ -1,6 +1,4 @@
export type ServeOptions = {
port: number;
hostname: string;
export type ConnectOptions = {
name: string;
gatewayUrl: string;
gatewaySecret: string;
@@ -5,7 +5,7 @@ export type GatewayWsClientParams = {
gatewayUrl: string;
name: string;
secret: string;
localPort: number;
appFetch: (request: Request) => Response | Promise<Response>;
log: LogFn;
};
@@ -44,20 +44,17 @@ async function handleGatewayMessage(
params.log("ZM8K2PQ1", "gateway WebSocket dropped non-request message");
return;
}
const localUrl = `http://127.0.0.1:${String(params.localPort)}${req.path}`;
const initHeaders = new Headers();
for (const [k, v] of Object.entries(req.headers)) {
initHeaders.set(k, v);
}
const localUrl = `http://localhost${req.path}`;
const headers = new Headers(req.headers);
let resp: Response;
try {
resp = await fetch(localUrl, {
resp = await params.appFetch(new Request(localUrl, {
method: req.method,
headers: initHeaders,
headers,
body: req.body === null ? undefined : req.body,
});
}));
} catch (e) {
params.log("R4N7BQ3C", `local proxy fetch failed: ${String(e)}`);
params.log("R4N7BQ3C", `app.fetch failed: ${String(e)}`);
const errBody: WsResponse = {
id: req.id,
status: 502,
@@ -1,3 +0,0 @@
export { createApp } from "./app.js";
export { dispatchServe, startServer } from "./serve.js";
export type { ServeOptions } from "./types.js";
+2 -2
View File
@@ -86,11 +86,11 @@ ${commandSections.join("\n\n")}
| \`run\` | \`thread run\` | Shortcut to start a thread |
| \`live\` | \`thread live\` | Shortcut to attach to a thread |
### serve
### connect
| Command | Args | Description |
|---------|------|-------------|
| \`serve\` | \`[--port N] [--host ADDR] [--name NAME]\` | Start HTTP API server with WebSocket gateway connection. \`--name\` registers with the gateway. |
| \`connect\` | \`[--name NAME] [--gateway URL]\` | Connect to workflow gateway via WebSocket. \`--name\` registers with the gateway. |
## Typical Workflow
+28 -28
View File
@@ -26,11 +26,11 @@ function authHeaders(): Record<string, string> {
return {};
}
function agentBase(agent: string): string {
function clientBase(client: string): string {
if (GATEWAY_URL) {
return `${GATEWAY_URL}/api/agents/${agent}`;
return `${GATEWAY_URL}/api/clients/${client}`;
}
// Local dev: proxy via vite, no agent prefix
// Local dev: proxy via vite, no client prefix
return "/api";
}
@@ -57,7 +57,7 @@ async function fetchJson<T>(base: string, path: string): Promise<T> {
// ── Endpoint types ──────────────────────────────────────────────────
export type AgentEndpoint = {
export type ClientEndpoint = {
name: string;
url: string;
status: string;
@@ -141,61 +141,61 @@ export type WorkflowDetail = {
// ── Gateway endpoints ───────────────────────────────────────────────
export function listAgents(): Promise<AgentEndpoint[]> {
export function listClients(): Promise<ClientEndpoint[]> {
const url = GATEWAY_URL || "";
return fetchJson(url, "/api/gateway/endpoints");
}
// ── Agent-scoped endpoints ──────────────────────────────────────────
// ── Client-scoped endpoints ──────────────────────────────────────────
export function listWorkflows(agent: string): Promise<{ workflows: WorkflowSummary[] }> {
return fetchJson(agentBase(agent), "/workflows");
export function listWorkflows(client: string): Promise<{ workflows: WorkflowSummary[] }> {
return fetchJson(clientBase(client), "/workflows");
}
export async function getWorkflowDetail(agent: string, name: string): Promise<WorkflowDetail> {
return fetchJson<WorkflowDetail>(agentBase(agent), `/workflows/${encodeURIComponent(name)}`);
export async function getWorkflowDetail(client: string, name: string): Promise<WorkflowDetail> {
return fetchJson<WorkflowDetail>(clientBase(client), `/workflows/${encodeURIComponent(name)}`);
}
export async function getWorkflowDescriptor(
agent: string,
client: string,
name: string,
): Promise<WorkflowDescriptor | null> {
const res = await getWorkflowDetail(agent, name);
const res = await getWorkflowDetail(client, name);
return res.descriptor;
}
export function listThreads(agent: string): Promise<{ threads: ThreadSummary[] }> {
return fetchJson(agentBase(agent), "/threads");
export function listThreads(client: string): Promise<{ threads: ThreadSummary[] }> {
return fetchJson(clientBase(client), "/threads");
}
export function listRunningThreads(agent: string): Promise<{ threads: ThreadSummary[] }> {
return fetchJson(agentBase(agent), "/threads/running");
export function listRunningThreads(client: string): Promise<{ threads: ThreadSummary[] }> {
return fetchJson(clientBase(client), "/threads/running");
}
export function getThread(agent: string, id: string): Promise<{ records: ThreadRecord[] }> {
return fetchJson(agentBase(agent), `/threads/${id}`);
export function getThread(client: string, id: string): Promise<{ records: ThreadRecord[] }> {
return fetchJson(clientBase(client), `/threads/${id}`);
}
export function runThread(
agent: string,
client: string,
workflow: string,
prompt: string,
): Promise<{ threadId: string }> {
return postJson(agentBase(agent), "/threads", { workflow, prompt });
return postJson(clientBase(client), "/threads", { workflow, prompt });
}
export function killThread(agent: string, threadId: string): Promise<{ ok: boolean }> {
return postJson(agentBase(agent), `/threads/${threadId}/kill`, {});
export function killThread(client: string, threadId: string): Promise<{ ok: boolean }> {
return postJson(clientBase(client), `/threads/${threadId}/kill`, {});
}
export function pauseThread(agent: string, threadId: string): Promise<{ ok: boolean }> {
return postJson(agentBase(agent), `/threads/${threadId}/pause`, {});
export function pauseThread(client: string, threadId: string): Promise<{ ok: boolean }> {
return postJson(clientBase(client), `/threads/${threadId}/pause`, {});
}
export function resumeThread(agent: string, threadId: string): Promise<{ ok: boolean }> {
return postJson(agentBase(agent), `/threads/${threadId}/resume`, {});
export function resumeThread(client: string, threadId: string): Promise<{ ok: boolean }> {
return postJson(clientBase(client), `/threads/${threadId}/resume`, {});
}
export function getAgentHealth(agent: string): Promise<{ ok: boolean }> {
return fetchJson(agentBase(agent), "/healthz");
export function getClientHealth(client: string): Promise<{ ok: boolean }> {
return fetchJson(clientBase(client), "/healthz");
}
+13 -13
View File
@@ -11,7 +11,7 @@ import { useHashRoute } from "./use-hash-route.ts";
export function App() {
const [authed, setAuthed] = useState(hasApiKey());
const { view, agent, threadId, setView, setAgent, setThreadId } = useHashRoute();
const { view, client, threadId, setView, setClient, setThreadId } = useHashRoute();
const [showRun, setShowRun] = useState(false);
if (!authed) {
@@ -22,36 +22,36 @@ export function App() {
<div className="flex h-screen">
<Sidebar
view={view}
agent={agent}
client={client}
onViewChange={setView}
onAgentChange={setAgent}
onClientChange={setClient}
onLogout={() => {
clearApiKey();
setAuthed(false);
}}
/>
<main className="flex-1 overflow-hidden flex flex-col">
<StatusBar agent={agent} onRun={() => setShowRun(true)} />
<StatusBar client={client} onRun={() => setShowRun(true)} />
<div className="flex-1 overflow-auto p-6">
{!agent && (
{!client && (
<div className="flex items-center justify-center h-full">
<p style={{ color: "var(--color-text-muted)" }}>
Select an agent from the sidebar to get started.
Select an client from the sidebar to get started.
</p>
</div>
)}
{agent && view === "threads" && threadId === null && (
<ThreadList agent={agent} onSelect={setThreadId} />
{client && view === "threads" && threadId === null && (
<ThreadList client={client} onSelect={setThreadId} />
)}
{agent && view === "threads" && threadId !== null && (
<ThreadDetail agent={agent} threadId={threadId} onBack={() => setThreadId(null)} />
{client && view === "threads" && threadId !== null && (
<ThreadDetail client={client} threadId={threadId} onBack={() => setThreadId(null)} />
)}
{agent && view === "workflows" && <WorkflowList agent={agent} />}
{client && view === "workflows" && <WorkflowList client={client} />}
</div>
</main>
{showRun && agent && (
{showRun && client && (
<RunDialog
agent={agent}
client={client}
onClose={() => setShowRun(false)}
onCreated={(id) => {
setShowRun(false);
@@ -3,7 +3,7 @@ import { Markdown } from "./markdown.tsx";
const ROLE_COLORS: Record<string, string> = {
preparer: "#8b5cf6",
agent: "#3b82f6",
client: "#3b82f6",
extractor: "#f59e0b",
};
@@ -3,13 +3,13 @@ import { listWorkflows, runThread } from "../api.ts";
import { useFetch } from "../hooks.ts";
type Props = {
agent: string;
client: string;
onClose: () => void;
onCreated: (threadId: string) => void;
};
export function RunDialog({ agent, onClose, onCreated }: Props) {
const workflows = useFetch(() => listWorkflows(agent), [agent]);
export function RunDialog({ client, onClose, onCreated }: Props) {
const workflows = useFetch(() => listWorkflows(client), [client]);
const [workflow, setWorkflow] = useState("");
const [prompt, setPrompt] = useState("");
const [submitting, setSubmitting] = useState(false);
@@ -21,7 +21,7 @@ export function RunDialog({ agent, onClose, onCreated }: Props) {
setSubmitting(true);
setError(null);
try {
const result = await runThread(agent, workflow, prompt);
const result = await runThread(client, workflow, prompt);
onCreated(result.threadId);
} catch (err) {
setError(err instanceof Error ? err.message : String(err));
@@ -38,7 +38,7 @@ export function RunDialog({ agent, onClose, onCreated }: Props) {
className="w-full max-w-lg p-6 rounded-lg border"
style={{ background: "var(--color-surface)", borderColor: "var(--color-border)" }}
>
<h3 className="text-lg font-semibold mb-4">Run Thread on {agent}</h3>
<h3 className="text-lg font-semibold mb-4">Run Thread on {client}</h3>
<form onSubmit={handleSubmit} className="space-y-4">
<div>
<label
@@ -1,27 +1,27 @@
import { useEffect } from "react";
import type { AgentEndpoint } from "../api.ts";
import { listAgents } from "../api.ts";
import type { ClientEndpoint } from "../api.ts";
import { listClients } from "../api.ts";
import { useFetch } from "../hooks.ts";
type Props = {
view: "threads" | "workflows";
agent: string | null;
client: string | null;
onViewChange: (v: "threads" | "workflows") => void;
onAgentChange: (a: string | null) => void;
onClientChange: (a: string | null) => void;
onLogout: () => void;
};
export function Sidebar({ view, agent, onViewChange, onAgentChange, onLogout }: Props) {
const { status, data } = useFetch(() => listAgents(), []);
export function Sidebar({ view, client, onViewChange, onClientChange, onLogout }: Props) {
const { status, data } = useFetch(() => listClients(), []);
const agents: AgentEndpoint[] = status === "ok" ? data : [];
const clients: ClientEndpoint[] = status === "ok" ? data : [];
// Auto-select first agent when none is selected
// Auto-select first client when none is selected
useEffect(() => {
if (agent === null && agents.length > 0) {
onAgentChange(agents[0].name);
if (client === null && clients.length > 0) {
onClientChange(clients[0].name);
}
}, [agent, agents, onAgentChange]);
}, [client, clients, onClientChange]);
const viewItems = [
{ key: "threads" as const, label: "Threads", icon: "⚡" },
@@ -42,33 +42,33 @@ export function Sidebar({ view, agent, onViewChange, onAgentChange, onLogout }:
</p>
</div>
{/* Agent selector */}
{/* Client selector */}
<div className="px-4 py-3 border-b" style={{ borderColor: "var(--color-border)" }}>
<label
className="block text-xs font-medium mb-1"
style={{ color: "var(--color-text-muted)" }}
htmlFor="agent-select"
htmlFor="client-select"
>
Agent
Client
</label>
<select
id="agent-select"
id="client-select"
className="w-full rounded px-2 py-1.5 text-xs"
style={{
background: "var(--color-bg)",
color: "var(--color-text)",
border: "1px solid var(--color-border)",
}}
value={agent ?? ""}
onChange={(e) => onAgentChange(e.target.value || null)}
value={client ?? ""}
onChange={(e) => onClientChange(e.target.value || null)}
disabled={status === "loading"}
>
{status === "loading" ? (
<option value="">Loading</option>
) : agents.length === 0 ? (
<option value="">No agents online</option>
) : clients.length === 0 ? (
<option value="">No clients online</option>
) : (
agents.map((a) => (
clients.map((a) => (
<option key={a.name} value={a.name}>
{a.status === "online" ? "🟢" : "🔴"} {a.name}
</option>
@@ -1,10 +1,10 @@
import { useCallback, useEffect, useRef, useState } from "react";
import { getAgentHealth } from "../api.ts";
import { getClientHealth } from "../api.ts";
type HealthStatus = "connected" | "disconnected" | "reconnecting";
type Props = {
agent: string | null;
client: string | null;
onRun: () => void;
};
@@ -18,17 +18,17 @@ function statusLabel(status: HealthStatus): { text: string; color: string } {
return { text: "● Offline", color: "var(--color-error)" };
}
export function StatusBar({ agent, onRun }: Props) {
export function StatusBar({ client, onRun }: Props) {
const [status, setStatus] = useState<HealthStatus>("disconnected");
const wasConnectedRef = useRef(false);
const checkHealth = useCallback(async () => {
if (!agent) {
if (!client) {
setStatus("disconnected");
return;
}
try {
await getAgentHealth(agent);
await getClientHealth(client);
wasConnectedRef.current = true;
setStatus("connected");
} catch {
@@ -38,7 +38,7 @@ export function StatusBar({ agent, onRun }: Props) {
setStatus("disconnected");
}
}
}, [agent]);
}, [client]);
useEffect(() => {
wasConnectedRef.current = false;
@@ -57,17 +57,17 @@ export function StatusBar({ agent, onRun }: Props) {
>
<div className="flex items-center gap-4">
<span style={{ color: "var(--color-text-muted)" }}>
{agent ? `Agent: ${agent}` : "No agent selected"}
{client ? `Client: ${client}` : "No client selected"}
</span>
<button
type="button"
onClick={onRun}
disabled={!agent}
disabled={!client}
className="px-3 py-1 rounded text-xs font-medium"
style={{
background: agent ? "var(--color-accent)" : "var(--color-border)",
background: client ? "var(--color-accent)" : "var(--color-border)",
color: "#fff",
opacity: agent ? 1 : 0.5,
opacity: client ? 1 : 0.5,
}}
>
Run Thread
@@ -14,7 +14,7 @@ import { RecordCard } from "./record-card.tsx";
import { type NodeState, WorkflowGraph } from "./workflow-graph/index.ts";
type Props = {
agent: string;
client: string;
threadId: string;
onBack: () => void;
};
@@ -52,9 +52,9 @@ function computeNodeStates(records: readonly ThreadRecord[]): Map<string, NodeSt
return states;
}
export function ThreadDetail({ agent, threadId, onBack }: Props) {
const sse = useSSE(agent, threadId);
const { status, data, error } = useFetch(() => getThread(agent, threadId), [agent, threadId]);
export function ThreadDetail({ client, threadId, onBack }: Props) {
const sse = useSSE(client, threadId);
const { status, data, error } = useFetch(() => getThread(client, threadId), [client, threadId]);
const [actionStatus, setActionStatus] = useState<string | null>(null);
const recordsEndRef = useRef<HTMLDivElement>(null);
const firstCardByRoleRef = useRef<Map<string, HTMLDivElement>>(new Map());
@@ -72,8 +72,8 @@ export function ThreadDetail({ agent, threadId, onBack }: Props) {
const descriptorFetch = useFetch<WorkflowDescriptor | null>(
() =>
workflowName === null ? Promise.resolve(null) : getWorkflowDescriptor(agent, workflowName),
[agent, workflowName],
workflowName === null ? Promise.resolve(null) : getWorkflowDescriptor(client, workflowName),
[client, workflowName],
);
const descriptor = descriptorFetch.status === "ok" ? descriptorFetch.data : null;
@@ -117,7 +117,7 @@ export function ThreadDetail({ agent, threadId, onBack }: Props) {
setActionStatus(`${action}ing...`);
try {
const fn = action === "kill" ? killThread : action === "pause" ? pauseThread : resumeThread;
await fn(agent, threadId);
await fn(client, threadId);
setActionStatus(`${action} sent ✓`);
} catch (e) {
setActionStatus(`${action} failed: ${e instanceof Error ? e.message : String(e)}`);
@@ -2,12 +2,12 @@ import { listThreads } from "../api.ts";
import { useFetch } from "../hooks.ts";
type Props = {
agent: string;
client: string;
onSelect: (id: string) => void;
};
export function ThreadList({ agent, onSelect }: Props) {
const { status, data, error } = useFetch(() => listThreads(agent), [agent]);
export function ThreadList({ client, onSelect }: Props) {
const { status, data, error } = useFetch(() => listThreads(client), [client]);
if (status === "loading")
return <p style={{ color: "var(--color-text-muted)" }}>Loading threads...</p>;
@@ -5,7 +5,7 @@ import { useFetch } from "../hooks.ts";
import { type NodeState, WorkflowGraph } from "./workflow-graph/index.ts";
type Props = {
agent: string;
client: string;
};
type DetailCacheEntry =
@@ -108,8 +108,8 @@ function ExpandedWorkflowBody({
);
}
export function WorkflowList({ agent }: Props) {
const { status, data, error } = useFetch(() => listWorkflows(agent), [agent]);
export function WorkflowList({ client }: Props) {
const { status, data, error } = useFetch(() => listWorkflows(client), [client]);
const [expanded, setExpanded] = useState<Set<string>>(() => new Set());
const [detailsByName, setDetailsByName] = useState<Map<string, DetailCacheEntry>>(
() => new Map(),
@@ -117,11 +117,11 @@ export function WorkflowList({ agent }: Props) {
const staticNodeStates = useMemo(() => new Map<string, NodeState>(), []);
// biome-ignore lint/correctness/useExhaustiveDependencies: reset expansion when switching agents
// biome-ignore lint/correctness/useExhaustiveDependencies: reset expansion when switching clients
useEffect(() => {
setExpanded(new Set());
setDetailsByName(new Map());
}, [agent]);
}, [client]);
const ensureDetailLoaded = useCallback(
(name: string) => {
@@ -135,7 +135,7 @@ export function WorkflowList({ agent }: Props) {
void (async () => {
try {
const detail = await getWorkflowDetail(agent, name);
const detail = await getWorkflowDetail(client, name);
setDetailsByName((prev) => {
const next = new Map(prev);
next.set(name, { status: "ok", detail });
@@ -151,7 +151,7 @@ export function WorkflowList({ agent }: Props) {
}
})();
},
[agent],
[client],
);
function toggleExpanded(name: string) {
@@ -4,35 +4,35 @@ type View = "threads" | "workflows";
type HashRoute = {
view: View;
agent: string | null;
client: string | null;
threadId: string | null;
};
function parseHash(hash: string): HashRoute {
const raw = hash.replace(/^#\/?/, "");
// Format: #agent/threads/id or #agent/workflows or #threads or #workflows
// Format: #client/threads/id or #client/workflows or #threads or #workflows
const parts = raw.split("/");
// Check if first part is a known view
if (parts[0] === "threads" || parts[0] === "workflows") {
return {
view: parts[0] as View,
agent: null,
client: null,
threadId: parts[0] === "threads" && parts.length > 1 ? parts.slice(1).join("/") : null,
};
}
// First part is agent name
const agent = parts[0] || null;
// First part is client name
const client = parts[0] || null;
const viewPart = parts[1] ?? "threads";
const view: View = viewPart === "workflows" ? "workflows" : "threads";
const threadId = view === "threads" && parts.length > 2 ? parts.slice(2).join("/") : null;
return { view, agent, threadId };
return { view, client, threadId };
}
function buildHash(route: HashRoute): string {
const prefix = route.agent ? `${route.agent}/` : "";
const prefix = route.client ? `${route.client}/` : "";
if (route.view === "workflows") {
return `#${prefix}workflows`;
}
@@ -44,10 +44,10 @@ function buildHash(route: HashRoute): string {
export function useHashRoute(): {
view: View;
agent: string | null;
client: string | null;
threadId: string | null;
setView: (v: View) => void;
setAgent: (a: string | null) => void;
setClient: (a: string | null) => void;
setThreadId: (id: string | null) => void;
} {
const [route, setRoute] = useState<HashRoute>(() => parseHash(window.location.hash));
@@ -67,26 +67,26 @@ export function useHashRoute(): {
}, []);
const setView = useCallback(
(v: View) => navigate({ view: v, agent: route.agent, threadId: null }),
[navigate, route.agent],
(v: View) => navigate({ view: v, client: route.client, threadId: null }),
[navigate, route.client],
);
const setAgent = useCallback(
(a: string | null) => navigate({ view: route.view, agent: a, threadId: null }),
const setClient = useCallback(
(a: string | null) => navigate({ view: route.view, client: a, threadId: null }),
[navigate, route.view],
);
const setThreadId = useCallback(
(id: string | null) => navigate({ view: "threads", agent: route.agent, threadId: id }),
[navigate, route.agent],
(id: string | null) => navigate({ view: "threads", client: route.client, threadId: id }),
[navigate, route.client],
);
return {
view: route.view,
agent: route.agent,
client: route.client,
threadId: route.threadId,
setView,
setAgent,
setClient,
setThreadId,
};
}
+7 -7
View File
@@ -57,17 +57,17 @@ function handleRecordEvent(ev: Event, ctx: RecordEventContext): void {
ctx.cleanupEs();
}
function sseUrl(agent: string, threadId: string): string {
function sseUrl(client: string, threadId: string): string {
const gatewayUrl = import.meta.env.VITE_GATEWAY_URL || "";
const key = getApiKey();
const keyParam = key ? `?key=${encodeURIComponent(key)}` : "";
if (gatewayUrl) {
return `${gatewayUrl}/api/${agent}/threads/${encodeURIComponent(threadId)}/live${keyParam}`;
return `${gatewayUrl}/api/${client}/threads/${encodeURIComponent(threadId)}/live${keyParam}`;
}
return `/api/threads/${encodeURIComponent(threadId)}/live`;
}
export function useSSE(agent: string | null, threadId: string | null): UseSSEReturn {
export function useSSE(client: string | null, threadId: string | null): UseSSEReturn {
const [records, setRecords] = useState<ThreadRecord[]>([]);
const [connected, setConnected] = useState(false);
const [completed, setCompleted] = useState(false);
@@ -76,7 +76,7 @@ export function useSSE(agent: string | null, threadId: string | null): UseSSERet
const reconnectAttemptsRef = useRef(0);
useEffect(() => {
if (threadId === null || agent === null) {
if (threadId === null || client === null) {
completedRef.current = false;
reconnectAttemptsRef.current = 0;
setRecords([]);
@@ -86,7 +86,7 @@ export function useSSE(agent: string | null, threadId: string | null): UseSSERet
}
const tid = threadId;
const agentName = agent;
const clientName = client;
completedRef.current = false;
reconnectAttemptsRef.current = 0;
@@ -125,7 +125,7 @@ export function useSSE(agent: string | null, threadId: string | null): UseSSERet
}
cleanupEs();
const url = sseUrl(agentName, tid);
const url = sseUrl(clientName, tid);
es = new EventSource(url);
es.onopen = () => {
@@ -177,7 +177,7 @@ export function useSSE(agent: string | null, threadId: string | null): UseSSERet
}
cleanupEs();
};
}, [agent, threadId]);
}, [client, threadId]);
return { records, connected, completed };
}
@@ -1,14 +1,14 @@
/** One Durable Object instance per agent name; holds the reverse WebSocket from the agent CLI. */
/** One Durable Object instance per client name; holds the reverse WebSocket from the client CLI. */
import { DurableObject } from "cloudflare:workers";
import { parseWsRequestJson, parseWsResponseJson, type WsResponse } from "./ws-protocol.js";
type AgentSocketEnv = {
type ClientSocketEnv = {
GATEWAY_SECRET: string;
};
export const AGENT_SOCKET_INTERNAL_STATUS_PATH = "/internal/agent-socket/status";
export const AGENT_SOCKET_INTERNAL_PROXY_PATH = "/internal/agent-socket/proxy";
export const CLIENT_SOCKET_INTERNAL_STATUS_PATH = "/internal/client-socket/status";
export const CLIENT_SOCKET_INTERNAL_PROXY_PATH = "/internal/client-socket/proxy";
const PROXY_TIMEOUT_MS = 30_000;
@@ -32,7 +32,7 @@ function wsResponseToHttp(wr: WsResponse): Response {
return new Response(wr.body, { status: wr.status, headers });
}
export class AgentSocket extends DurableObject<AgentSocketEnv> {
export class ClientSocket extends DurableObject<ClientSocketEnv> {
private readonly pending = new Map<string, PendingEntry>();
private requireAuth(request: Request): Response | null {
@@ -100,11 +100,11 @@ export class AgentSocket extends DurableObject<AgentSocketEnv> {
async fetch(request: Request): Promise<Response> {
const url = new URL(request.url);
if (url.pathname === AGENT_SOCKET_INTERNAL_STATUS_PATH && request.method === "GET") {
if (url.pathname === CLIENT_SOCKET_INTERNAL_STATUS_PATH && request.method === "GET") {
return this.handleStatusGet(request);
}
if (url.pathname === AGENT_SOCKET_INTERNAL_PROXY_PATH && request.method === "POST") {
if (url.pathname === CLIENT_SOCKET_INTERNAL_PROXY_PATH && request.method === "POST") {
return this.handleProxyPost(request);
}
@@ -144,11 +144,11 @@ export class AgentSocket extends DurableObject<AgentSocketEnv> {
_reason: string,
_wasClean: boolean,
): Promise<void> {
this.rejectAllPending("agent websocket closed");
this.rejectAllPending("client websocket closed");
}
async webSocketError(_ws: WebSocket, _error: unknown): Promise<void> {
this.rejectAllPending("agent websocket error");
this.rejectAllPending("client websocket error");
}
private rejectAllPending(message: string): void {
+45 -45
View File
@@ -2,27 +2,27 @@ import { Hono } from "hono";
import { cors } from "hono/cors";
import {
AGENT_SOCKET_INTERNAL_PROXY_PATH,
AGENT_SOCKET_INTERNAL_STATUS_PATH,
AgentSocket,
} from "./agent-socket.js";
CLIENT_SOCKET_INTERNAL_PROXY_PATH,
CLIENT_SOCKET_INTERNAL_STATUS_PATH,
ClientSocket,
} from "./client-socket.js";
import type { WsRequest } from "./ws-protocol.js";
export { AgentSocket };
export { ClientSocket };
type Env = {
Bindings: {
ENDPOINTS: KVNamespace;
GATEWAY_SECRET: string;
DASHBOARD_API_KEY: string;
AGENT_SOCKET: DurableObjectNamespace<AgentSocket>;
CLIENT_SOCKET: DurableObjectNamespace<ClientSocket>;
};
};
type EndpointRecord = {
name: string;
url: string;
agentToken: string;
clientToken: string;
registeredAt: number;
lastHeartbeat: number;
};
@@ -43,7 +43,7 @@ function checkDashboardAuth(c: {
return key === c.env.DASHBOARD_API_KEY;
}
function isLocalAgentUrl(url: string): boolean {
function isLocalClientUrl(url: string): boolean {
try {
const u = new URL(url);
return u.hostname === "localhost" || u.hostname === "127.0.0.1";
@@ -52,7 +52,7 @@ function isLocalAgentUrl(url: string): boolean {
}
}
function buildForwardHeaders(raw: Headers, agentToken: string): Record<string, string> {
function buildForwardHeaders(raw: Headers, clientToken: string): Record<string, string> {
const out: Record<string, string> = {};
for (const [key, value] of raw) {
const lower = key.toLowerCase();
@@ -70,8 +70,8 @@ function buildForwardHeaders(raw: Headers, agentToken: string): Record<string, s
}
out[key] = value;
}
if (agentToken !== "") {
out["X-Agent-Token"] = agentToken;
if (clientToken !== "") {
out["X-Client-Token"] = clientToken;
}
return out;
}
@@ -81,7 +81,7 @@ function buildDashboardProxyHeaders(raw: Headers, token: string): Headers {
headers.delete("host");
headers.delete("Authorization");
if (token !== "") {
headers.set("X-Agent-Token", token);
headers.set("X-Client-Token", token);
}
return headers;
}
@@ -94,15 +94,15 @@ async function readBodyForWsProxy(method: string, req: Request): Promise<string
return buf.byteLength === 0 ? null : new TextDecoder().decode(buf);
}
async function fetchThroughAgentSocket(
async function fetchThroughClientSocket(
bindings: Env["Bindings"],
agent: string,
client: string,
gateSecret: string,
wsRequest: WsRequest,
): Promise<Response> {
const stub = bindings.AGENT_SOCKET.get(bindings.AGENT_SOCKET.idFromName(agent));
const stub = bindings.CLIENT_SOCKET.get(bindings.CLIENT_SOCKET.idFromName(client));
return stub.fetch(
new Request(`https://do.internal${AGENT_SOCKET_INTERNAL_PROXY_PATH}`, {
new Request(`https://do.internal${CLIENT_SOCKET_INTERNAL_PROXY_PATH}`, {
method: "POST",
headers: {
Authorization: `Bearer ${gateSecret}`,
@@ -113,7 +113,7 @@ async function fetchThroughAgentSocket(
);
}
async function fetchAgentWithRecordHeaders(
async function fetchClientWithRecordHeaders(
targetUrl: string,
method: string,
forwardRecord: Record<string, string>,
@@ -130,7 +130,7 @@ async function fetchAgentWithRecordHeaders(
});
}
async function fetchAgentWithDashboardHeaders(
async function fetchClientWithDashboardHeaders(
targetUrl: string,
method: string,
headers: Headers,
@@ -143,15 +143,15 @@ async function fetchAgentWithDashboardHeaders(
});
}
async function fetchAgentSocketStatus(
async function fetchClientSocketStatus(
env: Env["Bindings"],
name: string,
): Promise<{ ok: true; connected: boolean } | { ok: false }> {
try {
const id = env.AGENT_SOCKET.idFromName(name);
const stub = env.AGENT_SOCKET.get(id);
const id = env.CLIENT_SOCKET.idFromName(name);
const stub = env.CLIENT_SOCKET.get(id);
const resp = await stub.fetch(
new Request(`https://do${AGENT_SOCKET_INTERNAL_STATUS_PATH}`, {
new Request(`https://do${CLIENT_SOCKET_INTERNAL_STATUS_PATH}`, {
method: "GET",
headers: { Authorization: `Bearer ${env.GATEWAY_SECRET}` },
}),
@@ -171,7 +171,7 @@ function endpointStatusFromKvAndDo(record: EndpointRecord, doConnected: boolean
return "online";
}
if (doConnected === false) {
if (isLocalAgentUrl(record.url)) {
if (isLocalClientUrl(record.url)) {
return "offline";
}
const age = Date.now() - record.lastHeartbeat;
@@ -184,7 +184,7 @@ function endpointStatusFromKvAndDo(record: EndpointRecord, doConnected: boolean
// ── Health ──────────────────────────────────────────────────────────
app.get("/healthz", (c) => c.json({ ok: true }));
// ── Agent reverse WebSocket (GATEWAY_SECRET query param) ────────────
// ── Client reverse WebSocket (GATEWAY_SECRET query param) ────────────
app.get("/ws/connect", async (c) => {
const secret = c.req.query("secret");
const name = c.req.query("name");
@@ -197,8 +197,8 @@ app.get("/ws/connect", async (c) => {
if (c.req.header("Upgrade") !== "websocket") {
return c.text("expected WebSocket upgrade", 426);
}
const id = c.env.AGENT_SOCKET.idFromName(name);
const stub = c.env.AGENT_SOCKET.get(id);
const id = c.env.CLIENT_SOCKET.idFromName(name);
const stub = c.env.CLIENT_SOCKET.get(id);
return stub.fetch(c.req.raw);
});
@@ -210,9 +210,9 @@ gateway.post("/register", async (c) => {
name?: string;
url?: string;
secret?: string;
agentToken?: string;
clientToken?: string;
}>();
const { name, url, secret, agentToken } = body;
const { name, url, secret, clientToken } = body;
if (!name || !url) {
return c.json({ error: "name and url required" }, 400);
@@ -227,7 +227,7 @@ gateway.post("/register", async (c) => {
const record: EndpointRecord = {
name,
url: url.replace(/\/+$/, ""), // strip trailing slash
agentToken: agentToken ?? existing?.agentToken ?? "",
clientToken: clientToken ?? existing?.clientToken ?? "",
registeredAt: existing?.registeredAt ?? now,
lastHeartbeat: now,
};
@@ -261,7 +261,7 @@ gateway.get("/endpoints", async (c) => {
for (const key of list.keys) {
const record = await c.env.ENDPOINTS.get<EndpointRecord>(key.name, "json");
if (record) {
const doStatus = await fetchAgentSocketStatus(c.env, record.name);
const doStatus = await fetchClientSocketStatus(c.env, record.name);
const doConnected = doStatus.ok ? doStatus.connected : null;
endpoints.push({
name: record.name,
@@ -277,25 +277,25 @@ gateway.get("/endpoints", async (c) => {
app.route("/api/gateway", gateway);
// ── API proxy: /api/agents/:agent/* → WebSocket (preferred) or agent tunnel URL (dashboard auth) ──
app.all("/api/agents/:agent/*", async (c) => {
// ── API proxy: /api/clients/:client/* → WebSocket (preferred) or client tunnel URL (dashboard auth) ──
app.all("/api/clients/:client/*", async (c) => {
if (!checkDashboardAuth(c)) return c.json({ error: "unauthorized" }, 401);
const agent = c.req.param("agent");
const record = await c.env.ENDPOINTS.get<EndpointRecord>(agent, "json");
const client = c.req.param("client");
const record = await c.env.ENDPOINTS.get<EndpointRecord>(client, "json");
if (!record) {
return c.json({ error: "agent not found" }, 404);
return c.json({ error: "client not found" }, 404);
}
const url = new URL(c.req.url);
const pathAfterAgent = url.pathname.replace(`/api/agents/${agent}`, "");
const targetUrl = `${record.url}/api${pathAfterAgent}${url.search}`;
const proxyPath = `/api${pathAfterAgent}${url.search}`;
const pathAfterClient = url.pathname.replace(`/api/clients/${client}`, "");
const targetUrl = `${record.url}/api${pathAfterClient}${url.search}`;
const proxyPath = `/api${pathAfterClient}${url.search}`;
const method = c.req.method;
const token = record.agentToken ?? "";
const token = record.clientToken ?? "";
const forwardRecord = buildForwardHeaders(c.req.raw.headers, token);
const doStatus = await fetchAgentSocketStatus(c.env, agent);
const doStatus = await fetchClientSocketStatus(c.env, client);
if (doStatus.ok && doStatus.connected) {
const bodyStr = await readBodyForWsProxy(method, c.req.raw);
const wsRequest: WsRequest = {
@@ -305,7 +305,7 @@ app.all("/api/agents/:agent/*", async (c) => {
headers: forwardRecord,
body: bodyStr,
};
const proxyResp = await fetchThroughAgentSocket(c.env, agent, c.env.GATEWAY_SECRET, wsRequest);
const proxyResp = await fetchThroughClientSocket(c.env, client, c.env.GATEWAY_SECRET, wsRequest);
if (proxyResp.status !== 503) {
return new Response(proxyResp.body, {
status: proxyResp.status,
@@ -313,25 +313,25 @@ app.all("/api/agents/:agent/*", async (c) => {
});
}
try {
const resp = await fetchAgentWithRecordHeaders(targetUrl, method, forwardRecord, bodyStr);
const resp = await fetchClientWithRecordHeaders(targetUrl, method, forwardRecord, bodyStr);
return new Response(resp.body, {
status: resp.status,
headers: resp.headers,
});
} catch (err) {
return c.json({ error: "agent unreachable", detail: String(err) }, 502);
return c.json({ error: "client unreachable", detail: String(err) }, 502);
}
}
const headers = buildDashboardProxyHeaders(c.req.raw.headers, token);
try {
const resp = await fetchAgentWithDashboardHeaders(targetUrl, method, headers, c.req.raw.body);
const resp = await fetchClientWithDashboardHeaders(targetUrl, method, headers, c.req.raw.body);
return new Response(resp.body, {
status: resp.status,
headers: resp.headers,
});
} catch (err) {
return c.json({ error: "agent unreachable", detail: String(err) }, 502);
return c.json({ error: "client unreachable", detail: String(err) }, 502);
}
});
+5 -1
View File
@@ -7,10 +7,14 @@ binding = "ENDPOINTS"
id = "88b118d1cfab4c049f9c1684848811a3"
[durable_objects]
bindings = [{ name = "AGENT_SOCKET", class_name = "AgentSocket" }]
bindings = [{ name = "CLIENT_SOCKET", class_name = "ClientSocket" }]
[[migrations]]
tag = "add-agent-socket"
new_sqlite_classes = ["AgentSocket"]
[[migrations]]
tag = "rename-agent-to-client"
renamed_classes = [{ from = "AgentSocket", to = "ClientSocket" }]
# GATEWAY_SECRET is set via `wrangler secret put`