feat: WebSocket reverse-connection gateway Phase 1 (#210)
- Add AgentSocket Durable Object (holds one WS per agent name) - Add /ws/connect route with GATEWAY_SECRET auth - Add ws-client.ts with auto-reconnect (exponential backoff 1s-30s) - serve defaults to WS mode (no cloudflared needed) - Keep --tunnel-url and --no-tunnel as fallback options - Endpoints list merges KV heartbeat + DO WebSocket status Testing: #211
This commit is contained in:
@@ -1,17 +1,14 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import { hostname as osHostname } from "node:os";
|
||||
import { err, ok, type Result } from "@uncaged/workflow-protocol";
|
||||
import { createLogger } from "@uncaged/workflow-util";
|
||||
import { serve } from "bun";
|
||||
|
||||
import { printCliLine } from "../../cli-output.js";
|
||||
import { createApp } from "./app.js";
|
||||
import {
|
||||
registerWithGateway,
|
||||
startHeartbeat,
|
||||
startTunnel,
|
||||
unregisterFromGateway,
|
||||
} from "./tunnel.js";
|
||||
import { registerWithGateway, startHeartbeat, unregisterFromGateway } from "./tunnel.js";
|
||||
import type { ServeOptions } from "./types.js";
|
||||
import { startGatewayWsClient } from "./ws-client.js";
|
||||
|
||||
const DEFAULT_GATEWAY_URL = "https://workflow-gateway.shazhou.workers.dev";
|
||||
const HEARTBEAT_INTERVAL_MS = 60_000;
|
||||
@@ -56,6 +53,7 @@ function parseServeArgv(argv: string[]): Result<ServeOptions, string> {
|
||||
let hostname = "127.0.0.1";
|
||||
let name = osHostname().split(".")[0].toLowerCase();
|
||||
let noTunnel = false;
|
||||
let tunnelUrl: string | null = null;
|
||||
let gatewayUrl = DEFAULT_GATEWAY_URL;
|
||||
const gatewaySecret = process.env.WORKFLOW_GATEWAY_SECRET ?? "";
|
||||
const stringFlags: Record<string, (v: string) => void> = {
|
||||
@@ -68,6 +66,9 @@ function parseServeArgv(argv: string[]): Result<ServeOptions, string> {
|
||||
"--gateway": (v) => {
|
||||
gatewayUrl = v;
|
||||
},
|
||||
"--tunnel-url": (v) => {
|
||||
tunnelUrl = v;
|
||||
},
|
||||
};
|
||||
|
||||
for (let i = 0; i < argv.length; i++) {
|
||||
@@ -87,7 +88,7 @@ function parseServeArgv(argv: string[]): Result<ServeOptions, string> {
|
||||
}
|
||||
}
|
||||
|
||||
return ok({ port, hostname, name, noTunnel, gatewayUrl, gatewaySecret });
|
||||
return ok({ port, hostname, name, noTunnel, tunnelUrl, gatewayUrl, gatewaySecret });
|
||||
}
|
||||
|
||||
export async function dispatchServe(storageRoot: string, argv: string[]): Promise<number> {
|
||||
@@ -107,47 +108,63 @@ export async function dispatchServe(storageRoot: string, argv: string[]): Promis
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Start cloudflared quick tunnel
|
||||
printCliLine("starting cloudflared quick tunnel...");
|
||||
const tunnel = await startTunnel(options.port);
|
||||
let resolvedTunnelUrl: string;
|
||||
let stopWsClient: (() => void) | null = null;
|
||||
|
||||
if (!tunnel) {
|
||||
printCliLine("failed to create tunnel — continuing without gateway registration");
|
||||
await new Promise(() => {});
|
||||
return 0;
|
||||
if (options.tunnelUrl !== null) {
|
||||
resolvedTunnelUrl = options.tunnelUrl;
|
||||
printCliLine(`using tunnel URL: ${resolvedTunnelUrl}`);
|
||||
} else {
|
||||
if (options.gatewaySecret === "") {
|
||||
printCliLine(
|
||||
"WORKFLOW_GATEWAY_SECRET not set — cannot use WebSocket gateway connection (set env or pass --tunnel-url)",
|
||||
);
|
||||
await new Promise(() => {});
|
||||
return 0;
|
||||
}
|
||||
resolvedTunnelUrl = `http://127.0.0.1:${options.port}`;
|
||||
const log = createLogger({ sink: { kind: "stderr" } });
|
||||
stopWsClient = startGatewayWsClient({
|
||||
gatewayUrl: options.gatewayUrl,
|
||||
name: options.name,
|
||||
secret: options.gatewaySecret,
|
||||
log,
|
||||
});
|
||||
printCliLine("gateway WebSocket reverse connection (no cloudflared)");
|
||||
}
|
||||
|
||||
printCliLine(`tunnel: ${tunnel.url}`);
|
||||
|
||||
// Register with gateway
|
||||
if (options.gatewaySecret) {
|
||||
if (agentToken === null) {
|
||||
printCliLine("internal error: agent token missing");
|
||||
await new Promise(() => {});
|
||||
return 1;
|
||||
}
|
||||
const token = agentToken;
|
||||
const registered = await registerWithGateway(
|
||||
options.gatewayUrl,
|
||||
options.name,
|
||||
tunnel.url,
|
||||
resolvedTunnelUrl,
|
||||
options.gatewaySecret,
|
||||
agentToken!,
|
||||
token,
|
||||
);
|
||||
if (registered) {
|
||||
printCliLine(`registered with gateway as "${options.name}"`);
|
||||
}
|
||||
|
||||
// Start heartbeat
|
||||
const heartbeatTimer = startHeartbeat(
|
||||
options.gatewayUrl,
|
||||
options.name,
|
||||
tunnel.url,
|
||||
resolvedTunnelUrl,
|
||||
options.gatewaySecret,
|
||||
agentToken!,
|
||||
token,
|
||||
HEARTBEAT_INTERVAL_MS,
|
||||
);
|
||||
|
||||
// Cleanup on exit
|
||||
const cleanup = async () => {
|
||||
clearInterval(heartbeatTimer);
|
||||
stopWsClient?.();
|
||||
printCliLine("unregistering from gateway...");
|
||||
await unregisterFromGateway(options.gatewayUrl, options.name, options.gatewaySecret);
|
||||
tunnel.process.kill();
|
||||
process.exit(0);
|
||||
};
|
||||
|
||||
@@ -157,7 +174,6 @@ export async function dispatchServe(storageRoot: string, argv: string[]): Promis
|
||||
printCliLine("WORKFLOW_GATEWAY_SECRET not set — skipping gateway registration");
|
||||
}
|
||||
|
||||
// Keep process alive
|
||||
await new Promise(() => {});
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ export type ServeOptions = {
|
||||
hostname: string;
|
||||
name: string;
|
||||
noTunnel: boolean;
|
||||
tunnelUrl: string | null;
|
||||
gatewayUrl: string;
|
||||
gatewaySecret: string;
|
||||
};
|
||||
|
||||
@@ -0,0 +1,112 @@
|
||||
import type { LogFn } from "@uncaged/workflow-util";
|
||||
|
||||
export type GatewayWsClientParams = {
|
||||
gatewayUrl: string;
|
||||
name: string;
|
||||
secret: string;
|
||||
log: LogFn;
|
||||
};
|
||||
|
||||
const INITIAL_BACKOFF_MS = 1000;
|
||||
const MAX_BACKOFF_MS = 30_000;
|
||||
|
||||
export function buildGatewayWsConnectUrl(gatewayUrl: string, name: string, secret: string): string {
|
||||
const u = new URL(gatewayUrl);
|
||||
if (u.protocol === "https:") {
|
||||
u.protocol = "wss:";
|
||||
} else if (u.protocol === "http:") {
|
||||
u.protocol = "ws:";
|
||||
}
|
||||
u.pathname = "/ws/connect";
|
||||
u.search = "";
|
||||
u.searchParams.set("name", name);
|
||||
u.searchParams.set("secret", secret);
|
||||
return u.href;
|
||||
}
|
||||
|
||||
/** Maintains a reverse WebSocket to the workflow gateway; reconnects with exponential backoff. */
|
||||
export function startGatewayWsClient(params: GatewayWsClientParams): () => void {
|
||||
const wsUrl = buildGatewayWsConnectUrl(params.gatewayUrl, params.name, params.secret);
|
||||
let socket: WebSocket | null = null;
|
||||
let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
let stopped = false;
|
||||
let attempt = 0;
|
||||
|
||||
const clearReconnectTimer = (): void => {
|
||||
if (reconnectTimer !== null) {
|
||||
clearTimeout(reconnectTimer);
|
||||
reconnectTimer = null;
|
||||
}
|
||||
};
|
||||
|
||||
const scheduleReconnect = (): void => {
|
||||
if (stopped) {
|
||||
return;
|
||||
}
|
||||
clearReconnectTimer();
|
||||
const delayMs = Math.min(INITIAL_BACKOFF_MS * 2 ** attempt, MAX_BACKOFF_MS);
|
||||
attempt++;
|
||||
params.log("6CJX2RLP", `gateway WebSocket reconnect in ${delayMs}ms (attempt ${attempt})`);
|
||||
reconnectTimer = setTimeout(connect, delayMs);
|
||||
};
|
||||
|
||||
const connect = (): void => {
|
||||
if (stopped) {
|
||||
return;
|
||||
}
|
||||
clearReconnectTimer();
|
||||
params.log("2XK7HM9Q", "gateway WebSocket connecting...");
|
||||
try {
|
||||
socket = new WebSocket(wsUrl);
|
||||
} catch (e) {
|
||||
params.log("7NQW4HBT", `gateway WebSocket create failed: ${String(e)}`);
|
||||
scheduleReconnect();
|
||||
return;
|
||||
}
|
||||
|
||||
const ws = socket;
|
||||
|
||||
ws.addEventListener("open", () => {
|
||||
attempt = 0;
|
||||
params.log("4PWN3V82", "gateway WebSocket connected");
|
||||
});
|
||||
|
||||
ws.addEventListener("close", (ev) => {
|
||||
socket = null;
|
||||
params.log(
|
||||
"8QTR6ZKC",
|
||||
`gateway WebSocket closed code=${String(ev.code)} reason=${ev.reason} wasClean=${String(ev.wasClean)}`,
|
||||
);
|
||||
if (!stopped) {
|
||||
scheduleReconnect();
|
||||
}
|
||||
});
|
||||
|
||||
ws.addEventListener("error", () => {
|
||||
params.log("9BWS1M7F", "gateway WebSocket error");
|
||||
});
|
||||
|
||||
ws.addEventListener("message", (ev) => {
|
||||
let preview: string;
|
||||
if (typeof ev.data === "string") {
|
||||
preview = ev.data;
|
||||
} else if (ev.data instanceof ArrayBuffer) {
|
||||
preview = `[binary ${String(ev.data.byteLength)} bytes]`;
|
||||
} else {
|
||||
preview = "[non-text message]";
|
||||
}
|
||||
params.log("3FHK5NDJ", `gateway → agent (phase 2 stub): ${preview.slice(0, 500)}`);
|
||||
});
|
||||
};
|
||||
|
||||
connect();
|
||||
|
||||
return (): void => {
|
||||
stopped = true;
|
||||
clearReconnectTimer();
|
||||
if (socket !== null && socket.readyState === WebSocket.OPEN) {
|
||||
socket.close(1000, "shutdown");
|
||||
}
|
||||
socket = null;
|
||||
};
|
||||
}
|
||||
Reference in New Issue
Block a user