diff --git a/packages/cli-workflow/src/commands/serve/serve.ts b/packages/cli-workflow/src/commands/serve/serve.ts index 684f340..9456ee5 100644 --- a/packages/cli-workflow/src/commands/serve/serve.ts +++ b/packages/cli-workflow/src/commands/serve/serve.ts @@ -1,17 +1,14 @@ import { randomUUID } from "node:crypto"; import { hostname as osHostname } from "node:os"; import { err, ok, type Result } from "@uncaged/workflow-protocol"; +import { createLogger } from "@uncaged/workflow-util"; import { serve } from "bun"; import { printCliLine } from "../../cli-output.js"; import { createApp } from "./app.js"; -import { - registerWithGateway, - startHeartbeat, - startTunnel, - unregisterFromGateway, -} from "./tunnel.js"; +import { registerWithGateway, startHeartbeat, unregisterFromGateway } from "./tunnel.js"; import type { ServeOptions } from "./types.js"; +import { startGatewayWsClient } from "./ws-client.js"; const DEFAULT_GATEWAY_URL = "https://workflow-gateway.shazhou.workers.dev"; const HEARTBEAT_INTERVAL_MS = 60_000; @@ -56,6 +53,7 @@ function parseServeArgv(argv: string[]): Result { let hostname = "127.0.0.1"; let name = osHostname().split(".")[0].toLowerCase(); let noTunnel = false; + let tunnelUrl: string | null = null; let gatewayUrl = DEFAULT_GATEWAY_URL; const gatewaySecret = process.env.WORKFLOW_GATEWAY_SECRET ?? ""; const stringFlags: Record void> = { @@ -68,6 +66,9 @@ function parseServeArgv(argv: string[]): Result { "--gateway": (v) => { gatewayUrl = v; }, + "--tunnel-url": (v) => { + tunnelUrl = v; + }, }; for (let i = 0; i < argv.length; i++) { @@ -87,7 +88,7 @@ function parseServeArgv(argv: string[]): Result { } } - return ok({ port, hostname, name, noTunnel, gatewayUrl, gatewaySecret }); + return ok({ port, hostname, name, noTunnel, tunnelUrl, gatewayUrl, gatewaySecret }); } export async function dispatchServe(storageRoot: string, argv: string[]): Promise { @@ -107,47 +108,63 @@ export async function dispatchServe(storageRoot: string, argv: string[]): Promis return 0; } - // Start cloudflared quick tunnel - printCliLine("starting cloudflared quick tunnel..."); - const tunnel = await startTunnel(options.port); + let resolvedTunnelUrl: string; + let stopWsClient: (() => void) | null = null; - if (!tunnel) { - printCliLine("failed to create tunnel — continuing without gateway registration"); - await new Promise(() => {}); - return 0; + if (options.tunnelUrl !== null) { + resolvedTunnelUrl = options.tunnelUrl; + printCliLine(`using tunnel URL: ${resolvedTunnelUrl}`); + } else { + if (options.gatewaySecret === "") { + printCliLine( + "WORKFLOW_GATEWAY_SECRET not set — cannot use WebSocket gateway connection (set env or pass --tunnel-url)", + ); + await new Promise(() => {}); + return 0; + } + resolvedTunnelUrl = `http://127.0.0.1:${options.port}`; + const log = createLogger({ sink: { kind: "stderr" } }); + stopWsClient = startGatewayWsClient({ + gatewayUrl: options.gatewayUrl, + name: options.name, + secret: options.gatewaySecret, + log, + }); + printCliLine("gateway WebSocket reverse connection (no cloudflared)"); } - printCliLine(`tunnel: ${tunnel.url}`); - - // Register with gateway if (options.gatewaySecret) { + if (agentToken === null) { + printCliLine("internal error: agent token missing"); + await new Promise(() => {}); + return 1; + } + const token = agentToken; const registered = await registerWithGateway( options.gatewayUrl, options.name, - tunnel.url, + resolvedTunnelUrl, options.gatewaySecret, - agentToken!, + token, ); if (registered) { printCliLine(`registered with gateway as "${options.name}"`); } - // Start heartbeat const heartbeatTimer = startHeartbeat( options.gatewayUrl, options.name, - tunnel.url, + resolvedTunnelUrl, options.gatewaySecret, - agentToken!, + token, HEARTBEAT_INTERVAL_MS, ); - // Cleanup on exit const cleanup = async () => { clearInterval(heartbeatTimer); + stopWsClient?.(); printCliLine("unregistering from gateway..."); await unregisterFromGateway(options.gatewayUrl, options.name, options.gatewaySecret); - tunnel.process.kill(); process.exit(0); }; @@ -157,7 +174,6 @@ export async function dispatchServe(storageRoot: string, argv: string[]): Promis printCliLine("WORKFLOW_GATEWAY_SECRET not set — skipping gateway registration"); } - // Keep process alive await new Promise(() => {}); return 0; } diff --git a/packages/cli-workflow/src/commands/serve/types.ts b/packages/cli-workflow/src/commands/serve/types.ts index 541269c..8c19cd7 100644 --- a/packages/cli-workflow/src/commands/serve/types.ts +++ b/packages/cli-workflow/src/commands/serve/types.ts @@ -3,6 +3,7 @@ export type ServeOptions = { hostname: string; name: string; noTunnel: boolean; + tunnelUrl: string | null; gatewayUrl: string; gatewaySecret: string; }; diff --git a/packages/cli-workflow/src/commands/serve/ws-client.ts b/packages/cli-workflow/src/commands/serve/ws-client.ts new file mode 100644 index 0000000..b6d2e94 --- /dev/null +++ b/packages/cli-workflow/src/commands/serve/ws-client.ts @@ -0,0 +1,112 @@ +import type { LogFn } from "@uncaged/workflow-util"; + +export type GatewayWsClientParams = { + gatewayUrl: string; + name: string; + secret: string; + log: LogFn; +}; + +const INITIAL_BACKOFF_MS = 1000; +const MAX_BACKOFF_MS = 30_000; + +export function buildGatewayWsConnectUrl(gatewayUrl: string, name: string, secret: string): string { + const u = new URL(gatewayUrl); + if (u.protocol === "https:") { + u.protocol = "wss:"; + } else if (u.protocol === "http:") { + u.protocol = "ws:"; + } + u.pathname = "/ws/connect"; + u.search = ""; + u.searchParams.set("name", name); + u.searchParams.set("secret", secret); + return u.href; +} + +/** Maintains a reverse WebSocket to the workflow gateway; reconnects with exponential backoff. */ +export function startGatewayWsClient(params: GatewayWsClientParams): () => void { + const wsUrl = buildGatewayWsConnectUrl(params.gatewayUrl, params.name, params.secret); + let socket: WebSocket | null = null; + let reconnectTimer: ReturnType | null = null; + let stopped = false; + let attempt = 0; + + const clearReconnectTimer = (): void => { + if (reconnectTimer !== null) { + clearTimeout(reconnectTimer); + reconnectTimer = null; + } + }; + + const scheduleReconnect = (): void => { + if (stopped) { + return; + } + clearReconnectTimer(); + const delayMs = Math.min(INITIAL_BACKOFF_MS * 2 ** attempt, MAX_BACKOFF_MS); + attempt++; + params.log("6CJX2RLP", `gateway WebSocket reconnect in ${delayMs}ms (attempt ${attempt})`); + reconnectTimer = setTimeout(connect, delayMs); + }; + + const connect = (): void => { + if (stopped) { + return; + } + clearReconnectTimer(); + params.log("2XK7HM9Q", "gateway WebSocket connecting..."); + try { + socket = new WebSocket(wsUrl); + } catch (e) { + params.log("7NQW4HBT", `gateway WebSocket create failed: ${String(e)}`); + scheduleReconnect(); + return; + } + + const ws = socket; + + ws.addEventListener("open", () => { + attempt = 0; + params.log("4PWN3V82", "gateway WebSocket connected"); + }); + + ws.addEventListener("close", (ev) => { + socket = null; + params.log( + "8QTR6ZKC", + `gateway WebSocket closed code=${String(ev.code)} reason=${ev.reason} wasClean=${String(ev.wasClean)}`, + ); + if (!stopped) { + scheduleReconnect(); + } + }); + + ws.addEventListener("error", () => { + params.log("9BWS1M7F", "gateway WebSocket error"); + }); + + ws.addEventListener("message", (ev) => { + let preview: string; + if (typeof ev.data === "string") { + preview = ev.data; + } else if (ev.data instanceof ArrayBuffer) { + preview = `[binary ${String(ev.data.byteLength)} bytes]`; + } else { + preview = "[non-text message]"; + } + params.log("3FHK5NDJ", `gateway → agent (phase 2 stub): ${preview.slice(0, 500)}`); + }); + }; + + connect(); + + return (): void => { + stopped = true; + clearReconnectTimer(); + if (socket !== null && socket.readyState === WebSocket.OPEN) { + socket.close(1000, "shutdown"); + } + socket = null; + }; +} diff --git a/packages/workflow-gateway/src/agent-socket.ts b/packages/workflow-gateway/src/agent-socket.ts new file mode 100644 index 0000000..a684801 --- /dev/null +++ b/packages/workflow-gateway/src/agent-socket.ts @@ -0,0 +1,54 @@ +/** One Durable Object instance per agent name; holds the reverse WebSocket from the agent CLI. */ +import { DurableObject } from "cloudflare:workers"; + +type AgentSocketEnv = { + GATEWAY_SECRET: string; +}; + +export const AGENT_SOCKET_INTERNAL_STATUS_PATH = "/internal/agent-socket/status"; + +export class AgentSocket extends DurableObject { + async fetch(request: Request): Promise { + const url = new URL(request.url); + + if (url.pathname === AGENT_SOCKET_INTERNAL_STATUS_PATH && request.method === "GET") { + const auth = request.headers.get("Authorization"); + if (auth !== `Bearer ${this.env.GATEWAY_SECRET}`) { + return new Response(JSON.stringify({ error: "unauthorized" }), { + status: 401, + headers: { "Content-Type": "application/json" }, + }); + } + const sockets = this.ctx.getWebSockets(); + const connected = sockets.length > 0; + return new Response(JSON.stringify({ connected, connectedCount: sockets.length }), { + headers: { "Content-Type": "application/json" }, + }); + } + + if (request.headers.get("Upgrade") !== "websocket") { + return new Response("expected WebSocket upgrade", { status: 426 }); + } + + for (const ws of this.ctx.getWebSockets()) { + ws.close(1000, "replaced by new connection"); + } + + const pair = new WebSocketPair(); + const client = pair[0]; + const server = pair[1]; + this.ctx.acceptWebSocket(server); + return new Response(null, { status: 101, webSocket: client }); + } + + async webSocketMessage(_ws: WebSocket, _message: string | ArrayBuffer): Promise {} + + async webSocketClose( + _ws: WebSocket, + _code: number, + _reason: string, + _wasClean: boolean, + ): Promise {} + + async webSocketError(_ws: WebSocket, _error: unknown): Promise {} +} diff --git a/packages/workflow-gateway/src/index.ts b/packages/workflow-gateway/src/index.ts index e303790..ce2f61a 100644 --- a/packages/workflow-gateway/src/index.ts +++ b/packages/workflow-gateway/src/index.ts @@ -1,11 +1,16 @@ import { Hono } from "hono"; import { cors } from "hono/cors"; +import { AGENT_SOCKET_INTERNAL_STATUS_PATH, AgentSocket } from "./agent-socket.js"; + +export { AgentSocket }; + type Env = { Bindings: { ENDPOINTS: KVNamespace; GATEWAY_SECRET: string; DASHBOARD_API_KEY: string; + AGENT_SOCKET: DurableObjectNamespace; }; }; @@ -33,9 +38,74 @@ function checkDashboardAuth(c: { return key === c.env.DASHBOARD_API_KEY; } +function isLocalAgentUrl(url: string): boolean { + try { + const u = new URL(url); + return u.hostname === "localhost" || u.hostname === "127.0.0.1"; + } catch { + return false; + } +} + +async function fetchAgentSocketStatus( + env: Env["Bindings"], + name: string, +): Promise<{ ok: true; connected: boolean } | { ok: false }> { + try { + const id = env.AGENT_SOCKET.idFromName(name); + const stub = env.AGENT_SOCKET.get(id); + const resp = await stub.fetch( + new Request(`https://do${AGENT_SOCKET_INTERNAL_STATUS_PATH}`, { + method: "GET", + headers: { Authorization: `Bearer ${env.GATEWAY_SECRET}` }, + }), + ); + if (!resp.ok) { + return { ok: false }; + } + const body = (await resp.json()) as { connected: boolean }; + return { ok: true, connected: body.connected }; + } catch { + return { ok: false }; + } +} + +function endpointStatusFromKvAndDo(record: EndpointRecord, doConnected: boolean | null): string { + if (doConnected === true) { + return "online"; + } + if (doConnected === false) { + if (isLocalAgentUrl(record.url)) { + return "offline"; + } + const age = Date.now() - record.lastHeartbeat; + return age < TTL_SECONDS * 1000 ? "online" : "offline"; + } + const age = Date.now() - record.lastHeartbeat; + return age < TTL_SECONDS * 1000 ? "online" : "offline"; +} + // ── Health ────────────────────────────────────────────────────────── app.get("/healthz", (c) => c.json({ ok: true })); +// ── Agent reverse WebSocket (GATEWAY_SECRET query param) ──────────── +app.get("/ws/connect", async (c) => { + const secret = c.req.query("secret"); + const name = c.req.query("name"); + if (name === undefined || name === "") { + return c.json({ error: "name required" }, 400); + } + if (secret !== c.env.GATEWAY_SECRET) { + return c.json({ error: "unauthorized" }, 401); + } + if (c.req.header("Upgrade") !== "websocket") { + return c.text("expected WebSocket upgrade", 426); + } + const id = c.env.AGENT_SOCKET.idFromName(name); + const stub = c.env.AGENT_SOCKET.get(id); + return stub.fetch(c.req.raw); +}); + // ── Gateway management (GATEWAY_SECRET auth) ──────────────────────── const gateway = new Hono(); @@ -95,11 +165,12 @@ gateway.get("/endpoints", async (c) => { for (const key of list.keys) { const record = await c.env.ENDPOINTS.get(key.name, "json"); if (record) { - const age = Date.now() - record.lastHeartbeat; + const doStatus = await fetchAgentSocketStatus(c.env, record.name); + const doConnected = doStatus.ok ? doStatus.connected : null; endpoints.push({ name: record.name, url: record.url, - status: age < TTL_SECONDS * 1000 ? "online" : "offline", + status: endpointStatusFromKvAndDo(record, doConnected), lastHeartbeat: record.lastHeartbeat, }); } @@ -149,4 +220,5 @@ app.all("/api/agents/:agent/*", async (c) => { } }); +// biome-ignore lint/style/noDefaultExport: Cloudflare Workers entry expects default export export default app; diff --git a/packages/workflow-gateway/wrangler.toml b/packages/workflow-gateway/wrangler.toml index 9688a1d..e8d6a6d 100644 --- a/packages/workflow-gateway/wrangler.toml +++ b/packages/workflow-gateway/wrangler.toml @@ -6,4 +6,11 @@ compatibility_date = "2025-04-01" binding = "ENDPOINTS" id = "88b118d1cfab4c049f9c1684848811a3" +[durable_objects] +bindings = [{ name = "AGENT_SOCKET", class_name = "AgentSocket" }] + +[[migrations]] +tag = "add-agent-socket" +new_sqlite_classes = ["AgentSocket"] + # GATEWAY_SECRET is set via `wrangler secret put`