Compare commits

...

19 Commits

Author SHA1 Message Date
xiaoju d6fe3f844c fix: detect crashed threads as failed instead of stuck running
- resolveThreadListStatus() checks CAS chain for __end__ node
- Stale .running markers no longer cause false 'running' status
- Distinguish 'failed' (returnCode != 0) from 'completed'
- Worker signal handlers (SIGINT/SIGTERM) clean up .running files
- listRunningThreads filters out terminated threads with stale markers

Fixes #170

小橘 <xiaoju@shazhou.work>
2026-05-09 12:28:33 +00:00
xiaoju d0803019b5 feat: ephemeral agent token for serve ↔ gateway auth
- serve generates random UUID on startup
- registration sends agentToken to gateway, stored in KV
- gateway injects X-Agent-Token header when proxying to agent
- serve rejects /api/* requests without valid token
- healthz remains unauthenticated
- tunnel URL is now protected — direct access returns 401

小橘 <xiaoju@shazhou.work>
2026-05-09 12:05:10 +00:00
xiaoju f16e7641fd chore: add .env.production for dashboard gateway URL
小橘 <xiaoju@shazhou.work>
2026-05-09 11:58:51 +00:00
xiaoju 3b41625001 feat: dashboard API key authentication
- Gateway: DASHBOARD_API_KEY middleware on /endpoints and /api/* routes
- Dashboard: login page with key validation, stored in localStorage
- SSE: key passed as ?key= query param (EventSource can't set headers)
- Sidebar: logout button to clear key

Refs #169
小橘 <xiaoju@shazhou.work>
2026-05-09 11:56:25 +00:00
xiaoju c602d2284b fix(dashboard): pass content as children to ReactMarkdown
Self-closing <ReactMarkdown /> renders nothing — need children.

小橘 <xiaoju@shazhou.work>
2026-05-09 10:58:33 +00:00
xiaoju d96e10b0fc feat(dashboard): structured record rendering with markdown support (#169)
- API returns structured fields for thread-start (workflow, prompt, status)
  and workflow-result (returnCode, content, timestamp)
- New RecordCard component renders by type:
  - StartCard: workflow name badge + prompt blockquote
  - RoleMessage: role-colored badges (preparer/agent/extractor) + markdown
  - ResultCard: success/fail status badge + summary
- Added react-markdown + shiki for markdown rendering with syntax highlighting
- Replaces generic <pre> blocks with proper structured rendering

Refs #169
小橘 <xiaoju@shazhou.work>
2026-05-09 10:41:13 +00:00
xiaoju 8e36d3e1f5 fix: use getContentMerklePayload to extract prompt text
Was showing raw YAML of the CAS node instead of the payload string.

小橘 <xiaoju@shazhou.work>
2026-05-09 10:34:43 +00:00
xiaoju bbe4fe0ed1 fix: include prompt text in thread-start record
Read prompt from StartNode refs[0] CAS blob and display it.

小橘 <xiaoju@shazhou.work>
2026-05-09 10:32:59 +00:00
xiaoju e105c5cac1 fix: show workflow name instead of bundle hash in thread-start record
小橘 <xiaoju@shazhou.work>
2026-05-09 10:31:08 +00:00
xiaoju 578776fccf fix: add standard fields to thread-start record
小橘 <xiaoju@shazhou.work>
2026-05-09 10:27:03 +00:00
xiaoju cb756a999a fix: normalize workflow-result records to match ThreadRecord shape
Both REST and SSE endpoints now return workflow-result with standard
fields (role, content, timestamp) instead of non-standard (summary).
Fixes 'Invalid Date' and empty content in dashboard.

小橘 <xiaoju@shazhou.work>
2026-05-09 10:24:48 +00:00
xiaoju e0577ceefe fix: add /api/healthz alias for gateway proxy health check
Gateway proxies /api/neko/healthz → /api/healthz on the agent,
but healthz was only on /healthz. Dashboard status bar showed
permanent Offline.

小橘 🍊(NEKO Team)
2026-05-09 10:05:46 +00:00
xiaoju 024dd8c1e8 Merge pull request 'feat: auto-tunnel + CF Worker gateway + dashboard multi-agent' (#168) from feat/164-cf-worker-gateway into main 2026-05-09 10:02:36 +00:00
xiaoju 9e98119145 feat: dashboard multi-agent support + CF Pages deploy
Phase C of #164:
- Dashboard fetches agents from gateway /endpoints
- Sidebar shows agent selector with online/offline status
- All API calls routed through gateway /api/:agent/*
- Hash routing: #agent/threads/id format
- SSE live streaming via gateway proxy
- VITE_GATEWAY_URL env var for gateway configuration
- Deployed to CF Pages: workflow-dashboard-54r.pages.dev
- Custom domain: workflow.shazhou.work (pending SSL)

Ref: #164, closes #167

小橘 🍊(NEKO Team)
2026-05-09 10:01:27 +00:00
xiaoju fd8943f131 feat: serve auto-tunnel + gateway registration
Phase B of #164:
- serve --name <agent> starts cloudflared quick tunnel automatically
- Registers with CF Worker gateway, heartbeat every 60s
- Graceful unregister on SIGINT/SIGTERM
- --no-tunnel flag for local dev
- Default name from hostname

Ref: #164, closes #166

小橘 🍊(NEKO Team)
2026-05-09 09:53:08 +00:00
xiaoju f7253d5948 feat: CF Worker API gateway with KV endpoint registry
Phase A of #164:
- Hono-based CF Worker at workflow-gateway.shazhou.workers.dev
- POST /register — agent registration with shared secret
- DELETE /register/:name — unregister
- GET /endpoints — list online agents
- GET /api/:agent/* — proxy to agent tunnel URL
- KV-backed with TTL auto-expiry

Ref: #164, closes #165

小橘 🍊(NEKO Team)
2026-05-09 09:48:49 +00:00
xiaoju 1c5636c270 Merge pull request 'fix: content node refs field + backward compat' (#163) from fix/161-162-cas-content-refs into main 2026-05-09 09:10:09 +00:00
xiaoju ca0403c8ab fix: content node refs field + thread head update
Fixes #161

Fixes #162

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-09 08:53:02 +00:00
xiaoju aa25f55f63 fix: add workflow-protocol and workflow-util to bundle validator allowlist
小橘 <xiaoju@shazhou.work>
2026-05-09 08:36:39 +00:00
32 changed files with 1221 additions and 161 deletions
@@ -8,7 +8,7 @@ import { createWorkflowRoutes } from "./routes-workflow.js";
const MAX_BODY_SIZE = 1_048_576; // 1 MB
export function createApp(storageRoot: string): Hono {
export function createApp(storageRoot: string, agentToken: string | null): Hono {
const app = new Hono();
app.onError((_err, c) => {
@@ -37,7 +37,19 @@ export function createApp(storageRoot: string): Hono {
await next();
});
// ── Agent token auth (skip healthz) ───────────────────────────────
if (agentToken !== null) {
app.use("/api/*", async (c, next) => {
const token = c.req.header("X-Agent-Token");
if (token !== agentToken) {
return c.json({ error: "unauthorized" }, 401);
}
await next();
});
}
app.get("/healthz", (c) => c.json({ ok: true }));
app.get("/api/healthz", (c) => c.json({ ok: true }));
app.route("/api/workflows", createWorkflowRoutes(storageRoot));
app.route("/api/threads", createThreadRoutes(storageRoot));
@@ -118,7 +118,7 @@ async function emitRecordsForHead(params: {
params.eventId.n++;
await params.stream.writeSSE({
event: "record",
data: JSON.stringify({ type: "workflow-result", ...wf }),
data: JSON.stringify({ type: "workflow-result", returnCode: wf.returnCode, content: wf.summary, timestamp: null }),
id: String(params.eventId.n),
});
return true;
@@ -1,5 +1,5 @@
import { join } from "node:path";
import { createCasStore, getContentMerklePayload } from "@uncaged/workflow-cas";
import { createCasStore, getContentMerklePayload, parseCasThreadNode } from "@uncaged/workflow-cas";
import { FORK_BRANCH_ROLE, walkStateFramesNewestFirst } from "@uncaged/workflow-execute";
import { END } from "@uncaged/workflow-runtime";
import { getGlobalCasDir } from "@uncaged/workflow-util";
@@ -10,11 +10,29 @@ import type { ResolvedThreadRecord } from "../../thread-scan.js";
import {
listHistoricalThreads,
listRunningThreads,
resolveThreadListStatus,
resolveThreadRecord,
} from "../../thread-scan.js";
import { cmdKill, cmdPause, cmdResume } from "../thread/control.js";
import { cmdRun } from "../thread/run.js";
async function readStartInfo(
cas: ReturnType<typeof createCasStore>,
startHash: string,
): Promise<{ name: string | null; prompt: string | null }> {
const raw = await cas.get(startHash);
if (raw === null) return { name: null, prompt: null };
const parsed = parseCasThreadNode(raw);
if (parsed === null || parsed.kind !== "start") return { name: null, prompt: null };
const name = parsed.node.payload.name;
const promptHash = parsed.node.refs[0] ?? null;
let prompt: string | null = null;
if (promptHash !== null) {
prompt = await getContentMerklePayload(cas, promptHash);
}
return { name, prompt };
}
async function buildThreadDetailRecords(
storageRoot: string,
resolved: ResolvedThreadRecord,
@@ -23,14 +41,16 @@ async function buildThreadDetailRecords(
const frames = await walkStateFramesNewestFirst(cas, resolved.head);
const chronological = [...frames].reverse();
const { name: workflowName, prompt } = await readStartInfo(cas, resolved.start);
const records: unknown[] = [
{
type: "thread-start",
workflow: workflowName ?? "unknown",
prompt: prompt ?? null,
threadId: resolved.threadId,
bundleHash: resolved.bundleHash,
head: resolved.head,
start: resolved.start,
source: resolved.source,
status: resolved.source,
timestamp: null,
},
];
@@ -42,7 +62,12 @@ async function buildThreadDetailRecords(
const returnCode = fr.payload.meta.returnCode;
const summary = fr.payload.meta.summary;
if (typeof returnCode === "number" && typeof summary === "string") {
records.push({ type: "workflow-result", returnCode, summary });
records.push({
type: "workflow-result",
returnCode,
content: summary,
timestamp: fr.payload.timestamp,
});
}
continue;
}
@@ -73,8 +98,8 @@ export function createThreadRoutes(storageRoot: string): Hono {
const threads = await Promise.all(
rows.map(async (r) => {
const runningPath = join(storageRoot, "logs", r.hash, `${r.threadId}.running`);
const isRunning = await pathExists(runningPath);
const status = r.source === "history" ? "completed" : isRunning ? "running" : "active";
const runningMarkerPresent = await pathExists(runningPath);
const status = await resolveThreadListStatus(storageRoot, r, runningMarkerPresent);
return {
threadId: r.threadId,
workflow: r.workflowName,
+103 -13
View File
@@ -1,12 +1,23 @@
import { randomUUID } from "node:crypto";
import { hostname as osHostname } from "node:os";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { serve } from "bun";
import { printCliLine } from "../../cli-output.js";
import { createApp } from "./app.js";
import {
registerWithGateway,
startHeartbeat,
startTunnel,
unregisterFromGateway,
} from "./tunnel.js";
import type { ServeOptions } from "./types.js";
export function startServer(storageRoot: string, options: ServeOptions): void {
const app = createApp(storageRoot);
const DEFAULT_GATEWAY_URL = "https://workflow-gateway.shazhou.workers.dev";
const HEARTBEAT_INTERVAL_MS = 60_000;
export function startServer(storageRoot: string, options: ServeOptions, agentToken: string | null): void {
const app = createApp(storageRoot, agentToken);
const server = serve({
fetch: app.fetch,
@@ -28,30 +39,51 @@ function parsePortValue(value: string | undefined): Result<number, string> {
return ok(parsed);
}
function requireNextArg(argv: string[], i: number, flag: string): Result<string, string> {
const next = argv[i + 1];
if (next === undefined) {
return err(`${flag} requires a value`);
}
return ok(next);
}
function parseServeArgv(argv: string[]): Result<ServeOptions, string> {
let port = 7860;
let hostname = "127.0.0.1";
let name = osHostname().split(".")[0].toLowerCase();
let noTunnel = false;
let gatewayUrl = DEFAULT_GATEWAY_URL;
const gatewaySecret = process.env.WORKFLOW_GATEWAY_SECRET ?? "";
const stringFlags: Record<string, (v: string) => void> = {
"--host": (v) => {
hostname = v;
},
"--name": (v) => {
name = v;
},
"--gateway": (v) => {
gatewayUrl = v;
},
};
for (let i = 0; i < argv.length; i++) {
const arg = argv[i];
if (arg === "--port" || arg === "-p") {
const portResult = parsePortValue(argv[i + 1]);
if (!portResult.ok) {
return portResult;
}
if (!portResult.ok) return portResult;
port = portResult.value;
i++;
} else if (arg === "--host") {
const next = argv[i + 1];
if (next === undefined) {
return err("--host requires a value");
}
hostname = next;
} else if (arg === "--no-tunnel") {
noTunnel = true;
} else if (arg in stringFlags) {
const r = requireNextArg(argv, i, arg);
if (!r.ok) return r;
stringFlags[arg](r.value);
i++;
}
}
return ok({ port, hostname });
return ok({ port, hostname, name, noTunnel, gatewayUrl, gatewaySecret });
}
export async function dispatchServe(storageRoot: string, argv: string[]): Promise<number> {
@@ -61,7 +93,65 @@ export async function dispatchServe(storageRoot: string, argv: string[]): Promis
return 1;
}
startServer(storageRoot, parsed.value);
const options = parsed.value;
const agentToken = options.noTunnel ? null : randomUUID();
startServer(storageRoot, options, agentToken);
if (options.noTunnel) {
printCliLine("tunnel disabled (--no-tunnel)");
await new Promise(() => {});
return 0;
}
// Start cloudflared quick tunnel
printCliLine("starting cloudflared quick tunnel...");
const tunnel = await startTunnel(options.port);
if (!tunnel) {
printCliLine("failed to create tunnel — continuing without gateway registration");
await new Promise(() => {});
return 0;
}
printCliLine(`tunnel: ${tunnel.url}`);
// Register with gateway
if (options.gatewaySecret) {
const registered = await registerWithGateway(
options.gatewayUrl,
options.name,
tunnel.url,
options.gatewaySecret,
agentToken!,
);
if (registered) {
printCliLine(`registered with gateway as "${options.name}"`);
}
// Start heartbeat
const heartbeatTimer = startHeartbeat(
options.gatewayUrl,
options.name,
tunnel.url,
options.gatewaySecret,
agentToken!,
HEARTBEAT_INTERVAL_MS,
);
// Cleanup on exit
const cleanup = async () => {
clearInterval(heartbeatTimer);
printCliLine("unregistering from gateway...");
await unregisterFromGateway(options.gatewayUrl, options.name, options.gatewaySecret);
tunnel.process.kill();
process.exit(0);
};
process.on("SIGINT", cleanup);
process.on("SIGTERM", cleanup);
} else {
printCliLine("WORKFLOW_GATEWAY_SECRET not set — skipping gateway registration");
}
// Keep process alive
await new Promise(() => {});
@@ -0,0 +1,88 @@
import { printCliLine } from "../../cli-output.js";
type TunnelHandle = {
process: ReturnType<typeof Bun.spawn>;
url: string;
};
export async function startTunnel(port: number): Promise<TunnelHandle | null> {
const proc = Bun.spawn(["cloudflared", "tunnel", "--url", `http://localhost:${port}`], {
stdout: "pipe",
stderr: "pipe",
});
// cloudflared prints the URL to stderr
const reader = proc.stderr.getReader();
const decoder = new TextDecoder();
let buffer = "";
const deadline = Date.now() + 30_000;
while (Date.now() < deadline) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const match = buffer.match(/https:\/\/[a-z0-9-]+\.trycloudflare\.com/);
if (match) {
// Release the reader so stderr keeps flowing without backpressure
reader.releaseLock();
return { process: proc, url: match[0] };
}
}
reader.releaseLock();
proc.kill();
return null;
}
export async function registerWithGateway(
gatewayUrl: string,
name: string,
tunnelUrl: string,
secret: string,
agentToken: string,
): Promise<boolean> {
try {
const resp = await fetch(`${gatewayUrl}/register`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ name, url: tunnelUrl, secret, agentToken }),
});
if (!resp.ok) {
const body = await resp.text();
printCliLine(`gateway registration failed: ${resp.status} ${body}`);
return false;
}
return true;
} catch (e) {
printCliLine(`gateway registration error: ${e}`);
return false;
}
}
export async function unregisterFromGateway(
gatewayUrl: string,
name: string,
secret: string,
): Promise<void> {
try {
await fetch(`${gatewayUrl}/register/${name}`, {
method: "DELETE",
headers: { Authorization: `Bearer ${secret}` },
});
} catch {
// Best effort — process is exiting
}
}
export function startHeartbeat(
gatewayUrl: string,
name: string,
tunnelUrl: string,
secret: string,
agentToken: string,
intervalMs: number,
): ReturnType<typeof setInterval> {
return setInterval(() => {
registerWithGateway(gatewayUrl, name, tunnelUrl, secret, agentToken).catch(() => {});
}, intervalMs);
}
@@ -1,4 +1,8 @@
export type ServeOptions = {
port: number;
hostname: string;
name: string;
noTunnel: boolean;
gatewayUrl: string;
gatewaySecret: string;
};
+74 -4
View File
@@ -5,7 +5,9 @@ import {
readThreadsIndex,
type ThreadHistoryEntry,
type ThreadIndex,
walkStateFramesNewestFirst,
} from "@uncaged/workflow-execute";
import { END } from "@uncaged/workflow-runtime";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { pathExists, readTextFileIfExists } from "./fs-utils.js";
@@ -98,6 +100,8 @@ export type HistoricalThreadRow = {
source: "active" | "history";
/** `updatedAt` for active threads; `completedAt` for history (ms since epoch). */
activityTs: number;
/** Current CAS head (`threads.json` / history row). */
head: string;
};
export type ResolvedThreadRecord = {
@@ -172,6 +176,73 @@ export async function resolveThreadRecord(
return null;
}
export type ThreadHeadTerminal =
| { kind: "non-terminal" }
| { kind: "terminal"; returnCode: number };
/** True when the newest frame at `headHash` is `__end__` (workflow finished in CAS). */
export async function readThreadTerminalFromHead(
storageRoot: string,
headHash: string,
): Promise<ThreadHeadTerminal> {
const cas = createCasStore(getGlobalCasDir(storageRoot));
const frames = await walkStateFramesNewestFirst(cas, headHash);
const newest = frames[0];
if (newest === undefined) {
return { kind: "non-terminal" };
}
if (newest.payload.role !== END) {
return { kind: "non-terminal" };
}
const rc = newest.payload.meta.returnCode;
if (typeof rc !== "number") {
return { kind: "terminal", returnCode: 1 };
}
return { kind: "terminal", returnCode: rc };
}
export type ThreadListStatus = "running" | "active" | "completed" | "failed";
/** Combines `.running` marker with CAS head: stale markers do not imply `running`. */
export async function resolveThreadListStatus(
storageRoot: string,
row: HistoricalThreadRow,
runningMarkerPresent: boolean,
): Promise<ThreadListStatus> {
const terminal = await readThreadTerminalFromHead(storageRoot, row.head);
if (terminal.kind === "terminal") {
return terminal.returnCode !== 0 ? "failed" : "completed";
}
if (row.source === "history") {
return "completed";
}
if (runningMarkerPresent) {
return "running";
}
return "active";
}
async function appendRunningThreadRowIfLive(
storageRoot: string,
hash: string,
threadId: string,
out: RunningThreadRow[],
): Promise<void> {
const resolved = await resolveThreadRecord(storageRoot, threadId);
if (resolved !== null && resolved.bundleHash !== hash) {
return;
}
if (resolved !== null) {
const terminal = await readThreadTerminalFromHead(storageRoot, resolved.head);
if (terminal.kind === "terminal") {
return;
}
}
const workflowName =
resolved !== null ? await readWorkflowNameFromStartHash(storageRoot, resolved.start) : null;
out.push({ threadId, hash, workflowName });
}
/** Threads currently executing — identified via `<threadId>.running` markers. */
export async function listRunningThreads(storageRoot: string): Promise<RunningThreadRow[]> {
const logsRoot = join(storageRoot, "logs");
@@ -196,10 +267,7 @@ export async function listRunningThreads(storageRoot: string): Promise<RunningTh
continue;
}
const threadId = fileName.slice(0, -".running".length);
const resolved = await resolveThreadRecord(storageRoot, threadId);
const workflowName =
resolved !== null ? await readWorkflowNameFromStartHash(storageRoot, resolved.start) : null;
out.push({ threadId, hash, workflowName });
await appendRunningThreadRowIfLive(storageRoot, hash, threadId, out);
}
}
@@ -253,6 +321,7 @@ export async function listHistoricalThreads(
workflowName,
source: "active",
activityTs: entry.updatedAt,
head: entry.head,
});
}
@@ -287,6 +356,7 @@ export async function listHistoricalThreads(
workflowName,
source: "history",
activityTs: e.completedAt,
head: e.head,
});
}
}
+20 -1
View File
@@ -6,6 +6,7 @@ import { getWorkerHostScriptPath } from "@uncaged/workflow-execute";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { pathExists, readTextFileIfExists } from "./fs-utils.js";
import { readThreadTerminalFromHead, resolveThreadRecord } from "./thread-scan.js";
export type WorkerCtl = {
pid: number;
@@ -269,7 +270,25 @@ export async function resolveRunningHashForThread(
if (!(await pathExists(logsRoot))) {
return err(`thread not running (no logs dir): ${threadId}`);
}
const hashes = await readdir(logsRoot);
const resolved = await resolveThreadRecord(storageRoot, threadId);
if (resolved !== null) {
const runningPath = join(logsRoot, resolved.bundleHash, `${threadId}.running`);
if (!(await pathExists(runningPath))) {
return err(`thread not running: ${threadId}`);
}
const terminal = await readThreadTerminalFromHead(storageRoot, resolved.head);
if (terminal.kind === "terminal") {
return err(`thread not running: ${threadId}`);
}
return ok(resolved.bundleHash);
}
let hashes: string[];
try {
hashes = await readdir(logsRoot);
} catch {
return err(`thread not running: ${threadId}`);
}
for (const hash of hashes) {
const runningPath = join(logsRoot, hash, `${threadId}.running`);
if (await pathExists(runningPath)) {
+42 -14
View File
@@ -1,8 +1,41 @@
import { parse, stringify } from "yaml";
import type { CasStore, MerkleNode, StepMerklePayload, ThreadMerklePayload } from "./types.js";
import type {
CasStore,
MerkleNode,
MerkleNodeType,
StepMerklePayload,
ThreadMerklePayload,
} from "./types.js";
function requireStringHashArray(value: unknown, notArrayMessage: string): string[] {
if (!Array.isArray(value)) {
throw new Error(notArrayMessage);
}
const out: string[] = [];
for (const c of value) {
if (typeof c !== "string") {
throw new Error("merkle: hash entry must be a string");
}
out.push(c);
}
return out;
}
function edgeListRaw(rec: Record<string, unknown>, type: MerkleNodeType): unknown {
if (type === "content") {
return rec.refs !== undefined ? rec.refs : rec.children;
}
return rec.children;
}
export function serializeMerkleNode(node: MerkleNode): string {
if (node.type === "content") {
return stringify(
{ type: node.type, payload: node.payload, refs: node.children },
{ indent: 2 },
);
}
return stringify(
{ type: node.type, payload: node.payload, children: node.children },
{ indent: 2 },
@@ -17,23 +50,18 @@ export function parseMerkleNode(yamlText: string): MerkleNode {
const rec = raw as Record<string, unknown>;
const type = rec.type;
const payload = rec.payload;
const children = rec.children;
if (type !== "content" && type !== "step" && type !== "thread") {
throw new Error("merkle: invalid or missing type");
}
if (typeof payload !== "string" && (payload === null || typeof payload !== "object")) {
throw new Error("merkle: payload must be a string or object");
}
if (!Array.isArray(children)) {
throw new Error("merkle: children must be an array");
}
const childHashes: string[] = [];
for (const c of children) {
if (typeof c !== "string") {
throw new Error("merkle: child hash must be a string");
}
childHashes.push(c);
}
const notArrayMsg =
type === "content"
? "merkle: content node requires refs or children array"
: "merkle: children must be an array";
const childHashes = requireStringHashArray(edgeListRaw(rec, type), notArrayMsg);
return {
type,
payload: typeof payload === "string" ? payload : (payload as Record<string, unknown>),
@@ -85,8 +113,8 @@ export async function putContentMerkleNode(store: CasStore, content: string): Pr
/**
* Loads a CAS blob and returns the payload string for a `content` node.
*
* Accepts both the legacy `{type:content, payload, children}` Merkle layout
* and the RFC v3 `{type:content, payload, refs}` content node layout.
* Accepts both the legacy `{ type:content, payload, children }` Merkle layout
* and the RFC-aligned `{ type:content, payload, refs }` content node layout.
*/
export async function getContentMerklePayload(
store: CasStore,
+4 -1
View File
@@ -9,7 +9,10 @@ function refsFromBlob(content: string): string[] {
return [];
}
const rec = raw as Record<string, unknown>;
const refs = rec.refs;
let refs = rec.refs;
if (!Array.isArray(refs) && Array.isArray(rec.children)) {
refs = rec.children;
}
if (!Array.isArray(refs)) {
return [];
}
@@ -0,0 +1 @@
VITE_GATEWAY_URL=https://workflow-gateway.shazhou.workers.dev
+3 -1
View File
@@ -10,7 +10,9 @@
},
"dependencies": {
"react": "^19.2.6",
"react-dom": "^19.2.6"
"react-dom": "^19.2.6",
"react-markdown": "^10.1.0",
"shiki": "^4.0.2"
},
"devDependencies": {
"@tailwindcss/vite": "^4.2.4",
+100 -29
View File
@@ -1,9 +1,43 @@
const BASE = "/api";
const GATEWAY_URL = import.meta.env.VITE_GATEWAY_URL || "";
async function postJson<T>(path: string, body: unknown): Promise<T> {
const res = await fetch(`${BASE}${path}`, {
export function getApiKey(): string | null {
try {
return localStorage.getItem("workflow-api-key");
} catch {
return null;
}
}
export function setApiKey(key: string): void {
localStorage.setItem("workflow-api-key", key);
}
export function clearApiKey(): void {
localStorage.removeItem("workflow-api-key");
}
export function hasApiKey(): boolean {
return getApiKey() !== null && getApiKey() !== "";
}
function authHeaders(): Record<string, string> {
const key = getApiKey();
if (key) return { Authorization: `Bearer ${key}` };
return {};
}
function agentBase(agent: string): string {
if (GATEWAY_URL) {
return `${GATEWAY_URL}/api/${agent}`;
}
// Local dev: proxy via vite, no agent prefix
return "/api";
}
async function postJson<T>(base: string, path: string, body: unknown): Promise<T> {
const res = await fetch(`${base}${path}`, {
method: "POST",
headers: { "Content-Type": "application/json" },
headers: { "Content-Type": "application/json", ...authHeaders() },
body: JSON.stringify(body),
});
if (!res.ok) {
@@ -13,14 +47,23 @@ async function postJson<T>(path: string, body: unknown): Promise<T> {
return res.json() as Promise<T>;
}
async function fetchJson<T>(path: string): Promise<T> {
const res = await fetch(`${BASE}${path}`);
async function fetchJson<T>(base: string, path: string): Promise<T> {
const res = await fetch(`${base}${path}`, { headers: authHeaders() });
if (!res.ok) {
throw new Error(`API ${res.status}: ${path}`);
}
return res.json() as Promise<T>;
}
// ── Endpoint types ──────────────────────────────────────────────────
export type AgentEndpoint = {
name: string;
url: string;
status: string;
lastHeartbeat: number;
};
export type WorkflowSummary = {
name: string;
currentHash: string;
@@ -35,50 +78,78 @@ export type ThreadSummary = {
status: string | null;
};
export type ThreadRecord = {
type: string;
role: string | null;
content: string | null;
timestamp: number | null;
[key: string]: unknown;
export type ThreadStartRecord = {
type: "thread-start";
workflow: string;
prompt: string | null;
threadId: string;
status: string;
timestamp: null;
};
export function listWorkflows(): Promise<{ workflows: WorkflowSummary[] }> {
return fetchJson("/workflows");
export type RoleRecord = {
type: "role";
role: string;
content: string;
timestamp: number | null;
meta: Record<string, unknown>;
};
export type WorkflowResultRecord = {
type: "workflow-result";
returnCode: number;
content: string;
timestamp: number | null;
};
export type ThreadRecord = ThreadStartRecord | RoleRecord | WorkflowResultRecord;
// ── Gateway endpoints ───────────────────────────────────────────────
export function listAgents(): Promise<AgentEndpoint[]> {
const url = GATEWAY_URL || "";
return fetchJson(url, "/endpoints");
}
export function listThreads(): Promise<{ threads: ThreadSummary[] }> {
return fetchJson("/threads");
// ── Agent-scoped endpoints ──────────────────────────────────────────
export function listWorkflows(agent: string): Promise<{ workflows: WorkflowSummary[] }> {
return fetchJson(agentBase(agent), "/workflows");
}
export function listRunningThreads(): Promise<{ threads: ThreadSummary[] }> {
return fetchJson("/threads/running");
export function listThreads(agent: string): Promise<{ threads: ThreadSummary[] }> {
return fetchJson(agentBase(agent), "/threads");
}
export function getThread(id: string): Promise<{ records: ThreadRecord[] }> {
return fetchJson(`/threads/${id}`);
export function listRunningThreads(agent: string): Promise<{ threads: ThreadSummary[] }> {
return fetchJson(agentBase(agent), "/threads/running");
}
export function getThread(agent: string, id: string): Promise<{ records: ThreadRecord[] }> {
return fetchJson(agentBase(agent), `/threads/${id}`);
}
export function runThread(
agent: string,
workflow: string,
prompt: string,
maxRounds: number = 10,
): Promise<{ threadId: string }> {
return postJson("/threads", { workflow, prompt, maxRounds });
return postJson(agentBase(agent), "/threads", { workflow, prompt, maxRounds });
}
export function killThread(threadId: string): Promise<{ ok: boolean }> {
return postJson(`/threads/${threadId}/kill`, {});
export function killThread(agent: string, threadId: string): Promise<{ ok: boolean }> {
return postJson(agentBase(agent), `/threads/${threadId}/kill`, {});
}
export function pauseThread(threadId: string): Promise<{ ok: boolean }> {
return postJson(`/threads/${threadId}/pause`, {});
export function pauseThread(agent: string, threadId: string): Promise<{ ok: boolean }> {
return postJson(agentBase(agent), `/threads/${threadId}/pause`, {});
}
export function resumeThread(threadId: string): Promise<{ ok: boolean }> {
return postJson(`/threads/${threadId}/resume`, {});
export function resumeThread(agent: string, threadId: string): Promise<{ ok: boolean }> {
return postJson(agentBase(agent), `/threads/${threadId}/resume`, {});
}
export function getHealth(): Promise<{ ok: boolean }> {
return fetchJson("/healthz");
export function getAgentHealth(agent: string): Promise<{ ok: boolean }> {
return fetchJson(agentBase(agent), "/healthz");
}
+25 -8
View File
@@ -1,4 +1,6 @@
import { useState } from "react";
import { hasApiKey, clearApiKey } from "./api.ts";
import { LoginPage } from "./components/login.tsx";
import { RunDialog } from "./components/run-dialog.tsx";
import { Sidebar } from "./components/sidebar.tsx";
import { StatusBar } from "./components/status-bar.tsx";
@@ -8,24 +10,39 @@ import { WorkflowList } from "./components/workflow-list.tsx";
import { useHashRoute } from "./use-hash-route.ts";
export function App() {
const { view, threadId, setView, setThreadId } = useHashRoute();
const [authed, setAuthed] = useState(hasApiKey());
const { view, agent, threadId, setView, setAgent, setThreadId } = useHashRoute();
const [showRun, setShowRun] = useState(false);
if (!authed) {
return <LoginPage onLogin={() => setAuthed(true)} />;
}
return (
<div className="flex h-screen">
<Sidebar view={view} onViewChange={setView} />
<Sidebar view={view} agent={agent} onViewChange={setView} onAgentChange={setAgent} onLogout={() => { clearApiKey(); setAuthed(false); }} />
<main className="flex-1 overflow-hidden flex flex-col">
<StatusBar onRun={() => setShowRun(true)} />
<StatusBar agent={agent} onRun={() => setShowRun(true)} />
<div className="flex-1 overflow-auto p-6">
{view === "threads" && threadId === null && <ThreadList onSelect={setThreadId} />}
{view === "threads" && threadId !== null && (
<ThreadDetail threadId={threadId} onBack={() => setThreadId(null)} />
{!agent && (
<div className="flex items-center justify-center h-full">
<p style={{ color: "var(--color-text-muted)" }}>
Select an agent from the sidebar to get started.
</p>
</div>
)}
{view === "workflows" && <WorkflowList />}
{agent && view === "threads" && threadId === null && (
<ThreadList agent={agent} onSelect={setThreadId} />
)}
{agent && view === "threads" && threadId !== null && (
<ThreadDetail agent={agent} threadId={threadId} onBack={() => setThreadId(null)} />
)}
{agent && view === "workflows" && <WorkflowList agent={agent} />}
</div>
</main>
{showRun && (
{showRun && agent && (
<RunDialog
agent={agent}
onClose={() => setShowRun(false)}
onCreated={(id) => {
setShowRun(false);
@@ -0,0 +1,93 @@
import { useState } from "react";
import { setApiKey } from "../api.ts";
type Props = {
onLogin: () => void;
};
export function LoginPage({ onLogin }: Props) {
const [key, setKey] = useState("");
const [error, setError] = useState<string | null>(null);
const [loading, setLoading] = useState(false);
async function handleSubmit(e: React.FormEvent) {
e.preventDefault();
if (!key.trim()) return;
setLoading(true);
setError(null);
// Test the key by hitting the endpoints list
const gatewayUrl = import.meta.env.VITE_GATEWAY_URL || "";
try {
const res = await fetch(`${gatewayUrl}/endpoints`, {
headers: { Authorization: `Bearer ${key.trim()}` },
});
if (res.status === 401) {
setError("Invalid API key");
setLoading(false);
return;
}
if (!res.ok) {
setError(`Server error: ${res.status}`);
setLoading(false);
return;
}
} catch (err) {
setError(`Connection failed: ${err instanceof Error ? err.message : String(err)}`);
setLoading(false);
return;
}
setApiKey(key.trim());
onLogin();
}
return (
<div className="min-h-screen flex items-center justify-center" style={{ background: "var(--color-bg)" }}>
<div
className="p-8 rounded-lg border w-full max-w-sm"
style={{ background: "var(--color-surface)", borderColor: "var(--color-border)" }}
>
<h1 className="text-xl font-bold mb-1" style={{ color: "var(--color-accent)" }}>
Workflow Dashboard
</h1>
<p className="text-sm mb-6" style={{ color: "var(--color-text-muted)" }}>
Enter your API key to continue
</p>
<form onSubmit={handleSubmit}>
<input
type="password"
value={key}
onChange={(e) => setKey(e.target.value)}
placeholder="API Key"
className="w-full px-3 py-2 rounded border text-sm mb-3 outline-none"
style={{
background: "var(--color-bg)",
borderColor: "var(--color-border)",
color: "var(--color-text)",
}}
autoFocus
/>
{error && (
<p className="text-xs mb-3" style={{ color: "var(--color-error)" }}>
{error}
</p>
)}
<button
type="submit"
disabled={loading || !key.trim()}
className="w-full px-3 py-2 rounded text-sm font-medium"
style={{
background: "var(--color-accent)",
color: "var(--color-bg)",
opacity: loading || !key.trim() ? 0.5 : 1,
}}
>
{loading ? "Verifying..." : "Login"}
</button>
</form>
</div>
</div>
);
}
@@ -0,0 +1,108 @@
import ReactMarkdown from "react-markdown";
import { useEffect, useState } from "react";
import { createHighlighter, type HighlighterGeneric, type BundledLanguage, type BundledTheme } from "shiki";
let highlighterPromise: Promise<HighlighterGeneric<BundledLanguage, BundledTheme>> | null = null;
const LANGS: BundledLanguage[] = ["typescript", "javascript", "json", "yaml", "bash", "python", "markdown"];
function getHighlighter(): Promise<HighlighterGeneric<BundledLanguage, BundledTheme>> {
if (highlighterPromise === null) {
highlighterPromise = createHighlighter({
themes: ["github-dark"],
langs: LANGS,
});
}
return highlighterPromise;
}
function CodeBlock({ className, children }: { className?: string; children?: React.ReactNode }) {
const [html, setHtml] = useState<string | null>(null);
const code = String(children).replace(/\n$/, "");
const lang = className?.replace("language-", "") ?? "text";
useEffect(() => {
let cancelled = false;
getHighlighter().then((hl) => {
if (cancelled) return;
try {
const result = hl.codeToHtml(code, { lang, theme: "github-dark" });
setHtml(result);
} catch {
setHtml(null);
}
});
return () => { cancelled = true; };
}, [code, lang]);
if (html !== null) {
return (
<div
className="rounded overflow-x-auto text-xs my-2"
// biome-ignore lint/security/noDangerouslySetInnerHtml: shiki output is safe
dangerouslySetInnerHTML={{ __html: html }}
/>
);
}
return (
<pre className="rounded overflow-x-auto text-xs my-2 p-3" style={{ background: "var(--color-bg)" }}>
<code>{code}</code>
</pre>
);
}
export function Markdown({ content }: { content: string }) {
return (
<div className="prose prose-invert prose-sm max-w-none">
<ReactMarkdown
components={{
code({ className, children, ...props }) {
const isInline = !className;
if (isInline) {
return (
<code
className="text-xs px-1 py-0.5 rounded"
style={{ background: "var(--color-border)", color: "var(--color-accent)" }}
{...props}
>
{children}
</code>
);
}
return <CodeBlock className={className}>{children}</CodeBlock>;
},
p({ children }) {
return <p className="my-1.5 leading-relaxed">{children}</p>;
},
ul({ children }) {
return <ul className="list-disc pl-4 my-1.5">{children}</ul>;
},
ol({ children }) {
return <ol className="list-decimal pl-4 my-1.5">{children}</ol>;
},
h1({ children }) {
return <h1 className="text-lg font-bold mt-3 mb-1">{children}</h1>;
},
h2({ children }) {
return <h2 className="text-base font-bold mt-2 mb-1">{children}</h2>;
},
h3({ children }) {
return <h3 className="text-sm font-bold mt-2 mb-1">{children}</h3>;
},
blockquote({ children }) {
return (
<blockquote
className="border-l-2 pl-3 my-2 text-sm"
style={{ borderColor: "var(--color-accent)", color: "var(--color-text-muted)" }}
>
{children}
</blockquote>
);
},
}}>
{content}
</ReactMarkdown>
</div>
);
}
@@ -0,0 +1,128 @@
import type { ThreadStartRecord, RoleRecord, WorkflowResultRecord, ThreadRecord } from "../api.ts";
import { Markdown } from "./markdown.tsx";
const ROLE_COLORS: Record<string, string> = {
preparer: "#8b5cf6",
agent: "#3b82f6",
extractor: "#f59e0b",
};
function roleColor(role: string): string {
return ROLE_COLORS[role] ?? "var(--color-accent)";
}
function formatTime(ts: number | null): string | null {
if (ts === null) return null;
return new Date(ts).toLocaleTimeString();
}
function StartCard({ record }: { record: ThreadStartRecord }) {
return (
<div
className="p-4 rounded-lg border"
style={{ background: "var(--color-surface)", borderColor: "var(--color-border)" }}
>
<div className="flex items-center gap-2 mb-2">
<span className="text-lg">🚀</span>
<span className="font-semibold" style={{ color: "var(--color-accent)" }}>
{record.workflow}
</span>
<span
className="text-xs px-2 py-0.5 rounded"
style={{
background: record.status === "active" ? "var(--color-success)" : "var(--color-border)",
color: record.status === "active" ? "var(--color-bg)" : "var(--color-text-muted)",
}}
>
{record.status}
</span>
</div>
{record.prompt !== null && (
<div
className="mt-2 p-3 rounded text-sm border-l-2"
style={{
background: "var(--color-bg)",
borderColor: "var(--color-accent)",
color: "var(--color-text)",
}}
>
<div className="text-xs mb-1" style={{ color: "var(--color-text-muted)" }}>
Prompt
</div>
<Markdown content={record.prompt} />
</div>
)}
</div>
);
}
function RoleMessage({ record }: { record: RoleRecord }) {
const color = roleColor(record.role);
return (
<div
className="p-3 rounded-lg border text-sm"
style={{ background: "var(--color-surface)", borderColor: "var(--color-border)" }}
>
<div className="flex items-center gap-2 mb-2">
<span
className="text-xs px-2 py-0.5 rounded font-mono font-medium"
style={{ background: color, color: "#fff" }}
>
{record.role}
</span>
{formatTime(record.timestamp) !== null && (
<span className="text-xs ml-auto" style={{ color: "var(--color-text-muted)" }}>
{formatTime(record.timestamp)}
</span>
)}
</div>
<Markdown content={record.content} />
</div>
);
}
function ResultCard({ record }: { record: WorkflowResultRecord }) {
const success = record.returnCode === 0;
return (
<div
className="p-4 rounded-lg border"
style={{
background: "var(--color-surface)",
borderColor: success ? "var(--color-success)" : "var(--color-error)",
}}
>
<div className="flex items-center gap-2 mb-2">
<span className="text-lg">{success ? "✅" : "❌"}</span>
<span className="font-semibold text-sm">
{success ? "Completed" : "Failed"}
</span>
<span
className="text-xs px-2 py-0.5 rounded font-mono"
style={{
background: success ? "var(--color-success)" : "var(--color-error)",
color: "#fff",
}}
>
exit {record.returnCode}
</span>
{formatTime(record.timestamp) !== null && (
<span className="text-xs ml-auto" style={{ color: "var(--color-text-muted)" }}>
{formatTime(record.timestamp)}
</span>
)}
</div>
<Markdown content={record.content} />
</div>
);
}
export function RecordCard({ record }: { record: ThreadRecord }) {
switch (record.type) {
case "thread-start":
return <StartCard record={record} />;
case "role":
return <RoleMessage record={record} />;
case "workflow-result":
return <ResultCard record={record} />;
}
}
@@ -3,12 +3,13 @@ import { listWorkflows, runThread } from "../api.ts";
import { useFetch } from "../hooks.ts";
type Props = {
agent: string;
onClose: () => void;
onCreated: (threadId: string) => void;
};
export function RunDialog({ onClose, onCreated }: Props) {
const workflows = useFetch(() => listWorkflows(), []);
export function RunDialog({ agent, onClose, onCreated }: Props) {
const workflows = useFetch(() => listWorkflows(agent), [agent]);
const [workflow, setWorkflow] = useState("");
const [prompt, setPrompt] = useState("");
const [maxRounds, setMaxRounds] = useState(10);
@@ -21,7 +22,7 @@ export function RunDialog({ onClose, onCreated }: Props) {
setSubmitting(true);
setError(null);
try {
const result = await runThread(workflow, prompt, maxRounds);
const result = await runThread(agent, workflow, prompt, maxRounds);
onCreated(result.threadId);
} catch (err) {
setError(err instanceof Error ? err.message : String(err));
@@ -38,7 +39,7 @@ export function RunDialog({ onClose, onCreated }: Props) {
className="w-full max-w-lg p-6 rounded-lg border"
style={{ background: "var(--color-surface)", borderColor: "var(--color-border)" }}
>
<h3 className="text-lg font-semibold mb-4">Run Thread</h3>
<h3 className="text-lg font-semibold mb-4">Run Thread on {agent}</h3>
<form onSubmit={handleSubmit} className="space-y-4">
<div>
<label
@@ -1,10 +1,22 @@
import { useState } from "react";
import type { AgentEndpoint } from "../api.ts";
import { listAgents } from "../api.ts";
import { useFetch } from "../hooks.ts";
type Props = {
view: "threads" | "workflows";
agent: string | null;
onViewChange: (v: "threads" | "workflows") => void;
onAgentChange: (a: string | null) => void;
onLogout: () => void;
};
export function Sidebar({ view, onViewChange }: Props) {
const items = [
export function Sidebar({ view, agent, onViewChange, onAgentChange, onLogout }: Props) {
const { status, data } = useFetch(() => listAgents(), []);
const [expanded, setExpanded] = useState(true);
const agents: AgentEndpoint[] = status === "ok" ? data : [];
const viewItems = [
{ key: "threads" as const, label: "Threads", icon: "⚡" },
{ key: "workflows" as const, label: "Workflows", icon: "📦" },
];
@@ -22,8 +34,56 @@ export function Sidebar({ view, onViewChange }: Props) {
Dashboard
</p>
</div>
{/* Agent selector */}
<div className="border-b" style={{ borderColor: "var(--color-border)" }}>
<button
type="button"
onClick={() => setExpanded(!expanded)}
className="w-full text-left px-4 py-2 text-xs font-medium"
style={{ color: "var(--color-text-muted)" }}
>
{expanded ? "▾" : "▸"} Agents
{agent && (
<span className="ml-2 text-xs" style={{ color: "var(--color-accent)" }}>
({agent})
</span>
)}
</button>
{expanded && (
<div className="px-2 pb-2 space-y-0.5">
{agents.length === 0 && (
<p className="text-xs px-2 py-1" style={{ color: "var(--color-text-muted)" }}>
{status === "loading" ? "Loading..." : "No agents online"}
</p>
)}
{agents.map((a) => (
<button
type="button"
key={a.name}
onClick={() => onAgentChange(a.name)}
className="w-full text-left px-3 py-1.5 rounded text-xs transition-colors flex items-center gap-2"
style={{
background: agent === a.name ? "var(--color-accent-dim)" : "transparent",
color: agent === a.name ? "#fff" : "var(--color-text-muted)",
}}
>
<span
className="inline-block w-1.5 h-1.5 rounded-full"
style={{
background: a.status === "online" ? "var(--color-success)" : "var(--color-error)",
}}
/>
{a.name}
</button>
))}
</div>
)}
</div>
{/* View navigation */}
<nav className="flex-1 p-2 space-y-1">
{items.map((item) => (
{viewItems.map((item) => (
<button
type="button"
key={item.key}
@@ -38,6 +98,17 @@ export function Sidebar({ view, onViewChange }: Props) {
</button>
))}
</nav>
<div className="p-2 border-t" style={{ borderColor: "var(--color-border)" }}>
<button
type="button"
onClick={onLogout}
className="w-full text-left px-3 py-2 rounded text-xs transition-colors"
style={{ color: "var(--color-text-muted)" }}
>
🚪 Logout
</button>
</div>
</aside>
);
}
@@ -1,9 +1,10 @@
import { useCallback, useEffect, useRef, useState } from "react";
import { getHealth } from "../api.ts";
import { getAgentHealth } from "../api.ts";
type HealthStatus = "connected" | "disconnected" | "reconnecting";
type Props = {
agent: string | null;
onRun: () => void;
};
@@ -17,13 +18,17 @@ function statusLabel(status: HealthStatus): { text: string; color: string } {
return { text: "● Offline", color: "var(--color-error)" };
}
export function StatusBar({ onRun }: Props) {
export function StatusBar({ agent, onRun }: Props) {
const [status, setStatus] = useState<HealthStatus>("disconnected");
const wasConnectedRef = useRef(false);
const checkHealth = useCallback(async () => {
if (!agent) {
setStatus("disconnected");
return;
}
try {
await getHealth();
await getAgentHealth(agent);
wasConnectedRef.current = true;
setStatus("connected");
} catch {
@@ -33,9 +38,11 @@ export function StatusBar({ onRun }: Props) {
setStatus("disconnected");
}
}
}, []);
}, [agent]);
useEffect(() => {
wasConnectedRef.current = false;
setStatus("disconnected");
checkHealth();
const interval = setInterval(checkHealth, 10_000);
return () => clearInterval(interval);
@@ -49,12 +56,19 @@ export function StatusBar({ onRun }: Props) {
style={{ borderColor: "var(--color-border)", background: "var(--color-surface)" }}
>
<div className="flex items-center gap-4">
<span style={{ color: "var(--color-text-muted)" }}>Local API: 127.0.0.1:7860</span>
<span style={{ color: "var(--color-text-muted)" }}>
{agent ? `Agent: ${agent}` : "No agent selected"}
</span>
<button
type="button"
onClick={onRun}
disabled={!agent}
className="px-3 py-1 rounded text-xs font-medium"
style={{ background: "var(--color-accent)", color: "#fff" }}
style={{
background: agent ? "var(--color-accent)" : "var(--color-border)",
color: "#fff",
opacity: agent ? 1 : 0.5,
}}
>
Run Thread
</button>
@@ -2,15 +2,17 @@ import { useEffect, useRef, useState } from "react";
import { getThread, killThread, pauseThread, resumeThread } from "../api.ts";
import { useFetch } from "../hooks.ts";
import { useSSE } from "../use-sse.ts";
import { RecordCard } from "./record-card.tsx";
type Props = {
agent: string;
threadId: string;
onBack: () => void;
};
export function ThreadDetail({ threadId, onBack }: Props) {
const sse = useSSE(threadId);
const { status, data, error } = useFetch(() => getThread(threadId), [threadId]);
export function ThreadDetail({ agent, threadId, onBack }: Props) {
const sse = useSSE(agent, threadId);
const { status, data, error } = useFetch(() => getThread(agent, threadId), [agent, threadId]);
const [actionStatus, setActionStatus] = useState<string | null>(null);
const recordsEndRef = useRef<HTMLDivElement>(null);
@@ -30,7 +32,7 @@ export function ThreadDetail({ threadId, onBack }: Props) {
setActionStatus(`${action}ing...`);
try {
const fn = action === "kill" ? killThread : action === "pause" ? pauseThread : resumeThread;
await fn(threadId);
await fn(agent, threadId);
setActionStatus(`${action} sent ✓`);
} catch (e) {
setActionStatus(`${action} failed: ${e instanceof Error ? e.message : String(e)}`);
@@ -101,39 +103,8 @@ export function ThreadDetail({ threadId, onBack }: Props) {
)}
{(status === "ok" || liveActive || records.length > 0) && (
<div className="space-y-3">
{records.map((r) => (
<div
key={`${threadId}-${r.type}-${String(r.timestamp)}-${r.role ?? ""}-${r.content ?? ""}`}
className="p-3 rounded border text-sm"
style={{ background: "var(--color-surface)", borderColor: "var(--color-border)" }}
>
<div className="flex items-center gap-2 mb-1">
<span
className="text-xs px-1.5 py-0.5 rounded font-mono"
style={{ background: "var(--color-border)", color: "var(--color-accent)" }}
>
{r.type}
</span>
{r.role && (
<span className="text-xs" style={{ color: "var(--color-text-muted)" }}>
{r.role}
</span>
)}
{r.timestamp !== null && (
<span className="text-xs ml-auto" style={{ color: "var(--color-text-muted)" }}>
{new Date(r.timestamp).toLocaleTimeString()}
</span>
)}
</div>
{r.content && (
<pre
className="whitespace-pre-wrap text-xs mt-1"
style={{ color: "var(--color-text)" }}
>
{typeof r.content === "string" ? r.content : JSON.stringify(r.content, null, 2)}
</pre>
)}
</div>
{records.map((r, i) => (
<RecordCard key={`${threadId}-${i}`} record={r} />
))}
<div ref={recordsEndRef} aria-hidden />
</div>
@@ -2,11 +2,12 @@ import { listThreads } from "../api.ts";
import { useFetch } from "../hooks.ts";
type Props = {
agent: string;
onSelect: (id: string) => void;
};
export function ThreadList({ onSelect }: Props) {
const { status, data, error } = useFetch(() => listThreads(), []);
export function ThreadList({ agent, onSelect }: Props) {
const { status, data, error } = useFetch(() => listThreads(agent), [agent]);
if (status === "loading")
return <p style={{ color: "var(--color-text-muted)" }}>Loading threads...</p>;
@@ -1,8 +1,12 @@
import { listWorkflows } from "../api.ts";
import { useFetch } from "../hooks.ts";
export function WorkflowList() {
const { status, data, error } = useFetch(() => listWorkflows(), []);
type Props = {
agent: string;
};
export function WorkflowList({ agent }: Props) {
const { status, data, error } = useFetch(() => listWorkflows(agent), [agent]);
if (status === "loading")
return <p style={{ color: "var(--color-text-muted)" }}>Loading workflows...</p>;
@@ -4,37 +4,50 @@ type View = "threads" | "workflows";
type HashRoute = {
view: View;
agent: string | null;
threadId: string | null;
};
function parseHash(hash: string): HashRoute {
const raw = hash.replace(/^#\/?/, "");
if (raw.startsWith("threads/")) {
const id = raw.slice("threads/".length);
if (id.length > 0) {
return { view: "threads", threadId: id };
}
// Format: #agent/threads/id or #agent/workflows or #threads or #workflows
const parts = raw.split("/");
// Check if first part is a known view
if (parts[0] === "threads" || parts[0] === "workflows") {
return {
view: parts[0] as View,
agent: null,
threadId: parts[0] === "threads" && parts.length > 1 ? parts.slice(1).join("/") : null,
};
}
if (raw === "workflows") {
return { view: "workflows", threadId: null };
}
return { view: "threads", threadId: null };
// First part is agent name
const agent = parts[0] || null;
const viewPart = parts[1] ?? "threads";
const view: View = viewPart === "workflows" ? "workflows" : "threads";
const threadId = view === "threads" && parts.length > 2 ? parts.slice(2).join("/") : null;
return { view, agent, threadId };
}
function buildHash(route: HashRoute): string {
const prefix = route.agent ? `${route.agent}/` : "";
if (route.view === "workflows") {
return "#workflows";
return `#${prefix}workflows`;
}
if (route.threadId !== null) {
return `#threads/${route.threadId}`;
return `#${prefix}threads/${route.threadId}`;
}
return "#threads";
return `#${prefix}threads`;
}
export function useHashRoute(): {
view: View;
agent: string | null;
threadId: string | null;
setView: (v: View) => void;
setAgent: (a: string | null) => void;
setThreadId: (id: string | null) => void;
} {
const [route, setRoute] = useState<HashRoute>(() => parseHash(window.location.hash));
@@ -53,12 +66,20 @@ export function useHashRoute(): {
setRoute(next);
}, []);
const setView = useCallback((v: View) => navigate({ view: v, threadId: null }), [navigate]);
const setThreadId = useCallback(
(id: string | null) => navigate({ view: "threads", threadId: id }),
[navigate],
const setView = useCallback(
(v: View) => navigate({ view: v, agent: route.agent, threadId: null }),
[navigate, route.agent],
);
return { view: route.view, threadId: route.threadId, setView, setThreadId };
const setAgent = useCallback(
(a: string | null) => navigate({ view: route.view, agent: a, threadId: null }),
[navigate, route.view],
);
const setThreadId = useCallback(
(id: string | null) => navigate({ view: "threads", agent: route.agent, threadId: id }),
[navigate, route.agent],
);
return { view: route.view, agent: route.agent, threadId: route.threadId, setView, setAgent, setThreadId };
}
+16 -4
View File
@@ -8,6 +8,7 @@ import {
} from "react";
import type { ThreadRecord } from "./api.ts";
import { getApiKey } from "./api.ts";
export type UseSSEReturn = {
records: ThreadRecord[];
@@ -56,7 +57,17 @@ function handleRecordEvent(ev: Event, ctx: RecordEventContext): void {
ctx.cleanupEs();
}
export function useSSE(threadId: string | null): UseSSEReturn {
function sseUrl(agent: string, threadId: string): string {
const gatewayUrl = import.meta.env.VITE_GATEWAY_URL || "";
const key = getApiKey();
const keyParam = key ? `?key=${encodeURIComponent(key)}` : "";
if (gatewayUrl) {
return `${gatewayUrl}/api/${agent}/threads/${encodeURIComponent(threadId)}/live${keyParam}`;
}
return `/api/threads/${encodeURIComponent(threadId)}/live`;
}
export function useSSE(agent: string | null, threadId: string | null): UseSSEReturn {
const [records, setRecords] = useState<ThreadRecord[]>([]);
const [connected, setConnected] = useState(false);
const [completed, setCompleted] = useState(false);
@@ -65,7 +76,7 @@ export function useSSE(threadId: string | null): UseSSEReturn {
const reconnectAttemptsRef = useRef(0);
useEffect(() => {
if (threadId === null) {
if (threadId === null || agent === null) {
completedRef.current = false;
reconnectAttemptsRef.current = 0;
setRecords([]);
@@ -75,6 +86,7 @@ export function useSSE(threadId: string | null): UseSSEReturn {
}
const tid = threadId;
const agentName = agent;
completedRef.current = false;
reconnectAttemptsRef.current = 0;
@@ -113,7 +125,7 @@ export function useSSE(threadId: string | null): UseSSEReturn {
}
cleanupEs();
const url = `/api/threads/${encodeURIComponent(tid)}/live`;
const url = sseUrl(agentName, tid);
es = new EventSource(url);
es.onopen = () => {
@@ -155,7 +167,7 @@ export function useSSE(threadId: string | null): UseSSEReturn {
}
cleanupEs();
};
}, [threadId]);
}, [agent, threadId]);
return { records, connected, completed };
}
+19 -1
View File
@@ -1,3 +1,4 @@
import { unlinkSync } from "node:fs";
import { mkdir, unlink, writeFile } from "node:fs/promises";
import { createServer, type Socket } from "node:net";
import { dirname, join } from "node:path";
@@ -382,6 +383,23 @@ async function main(): Promise<void> {
let activeThreads = 0;
let shutdownTimer: ReturnType<typeof setTimeout> | null = null;
function cleanupAllRunningMarkersSync(): void {
for (const threadId of threads.keys()) {
try {
unlinkSync(join(storageRoot, "logs", hash, `${threadId}.running`));
} catch {
// ignore missing file or other fs errors
}
}
}
for (const sig of ["SIGINT", "SIGTERM"] as const) {
process.on(sig, () => {
cleanupAllRunningMarkersSync();
process.exit(sig === "SIGINT" ? 130 : 143);
});
}
const cas = createCasStore(getGlobalCasDir(storageRoot));
const workerCtlPath = join(storageRoot, "workers", `${hash}.json`);
@@ -498,8 +516,8 @@ async function main(): Promise<void> {
const message = e instanceof Error ? e.message : String(e);
bootLog("Q3MN8YKW", `thread ${threadId} failed: ${message}`);
} finally {
threads.delete(threadId);
await unlink(runningPath).catch(() => {});
threads.delete(threadId);
bumpDone();
socket?.end();
}
@@ -20,7 +20,7 @@ const CAS_GET_TOOL_DEFINITION = {
function: {
name: "cas_get",
description:
"Read a Merkle DAG node from content-addressed storage by its hash. Returns YAML-formatted node with type, payload, and children fields.",
"Read a Merkle DAG node from content-addressed storage by its hash. Returns YAML-formatted node with type, payload, and refs or children fields (content nodes use refs).",
parameters: {
type: "object",
properties: {
@@ -102,7 +102,7 @@ export function createExtract(provider: LlmProvider, deps: ExtractDeps): Extract
};
},
systemPromptForStructuredTool: (structuredToolName) =>
`You extract structured metadata from the agent output below. Use cas_get to read Merkle DAG nodes from CAS (YAML: type, payload, children) when the agent output references hashes you must traverse. When you have the complete structured object, call the ${structuredToolName} tool with JSON arguments matching the schema. You may instead reply with only a JSON object (no prose) when no tools are needed.`,
`You extract structured metadata from the agent output below. Use cas_get to read Merkle DAG nodes from CAS (YAML: type, payload, refs for content nodes or children for step/thread legacy nodes) when the agent output references hashes you must traverse. When you have the complete structured object, call the ${structuredToolName} tool with JSON arguments matching the schema. You may instead reply with only a JSON object (no prose) when no tools are needed.`,
toolHandler: async (call, thread) => {
if (call.function.name !== "cas_get") {
return `Unexpected tool routed to handler: ${call.function.name}`;
+17
View File
@@ -0,0 +1,17 @@
{
"name": "@uncaged/workflow-gateway",
"version": "0.1.0",
"private": true,
"type": "module",
"scripts": {
"dev": "wrangler dev",
"deploy": "wrangler deploy"
},
"dependencies": {
"hono": "^4.7.11"
},
"devDependencies": {
"@cloudflare/workers-types": "^4.20260425.1",
"wrangler": "^4.20.0"
}
}
+148
View File
@@ -0,0 +1,148 @@
import { Hono } from "hono";
import { cors } from "hono/cors";
type Env = {
Bindings: {
ENDPOINTS: KVNamespace;
GATEWAY_SECRET: string;
DASHBOARD_API_KEY: string;
};
};
type EndpointRecord = {
name: string;
url: string;
agentToken: string;
registeredAt: number;
lastHeartbeat: number;
};
const TTL_SECONDS = 300; // 5 min — offline if no heartbeat
const app = new Hono<Env>();
app.use("*", cors());
// ── Dashboard API key auth (skip healthz + register) ─────────────
app.use("/endpoints", async (c, next) => {
if (!checkDashboardAuth(c)) return c.json({ error: "unauthorized" }, 401);
await next();
});
app.use("/api/*", async (c, next) => {
if (!checkDashboardAuth(c)) return c.json({ error: "unauthorized" }, 401);
await next();
});
function checkDashboardAuth(c: { req: { header: (n: string) => string | undefined; query: (n: string) => string | undefined }; env: Env["Bindings"] }): boolean {
const bearer = c.req.header("Authorization")?.replace("Bearer ", "");
const query = c.req.query("key");
const key = bearer ?? query;
return key === c.env.DASHBOARD_API_KEY;
}
// ── Health ──────────────────────────────────────────────────────────
app.get("/healthz", (c) => c.json({ ok: true }));
// ── Register / heartbeat ────────────────────────────────────────────
app.post("/register", async (c) => {
const body = await c.req.json<{ name?: string; url?: string; secret?: string; agentToken?: string }>();
const { name, url, secret, agentToken } = body;
if (!name || !url) {
return c.json({ error: "name and url required" }, 400);
}
if (secret !== c.env.GATEWAY_SECRET) {
return c.json({ error: "unauthorized" }, 401);
}
const existing = await c.env.ENDPOINTS.get<EndpointRecord>(name, "json");
const now = Date.now();
const record: EndpointRecord = {
name,
url: url.replace(/\/+$/, ""), // strip trailing slash
agentToken: agentToken ?? existing?.agentToken ?? "",
registeredAt: existing?.registeredAt ?? now,
lastHeartbeat: now,
};
await c.env.ENDPOINTS.put(name, JSON.stringify(record), {
expirationTtl: TTL_SECONDS,
});
const status = existing ? 200 : 201;
return c.json({ registered: name }, status);
});
// ── Unregister ──────────────────────────────────────────────────────
app.delete("/register/:name", async (c) => {
const auth = c.req.header("Authorization");
if (auth !== `Bearer ${c.env.GATEWAY_SECRET}`) {
return c.json({ error: "unauthorized" }, 401);
}
const name = c.req.param("name");
await c.env.ENDPOINTS.delete(name);
return c.json({ unregistered: name });
});
// ── List endpoints ──────────────────────────────────────────────────
app.get("/endpoints", async (c) => {
const list = await c.env.ENDPOINTS.list();
const endpoints: Array<{ name: string; url: string; status: string; lastHeartbeat: number }> = [];
for (const key of list.keys) {
const record = await c.env.ENDPOINTS.get<EndpointRecord>(key.name, "json");
if (record) {
const age = Date.now() - record.lastHeartbeat;
endpoints.push({
name: record.name,
url: record.url,
status: age < TTL_SECONDS * 1000 ? "online" : "offline",
lastHeartbeat: record.lastHeartbeat,
});
}
}
return c.json(endpoints);
});
// ── API proxy: /api/:agent/* → agent's tunnel URL ───────────────────
app.all("/api/:agent/*", async (c) => {
const agent = c.req.param("agent");
const record = await c.env.ENDPOINTS.get<EndpointRecord>(agent, "json");
if (!record) {
return c.json({ error: "agent not found" }, 404);
}
// Build target URL: strip /api/:agent prefix, forward the rest
const url = new URL(c.req.url);
const pathAfterAgent = url.pathname.replace(`/api/${agent}`, "");
const targetUrl = `${record.url}/api${pathAfterAgent}${url.search}`;
const headers = new Headers(c.req.raw.headers);
headers.delete("host");
headers.delete("Authorization"); // don't forward dashboard key to agent
if (record.agentToken) {
headers.set("X-Agent-Token", record.agentToken);
}
try {
const resp = await fetch(targetUrl, {
method: c.req.method,
headers,
body: c.req.method !== "GET" && c.req.method !== "HEAD" ? c.req.raw.body : undefined,
});
// Stream response back
return new Response(resp.body, {
status: resp.status,
headers: resp.headers,
});
} catch (err) {
return c.json({ error: "agent unreachable", detail: String(err) }, 502);
}
});
export default app;
+12
View File
@@ -0,0 +1,12 @@
{
"compilerOptions": {
"target": "ES2022",
"module": "ES2022",
"moduleResolution": "bundler",
"types": ["@cloudflare/workers-types"],
"strict": true,
"noEmit": true,
"skipLibCheck": true
},
"include": ["src"]
}
+9
View File
@@ -0,0 +1,9 @@
name = "workflow-gateway"
main = "src/index.ts"
compatibility_date = "2025-04-01"
[[kv_namespaces]]
binding = "ENDPOINTS"
id = "88b118d1cfab4c049f9c1684848811a3"
# GATEWAY_SECRET is set via `wrangler secret put`
@@ -40,7 +40,9 @@ function isAllowedImportSpecifier(spec: string): boolean {
if (
spec === "@uncaged/workflow" ||
spec === "@uncaged/workflow-runtime" ||
spec === "@uncaged/workflow-cas"
spec === "@uncaged/workflow-protocol" ||
spec === "@uncaged/workflow-cas" ||
spec === "@uncaged/workflow-util"
) {
return true;
}