Compare commits

..

18 Commits

Author SHA1 Message Date
xingyue 484ed520cd feat(dashboard): switch graph layout from Dagre to ELK
What: Replace Dagre layout engine with ELK (Eclipse Layout Kernel) for
workflow graph visualization in the dashboard.

Why: Dagre lacks support for edge label placement and orthogonal edge
routing, causing condition labels to overlap with nodes. ELK provides
proper label positioning, better edge routing, and more compact layouts.

Changes:
- packages/workflow-dashboard/package.json: add elkjs dependency
- packages/workflow-dashboard/src/components/workflow-graph/use-layout.ts:
  rewrite layout from Dagre to async ELK with layered algorithm,
  orthogonal routing, reduced spacing for compactness
- packages/workflow-dashboard/src/components/workflow-graph/condition-edge.tsx:
  use ELK-computed label positions, show all labels including FALLBACK,
  switch to getSmoothStepPath for all edges
- packages/workflow-dashboard/src/components/workflow-graph/workflow-graph.tsx:
  wrap in ReactFlowProvider, add fitView on async layout change,
  key-based remount for layout stability
- packages/workflow-dashboard/src/components/workflow-list.tsx:
  left-right layout (info left, graph right), fix toggleExpanded
  React 18 batching bug, increase graph container height
