diff --git a/packages/cli-workflow/src/commands/serve/serve.ts b/packages/cli-workflow/src/commands/serve/serve.ts index 1d894fd..214aab0 100644 --- a/packages/cli-workflow/src/commands/serve/serve.ts +++ b/packages/cli-workflow/src/commands/serve/serve.ts @@ -1,10 +1,20 @@ +import { hostname as osHostname } from "node:os"; import { err, ok, type Result } from "@uncaged/workflow-protocol"; import { serve } from "bun"; import { printCliLine } from "../../cli-output.js"; import { createApp } from "./app.js"; +import { + registerWithGateway, + startHeartbeat, + startTunnel, + unregisterFromGateway, +} from "./tunnel.js"; import type { ServeOptions } from "./types.js"; +const DEFAULT_GATEWAY_URL = "https://workflow-gateway.shazhou.workers.dev"; +const HEARTBEAT_INTERVAL_MS = 60_000; + export function startServer(storageRoot: string, options: ServeOptions): void { const app = createApp(storageRoot); @@ -28,30 +38,51 @@ function parsePortValue(value: string | undefined): Result { return ok(parsed); } +function requireNextArg(argv: string[], i: number, flag: string): Result { + const next = argv[i + 1]; + if (next === undefined) { + return err(`${flag} requires a value`); + } + return ok(next); +} + function parseServeArgv(argv: string[]): Result { let port = 7860; let hostname = "127.0.0.1"; + let name = osHostname().split(".")[0].toLowerCase(); + let noTunnel = false; + let gatewayUrl = DEFAULT_GATEWAY_URL; + const gatewaySecret = process.env.WORKFLOW_GATEWAY_SECRET ?? ""; + const stringFlags: Record void> = { + "--host": (v) => { + hostname = v; + }, + "--name": (v) => { + name = v; + }, + "--gateway": (v) => { + gatewayUrl = v; + }, + }; for (let i = 0; i < argv.length; i++) { const arg = argv[i]; if (arg === "--port" || arg === "-p") { const portResult = parsePortValue(argv[i + 1]); - if (!portResult.ok) { - return portResult; - } + if (!portResult.ok) return portResult; port = portResult.value; i++; - } else if (arg === "--host") { - const next = argv[i + 1]; - if (next === undefined) { - return err("--host requires a value"); - } - hostname = next; + } else if (arg === "--no-tunnel") { + noTunnel = true; + } else if (arg in stringFlags) { + const r = requireNextArg(argv, i, arg); + if (!r.ok) return r; + stringFlags[arg](r.value); i++; } } - return ok({ port, hostname }); + return ok({ port, hostname, name, noTunnel, gatewayUrl, gatewaySecret }); } export async function dispatchServe(storageRoot: string, argv: string[]): Promise { @@ -61,7 +92,62 @@ export async function dispatchServe(storageRoot: string, argv: string[]): Promis return 1; } - startServer(storageRoot, parsed.value); + const options = parsed.value; + startServer(storageRoot, options); + + if (options.noTunnel) { + printCliLine("tunnel disabled (--no-tunnel)"); + await new Promise(() => {}); + return 0; + } + + // Start cloudflared quick tunnel + printCliLine("starting cloudflared quick tunnel..."); + const tunnel = await startTunnel(options.port); + + if (!tunnel) { + printCliLine("failed to create tunnel — continuing without gateway registration"); + await new Promise(() => {}); + return 0; + } + + printCliLine(`tunnel: ${tunnel.url}`); + + // Register with gateway + if (options.gatewaySecret) { + const registered = await registerWithGateway( + options.gatewayUrl, + options.name, + tunnel.url, + options.gatewaySecret, + ); + if (registered) { + printCliLine(`registered with gateway as "${options.name}"`); + } + + // Start heartbeat + const heartbeatTimer = startHeartbeat( + options.gatewayUrl, + options.name, + tunnel.url, + options.gatewaySecret, + HEARTBEAT_INTERVAL_MS, + ); + + // Cleanup on exit + const cleanup = async () => { + clearInterval(heartbeatTimer); + printCliLine("unregistering from gateway..."); + await unregisterFromGateway(options.gatewayUrl, options.name, options.gatewaySecret); + tunnel.process.kill(); + process.exit(0); + }; + + process.on("SIGINT", cleanup); + process.on("SIGTERM", cleanup); + } else { + printCliLine("WORKFLOW_GATEWAY_SECRET not set — skipping gateway registration"); + } // Keep process alive await new Promise(() => {}); diff --git a/packages/cli-workflow/src/commands/serve/tunnel.ts b/packages/cli-workflow/src/commands/serve/tunnel.ts new file mode 100644 index 0000000..dd253df --- /dev/null +++ b/packages/cli-workflow/src/commands/serve/tunnel.ts @@ -0,0 +1,86 @@ +import { printCliLine } from "../../cli-output.js"; + +type TunnelHandle = { + process: ReturnType; + url: string; +}; + +export async function startTunnel(port: number): Promise { + const proc = Bun.spawn(["cloudflared", "tunnel", "--url", `http://localhost:${port}`], { + stdout: "pipe", + stderr: "pipe", + }); + + // cloudflared prints the URL to stderr + const reader = proc.stderr.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + const deadline = Date.now() + 30_000; + + while (Date.now() < deadline) { + const { done, value } = await reader.read(); + if (done) break; + buffer += decoder.decode(value, { stream: true }); + const match = buffer.match(/https:\/\/[a-z0-9-]+\.trycloudflare\.com/); + if (match) { + // Release the reader so stderr keeps flowing without backpressure + reader.releaseLock(); + return { process: proc, url: match[0] }; + } + } + + reader.releaseLock(); + proc.kill(); + return null; +} + +export async function registerWithGateway( + gatewayUrl: string, + name: string, + tunnelUrl: string, + secret: string, +): Promise { + try { + const resp = await fetch(`${gatewayUrl}/register`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ name, url: tunnelUrl, secret }), + }); + if (!resp.ok) { + const body = await resp.text(); + printCliLine(`gateway registration failed: ${resp.status} ${body}`); + return false; + } + return true; + } catch (e) { + printCliLine(`gateway registration error: ${e}`); + return false; + } +} + +export async function unregisterFromGateway( + gatewayUrl: string, + name: string, + secret: string, +): Promise { + try { + await fetch(`${gatewayUrl}/register/${name}`, { + method: "DELETE", + headers: { Authorization: `Bearer ${secret}` }, + }); + } catch { + // Best effort — process is exiting + } +} + +export function startHeartbeat( + gatewayUrl: string, + name: string, + tunnelUrl: string, + secret: string, + intervalMs: number, +): ReturnType { + return setInterval(() => { + registerWithGateway(gatewayUrl, name, tunnelUrl, secret).catch(() => {}); + }, intervalMs); +} diff --git a/packages/cli-workflow/src/commands/serve/types.ts b/packages/cli-workflow/src/commands/serve/types.ts index ad8ef10..541269c 100644 --- a/packages/cli-workflow/src/commands/serve/types.ts +++ b/packages/cli-workflow/src/commands/serve/types.ts @@ -1,4 +1,8 @@ export type ServeOptions = { port: number; hostname: string; + name: string; + noTunnel: boolean; + gatewayUrl: string; + gatewaySecret: string; };