2026-05-13 16:26:03 +08:00
xingyue 5b7c9b844b fix(engine): abort signal races gen.next() to fix flaky kill test (#209)
Root cause: executeThread awaited gen.next() without racing against
the abort signal. When a workflow bundle awaited a long setTimeout
between yields, the engine could not respond to kill until the
Promise resolved — causing the kill test to flake when the thread
completed before kill arrived.

Fix: Promise.race gen.next() with an abort listener so kill takes
effect immediately, even mid-yield. Also move the bundle's delay
to after the first yield (between planner and coder) to ensure the
thread is killable while running.

Closes #209
2026-05-13 11:31:40 +08:00
xiaoju f0d1bb9ae8 chore: bump all to 0.3.11
小橘 🍊
2026-05-13 03:28:33 +00:00
xiaoju 04cfd33f99 chore: bump all to 0.3.10 (regenerate lockfile)
小橘 🍊
2026-05-13 03:27:05 +00:00
xiaoju a8c00f169b chore: bump all packages to 0.3.9 (fix workspace:* dep resolution)
小橘 🍊
2026-05-13 03:25:50 +00:00
xiaoju c4d34530e8 chore: bump cli-workflow 0.3.8 (fix gateway dep resolution)
小橘 🍊
2026-05-13 03:23:09 +00:00
xiaoju 90a410c00a chore: bump cli-workflow to 0.3.7 (fix gateway dep version)
小橘 🍊
2026-05-13 03:21:34 +00:00
xiaoju 6276ca5a4a chore: publish workflow-gateway (remove private flag)
小橘 🍊
2026-05-13 03:20:33 +00:00
xiaoju 8e63f99eb6 chore: bump all public packages to 0.3.6
小橘 🍊
2026-05-13 03:18:25 +00:00
xiaomo 9ca70bbb69 Merge pull request 'feat: minimal tool set for workflow-agent-react (#222 Phase 3)' (#229) from feat/222-tools-smoke-test-phase3 into main 2026-05-13 03:16:37 +00:00
xiaomo ed1f38c7da Merge pull request 'refactor(dashboard): side-by-side graph + cards layout' (#215) from refactor/thread-detail-side-by-side-layout into main 2026-05-13 03:06:35 +00:00
xiaomo 1664d68b50 Merge pull request 'feat: WS request proxy — Phase 2 (#210)' (#214) from feat/210-ws-gateway-phase2 into main 2026-05-13 03:06:29 +00:00
xingyue 1871ef31b4 refactor(dashboard): replace vertical layout with side-by-side graph+cards
Change thread-detail from vertical (graph on top, cards below) to a
side-by-side layout:
- Left panel (280px, sticky): workflow graph, always visible
- Right panel (flex-1, scrollable): record cards
- Remove collapsible GraphPanel wrapper
- Graph acts as navigation (click node → scroll to card)

Refs: workflow thread 06F1NX4C9ET6HPXJAH7CWWF8MR
2026-05-13 11:05:03 +08:00
xingyue ec3c97b200 feat: WS request proxy — Gateway proxies HTTP via WebSocket (#210 Phase 2)
- Add ws-protocol.ts with WsRequest/WsResponse types + parsers
- AgentSocket DO: proxy POST handler, pending request map, 30s timeout
- /api/agents/:agent/* routes through DO WS when connected, falls back to HTTP
- ws-client handles incoming WsRequest, fetches local serve, returns WsResponse
- startGatewayWsClient accepts localPort for request handling

Testing: #213
2026-05-13 11:05:03 +08:00
xingyue 18e3dc7603 feat: WebSocket reverse-connection gateway Phase 1 (#210)
- Add AgentSocket Durable Object (holds one WS per agent name)
- Add /ws/connect route with GATEWAY_SECRET auth
- Add ws-client.ts with auto-reconnect (exponential backoff 1s-30s)
- serve defaults to WS mode (no cloudflared needed)
- Keep --tunnel-url and --no-tunnel as fallback options
- Endpoints list merges KV heartbeat + DO WebSocket status

Testing: #211
2026-05-13 11:05:03 +08:00
xiaoju fc229cac79 test: add tool handler unit tests (#222) 2026-05-13 02:57:47 +00:00
xiaoju ec555b43d1 feat: add minimal tool set (read/write/patch/shell) to workflow-agent-react (#222) 2026-05-13 02:57:47 +00:00
xiaomo c8de86d7c9 Merge pull request 'feat: workflow-agent-react + wrapAgentAsAdapter shared + childThread support (#222 Phase 2)' (#226) from feat/222-react-adapter-phase2 into main 2026-05-13 02:51:07 +00:00
40 changed files with 1315 additions and 249 deletions
@@ -70,10 +70,10 @@ const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url));
const abortablePlannerBundleSource = `${threadFixtureDescriptor}
${wfPutImport}
export const run = async function* (input, options) {
await new Promise((r) => setTimeout(r, 600));
const cas = options.cas;
let h = await putContentMerkleNode(cas, "plan");
yield { role: "planner", contentHash: h, meta: { plan: input.prompt }, refs: [h] };
await new Promise((r) => setTimeout(r, 10000));
h = await putContentMerkleNode(cas, "code");
yield { role: "coder", contentHash: h, meta: { diff: "y" }, refs: [h] };
return { returnCode: 0, summary: "done" };
+2 -1
View File
@@ -1,11 +1,12 @@
{
"name": "@uncaged/cli-workflow",
"version": "0.3.5",
"version": "0.3.11",
"type": "module",
"bin": {
"uncaged-workflow": "src/cli.ts"
},
"dependencies": {
"@uncaged/workflow-gateway": "workspace:*",
"@uncaged/workflow-protocol": "workspace:*",
"@uncaged/workflow-util": "workspace:*",
"@uncaged/workflow-cas": "workspace:*",
@@ -1,17 +1,14 @@
import { randomUUID } from "node:crypto";
import { hostname as osHostname } from "node:os";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { createLogger } from "@uncaged/workflow-util";
import { serve } from "bun";
import { printCliLine } from "../../cli-output.js";
import { createApp } from "./app.js";
import {
registerWithGateway,
startHeartbeat,
startTunnel,
unregisterFromGateway,
} from "./tunnel.js";
import { registerWithGateway, startHeartbeat, unregisterFromGateway } from "./tunnel.js";
import type { ServeOptions } from "./types.js";
import { startGatewayWsClient } from "./ws-client.js";
const DEFAULT_GATEWAY_URL = "https://workflow-gateway.shazhou.workers.dev";
const HEARTBEAT_INTERVAL_MS = 60_000;
@@ -56,6 +53,7 @@ function parseServeArgv(argv: string[]): Result<ServeOptions, string> {
let hostname = "127.0.0.1";
let name = osHostname().split(".")[0].toLowerCase();
let noTunnel = false;
let tunnelUrl: string | null = null;
let gatewayUrl = DEFAULT_GATEWAY_URL;
const gatewaySecret = process.env.WORKFLOW_GATEWAY_SECRET ?? "";
const stringFlags: Record<string, (v: string) => void> = {
@@ -68,6 +66,9 @@ function parseServeArgv(argv: string[]): Result<ServeOptions, string> {
"--gateway": (v) => {
gatewayUrl = v;
},
"--tunnel-url": (v) => {
tunnelUrl = v;
},
};
for (let i = 0; i < argv.length; i++) {
@@ -87,7 +88,7 @@ function parseServeArgv(argv: string[]): Result<ServeOptions, string> {
}
}
return ok({ port, hostname, name, noTunnel, gatewayUrl, gatewaySecret });
return ok({ port, hostname, name, noTunnel, tunnelUrl, gatewayUrl, gatewaySecret });
}
export async function dispatchServe(storageRoot: string, argv: string[]): Promise<number> {
@@ -107,47 +108,64 @@ export async function dispatchServe(storageRoot: string, argv: string[]): Promis
return 0;
}
// Start cloudflared quick tunnel
printCliLine("starting cloudflared quick tunnel...");
const tunnel = await startTunnel(options.port);
let resolvedTunnelUrl: string;
let stopWsClient: (() => void) | null = null;
if (!tunnel) {
printCliLine("failed to create tunnel — continuing without gateway registration");
await new Promise(() => {});
return 0;
if (options.tunnelUrl !== null) {
resolvedTunnelUrl = options.tunnelUrl;
printCliLine(`using tunnel URL: ${resolvedTunnelUrl}`);
} else {
if (options.gatewaySecret === "") {
printCliLine(
"WORKFLOW_GATEWAY_SECRET not set — cannot use WebSocket gateway connection (set env or pass --tunnel-url)",
);
await new Promise(() => {});
return 0;
}
resolvedTunnelUrl = `http://127.0.0.1:${options.port}`;
const log = createLogger({ sink: { kind: "stderr" } });
stopWsClient = startGatewayWsClient({
gatewayUrl: options.gatewayUrl,
name: options.name,
secret: options.gatewaySecret,
localPort: options.port,
log,
});
printCliLine("gateway WebSocket reverse connection (no cloudflared)");
}
printCliLine(`tunnel: ${tunnel.url}`);
// Register with gateway
if (options.gatewaySecret) {
if (agentToken === null) {
printCliLine("internal error: agent token missing");
await new Promise(() => {});
return 1;
}
const token = agentToken;
const registered = await registerWithGateway(
options.gatewayUrl,
options.name,
tunnel.url,
resolvedTunnelUrl,
options.gatewaySecret,
agentToken!,
token,
);
if (registered) {
printCliLine(`registered with gateway as "${options.name}"`);
}
// Start heartbeat
const heartbeatTimer = startHeartbeat(
options.gatewayUrl,
options.name,
tunnel.url,
resolvedTunnelUrl,
options.gatewaySecret,
agentToken!,
token,
HEARTBEAT_INTERVAL_MS,
);
// Cleanup on exit
const cleanup = async () => {
clearInterval(heartbeatTimer);
stopWsClient?.();
printCliLine("unregistering from gateway...");
await unregisterFromGateway(options.gatewayUrl, options.name, options.gatewaySecret);
tunnel.process.kill();
process.exit(0);
};
@@ -157,7 +175,6 @@ export async function dispatchServe(storageRoot: string, argv: string[]): Promis
printCliLine("WORKFLOW_GATEWAY_SECRET not set — skipping gateway registration");
}
// Keep process alive
await new Promise(() => {});
return 0;
}
@@ -3,6 +3,7 @@ export type ServeOptions = {
hostname: string;
name: string;
noTunnel: boolean;
tunnelUrl: string | null;
gatewayUrl: string;
gatewaySecret: string;
};
@@ -0,0 +1,165 @@
import { parseWsRequestJson, type WsResponse } from "@uncaged/workflow-gateway/ws-protocol";
import type { LogFn } from "@uncaged/workflow-util";
export type GatewayWsClientParams = {
gatewayUrl: string;
name: string;
secret: string;
localPort: number;
log: LogFn;
};
const INITIAL_BACKOFF_MS = 1000;
const MAX_BACKOFF_MS = 30_000;
export function buildGatewayWsConnectUrl(gatewayUrl: string, name: string, secret: string): string {
const u = new URL(gatewayUrl);
if (u.protocol === "https:") {
u.protocol = "wss:";
} else if (u.protocol === "http:") {
u.protocol = "ws:";
}
u.pathname = "/ws/connect";
u.search = "";
u.searchParams.set("name", name);
u.searchParams.set("secret", secret);
return u.href;
}
function headersToRecord(h: Headers): Record<string, string> {
const out: Record<string, string> = {};
for (const [k, v] of h) {
out[k] = v;
}
return out;
}
async function handleGatewayMessage(
ws: WebSocket,
raw: string,
params: GatewayWsClientParams,
): Promise<void> {
const req = parseWsRequestJson(raw);
if (req === null) {
params.log("ZM8K2PQ1", "gateway WebSocket dropped non-request message");
return;
}
const localUrl = `http://127.0.0.1:${String(params.localPort)}${req.path}`;
const initHeaders = new Headers();
for (const [k, v] of Object.entries(req.headers)) {
initHeaders.set(k, v);
}
let resp: Response;
try {
resp = await fetch(localUrl, {
method: req.method,
headers: initHeaders,
body: req.body === null ? undefined : req.body,
});
} catch (e) {
params.log("R4N7BQ3C", `local proxy fetch failed: ${String(e)}`);
const errBody: WsResponse = {
id: req.id,
status: 502,
headers: { "content-type": "application/json" },
body: JSON.stringify({ error: "local fetch failed", detail: String(e) }),
};
ws.send(JSON.stringify(errBody));
return;
}
const bodyText = await resp.text();
const headerRecord = headersToRecord(resp.headers);
const out: WsResponse = {
id: req.id,
status: resp.status,
headers: headerRecord,
body: bodyText,
};
ws.send(JSON.stringify(out));
}
/** Maintains a reverse WebSocket to the workflow gateway; reconnects with exponential backoff. */
export function startGatewayWsClient(params: GatewayWsClientParams): () => void {
const wsUrl = buildGatewayWsConnectUrl(params.gatewayUrl, params.name, params.secret);
let socket: WebSocket | null = null;
let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
let stopped = false;
let attempt = 0;
const clearReconnectTimer = (): void => {
if (reconnectTimer !== null) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
};
const scheduleReconnect = (): void => {
if (stopped) {
return;
}
clearReconnectTimer();
const delayMs = Math.min(INITIAL_BACKOFF_MS * 2 ** attempt, MAX_BACKOFF_MS);
attempt++;
params.log("6CJX2RLP", `gateway WebSocket reconnect in ${delayMs}ms (attempt ${attempt})`);
reconnectTimer = setTimeout(connect, delayMs);
};
const connect = (): void => {
if (stopped) {
return;
}
clearReconnectTimer();
params.log("2XK7HM9Q", "gateway WebSocket connecting...");
try {
socket = new WebSocket(wsUrl);
} catch (e) {
params.log("7NQW4HBT", `gateway WebSocket create failed: ${String(e)}`);
scheduleReconnect();
return;
}
const ws = socket;
ws.addEventListener("open", () => {
attempt = 0;
params.log("4PWN3V82", "gateway WebSocket connected");
});
ws.addEventListener("close", (ev) => {
socket = null;
params.log(
"8QTR6ZKC",
`gateway WebSocket closed code=${String(ev.code)} reason=${ev.reason} wasClean=${String(ev.wasClean)}`,
);
if (!stopped) {
scheduleReconnect();
}
});
ws.addEventListener("error", () => {
params.log("9BWS1M7F", "gateway WebSocket error");
});
ws.addEventListener("message", (ev) => {
const data = ev.data;
if (typeof data !== "string") {
params.log("T9W2KL5H", "gateway WebSocket non-text frame ignored");
return;
}
void handleGatewayMessage(ws, data, params).catch((e: unknown) => {
params.log("V7KX2M9P", `gateway WebSocket handler error: ${String(e)}`);
});
});
};
connect();
return (): void => {
stopped = true;
clearReconnectTimer();
if (socket !== null && socket.readyState === WebSocket.OPEN) {
socket.close(1000, "shutdown");
}
socket = null;
};
}
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "@uncaged/workflow-agent-cursor",
"version": "0.3.5",
"version": "0.3.11",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "@uncaged/workflow-agent-hermes",
"version": "0.3.5",
"version": "0.3.11",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "@uncaged/workflow-agent-llm",
"version": "0.3.5",
"version": "0.3.11",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",
@@ -0,0 +1,101 @@
import { describe, test, expect, afterAll } from "bun:test";
import { readFileTool, writeFileTool, patchFileTool, shellExecTool } from "../src/tools/index.js";
import { join } from "node:path";
import { tmpdir } from "node:os";
import { readFileSync, unlinkSync, mkdirSync } from "node:fs";
import { randomBytes } from "node:crypto";
const TMP_DIR = join(tmpdir(), `tools-test-${randomBytes(4).toString("hex")}`);
mkdirSync(TMP_DIR, { recursive: true });
const tmpFile = (name: string) => join(TMP_DIR, name);
const cleanupFiles: string[] = [];
afterAll(() => {
for (const f of cleanupFiles) {
try { unlinkSync(f); } catch { /* ignore */ }
}
try { unlinkSync(TMP_DIR); } catch { /* ignore */ }
});
describe("read_file", () => {
test("reads file with line numbers", async () => {
const p = tmpFile("read-test.txt");
cleanupFiles.push(p);
const content = "line1\nline2\nline3\n";
require("node:fs").writeFileSync(p, content);
const result = await readFileTool.handler(JSON.stringify({ path: p, offset: null, limit: null }));
expect(result).toContain("1|line1");
expect(result).toContain("2|line2");
expect(result).toContain("3|line3");
});
test("reads with offset and limit", async () => {
const p = tmpFile("read-test2.txt");
cleanupFiles.push(p);
require("node:fs").writeFileSync(p, "a\nb\nc\nd\ne\n");
const result = await readFileTool.handler(JSON.stringify({ path: p, offset: 2, limit: 2 }));
expect(result).toBe("2|b\n3|c");
});
test("returns error for missing file", async () => {
const result = await readFileTool.handler(JSON.stringify({ path: "/nonexistent/file.txt", offset: null, limit: null }));
expect(result).toContain("Error:");
});
});
describe("write_file", () => {
test("writes file and creates dirs", async () => {
const p = tmpFile("sub/write-test.txt");
cleanupFiles.push(p);
const result = await writeFileTool.handler(JSON.stringify({ path: p, content: "hello world" }));
expect(result).toContain("11 bytes");
expect(readFileSync(p, "utf-8")).toBe("hello world");
});
});
describe("patch_file", () => {
test("patches file content", async () => {
const p = tmpFile("patch-test.txt");
cleanupFiles.push(p);
require("node:fs").writeFileSync(p, "foo bar baz");
const result = await patchFileTool.handler(JSON.stringify({ path: p, old_string: "bar", new_string: "qux" }));
expect(result).toContain("Successfully");
expect(readFileSync(p, "utf-8")).toBe("foo qux baz");
});
test("errors on not found", async () => {
const p = tmpFile("patch-test2.txt");
cleanupFiles.push(p);
require("node:fs").writeFileSync(p, "foo");
const result = await patchFileTool.handler(JSON.stringify({ path: p, old_string: "xyz", new_string: "abc" }));
expect(result).toContain("not found");
});
test("errors on non-unique match", async () => {
const p = tmpFile("patch-test3.txt");
cleanupFiles.push(p);
require("node:fs").writeFileSync(p, "aaa bbb aaa");
const result = await patchFileTool.handler(JSON.stringify({ path: p, old_string: "aaa", new_string: "ccc" }));
expect(result).toContain("not unique");
});
});
describe("shell_exec", () => {
test("runs echo", async () => {
const result = await shellExecTool.handler(JSON.stringify({ command: "echo hello", timeout: null }));
expect(result.trim()).toBe("hello");
});
test("handles timeout", async () => {
const result = await shellExecTool.handler(JSON.stringify({ command: "sleep 10", timeout: 1 }));
expect(result).toContain("timed out");
});
});
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "@uncaged/workflow-agent-react",
"version": "0.3.5",
"version": "0.3.11",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",
@@ -1,2 +1,4 @@
export { createReactAdapter } from "./create-react-adapter.js";
export type { ReactAdapterConfig, ReactToolHandler } from "./types.js";
export { defaultTools, defaultToolHandler } from "./tools/index.js";
export type { ToolEntry, ToolHandler } from "./tools/index.js";
@@ -0,0 +1,16 @@
import type { ToolDefinition } from "@uncaged/workflow-reactor";
import type { ToolEntry } from "./types.js";
import { readFileTool } from "./read-file.js";
import { writeFileTool } from "./write-file.js";
import { patchFileTool } from "./patch-file.js";
import { shellExecTool } from "./shell-exec.js";
const ALL_TOOLS: ToolEntry[] = [readFileTool, writeFileTool, patchFileTool, shellExecTool];
export const defaultTools: readonly ToolDefinition[] = ALL_TOOLS.map((t) => t.definition);
export async function defaultToolHandler(name: string, args: string): Promise<string> {
const entry = ALL_TOOLS.find((t) => t.definition.function.name === name);
if (!entry) return `Unknown tool: ${name}`;
return entry.handler(args);
}
@@ -0,0 +1,6 @@
export { readFileTool } from "./read-file.js";
export { writeFileTool } from "./write-file.js";
export { patchFileTool } from "./patch-file.js";
export { shellExecTool } from "./shell-exec.js";
export { defaultTools, defaultToolHandler } from "./defaults.js";
export type { ToolEntry, ToolHandler } from "./types.js";
@@ -0,0 +1,40 @@
import { readFile, writeFile } from "node:fs/promises";
import type { ToolEntry } from "./types.js";
export const patchFileTool: ToolEntry = {
definition: {
type: "function",
function: {
name: "patch_file",
description: "Find and replace a string in a file (first occurrence only).",
parameters: {
type: "object",
properties: {
path: { type: "string", description: "Path to the file" },
old_string: { type: "string", description: "Text to find" },
new_string: { type: "string", description: "Replacement text" },
},
required: ["path", "old_string", "new_string"],
},
},
},
handler: async (args: string): Promise<string> => {
try {
const parsed = JSON.parse(args) as { path: string; old_string: string; new_string: string };
const content = await readFile(parsed.path, "utf-8");
const firstIdx = content.indexOf(parsed.old_string);
if (firstIdx === -1) {
return `Error: old_string not found in ${parsed.path}`;
}
const secondIdx = content.indexOf(parsed.old_string, firstIdx + 1);
if (secondIdx !== -1) {
return `Error: old_string is not unique in ${parsed.path} (found multiple occurrences)`;
}
const updated = content.slice(0, firstIdx) + parsed.new_string + content.slice(firstIdx + parsed.old_string.length);
await writeFile(parsed.path, updated);
return `Successfully patched ${parsed.path}`;
} catch (err) {
return `Error: ${err instanceof Error ? err.message : String(err)}`;
}
},
};
@@ -0,0 +1,35 @@
import { readFile } from "node:fs/promises";
import type { ToolEntry } from "./types.js";
export const readFileTool: ToolEntry = {
definition: {
type: "function",
function: {
name: "read_file",
description: "Read a text file and return lines with line numbers.",
parameters: {
type: "object",
properties: {
path: { type: "string", description: "Path to the file to read" },
offset: { type: ["number", "null"], description: "Start line number (1-indexed, default: 1)" },
limit: { type: ["number", "null"], description: "Max lines to read (default: all)" },
},
required: ["path"],
},
},
},
handler: async (args: string): Promise<string> => {
try {
const parsed = JSON.parse(args) as { path: string; offset: number | null; limit: number | null };
const content = await readFile(parsed.path, "utf-8");
const allLines = content.split("\n");
const offset = parsed.offset ?? 1;
const start = Math.max(0, offset - 1);
const end = parsed.limit != null ? Math.min(allLines.length, start + parsed.limit) : allLines.length;
const lines = allLines.slice(start, end);
return lines.map((line, i) => `${start + i + 1}|${line}`).join("\n");
} catch (err) {
return `Error: ${err instanceof Error ? err.message : String(err)}`;
}
},
};
@@ -0,0 +1,45 @@
import { execSync } from "node:child_process";
import type { ToolEntry } from "./types.js";
const MAX_OUTPUT = 10000;
export const shellExecTool: ToolEntry = {
definition: {
type: "function",
function: {
name: "shell_exec",
description: "Execute a shell command and return stdout + stderr.",
parameters: {
type: "object",
properties: {
command: { type: "string", description: "Shell command to run" },
timeout: { type: ["number", "null"], description: "Timeout in seconds (default: 30)" },
},
required: ["command"],
},
},
},
handler: async (args: string): Promise<string> => {
try {
const parsed = JSON.parse(args) as { command: string; timeout: number | null };
const timeoutMs = (parsed.timeout ?? 30) * 1000;
const output = execSync(parsed.command, {
encoding: "utf-8",
timeout: timeoutMs,
stdio: ["pipe", "pipe", "pipe"],
maxBuffer: MAX_OUTPUT * 2,
});
return output.length > MAX_OUTPUT ? `${output.slice(0, MAX_OUTPUT)}\n...(truncated)` : output;
} catch (err: unknown) {
if (err && typeof err === "object" && "status" in err && (err as { status: unknown }).status === null) {
return "Error: command timed out";
}
if (err && typeof err === "object" && "stderr" in err) {
const e = err as { stderr: string; stdout: string; status: number };
const combined = `${e.stdout ?? ""}${e.stderr ?? ""}`;
return combined.length > MAX_OUTPUT ? `${combined.slice(0, MAX_OUTPUT)}\n...(truncated)` : combined || `Error: command exited with status ${e.status}`;
}
return `Error: ${err instanceof Error ? err.message : String(err)}`;
}
},
};
@@ -0,0 +1,8 @@
import type { ToolDefinition } from "@uncaged/workflow-reactor";
export type ToolHandler = (args: string) => Promise<string>;
export type ToolEntry = {
definition: ToolDefinition;
handler: ToolHandler;
};
@@ -0,0 +1,32 @@
import { mkdir, writeFile } from "node:fs/promises";
import { dirname } from "node:path";
import type { ToolEntry } from "./types.js";
export const writeFileTool: ToolEntry = {
definition: {
type: "function",
function: {
name: "write_file",
description: "Write content to a file, creating parent directories as needed.",
parameters: {
type: "object",
properties: {
path: { type: "string", description: "Path to write" },
content: { type: "string", description: "File content" },
},
required: ["path", "content"],
},
},
},
handler: async (args: string): Promise<string> => {
try {
const parsed = JSON.parse(args) as { path: string; content: string };
await mkdir(dirname(parsed.path), { recursive: true });
const buf = Buffer.from(parsed.content, "utf-8");
await writeFile(parsed.path, buf);
return `Successfully wrote ${buf.length} bytes to ${parsed.path}`;
} catch (err) {
return `Error: ${err instanceof Error ? err.message : String(err)}`;
}
},
};
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "@uncaged/workflow-cas",
"version": "0.3.5",
"version": "0.3.11",
"type": "module",
"scripts": {
"test": "bun test"
+1
View File
@@ -11,6 +11,7 @@
"dependencies": {
"@dagrejs/dagre": "^3.0.0",
"@xyflow/react": "^12.10.2",
"elkjs": "^0.11.1",
"react": "^19.2.6",
"react-dom": "^19.2.6",
"react-markdown": "^10.1.0",
@@ -26,53 +26,6 @@ function extractWorkflowName(records: readonly ThreadRecord[]): string | null {
return null;
}
type GraphPanelProps = {
descriptor: WorkflowDescriptor;
workflowName: string | null;
nodeStates: Map<string, NodeState>;
onNodeClick: ((roleName: string) => void) | null;
};
function GraphPanel({ descriptor, workflowName, nodeStates, onNodeClick }: GraphPanelProps) {
const [open, setOpen] = useState(true);
const edgeCount = descriptor.graph.edges.length;
return (
<div
className="mb-4 rounded-lg border overflow-hidden"
style={{ borderColor: "var(--color-border)", background: "var(--color-surface)" }}
>
<button
type="button"
onClick={() => setOpen((v) => !v)}
className="w-full flex items-center justify-between px-3 py-2 text-xs"
style={{ color: "var(--color-text-muted)" }}
>
<span className="font-mono">
{open ? "▼" : "▶"} Workflow graph
{workflowName !== null && (
<span className="ml-2" style={{ color: "var(--color-text)" }}>
{workflowName}
</span>
)}
</span>
<span>
{edgeCount} edge{edgeCount === 1 ? "" : "s"}
</span>
</button>
{open && (
<div style={{ height: 300, width: "100%" }}>
<WorkflowGraph
graph={descriptor.graph}
roles={descriptor.roles}
nodeStates={nodeStates}
onNodeClick={onNodeClick}
/>
</div>
)}
</div>
);
}
function computeNodeStates(records: readonly ThreadRecord[]): Map<string, NodeState> {
const states = new Map<string, NodeState>();
const roleRecords = records.filter(
@@ -227,46 +180,85 @@ export function ThreadDetail({ agent, threadId, onBack }: Props) {
</p>
)}
{descriptor !== null && descriptor.graph.edges.length > 0 && (
<GraphPanel
descriptor={descriptor}
workflowName={workflowName}
nodeStates={nodeStates}
onNodeClick={handleGraphNodeClick}
/>
)}
<div className="flex gap-4" style={{ minHeight: "calc(100vh - 120px)" }}>
{descriptor !== null && descriptor.graph.edges.length > 0 && (
<div
className="shrink-0"
style={{
width: 280,
position: "sticky",
top: 16,
height: "calc(100vh - 120px)",
alignSelf: "flex-start",
}}
>
<div
className="rounded-lg border h-full flex flex-col overflow-hidden"
style={{ borderColor: "var(--color-border)", background: "var(--color-surface)" }}
>
<div
className="flex items-center justify-between px-3 py-2 text-xs"
style={{ color: "var(--color-text-muted)" }}
>
<span className="font-mono">
Workflow graph
{workflowName !== null && (
<span className="ml-2" style={{ color: "var(--color-text)" }}>
{workflowName}
</span>
)}
</span>
<span>
{descriptor.graph.edges.length} edge
{descriptor.graph.edges.length === 1 ? "" : "s"}
</span>
</div>
<div className="flex-1">
<WorkflowGraph
graph={descriptor.graph}
roles={descriptor.roles}
nodeStates={nodeStates}
onNodeClick={handleGraphNodeClick}
/>
</div>
</div>
</div>
)}
{status === "loading" && !liveActive && records.length === 0 && (
<p style={{ color: "var(--color-text-muted)" }}>Loading...</p>
)}
{status === "error" && !liveActive && (
<p style={{ color: "var(--color-error)" }}>Error: {error}</p>
)}
{(status === "ok" || liveActive || records.length > 0) && (
<div className="space-y-3">
{records.map((r, i) => {
const key = `${threadId}-${i}`;
if (r.type === "role") {
const isFirstForRole = firstIndexByRole.get(r.role) === i;
const flash = highlightedRole === r.role;
return (
<div
key={key}
ref={(el) => {
if (!isFirstForRole) return;
if (el !== null) firstCardByRoleRef.current.set(r.role, el);
else firstCardByRoleRef.current.delete(r.role);
}}
>
<RecordCard record={r} highlighted={flash} />
</div>
);
}
return <RecordCard key={key} record={r} highlighted={false} />;
})}
<div ref={recordsEndRef} aria-hidden />
<div className="flex-1 min-w-0">
{status === "loading" && !liveActive && records.length === 0 && (
<p style={{ color: "var(--color-text-muted)" }}>Loading...</p>
)}
{status === "error" && !liveActive && (
<p style={{ color: "var(--color-error)" }}>Error: {error}</p>
)}
{(status === "ok" || liveActive || records.length > 0) && (
<div className="space-y-3">
{records.map((r, i) => {
const key = `${threadId}-${i}`;
if (r.type === "role") {
const isFirstForRole = firstIndexByRole.get(r.role) === i;
const flash = highlightedRole === r.role;
return (
<div
key={key}
ref={(el) => {
if (!isFirstForRole) return;
if (el !== null) firstCardByRoleRef.current.set(r.role, el);
else firstCardByRoleRef.current.delete(r.role);
}}
>
<RecordCard record={r} highlighted={flash} />
</div>
);
}
return <RecordCard key={key} record={r} highlighted={false} />;
})}
<div ref={recordsEndRef} aria-hidden />
</div>
)}
</div>
)}
</div>
</div>
);
}
@@ -2,7 +2,6 @@ import {
BaseEdge,
EdgeLabelRenderer,
type EdgeProps,
getBezierPath,
getSmoothStepPath,
} from "@xyflow/react";
import type { ConditionEdgeData } from "./types.ts";
@@ -21,31 +20,28 @@ export function ConditionEdge(props: EdgeProps) {
data,
markerEnd,
} = props;
const edgeData = data as ConditionEdgeData | undefined;
const edgeData = data as (ConditionEdgeData & { elkLabelX?: number | null; elkLabelY?: number | null }) | undefined;
const isFallback = edgeData?.isFallback ?? false;
const isSelfLoop = source === target;
const [path, labelX, labelY] = isSelfLoop
? getSmoothStepPath({
sourceX,
sourceY,
targetX,
targetY,
sourcePosition,
targetPosition,
borderRadius: 20,
})
: getBezierPath({
sourceX,
sourceY,
targetX,
targetY,
sourcePosition,
targetPosition,
});
const [path, defaultLabelX, defaultLabelY] = getSmoothStepPath({
sourceX,
sourceY,
targetX,
targetY,
sourcePosition,
targetPosition,
borderRadius: isSelfLoop ? 20 : 8,
offset: isSelfLoop ? 50 : undefined,
});
const stroke = isFallback ? "var(--color-text-muted)" : "var(--color-text)";
const stroke = isFallback ? "var(--color-text-muted)" : "var(--color-accent)";
const strokeDasharray = isFallback ? "5 4" : undefined;
const label = edgeData?.condition ?? "";
// Use ELK-computed label position if available, otherwise fall back to ReactFlow default
const labelX = edgeData?.elkLabelX ?? defaultLabelX;
const labelY = edgeData?.elkLabelY ?? defaultLabelY;
return (
<>
@@ -55,19 +51,21 @@ export function ConditionEdge(props: EdgeProps) {
markerEnd={markerEnd}
style={{ stroke, strokeWidth: 1.5, strokeDasharray }}
/>
{edgeData && !isFallback && edgeData.condition !== "" && (
{label !== "" && (
<EdgeLabelRenderer>
<div
className="absolute px-1.5 py-0.5 rounded text-[10px] font-mono pointer-events-auto"
style={{
transform: `translate(-50%, -50%) translate(${labelX}px, ${labelY}px)`,
background: "var(--color-bg)",
background: "var(--color-surface)",
border: "1px solid var(--color-border)",
color: "var(--color-text)",
color: isFallback ? "var(--color-text-muted)" : "var(--color-text)",
whiteSpace: "nowrap",
zIndex: 10,
}}
title={edgeData.conditionDescription ?? undefined}
title={edgeData?.conditionDescription ?? undefined}
>
{edgeData.condition}
{label}
</div>
</EdgeLabelRenderer>
)}
@@ -1,6 +1,6 @@
import Dagre from "@dagrejs/dagre";
import type { Edge, Node } from "@xyflow/react";
import { useMemo } from "react";
import ELK, { type ElkExtendedEdge, type ElkNode } from "elkjs/lib/elk.bundled.js";
import { useEffect, useState } from "react";
import type { WorkflowGraphEdge } from "../../api.ts";
import type { ConditionEdgeData, NodeState, RoleNodeData, TerminalNodeData } from "./types.ts";
@@ -37,6 +37,10 @@ function nodeSize(id: string): { width: number; height: number } {
return { width: ROLE_NODE_WIDTH, height: ROLE_NODE_HEIGHT };
}
function edgeKey(e: WorkflowGraphEdge): string {
return `${e.from}->${e.to}::${e.condition}`;
}
function buildRoleNode(
id: string,
pos: { x: number; y: number },
@@ -68,14 +72,24 @@ function buildTerminalNode(
};
}
function edgeKey(e: WorkflowGraphEdge): string {
return `${e.from}->${e.to}::${e.condition}`;
}
function buildEdge(e: WorkflowGraphEdge): Edge<ConditionEdgeData> {
function buildEdge(e: WorkflowGraphEdge, elkEdgeMap: Map<string, ElkExtendedEdge>): Edge<ConditionEdgeData> {
const isFallback = e.condition === "FALLBACK";
const key = edgeKey(e);
const elkEdge = elkEdgeMap.get(key);
// Extract ELK's computed label position
let labelX: number | null = null;
let labelY: number | null = null;
if (elkEdge?.labels && elkEdge.labels.length > 0) {
const label = elkEdge.labels[0];
if (label.x !== undefined && label.y !== undefined) {
labelX = label.x + (label.width ?? 0) / 2;
labelY = label.y + (label.height ?? 0) / 2;
}
}
return {
id: edgeKey(e),
id: key,
source: e.from,
target: e.to,
type: "condition",
@@ -83,45 +97,111 @@ function buildEdge(e: WorkflowGraphEdge): Edge<ConditionEdgeData> {
condition: e.condition,
conditionDescription: e.conditionDescription,
isFallback,
},
elkLabelX: labelX,
elkLabelY: labelY,
} as ConditionEdgeData,
};
}
export function useLayout(input: LayoutInput): LayoutResult {
return useMemo(() => {
const ids = collectNodeIds(input.edges);
const elk = new ELK();
const g = new Dagre.graphlib.Graph({ multigraph: true }).setDefaultEdgeLabel(() => ({}));
g.setGraph({ rankdir: "TB", nodesep: 60, ranksep: 80 });
async function computeLayout(input: LayoutInput): Promise<LayoutResult> {
const ids = collectNodeIds(input.edges);
for (const id of ids) {
const size = nodeSize(id);
g.setNode(id, { width: size.width, height: size.height });
}
for (const e of input.edges) {
if (e.from === e.to) {
continue;
}
g.setEdge(e.from, e.to, {}, edgeKey(e));
const elkNodes: ElkNode[] = [];
for (const id of ids) {
const size = nodeSize(id);
elkNodes.push({ id, width: size.width, height: size.height });
}
const elkEdges: ElkExtendedEdge[] = input.edges
.filter((e) => e.from !== e.to)
.map((e) => ({
id: edgeKey(e),
sources: [e.from],
targets: [e.to],
labels: e.condition !== ""
? [{ text: e.condition, width: Math.max(e.condition.length * 7 + 16, 60), height: 22 }]
: [],
}));
const graph: ElkNode = {
id: "root",
layoutOptions: {
"elk.algorithm": "layered",
"elk.direction": "DOWN",
// Node spacing
"elk.spacing.nodeNode": "30",
"elk.layered.spacing.nodeNodeBetweenLayers": "50",
// Edge spacing — keep edges apart from each other and from nodes
"elk.spacing.edgeNode": "25",
"elk.spacing.edgeEdge": "15",
"elk.layered.spacing.edgeNodeBetweenLayers": "25",
"elk.layered.spacing.edgeEdgeBetweenLayers": "15",
// Edge routing
"elk.edgeRouting": "ORTHOGONAL",
"elk.layered.mergeEdges": "false",
// Node placement
"elk.layered.nodePlacement.strategy": "NETWORK_SIMPLEX",
// Edge label placement
"elk.edgeLabels.placement": "CENTER",
// Crossing minimization
"elk.layered.crossingMinimization.strategy": "LAYER_SWEEP",
// Compaction
"elk.layered.compaction.postCompaction.strategy": "EDGE_LENGTH",
// Cycle breaking — keep main flow top-to-bottom
"elk.layered.cycleBreaking.strategy": "DEPTH_FIRST",
},
children: elkNodes,
edges: elkEdges,
};
const laid = await elk.layout(graph);
// Build map of ELK edge results for label positions
const elkEdgeMap = new Map<string, ElkExtendedEdge>();
for (const e of laid.edges ?? []) {
elkEdgeMap.set(e.id, e);
}
const nodes: Node[] = [];
for (const child of laid.children ?? []) {
const pos = { x: child.x ?? 0, y: child.y ?? 0 };
const state = input.nodeStates.get(child.id) ?? "default";
if (child.id === START_ID || child.id === END_ID) {
nodes.push(buildTerminalNode(child.id, pos, state));
} else {
nodes.push(buildRoleNode(child.id, pos, input.roles, state));
}
}
Dagre.layout(g);
const edges: Edge[] = input.edges.map((e) => buildEdge(e, elkEdgeMap));
const nodes: Node[] = [];
for (const id of ids) {
const dagNode = g.node(id);
const size = nodeSize(id);
const pos = { x: dagNode.x - size.width / 2, y: dagNode.y - size.height / 2 };
const state = input.nodeStates.get(id) ?? "default";
if (id === START_ID || id === END_ID) {
nodes.push(buildTerminalNode(id, pos, state));
} else {
nodes.push(buildRoleNode(id, pos, input.roles, state));
}
}
const edges: Edge[] = input.edges.map(buildEdge);
return { nodes, edges };
}, [input.edges, input.roles, input.nodeStates]);
return { nodes, edges };
}
const EMPTY_LAYOUT: LayoutResult = { nodes: [], edges: [] };
export function useLayout(input: LayoutInput): LayoutResult {
const [layout, setLayout] = useState<LayoutResult>(EMPTY_LAYOUT);
const edgeJson = JSON.stringify(input.edges);
const roleJson = JSON.stringify(input.roles);
useEffect(() => {
let cancelled = false;
const parsed = {
edges: JSON.parse(edgeJson) as readonly WorkflowGraphEdge[],
roles: JSON.parse(roleJson) as Record<string, { description: string }>,
nodeStates: input.nodeStates,
};
computeLayout(parsed).then((result) => {
if (!cancelled) setLayout(result);
});
return () => {
cancelled = true;
};
}, [edgeJson, roleJson, input.nodeStates]);
return layout;
}
@@ -6,9 +6,11 @@ import {
type NodeTypes,
type OnNodeClick,
ReactFlow,
ReactFlowProvider,
useReactFlow,
} from "@xyflow/react";
import "@xyflow/react/dist/style.css";
import { useMemo } from "react";
import { useEffect, useMemo } from "react";
import type { WorkflowGraph as WorkflowGraphData } from "../../api.ts";
import { ConditionEdge } from "./condition-edge.tsx";
import { RoleNode } from "./role-node.tsx";
@@ -37,12 +39,30 @@ function handleRoleNodeClick(onRoleClick: (roleName: string) => void, node: Node
onRoleClick(node.id);
}
export function WorkflowGraph({ graph, roles, nodeStates, onNodeClick }: Props) {
function WorkflowGraphInner({ graph, roles, nodeStates, onNodeClick }: Props) {
const layout = useLayout({ edges: graph.edges, roles, nodeStates });
const { fitView } = useReactFlow();
const onNodeClickHandler: OnNodeClick | undefined =
onNodeClick !== null ? (_e, node) => handleRoleNodeClick(onNodeClick, node) : undefined;
// Re-fit when layout changes (ELK is async)
// Use requestAnimationFrame + setTimeout to ensure ReactFlow has processed nodes
useEffect(() => {
if (layout.nodes.length > 0) {
let cancelled = false;
requestAnimationFrame(() => {
if (cancelled) return;
setTimeout(() => {
if (!cancelled) fitView({ padding: 0.1, duration: 300 });
}, 300);
});
return () => {
cancelled = true;
};
}
}, [layout.nodes, layout.edges, fitView]);
const styledEdges = useMemo(
() =>
layout.edges.map((e) => ({
@@ -57,15 +77,25 @@ export function WorkflowGraph({ graph, roles, nodeStates, onNodeClick }: Props)
[layout.edges],
);
// Generate a stable key that changes when layout changes, to force ReactFlow remount + fitView
const layoutKey = useMemo(
() => layout.nodes.map((n) => `${n.id}:${n.position.x}:${n.position.y}`).join(","),
[layout.nodes],
);
return (
<ReactFlow
key={layoutKey}
nodes={layout.nodes}
edges={styledEdges}
nodeTypes={nodeTypes}
edgeTypes={edgeTypes}
onNodeClick={onNodeClickHandler}
fitView
fitViewOptions={{ padding: 0.15 }}
fitViewOptions={{ padding: 0.1, minZoom: 0.1, maxZoom: 1.5 }}
minZoom={0.1}
maxZoom={1.5}
defaultViewport={{ x: 0, y: 0, zoom: 0.5 }}
nodesDraggable={false}
nodesConnectable={false}
elementsSelectable={false}
@@ -77,3 +107,11 @@ export function WorkflowGraph({ graph, roles, nodeStates, onNodeClick }: Props)
</ReactFlow>
);
}
export function WorkflowGraph(props: Props) {
return (
<ReactFlowProvider>
<WorkflowGraphInner {...props} />
</ReactFlowProvider>
);
}
@@ -45,38 +45,45 @@ function ExpandedWorkflowBody({
const edgeCount = descriptor !== null ? descriptor.graph.edges.length : 0;
const vc = versionCount(detail);
const hasGraph = descriptor !== null && edgeCount > 0;
return (
<div className="pt-3 space-y-3 border-t" style={{ borderColor: "var(--color-border)" }}>
<div>
<p className="text-sm font-medium" style={{ color: "var(--color-text)" }}>
{detail.name}
<div
className="pt-3 border-t flex gap-4"
style={{ borderColor: "var(--color-border)" }}
>
<div className="space-y-3 shrink-0" style={{ minWidth: 200, maxWidth: 280 }}>
<div>
<p className="text-sm font-medium" style={{ color: "var(--color-text)" }}>
{detail.name}
</p>
<p className="text-xs mt-1 mb-1" style={{ color: "var(--color-text-muted)" }}>
Hash
</p>
<code className="text-xs font-mono block" style={{ color: "var(--color-accent)" }}>
{detail.hash}
</code>
</div>
<p className="text-xs" style={{ color: "var(--color-text-muted)" }}>
{vc} version{vc !== 1 ? "s" : ""}
</p>
<p className="text-xs mt-1 mb-1" style={{ color: "var(--color-text-muted)" }}>
Hash
</p>
<code className="text-xs font-mono block" style={{ color: "var(--color-accent)" }}>
{detail.hash}
</code>
<div>
<p className="text-xs mb-1 font-medium" style={{ color: "var(--color-text-muted)" }}>
Description
</p>
<p className="text-sm whitespace-pre-wrap" style={{ color: "var(--color-text)" }}>
{descriptor !== null && descriptor.description !== ""
? descriptor.description
: descriptor !== null
? "—"
: "No descriptor available for this workflow version."}
</p>
</div>
</div>
<p className="text-xs" style={{ color: "var(--color-text-muted)" }}>
{vc} version{vc !== 1 ? "s" : ""}
</p>
<div>
<p className="text-xs mb-1 font-medium" style={{ color: "var(--color-text-muted)" }}>
Description
</p>
<p className="text-sm whitespace-pre-wrap" style={{ color: "var(--color-text)" }}>
{descriptor !== null && descriptor.description !== ""
? descriptor.description
: descriptor !== null
? "—"
: "No descriptor available for this workflow version."}
</p>
</div>
{descriptor !== null && edgeCount > 0 ? (
{hasGraph ? (
<div
className="rounded-lg border overflow-hidden"
style={{ borderColor: "var(--color-border)", background: "var(--color-bg)" }}
className="rounded-lg border overflow-hidden flex-1"
style={{ borderColor: "var(--color-border)", background: "var(--color-bg)", minHeight: 500 }}
>
<div
className="px-3 py-2 text-xs flex justify-between items-center"
@@ -87,7 +94,7 @@ function ExpandedWorkflowBody({
{edgeCount} edge{edgeCount === 1 ? "" : "s"}
</span>
</div>
<div style={{ height: 300, width: "100%" }}>
<div style={{ height: 600, width: "100%" }}>
<WorkflowGraph
graph={descriptor.graph}
roles={descriptor.roles}
@@ -148,18 +155,17 @@ export function WorkflowList({ agent }: Props) {
);
function toggleExpanded(name: string) {
let shouldLoad = false;
const wasExpanded = expanded.has(name);
setExpanded((prev) => {
const next = new Set(prev);
if (next.has(name)) {
next.delete(name);
return next;
} else {
next.add(name);
}
next.add(name);
shouldLoad = true;
return next;
});
if (shouldLoad) {
if (!wasExpanded) {
ensureDetailLoaded(name);
}
}
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "@uncaged/workflow-execute",
"version": "0.3.5",
"version": "0.3.11",
"type": "module",
"exports": {
".": {
+31 -1
View File
@@ -299,7 +299,37 @@ async function driveWorkflowGenerator(params: {
});
}
const iterResult = await gen.next();
const iterResult = await Promise.race([
gen.next(),
new Promise<never>((_, reject) => {
if (executeOptions.signal.aborted) {
reject(new DOMException("The operation was aborted", "AbortError"));
return;
}
executeOptions.signal.addEventListener(
"abort",
() => reject(new DOMException("The operation was aborted", "AbortError")),
{ once: true },
);
}),
]).catch((e) => {
if (e instanceof DOMException && e.name === "AbortError") {
return { done: true as const, value: { returnCode: 130, summary: "thread aborted" } };
}
throw e;
});
if (executeOptions.signal.aborted || (iterResult.done && iterResult.value.returnCode === 130)) {
return await finalizeAbortedThread({
cas,
bundleDir,
threadId,
startHash,
chain,
logger,
abortLogTag: "H4KQ7RW3",
});
}
if (iterResult.done) {
logger("F3HN8QKP", `thread ${threadId} generator finished`);
+5 -2
View File
@@ -1,8 +1,11 @@
{
"name": "@uncaged/workflow-gateway",
"version": "0.1.0",
"private": true,
"version": "0.3.11",
"type": "module",
"exports": {
".": "./src/index.ts",
"./ws-protocol": "./src/ws-protocol.ts"
},
"scripts": {
"dev": "wrangler dev",
"deploy": "wrangler deploy"
@@ -0,0 +1,162 @@
/** One Durable Object instance per agent name; holds the reverse WebSocket from the agent CLI. */
import { DurableObject } from "cloudflare:workers";
import { parseWsRequestJson, parseWsResponseJson, type WsResponse } from "./ws-protocol.js";
type AgentSocketEnv = {
GATEWAY_SECRET: string;
};
export const AGENT_SOCKET_INTERNAL_STATUS_PATH = "/internal/agent-socket/status";
export const AGENT_SOCKET_INTERNAL_PROXY_PATH = "/internal/agent-socket/proxy";
const PROXY_TIMEOUT_MS = 30_000;
type PendingEntry = {
resolve: (r: Response) => void;
timer: ReturnType<typeof setTimeout>;
};
function jsonResponse(status: number, body: unknown): Response {
return new Response(JSON.stringify(body), {
status,
headers: { "Content-Type": "application/json" },
});
}
function wsResponseToHttp(wr: WsResponse): Response {
const headers = new Headers();
for (const [k, v] of Object.entries(wr.headers)) {
headers.set(k, v);
}
return new Response(wr.body, { status: wr.status, headers });
}
export class AgentSocket extends DurableObject<AgentSocketEnv> {
private readonly pending = new Map<string, PendingEntry>();
private requireAuth(request: Request): Response | null {
const auth = request.headers.get("Authorization");
if (auth !== `Bearer ${this.env.GATEWAY_SECRET}`) {
return jsonResponse(401, { error: "unauthorized" });
}
return null;
}
private handleStatusGet(request: Request): Response {
const denied = this.requireAuth(request);
if (denied !== null) {
return denied;
}
const sockets = this.ctx.getWebSockets();
const connected = sockets.length > 0;
return new Response(JSON.stringify({ connected, connectedCount: sockets.length }), {
headers: { "Content-Type": "application/json" },
});
}
private async handleProxyPost(request: Request): Promise<Response> {
const denied = this.requireAuth(request);
if (denied !== null) {
return denied;
}
const raw = await request.text();
const wsRequest = parseWsRequestJson(raw);
if (wsRequest === null) {
return jsonResponse(400, { error: "invalid proxy body" });
}
const sockets = this.ctx.getWebSockets();
const ws = sockets[0];
if (ws === undefined) {
return jsonResponse(503, { error: "no active websocket" });
}
return await new Promise<Response>((resolve) => {
const timer = setTimeout(() => {
this.pending.delete(wsRequest.id);
resolve(jsonResponse(504, { error: "gateway timeout" }));
}, PROXY_TIMEOUT_MS);
this.pending.set(wsRequest.id, {
resolve: (r: Response) => {
clearTimeout(timer);
this.pending.delete(wsRequest.id);
resolve(r);
},
timer,
});
try {
ws.send(JSON.stringify(wsRequest));
} catch {
clearTimeout(timer);
this.pending.delete(wsRequest.id);
resolve(jsonResponse(502, { error: "websocket send failed" }));
}
});
}
async fetch(request: Request): Promise<Response> {
const url = new URL(request.url);
if (url.pathname === AGENT_SOCKET_INTERNAL_STATUS_PATH && request.method === "GET") {
return this.handleStatusGet(request);
}
if (url.pathname === AGENT_SOCKET_INTERNAL_PROXY_PATH && request.method === "POST") {
return this.handleProxyPost(request);
}
if (request.headers.get("Upgrade") !== "websocket") {
return new Response("expected WebSocket upgrade", { status: 426 });
}
for (const ws of this.ctx.getWebSockets()) {
ws.close(1000, "replaced by new connection");
}
const pair = new WebSocketPair();
const client = pair[0];
const server = pair[1];
this.ctx.acceptWebSocket(server);
return new Response(null, { status: 101, webSocket: client });
}
async webSocketMessage(_ws: WebSocket, message: string | ArrayBuffer): Promise<void> {
const text = typeof message === "string" ? message : new TextDecoder().decode(message);
const wr = parseWsResponseJson(text);
if (wr === null) {
return;
}
const entry = this.pending.get(wr.id);
if (entry === undefined) {
return;
}
clearTimeout(entry.timer);
this.pending.delete(wr.id);
entry.resolve(wsResponseToHttp(wr));
}
async webSocketClose(
_ws: WebSocket,
_code: number,
_reason: string,
_wasClean: boolean,
): Promise<void> {
this.rejectAllPending("agent websocket closed");
}
async webSocketError(_ws: WebSocket, _error: unknown): Promise<void> {
this.rejectAllPending("agent websocket error");
}
private rejectAllPending(message: string): void {
const entries = [...this.pending.values()];
this.pending.clear();
for (const entry of entries) {
clearTimeout(entry.timer);
entry.resolve(jsonResponse(502, { error: message }));
}
}
}
+203 -16
View File
@@ -1,11 +1,21 @@
import { Hono } from "hono";
import { cors } from "hono/cors";
import {
AGENT_SOCKET_INTERNAL_PROXY_PATH,
AGENT_SOCKET_INTERNAL_STATUS_PATH,
AgentSocket,
} from "./agent-socket.js";
import type { WsRequest } from "./ws-protocol.js";
export { AgentSocket };
type Env = {
Bindings: {
ENDPOINTS: KVNamespace;
GATEWAY_SECRET: string;
DASHBOARD_API_KEY: string;
AGENT_SOCKET: DurableObjectNamespace<AgentSocket>;
};
};
@@ -33,9 +43,165 @@ function checkDashboardAuth(c: {
return key === c.env.DASHBOARD_API_KEY;
}
function isLocalAgentUrl(url: string): boolean {
try {
const u = new URL(url);
return u.hostname === "localhost" || u.hostname === "127.0.0.1";
} catch {
return false;
}
}
function buildForwardHeaders(raw: Headers, agentToken: string): Record<string, string> {
const out: Record<string, string> = {};
for (const [key, value] of raw) {
const lower = key.toLowerCase();
if (lower === "host" || lower === "authorization") {
continue;
}
if (
lower === "connection" ||
lower === "keep-alive" ||
lower === "proxy-connection" ||
lower === "transfer-encoding" ||
lower === "upgrade"
) {
continue;
}
out[key] = value;
}
if (agentToken !== "") {
out["X-Agent-Token"] = agentToken;
}
return out;
}
function buildDashboardProxyHeaders(raw: Headers, token: string): Headers {
const headers = new Headers(raw);
headers.delete("host");
headers.delete("Authorization");
if (token !== "") {
headers.set("X-Agent-Token", token);
}
return headers;
}
async function readBodyForWsProxy(method: string, req: Request): Promise<string | null> {
if (method === "GET" || method === "HEAD") {
return null;
}
const buf = await req.arrayBuffer();
return buf.byteLength === 0 ? null : new TextDecoder().decode(buf);
}
async function fetchThroughAgentSocket(
bindings: Env["Bindings"],
agent: string,
gateSecret: string,
wsRequest: WsRequest,
): Promise<Response> {
const stub = bindings.AGENT_SOCKET.get(bindings.AGENT_SOCKET.idFromName(agent));
return stub.fetch(
new Request(`https://do.internal${AGENT_SOCKET_INTERNAL_PROXY_PATH}`, {
method: "POST",
headers: {
Authorization: `Bearer ${gateSecret}`,
"Content-Type": "application/json",
},
body: JSON.stringify(wsRequest),
}),
);
}
async function fetchAgentWithRecordHeaders(
targetUrl: string,
method: string,
forwardRecord: Record<string, string>,
bodyStr: string | null,
): Promise<Response> {
const headers = new Headers();
for (const [k, v] of Object.entries(forwardRecord)) {
headers.set(k, v);
}
return fetch(targetUrl, {
method,
headers,
body: method !== "GET" && method !== "HEAD" ? (bodyStr ?? undefined) : undefined,
});
}
async function fetchAgentWithDashboardHeaders(
targetUrl: string,
method: string,
headers: Headers,
rawBody: BodyInit | null | undefined,
): Promise<Response> {
return fetch(targetUrl, {
method,
headers,
body: method !== "GET" && method !== "HEAD" ? rawBody : undefined,
});
}
async function fetchAgentSocketStatus(
env: Env["Bindings"],
name: string,
): Promise<{ ok: true; connected: boolean } | { ok: false }> {
try {
const id = env.AGENT_SOCKET.idFromName(name);
const stub = env.AGENT_SOCKET.get(id);
const resp = await stub.fetch(
new Request(`https://do${AGENT_SOCKET_INTERNAL_STATUS_PATH}`, {
method: "GET",
headers: { Authorization: `Bearer ${env.GATEWAY_SECRET}` },
}),
);
if (!resp.ok) {
return { ok: false };
}
const body = (await resp.json()) as { connected: boolean };
return { ok: true, connected: body.connected };
} catch {
return { ok: false };
}
}
function endpointStatusFromKvAndDo(record: EndpointRecord, doConnected: boolean | null): string {
if (doConnected === true) {
return "online";
}
if (doConnected === false) {
if (isLocalAgentUrl(record.url)) {
return "offline";
}
const age = Date.now() - record.lastHeartbeat;
return age < TTL_SECONDS * 1000 ? "online" : "offline";
}
const age = Date.now() - record.lastHeartbeat;
return age < TTL_SECONDS * 1000 ? "online" : "offline";
}
// ── Health ──────────────────────────────────────────────────────────
app.get("/healthz", (c) => c.json({ ok: true }));
// ── Agent reverse WebSocket (GATEWAY_SECRET query param) ────────────
app.get("/ws/connect", async (c) => {
const secret = c.req.query("secret");
const name = c.req.query("name");
if (name === undefined || name === "") {
return c.json({ error: "name required" }, 400);
}
if (secret !== c.env.GATEWAY_SECRET) {
return c.json({ error: "unauthorized" }, 401);
}
if (c.req.header("Upgrade") !== "websocket") {
return c.text("expected WebSocket upgrade", 426);
}
const id = c.env.AGENT_SOCKET.idFromName(name);
const stub = c.env.AGENT_SOCKET.get(id);
return stub.fetch(c.req.raw);
});
// ── Gateway management (GATEWAY_SECRET auth) ────────────────────────
const gateway = new Hono<Env>();
@@ -95,11 +261,12 @@ gateway.get("/endpoints", async (c) => {
for (const key of list.keys) {
const record = await c.env.ENDPOINTS.get<EndpointRecord>(key.name, "json");
if (record) {
const age = Date.now() - record.lastHeartbeat;
const doStatus = await fetchAgentSocketStatus(c.env, record.name);
const doConnected = doStatus.ok ? doStatus.connected : null;
endpoints.push({
name: record.name,
url: record.url,
status: age < TTL_SECONDS * 1000 ? "online" : "offline",
status: endpointStatusFromKvAndDo(record, doConnected),
lastHeartbeat: record.lastHeartbeat,
});
}
@@ -110,7 +277,7 @@ gateway.get("/endpoints", async (c) => {
app.route("/api/gateway", gateway);
// ── API proxy: /api/agents/:agent/* → agent's tunnel URL (dashboard auth) ──
// ── API proxy: /api/agents/:agent/* → WebSocket (preferred) or agent tunnel URL (dashboard auth) ──
app.all("/api/agents/:agent/*", async (c) => {
if (!checkDashboardAuth(c)) return c.json({ error: "unauthorized" }, 401);
const agent = c.req.param("agent");
@@ -120,26 +287,45 @@ app.all("/api/agents/:agent/*", async (c) => {
return c.json({ error: "agent not found" }, 404);
}
// Build target URL: strip /api/:agent prefix, forward the rest
const url = new URL(c.req.url);
const pathAfterAgent = url.pathname.replace(`/api/agents/${agent}`, "");
const targetUrl = `${record.url}/api${pathAfterAgent}${url.search}`;
const proxyPath = `/api${pathAfterAgent}${url.search}`;
const method = c.req.method;
const token = record.agentToken ?? "";
const forwardRecord = buildForwardHeaders(c.req.raw.headers, token);
const headers = new Headers(c.req.raw.headers);
headers.delete("host");
headers.delete("Authorization"); // don't forward dashboard key to agent
if (record.agentToken) {
headers.set("X-Agent-Token", record.agentToken);
const doStatus = await fetchAgentSocketStatus(c.env, agent);
if (doStatus.ok && doStatus.connected) {
const bodyStr = await readBodyForWsProxy(method, c.req.raw);
const wsRequest: WsRequest = {
id: crypto.randomUUID(),
method,
path: proxyPath,
headers: forwardRecord,
body: bodyStr,
};
const proxyResp = await fetchThroughAgentSocket(c.env, agent, c.env.GATEWAY_SECRET, wsRequest);
if (proxyResp.status !== 503) {
return new Response(proxyResp.body, {
status: proxyResp.status,
headers: proxyResp.headers,
});
}
try {
const resp = await fetchAgentWithRecordHeaders(targetUrl, method, forwardRecord, bodyStr);
return new Response(resp.body, {
status: resp.status,
headers: resp.headers,
});
} catch (err) {
return c.json({ error: "agent unreachable", detail: String(err) }, 502);
}
}
const headers = buildDashboardProxyHeaders(c.req.raw.headers, token);
try {
const resp = await fetch(targetUrl, {
method: c.req.method,
headers,
body: c.req.method !== "GET" && c.req.method !== "HEAD" ? c.req.raw.body : undefined,
});
// Stream response back
const resp = await fetchAgentWithDashboardHeaders(targetUrl, method, headers, c.req.raw.body);
return new Response(resp.body, {
status: resp.status,
headers: resp.headers,
@@ -149,4 +335,5 @@ app.all("/api/agents/:agent/*", async (c) => {
}
});
// biome-ignore lint/style/noDefaultExport: Cloudflare Workers entry expects default export
export default app;
@@ -0,0 +1,93 @@
/** Wire format for HTTP-over-WebSocket proxy between gateway Durable Object and local serve. */
export type WsRequest = {
id: string;
method: string;
path: string;
headers: Record<string, string>;
body: string | null;
};
export type WsResponse = {
id: string;
status: number;
headers: Record<string, string>;
body: string;
};
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
function isNonEmptyString(value: unknown): value is string {
return typeof value === "string" && value.length > 0;
}
/** Parse and validate a JSON payload as {@link WsRequest}. */
export function parseWsRequestJson(raw: string): WsRequest | null {
let parsed: unknown;
try {
parsed = JSON.parse(raw) as unknown;
} catch {
return null;
}
if (!isRecord(parsed)) {
return null;
}
const id = parsed.id;
const method = parsed.method;
const path = parsed.path;
const headers = parsed.headers;
const body = parsed.body;
if (!isNonEmptyString(id) || !isNonEmptyString(method) || !isNonEmptyString(path)) {
return null;
}
if (!isRecord(headers)) {
return null;
}
const headerRecord: Record<string, string> = {};
for (const [k, v] of Object.entries(headers)) {
if (typeof v !== "string") {
return null;
}
headerRecord[k] = v;
}
if (body !== null && typeof body !== "string") {
return null;
}
return { id, method, path, headers: headerRecord, body: body === null ? null : body };
}
/** Parse and validate a JSON payload as {@link WsResponse}. */
export function parseWsResponseJson(raw: string): WsResponse | null {
let parsed: unknown;
try {
parsed = JSON.parse(raw) as unknown;
} catch {
return null;
}
if (!isRecord(parsed)) {
return null;
}
const id = parsed.id;
const status = parsed.status;
const headers = parsed.headers;
const respBody = parsed.body;
if (!isNonEmptyString(id) || typeof status !== "number" || !Number.isFinite(status)) {
return null;
}
if (!isRecord(headers)) {
return null;
}
const headerRecord: Record<string, string> = {};
for (const [k, v] of Object.entries(headers)) {
if (typeof v !== "string") {
return null;
}
headerRecord[k] = v;
}
if (typeof respBody !== "string") {
return null;
}
return { id, status: Math.trunc(status), headers: headerRecord, body: respBody };
}
+7
View File
@@ -6,4 +6,11 @@ compatibility_date = "2025-04-01"
binding = "ENDPOINTS"
id = "88b118d1cfab4c049f9c1684848811a3"
[durable_objects]
bindings = [{ name = "AGENT_SOCKET", class_name = "AgentSocket" }]
[[migrations]]
tag = "add-agent-socket"
new_sqlite_classes = ["AgentSocket"]
# GATEWAY_SECRET is set via `wrangler secret put`
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "@uncaged/workflow-protocol",
"version": "0.3.5",
"version": "0.3.11",
"type": "module",
"exports": {
".": {
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "@uncaged/workflow-reactor",
"version": "0.3.5",
"version": "0.3.11",
"type": "module",
"exports": {
".": {
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "@uncaged/workflow-register",
"version": "0.3.5",
"version": "0.3.11",
"type": "module",
"exports": {
".": {
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "@uncaged/workflow-runtime",
"version": "0.3.5",
"version": "0.3.11",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",
@@ -1,6 +1,6 @@
{
"name": "@uncaged/workflow-template-develop",
"version": "0.3.5",
"version": "0.3.11",
"type": "module",
"exports": {
".": {
@@ -1,6 +1,6 @@
{
"name": "@uncaged/workflow-template-solve-issue",
"version": "0.3.5",
"version": "0.3.11",
"type": "module",
"exports": {
".": {
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "@uncaged/workflow-util-agent",
"version": "0.3.5",
"version": "0.3.11",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "@uncaged/workflow-util",
"version": "0.3.5",
"version": "0.3.11",
"type": "module",
"exports": {
".": {