Compare commits
21 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d6fe3f844c | |||
| d0803019b5 | |||
| f16e7641fd | |||
| 3b41625001 | |||
| c602d2284b | |||
| d96e10b0fc | |||
| 8e36d3e1f5 | |||
| bbe4fe0ed1 | |||
| e105c5cac1 | |||
| 578776fccf | |||
| cb756a999a | |||
| e0577ceefe | |||
| 024dd8c1e8 | |||
| 9e98119145 | |||
| fd8943f131 | |||
| f7253d5948 | |||
| 1c5636c270 | |||
| ca0403c8ab | |||
| aa25f55f63 | |||
| e29d1bf345 | |||
| f3aedf8d6c |
@@ -10,7 +10,7 @@ This monorepo implements a workflow engine that executes single-file ESM bundles
|
||||
|---------|-----------|
|
||||
| **Workflow** | A single-file ESM module that exports `run` (workflow function) and `descriptor` (metadata). Identified by its XXH64 hash (Crockford Base32). |
|
||||
| **Bundle** | The physical `.esm.js` file stored in `~/.uncaged/workflow/bundles/`. |
|
||||
| **Thread** | A single execution of a workflow, identified by a ULID. Persisted as `.data.jsonl` + `.info.jsonl`. |
|
||||
| **Thread** | A single execution of a workflow, identified by a ULID. State lives in CAS (linked nodes); active threads indexed in `threads.json`; completed rows in `history/*.jsonl`. Debug logs use `.info.jsonl`. |
|
||||
| **Role** | A named actor within a workflow. Each role produces output with typed `meta`. |
|
||||
| **Registry** | `workflow.yaml` — maps workflow names to current/historical bundle hashes. |
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ A workflow engine that executes single-file ESM bundles. Each workflow is a self
|
||||
|---------|-------------|
|
||||
| **Workflow** | A single-file ESM module exporting `run` (workflow function) and `descriptor` (metadata). Identified by its XXH64 hash. |
|
||||
| **Bundle** | The physical `.esm.js` file stored in `~/.uncaged/workflow/bundles/`. |
|
||||
| **Thread** | A single execution of a workflow, identified by a ULID. Persisted as `.data.jsonl` + `.info.jsonl`. |
|
||||
| **Thread** | A single execution of a workflow, identified by a ULID. CAS-backed chain plus `threads.json` / `history/*.jsonl`; `.info.jsonl` for debug logs. |
|
||||
| **Role** | A named actor within a workflow. Each role produces output with typed `meta`. Roles live inside template packages (`src/roles/`). |
|
||||
| **Registry** | `workflow.yaml` — maps workflow names to current/historical bundle hashes. |
|
||||
| **CAS** | Content-Addressed Storage — bundles are immutable and addressed by hash. |
|
||||
|
||||
+12
-13
@@ -189,11 +189,15 @@ type WorkflowFn = (
|
||||
├── cas/ # Global content-addressed blobs (see getGlobalCasDir)
|
||||
├── bundles/
|
||||
│ ├── C9NMV6V2TQT81.esm.js # Crockford Base32 of XXH64
|
||||
│ └── C9NMV6V2TQT81.yaml # Role descriptor sidecar (when present)
|
||||
│ ├── C9NMV6V2TQT81.yaml # Role descriptor sidecar (when present)
|
||||
│ └── C9NMV6V2TQT81/ # Per-hash bundle dir (alongside or instead of loose files)
|
||||
│ ├── threads.json # Active threads: threadId → { head, start, updatedAt }
|
||||
│ └── history/
|
||||
│ └── 2026-05-09.jsonl # Completed threads (one JSON object per line)
|
||||
├── logs/ # One folder per bundle hash
|
||||
│ └── C9NMV6V2TQT81/
|
||||
│ ├── 01KQXKW…YG.data.jsonl # Thread state
|
||||
│ └── 01KQXKW…YG.info.jsonl # Debug log
|
||||
│ ├── 01KQXKW…YG.running # Present while worker executes this thread (optional)
|
||||
│ └── 01KQXKW…YG.info.jsonl # Debug log
|
||||
└── workflow.yaml # Registry
|
||||
```
|
||||
|
||||
@@ -207,18 +211,13 @@ type WorkflowFn = (
|
||||
|
||||
Managed by `@uncaged/workflow-register` (`readWorkflowRegistry`, `writeWorkflowRegistry`, …). Shape includes workflow entries and a top-level `config` section used for extract/supervisor model resolution.
|
||||
|
||||
### Thread JSONL
|
||||
### Thread storage (CAS + index)
|
||||
|
||||
**`.data.jsonl`** — Line 1: start record; following lines: role steps with CAS-backed content.
|
||||
Thread execution state is a chain of immutable CAS nodes (`StartNode`, `StateNode`, content Merkle blobs). Per bundle:
|
||||
|
||||
```jsonc
|
||||
// Start record
|
||||
{ "name": "solve-issue", "hash": "C9NMV6V2TQT81", "threadId": "01KQXKW…",
|
||||
"parameters": { "prompt": "Fix bug #3", "options": { "maxRounds": 5 } },
|
||||
"timestamp": 1714963200000 }
|
||||
// Role output (engine persists contentHash + refs; body in ~/.uncaged/workflow/cas/)
|
||||
{ "role": "planner", "contentHash": "…", "meta": { "phases": [...] }, "refs": ["…"], "timestamp": ... }
|
||||
```
|
||||
- **`threads.json`** — only in-flight threads (`head`, `start`, `updatedAt`).
|
||||
- **`history/{YYYY-MM-DD}.jsonl`** — completed threads (`threadId`, `head`, `start`, `completedAt`).
|
||||
- **CAS (`cas/`)** — payloads and refs for replay, GC, and fork sharing.
|
||||
|
||||
**`.info.jsonl`** — Structured debug log via `@uncaged/workflow-util` `createLogger`:
|
||||
|
||||
|
||||
@@ -1,12 +1,16 @@
|
||||
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
|
||||
import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
|
||||
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import { createCasStore, getContentMerklePayload } from "@uncaged/workflow-cas";
|
||||
import { FORK_BRANCH_ROLE, walkStateFramesNewestFirst } from "@uncaged/workflow-execute";
|
||||
import { END } from "@uncaged/workflow-runtime";
|
||||
import { getGlobalCasDir } from "@uncaged/workflow-util";
|
||||
|
||||
import { cmdFork, cmdRun } from "../src/commands/thread/index.js";
|
||||
import { cmdAdd } from "../src/commands/workflow/index.js";
|
||||
import { pathExists } from "../src/fs-utils.js";
|
||||
import { resolveThreadRecord } from "../src/thread-scan.js";
|
||||
import { addCliArgs } from "./bundle-fixture.js";
|
||||
import { ensureTestWorkflowRegistryConfig } from "./workflow-registry-fixture.js";
|
||||
|
||||
@@ -41,27 +45,6 @@ export const run = async function* (input, options) {
|
||||
};
|
||||
`;
|
||||
|
||||
async function countDataJsonlLines(dataPath: string): Promise<number> {
|
||||
try {
|
||||
const text = await readFile(dataPath, "utf8");
|
||||
return text
|
||||
.trim()
|
||||
.split("\n")
|
||||
.filter((l) => l !== "").length;
|
||||
} catch {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
async function waitUntilMinDataLines(dataPath: string, minLines: number): Promise<void> {
|
||||
for (let attempt = 0; attempt < 120; attempt++) {
|
||||
if ((await countDataJsonlLines(dataPath)) >= minLines) {
|
||||
return;
|
||||
}
|
||||
await new Promise((r) => setTimeout(r, 25));
|
||||
}
|
||||
}
|
||||
|
||||
async function waitUntilRunningAbsent(runningPath: string): Promise<void> {
|
||||
for (let attempt = 0; attempt < 120; attempt++) {
|
||||
if (!(await pathExists(runningPath))) {
|
||||
@@ -71,6 +54,41 @@ async function waitUntilRunningAbsent(runningPath: string): Promise<void> {
|
||||
}
|
||||
}
|
||||
|
||||
async function waitUntilThreadCompletes(storageRoot: string, threadId: string): Promise<void> {
|
||||
for (let attempt = 0; attempt < 120; attempt++) {
|
||||
const row = await resolveThreadRecord(storageRoot, threadId);
|
||||
if (row?.source === "history") {
|
||||
return;
|
||||
}
|
||||
await new Promise((r) => setTimeout(r, 25));
|
||||
}
|
||||
}
|
||||
|
||||
async function listMeaningfulRoleContents(
|
||||
storageRoot: string,
|
||||
threadId: string,
|
||||
): Promise<Array<{ role: string; content: string }>> {
|
||||
const row = await resolveThreadRecord(storageRoot, threadId);
|
||||
if (row === null) {
|
||||
return [];
|
||||
}
|
||||
const cas = createCasStore(getGlobalCasDir(storageRoot));
|
||||
const frames = await walkStateFramesNewestFirst(cas, row.head);
|
||||
const chronological = [...frames].reverse();
|
||||
const out: Array<{ role: string; content: string }> = [];
|
||||
for (const fr of chronological) {
|
||||
if (fr.payload.role === END || fr.payload.role === FORK_BRANCH_ROLE) {
|
||||
continue;
|
||||
}
|
||||
const content = await getContentMerklePayload(cas, fr.payload.content);
|
||||
out.push({
|
||||
role: fr.payload.role,
|
||||
content: content ?? "",
|
||||
});
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
describe("cli fork", () => {
|
||||
let prevEnv: string | undefined;
|
||||
let storageRoot: string;
|
||||
@@ -110,10 +128,12 @@ describe("cli fork", () => {
|
||||
return;
|
||||
}
|
||||
const sourceId = ran.value.threadId;
|
||||
const sourceData = join(storageRoot, "logs", hash, `${sourceId}.data.jsonl`);
|
||||
const sourceRunning = join(storageRoot, "logs", hash, `${sourceId}.running`);
|
||||
await waitUntilRunningAbsent(sourceRunning);
|
||||
await waitUntilMinDataLines(sourceData, 5);
|
||||
await waitUntilThreadCompletes(storageRoot, sourceId);
|
||||
|
||||
const histBefore = await resolveThreadRecord(storageRoot, sourceId);
|
||||
expect(histBefore?.source).toBe("history");
|
||||
|
||||
const forked = await cmdFork(storageRoot, sourceId, "planner");
|
||||
expect(forked.ok).toBe(true);
|
||||
@@ -121,25 +141,18 @@ describe("cli fork", () => {
|
||||
return;
|
||||
}
|
||||
const newId = forked.value.threadId;
|
||||
const newData = join(storageRoot, "logs", hash, `${newId}.data.jsonl`);
|
||||
const newRunning = join(storageRoot, "logs", hash, `${newId}.running`);
|
||||
await waitUntilRunningAbsent(newRunning);
|
||||
await waitUntilMinDataLines(newData, 5);
|
||||
await waitUntilThreadCompletes(storageRoot, newId);
|
||||
|
||||
const text = await readFile(newData, "utf8");
|
||||
const lines = text
|
||||
.trim()
|
||||
.split("\n")
|
||||
.filter((l) => l !== "");
|
||||
expect(lines.length).toBe(5);
|
||||
const start = JSON.parse(lines[0] ?? "{}") as Record<string, unknown>;
|
||||
expect(start.threadId).toBe(newId);
|
||||
expect(start.forkFrom).toEqual({ threadId: sourceId });
|
||||
const forkHist = await resolveThreadRecord(storageRoot, newId);
|
||||
expect(forkHist?.source).toBe("history");
|
||||
expect(forkHist?.start).toBe(histBefore?.start);
|
||||
|
||||
const lastRoleLine = JSON.parse(lines[lines.length - 2] ?? "{}") as Record<string, unknown>;
|
||||
expect(lastRoleLine.role).toBe("reviewer");
|
||||
const cas = createCasStore(getGlobalCasDir(storageRoot));
|
||||
expect(await getContentMerklePayload(cas, String(lastRoleLine.contentHash))).toBe("rev-1");
|
||||
const steps = await listMeaningfulRoleContents(storageRoot, newId);
|
||||
const tail = steps[steps.length - 1];
|
||||
expect(tail?.role).toBe("reviewer");
|
||||
expect(tail?.content).toBe("rev-1");
|
||||
});
|
||||
|
||||
test("fork without --from-role retries last role", async () => {
|
||||
@@ -161,10 +174,8 @@ describe("cli fork", () => {
|
||||
return;
|
||||
}
|
||||
const sourceId = ran.value.threadId;
|
||||
const sourceData = join(storageRoot, "logs", hash, `${sourceId}.data.jsonl`);
|
||||
const sourceRunning = join(storageRoot, "logs", hash, `${sourceId}.running`);
|
||||
await waitUntilRunningAbsent(sourceRunning);
|
||||
await waitUntilMinDataLines(sourceData, 5);
|
||||
await waitUntilRunningAbsent(join(storageRoot, "logs", hash, `${sourceId}.running`));
|
||||
await waitUntilThreadCompletes(storageRoot, sourceId);
|
||||
|
||||
const forked = await cmdFork(storageRoot, sourceId, null);
|
||||
expect(forked.ok).toBe(true);
|
||||
@@ -172,26 +183,17 @@ describe("cli fork", () => {
|
||||
return;
|
||||
}
|
||||
const newId = forked.value.threadId;
|
||||
const newData = join(storageRoot, "logs", hash, `${newId}.data.jsonl`);
|
||||
const newRunning = join(storageRoot, "logs", hash, `${newId}.running`);
|
||||
await waitUntilRunningAbsent(newRunning);
|
||||
await waitUntilMinDataLines(newData, 5);
|
||||
await waitUntilRunningAbsent(join(storageRoot, "logs", hash, `${newId}.running`));
|
||||
await waitUntilThreadCompletes(storageRoot, newId);
|
||||
|
||||
const text = await readFile(newData, "utf8");
|
||||
const lines = text
|
||||
.trim()
|
||||
.split("\n")
|
||||
.filter((l) => l !== "");
|
||||
expect(lines.length).toBe(5);
|
||||
|
||||
const replayCoder = JSON.parse(lines[2] ?? "{}") as Record<string, unknown>;
|
||||
expect(replayCoder.role).toBe("coder");
|
||||
const cas = createCasStore(getGlobalCasDir(storageRoot));
|
||||
expect(await getContentMerklePayload(cas, String(replayCoder.contentHash))).toBe("c1");
|
||||
|
||||
const lastRoleLine = JSON.parse(lines[lines.length - 2] ?? "{}") as Record<string, unknown>;
|
||||
expect(lastRoleLine.role).toBe("reviewer");
|
||||
expect(await getContentMerklePayload(cas, String(lastRoleLine.contentHash))).toBe("rev-2");
|
||||
const steps = await listMeaningfulRoleContents(storageRoot, newId);
|
||||
expect(steps.length).toBeGreaterThanOrEqual(3);
|
||||
const coderReplay = steps[steps.length - 2];
|
||||
expect(coderReplay?.role).toBe("coder");
|
||||
expect(coderReplay?.content).toBe("c1");
|
||||
const tail = steps[steps.length - 1];
|
||||
expect(tail?.role).toBe("reviewer");
|
||||
expect(tail?.content).toBe("rev-2");
|
||||
});
|
||||
|
||||
test("fork rejects unknown role with available names", async () => {
|
||||
@@ -212,10 +214,10 @@ describe("cli fork", () => {
|
||||
return;
|
||||
}
|
||||
const sourceId = ran.value.threadId;
|
||||
const sourceData = join(storageRoot, "logs", added.value.hash, `${sourceId}.data.jsonl`);
|
||||
const sourceRunning = join(storageRoot, "logs", added.value.hash, `${sourceId}.running`);
|
||||
await waitUntilRunningAbsent(sourceRunning);
|
||||
await waitUntilMinDataLines(sourceData, 5);
|
||||
await waitUntilRunningAbsent(
|
||||
join(storageRoot, "logs", added.value.hash, `${sourceId}.running`),
|
||||
);
|
||||
await waitUntilThreadCompletes(storageRoot, sourceId);
|
||||
|
||||
const bad = await cmdFork(storageRoot, sourceId, "ghost-role");
|
||||
expect(bad.ok).toBe(false);
|
||||
|
||||
@@ -1,45 +1,17 @@
|
||||
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
|
||||
import { spawnSync } from "node:child_process";
|
||||
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
|
||||
import { mkdir, mkdtemp, rm } from "node:fs/promises";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
import { createCasStore, putContentMerkleNode } from "@uncaged/workflow-cas";
|
||||
import { garbageCollectCas } from "@uncaged/workflow-execute";
|
||||
import { createCasStore, putStartNode } from "@uncaged/workflow-cas";
|
||||
import { garbageCollectCas, getBundleDir, upsertThreadEntry } from "@uncaged/workflow-execute";
|
||||
import { getGlobalCasDir } from "@uncaged/workflow-util";
|
||||
import { cmdThreadRemove } from "../src/commands/thread/index.js";
|
||||
import { pathExists } from "../src/fs-utils.js";
|
||||
|
||||
const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url));
|
||||
|
||||
async function writeDemoDataJsonl(params: {
|
||||
path: string;
|
||||
threadId: string;
|
||||
bundleHash: string;
|
||||
cas: ReturnType<typeof createCasStore>;
|
||||
activeHash: string;
|
||||
}): Promise<void> {
|
||||
const bodyHash = await putContentMerkleNode(params.cas, "p");
|
||||
const text = [
|
||||
JSON.stringify({
|
||||
name: "demo",
|
||||
hash: params.bundleHash,
|
||||
threadId: params.threadId,
|
||||
parameters: { prompt: "hi", options: { maxRounds: 5 } },
|
||||
timestamp: 100,
|
||||
}),
|
||||
JSON.stringify({
|
||||
role: "planner",
|
||||
contentHash: bodyHash,
|
||||
meta: {},
|
||||
refs: [params.activeHash, bodyHash],
|
||||
timestamp: 101,
|
||||
}),
|
||||
"",
|
||||
].join("\n");
|
||||
await writeFile(params.path, text, "utf8");
|
||||
}
|
||||
|
||||
describe("gc cli and garbageCollectCas", () => {
|
||||
let prevEnv: string | undefined;
|
||||
let storageRoot: string;
|
||||
@@ -59,22 +31,30 @@ describe("gc cli and garbageCollectCas", () => {
|
||||
await rm(storageRoot, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
test("garbageCollectCas keeps CAS entries referenced by thread refs", async () => {
|
||||
test("garbageCollectCas keeps CAS entries reachable from threads.json roots", async () => {
|
||||
const bundleHash = "C9NMV6V2TQT81";
|
||||
const threadId = "01AAA1111111111111111111";
|
||||
const logsDir = join(storageRoot, "logs", bundleHash);
|
||||
await mkdir(logsDir, { recursive: true });
|
||||
const bundleDir = getBundleDir(storageRoot, bundleHash);
|
||||
await mkdir(bundleDir, { recursive: true });
|
||||
|
||||
const cas = createCasStore(getGlobalCasDir(storageRoot));
|
||||
const activeHash = await cas.put("active-blob");
|
||||
const orphanHash = await cas.put("orphan-blob");
|
||||
|
||||
await writeDemoDataJsonl({
|
||||
path: join(logsDir, `${threadId}.data.jsonl`),
|
||||
threadId,
|
||||
bundleHash,
|
||||
const promptHash = await cas.put("prompt-text");
|
||||
const startHash = await putStartNode(
|
||||
cas,
|
||||
activeHash,
|
||||
{
|
||||
name: "demo",
|
||||
hash: bundleHash,
|
||||
maxRounds: 5,
|
||||
depth: 0,
|
||||
},
|
||||
promptHash,
|
||||
);
|
||||
|
||||
await upsertThreadEntry(bundleDir, threadId, {
|
||||
head: startHash,
|
||||
start: startHash,
|
||||
updatedAt: 100,
|
||||
});
|
||||
|
||||
const gc = await garbageCollectCas(storageRoot);
|
||||
@@ -82,12 +62,12 @@ describe("gc cli and garbageCollectCas", () => {
|
||||
if (!gc.ok) {
|
||||
return;
|
||||
}
|
||||
expect(gc.value.scannedThreads).toBe(1);
|
||||
expect(gc.value.activeRefs).toBe(2);
|
||||
expect(gc.value.scannedThreads).toBe(2);
|
||||
expect(gc.value.deletedEntries).toBe(1);
|
||||
expect(gc.value.deletedHashes).toEqual([orphanHash]);
|
||||
|
||||
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${activeHash}.txt`))).toBe(true);
|
||||
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${promptHash}.txt`))).toBe(true);
|
||||
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${startHash}.txt`))).toBe(true);
|
||||
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${orphanHash}.txt`))).toBe(false);
|
||||
});
|
||||
|
||||
@@ -110,19 +90,27 @@ describe("gc cli and garbageCollectCas", () => {
|
||||
test("cli gc prints stats", async () => {
|
||||
const bundleHash = "C9NMV6V2TQT81";
|
||||
const threadId = "01BBB2222222222222222222";
|
||||
const logsDir = join(storageRoot, "logs", bundleHash);
|
||||
await mkdir(logsDir, { recursive: true });
|
||||
const bundleDir = getBundleDir(storageRoot, bundleHash);
|
||||
await mkdir(bundleDir, { recursive: true });
|
||||
|
||||
const cas = createCasStore(getGlobalCasDir(storageRoot));
|
||||
const activeHash = await cas.put("keep-me");
|
||||
const promptHash = await cas.put("prompt-text");
|
||||
const startHash = await putStartNode(
|
||||
cas,
|
||||
{
|
||||
name: "demo",
|
||||
hash: bundleHash,
|
||||
maxRounds: 5,
|
||||
depth: 0,
|
||||
},
|
||||
promptHash,
|
||||
);
|
||||
await cas.put("drop-me");
|
||||
|
||||
await writeDemoDataJsonl({
|
||||
path: join(logsDir, `${threadId}.data.jsonl`),
|
||||
threadId,
|
||||
bundleHash,
|
||||
cas,
|
||||
activeHash,
|
||||
await upsertThreadEntry(bundleDir, threadId, {
|
||||
head: startHash,
|
||||
start: startHash,
|
||||
updatedAt: 100,
|
||||
});
|
||||
|
||||
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
|
||||
@@ -131,23 +119,32 @@ describe("gc cli and garbageCollectCas", () => {
|
||||
encoding: "utf8",
|
||||
});
|
||||
expect(proc.status).toBe(0);
|
||||
expect(String(proc.stdout).trim()).toBe("scanned 1 threads, 2 active refs, deleted 1 entries");
|
||||
expect(String(proc.stdout).trim()).toBe("scanned 2 threads, 2 active refs, deleted 1 entries");
|
||||
});
|
||||
|
||||
test("thread rm triggers gc so unreferenced CAS is removed", async () => {
|
||||
const bundleHash = "C9NMV6V2TQT81";
|
||||
const threadId = "01CCC3333333333333333333";
|
||||
const logsDir = join(storageRoot, "logs", bundleHash);
|
||||
await mkdir(logsDir, { recursive: true });
|
||||
const bundleDir = getBundleDir(storageRoot, bundleHash);
|
||||
await mkdir(bundleDir, { recursive: true });
|
||||
|
||||
const cas = createCasStore(getGlobalCasDir(storageRoot));
|
||||
const activeHash = await cas.put("pinned-by-ref");
|
||||
await writeDemoDataJsonl({
|
||||
path: join(logsDir, `${threadId}.data.jsonl`),
|
||||
threadId,
|
||||
bundleHash,
|
||||
const promptHash = await cas.put("prompt-text");
|
||||
const startHash = await putStartNode(
|
||||
cas,
|
||||
activeHash,
|
||||
{
|
||||
name: "demo",
|
||||
hash: bundleHash,
|
||||
maxRounds: 5,
|
||||
depth: 0,
|
||||
},
|
||||
promptHash,
|
||||
);
|
||||
|
||||
await upsertThreadEntry(bundleDir, threadId, {
|
||||
head: startHash,
|
||||
start: startHash,
|
||||
updatedAt: 100,
|
||||
});
|
||||
|
||||
const orphanHash = await cas.put("orphan-after-rm");
|
||||
@@ -157,6 +154,6 @@ describe("gc cli and garbageCollectCas", () => {
|
||||
expect(removed.ok).toBe(true);
|
||||
|
||||
expect(await pathExists(orphanPath)).toBe(false);
|
||||
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${activeHash}.txt`))).toBe(false);
|
||||
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${promptHash}.txt`))).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,13 +1,10 @@
|
||||
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
|
||||
import { spawn, spawnSync } from "node:child_process";
|
||||
import { cp, mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
|
||||
import { spawnSync } from "node:child_process";
|
||||
import { mkdtemp, rm } from "node:fs/promises";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
|
||||
import { createCasStore, putContentMerkleNode } from "@uncaged/workflow-cas";
|
||||
import { getGlobalCasDir } from "@uncaged/workflow-util";
|
||||
|
||||
import {
|
||||
formatLiveDebugLine,
|
||||
formatLiveTimeLabel,
|
||||
@@ -18,11 +15,6 @@ import {
|
||||
import { parseLiveArgv } from "../src/live-argv.js";
|
||||
|
||||
const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url));
|
||||
const fixtureRoot = fileURLToPath(new URL("./fixtures/live", import.meta.url));
|
||||
|
||||
/** Bodies for Merkle content nodes; hashes must match `.data.jsonl` fixtures. */
|
||||
const LIVE_FIXTURE_PLANNER_BODY =
|
||||
"alpha\nbeta\ngamma\nLINE4\nLINE5\nLINE6\nLINE7\nLINE8\nLINE9\nLINE10\nLINE11";
|
||||
|
||||
describe("live helpers", () => {
|
||||
test("formatLiveTimeLabel pads HH:MM:SS", () => {
|
||||
@@ -86,28 +78,6 @@ describe("live CLI", () => {
|
||||
prevEnv = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
|
||||
storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-live-"));
|
||||
process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = storageRoot;
|
||||
await mkdir(join(storageRoot, "logs", "C9NMV6V2TQT81"), { recursive: true });
|
||||
await cp(
|
||||
join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl"),
|
||||
join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl"),
|
||||
);
|
||||
await cp(
|
||||
join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.info.jsonl"),
|
||||
join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.info.jsonl"),
|
||||
);
|
||||
await cp(
|
||||
join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl"),
|
||||
join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl"),
|
||||
);
|
||||
await cp(
|
||||
join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVEOLDER01DDDDDDDDDDDDG.data.jsonl"),
|
||||
join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVEOLDER01DDDDDDDDDDDDG.data.jsonl"),
|
||||
);
|
||||
|
||||
const cas = createCasStore(getGlobalCasDir(storageRoot));
|
||||
await putContentMerkleNode(cas, LIVE_FIXTURE_PLANNER_BODY);
|
||||
await putContentMerkleNode(cas, "patch");
|
||||
await putContentMerkleNode(cas, "still running");
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
@@ -119,170 +89,6 @@ describe("live CLI", () => {
|
||||
await rm(storageRoot, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
test("prints role steps and summary for a completed thread", async () => {
|
||||
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
|
||||
const proc = spawn(process.execPath, [cliEntryPath, "live", "01LIVECMPLT01DDDDDDDDDDDDG"], {
|
||||
env,
|
||||
stdio: ["ignore", "pipe", "pipe"],
|
||||
});
|
||||
const stdout = await new Promise<string>((resolve, reject) => {
|
||||
let buf = "";
|
||||
proc.stdout?.on("data", (c: Buffer) => {
|
||||
buf += c.toString("utf8");
|
||||
});
|
||||
proc.stderr?.on("data", (c: Buffer) => {
|
||||
buf += c.toString("utf8");
|
||||
});
|
||||
proc.on("error", reject);
|
||||
proc.on("exit", (code: number | null) => {
|
||||
if (code === 0) {
|
||||
resolve(buf);
|
||||
} else {
|
||||
reject(new Error(`exit ${code}: ${buf}`));
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
expect(stdout).toContain("planner");
|
||||
expect(stdout).toContain("coder");
|
||||
expect(stdout).toContain("meta:");
|
||||
expect(stdout).toContain('"phase":"plan"');
|
||||
expect(stdout).toContain("LINE10");
|
||||
expect(stdout).not.toContain("LINE11");
|
||||
expect(stdout).toContain("more line");
|
||||
expect(stdout).toContain("completed: returnCode=0");
|
||||
expect(stdout).toContain("fixture completed");
|
||||
});
|
||||
|
||||
test("--latest tails the newest thread by start timestamp", async () => {
|
||||
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
|
||||
const proc = spawn(process.execPath, [cliEntryPath, "live", "--latest"], {
|
||||
env,
|
||||
stdio: ["ignore", "pipe", "pipe"],
|
||||
});
|
||||
const stdout = await new Promise<string>((resolve, reject) => {
|
||||
let buf = "";
|
||||
proc.stdout?.on("data", (c: Buffer) => {
|
||||
buf += c.toString("utf8");
|
||||
});
|
||||
proc.stderr?.on("data", (c: Buffer) => {
|
||||
buf += c.toString("utf8");
|
||||
});
|
||||
proc.on("error", reject);
|
||||
proc.on("exit", (code: number | null) => {
|
||||
if (code === 0) {
|
||||
resolve(buf);
|
||||
} else {
|
||||
reject(new Error(`exit ${code}: ${buf}`));
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
expect(stdout).toContain("fixture completed");
|
||||
expect(stdout).not.toContain("older thread");
|
||||
});
|
||||
|
||||
test("--debug prints .info.jsonl records after data output", async () => {
|
||||
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
|
||||
const proc = spawn(
|
||||
process.execPath,
|
||||
[cliEntryPath, "live", "01LIVECMPLT01DDDDDDDDDDDDG", "--debug"],
|
||||
{
|
||||
env,
|
||||
stdio: ["ignore", "pipe", "pipe"],
|
||||
},
|
||||
);
|
||||
const stdout = await new Promise<string>((resolve, reject) => {
|
||||
let buf = "";
|
||||
proc.stdout?.on("data", (c: Buffer) => {
|
||||
buf += c.toString("utf8");
|
||||
});
|
||||
proc.stderr?.on("data", (c: Buffer) => {
|
||||
buf += c.toString("utf8");
|
||||
});
|
||||
proc.on("error", reject);
|
||||
proc.on("exit", (code: number | null) => {
|
||||
if (code === 0) {
|
||||
resolve(buf);
|
||||
} else {
|
||||
reject(new Error(`exit ${code}: ${buf}`));
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
expect(stdout).toContain("[DEBUGTAG1]");
|
||||
expect(stdout).toContain("bundle loaded");
|
||||
expect(stdout).toContain("[DEBUGTAG2]");
|
||||
expect(stdout).toContain("multi line");
|
||||
});
|
||||
|
||||
test("--role filters out non-matching roles", async () => {
|
||||
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
|
||||
const proc = spawn(
|
||||
process.execPath,
|
||||
[cliEntryPath, "live", "01LIVECMPLT01DDDDDDDDDDDDG", "--role", "planner"],
|
||||
{
|
||||
env,
|
||||
stdio: ["ignore", "pipe", "pipe"],
|
||||
},
|
||||
);
|
||||
const stdout = await new Promise<string>((resolve, reject) => {
|
||||
let buf = "";
|
||||
proc.stdout?.on("data", (c: Buffer) => {
|
||||
buf += c.toString("utf8");
|
||||
});
|
||||
proc.stderr?.on("data", (c: Buffer) => {
|
||||
buf += c.toString("utf8");
|
||||
});
|
||||
proc.on("error", reject);
|
||||
proc.on("exit", (code: number | null) => {
|
||||
if (code === 0) {
|
||||
resolve(buf);
|
||||
} else {
|
||||
reject(new Error(`exit ${code}: ${buf}`));
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
expect(stdout).toContain("planner");
|
||||
expect(stdout).not.toContain("patch");
|
||||
expect(stdout).toContain("completed: returnCode=0");
|
||||
});
|
||||
|
||||
test("--latest --debug --role combine", async () => {
|
||||
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
|
||||
const proc = spawn(
|
||||
process.execPath,
|
||||
[cliEntryPath, "live", "--latest", "--debug", "--role", "planner"],
|
||||
{
|
||||
env,
|
||||
stdio: ["ignore", "pipe", "pipe"],
|
||||
},
|
||||
);
|
||||
const stdout = await new Promise<string>((resolve, reject) => {
|
||||
let buf = "";
|
||||
proc.stdout?.on("data", (c: Buffer) => {
|
||||
buf += c.toString("utf8");
|
||||
});
|
||||
proc.stderr?.on("data", (c: Buffer) => {
|
||||
buf += c.toString("utf8");
|
||||
});
|
||||
proc.on("error", reject);
|
||||
proc.on("exit", (code: number | null) => {
|
||||
if (code === 0) {
|
||||
resolve(buf);
|
||||
} else {
|
||||
reject(new Error(`exit ${code}: ${buf}`));
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
expect(stdout).toContain("[DEBUGTAG1]");
|
||||
expect(stdout).toContain("planner");
|
||||
expect(stdout).not.toContain("patch");
|
||||
expect(stdout).toContain("fixture completed");
|
||||
});
|
||||
|
||||
test("unknown thread id exits 1", () => {
|
||||
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
|
||||
const r = spawnSync(process.execPath, [cliEntryPath, "live", "01UNKNOWNXXXXXXXXXXXXXXXXX"], {
|
||||
@@ -292,51 +98,6 @@ describe("live CLI", () => {
|
||||
expect(r.status).toBe(1);
|
||||
expect(String(r.stderr ?? "")).toContain("thread not found");
|
||||
});
|
||||
|
||||
test("follows file until WorkflowResult is appended", async () => {
|
||||
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
|
||||
const dataPath = join(
|
||||
storageRoot,
|
||||
"logs",
|
||||
"C9NMV6V2TQT81",
|
||||
"01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl",
|
||||
);
|
||||
|
||||
const proc = spawn(process.execPath, [cliEntryPath, "live", "01LIVEINFLY01DDDDDDDDDDDDG"], {
|
||||
env,
|
||||
stdio: ["ignore", "pipe", "pipe"],
|
||||
});
|
||||
|
||||
await new Promise((r) => setTimeout(r, 120));
|
||||
const prior = await readFile(dataPath, "utf8");
|
||||
await writeFile(
|
||||
dataPath,
|
||||
`${prior.replace(/\s*$/, "")}\n${JSON.stringify({ returnCode: 0, summary: "caught up" })}\n`,
|
||||
"utf8",
|
||||
);
|
||||
|
||||
const stdout = await new Promise<string>((resolve, reject) => {
|
||||
let buf = "";
|
||||
proc.stdout?.on("data", (c: Buffer) => {
|
||||
buf += c.toString("utf8");
|
||||
});
|
||||
proc.stderr?.on("data", (c: Buffer) => {
|
||||
buf += c.toString("utf8");
|
||||
});
|
||||
proc.on("error", reject);
|
||||
proc.on("exit", (code: number | null) => {
|
||||
if (code === 0) {
|
||||
resolve(buf);
|
||||
} else {
|
||||
reject(new Error(`exit ${code}: ${buf}`));
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
expect(stdout).toContain("planner");
|
||||
expect(stdout).toContain("completed: returnCode=0");
|
||||
expect(stdout).toContain("caught up");
|
||||
});
|
||||
});
|
||||
|
||||
describe("live --latest with empty storage", () => {
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
|
||||
import { spawnSync } from "node:child_process";
|
||||
import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
|
||||
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
|
||||
import { tmpdir } from "node:os";
|
||||
import { dirname, join } from "node:path";
|
||||
import { join } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
import { getBundleDir, readThreadsIndex } from "@uncaged/workflow-execute";
|
||||
import { getGlobalCasDir } from "@uncaged/workflow-util";
|
||||
import { cmdCasPut } from "../src/commands/cas/index.js";
|
||||
import {
|
||||
@@ -18,6 +19,7 @@ import {
|
||||
} from "../src/commands/thread/index.js";
|
||||
import { cmdAdd } from "../src/commands/workflow/index.js";
|
||||
import { pathExists, readTextFileIfExists } from "../src/fs-utils.js";
|
||||
import { resolveThreadRecord } from "../src/thread-scan.js";
|
||||
import { addCliArgs } from "./bundle-fixture.js";
|
||||
import { ensureTestWorkflowRegistryConfig } from "./workflow-registry-fixture.js";
|
||||
|
||||
@@ -101,34 +103,21 @@ export const run = async function* (_input, options) {
|
||||
};
|
||||
`;
|
||||
|
||||
async function countDataJsonlLines(dataPath: string): Promise<number> {
|
||||
try {
|
||||
const text = await readFile(dataPath, "utf8");
|
||||
return text
|
||||
.trim()
|
||||
.split("\n")
|
||||
.filter((l) => l !== "").length;
|
||||
} catch {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
async function waitUntilMinDataLines(
|
||||
dataPath: string,
|
||||
minLines: number,
|
||||
maxAttempts: number,
|
||||
): Promise<void> {
|
||||
async function waitUntilRunningFileAbsent(runningPath: string, maxAttempts: number): Promise<void> {
|
||||
for (let attempt = 0; attempt < maxAttempts; attempt++) {
|
||||
if ((await countDataJsonlLines(dataPath)) >= minLines) {
|
||||
if (!(await pathExists(runningPath))) {
|
||||
return;
|
||||
}
|
||||
await new Promise((r) => setTimeout(r, 25));
|
||||
}
|
||||
}
|
||||
|
||||
async function waitUntilRunningFileAbsent(runningPath: string, maxAttempts: number): Promise<void> {
|
||||
async function waitUntilPredicate(
|
||||
predicate: () => Promise<boolean>,
|
||||
maxAttempts: number,
|
||||
): Promise<void> {
|
||||
for (let attempt = 0; attempt < maxAttempts; attempt++) {
|
||||
if (!(await pathExists(runningPath))) {
|
||||
if (await predicate()) {
|
||||
return;
|
||||
}
|
||||
await new Promise((r) => setTimeout(r, 25));
|
||||
@@ -200,8 +189,7 @@ describe("cli thread commands", () => {
|
||||
const removed = await cmdThreadRemove(storageRoot, threadId);
|
||||
expect(removed.ok).toBe(true);
|
||||
|
||||
const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`);
|
||||
expect(await pathExists(dataPath)).toBe(false);
|
||||
expect(await resolveThreadRecord(storageRoot, threadId)).toBeNull();
|
||||
});
|
||||
|
||||
test("thread rm runs GC and removes CAS blobs not referenced by any remaining thread", async () => {
|
||||
@@ -234,9 +222,9 @@ describe("cli thread commands", () => {
|
||||
threads = await cmdThreads(storageRoot, []);
|
||||
}
|
||||
|
||||
const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`);
|
||||
const runningPath = join(dirname(dataPath), `${threadId}.running`);
|
||||
const runningPath = join(storageRoot, "logs", added.value.hash, `${threadId}.running`);
|
||||
await waitUntilRunningFileAbsent(runningPath, 120);
|
||||
expect((await resolveThreadRecord(storageRoot, threadId))?.source).toBe("history");
|
||||
|
||||
const put = await cmdCasPut(storageRoot, "keep-after-thread-rm");
|
||||
expect(put.ok).toBe(true);
|
||||
@@ -323,24 +311,20 @@ describe("cli thread commands", () => {
|
||||
const killed = await cmdKill(storageRoot, threadId);
|
||||
expect(killed.ok).toBe(true);
|
||||
|
||||
await new Promise((r) => setTimeout(r, 900));
|
||||
await waitUntilPredicate(async () => {
|
||||
return (await resolveThreadRecord(storageRoot, threadId))?.source === "history";
|
||||
}, 120);
|
||||
|
||||
const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`);
|
||||
const text = await readFile(dataPath, "utf8");
|
||||
const lines = text
|
||||
.trim()
|
||||
.split("\n")
|
||||
.filter((l) => l !== "");
|
||||
expect(lines.length).toBe(3);
|
||||
expect((await resolveThreadRecord(storageRoot, threadId))?.source).toBe("history");
|
||||
|
||||
const runningPath = join(dirname(dataPath), `${threadId}.running`);
|
||||
const runningPath = join(storageRoot, "logs", added.value.hash, `${threadId}.running`);
|
||||
expect(await pathExists(runningPath)).toBe(false);
|
||||
});
|
||||
|
||||
test("pause stops between yields and resume completes thread", async () => {
|
||||
const bundleDir = join(storageRoot, "src");
|
||||
await mkdir(bundleDir, { recursive: true });
|
||||
const bundlePath = join(bundleDir, "demo.esm.js");
|
||||
const srcDir = join(storageRoot, "src");
|
||||
await mkdir(srcDir, { recursive: true });
|
||||
const bundlePath = join(srcDir, "demo.esm.js");
|
||||
await writeFile(bundlePath, pauseResumeBundleSource, "utf8");
|
||||
|
||||
const added = await cmdAdd(storageRoot, addCliArgs("solve-issue", bundlePath));
|
||||
@@ -356,24 +340,33 @@ describe("cli thread commands", () => {
|
||||
}
|
||||
|
||||
const threadId = ran.value.threadId;
|
||||
const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`);
|
||||
const bundleDir = getBundleDir(storageRoot, added.value.hash);
|
||||
|
||||
await waitUntilMinDataLines(dataPath, 2, 80);
|
||||
expect(await countDataJsonlLines(dataPath)).toBe(2);
|
||||
await waitUntilPredicate(async () => {
|
||||
const idx = await readThreadsIndex(bundleDir);
|
||||
const ent = idx[threadId];
|
||||
return ent !== undefined && ent.head !== ent.start;
|
||||
}, 80);
|
||||
|
||||
const idxBeforePause = await readThreadsIndex(bundleDir);
|
||||
const headAtPause = idxBeforePause[threadId]?.head;
|
||||
|
||||
const paused = await cmdPause(storageRoot, threadId);
|
||||
expect(paused.ok).toBe(true);
|
||||
|
||||
await new Promise((r) => setTimeout(r, 400));
|
||||
expect(await countDataJsonlLines(dataPath)).toBe(2);
|
||||
const idxPaused = await readThreadsIndex(bundleDir);
|
||||
expect(idxPaused[threadId]?.head).toBe(headAtPause);
|
||||
|
||||
const resumed = await cmdResume(storageRoot, threadId);
|
||||
expect(resumed.ok).toBe(true);
|
||||
|
||||
await waitUntilMinDataLines(dataPath, 4, 120);
|
||||
expect(await countDataJsonlLines(dataPath)).toBe(4);
|
||||
await waitUntilPredicate(async () => {
|
||||
const row = await resolveThreadRecord(storageRoot, threadId);
|
||||
return row?.source === "history";
|
||||
}, 120);
|
||||
|
||||
const runningPath = join(dirname(dataPath), `${threadId}.running`);
|
||||
const runningPath = join(storageRoot, "logs", added.value.hash, `${threadId}.running`);
|
||||
await waitUntilRunningFileAbsent(runningPath, 100);
|
||||
expect(await pathExists(runningPath)).toBe(false);
|
||||
});
|
||||
@@ -397,8 +390,7 @@ describe("cli thread commands", () => {
|
||||
}
|
||||
|
||||
const threadId = ran.value.threadId;
|
||||
const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`);
|
||||
const runningPath = join(dirname(dataPath), `${threadId}.running`);
|
||||
const runningPath = join(storageRoot, "logs", added.value.hash, `${threadId}.running`);
|
||||
|
||||
await waitUntilRunningFileAbsent(runningPath, 100);
|
||||
expect(await pathExists(runningPath)).toBe(false);
|
||||
|
||||
@@ -8,7 +8,7 @@ import { createWorkflowRoutes } from "./routes-workflow.js";
|
||||
|
||||
const MAX_BODY_SIZE = 1_048_576; // 1 MB
|
||||
|
||||
export function createApp(storageRoot: string): Hono {
|
||||
export function createApp(storageRoot: string, agentToken: string | null): Hono {
|
||||
const app = new Hono();
|
||||
|
||||
app.onError((_err, c) => {
|
||||
@@ -37,7 +37,19 @@ export function createApp(storageRoot: string): Hono {
|
||||
await next();
|
||||
});
|
||||
|
||||
// ── Agent token auth (skip healthz) ───────────────────────────────
|
||||
if (agentToken !== null) {
|
||||
app.use("/api/*", async (c, next) => {
|
||||
const token = c.req.header("X-Agent-Token");
|
||||
if (token !== agentToken) {
|
||||
return c.json({ error: "unauthorized" }, 401);
|
||||
}
|
||||
await next();
|
||||
});
|
||||
}
|
||||
|
||||
app.get("/healthz", (c) => c.json({ ok: true }));
|
||||
app.get("/api/healthz", (c) => c.json({ ok: true }));
|
||||
|
||||
app.route("/api/workflows", createWorkflowRoutes(storageRoot));
|
||||
app.route("/api/threads", createThreadRoutes(storageRoot));
|
||||
|
||||
@@ -1,9 +1,18 @@
|
||||
import { statSync, watch } from "node:fs";
|
||||
import { dirname, join } from "node:path";
|
||||
import { join } from "node:path";
|
||||
import { createCasStore, getContentMerklePayload } from "@uncaged/workflow-cas";
|
||||
import {
|
||||
FORK_BRANCH_ROLE,
|
||||
readThreadsIndex,
|
||||
type ThreadIndex,
|
||||
walkStateFramesNewestFirst,
|
||||
} from "@uncaged/workflow-execute";
|
||||
import { END } from "@uncaged/workflow-runtime";
|
||||
import { getGlobalCasDir } from "@uncaged/workflow-util";
|
||||
import { Hono } from "hono";
|
||||
import { streamSSE } from "hono/streaming";
|
||||
|
||||
import { resolveThreadDataPath } from "../../thread-scan.js";
|
||||
import { resolveThreadRecord } from "../../thread-scan.js";
|
||||
|
||||
type PumpState = {
|
||||
contentOffset: number;
|
||||
@@ -21,7 +30,6 @@ function fileSize(path: string): number {
|
||||
async function readNewBytes(path: string, state: PumpState): Promise<string | null> {
|
||||
const size = fileSize(path);
|
||||
if (size < state.contentOffset) {
|
||||
// File was truncated — reset
|
||||
state.contentOffset = 0;
|
||||
state.carry = "";
|
||||
}
|
||||
@@ -42,15 +50,6 @@ function parseJsonLine(line: string): unknown {
|
||||
}
|
||||
}
|
||||
|
||||
function isWorkflowResult(record: unknown): boolean {
|
||||
return (
|
||||
record !== null &&
|
||||
typeof record === "object" &&
|
||||
"type" in (record as Record<string, unknown>) &&
|
||||
(record as Record<string, unknown>).type === "workflow-result"
|
||||
);
|
||||
}
|
||||
|
||||
function parseNewLines(chunk: string, state: PumpState): string[] {
|
||||
state.carry += chunk;
|
||||
|
||||
@@ -67,52 +66,193 @@ function parseNewLines(chunk: string, state: PumpState): string[] {
|
||||
return lines;
|
||||
}
|
||||
|
||||
type CasSseState = {
|
||||
printedHashes: Set<string>;
|
||||
lastHead: string | null;
|
||||
completionEmitted: boolean;
|
||||
};
|
||||
|
||||
type LiveSseStream = {
|
||||
writeSSE: (opts: { event: string; data: string; id: string }) => Promise<void>;
|
||||
};
|
||||
|
||||
function completionFromEndMeta(meta: Record<string, unknown>): {
|
||||
returnCode: number;
|
||||
summary: string;
|
||||
} | null {
|
||||
const returnCode = meta.returnCode;
|
||||
const summary = meta.summary;
|
||||
if (typeof returnCode !== "number" || typeof summary !== "string") {
|
||||
return null;
|
||||
}
|
||||
return { returnCode, summary };
|
||||
}
|
||||
|
||||
async function emitRecordsForHead(params: {
|
||||
storageRoot: string;
|
||||
bundleDir: string;
|
||||
threadId: string;
|
||||
headHash: string;
|
||||
sseState: CasSseState;
|
||||
stream: LiveSseStream;
|
||||
eventId: { n: number };
|
||||
}): Promise<boolean> {
|
||||
const cas = createCasStore(getGlobalCasDir(params.storageRoot));
|
||||
const frames = await walkStateFramesNewestFirst(cas, params.headHash);
|
||||
const chronological = [...frames].reverse();
|
||||
|
||||
for (const fr of chronological) {
|
||||
if (params.sseState.printedHashes.has(fr.hash)) {
|
||||
continue;
|
||||
}
|
||||
params.sseState.printedHashes.add(fr.hash);
|
||||
|
||||
const role = fr.payload.role;
|
||||
if (role === FORK_BRANCH_ROLE) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (role === END) {
|
||||
const wf = completionFromEndMeta(fr.payload.meta);
|
||||
if (wf !== null) {
|
||||
params.eventId.n++;
|
||||
await params.stream.writeSSE({
|
||||
event: "record",
|
||||
data: JSON.stringify({ type: "workflow-result", returnCode: wf.returnCode, content: wf.summary, timestamp: null }),
|
||||
id: String(params.eventId.n),
|
||||
});
|
||||
return true;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
const payloadText = await getContentMerklePayload(cas, fr.payload.content);
|
||||
const content =
|
||||
payloadText !== null
|
||||
? payloadText
|
||||
: `(content not in CAS; contentHash=${fr.payload.content})`;
|
||||
|
||||
params.eventId.n++;
|
||||
await params.stream.writeSSE({
|
||||
event: "record",
|
||||
data: JSON.stringify({
|
||||
type: "role",
|
||||
role: fr.payload.role,
|
||||
contentHash: fr.payload.content,
|
||||
content,
|
||||
meta: fr.payload.meta,
|
||||
timestamp: fr.payload.timestamp,
|
||||
}),
|
||||
id: String(params.eventId.n),
|
||||
});
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
async function pumpThreadsJsonSse(params: {
|
||||
storageRoot: string;
|
||||
bundleDir: string;
|
||||
threadId: string;
|
||||
sseState: CasSseState;
|
||||
stream: LiveSseStream;
|
||||
eventId: { n: number };
|
||||
}): Promise<boolean> {
|
||||
let idx: ThreadIndex;
|
||||
try {
|
||||
idx = await readThreadsIndex(params.bundleDir);
|
||||
} catch {
|
||||
idx = {};
|
||||
}
|
||||
|
||||
const active = idx[params.threadId];
|
||||
|
||||
if (active === undefined) {
|
||||
if (params.sseState.completionEmitted) {
|
||||
return false;
|
||||
}
|
||||
const hist = await resolveThreadRecord(params.storageRoot, params.threadId);
|
||||
if (hist === null || hist.source !== "history") {
|
||||
return false;
|
||||
}
|
||||
params.sseState.completionEmitted = true;
|
||||
return await emitRecordsForHead({
|
||||
storageRoot: params.storageRoot,
|
||||
bundleDir: params.bundleDir,
|
||||
threadId: params.threadId,
|
||||
headHash: hist.head,
|
||||
sseState: params.sseState,
|
||||
stream: params.stream,
|
||||
eventId: params.eventId,
|
||||
});
|
||||
}
|
||||
|
||||
const head = active.head;
|
||||
if (params.sseState.lastHead === null) {
|
||||
params.sseState.lastHead = head;
|
||||
return await emitRecordsForHead({
|
||||
storageRoot: params.storageRoot,
|
||||
bundleDir: params.bundleDir,
|
||||
threadId: params.threadId,
|
||||
headHash: head,
|
||||
sseState: params.sseState,
|
||||
stream: params.stream,
|
||||
eventId: params.eventId,
|
||||
});
|
||||
}
|
||||
|
||||
if (head !== params.sseState.lastHead) {
|
||||
params.sseState.lastHead = head;
|
||||
return await emitRecordsForHead({
|
||||
storageRoot: params.storageRoot,
|
||||
bundleDir: params.bundleDir,
|
||||
threadId: params.threadId,
|
||||
headHash: head,
|
||||
sseState: params.sseState,
|
||||
stream: params.stream,
|
||||
eventId: params.eventId,
|
||||
});
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
export function createLiveRoutes(storageRoot: string): Hono {
|
||||
const app = new Hono();
|
||||
|
||||
app.get("/:threadId/live", async (c) => {
|
||||
const threadId = c.req.param("threadId");
|
||||
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
|
||||
if (dataPath === null) {
|
||||
const resolved = await resolveThreadRecord(storageRoot, threadId);
|
||||
if (resolved === null) {
|
||||
return c.json({ error: `thread not found: ${threadId}` }, 404);
|
||||
}
|
||||
const resolvedDataPath = dataPath;
|
||||
|
||||
const infoPath = join(dirname(resolvedDataPath), `${threadId}.info.jsonl`);
|
||||
const threadTarget = resolved;
|
||||
const threadsJsonPath = join(threadTarget.bundleDir, "threads.json");
|
||||
const infoPath = join(storageRoot, "logs", threadTarget.bundleHash, `${threadId}.info.jsonl`);
|
||||
|
||||
return streamSSE(c, async (stream) => {
|
||||
const dataState: PumpState = { contentOffset: 0, carry: "" };
|
||||
const infoState: PumpState = { contentOffset: 0, carry: "" };
|
||||
let eventId = 0;
|
||||
const sseThreadState: CasSseState = {
|
||||
printedHashes: new Set<string>(),
|
||||
lastHead: null,
|
||||
completionEmitted: false,
|
||||
};
|
||||
const eventId = { n: 0 };
|
||||
|
||||
async function pumpData(): Promise<boolean> {
|
||||
let chunk: string | null;
|
||||
try {
|
||||
chunk = await readNewBytes(resolvedDataPath, dataState);
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
if (chunk === null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const lines = parseNewLines(chunk, dataState);
|
||||
for (const line of lines) {
|
||||
const record = parseJsonLine(line);
|
||||
eventId++;
|
||||
await stream.writeSSE({
|
||||
event: "record",
|
||||
data: JSON.stringify(record),
|
||||
id: String(eventId),
|
||||
});
|
||||
|
||||
if (isWorkflowResult(record)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
const finished = await pumpThreadsJsonSse({
|
||||
storageRoot,
|
||||
bundleDir: threadTarget.bundleDir,
|
||||
threadId,
|
||||
sseState: sseThreadState,
|
||||
stream,
|
||||
eventId,
|
||||
});
|
||||
return finished;
|
||||
}
|
||||
|
||||
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: SSE newline framing mirrors legacy pump
|
||||
async function pumpInfo(): Promise<void> {
|
||||
let chunk: string | null;
|
||||
try {
|
||||
@@ -134,28 +274,46 @@ export function createLiveRoutes(storageRoot: string): Hono {
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
eventId++;
|
||||
eventId.n++;
|
||||
await stream.writeSSE({
|
||||
event: "info",
|
||||
data: JSON.stringify(record),
|
||||
id: String(eventId),
|
||||
id: String(eventId.n),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Initial pump
|
||||
eventId.n++;
|
||||
await stream.writeSSE({
|
||||
event: "record",
|
||||
data: JSON.stringify({
|
||||
type: "thread-start",
|
||||
threadId: threadTarget.threadId,
|
||||
bundleHash: threadTarget.bundleHash,
|
||||
head: threadTarget.head,
|
||||
start: threadTarget.start,
|
||||
source: threadTarget.source,
|
||||
}),
|
||||
id: String(eventId.n),
|
||||
});
|
||||
|
||||
const done = await pumpData();
|
||||
await pumpInfo();
|
||||
try {
|
||||
await pumpInfo();
|
||||
} catch {
|
||||
// optional info file
|
||||
}
|
||||
if (done) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Watch for changes
|
||||
const controller = new AbortController();
|
||||
let completed = false;
|
||||
|
||||
const dataWatcher = watch(resolvedDataPath, async () => {
|
||||
if (completed) return;
|
||||
const threadsJsonWatcher = watch(threadsJsonPath, async () => {
|
||||
if (completed) {
|
||||
return;
|
||||
}
|
||||
const finished = await pumpData();
|
||||
if (finished) {
|
||||
completed = true;
|
||||
@@ -166,7 +324,9 @@ export function createLiveRoutes(storageRoot: string): Hono {
|
||||
let infoWatcher: ReturnType<typeof watch> | null = null;
|
||||
try {
|
||||
infoWatcher = watch(infoPath, async () => {
|
||||
if (completed) return;
|
||||
if (completed) {
|
||||
return;
|
||||
}
|
||||
await pumpInfo();
|
||||
});
|
||||
} catch {
|
||||
@@ -175,11 +335,10 @@ export function createLiveRoutes(storageRoot: string): Hono {
|
||||
|
||||
stream.onAbort(() => {
|
||||
completed = true;
|
||||
dataWatcher.close();
|
||||
threadsJsonWatcher.close();
|
||||
infoWatcher?.close();
|
||||
});
|
||||
|
||||
// Keep stream alive until completion or client disconnect
|
||||
await new Promise<void>((resolve) => {
|
||||
if (completed) {
|
||||
resolve();
|
||||
@@ -189,7 +348,7 @@ export function createLiveRoutes(storageRoot: string): Hono {
|
||||
stream.onAbort(() => resolve());
|
||||
});
|
||||
|
||||
dataWatcher.close();
|
||||
threadsJsonWatcher.close();
|
||||
infoWatcher?.close();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,21 +1,115 @@
|
||||
import { join } from "node:path";
|
||||
import { createCasStore, getContentMerklePayload, parseCasThreadNode } from "@uncaged/workflow-cas";
|
||||
import { FORK_BRANCH_ROLE, walkStateFramesNewestFirst } from "@uncaged/workflow-execute";
|
||||
import { END } from "@uncaged/workflow-runtime";
|
||||
import { getGlobalCasDir } from "@uncaged/workflow-util";
|
||||
import { Hono } from "hono";
|
||||
|
||||
import { readTextFileIfExists } from "../../fs-utils.js";
|
||||
import { pathExists } from "../../fs-utils.js";
|
||||
import type { ResolvedThreadRecord } from "../../thread-scan.js";
|
||||
import {
|
||||
listHistoricalThreads,
|
||||
listRunningThreads,
|
||||
resolveThreadDataPath,
|
||||
resolveThreadListStatus,
|
||||
resolveThreadRecord,
|
||||
} from "../../thread-scan.js";
|
||||
import { cmdKill, cmdPause, cmdResume } from "../thread/control.js";
|
||||
import { cmdRun } from "../thread/run.js";
|
||||
|
||||
async function readStartInfo(
|
||||
cas: ReturnType<typeof createCasStore>,
|
||||
startHash: string,
|
||||
): Promise<{ name: string | null; prompt: string | null }> {
|
||||
const raw = await cas.get(startHash);
|
||||
if (raw === null) return { name: null, prompt: null };
|
||||
const parsed = parseCasThreadNode(raw);
|
||||
if (parsed === null || parsed.kind !== "start") return { name: null, prompt: null };
|
||||
const name = parsed.node.payload.name;
|
||||
const promptHash = parsed.node.refs[0] ?? null;
|
||||
let prompt: string | null = null;
|
||||
if (promptHash !== null) {
|
||||
prompt = await getContentMerklePayload(cas, promptHash);
|
||||
}
|
||||
return { name, prompt };
|
||||
}
|
||||
|
||||
async function buildThreadDetailRecords(
|
||||
storageRoot: string,
|
||||
resolved: ResolvedThreadRecord,
|
||||
): Promise<unknown[]> {
|
||||
const cas = createCasStore(getGlobalCasDir(storageRoot));
|
||||
const frames = await walkStateFramesNewestFirst(cas, resolved.head);
|
||||
const chronological = [...frames].reverse();
|
||||
|
||||
const { name: workflowName, prompt } = await readStartInfo(cas, resolved.start);
|
||||
|
||||
const records: unknown[] = [
|
||||
{
|
||||
type: "thread-start",
|
||||
workflow: workflowName ?? "unknown",
|
||||
prompt: prompt ?? null,
|
||||
threadId: resolved.threadId,
|
||||
status: resolved.source,
|
||||
timestamp: null,
|
||||
},
|
||||
];
|
||||
|
||||
for (const fr of chronological) {
|
||||
if (fr.payload.role === FORK_BRANCH_ROLE) {
|
||||
continue;
|
||||
}
|
||||
if (fr.payload.role === END) {
|
||||
const returnCode = fr.payload.meta.returnCode;
|
||||
const summary = fr.payload.meta.summary;
|
||||
if (typeof returnCode === "number" && typeof summary === "string") {
|
||||
records.push({
|
||||
type: "workflow-result",
|
||||
returnCode,
|
||||
content: summary,
|
||||
timestamp: fr.payload.timestamp,
|
||||
});
|
||||
}
|
||||
continue;
|
||||
}
|
||||
const payloadText = await getContentMerklePayload(cas, fr.payload.content);
|
||||
const content =
|
||||
payloadText !== null
|
||||
? payloadText
|
||||
: `(content not in CAS; contentHash=${fr.payload.content})`;
|
||||
records.push({
|
||||
type: "role",
|
||||
role: fr.payload.role,
|
||||
contentHash: fr.payload.content,
|
||||
content,
|
||||
meta: fr.payload.meta,
|
||||
timestamp: fr.payload.timestamp,
|
||||
});
|
||||
}
|
||||
|
||||
return records;
|
||||
}
|
||||
|
||||
export function createThreadRoutes(storageRoot: string): Hono {
|
||||
const app = new Hono();
|
||||
|
||||
app.get("/", async (c) => {
|
||||
const nameFilter = c.req.query("workflow") ?? null;
|
||||
const rows = await listHistoricalThreads(storageRoot, nameFilter);
|
||||
return c.json({ threads: rows });
|
||||
const threads = await Promise.all(
|
||||
rows.map(async (r) => {
|
||||
const runningPath = join(storageRoot, "logs", r.hash, `${r.threadId}.running`);
|
||||
const runningMarkerPresent = await pathExists(runningPath);
|
||||
const status = await resolveThreadListStatus(storageRoot, r, runningMarkerPresent);
|
||||
return {
|
||||
threadId: r.threadId,
|
||||
workflow: r.workflowName,
|
||||
hash: r.hash,
|
||||
startedAt: new Date(r.activityTs).toISOString(),
|
||||
status,
|
||||
};
|
||||
}),
|
||||
);
|
||||
return c.json({ threads });
|
||||
});
|
||||
|
||||
app.get("/running", async (c) => {
|
||||
@@ -25,22 +119,11 @@ export function createThreadRoutes(storageRoot: string): Hono {
|
||||
|
||||
app.get("/:threadId", async (c) => {
|
||||
const threadId = c.req.param("threadId");
|
||||
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
|
||||
if (dataPath === null) {
|
||||
const resolved = await resolveThreadRecord(storageRoot, threadId);
|
||||
if (resolved === null) {
|
||||
return c.json({ error: `thread not found: ${threadId}` }, 404);
|
||||
}
|
||||
const text = await readTextFileIfExists(dataPath);
|
||||
if (text === null) {
|
||||
return c.json({ error: `thread data missing: ${threadId}` }, 404);
|
||||
}
|
||||
const lines = text.trim().split("\n");
|
||||
const records = lines.map((line) => {
|
||||
try {
|
||||
return JSON.parse(line) as unknown;
|
||||
} catch {
|
||||
return { raw: line };
|
||||
}
|
||||
});
|
||||
const records = await buildThreadDetailRecords(storageRoot, resolved);
|
||||
return c.json({ threadId, records });
|
||||
});
|
||||
|
||||
|
||||
@@ -1,12 +1,23 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import { hostname as osHostname } from "node:os";
|
||||
import { err, ok, type Result } from "@uncaged/workflow-protocol";
|
||||
import { serve } from "bun";
|
||||
|
||||
import { printCliLine } from "../../cli-output.js";
|
||||
import { createApp } from "./app.js";
|
||||
import {
|
||||
registerWithGateway,
|
||||
startHeartbeat,
|
||||
startTunnel,
|
||||
unregisterFromGateway,
|
||||
} from "./tunnel.js";
|
||||
import type { ServeOptions } from "./types.js";
|
||||
|
||||
export function startServer(storageRoot: string, options: ServeOptions): void {
|
||||
const app = createApp(storageRoot);
|
||||
const DEFAULT_GATEWAY_URL = "https://workflow-gateway.shazhou.workers.dev";
|
||||
const HEARTBEAT_INTERVAL_MS = 60_000;
|
||||
|
||||
export function startServer(storageRoot: string, options: ServeOptions, agentToken: string | null): void {
|
||||
const app = createApp(storageRoot, agentToken);
|
||||
|
||||
const server = serve({
|
||||
fetch: app.fetch,
|
||||
@@ -28,30 +39,51 @@ function parsePortValue(value: string | undefined): Result<number, string> {
|
||||
return ok(parsed);
|
||||
}
|
||||
|
||||
function requireNextArg(argv: string[], i: number, flag: string): Result<string, string> {
|
||||
const next = argv[i + 1];
|
||||
if (next === undefined) {
|
||||
return err(`${flag} requires a value`);
|
||||
}
|
||||
return ok(next);
|
||||
}
|
||||
|
||||
function parseServeArgv(argv: string[]): Result<ServeOptions, string> {
|
||||
let port = 7860;
|
||||
let hostname = "127.0.0.1";
|
||||
let name = osHostname().split(".")[0].toLowerCase();
|
||||
let noTunnel = false;
|
||||
let gatewayUrl = DEFAULT_GATEWAY_URL;
|
||||
const gatewaySecret = process.env.WORKFLOW_GATEWAY_SECRET ?? "";
|
||||
const stringFlags: Record<string, (v: string) => void> = {
|
||||
"--host": (v) => {
|
||||
hostname = v;
|
||||
},
|
||||
"--name": (v) => {
|
||||
name = v;
|
||||
},
|
||||
"--gateway": (v) => {
|
||||
gatewayUrl = v;
|
||||
},
|
||||
};
|
||||
|
||||
for (let i = 0; i < argv.length; i++) {
|
||||
const arg = argv[i];
|
||||
if (arg === "--port" || arg === "-p") {
|
||||
const portResult = parsePortValue(argv[i + 1]);
|
||||
if (!portResult.ok) {
|
||||
return portResult;
|
||||
}
|
||||
if (!portResult.ok) return portResult;
|
||||
port = portResult.value;
|
||||
i++;
|
||||
} else if (arg === "--host") {
|
||||
const next = argv[i + 1];
|
||||
if (next === undefined) {
|
||||
return err("--host requires a value");
|
||||
}
|
||||
hostname = next;
|
||||
} else if (arg === "--no-tunnel") {
|
||||
noTunnel = true;
|
||||
} else if (arg in stringFlags) {
|
||||
const r = requireNextArg(argv, i, arg);
|
||||
if (!r.ok) return r;
|
||||
stringFlags[arg](r.value);
|
||||
i++;
|
||||
}
|
||||
}
|
||||
|
||||
return ok({ port, hostname });
|
||||
return ok({ port, hostname, name, noTunnel, gatewayUrl, gatewaySecret });
|
||||
}
|
||||
|
||||
export async function dispatchServe(storageRoot: string, argv: string[]): Promise<number> {
|
||||
@@ -61,7 +93,65 @@ export async function dispatchServe(storageRoot: string, argv: string[]): Promis
|
||||
return 1;
|
||||
}
|
||||
|
||||
startServer(storageRoot, parsed.value);
|
||||
const options = parsed.value;
|
||||
const agentToken = options.noTunnel ? null : randomUUID();
|
||||
startServer(storageRoot, options, agentToken);
|
||||
|
||||
if (options.noTunnel) {
|
||||
printCliLine("tunnel disabled (--no-tunnel)");
|
||||
await new Promise(() => {});
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Start cloudflared quick tunnel
|
||||
printCliLine("starting cloudflared quick tunnel...");
|
||||
const tunnel = await startTunnel(options.port);
|
||||
|
||||
if (!tunnel) {
|
||||
printCliLine("failed to create tunnel — continuing without gateway registration");
|
||||
await new Promise(() => {});
|
||||
return 0;
|
||||
}
|
||||
|
||||
printCliLine(`tunnel: ${tunnel.url}`);
|
||||
|
||||
// Register with gateway
|
||||
if (options.gatewaySecret) {
|
||||
const registered = await registerWithGateway(
|
||||
options.gatewayUrl,
|
||||
options.name,
|
||||
tunnel.url,
|
||||
options.gatewaySecret,
|
||||
agentToken!,
|
||||
);
|
||||
if (registered) {
|
||||
printCliLine(`registered with gateway as "${options.name}"`);
|
||||
}
|
||||
|
||||
// Start heartbeat
|
||||
const heartbeatTimer = startHeartbeat(
|
||||
options.gatewayUrl,
|
||||
options.name,
|
||||
tunnel.url,
|
||||
options.gatewaySecret,
|
||||
agentToken!,
|
||||
HEARTBEAT_INTERVAL_MS,
|
||||
);
|
||||
|
||||
// Cleanup on exit
|
||||
const cleanup = async () => {
|
||||
clearInterval(heartbeatTimer);
|
||||
printCliLine("unregistering from gateway...");
|
||||
await unregisterFromGateway(options.gatewayUrl, options.name, options.gatewaySecret);
|
||||
tunnel.process.kill();
|
||||
process.exit(0);
|
||||
};
|
||||
|
||||
process.on("SIGINT", cleanup);
|
||||
process.on("SIGTERM", cleanup);
|
||||
} else {
|
||||
printCliLine("WORKFLOW_GATEWAY_SECRET not set — skipping gateway registration");
|
||||
}
|
||||
|
||||
// Keep process alive
|
||||
await new Promise(() => {});
|
||||
|
||||
@@ -0,0 +1,88 @@
|
||||
import { printCliLine } from "../../cli-output.js";
|
||||
|
||||
type TunnelHandle = {
|
||||
process: ReturnType<typeof Bun.spawn>;
|
||||
url: string;
|
||||
};
|
||||
|
||||
export async function startTunnel(port: number): Promise<TunnelHandle | null> {
|
||||
const proc = Bun.spawn(["cloudflared", "tunnel", "--url", `http://localhost:${port}`], {
|
||||
stdout: "pipe",
|
||||
stderr: "pipe",
|
||||
});
|
||||
|
||||
// cloudflared prints the URL to stderr
|
||||
const reader = proc.stderr.getReader();
|
||||
const decoder = new TextDecoder();
|
||||
let buffer = "";
|
||||
const deadline = Date.now() + 30_000;
|
||||
|
||||
while (Date.now() < deadline) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
const match = buffer.match(/https:\/\/[a-z0-9-]+\.trycloudflare\.com/);
|
||||
if (match) {
|
||||
// Release the reader so stderr keeps flowing without backpressure
|
||||
reader.releaseLock();
|
||||
return { process: proc, url: match[0] };
|
||||
}
|
||||
}
|
||||
|
||||
reader.releaseLock();
|
||||
proc.kill();
|
||||
return null;
|
||||
}
|
||||
|
||||
export async function registerWithGateway(
|
||||
gatewayUrl: string,
|
||||
name: string,
|
||||
tunnelUrl: string,
|
||||
secret: string,
|
||||
agentToken: string,
|
||||
): Promise<boolean> {
|
||||
try {
|
||||
const resp = await fetch(`${gatewayUrl}/register`, {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify({ name, url: tunnelUrl, secret, agentToken }),
|
||||
});
|
||||
if (!resp.ok) {
|
||||
const body = await resp.text();
|
||||
printCliLine(`gateway registration failed: ${resp.status} ${body}`);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
} catch (e) {
|
||||
printCliLine(`gateway registration error: ${e}`);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
export async function unregisterFromGateway(
|
||||
gatewayUrl: string,
|
||||
name: string,
|
||||
secret: string,
|
||||
): Promise<void> {
|
||||
try {
|
||||
await fetch(`${gatewayUrl}/register/${name}`, {
|
||||
method: "DELETE",
|
||||
headers: { Authorization: `Bearer ${secret}` },
|
||||
});
|
||||
} catch {
|
||||
// Best effort — process is exiting
|
||||
}
|
||||
}
|
||||
|
||||
export function startHeartbeat(
|
||||
gatewayUrl: string,
|
||||
name: string,
|
||||
tunnelUrl: string,
|
||||
secret: string,
|
||||
agentToken: string,
|
||||
intervalMs: number,
|
||||
): ReturnType<typeof setInterval> {
|
||||
return setInterval(() => {
|
||||
registerWithGateway(gatewayUrl, name, tunnelUrl, secret, agentToken).catch(() => {});
|
||||
}, intervalMs);
|
||||
}
|
||||
@@ -1,4 +1,8 @@
|
||||
export type ServeOptions = {
|
||||
port: number;
|
||||
hostname: string;
|
||||
name: string;
|
||||
noTunnel: boolean;
|
||||
gatewayUrl: string;
|
||||
gatewaySecret: string;
|
||||
};
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
import { join } from "node:path";
|
||||
import { buildForkPlan } from "@uncaged/workflow-execute";
|
||||
import { createCasStore } from "@uncaged/workflow-cas";
|
||||
import { prepareCasFork } from "@uncaged/workflow-execute";
|
||||
import { err, ok, type Result } from "@uncaged/workflow-protocol";
|
||||
import { generateUlid } from "@uncaged/workflow-util";
|
||||
import { generateUlid, getGlobalCasDir } from "@uncaged/workflow-util";
|
||||
|
||||
import { pathExists, readTextFileIfExists } from "../../fs-utils.js";
|
||||
import { resolveThreadDataPath } from "../../thread-scan.js";
|
||||
import { pathExists } from "../../fs-utils.js";
|
||||
import { resolveThreadRecord } from "../../thread-scan.js";
|
||||
import { ensureWorkerForHash, sendWorkerTcpCommand } from "../../worker-spawn.js";
|
||||
|
||||
export async function cmdFork(
|
||||
@@ -12,49 +13,51 @@ export async function cmdFork(
|
||||
threadId: string,
|
||||
fromRole: string | null,
|
||||
): Promise<Result<{ threadId: string }, string>> {
|
||||
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
|
||||
if (dataPath === null) {
|
||||
const resolved = await resolveThreadRecord(storageRoot, threadId);
|
||||
if (resolved === null) {
|
||||
return err(`thread not found: ${threadId}`);
|
||||
}
|
||||
const text = await readTextFileIfExists(dataPath);
|
||||
if (text === null) {
|
||||
return err(`thread data missing: ${threadId}`);
|
||||
|
||||
const bundlePath = join(storageRoot, "bundles", `${resolved.bundleHash}.esm.js`);
|
||||
if (!(await pathExists(bundlePath))) {
|
||||
return err(`bundle file missing for thread hash ${resolved.bundleHash}`);
|
||||
}
|
||||
|
||||
const plan = buildForkPlan(text, fromRole);
|
||||
const cas = createCasStore(getGlobalCasDir(storageRoot));
|
||||
const newThreadId = generateUlid(Date.now());
|
||||
|
||||
const plan = await prepareCasFork({
|
||||
cas,
|
||||
bundleDir: resolved.bundleDir,
|
||||
bundleHash: resolved.bundleHash,
|
||||
sourceThreadId: threadId,
|
||||
headHash: resolved.head,
|
||||
startHash: resolved.start,
|
||||
newThreadId,
|
||||
fromRole,
|
||||
});
|
||||
if (!plan.ok) {
|
||||
return plan;
|
||||
}
|
||||
|
||||
const bundlePath = join(storageRoot, "bundles", `${plan.value.hash}.esm.js`);
|
||||
if (!(await pathExists(bundlePath))) {
|
||||
return err(`bundle file missing for thread hash ${plan.value.hash}`);
|
||||
}
|
||||
|
||||
const worker = await ensureWorkerForHash(storageRoot, plan.value.hash, bundlePath);
|
||||
if (!worker.ok) {
|
||||
return worker;
|
||||
}
|
||||
|
||||
const newThreadId = generateUlid(Date.now());
|
||||
const stepsOnWire = plan.value.historicalSteps.map((s) => ({
|
||||
role: s.role,
|
||||
contentHash: s.contentHash,
|
||||
meta: s.meta,
|
||||
refs: s.refs,
|
||||
timestamp: s.timestamp,
|
||||
}));
|
||||
|
||||
const p = plan.value;
|
||||
const sent = await sendWorkerTcpCommand(
|
||||
worker.value.port,
|
||||
{
|
||||
type: "run",
|
||||
threadId: newThreadId,
|
||||
workflowName: plan.value.workflowName,
|
||||
prompt: plan.value.prompt,
|
||||
options: plan.value.runOptions,
|
||||
steps: stepsOnWire,
|
||||
forkSourceThreadId: plan.value.sourceThreadId,
|
||||
workflowName: p.workflowName,
|
||||
prompt: p.prompt,
|
||||
options: p.runOptions,
|
||||
steps: p.steps,
|
||||
stepTimestamps: p.stepTimestamps.length > 0 ? p.stepTimestamps : null,
|
||||
forkSourceThreadId: threadId,
|
||||
forkContinuation: p.forkContinuation,
|
||||
},
|
||||
{ awaitResponseLine: false },
|
||||
);
|
||||
|
||||
@@ -1,16 +1,26 @@
|
||||
import { watch } from "node:fs";
|
||||
import { readFile } from "node:fs/promises";
|
||||
import { mkdir, readFile } from "node:fs/promises";
|
||||
import { dirname, join } from "node:path";
|
||||
import { createCasStore, getContentMerklePayload } from "@uncaged/workflow-cas";
|
||||
import { tryParseRoleStepRecord, tryParseWorkflowResultRecord } from "@uncaged/workflow-execute";
|
||||
import {
|
||||
FORK_BRANCH_ROLE,
|
||||
readThreadsIndex,
|
||||
type ThreadIndex,
|
||||
walkStateFramesNewestFirst,
|
||||
} from "@uncaged/workflow-execute";
|
||||
import type { CasStore, WorkflowCompletion } from "@uncaged/workflow-protocol";
|
||||
import { END } from "@uncaged/workflow-runtime";
|
||||
import { getGlobalCasDir } from "@uncaged/workflow-util";
|
||||
|
||||
import { dimGreyLine, highlightLiveRole } from "../../cli-color.js";
|
||||
import { printCliError, printCliLine } from "../../cli-output.js";
|
||||
import { pathExists } from "../../fs-utils.js";
|
||||
import type { ParsedLiveArgv } from "../../live-argv.js";
|
||||
import { findLatestThreadDataPath, resolveThreadDataPath } from "../../thread-scan.js";
|
||||
import {
|
||||
findLatestThreadBundleTarget,
|
||||
type LatestThreadTarget,
|
||||
resolveThreadRecord,
|
||||
} from "../../thread-scan.js";
|
||||
import type { LiveRoleRow } from "./types.js";
|
||||
|
||||
export const LIVE_CONTENT_MAX_LINES = 10;
|
||||
@@ -48,16 +58,15 @@ function printSummary(result: WorkflowCompletion): void {
|
||||
printCliLine(`completed: returnCode=${result.returnCode} — ${result.summary}`);
|
||||
}
|
||||
|
||||
type LiveSessionState = {
|
||||
sawStart: boolean;
|
||||
completed: boolean;
|
||||
type InfoLiveState = {
|
||||
carry: string;
|
||||
contentOffset: number;
|
||||
};
|
||||
|
||||
type InfoLiveState = {
|
||||
carry: string;
|
||||
contentOffset: number;
|
||||
type CasLiveState = {
|
||||
printedHashes: Set<string>;
|
||||
lastHead: string | null;
|
||||
completionEmitted: boolean;
|
||||
};
|
||||
|
||||
function tryParseInfoRecord(obj: Record<string, unknown>): {
|
||||
@@ -79,102 +88,140 @@ function tryParseInfoRecord(obj: Record<string, unknown>): {
|
||||
return { tag, content, timestamp };
|
||||
}
|
||||
|
||||
async function handleJsonlLine(
|
||||
rawLine: string,
|
||||
state: LiveSessionState,
|
||||
roleFilter: string | null,
|
||||
cas: CasStore,
|
||||
): Promise<{ parseError: string | null; workflowResult: WorkflowCompletion | null }> {
|
||||
const trimmed = rawLine.trim();
|
||||
if (trimmed === "") {
|
||||
return { parseError: null, workflowResult: null };
|
||||
function completionFromEndMeta(meta: Record<string, unknown>): WorkflowCompletion | null {
|
||||
const returnCode = meta.returnCode;
|
||||
const summary = meta.summary;
|
||||
if (typeof returnCode !== "number" || typeof summary !== "string") {
|
||||
return null;
|
||||
}
|
||||
return { returnCode, summary };
|
||||
}
|
||||
|
||||
let rec: unknown;
|
||||
try {
|
||||
rec = JSON.parse(trimmed) as unknown;
|
||||
} catch {
|
||||
return { parseError: "invalid JSON in thread data file", workflowResult: null };
|
||||
async function emitRoleStepPrint(params: {
|
||||
cas: CasStore;
|
||||
role: string;
|
||||
contentHash: string;
|
||||
meta: Record<string, unknown>;
|
||||
timestamp: number;
|
||||
roleFilter: string | null;
|
||||
}): Promise<void> {
|
||||
if (params.roleFilter !== null && params.role !== params.roleFilter) {
|
||||
return;
|
||||
}
|
||||
if (rec === null || typeof rec !== "object") {
|
||||
return { parseError: "invalid record in thread data file", workflowResult: null };
|
||||
}
|
||||
const obj = rec as Record<string, unknown>;
|
||||
|
||||
if (!state.sawStart) {
|
||||
state.sawStart = true;
|
||||
return { parseError: null, workflowResult: null };
|
||||
}
|
||||
|
||||
const wf = tryParseWorkflowResultRecord(obj);
|
||||
if (wf !== null) {
|
||||
state.completed = true;
|
||||
return { parseError: null, workflowResult: wf };
|
||||
}
|
||||
|
||||
const roleRow = tryParseRoleStepRecord(obj);
|
||||
if (roleRow === null) {
|
||||
return {
|
||||
parseError: "unrecognized record in thread data (expected role step or result)",
|
||||
workflowResult: null,
|
||||
};
|
||||
}
|
||||
|
||||
if (roleFilter !== null && roleRow.role !== roleFilter) {
|
||||
return { parseError: null, workflowResult: null };
|
||||
}
|
||||
|
||||
const payload = await getContentMerklePayload(cas, roleRow.contentHash);
|
||||
const payload = await getContentMerklePayload(params.cas, params.contentHash);
|
||||
const content =
|
||||
payload !== null ? payload : `(content not in CAS; contentHash=${roleRow.contentHash})`;
|
||||
payload !== null ? payload : `(content not in CAS; contentHash=${params.contentHash})`;
|
||||
|
||||
const row: LiveRoleRow = {
|
||||
role: roleRow.role,
|
||||
role: params.role,
|
||||
content,
|
||||
meta: roleRow.meta,
|
||||
timestamp: roleRow.timestamp,
|
||||
meta: params.meta,
|
||||
timestamp: params.timestamp,
|
||||
};
|
||||
for (const outLine of renderLiveRoleStepLines(row, highlightLiveRole(row.role))) {
|
||||
printCliLine(outLine);
|
||||
}
|
||||
return { parseError: null, workflowResult: null };
|
||||
}
|
||||
|
||||
async function pumpNewContent(
|
||||
dataPath: string,
|
||||
state: LiveSessionState,
|
||||
roleFilter: string | null,
|
||||
cas: CasStore,
|
||||
): Promise<number | null> {
|
||||
let text: string;
|
||||
async function emitStatesReachableFromHead(params: {
|
||||
cas: CasStore;
|
||||
headHash: string;
|
||||
state: CasLiveState;
|
||||
roleFilter: string | null;
|
||||
}): Promise<WorkflowCompletion | null> {
|
||||
const frames = await walkStateFramesNewestFirst(params.cas, params.headHash);
|
||||
const chronological = [...frames].reverse();
|
||||
|
||||
for (const fr of chronological) {
|
||||
if (params.state.printedHashes.has(fr.hash)) {
|
||||
continue;
|
||||
}
|
||||
params.state.printedHashes.add(fr.hash);
|
||||
|
||||
const role = fr.payload.role;
|
||||
if (role === FORK_BRANCH_ROLE) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (role === END) {
|
||||
const wf = completionFromEndMeta(fr.payload.meta);
|
||||
if (wf !== null) {
|
||||
printSummary(wf);
|
||||
return wf;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
await emitRoleStepPrint({
|
||||
cas: params.cas,
|
||||
role,
|
||||
contentHash: fr.payload.content,
|
||||
meta: fr.payload.meta,
|
||||
timestamp: fr.payload.timestamp,
|
||||
roleFilter: params.roleFilter,
|
||||
});
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
async function pumpThreadsJson(params: {
|
||||
storageRoot: string;
|
||||
bundleDir: string;
|
||||
bundleHash: string;
|
||||
threadId: string;
|
||||
state: CasLiveState;
|
||||
roleFilter: string | null;
|
||||
cas: CasStore;
|
||||
}): Promise<number | null> {
|
||||
let idx: ThreadIndex;
|
||||
try {
|
||||
text = await readFile(dataPath, "utf8");
|
||||
idx = await readThreadsIndex(params.bundleDir);
|
||||
} catch {
|
||||
return null;
|
||||
idx = {};
|
||||
}
|
||||
|
||||
if (text.length < state.contentOffset) {
|
||||
state.contentOffset = 0;
|
||||
state.carry = "";
|
||||
const active = idx[params.threadId];
|
||||
|
||||
if (active === undefined) {
|
||||
if (params.state.completionEmitted) {
|
||||
return null;
|
||||
}
|
||||
const hist = await resolveThreadRecord(params.storageRoot, params.threadId);
|
||||
if (hist === null || hist.source !== "history") {
|
||||
return null;
|
||||
}
|
||||
params.state.completionEmitted = true;
|
||||
const wf = await emitStatesReachableFromHead({
|
||||
cas: params.cas,
|
||||
headHash: hist.head,
|
||||
state: params.state,
|
||||
roleFilter: params.roleFilter,
|
||||
});
|
||||
return wf !== null ? 0 : null;
|
||||
}
|
||||
|
||||
const chunk = text.slice(state.contentOffset);
|
||||
state.contentOffset = text.length;
|
||||
state.carry += chunk;
|
||||
const head = active.head;
|
||||
if (params.state.lastHead === null) {
|
||||
params.state.lastHead = head;
|
||||
const wf = await emitStatesReachableFromHead({
|
||||
cas: params.cas,
|
||||
headHash: head,
|
||||
state: params.state,
|
||||
roleFilter: params.roleFilter,
|
||||
});
|
||||
return wf !== null ? 0 : null;
|
||||
}
|
||||
|
||||
const parts = state.carry.split("\n");
|
||||
state.carry = parts.pop() ?? "";
|
||||
|
||||
for (const line of parts) {
|
||||
const { parseError, workflowResult } = await handleJsonlLine(line, state, roleFilter, cas);
|
||||
if (parseError !== null) {
|
||||
printCliError(parseError);
|
||||
return 1;
|
||||
}
|
||||
if (workflowResult !== null) {
|
||||
printSummary(workflowResult);
|
||||
return 0;
|
||||
}
|
||||
if (head !== params.state.lastHead) {
|
||||
params.state.lastHead = head;
|
||||
const wf = await emitStatesReachableFromHead({
|
||||
cas: params.cas,
|
||||
headHash: head,
|
||||
state: params.state,
|
||||
roleFilter: params.roleFilter,
|
||||
});
|
||||
return wf !== null ? 0 : null;
|
||||
}
|
||||
|
||||
return null;
|
||||
@@ -291,9 +338,9 @@ function watchLivePaths(params: { tasks: WatchPumpTask[]; signal: AbortSignal })
|
||||
schedulePump(path, pump);
|
||||
});
|
||||
watchers.push(watcher);
|
||||
watcher.on("error", (err: Error) => {
|
||||
watcher.on("error", (errObj: Error) => {
|
||||
closeAll();
|
||||
reject(err);
|
||||
reject(errObj);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -309,17 +356,14 @@ function watchLivePaths(params: { tasks: WatchPumpTask[]; signal: AbortSignal })
|
||||
});
|
||||
}
|
||||
|
||||
type LiveThreadTarget = {
|
||||
threadId: string;
|
||||
dataPath: string;
|
||||
};
|
||||
type LiveThreadTarget = LatestThreadTarget;
|
||||
|
||||
async function resolveLiveThreadTarget(
|
||||
storageRoot: string,
|
||||
parsed: ParsedLiveArgv,
|
||||
): Promise<LiveThreadTarget | null> {
|
||||
if (parsed.latest) {
|
||||
const found = await findLatestThreadDataPath(storageRoot);
|
||||
const found = await findLatestThreadBundleTarget(storageRoot);
|
||||
if (found === null) {
|
||||
printCliError("live: no threads found");
|
||||
return null;
|
||||
@@ -332,36 +376,56 @@ async function resolveLiveThreadTarget(
|
||||
printCliError("live: internal error: missing thread id");
|
||||
return null;
|
||||
}
|
||||
const resolved = await resolveThreadDataPath(storageRoot, id);
|
||||
const resolved = await resolveThreadRecord(storageRoot, id);
|
||||
if (resolved === null) {
|
||||
printCliError(`thread not found: ${id}`);
|
||||
return null;
|
||||
}
|
||||
return { threadId: id, dataPath: resolved };
|
||||
return {
|
||||
threadId: id,
|
||||
bundleHash: resolved.bundleHash,
|
||||
bundleDir: resolved.bundleDir,
|
||||
threadsJsonPath: join(resolved.bundleDir, "threads.json"),
|
||||
};
|
||||
}
|
||||
|
||||
async function buildLiveWatchTasks(params: {
|
||||
dataPath: string;
|
||||
infoPath: string;
|
||||
storageRoot: string;
|
||||
target: LiveThreadTarget;
|
||||
debug: boolean;
|
||||
dataState: LiveSessionState;
|
||||
dataState: CasLiveState;
|
||||
infoState: InfoLiveState;
|
||||
roleFilter: string | null;
|
||||
cas: CasStore;
|
||||
}): Promise<WatchPumpTask[]> {
|
||||
const { dataPath, infoPath, debug, dataState, infoState, roleFilter, cas } = params;
|
||||
const infoPath = join(
|
||||
params.storageRoot,
|
||||
"logs",
|
||||
params.target.bundleHash,
|
||||
`${params.target.threadId}.info.jsonl`,
|
||||
);
|
||||
|
||||
const tasks: WatchPumpTask[] = [
|
||||
{
|
||||
path: dataPath,
|
||||
pump: () => pumpNewContent(dataPath, dataState, roleFilter, cas),
|
||||
path: params.target.threadsJsonPath,
|
||||
pump: () =>
|
||||
pumpThreadsJson({
|
||||
storageRoot: params.storageRoot,
|
||||
bundleDir: params.target.bundleDir,
|
||||
bundleHash: params.target.bundleHash,
|
||||
threadId: params.target.threadId,
|
||||
state: params.dataState,
|
||||
roleFilter: params.roleFilter,
|
||||
cas: params.cas,
|
||||
}),
|
||||
},
|
||||
];
|
||||
|
||||
if (debug && (await pathExists(infoPath))) {
|
||||
if (params.debug && (await pathExists(infoPath))) {
|
||||
tasks.push({
|
||||
path: infoPath,
|
||||
pump: async () => {
|
||||
await pumpNewInfoContent(infoPath, infoState);
|
||||
await pumpNewInfoContent(infoPath, params.infoState);
|
||||
return null;
|
||||
},
|
||||
});
|
||||
@@ -376,16 +440,13 @@ export async function cmdLive(storageRoot: string, parsed: ParsedLiveArgv): Prom
|
||||
return 1;
|
||||
}
|
||||
|
||||
const { threadId, dataPath } = target;
|
||||
const roleFilter = parsed.role;
|
||||
const infoPath = join(dirname(dataPath), `${threadId}.info.jsonl`);
|
||||
const cas = createCasStore(getGlobalCasDir(storageRoot));
|
||||
|
||||
const dataState: LiveSessionState = {
|
||||
sawStart: false,
|
||||
completed: false,
|
||||
carry: "",
|
||||
contentOffset: 0,
|
||||
const dataState: CasLiveState = {
|
||||
printedHashes: new Set<string>(),
|
||||
lastHead: null,
|
||||
completionEmitted: false,
|
||||
};
|
||||
|
||||
const infoState: InfoLiveState = {
|
||||
@@ -400,22 +461,29 @@ export async function cmdLive(storageRoot: string, parsed: ParsedLiveArgv): Prom
|
||||
process.on("SIGINT", onSigInt);
|
||||
|
||||
try {
|
||||
const firstData = await pumpNewContent(dataPath, dataState, roleFilter, cas);
|
||||
if (firstData === 1) {
|
||||
return 1;
|
||||
}
|
||||
await mkdir(dirname(target.threadsJsonPath), { recursive: true });
|
||||
|
||||
const firstData = await pumpThreadsJson({
|
||||
storageRoot,
|
||||
bundleDir: target.bundleDir,
|
||||
bundleHash: target.bundleHash,
|
||||
threadId: target.threadId,
|
||||
state: dataState,
|
||||
roleFilter,
|
||||
cas,
|
||||
});
|
||||
const infoPath = join(storageRoot, "logs", target.bundleHash, `${target.threadId}.info.jsonl`);
|
||||
if (parsed.debug && (await pathExists(infoPath))) {
|
||||
await pumpNewInfoContent(infoPath, infoState);
|
||||
}
|
||||
|
||||
if (firstData === 0 || dataState.completed) {
|
||||
if (firstData === 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
const tasks = await buildLiveWatchTasks({
|
||||
dataPath,
|
||||
infoPath,
|
||||
storageRoot,
|
||||
target,
|
||||
debug: parsed.debug,
|
||||
dataState,
|
||||
infoState,
|
||||
|
||||
@@ -1,24 +1,35 @@
|
||||
import { unlink } from "node:fs/promises";
|
||||
import { dirname, join } from "node:path";
|
||||
import { garbageCollectCas } from "@uncaged/workflow-execute";
|
||||
import { join } from "node:path";
|
||||
import {
|
||||
garbageCollectCas,
|
||||
removeThreadEntry,
|
||||
removeThreadHistoryEntries,
|
||||
} from "@uncaged/workflow-execute";
|
||||
import { err, ok, type Result } from "@uncaged/workflow-protocol";
|
||||
|
||||
import { resolveThreadDataPath } from "../../thread-scan.js";
|
||||
import { resolveThreadRecord } from "../../thread-scan.js";
|
||||
|
||||
export async function cmdThreadRemove(
|
||||
storageRoot: string,
|
||||
threadId: string,
|
||||
): Promise<Result<void, string>> {
|
||||
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
|
||||
if (dataPath === null) {
|
||||
const resolved = await resolveThreadRecord(storageRoot, threadId);
|
||||
if (resolved === null) {
|
||||
return err(`thread not found: ${threadId}`);
|
||||
}
|
||||
|
||||
const dir = dirname(dataPath);
|
||||
const infoPath = join(dir, `${threadId}.info.jsonl`);
|
||||
const runningPath = join(dir, `${threadId}.running`);
|
||||
if (resolved.source === "active") {
|
||||
await removeThreadEntry(resolved.bundleDir, threadId);
|
||||
} else {
|
||||
const hist = await removeThreadHistoryEntries(resolved.bundleDir, threadId);
|
||||
if (!hist.ok) {
|
||||
return hist;
|
||||
}
|
||||
}
|
||||
|
||||
const infoPath = join(storageRoot, "logs", resolved.bundleHash, `${threadId}.info.jsonl`);
|
||||
const runningPath = join(storageRoot, "logs", resolved.bundleHash, `${threadId}.running`);
|
||||
|
||||
await unlink(dataPath);
|
||||
await unlink(infoPath).catch(() => {});
|
||||
await unlink(runningPath).catch(() => {});
|
||||
|
||||
|
||||
@@ -1,19 +1,49 @@
|
||||
import { createCasStore, getContentMerklePayload } from "@uncaged/workflow-cas";
|
||||
import { FORK_BRANCH_ROLE, walkStateFramesNewestFirst } from "@uncaged/workflow-execute";
|
||||
import { err, ok, type Result } from "@uncaged/workflow-protocol";
|
||||
import { END } from "@uncaged/workflow-runtime";
|
||||
import { getGlobalCasDir } from "@uncaged/workflow-util";
|
||||
|
||||
import { readTextFileIfExists } from "../../fs-utils.js";
|
||||
import { resolveThreadDataPath } from "../../thread-scan.js";
|
||||
import { resolveThreadRecord } from "../../thread-scan.js";
|
||||
|
||||
export async function cmdThreadShow(
|
||||
storageRoot: string,
|
||||
threadId: string,
|
||||
): Promise<Result<string, string>> {
|
||||
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
|
||||
if (dataPath === null) {
|
||||
const resolved = await resolveThreadRecord(storageRoot, threadId);
|
||||
if (resolved === null) {
|
||||
return err(`thread not found: ${threadId}`);
|
||||
}
|
||||
const text = await readTextFileIfExists(dataPath);
|
||||
if (text === null) {
|
||||
return err(`thread data missing: ${threadId}`);
|
||||
|
||||
const cas = createCasStore(getGlobalCasDir(storageRoot));
|
||||
const frames = await walkStateFramesNewestFirst(cas, resolved.head);
|
||||
const chronological = [...frames].reverse();
|
||||
|
||||
const steps: Array<{ role: string; hash: string; timestamp: number; content: string }> = [];
|
||||
for (const fr of chronological) {
|
||||
if (fr.payload.role === END || fr.payload.role === FORK_BRANCH_ROLE) {
|
||||
continue;
|
||||
}
|
||||
const payloadText = await getContentMerklePayload(cas, fr.payload.content);
|
||||
steps.push({
|
||||
role: fr.payload.role,
|
||||
hash: fr.hash,
|
||||
timestamp: fr.payload.timestamp,
|
||||
content:
|
||||
payloadText !== null
|
||||
? payloadText
|
||||
: `(content not in CAS; contentHash=${fr.payload.content})`,
|
||||
});
|
||||
}
|
||||
return ok(text.endsWith("\n") ? text.slice(0, -1) : text);
|
||||
|
||||
const payload = {
|
||||
threadId: resolved.threadId,
|
||||
bundleHash: resolved.bundleHash,
|
||||
head: resolved.head,
|
||||
start: resolved.start,
|
||||
source: resolved.source,
|
||||
steps,
|
||||
};
|
||||
|
||||
return ok(JSON.stringify(payload, null, 2));
|
||||
}
|
||||
|
||||
@@ -70,8 +70,8 @@ function formatSkillCli(): string {
|
||||
|---------|-------------|
|
||||
| **Workflow** | A single-file ESM bundle (\`.esm.js\`) that exports \`run\` and \`descriptor\`. Identified by name and XXH64 hash. |
|
||||
| **Bundle** | The physical \`.esm.js\` file stored in the bundles directory. Immutable once written. |
|
||||
| **Thread** | A single execution of a workflow, identified by a ULID. Persists state as JSONL files. |
|
||||
| **CAS** | Content-Addressable Storage. Per-thread key-value store keyed by content hash. |
|
||||
| **Thread** | A single execution of a workflow, identified by a ULID. CAS state chain; \`threads.json\` for active; \`history/*.jsonl\` when done; \`.info.jsonl\` for debug logs. |
|
||||
| **CAS** | Global content-addressable blob store (\`cas/\`), keyed by hash. |
|
||||
| **Registry** | \`workflow.yaml\` — maps workflow names to their current and historical bundle hashes. |
|
||||
|
||||
## Commands
|
||||
|
||||
@@ -1,23 +1,89 @@
|
||||
import { readdir, stat } from "node:fs/promises";
|
||||
import { join } from "node:path";
|
||||
import { createCasStore, parseCasThreadNode } from "@uncaged/workflow-cas";
|
||||
import {
|
||||
readThreadsIndex,
|
||||
type ThreadHistoryEntry,
|
||||
type ThreadIndex,
|
||||
walkStateFramesNewestFirst,
|
||||
} from "@uncaged/workflow-execute";
|
||||
import { END } from "@uncaged/workflow-runtime";
|
||||
import { getGlobalCasDir } from "@uncaged/workflow-util";
|
||||
|
||||
import { pathExists, readTextFileIfExists } from "./fs-utils.js";
|
||||
|
||||
function parseFirstJsonLineObject(text: string): Record<string, unknown> | null {
|
||||
const firstLine = text.split("\n")[0];
|
||||
if (firstLine === undefined || firstLine.trim() === "") {
|
||||
async function readWorkflowNameFromStartHash(
|
||||
storageRoot: string,
|
||||
startHash: string,
|
||||
): Promise<string | null> {
|
||||
const cas = createCasStore(getGlobalCasDir(storageRoot));
|
||||
const yamlText = await cas.get(startHash);
|
||||
if (yamlText === null) {
|
||||
return null;
|
||||
}
|
||||
let parsed: unknown;
|
||||
try {
|
||||
parsed = JSON.parse(firstLine) as unknown;
|
||||
} catch {
|
||||
const parsed = parseCasThreadNode(yamlText);
|
||||
if (parsed === null || parsed.kind !== "start") {
|
||||
return null;
|
||||
}
|
||||
if (parsed === null || typeof parsed !== "object") {
|
||||
return null;
|
||||
return parsed.node.payload.name;
|
||||
}
|
||||
|
||||
async function listBundleHashDirs(storageRoot: string): Promise<string[]> {
|
||||
const bundlesRoot = join(storageRoot, "bundles");
|
||||
if (!(await pathExists(bundlesRoot))) {
|
||||
return [];
|
||||
}
|
||||
return parsed as Record<string, unknown>;
|
||||
const names = await readdir(bundlesRoot);
|
||||
const out: string[] = [];
|
||||
for (const name of names) {
|
||||
const p = join(bundlesRoot, name);
|
||||
try {
|
||||
const st = await stat(p);
|
||||
if (st.isDirectory()) {
|
||||
out.push(name);
|
||||
}
|
||||
} catch {}
|
||||
}
|
||||
out.sort();
|
||||
return out;
|
||||
}
|
||||
|
||||
async function parseHistoryFile(path: string): Promise<ThreadHistoryEntry[]> {
|
||||
const text = await readTextFileIfExists(path);
|
||||
if (text === null) {
|
||||
return [];
|
||||
}
|
||||
const out: ThreadHistoryEntry[] = [];
|
||||
for (const line of text.split("\n")) {
|
||||
const trimmed = line.trim();
|
||||
if (trimmed === "") {
|
||||
continue;
|
||||
}
|
||||
let raw: unknown;
|
||||
try {
|
||||
raw = JSON.parse(trimmed) as unknown;
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
if (raw === null || typeof raw !== "object") {
|
||||
continue;
|
||||
}
|
||||
const rec = raw as Record<string, unknown>;
|
||||
const threadId = rec.threadId;
|
||||
const head = rec.head;
|
||||
const start = rec.start;
|
||||
const completedAt = rec.completedAt;
|
||||
if (
|
||||
typeof threadId !== "string" ||
|
||||
typeof head !== "string" ||
|
||||
typeof start !== "string" ||
|
||||
typeof completedAt !== "number"
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
out.push({ threadId, head, start, completedAt });
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
export type RunningThreadRow = {
|
||||
@@ -30,32 +96,151 @@ export type HistoricalThreadRow = {
|
||||
threadId: string;
|
||||
hash: string;
|
||||
workflowName: string | null;
|
||||
/** Active entry from `threads.json` vs completed line from `history/*.jsonl`. */
|
||||
source: "active" | "history";
|
||||
/** `updatedAt` for active threads; `completedAt` for history (ms since epoch). */
|
||||
activityTs: number;
|
||||
/** Current CAS head (`threads.json` / history row). */
|
||||
head: string;
|
||||
};
|
||||
|
||||
async function readThreadStartTimestampMs(dataPath: string): Promise<number | null> {
|
||||
const text = await readTextFileIfExists(dataPath);
|
||||
if (text === null) {
|
||||
return null;
|
||||
export type ResolvedThreadRecord = {
|
||||
threadId: string;
|
||||
bundleHash: string;
|
||||
bundleDir: string;
|
||||
head: string;
|
||||
start: string;
|
||||
source: "active" | "history";
|
||||
};
|
||||
|
||||
/** Resolve a thread via `threads.json` (active) or `history/*.jsonl` (completed). */
|
||||
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: scans all bundle dirs for thread id
|
||||
export async function resolveThreadRecord(
|
||||
storageRoot: string,
|
||||
threadId: string,
|
||||
): Promise<ResolvedThreadRecord | null> {
|
||||
const hashes = await listBundleHashDirs(storageRoot);
|
||||
for (const bundleHash of hashes) {
|
||||
const bundleDir = join(storageRoot, "bundles", bundleHash);
|
||||
let index: ThreadIndex;
|
||||
try {
|
||||
index = await readThreadsIndex(bundleDir);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
const active = index[threadId];
|
||||
if (active !== undefined) {
|
||||
return {
|
||||
threadId,
|
||||
bundleHash,
|
||||
bundleDir,
|
||||
head: active.head,
|
||||
start: active.start,
|
||||
source: "active",
|
||||
};
|
||||
}
|
||||
}
|
||||
const parsed = parseFirstJsonLineObject(text);
|
||||
if (parsed === null) {
|
||||
return null;
|
||||
|
||||
for (const bundleHash of hashes) {
|
||||
const bundleDir = join(storageRoot, "bundles", bundleHash);
|
||||
const histDir = join(bundleDir, "history");
|
||||
if (!(await pathExists(histDir))) {
|
||||
continue;
|
||||
}
|
||||
let files: string[];
|
||||
try {
|
||||
files = await readdir(histDir);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
for (const name of files) {
|
||||
if (!name.endsWith(".jsonl")) {
|
||||
continue;
|
||||
}
|
||||
const entries = await parseHistoryFile(join(histDir, name));
|
||||
for (const e of entries) {
|
||||
if (e.threadId === threadId) {
|
||||
return {
|
||||
threadId,
|
||||
bundleHash,
|
||||
bundleDir,
|
||||
head: e.head,
|
||||
start: e.start,
|
||||
source: "history",
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
const ts = parsed.timestamp;
|
||||
return typeof ts === "number" && Number.isFinite(ts) ? ts : null;
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
async function readWorkflowNameFromDataJsonl(dataPath: string): Promise<string | null> {
|
||||
const text = await readTextFileIfExists(dataPath);
|
||||
if (text === null) {
|
||||
return null;
|
||||
export type ThreadHeadTerminal =
|
||||
| { kind: "non-terminal" }
|
||||
| { kind: "terminal"; returnCode: number };
|
||||
|
||||
/** True when the newest frame at `headHash` is `__end__` (workflow finished in CAS). */
|
||||
export async function readThreadTerminalFromHead(
|
||||
storageRoot: string,
|
||||
headHash: string,
|
||||
): Promise<ThreadHeadTerminal> {
|
||||
const cas = createCasStore(getGlobalCasDir(storageRoot));
|
||||
const frames = await walkStateFramesNewestFirst(cas, headHash);
|
||||
const newest = frames[0];
|
||||
if (newest === undefined) {
|
||||
return { kind: "non-terminal" };
|
||||
}
|
||||
const parsed = parseFirstJsonLineObject(text);
|
||||
if (parsed === null) {
|
||||
return null;
|
||||
if (newest.payload.role !== END) {
|
||||
return { kind: "non-terminal" };
|
||||
}
|
||||
const name = parsed.name;
|
||||
return typeof name === "string" ? name : null;
|
||||
const rc = newest.payload.meta.returnCode;
|
||||
if (typeof rc !== "number") {
|
||||
return { kind: "terminal", returnCode: 1 };
|
||||
}
|
||||
return { kind: "terminal", returnCode: rc };
|
||||
}
|
||||
|
||||
export type ThreadListStatus = "running" | "active" | "completed" | "failed";
|
||||
|
||||
/** Combines `.running` marker with CAS head: stale markers do not imply `running`. */
|
||||
export async function resolveThreadListStatus(
|
||||
storageRoot: string,
|
||||
row: HistoricalThreadRow,
|
||||
runningMarkerPresent: boolean,
|
||||
): Promise<ThreadListStatus> {
|
||||
const terminal = await readThreadTerminalFromHead(storageRoot, row.head);
|
||||
if (terminal.kind === "terminal") {
|
||||
return terminal.returnCode !== 0 ? "failed" : "completed";
|
||||
}
|
||||
if (row.source === "history") {
|
||||
return "completed";
|
||||
}
|
||||
if (runningMarkerPresent) {
|
||||
return "running";
|
||||
}
|
||||
return "active";
|
||||
}
|
||||
|
||||
async function appendRunningThreadRowIfLive(
|
||||
storageRoot: string,
|
||||
hash: string,
|
||||
threadId: string,
|
||||
out: RunningThreadRow[],
|
||||
): Promise<void> {
|
||||
const resolved = await resolveThreadRecord(storageRoot, threadId);
|
||||
if (resolved !== null && resolved.bundleHash !== hash) {
|
||||
return;
|
||||
}
|
||||
if (resolved !== null) {
|
||||
const terminal = await readThreadTerminalFromHead(storageRoot, resolved.head);
|
||||
if (terminal.kind === "terminal") {
|
||||
return;
|
||||
}
|
||||
}
|
||||
const workflowName =
|
||||
resolved !== null ? await readWorkflowNameFromStartHash(storageRoot, resolved.start) : null;
|
||||
out.push({ threadId, hash, workflowName });
|
||||
}
|
||||
|
||||
/** Threads currently executing — identified via `<threadId>.running` markers. */
|
||||
@@ -82,9 +267,7 @@ export async function listRunningThreads(storageRoot: string): Promise<RunningTh
|
||||
continue;
|
||||
}
|
||||
const threadId = fileName.slice(0, -".running".length);
|
||||
const dataPath = join(dir, `${threadId}.data.jsonl`);
|
||||
const workflowName = await readWorkflowNameFromDataJsonl(dataPath);
|
||||
out.push({ threadId, hash, workflowName });
|
||||
await appendRunningThreadRowIfLive(storageRoot, hash, threadId, out);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,41 +281,84 @@ export async function listRunningThreads(storageRoot: string): Promise<RunningTh
|
||||
}
|
||||
|
||||
/**
|
||||
* Historical threads discovered via `*.data.jsonl`.
|
||||
* When `workflowNameFilter` is non-null, only threads whose start record `name` matches are returned.
|
||||
* Threads discovered via `threads.json` (active) and `history/*.jsonl` (completed).
|
||||
* When `workflowNameFilter` is non-null, only threads whose StartNode `name` matches are returned.
|
||||
*/
|
||||
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: merges active index + partitioned history
|
||||
export async function listHistoricalThreads(
|
||||
storageRoot: string,
|
||||
workflowNameFilter: string | null,
|
||||
): Promise<HistoricalThreadRow[]> {
|
||||
const logsRoot = join(storageRoot, "logs");
|
||||
if (!(await pathExists(logsRoot))) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const hashes = await readdir(logsRoot);
|
||||
const hashes = await listBundleHashDirs(storageRoot);
|
||||
const seen = new Set<string>();
|
||||
const out: HistoricalThreadRow[] = [];
|
||||
|
||||
for (const hash of hashes) {
|
||||
const dir = join(logsRoot, hash);
|
||||
let entries: string[];
|
||||
for (const bundleHash of hashes) {
|
||||
const bundleDir = join(storageRoot, "bundles", bundleHash);
|
||||
let index: ThreadIndex;
|
||||
try {
|
||||
entries = await readdir(dir);
|
||||
index = await readThreadsIndex(bundleDir);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (const fileName of entries) {
|
||||
if (!fileName.endsWith(".data.jsonl")) {
|
||||
for (const threadId of Object.keys(index)) {
|
||||
const key = `${bundleHash}/${threadId}`;
|
||||
if (seen.has(key)) {
|
||||
continue;
|
||||
}
|
||||
const threadId = fileName.slice(0, -".data.jsonl".length);
|
||||
const dataPath = join(dir, fileName);
|
||||
const workflowName = await readWorkflowNameFromDataJsonl(dataPath);
|
||||
seen.add(key);
|
||||
const entry = index[threadId];
|
||||
if (entry === undefined) {
|
||||
continue;
|
||||
}
|
||||
const workflowName = await readWorkflowNameFromStartHash(storageRoot, entry.start);
|
||||
if (workflowNameFilter !== null && workflowName !== workflowNameFilter) {
|
||||
continue;
|
||||
}
|
||||
out.push({ threadId, hash, workflowName });
|
||||
out.push({
|
||||
threadId,
|
||||
hash: bundleHash,
|
||||
workflowName,
|
||||
source: "active",
|
||||
activityTs: entry.updatedAt,
|
||||
head: entry.head,
|
||||
});
|
||||
}
|
||||
|
||||
const histDir = join(bundleDir, "history");
|
||||
if (!(await pathExists(histDir))) {
|
||||
continue;
|
||||
}
|
||||
let files: string[];
|
||||
try {
|
||||
files = await readdir(histDir);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
for (const name of files) {
|
||||
if (!name.endsWith(".jsonl")) {
|
||||
continue;
|
||||
}
|
||||
const entries = await parseHistoryFile(join(histDir, name));
|
||||
for (const e of entries) {
|
||||
const key = `${bundleHash}/${e.threadId}`;
|
||||
if (seen.has(key)) {
|
||||
continue;
|
||||
}
|
||||
seen.add(key);
|
||||
const workflowName = await readWorkflowNameFromStartHash(storageRoot, e.start);
|
||||
if (workflowNameFilter !== null && workflowName !== workflowNameFilter) {
|
||||
continue;
|
||||
}
|
||||
out.push({
|
||||
threadId: e.threadId,
|
||||
hash: bundleHash,
|
||||
workflowName,
|
||||
source: "history",
|
||||
activityTs: e.completedAt,
|
||||
head: e.head,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -145,64 +371,93 @@ export async function listHistoricalThreads(
|
||||
return out;
|
||||
}
|
||||
|
||||
export type LatestThreadTarget = {
|
||||
threadId: string;
|
||||
bundleHash: string;
|
||||
bundleDir: string;
|
||||
threadsJsonPath: string;
|
||||
};
|
||||
|
||||
/**
|
||||
* Picks the thread whose `.data.jsonl` is newest by start-record `timestamp`,
|
||||
* falling back to file `mtime` when the timestamp is missing.
|
||||
* Tie-breaker: larger `mtime` wins when start timestamps are equal.
|
||||
* Picks the newest thread by StartNode timestamp approximation (`updatedAt` active,
|
||||
* else `completedAt` history), falling back to lexical thread id order.
|
||||
*/
|
||||
export async function findLatestThreadDataPath(
|
||||
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: compares active heads vs history tails
|
||||
export async function findLatestThreadBundleTarget(
|
||||
storageRoot: string,
|
||||
): Promise<{ threadId: string; dataPath: string } | null> {
|
||||
const threads = await listHistoricalThreads(storageRoot, null);
|
||||
if (threads.length === 0) {
|
||||
return null;
|
||||
}
|
||||
): Promise<LatestThreadTarget | null> {
|
||||
const hashes = await listBundleHashDirs(storageRoot);
|
||||
|
||||
let best: {
|
||||
threadId: string;
|
||||
dataPath: string;
|
||||
primary: number;
|
||||
secondary: number;
|
||||
bundleHash: string;
|
||||
bundleDir: string;
|
||||
ts: number;
|
||||
} | null = null;
|
||||
|
||||
for (const t of threads) {
|
||||
const dataPath = join(storageRoot, "logs", t.hash, `${t.threadId}.data.jsonl`);
|
||||
let mtimeMs = 0;
|
||||
for (const bundleHash of hashes) {
|
||||
const bundleDir = join(storageRoot, "bundles", bundleHash);
|
||||
let index: ThreadIndex;
|
||||
try {
|
||||
const st = await stat(dataPath);
|
||||
mtimeMs = st.mtimeMs;
|
||||
index = await readThreadsIndex(bundleDir);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
const startTs = await readThreadStartTimestampMs(dataPath);
|
||||
const primary = startTs !== null ? startTs : mtimeMs;
|
||||
const secondary = mtimeMs;
|
||||
if (
|
||||
best === null ||
|
||||
primary > best.primary ||
|
||||
(primary === best.primary && secondary > best.secondary)
|
||||
) {
|
||||
best = { threadId: t.threadId, dataPath, primary, secondary };
|
||||
for (const threadId of Object.keys(index)) {
|
||||
const ent = index[threadId];
|
||||
if (ent === undefined) {
|
||||
continue;
|
||||
}
|
||||
const ts = ent.updatedAt;
|
||||
const cand = { threadId, bundleHash, bundleDir, ts };
|
||||
if (
|
||||
best === null ||
|
||||
cand.ts > best.ts ||
|
||||
(cand.ts === best.ts &&
|
||||
`${cand.bundleHash}/${cand.threadId}` > `${best.bundleHash}/${best.threadId}`)
|
||||
) {
|
||||
best = cand;
|
||||
}
|
||||
}
|
||||
|
||||
const histDir = join(bundleDir, "history");
|
||||
if (!(await pathExists(histDir))) {
|
||||
continue;
|
||||
}
|
||||
let files: string[];
|
||||
try {
|
||||
files = await readdir(histDir);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
for (const name of files) {
|
||||
if (!name.endsWith(".jsonl")) {
|
||||
continue;
|
||||
}
|
||||
const entries = await parseHistoryFile(join(histDir, name));
|
||||
for (const e of entries) {
|
||||
const ts = e.completedAt;
|
||||
const cand = { threadId: e.threadId, bundleHash, bundleDir, ts };
|
||||
if (
|
||||
best === null ||
|
||||
cand.ts > best.ts ||
|
||||
(cand.ts === best.ts &&
|
||||
`${cand.bundleHash}/${cand.threadId}` > `${best.bundleHash}/${best.threadId}`)
|
||||
) {
|
||||
best = cand;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return best === null ? null : { threadId: best.threadId, dataPath: best.dataPath };
|
||||
}
|
||||
|
||||
export async function resolveThreadDataPath(
|
||||
storageRoot: string,
|
||||
threadId: string,
|
||||
): Promise<string | null> {
|
||||
const logsRoot = join(storageRoot, "logs");
|
||||
if (!(await pathExists(logsRoot))) {
|
||||
if (best === null) {
|
||||
return null;
|
||||
}
|
||||
const hashes = await readdir(logsRoot);
|
||||
for (const hash of hashes) {
|
||||
const candidate = join(logsRoot, hash, `${threadId}.data.jsonl`);
|
||||
if (await pathExists(candidate)) {
|
||||
return candidate;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
|
||||
return {
|
||||
threadId: best.threadId,
|
||||
bundleHash: best.bundleHash,
|
||||
bundleDir: best.bundleDir,
|
||||
threadsJsonPath: join(best.bundleDir, "threads.json"),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import { getWorkerHostScriptPath } from "@uncaged/workflow-execute";
|
||||
import { err, ok, type Result } from "@uncaged/workflow-protocol";
|
||||
|
||||
import { pathExists, readTextFileIfExists } from "./fs-utils.js";
|
||||
import { readThreadTerminalFromHead, resolveThreadRecord } from "./thread-scan.js";
|
||||
|
||||
export type WorkerCtl = {
|
||||
pid: number;
|
||||
@@ -269,7 +270,25 @@ export async function resolveRunningHashForThread(
|
||||
if (!(await pathExists(logsRoot))) {
|
||||
return err(`thread not running (no logs dir): ${threadId}`);
|
||||
}
|
||||
const hashes = await readdir(logsRoot);
|
||||
const resolved = await resolveThreadRecord(storageRoot, threadId);
|
||||
if (resolved !== null) {
|
||||
const runningPath = join(logsRoot, resolved.bundleHash, `${threadId}.running`);
|
||||
if (!(await pathExists(runningPath))) {
|
||||
return err(`thread not running: ${threadId}`);
|
||||
}
|
||||
const terminal = await readThreadTerminalFromHead(storageRoot, resolved.head);
|
||||
if (terminal.kind === "terminal") {
|
||||
return err(`thread not running: ${threadId}`);
|
||||
}
|
||||
return ok(resolved.bundleHash);
|
||||
}
|
||||
|
||||
let hashes: string[];
|
||||
try {
|
||||
hashes = await readdir(logsRoot);
|
||||
} catch {
|
||||
return err(`thread not running: ${threadId}`);
|
||||
}
|
||||
for (const hash of hashes) {
|
||||
const runningPath = join(logsRoot, hash, `${threadId}.running`);
|
||||
if (await pathExists(runningPath)) {
|
||||
|
||||
@@ -1,8 +1,41 @@
|
||||
import { parse, stringify } from "yaml";
|
||||
|
||||
import type { CasStore, MerkleNode, StepMerklePayload, ThreadMerklePayload } from "./types.js";
|
||||
import type {
|
||||
CasStore,
|
||||
MerkleNode,
|
||||
MerkleNodeType,
|
||||
StepMerklePayload,
|
||||
ThreadMerklePayload,
|
||||
} from "./types.js";
|
||||
|
||||
function requireStringHashArray(value: unknown, notArrayMessage: string): string[] {
|
||||
if (!Array.isArray(value)) {
|
||||
throw new Error(notArrayMessage);
|
||||
}
|
||||
const out: string[] = [];
|
||||
for (const c of value) {
|
||||
if (typeof c !== "string") {
|
||||
throw new Error("merkle: hash entry must be a string");
|
||||
}
|
||||
out.push(c);
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
function edgeListRaw(rec: Record<string, unknown>, type: MerkleNodeType): unknown {
|
||||
if (type === "content") {
|
||||
return rec.refs !== undefined ? rec.refs : rec.children;
|
||||
}
|
||||
return rec.children;
|
||||
}
|
||||
|
||||
export function serializeMerkleNode(node: MerkleNode): string {
|
||||
if (node.type === "content") {
|
||||
return stringify(
|
||||
{ type: node.type, payload: node.payload, refs: node.children },
|
||||
{ indent: 2 },
|
||||
);
|
||||
}
|
||||
return stringify(
|
||||
{ type: node.type, payload: node.payload, children: node.children },
|
||||
{ indent: 2 },
|
||||
@@ -17,23 +50,18 @@ export function parseMerkleNode(yamlText: string): MerkleNode {
|
||||
const rec = raw as Record<string, unknown>;
|
||||
const type = rec.type;
|
||||
const payload = rec.payload;
|
||||
const children = rec.children;
|
||||
if (type !== "content" && type !== "step" && type !== "thread") {
|
||||
throw new Error("merkle: invalid or missing type");
|
||||
}
|
||||
if (typeof payload !== "string" && (payload === null || typeof payload !== "object")) {
|
||||
throw new Error("merkle: payload must be a string or object");
|
||||
}
|
||||
if (!Array.isArray(children)) {
|
||||
throw new Error("merkle: children must be an array");
|
||||
}
|
||||
const childHashes: string[] = [];
|
||||
for (const c of children) {
|
||||
if (typeof c !== "string") {
|
||||
throw new Error("merkle: child hash must be a string");
|
||||
}
|
||||
childHashes.push(c);
|
||||
}
|
||||
|
||||
const notArrayMsg =
|
||||
type === "content"
|
||||
? "merkle: content node requires refs or children array"
|
||||
: "merkle: children must be an array";
|
||||
const childHashes = requireStringHashArray(edgeListRaw(rec, type), notArrayMsg);
|
||||
return {
|
||||
type,
|
||||
payload: typeof payload === "string" ? payload : (payload as Record<string, unknown>),
|
||||
@@ -85,8 +113,8 @@ export async function putContentMerkleNode(store: CasStore, content: string): Pr
|
||||
/**
|
||||
* Loads a CAS blob and returns the payload string for a `content` node.
|
||||
*
|
||||
* Accepts both the legacy `{type:content, payload, children}` Merkle layout
|
||||
* and the RFC v3 `{type:content, payload, refs}` content node layout.
|
||||
* Accepts both the legacy `{ type:content, payload, children }` Merkle layout
|
||||
* and the RFC-aligned `{ type:content, payload, refs }` content node layout.
|
||||
*/
|
||||
export async function getContentMerklePayload(
|
||||
store: CasStore,
|
||||
|
||||
@@ -9,7 +9,10 @@ function refsFromBlob(content: string): string[] {
|
||||
return [];
|
||||
}
|
||||
const rec = raw as Record<string, unknown>;
|
||||
const refs = rec.refs;
|
||||
let refs = rec.refs;
|
||||
if (!Array.isArray(refs) && Array.isArray(rec.children)) {
|
||||
refs = rec.children;
|
||||
}
|
||||
if (!Array.isArray(refs)) {
|
||||
return [];
|
||||
}
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
VITE_GATEWAY_URL=https://workflow-gateway.shazhou.workers.dev
|
||||
@@ -10,7 +10,9 @@
|
||||
},
|
||||
"dependencies": {
|
||||
"react": "^19.2.6",
|
||||
"react-dom": "^19.2.6"
|
||||
"react-dom": "^19.2.6",
|
||||
"react-markdown": "^10.1.0",
|
||||
"shiki": "^4.0.2"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@tailwindcss/vite": "^4.2.4",
|
||||
|
||||
@@ -1,9 +1,43 @@
|
||||
const BASE = "/api";
|
||||
const GATEWAY_URL = import.meta.env.VITE_GATEWAY_URL || "";
|
||||
|
||||
async function postJson<T>(path: string, body: unknown): Promise<T> {
|
||||
const res = await fetch(`${BASE}${path}`, {
|
||||
export function getApiKey(): string | null {
|
||||
try {
|
||||
return localStorage.getItem("workflow-api-key");
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
export function setApiKey(key: string): void {
|
||||
localStorage.setItem("workflow-api-key", key);
|
||||
}
|
||||
|
||||
export function clearApiKey(): void {
|
||||
localStorage.removeItem("workflow-api-key");
|
||||
}
|
||||
|
||||
export function hasApiKey(): boolean {
|
||||
return getApiKey() !== null && getApiKey() !== "";
|
||||
}
|
||||
|
||||
function authHeaders(): Record<string, string> {
|
||||
const key = getApiKey();
|
||||
if (key) return { Authorization: `Bearer ${key}` };
|
||||
return {};
|
||||
}
|
||||
|
||||
function agentBase(agent: string): string {
|
||||
if (GATEWAY_URL) {
|
||||
return `${GATEWAY_URL}/api/${agent}`;
|
||||
}
|
||||
// Local dev: proxy via vite, no agent prefix
|
||||
return "/api";
|
||||
}
|
||||
|
||||
async function postJson<T>(base: string, path: string, body: unknown): Promise<T> {
|
||||
const res = await fetch(`${base}${path}`, {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
headers: { "Content-Type": "application/json", ...authHeaders() },
|
||||
body: JSON.stringify(body),
|
||||
});
|
||||
if (!res.ok) {
|
||||
@@ -13,14 +47,23 @@ async function postJson<T>(path: string, body: unknown): Promise<T> {
|
||||
return res.json() as Promise<T>;
|
||||
}
|
||||
|
||||
async function fetchJson<T>(path: string): Promise<T> {
|
||||
const res = await fetch(`${BASE}${path}`);
|
||||
async function fetchJson<T>(base: string, path: string): Promise<T> {
|
||||
const res = await fetch(`${base}${path}`, { headers: authHeaders() });
|
||||
if (!res.ok) {
|
||||
throw new Error(`API ${res.status}: ${path}`);
|
||||
}
|
||||
return res.json() as Promise<T>;
|
||||
}
|
||||
|
||||
// ── Endpoint types ──────────────────────────────────────────────────
|
||||
|
||||
export type AgentEndpoint = {
|
||||
name: string;
|
||||
url: string;
|
||||
status: string;
|
||||
lastHeartbeat: number;
|
||||
};
|
||||
|
||||
export type WorkflowSummary = {
|
||||
name: string;
|
||||
currentHash: string;
|
||||
@@ -35,50 +78,78 @@ export type ThreadSummary = {
|
||||
status: string | null;
|
||||
};
|
||||
|
||||
export type ThreadRecord = {
|
||||
type: string;
|
||||
role: string | null;
|
||||
content: string | null;
|
||||
timestamp: number | null;
|
||||
[key: string]: unknown;
|
||||
export type ThreadStartRecord = {
|
||||
type: "thread-start";
|
||||
workflow: string;
|
||||
prompt: string | null;
|
||||
threadId: string;
|
||||
status: string;
|
||||
timestamp: null;
|
||||
};
|
||||
|
||||
export function listWorkflows(): Promise<{ workflows: WorkflowSummary[] }> {
|
||||
return fetchJson("/workflows");
|
||||
export type RoleRecord = {
|
||||
type: "role";
|
||||
role: string;
|
||||
content: string;
|
||||
timestamp: number | null;
|
||||
meta: Record<string, unknown>;
|
||||
};
|
||||
|
||||
export type WorkflowResultRecord = {
|
||||
type: "workflow-result";
|
||||
returnCode: number;
|
||||
content: string;
|
||||
timestamp: number | null;
|
||||
};
|
||||
|
||||
export type ThreadRecord = ThreadStartRecord | RoleRecord | WorkflowResultRecord;
|
||||
|
||||
// ── Gateway endpoints ───────────────────────────────────────────────
|
||||
|
||||
export function listAgents(): Promise<AgentEndpoint[]> {
|
||||
const url = GATEWAY_URL || "";
|
||||
return fetchJson(url, "/endpoints");
|
||||
}
|
||||
|
||||
export function listThreads(): Promise<{ threads: ThreadSummary[] }> {
|
||||
return fetchJson("/threads");
|
||||
// ── Agent-scoped endpoints ──────────────────────────────────────────
|
||||
|
||||
export function listWorkflows(agent: string): Promise<{ workflows: WorkflowSummary[] }> {
|
||||
return fetchJson(agentBase(agent), "/workflows");
|
||||
}
|
||||
|
||||
export function listRunningThreads(): Promise<{ threads: ThreadSummary[] }> {
|
||||
return fetchJson("/threads/running");
|
||||
export function listThreads(agent: string): Promise<{ threads: ThreadSummary[] }> {
|
||||
return fetchJson(agentBase(agent), "/threads");
|
||||
}
|
||||
|
||||
export function getThread(id: string): Promise<{ records: ThreadRecord[] }> {
|
||||
return fetchJson(`/threads/${id}`);
|
||||
export function listRunningThreads(agent: string): Promise<{ threads: ThreadSummary[] }> {
|
||||
return fetchJson(agentBase(agent), "/threads/running");
|
||||
}
|
||||
|
||||
export function getThread(agent: string, id: string): Promise<{ records: ThreadRecord[] }> {
|
||||
return fetchJson(agentBase(agent), `/threads/${id}`);
|
||||
}
|
||||
|
||||
export function runThread(
|
||||
agent: string,
|
||||
workflow: string,
|
||||
prompt: string,
|
||||
maxRounds: number = 10,
|
||||
): Promise<{ threadId: string }> {
|
||||
return postJson("/threads", { workflow, prompt, maxRounds });
|
||||
return postJson(agentBase(agent), "/threads", { workflow, prompt, maxRounds });
|
||||
}
|
||||
|
||||
export function killThread(threadId: string): Promise<{ ok: boolean }> {
|
||||
return postJson(`/threads/${threadId}/kill`, {});
|
||||
export function killThread(agent: string, threadId: string): Promise<{ ok: boolean }> {
|
||||
return postJson(agentBase(agent), `/threads/${threadId}/kill`, {});
|
||||
}
|
||||
|
||||
export function pauseThread(threadId: string): Promise<{ ok: boolean }> {
|
||||
return postJson(`/threads/${threadId}/pause`, {});
|
||||
export function pauseThread(agent: string, threadId: string): Promise<{ ok: boolean }> {
|
||||
return postJson(agentBase(agent), `/threads/${threadId}/pause`, {});
|
||||
}
|
||||
|
||||
export function resumeThread(threadId: string): Promise<{ ok: boolean }> {
|
||||
return postJson(`/threads/${threadId}/resume`, {});
|
||||
export function resumeThread(agent: string, threadId: string): Promise<{ ok: boolean }> {
|
||||
return postJson(agentBase(agent), `/threads/${threadId}/resume`, {});
|
||||
}
|
||||
|
||||
export function getHealth(): Promise<{ ok: boolean }> {
|
||||
return fetchJson("/healthz");
|
||||
export function getAgentHealth(agent: string): Promise<{ ok: boolean }> {
|
||||
return fetchJson(agentBase(agent), "/healthz");
|
||||
}
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
import { useState } from "react";
|
||||
import { hasApiKey, clearApiKey } from "./api.ts";
|
||||
import { LoginPage } from "./components/login.tsx";
|
||||
import { RunDialog } from "./components/run-dialog.tsx";
|
||||
import { Sidebar } from "./components/sidebar.tsx";
|
||||
import { StatusBar } from "./components/status-bar.tsx";
|
||||
@@ -8,24 +10,39 @@ import { WorkflowList } from "./components/workflow-list.tsx";
|
||||
import { useHashRoute } from "./use-hash-route.ts";
|
||||
|
||||
export function App() {
|
||||
const { view, threadId, setView, setThreadId } = useHashRoute();
|
||||
const [authed, setAuthed] = useState(hasApiKey());
|
||||
const { view, agent, threadId, setView, setAgent, setThreadId } = useHashRoute();
|
||||
const [showRun, setShowRun] = useState(false);
|
||||
|
||||
if (!authed) {
|
||||
return <LoginPage onLogin={() => setAuthed(true)} />;
|
||||
}
|
||||
|
||||
return (
|
||||
<div className="flex h-screen">
|
||||
<Sidebar view={view} onViewChange={setView} />
|
||||
<Sidebar view={view} agent={agent} onViewChange={setView} onAgentChange={setAgent} onLogout={() => { clearApiKey(); setAuthed(false); }} />
|
||||
<main className="flex-1 overflow-hidden flex flex-col">
|
||||
<StatusBar onRun={() => setShowRun(true)} />
|
||||
<StatusBar agent={agent} onRun={() => setShowRun(true)} />
|
||||
<div className="flex-1 overflow-auto p-6">
|
||||
{view === "threads" && threadId === null && <ThreadList onSelect={setThreadId} />}
|
||||
{view === "threads" && threadId !== null && (
|
||||
<ThreadDetail threadId={threadId} onBack={() => setThreadId(null)} />
|
||||
{!agent && (
|
||||
<div className="flex items-center justify-center h-full">
|
||||
<p style={{ color: "var(--color-text-muted)" }}>
|
||||
Select an agent from the sidebar to get started.
|
||||
</p>
|
||||
</div>
|
||||
)}
|
||||
{view === "workflows" && <WorkflowList />}
|
||||
{agent && view === "threads" && threadId === null && (
|
||||
<ThreadList agent={agent} onSelect={setThreadId} />
|
||||
)}
|
||||
{agent && view === "threads" && threadId !== null && (
|
||||
<ThreadDetail agent={agent} threadId={threadId} onBack={() => setThreadId(null)} />
|
||||
)}
|
||||
{agent && view === "workflows" && <WorkflowList agent={agent} />}
|
||||
</div>
|
||||
</main>
|
||||
{showRun && (
|
||||
{showRun && agent && (
|
||||
<RunDialog
|
||||
agent={agent}
|
||||
onClose={() => setShowRun(false)}
|
||||
onCreated={(id) => {
|
||||
setShowRun(false);
|
||||
|
||||
@@ -0,0 +1,93 @@
|
||||
import { useState } from "react";
|
||||
import { setApiKey } from "../api.ts";
|
||||
|
||||
type Props = {
|
||||
onLogin: () => void;
|
||||
};
|
||||
|
||||
export function LoginPage({ onLogin }: Props) {
|
||||
const [key, setKey] = useState("");
|
||||
const [error, setError] = useState<string | null>(null);
|
||||
const [loading, setLoading] = useState(false);
|
||||
|
||||
async function handleSubmit(e: React.FormEvent) {
|
||||
e.preventDefault();
|
||||
if (!key.trim()) return;
|
||||
|
||||
setLoading(true);
|
||||
setError(null);
|
||||
|
||||
// Test the key by hitting the endpoints list
|
||||
const gatewayUrl = import.meta.env.VITE_GATEWAY_URL || "";
|
||||
try {
|
||||
const res = await fetch(`${gatewayUrl}/endpoints`, {
|
||||
headers: { Authorization: `Bearer ${key.trim()}` },
|
||||
});
|
||||
if (res.status === 401) {
|
||||
setError("Invalid API key");
|
||||
setLoading(false);
|
||||
return;
|
||||
}
|
||||
if (!res.ok) {
|
||||
setError(`Server error: ${res.status}`);
|
||||
setLoading(false);
|
||||
return;
|
||||
}
|
||||
} catch (err) {
|
||||
setError(`Connection failed: ${err instanceof Error ? err.message : String(err)}`);
|
||||
setLoading(false);
|
||||
return;
|
||||
}
|
||||
|
||||
setApiKey(key.trim());
|
||||
onLogin();
|
||||
}
|
||||
|
||||
return (
|
||||
<div className="min-h-screen flex items-center justify-center" style={{ background: "var(--color-bg)" }}>
|
||||
<div
|
||||
className="p-8 rounded-lg border w-full max-w-sm"
|
||||
style={{ background: "var(--color-surface)", borderColor: "var(--color-border)" }}
|
||||
>
|
||||
<h1 className="text-xl font-bold mb-1" style={{ color: "var(--color-accent)" }}>
|
||||
⚙ Workflow Dashboard
|
||||
</h1>
|
||||
<p className="text-sm mb-6" style={{ color: "var(--color-text-muted)" }}>
|
||||
Enter your API key to continue
|
||||
</p>
|
||||
<form onSubmit={handleSubmit}>
|
||||
<input
|
||||
type="password"
|
||||
value={key}
|
||||
onChange={(e) => setKey(e.target.value)}
|
||||
placeholder="API Key"
|
||||
className="w-full px-3 py-2 rounded border text-sm mb-3 outline-none"
|
||||
style={{
|
||||
background: "var(--color-bg)",
|
||||
borderColor: "var(--color-border)",
|
||||
color: "var(--color-text)",
|
||||
}}
|
||||
autoFocus
|
||||
/>
|
||||
{error && (
|
||||
<p className="text-xs mb-3" style={{ color: "var(--color-error)" }}>
|
||||
{error}
|
||||
</p>
|
||||
)}
|
||||
<button
|
||||
type="submit"
|
||||
disabled={loading || !key.trim()}
|
||||
className="w-full px-3 py-2 rounded text-sm font-medium"
|
||||
style={{
|
||||
background: "var(--color-accent)",
|
||||
color: "var(--color-bg)",
|
||||
opacity: loading || !key.trim() ? 0.5 : 1,
|
||||
}}
|
||||
>
|
||||
{loading ? "Verifying..." : "Login"}
|
||||
</button>
|
||||
</form>
|
||||
</div>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
@@ -0,0 +1,108 @@
|
||||
import ReactMarkdown from "react-markdown";
|
||||
import { useEffect, useState } from "react";
|
||||
import { createHighlighter, type HighlighterGeneric, type BundledLanguage, type BundledTheme } from "shiki";
|
||||
|
||||
let highlighterPromise: Promise<HighlighterGeneric<BundledLanguage, BundledTheme>> | null = null;
|
||||
|
||||
const LANGS: BundledLanguage[] = ["typescript", "javascript", "json", "yaml", "bash", "python", "markdown"];
|
||||
|
||||
function getHighlighter(): Promise<HighlighterGeneric<BundledLanguage, BundledTheme>> {
|
||||
if (highlighterPromise === null) {
|
||||
highlighterPromise = createHighlighter({
|
||||
themes: ["github-dark"],
|
||||
langs: LANGS,
|
||||
});
|
||||
}
|
||||
return highlighterPromise;
|
||||
}
|
||||
|
||||
function CodeBlock({ className, children }: { className?: string; children?: React.ReactNode }) {
|
||||
const [html, setHtml] = useState<string | null>(null);
|
||||
const code = String(children).replace(/\n$/, "");
|
||||
const lang = className?.replace("language-", "") ?? "text";
|
||||
|
||||
useEffect(() => {
|
||||
let cancelled = false;
|
||||
getHighlighter().then((hl) => {
|
||||
if (cancelled) return;
|
||||
try {
|
||||
const result = hl.codeToHtml(code, { lang, theme: "github-dark" });
|
||||
setHtml(result);
|
||||
} catch {
|
||||
setHtml(null);
|
||||
}
|
||||
});
|
||||
return () => { cancelled = true; };
|
||||
}, [code, lang]);
|
||||
|
||||
if (html !== null) {
|
||||
return (
|
||||
<div
|
||||
className="rounded overflow-x-auto text-xs my-2"
|
||||
// biome-ignore lint/security/noDangerouslySetInnerHtml: shiki output is safe
|
||||
dangerouslySetInnerHTML={{ __html: html }}
|
||||
/>
|
||||
);
|
||||
}
|
||||
|
||||
return (
|
||||
<pre className="rounded overflow-x-auto text-xs my-2 p-3" style={{ background: "var(--color-bg)" }}>
|
||||
<code>{code}</code>
|
||||
</pre>
|
||||
);
|
||||
}
|
||||
|
||||
export function Markdown({ content }: { content: string }) {
|
||||
return (
|
||||
<div className="prose prose-invert prose-sm max-w-none">
|
||||
<ReactMarkdown
|
||||
components={{
|
||||
code({ className, children, ...props }) {
|
||||
const isInline = !className;
|
||||
if (isInline) {
|
||||
return (
|
||||
<code
|
||||
className="text-xs px-1 py-0.5 rounded"
|
||||
style={{ background: "var(--color-border)", color: "var(--color-accent)" }}
|
||||
{...props}
|
||||
>
|
||||
{children}
|
||||
</code>
|
||||
);
|
||||
}
|
||||
return <CodeBlock className={className}>{children}</CodeBlock>;
|
||||
},
|
||||
p({ children }) {
|
||||
return <p className="my-1.5 leading-relaxed">{children}</p>;
|
||||
},
|
||||
ul({ children }) {
|
||||
return <ul className="list-disc pl-4 my-1.5">{children}</ul>;
|
||||
},
|
||||
ol({ children }) {
|
||||
return <ol className="list-decimal pl-4 my-1.5">{children}</ol>;
|
||||
},
|
||||
h1({ children }) {
|
||||
return <h1 className="text-lg font-bold mt-3 mb-1">{children}</h1>;
|
||||
},
|
||||
h2({ children }) {
|
||||
return <h2 className="text-base font-bold mt-2 mb-1">{children}</h2>;
|
||||
},
|
||||
h3({ children }) {
|
||||
return <h3 className="text-sm font-bold mt-2 mb-1">{children}</h3>;
|
||||
},
|
||||
blockquote({ children }) {
|
||||
return (
|
||||
<blockquote
|
||||
className="border-l-2 pl-3 my-2 text-sm"
|
||||
style={{ borderColor: "var(--color-accent)", color: "var(--color-text-muted)" }}
|
||||
>
|
||||
{children}
|
||||
</blockquote>
|
||||
);
|
||||
},
|
||||
}}>
|
||||
{content}
|
||||
</ReactMarkdown>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
@@ -0,0 +1,128 @@
|
||||
import type { ThreadStartRecord, RoleRecord, WorkflowResultRecord, ThreadRecord } from "../api.ts";
|
||||
import { Markdown } from "./markdown.tsx";
|
||||
|
||||
const ROLE_COLORS: Record<string, string> = {
|
||||
preparer: "#8b5cf6",
|
||||
agent: "#3b82f6",
|
||||
extractor: "#f59e0b",
|
||||
};
|
||||
|
||||
function roleColor(role: string): string {
|
||||
return ROLE_COLORS[role] ?? "var(--color-accent)";
|
||||
}
|
||||
|
||||
function formatTime(ts: number | null): string | null {
|
||||
if (ts === null) return null;
|
||||
return new Date(ts).toLocaleTimeString();
|
||||
}
|
||||
|
||||
function StartCard({ record }: { record: ThreadStartRecord }) {
|
||||
return (
|
||||
<div
|
||||
className="p-4 rounded-lg border"
|
||||
style={{ background: "var(--color-surface)", borderColor: "var(--color-border)" }}
|
||||
>
|
||||
<div className="flex items-center gap-2 mb-2">
|
||||
<span className="text-lg">🚀</span>
|
||||
<span className="font-semibold" style={{ color: "var(--color-accent)" }}>
|
||||
{record.workflow}
|
||||
</span>
|
||||
<span
|
||||
className="text-xs px-2 py-0.5 rounded"
|
||||
style={{
|
||||
background: record.status === "active" ? "var(--color-success)" : "var(--color-border)",
|
||||
color: record.status === "active" ? "var(--color-bg)" : "var(--color-text-muted)",
|
||||
}}
|
||||
>
|
||||
{record.status}
|
||||
</span>
|
||||
</div>
|
||||
{record.prompt !== null && (
|
||||
<div
|
||||
className="mt-2 p-3 rounded text-sm border-l-2"
|
||||
style={{
|
||||
background: "var(--color-bg)",
|
||||
borderColor: "var(--color-accent)",
|
||||
color: "var(--color-text)",
|
||||
}}
|
||||
>
|
||||
<div className="text-xs mb-1" style={{ color: "var(--color-text-muted)" }}>
|
||||
Prompt
|
||||
</div>
|
||||
<Markdown content={record.prompt} />
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
function RoleMessage({ record }: { record: RoleRecord }) {
|
||||
const color = roleColor(record.role);
|
||||
return (
|
||||
<div
|
||||
className="p-3 rounded-lg border text-sm"
|
||||
style={{ background: "var(--color-surface)", borderColor: "var(--color-border)" }}
|
||||
>
|
||||
<div className="flex items-center gap-2 mb-2">
|
||||
<span
|
||||
className="text-xs px-2 py-0.5 rounded font-mono font-medium"
|
||||
style={{ background: color, color: "#fff" }}
|
||||
>
|
||||
{record.role}
|
||||
</span>
|
||||
{formatTime(record.timestamp) !== null && (
|
||||
<span className="text-xs ml-auto" style={{ color: "var(--color-text-muted)" }}>
|
||||
{formatTime(record.timestamp)}
|
||||
</span>
|
||||
)}
|
||||
</div>
|
||||
<Markdown content={record.content} />
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
function ResultCard({ record }: { record: WorkflowResultRecord }) {
|
||||
const success = record.returnCode === 0;
|
||||
return (
|
||||
<div
|
||||
className="p-4 rounded-lg border"
|
||||
style={{
|
||||
background: "var(--color-surface)",
|
||||
borderColor: success ? "var(--color-success)" : "var(--color-error)",
|
||||
}}
|
||||
>
|
||||
<div className="flex items-center gap-2 mb-2">
|
||||
<span className="text-lg">{success ? "✅" : "❌"}</span>
|
||||
<span className="font-semibold text-sm">
|
||||
{success ? "Completed" : "Failed"}
|
||||
</span>
|
||||
<span
|
||||
className="text-xs px-2 py-0.5 rounded font-mono"
|
||||
style={{
|
||||
background: success ? "var(--color-success)" : "var(--color-error)",
|
||||
color: "#fff",
|
||||
}}
|
||||
>
|
||||
exit {record.returnCode}
|
||||
</span>
|
||||
{formatTime(record.timestamp) !== null && (
|
||||
<span className="text-xs ml-auto" style={{ color: "var(--color-text-muted)" }}>
|
||||
{formatTime(record.timestamp)}
|
||||
</span>
|
||||
)}
|
||||
</div>
|
||||
<Markdown content={record.content} />
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
export function RecordCard({ record }: { record: ThreadRecord }) {
|
||||
switch (record.type) {
|
||||
case "thread-start":
|
||||
return <StartCard record={record} />;
|
||||
case "role":
|
||||
return <RoleMessage record={record} />;
|
||||
case "workflow-result":
|
||||
return <ResultCard record={record} />;
|
||||
}
|
||||
}
|
||||
@@ -3,12 +3,13 @@ import { listWorkflows, runThread } from "../api.ts";
|
||||
import { useFetch } from "../hooks.ts";
|
||||
|
||||
type Props = {
|
||||
agent: string;
|
||||
onClose: () => void;
|
||||
onCreated: (threadId: string) => void;
|
||||
};
|
||||
|
||||
export function RunDialog({ onClose, onCreated }: Props) {
|
||||
const workflows = useFetch(() => listWorkflows(), []);
|
||||
export function RunDialog({ agent, onClose, onCreated }: Props) {
|
||||
const workflows = useFetch(() => listWorkflows(agent), [agent]);
|
||||
const [workflow, setWorkflow] = useState("");
|
||||
const [prompt, setPrompt] = useState("");
|
||||
const [maxRounds, setMaxRounds] = useState(10);
|
||||
@@ -21,7 +22,7 @@ export function RunDialog({ onClose, onCreated }: Props) {
|
||||
setSubmitting(true);
|
||||
setError(null);
|
||||
try {
|
||||
const result = await runThread(workflow, prompt, maxRounds);
|
||||
const result = await runThread(agent, workflow, prompt, maxRounds);
|
||||
onCreated(result.threadId);
|
||||
} catch (err) {
|
||||
setError(err instanceof Error ? err.message : String(err));
|
||||
@@ -38,7 +39,7 @@ export function RunDialog({ onClose, onCreated }: Props) {
|
||||
className="w-full max-w-lg p-6 rounded-lg border"
|
||||
style={{ background: "var(--color-surface)", borderColor: "var(--color-border)" }}
|
||||
>
|
||||
<h3 className="text-lg font-semibold mb-4">Run Thread</h3>
|
||||
<h3 className="text-lg font-semibold mb-4">Run Thread on {agent}</h3>
|
||||
<form onSubmit={handleSubmit} className="space-y-4">
|
||||
<div>
|
||||
<label
|
||||
|
||||
@@ -1,10 +1,22 @@
|
||||
import { useState } from "react";
|
||||
import type { AgentEndpoint } from "../api.ts";
|
||||
import { listAgents } from "../api.ts";
|
||||
import { useFetch } from "../hooks.ts";
|
||||
|
||||
type Props = {
|
||||
view: "threads" | "workflows";
|
||||
agent: string | null;
|
||||
onViewChange: (v: "threads" | "workflows") => void;
|
||||
onAgentChange: (a: string | null) => void;
|
||||
onLogout: () => void;
|
||||
};
|
||||
|
||||
export function Sidebar({ view, onViewChange }: Props) {
|
||||
const items = [
|
||||
export function Sidebar({ view, agent, onViewChange, onAgentChange, onLogout }: Props) {
|
||||
const { status, data } = useFetch(() => listAgents(), []);
|
||||
const [expanded, setExpanded] = useState(true);
|
||||
|
||||
const agents: AgentEndpoint[] = status === "ok" ? data : [];
|
||||
const viewItems = [
|
||||
{ key: "threads" as const, label: "Threads", icon: "⚡" },
|
||||
{ key: "workflows" as const, label: "Workflows", icon: "📦" },
|
||||
];
|
||||
@@ -22,8 +34,56 @@ export function Sidebar({ view, onViewChange }: Props) {
|
||||
Dashboard
|
||||
</p>
|
||||
</div>
|
||||
|
||||
{/* Agent selector */}
|
||||
<div className="border-b" style={{ borderColor: "var(--color-border)" }}>
|
||||
<button
|
||||
type="button"
|
||||
onClick={() => setExpanded(!expanded)}
|
||||
className="w-full text-left px-4 py-2 text-xs font-medium"
|
||||
style={{ color: "var(--color-text-muted)" }}
|
||||
>
|
||||
{expanded ? "▾" : "▸"} Agents
|
||||
{agent && (
|
||||
<span className="ml-2 text-xs" style={{ color: "var(--color-accent)" }}>
|
||||
({agent})
|
||||
</span>
|
||||
)}
|
||||
</button>
|
||||
{expanded && (
|
||||
<div className="px-2 pb-2 space-y-0.5">
|
||||
{agents.length === 0 && (
|
||||
<p className="text-xs px-2 py-1" style={{ color: "var(--color-text-muted)" }}>
|
||||
{status === "loading" ? "Loading..." : "No agents online"}
|
||||
</p>
|
||||
)}
|
||||
{agents.map((a) => (
|
||||
<button
|
||||
type="button"
|
||||
key={a.name}
|
||||
onClick={() => onAgentChange(a.name)}
|
||||
className="w-full text-left px-3 py-1.5 rounded text-xs transition-colors flex items-center gap-2"
|
||||
style={{
|
||||
background: agent === a.name ? "var(--color-accent-dim)" : "transparent",
|
||||
color: agent === a.name ? "#fff" : "var(--color-text-muted)",
|
||||
}}
|
||||
>
|
||||
<span
|
||||
className="inline-block w-1.5 h-1.5 rounded-full"
|
||||
style={{
|
||||
background: a.status === "online" ? "var(--color-success)" : "var(--color-error)",
|
||||
}}
|
||||
/>
|
||||
{a.name}
|
||||
</button>
|
||||
))}
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
|
||||
{/* View navigation */}
|
||||
<nav className="flex-1 p-2 space-y-1">
|
||||
{items.map((item) => (
|
||||
{viewItems.map((item) => (
|
||||
<button
|
||||
type="button"
|
||||
key={item.key}
|
||||
@@ -38,6 +98,17 @@ export function Sidebar({ view, onViewChange }: Props) {
|
||||
</button>
|
||||
))}
|
||||
</nav>
|
||||
|
||||
<div className="p-2 border-t" style={{ borderColor: "var(--color-border)" }}>
|
||||
<button
|
||||
type="button"
|
||||
onClick={onLogout}
|
||||
className="w-full text-left px-3 py-2 rounded text-xs transition-colors"
|
||||
style={{ color: "var(--color-text-muted)" }}
|
||||
>
|
||||
🚪 Logout
|
||||
</button>
|
||||
</div>
|
||||
</aside>
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
import { useCallback, useEffect, useRef, useState } from "react";
|
||||
import { getHealth } from "../api.ts";
|
||||
import { getAgentHealth } from "../api.ts";
|
||||
|
||||
type HealthStatus = "connected" | "disconnected" | "reconnecting";
|
||||
|
||||
type Props = {
|
||||
agent: string | null;
|
||||
onRun: () => void;
|
||||
};
|
||||
|
||||
@@ -17,13 +18,17 @@ function statusLabel(status: HealthStatus): { text: string; color: string } {
|
||||
return { text: "● Offline", color: "var(--color-error)" };
|
||||
}
|
||||
|
||||
export function StatusBar({ onRun }: Props) {
|
||||
export function StatusBar({ agent, onRun }: Props) {
|
||||
const [status, setStatus] = useState<HealthStatus>("disconnected");
|
||||
const wasConnectedRef = useRef(false);
|
||||
|
||||
const checkHealth = useCallback(async () => {
|
||||
if (!agent) {
|
||||
setStatus("disconnected");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await getHealth();
|
||||
await getAgentHealth(agent);
|
||||
wasConnectedRef.current = true;
|
||||
setStatus("connected");
|
||||
} catch {
|
||||
@@ -33,9 +38,11 @@ export function StatusBar({ onRun }: Props) {
|
||||
setStatus("disconnected");
|
||||
}
|
||||
}
|
||||
}, []);
|
||||
}, [agent]);
|
||||
|
||||
useEffect(() => {
|
||||
wasConnectedRef.current = false;
|
||||
setStatus("disconnected");
|
||||
checkHealth();
|
||||
const interval = setInterval(checkHealth, 10_000);
|
||||
return () => clearInterval(interval);
|
||||
@@ -49,12 +56,19 @@ export function StatusBar({ onRun }: Props) {
|
||||
style={{ borderColor: "var(--color-border)", background: "var(--color-surface)" }}
|
||||
>
|
||||
<div className="flex items-center gap-4">
|
||||
<span style={{ color: "var(--color-text-muted)" }}>Local API: 127.0.0.1:7860</span>
|
||||
<span style={{ color: "var(--color-text-muted)" }}>
|
||||
{agent ? `Agent: ${agent}` : "No agent selected"}
|
||||
</span>
|
||||
<button
|
||||
type="button"
|
||||
onClick={onRun}
|
||||
disabled={!agent}
|
||||
className="px-3 py-1 rounded text-xs font-medium"
|
||||
style={{ background: "var(--color-accent)", color: "#fff" }}
|
||||
style={{
|
||||
background: agent ? "var(--color-accent)" : "var(--color-border)",
|
||||
color: "#fff",
|
||||
opacity: agent ? 1 : 0.5,
|
||||
}}
|
||||
>
|
||||
▶ Run Thread
|
||||
</button>
|
||||
|
||||
@@ -2,15 +2,17 @@ import { useEffect, useRef, useState } from "react";
|
||||
import { getThread, killThread, pauseThread, resumeThread } from "../api.ts";
|
||||
import { useFetch } from "../hooks.ts";
|
||||
import { useSSE } from "../use-sse.ts";
|
||||
import { RecordCard } from "./record-card.tsx";
|
||||
|
||||
type Props = {
|
||||
agent: string;
|
||||
threadId: string;
|
||||
onBack: () => void;
|
||||
};
|
||||
|
||||
export function ThreadDetail({ threadId, onBack }: Props) {
|
||||
const sse = useSSE(threadId);
|
||||
const { status, data, error } = useFetch(() => getThread(threadId), [threadId]);
|
||||
export function ThreadDetail({ agent, threadId, onBack }: Props) {
|
||||
const sse = useSSE(agent, threadId);
|
||||
const { status, data, error } = useFetch(() => getThread(agent, threadId), [agent, threadId]);
|
||||
const [actionStatus, setActionStatus] = useState<string | null>(null);
|
||||
const recordsEndRef = useRef<HTMLDivElement>(null);
|
||||
|
||||
@@ -30,7 +32,7 @@ export function ThreadDetail({ threadId, onBack }: Props) {
|
||||
setActionStatus(`${action}ing...`);
|
||||
try {
|
||||
const fn = action === "kill" ? killThread : action === "pause" ? pauseThread : resumeThread;
|
||||
await fn(threadId);
|
||||
await fn(agent, threadId);
|
||||
setActionStatus(`${action} sent ✓`);
|
||||
} catch (e) {
|
||||
setActionStatus(`${action} failed: ${e instanceof Error ? e.message : String(e)}`);
|
||||
@@ -101,39 +103,8 @@ export function ThreadDetail({ threadId, onBack }: Props) {
|
||||
)}
|
||||
{(status === "ok" || liveActive || records.length > 0) && (
|
||||
<div className="space-y-3">
|
||||
{records.map((r) => (
|
||||
<div
|
||||
key={`${threadId}-${r.type}-${String(r.timestamp)}-${r.role ?? ""}-${r.content ?? ""}`}
|
||||
className="p-3 rounded border text-sm"
|
||||
style={{ background: "var(--color-surface)", borderColor: "var(--color-border)" }}
|
||||
>
|
||||
<div className="flex items-center gap-2 mb-1">
|
||||
<span
|
||||
className="text-xs px-1.5 py-0.5 rounded font-mono"
|
||||
style={{ background: "var(--color-border)", color: "var(--color-accent)" }}
|
||||
>
|
||||
{r.type}
|
||||
</span>
|
||||
{r.role && (
|
||||
<span className="text-xs" style={{ color: "var(--color-text-muted)" }}>
|
||||
{r.role}
|
||||
</span>
|
||||
)}
|
||||
{r.timestamp !== null && (
|
||||
<span className="text-xs ml-auto" style={{ color: "var(--color-text-muted)" }}>
|
||||
{new Date(r.timestamp).toLocaleTimeString()}
|
||||
</span>
|
||||
)}
|
||||
</div>
|
||||
{r.content && (
|
||||
<pre
|
||||
className="whitespace-pre-wrap text-xs mt-1"
|
||||
style={{ color: "var(--color-text)" }}
|
||||
>
|
||||
{typeof r.content === "string" ? r.content : JSON.stringify(r.content, null, 2)}
|
||||
</pre>
|
||||
)}
|
||||
</div>
|
||||
{records.map((r, i) => (
|
||||
<RecordCard key={`${threadId}-${i}`} record={r} />
|
||||
))}
|
||||
<div ref={recordsEndRef} aria-hidden />
|
||||
</div>
|
||||
|
||||
@@ -2,11 +2,12 @@ import { listThreads } from "../api.ts";
|
||||
import { useFetch } from "../hooks.ts";
|
||||
|
||||
type Props = {
|
||||
agent: string;
|
||||
onSelect: (id: string) => void;
|
||||
};
|
||||
|
||||
export function ThreadList({ onSelect }: Props) {
|
||||
const { status, data, error } = useFetch(() => listThreads(), []);
|
||||
export function ThreadList({ agent, onSelect }: Props) {
|
||||
const { status, data, error } = useFetch(() => listThreads(agent), [agent]);
|
||||
|
||||
if (status === "loading")
|
||||
return <p style={{ color: "var(--color-text-muted)" }}>Loading threads...</p>;
|
||||
|
||||
@@ -1,8 +1,12 @@
|
||||
import { listWorkflows } from "../api.ts";
|
||||
import { useFetch } from "../hooks.ts";
|
||||
|
||||
export function WorkflowList() {
|
||||
const { status, data, error } = useFetch(() => listWorkflows(), []);
|
||||
type Props = {
|
||||
agent: string;
|
||||
};
|
||||
|
||||
export function WorkflowList({ agent }: Props) {
|
||||
const { status, data, error } = useFetch(() => listWorkflows(agent), [agent]);
|
||||
|
||||
if (status === "loading")
|
||||
return <p style={{ color: "var(--color-text-muted)" }}>Loading workflows...</p>;
|
||||
|
||||
@@ -4,37 +4,50 @@ type View = "threads" | "workflows";
|
||||
|
||||
type HashRoute = {
|
||||
view: View;
|
||||
agent: string | null;
|
||||
threadId: string | null;
|
||||
};
|
||||
|
||||
function parseHash(hash: string): HashRoute {
|
||||
const raw = hash.replace(/^#\/?/, "");
|
||||
if (raw.startsWith("threads/")) {
|
||||
const id = raw.slice("threads/".length);
|
||||
if (id.length > 0) {
|
||||
return { view: "threads", threadId: id };
|
||||
}
|
||||
// Format: #agent/threads/id or #agent/workflows or #threads or #workflows
|
||||
const parts = raw.split("/");
|
||||
|
||||
// Check if first part is a known view
|
||||
if (parts[0] === "threads" || parts[0] === "workflows") {
|
||||
return {
|
||||
view: parts[0] as View,
|
||||
agent: null,
|
||||
threadId: parts[0] === "threads" && parts.length > 1 ? parts.slice(1).join("/") : null,
|
||||
};
|
||||
}
|
||||
if (raw === "workflows") {
|
||||
return { view: "workflows", threadId: null };
|
||||
}
|
||||
return { view: "threads", threadId: null };
|
||||
|
||||
// First part is agent name
|
||||
const agent = parts[0] || null;
|
||||
const viewPart = parts[1] ?? "threads";
|
||||
const view: View = viewPart === "workflows" ? "workflows" : "threads";
|
||||
const threadId = view === "threads" && parts.length > 2 ? parts.slice(2).join("/") : null;
|
||||
|
||||
return { view, agent, threadId };
|
||||
}
|
||||
|
||||
function buildHash(route: HashRoute): string {
|
||||
const prefix = route.agent ? `${route.agent}/` : "";
|
||||
if (route.view === "workflows") {
|
||||
return "#workflows";
|
||||
return `#${prefix}workflows`;
|
||||
}
|
||||
if (route.threadId !== null) {
|
||||
return `#threads/${route.threadId}`;
|
||||
return `#${prefix}threads/${route.threadId}`;
|
||||
}
|
||||
return "#threads";
|
||||
return `#${prefix}threads`;
|
||||
}
|
||||
|
||||
export function useHashRoute(): {
|
||||
view: View;
|
||||
agent: string | null;
|
||||
threadId: string | null;
|
||||
setView: (v: View) => void;
|
||||
setAgent: (a: string | null) => void;
|
||||
setThreadId: (id: string | null) => void;
|
||||
} {
|
||||
const [route, setRoute] = useState<HashRoute>(() => parseHash(window.location.hash));
|
||||
@@ -53,12 +66,20 @@ export function useHashRoute(): {
|
||||
setRoute(next);
|
||||
}, []);
|
||||
|
||||
const setView = useCallback((v: View) => navigate({ view: v, threadId: null }), [navigate]);
|
||||
|
||||
const setThreadId = useCallback(
|
||||
(id: string | null) => navigate({ view: "threads", threadId: id }),
|
||||
[navigate],
|
||||
const setView = useCallback(
|
||||
(v: View) => navigate({ view: v, agent: route.agent, threadId: null }),
|
||||
[navigate, route.agent],
|
||||
);
|
||||
|
||||
return { view: route.view, threadId: route.threadId, setView, setThreadId };
|
||||
const setAgent = useCallback(
|
||||
(a: string | null) => navigate({ view: route.view, agent: a, threadId: null }),
|
||||
[navigate, route.view],
|
||||
);
|
||||
|
||||
const setThreadId = useCallback(
|
||||
(id: string | null) => navigate({ view: "threads", agent: route.agent, threadId: id }),
|
||||
[navigate, route.agent],
|
||||
);
|
||||
|
||||
return { view: route.view, agent: route.agent, threadId: route.threadId, setView, setAgent, setThreadId };
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import {
|
||||
} from "react";
|
||||
|
||||
import type { ThreadRecord } from "./api.ts";
|
||||
import { getApiKey } from "./api.ts";
|
||||
|
||||
export type UseSSEReturn = {
|
||||
records: ThreadRecord[];
|
||||
@@ -56,7 +57,17 @@ function handleRecordEvent(ev: Event, ctx: RecordEventContext): void {
|
||||
ctx.cleanupEs();
|
||||
}
|
||||
|
||||
export function useSSE(threadId: string | null): UseSSEReturn {
|
||||
function sseUrl(agent: string, threadId: string): string {
|
||||
const gatewayUrl = import.meta.env.VITE_GATEWAY_URL || "";
|
||||
const key = getApiKey();
|
||||
const keyParam = key ? `?key=${encodeURIComponent(key)}` : "";
|
||||
if (gatewayUrl) {
|
||||
return `${gatewayUrl}/api/${agent}/threads/${encodeURIComponent(threadId)}/live${keyParam}`;
|
||||
}
|
||||
return `/api/threads/${encodeURIComponent(threadId)}/live`;
|
||||
}
|
||||
|
||||
export function useSSE(agent: string | null, threadId: string | null): UseSSEReturn {
|
||||
const [records, setRecords] = useState<ThreadRecord[]>([]);
|
||||
const [connected, setConnected] = useState(false);
|
||||
const [completed, setCompleted] = useState(false);
|
||||
@@ -65,7 +76,7 @@ export function useSSE(threadId: string | null): UseSSEReturn {
|
||||
const reconnectAttemptsRef = useRef(0);
|
||||
|
||||
useEffect(() => {
|
||||
if (threadId === null) {
|
||||
if (threadId === null || agent === null) {
|
||||
completedRef.current = false;
|
||||
reconnectAttemptsRef.current = 0;
|
||||
setRecords([]);
|
||||
@@ -75,6 +86,7 @@ export function useSSE(threadId: string | null): UseSSEReturn {
|
||||
}
|
||||
|
||||
const tid = threadId;
|
||||
const agentName = agent;
|
||||
|
||||
completedRef.current = false;
|
||||
reconnectAttemptsRef.current = 0;
|
||||
@@ -113,7 +125,7 @@ export function useSSE(threadId: string | null): UseSSEReturn {
|
||||
}
|
||||
|
||||
cleanupEs();
|
||||
const url = `/api/threads/${encodeURIComponent(tid)}/live`;
|
||||
const url = sseUrl(agentName, tid);
|
||||
es = new EventSource(url);
|
||||
|
||||
es.onopen = () => {
|
||||
@@ -155,7 +167,7 @@ export function useSSE(threadId: string | null): UseSSEReturn {
|
||||
}
|
||||
cleanupEs();
|
||||
};
|
||||
}, [threadId]);
|
||||
}, [agent, threadId]);
|
||||
|
||||
return { records, connected, completed };
|
||||
}
|
||||
|
||||
@@ -40,6 +40,8 @@ function makeOptions(overrides: Partial<ExecuteThreadOptions>): ExecuteThreadOpt
|
||||
awaitAfterEachYield: async () => {},
|
||||
forkSourceThreadId: null,
|
||||
prefilledDiskSteps: null,
|
||||
forkContinuation: null,
|
||||
replayTimestamps: null,
|
||||
storageRoot: "/tmp/never",
|
||||
...overrides,
|
||||
};
|
||||
|
||||
@@ -0,0 +1,112 @@
|
||||
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
|
||||
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import {
|
||||
createCasStore,
|
||||
putContentNodeWithRefs,
|
||||
putStartNode,
|
||||
putStateNode,
|
||||
} from "@uncaged/workflow-cas";
|
||||
import type { StateNodePayload } from "@uncaged/workflow-protocol";
|
||||
|
||||
import { FORK_BRANCH_ROLE } from "../src/engine/fork-thread.js";
|
||||
import { garbageCollectCas } from "../src/engine/gc.js";
|
||||
import { getBundleDir, removeThreadEntry, upsertThreadEntry } from "../src/engine/threads-index.js";
|
||||
|
||||
describe("garbageCollectCas (mark-and-sweep)", () => {
|
||||
let storageRoot: string;
|
||||
let casDir: string;
|
||||
|
||||
beforeEach(async () => {
|
||||
storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-gc-ms-"));
|
||||
casDir = join(storageRoot, "cas");
|
||||
await mkdir(casDir, { recursive: true });
|
||||
await writeFile(
|
||||
join(storageRoot, "workflow.yaml"),
|
||||
"config:\n maxDepth: 1\n supervisorInterval: 0\n providers: {}\n models: {}\nworkflows: {}\n",
|
||||
"utf8",
|
||||
);
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await rm(storageRoot, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
test("shared CAS prefix survives when one fork thread index entry is removed", async () => {
|
||||
const bundleHash = "TESTGC0000001";
|
||||
const bundleDir = getBundleDir(storageRoot, bundleHash);
|
||||
await mkdir(bundleDir, { recursive: true });
|
||||
|
||||
const cas = createCasStore(casDir);
|
||||
const promptHash = await cas.put("prompt");
|
||||
const startHash = await putStartNode(
|
||||
cas,
|
||||
{
|
||||
name: "demo",
|
||||
hash: bundleHash,
|
||||
maxRounds: 5,
|
||||
depth: 0,
|
||||
},
|
||||
promptHash,
|
||||
);
|
||||
|
||||
const c1 = await putContentNodeWithRefs(cas, "p1", []);
|
||||
const h1 = await putStateNode(cas, {
|
||||
role: "planner",
|
||||
meta: {},
|
||||
start: startHash,
|
||||
content: c1,
|
||||
ancestors: [],
|
||||
compact: null,
|
||||
timestamp: 1,
|
||||
} satisfies StateNodePayload);
|
||||
|
||||
const c2 = await putContentNodeWithRefs(cas, "c1", []);
|
||||
const h2 = await putStateNode(cas, {
|
||||
role: "coder",
|
||||
meta: {},
|
||||
start: startHash,
|
||||
content: c2,
|
||||
ancestors: [h1],
|
||||
compact: null,
|
||||
timestamp: 2,
|
||||
} satisfies StateNodePayload);
|
||||
|
||||
const ec = await putContentNodeWithRefs(cas, "", []);
|
||||
const fm = await putStateNode(cas, {
|
||||
role: FORK_BRANCH_ROLE,
|
||||
meta: {},
|
||||
start: startHash,
|
||||
content: ec,
|
||||
ancestors: [h1],
|
||||
compact: null,
|
||||
timestamp: 3,
|
||||
} satisfies StateNodePayload);
|
||||
|
||||
await upsertThreadEntry(bundleDir, "THREAD_AAAAAAA", {
|
||||
head: h2,
|
||||
start: startHash,
|
||||
updatedAt: 10,
|
||||
});
|
||||
await upsertThreadEntry(bundleDir, "THREAD_BBBBBBB", {
|
||||
head: fm,
|
||||
start: startHash,
|
||||
updatedAt: 20,
|
||||
});
|
||||
|
||||
await removeThreadEntry(bundleDir, "THREAD_AAAAAAA");
|
||||
|
||||
const gc = await garbageCollectCas(storageRoot);
|
||||
expect(gc.ok).toBe(true);
|
||||
if (!gc.ok) {
|
||||
return;
|
||||
}
|
||||
|
||||
expect(await cas.get(h2)).toBeNull();
|
||||
expect(await cas.get(h1)).not.toBeNull();
|
||||
expect(await cas.get(startHash)).not.toBeNull();
|
||||
expect(await cas.get(promptHash)).not.toBeNull();
|
||||
expect(await cas.get(fm)).not.toBeNull();
|
||||
});
|
||||
});
|
||||
@@ -33,20 +33,12 @@ import {
|
||||
removeThreadEntry,
|
||||
upsertThreadEntry,
|
||||
} from "./threads-index.js";
|
||||
import type { ExecuteThreadIo, ExecuteThreadOptions } from "./types.js";
|
||||
import type { ChainState, ExecuteThreadIo, ExecuteThreadOptions } from "./types.js";
|
||||
import { EMPTY_CHAIN_STATE } from "./types.js";
|
||||
|
||||
/** Cap for {@link StateNode}.payload.ancestors: 1 parent + 10 skip-list. */
|
||||
const ANCESTORS_CAP = 11;
|
||||
|
||||
type ChainState = {
|
||||
/** State hash of the most recently written {@link StateNode}, or `null` before the first step. */
|
||||
parentStateHash: string | null;
|
||||
/** Ancestors recorded on the most recently written {@link StateNode}. */
|
||||
parentAncestors: readonly string[];
|
||||
};
|
||||
|
||||
const EMPTY_CHAIN: ChainState = { parentStateHash: null, parentAncestors: [] };
|
||||
|
||||
function computeAncestors(chain: ChainState): string[] {
|
||||
if (chain.parentStateHash === null) {
|
||||
return [];
|
||||
@@ -408,36 +400,56 @@ export async function executeThread(
|
||||
await mkdir(dirname(io.infoJsonlPath), { recursive: true });
|
||||
|
||||
const prefilled = options.prefilledDiskSteps;
|
||||
const fork = options.forkContinuation;
|
||||
|
||||
if (fork !== null && prefilled !== null) {
|
||||
throw new Error("forkContinuation and prefilledDiskSteps cannot both be set");
|
||||
}
|
||||
|
||||
if (prefilled !== null && prefilled.length !== input.steps.length) {
|
||||
throw new Error(
|
||||
`prefilledDiskSteps length (${prefilled.length}) must match input.steps length (${input.steps.length})`,
|
||||
);
|
||||
}
|
||||
|
||||
const replayTs = options.replayTimestamps;
|
||||
if (replayTs !== null && replayTs.length !== input.steps.length) {
|
||||
throw new Error(
|
||||
`replayTimestamps length (${replayTs.length}) must match input.steps length (${input.steps.length})`,
|
||||
);
|
||||
}
|
||||
|
||||
const bundleDir = getBundleDir(options.storageRoot, io.hash);
|
||||
|
||||
const promptHash = await io.cas.put(input.prompt);
|
||||
const startHash = await putStartNode(
|
||||
io.cas,
|
||||
{
|
||||
name: workflowName,
|
||||
hash: io.hash,
|
||||
maxRounds: options.maxRounds,
|
||||
depth: options.depth,
|
||||
},
|
||||
promptHash,
|
||||
);
|
||||
let startHash: string;
|
||||
|
||||
await publishHead({
|
||||
bundleDir,
|
||||
threadId: io.threadId,
|
||||
startHash,
|
||||
headHash: startHash,
|
||||
});
|
||||
if (fork !== null) {
|
||||
startHash = fork.startHash;
|
||||
logger("T9HQ2KHM", `thread ${io.threadId} continued fork for workflow ${workflowName}`);
|
||||
} else {
|
||||
const promptHash = await io.cas.put(input.prompt);
|
||||
startHash = await putStartNode(
|
||||
io.cas,
|
||||
{
|
||||
name: workflowName,
|
||||
hash: io.hash,
|
||||
maxRounds: options.maxRounds,
|
||||
depth: options.depth,
|
||||
},
|
||||
promptHash,
|
||||
);
|
||||
|
||||
logger("T9HQ2KHM", `thread ${io.threadId} started for workflow ${workflowName}`);
|
||||
await publishHead({
|
||||
bundleDir,
|
||||
threadId: io.threadId,
|
||||
startHash,
|
||||
headHash: startHash,
|
||||
});
|
||||
|
||||
let chain: ChainState = EMPTY_CHAIN;
|
||||
logger("T9HQ2KHM", `thread ${io.threadId} started for workflow ${workflowName}`);
|
||||
}
|
||||
|
||||
let chain: ChainState = fork !== null ? fork.initialChain : EMPTY_CHAIN_STATE;
|
||||
|
||||
if (prefilled !== null) {
|
||||
for (const row of prefilled) {
|
||||
@@ -497,7 +509,7 @@ export async function executeThread(
|
||||
contentHash: out.contentHash,
|
||||
meta: out.meta,
|
||||
refs: out.refs,
|
||||
timestamp: prefilled?.[i]?.timestamp ?? nowMs + i,
|
||||
timestamp: replayTs?.[i] ?? prefilled?.[i]?.timestamp ?? nowMs + i,
|
||||
})),
|
||||
};
|
||||
|
||||
|
||||
@@ -1,9 +1,29 @@
|
||||
import type { WorkflowCompletion } from "@uncaged/workflow-runtime";
|
||||
import { err, normalizeRefsField, ok, type Result } from "@uncaged/workflow-util";
|
||||
import type { CasStore } from "@uncaged/workflow-cas";
|
||||
import { parseCasThreadNode, putContentNodeWithRefs, putStateNode } from "@uncaged/workflow-cas";
|
||||
import type { StateNodePayload } from "@uncaged/workflow-protocol";
|
||||
import type { RoleOutput, WorkflowCompletion } from "@uncaged/workflow-runtime";
|
||||
import { END } from "@uncaged/workflow-runtime";
|
||||
import { err, ok, type Result } from "@uncaged/workflow-util";
|
||||
import { parse as parseYaml } from "yaml";
|
||||
|
||||
import type { ForkHistoricalStep, ForkPlan, ParsedThreadStartRecord } from "./types.js";
|
||||
import { upsertThreadEntry } from "./threads-index.js";
|
||||
import type { CasForkPlan, ChainState, ForkContinuationOptions } from "./types.js";
|
||||
import { EMPTY_CHAIN_STATE } from "./types.js";
|
||||
|
||||
/** Recognizes a persisted workflow completion line (no `role`; has numeric `returnCode` and string `summary`). Omits `rootHash` when absent. */
|
||||
/** Internal branch marker; skipped when presenting fork selection / replay slices. */
|
||||
export const FORK_BRANCH_ROLE = "__fork__";
|
||||
|
||||
/** Cap for {@link StateNodePayload}.ancestors: 1 parent + 10 skip-list. */
|
||||
const ANCESTORS_CAP = 11;
|
||||
|
||||
function computeAncestors(chain: ChainState): string[] {
|
||||
if (chain.parentStateHash === null) {
|
||||
return [];
|
||||
}
|
||||
return [chain.parentStateHash, ...chain.parentAncestors].slice(0, ANCESTORS_CAP);
|
||||
}
|
||||
|
||||
/** Recognizes a persisted workflow completion line (no `role`; has numeric `returnCode` and string `summary`). */
|
||||
export function tryParseWorkflowResultRecord(
|
||||
obj: Record<string, unknown>,
|
||||
): WorkflowCompletion | null {
|
||||
@@ -18,227 +38,288 @@ export function tryParseWorkflowResultRecord(
|
||||
return { returnCode, summary };
|
||||
}
|
||||
|
||||
export function tryParseRoleStepRecord(obj: Record<string, unknown>): ForkHistoricalStep | null {
|
||||
const role = obj.role;
|
||||
const contentHash = obj.contentHash;
|
||||
const meta = obj.meta;
|
||||
const timestamp = obj.timestamp;
|
||||
if (typeof role !== "string") {
|
||||
return null;
|
||||
}
|
||||
if (typeof contentHash !== "string") {
|
||||
return null;
|
||||
}
|
||||
if (meta === null || typeof meta !== "object") {
|
||||
return null;
|
||||
}
|
||||
if (typeof timestamp !== "number") {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
role,
|
||||
contentHash,
|
||||
meta: meta as Record<string, unknown>,
|
||||
refs: normalizeRefsField(obj.refs),
|
||||
timestamp,
|
||||
};
|
||||
}
|
||||
|
||||
function parseRoleLine(
|
||||
obj: Record<string, unknown>,
|
||||
lineIndex: number,
|
||||
): Result<ForkHistoricalStep, string> {
|
||||
const parsed = tryParseRoleStepRecord(obj);
|
||||
if (parsed === null) {
|
||||
return err(`invalid role record at line ${lineIndex}`);
|
||||
}
|
||||
return ok(parsed);
|
||||
}
|
||||
|
||||
function parseStartRecordLine(firstLine: string): Result<ParsedThreadStartRecord, string> {
|
||||
let startParsed: unknown;
|
||||
try {
|
||||
startParsed = JSON.parse(firstLine) as unknown;
|
||||
} catch {
|
||||
return err("invalid JSON on line 1 (start record)");
|
||||
}
|
||||
if (startParsed === null || typeof startParsed !== "object") {
|
||||
return err("invalid start record shape");
|
||||
}
|
||||
const startRec = startParsed as Record<string, unknown>;
|
||||
const name = startRec.name;
|
||||
const hash = startRec.hash;
|
||||
const threadId = startRec.threadId;
|
||||
const parameters = startRec.parameters;
|
||||
if (typeof name !== "string" || typeof hash !== "string" || typeof threadId !== "string") {
|
||||
return err("start record missing name, hash, or threadId");
|
||||
}
|
||||
if (parameters === null || typeof parameters !== "object") {
|
||||
return err("start record missing parameters");
|
||||
}
|
||||
const paramsRec = parameters as Record<string, unknown>;
|
||||
const prompt = paramsRec.prompt;
|
||||
const options = paramsRec.options;
|
||||
if (typeof prompt !== "string") {
|
||||
return err("start record missing parameters.prompt");
|
||||
}
|
||||
if (options === null || typeof options !== "object") {
|
||||
return err("start record missing parameters.options");
|
||||
}
|
||||
const optRec = options as Record<string, unknown>;
|
||||
const maxRounds = optRec.maxRounds;
|
||||
if (typeof maxRounds !== "number") {
|
||||
return err("start record missing parameters.options.maxRounds");
|
||||
}
|
||||
|
||||
const depthRaw = optRec.depth;
|
||||
const depth =
|
||||
typeof depthRaw === "number" && Number.isFinite(depthRaw) ? Math.trunc(depthRaw) : 0;
|
||||
|
||||
return ok({
|
||||
workflowName: name,
|
||||
hash,
|
||||
threadId,
|
||||
prompt,
|
||||
maxRounds,
|
||||
depth,
|
||||
});
|
||||
}
|
||||
|
||||
function parseFollowingRoleLines(lines: string[]): Result<ForkHistoricalStep[], string> {
|
||||
const roleSteps: ForkHistoricalStep[] = [];
|
||||
for (let i = 1; i < lines.length; i++) {
|
||||
const line = lines[i];
|
||||
if (line === undefined) {
|
||||
/** Walk {@link StateNode} hashes from head toward the first step (newest → oldest). */
|
||||
export async function walkStateFramesNewestFirst(
|
||||
cas: CasStore,
|
||||
headHash: string,
|
||||
): Promise<Array<{ hash: string; payload: StateNodePayload }>> {
|
||||
const frames: Array<{ hash: string; payload: StateNodePayload }> = [];
|
||||
let cur = headHash;
|
||||
while (true) {
|
||||
const yamlText = await cas.get(cur);
|
||||
if (yamlText === null) {
|
||||
break;
|
||||
}
|
||||
let rec: unknown;
|
||||
try {
|
||||
rec = JSON.parse(line) as unknown;
|
||||
} catch {
|
||||
return err(`invalid JSON at line ${i + 1}`);
|
||||
}
|
||||
if (rec === null || typeof rec !== "object") {
|
||||
return err(`invalid record at line ${i + 1}`);
|
||||
}
|
||||
const recObj = rec as Record<string, unknown>;
|
||||
const wf = tryParseWorkflowResultRecord(recObj);
|
||||
if (wf !== null) {
|
||||
if (i !== lines.length - 1) {
|
||||
return err("WorkflowResult record must be the final line in `.data.jsonl`");
|
||||
}
|
||||
const parsed = parseCasThreadNode(yamlText);
|
||||
if (parsed === null || parsed.kind !== "state") {
|
||||
break;
|
||||
}
|
||||
const parsed = parseRoleLine(recObj, i + 1);
|
||||
if (!parsed.ok) {
|
||||
return parsed;
|
||||
frames.push({ hash: cur, payload: parsed.node.payload });
|
||||
const ancestors = parsed.node.payload.ancestors;
|
||||
if (ancestors.length === 0) {
|
||||
break;
|
||||
}
|
||||
roleSteps.push(parsed.value);
|
||||
const parent = ancestors[0];
|
||||
if (parent === undefined || parent === "") {
|
||||
break;
|
||||
}
|
||||
cur = parent;
|
||||
}
|
||||
return ok(roleSteps);
|
||||
return frames;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse RFC-001 `.data.jsonl`: line 1 start record, line 2+ role outputs.
|
||||
*/
|
||||
export function parseThreadDataJsonl(text: string): Result<
|
||||
{
|
||||
start: ParsedThreadStartRecord;
|
||||
roleSteps: ForkHistoricalStep[];
|
||||
},
|
||||
string
|
||||
> {
|
||||
const lines = text
|
||||
.split("\n")
|
||||
.map((l) => l.trim())
|
||||
.filter((l) => l !== "");
|
||||
if (lines.length === 0) {
|
||||
return err("thread data is empty");
|
||||
}
|
||||
|
||||
const firstLine = lines[0];
|
||||
if (firstLine === undefined) {
|
||||
return err("thread data is empty");
|
||||
}
|
||||
|
||||
const start = parseStartRecordLine(firstLine);
|
||||
if (!start.ok) {
|
||||
return start;
|
||||
}
|
||||
|
||||
const roleSteps = parseFollowingRoleLines(lines);
|
||||
if (!roleSteps.ok) {
|
||||
return roleSteps;
|
||||
}
|
||||
|
||||
return ok({
|
||||
start: start.value,
|
||||
roleSteps: roleSteps.value,
|
||||
});
|
||||
}
|
||||
|
||||
function orderedUniqueRoles(roleSteps: ForkHistoricalStep[]): string[] {
|
||||
function orderedUniqueRoles(roles: string[]): string[] {
|
||||
const seen = new Set<string>();
|
||||
const out: string[] = [];
|
||||
for (const s of roleSteps) {
|
||||
if (!seen.has(s.role)) {
|
||||
seen.add(s.role);
|
||||
out.push(s.role);
|
||||
for (const r of roles) {
|
||||
if (!seen.has(r)) {
|
||||
seen.add(r);
|
||||
out.push(r);
|
||||
}
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
/**
|
||||
* Select historical steps for a fork:
|
||||
* - `fromRole === null`: drop the last step (retry the last role).
|
||||
* - `fromRole !== null`: keep steps through the first occurrence of that role (inclusive).
|
||||
*/
|
||||
export function selectForkHistoricalSteps(
|
||||
roleSteps: ForkHistoricalStep[],
|
||||
async function readPromptText(cas: CasStore, promptHash: string): Promise<Result<string, string>> {
|
||||
const yamlText = await cas.get(promptHash);
|
||||
if (yamlText === null) {
|
||||
return err(`prompt CAS blob missing: ${promptHash}`);
|
||||
}
|
||||
let raw: unknown;
|
||||
try {
|
||||
raw = parseYaml(yamlText) as unknown;
|
||||
} catch {
|
||||
return err(`prompt CAS blob is not valid YAML: ${promptHash}`);
|
||||
}
|
||||
if (raw === null || typeof raw !== "object") {
|
||||
return err(`prompt CAS blob has unexpected shape: ${promptHash}`);
|
||||
}
|
||||
const payload = (raw as Record<string, unknown>).payload;
|
||||
if (typeof payload !== "string") {
|
||||
return err(`prompt CAS blob missing string payload: ${promptHash}`);
|
||||
}
|
||||
return ok(payload);
|
||||
}
|
||||
|
||||
async function readStartWorkflowIdentity(params: {
|
||||
cas: CasStore;
|
||||
startHash: string;
|
||||
}): Promise<
|
||||
Result<{ workflowName: string; maxRounds: number; depth: number; prompt: string }, string>
|
||||
> {
|
||||
const yamlText = await params.cas.get(params.startHash);
|
||||
if (yamlText === null) {
|
||||
return err(`start node missing in CAS: ${params.startHash}`);
|
||||
}
|
||||
const parsed = parseCasThreadNode(yamlText);
|
||||
if (parsed === null || parsed.kind !== "start") {
|
||||
return err(`CAS blob is not a StartNode: ${params.startHash}`);
|
||||
}
|
||||
const refs = parsed.node.refs;
|
||||
const promptHash = refs[0];
|
||||
if (typeof promptHash !== "string") {
|
||||
return err("StartNode refs[0] must be the prompt hash");
|
||||
}
|
||||
const prompt = await readPromptText(params.cas, promptHash);
|
||||
if (!prompt.ok) {
|
||||
return prompt;
|
||||
}
|
||||
const p = parsed.node.payload;
|
||||
return ok({
|
||||
workflowName: p.name,
|
||||
maxRounds: p.maxRounds,
|
||||
depth: p.depth,
|
||||
prompt: prompt.value,
|
||||
});
|
||||
}
|
||||
|
||||
async function payloadToRoleOutput(cas: CasStore, payload: StateNodePayload): Promise<RoleOutput> {
|
||||
let refs: string[] = [];
|
||||
const blob = await cas.get(payload.content);
|
||||
if (blob !== null) {
|
||||
const cn = parseCasThreadNode(blob);
|
||||
if (cn?.kind === "content") {
|
||||
refs = [...cn.node.refs];
|
||||
}
|
||||
}
|
||||
return {
|
||||
role: payload.role,
|
||||
contentHash: payload.content,
|
||||
meta: payload.meta,
|
||||
refs,
|
||||
};
|
||||
}
|
||||
|
||||
function meaningfulFramesOldestFirst(
|
||||
newestFirst: Array<{ hash: string; payload: StateNodePayload }>,
|
||||
): Array<{ hash: string; payload: StateNodePayload }> {
|
||||
const chronological = [...newestFirst].reverse();
|
||||
return chronological.filter((f) => f.payload.role !== END && f.payload.role !== FORK_BRANCH_ROLE);
|
||||
}
|
||||
|
||||
function selectForkPointStateHash(
|
||||
meaningfulOldestFirst: Array<{ hash: string; payload: StateNodePayload }>,
|
||||
fromRole: string | null,
|
||||
): Result<ForkHistoricalStep[], string> {
|
||||
if (roleSteps.length === 0) {
|
||||
): Result<string | null, string> {
|
||||
if (meaningfulOldestFirst.length === 0) {
|
||||
return err("thread has no completed role steps to fork from");
|
||||
}
|
||||
|
||||
if (fromRole === null) {
|
||||
if (roleSteps.length === 1) {
|
||||
return ok([]);
|
||||
if (meaningfulOldestFirst.length === 1) {
|
||||
return ok(null);
|
||||
}
|
||||
return ok(roleSteps.slice(0, -1));
|
||||
const forkFrame = meaningfulOldestFirst[meaningfulOldestFirst.length - 2];
|
||||
if (forkFrame === undefined) {
|
||||
return err("thread has no completed role steps to fork from");
|
||||
}
|
||||
return ok(forkFrame.hash);
|
||||
}
|
||||
|
||||
const idx = roleSteps.findIndex((s) => s.role === fromRole);
|
||||
const idx = meaningfulOldestFirst.findIndex((f) => f.payload.role === fromRole);
|
||||
if (idx < 0) {
|
||||
const available = orderedUniqueRoles(roleSteps);
|
||||
const available = orderedUniqueRoles(meaningfulOldestFirst.map((f) => f.payload.role));
|
||||
return err(`role not found in thread: ${fromRole} (available: ${available.join(", ")})`);
|
||||
}
|
||||
return ok(roleSteps.slice(0, idx + 1));
|
||||
const forkFrame = meaningfulOldestFirst[idx];
|
||||
if (forkFrame === undefined) {
|
||||
return err("fork frame missing");
|
||||
}
|
||||
return ok(forkFrame.hash);
|
||||
}
|
||||
|
||||
function replayFramesThroughForkPoint(
|
||||
meaningfulOldestFirst: Array<{ hash: string; payload: StateNodePayload }>,
|
||||
forkPointHash: string | null,
|
||||
): Array<{ hash: string; payload: StateNodePayload }> {
|
||||
if (forkPointHash === null) {
|
||||
return [];
|
||||
}
|
||||
const idx = meaningfulOldestFirst.findIndex((f) => f.hash === forkPointHash);
|
||||
if (idx < 0) {
|
||||
return [];
|
||||
}
|
||||
return meaningfulOldestFirst.slice(0, idx + 1);
|
||||
}
|
||||
|
||||
async function buildForkContinuation(params: {
|
||||
cas: CasStore;
|
||||
sourceThreadId: string;
|
||||
startHash: string;
|
||||
forkPointStateHash: string | null;
|
||||
}): Promise<Result<ForkContinuationOptions, string>> {
|
||||
const { cas, sourceThreadId, startHash, forkPointStateHash } = params;
|
||||
|
||||
if (forkPointStateHash === null) {
|
||||
return ok({
|
||||
startHash,
|
||||
forkHeadHash: startHash,
|
||||
initialChain: EMPTY_CHAIN_STATE,
|
||||
});
|
||||
}
|
||||
|
||||
const yamlText = await cas.get(forkPointStateHash);
|
||||
if (yamlText === null) {
|
||||
return err(`fork point state missing in CAS: ${forkPointStateHash}`);
|
||||
}
|
||||
const parsed = parseCasThreadNode(yamlText);
|
||||
if (parsed === null || parsed.kind !== "state") {
|
||||
return err(`fork point blob is not a StateNode: ${forkPointStateHash}`);
|
||||
}
|
||||
const fpPayload = parsed.node.payload;
|
||||
|
||||
const chainBefore: ChainState = {
|
||||
parentStateHash: forkPointStateHash,
|
||||
parentAncestors: fpPayload.ancestors,
|
||||
};
|
||||
const ancestorsMarker = computeAncestors(chainBefore);
|
||||
|
||||
const emptyContentHash = await putContentNodeWithRefs(cas, "", []);
|
||||
const markerPayload: StateNodePayload = {
|
||||
role: FORK_BRANCH_ROLE,
|
||||
meta: { forkFrom: sourceThreadId },
|
||||
start: startHash,
|
||||
content: emptyContentHash,
|
||||
ancestors: ancestorsMarker,
|
||||
compact: null,
|
||||
timestamp: Date.now(),
|
||||
};
|
||||
const markerHash = await putStateNode(cas, markerPayload);
|
||||
|
||||
const initialChain: ChainState = {
|
||||
parentStateHash: markerHash,
|
||||
parentAncestors: ancestorsMarker,
|
||||
};
|
||||
|
||||
return ok({
|
||||
startHash,
|
||||
forkHeadHash: markerHash,
|
||||
initialChain,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Read `.data.jsonl` text and compute fork payload for the worker `run` command.
|
||||
* Prepare a CAS fork: writes the branch marker {@link StateNode}, registers `threads.json`,
|
||||
* and returns worker payload fields (shared {@link StartNode}, zero ancestor duplication).
|
||||
*/
|
||||
export function buildForkPlan(
|
||||
dataJsonlText: string,
|
||||
fromRole: string | null,
|
||||
): Result<ForkPlan, string> {
|
||||
const parsed = parseThreadDataJsonl(dataJsonlText);
|
||||
if (!parsed.ok) {
|
||||
return parsed;
|
||||
export async function prepareCasFork(params: {
|
||||
cas: CasStore;
|
||||
bundleDir: string;
|
||||
bundleHash: string;
|
||||
sourceThreadId: string;
|
||||
headHash: string;
|
||||
startHash: string;
|
||||
newThreadId: string;
|
||||
fromRole: string | null;
|
||||
}): Promise<Result<CasForkPlan, string>> {
|
||||
const id = await readStartWorkflowIdentity({
|
||||
cas: params.cas,
|
||||
startHash: params.startHash,
|
||||
});
|
||||
if (!id.ok) {
|
||||
return id;
|
||||
}
|
||||
const selected = selectForkHistoricalSteps(parsed.value.roleSteps, fromRole);
|
||||
if (!selected.ok) {
|
||||
return selected;
|
||||
|
||||
const newestFirst = await walkStateFramesNewestFirst(params.cas, params.headHash);
|
||||
const meaningful = meaningfulFramesOldestFirst(newestFirst);
|
||||
|
||||
const forkPoint = selectForkPointStateHash(meaningful, params.fromRole);
|
||||
if (!forkPoint.ok) {
|
||||
return forkPoint;
|
||||
}
|
||||
const { start } = parsed.value;
|
||||
|
||||
const replayFrames = replayFramesThroughForkPoint(meaningful, forkPoint.value);
|
||||
const steps: RoleOutput[] = [];
|
||||
const stepTimestamps: number[] = [];
|
||||
for (const fr of replayFrames) {
|
||||
steps.push(await payloadToRoleOutput(params.cas, fr.payload));
|
||||
stepTimestamps.push(fr.payload.timestamp);
|
||||
}
|
||||
|
||||
const cont = await buildForkContinuation({
|
||||
cas: params.cas,
|
||||
sourceThreadId: params.sourceThreadId,
|
||||
startHash: params.startHash,
|
||||
forkPointStateHash: forkPoint.value,
|
||||
});
|
||||
if (!cont.ok) {
|
||||
return cont;
|
||||
}
|
||||
|
||||
await upsertThreadEntry(params.bundleDir, params.newThreadId, {
|
||||
head: cont.value.forkHeadHash,
|
||||
start: params.startHash,
|
||||
updatedAt: Date.now(),
|
||||
});
|
||||
|
||||
return ok({
|
||||
workflowName: start.workflowName,
|
||||
hash: start.hash,
|
||||
sourceThreadId: start.threadId,
|
||||
prompt: start.prompt,
|
||||
runOptions: { maxRounds: start.maxRounds, depth: start.depth },
|
||||
historicalSteps: selected.value,
|
||||
workflowName: id.value.workflowName,
|
||||
hash: params.bundleHash,
|
||||
sourceThreadId: params.sourceThreadId,
|
||||
prompt: id.value.prompt,
|
||||
runOptions: { maxRounds: id.value.maxRounds, depth: id.value.depth },
|
||||
steps,
|
||||
stepTimestamps,
|
||||
forkContinuation: cont.value,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1,122 +1,182 @@
|
||||
import { readdir, readFile } from "node:fs/promises";
|
||||
import type { Stats } from "node:fs";
|
||||
import { readdir, readFile, stat } from "node:fs/promises";
|
||||
import { join } from "node:path";
|
||||
import { type CasStore, createCasStore } from "@uncaged/workflow-cas";
|
||||
import { type CasStore, createCasStore, findReachableHashes } from "@uncaged/workflow-cas";
|
||||
import { err, getGlobalCasDir, ok, type Result } from "@uncaged/workflow-util";
|
||||
import { parseThreadDataJsonl } from "./fork-thread.js";
|
||||
|
||||
import type { ThreadHistoryEntry, ThreadIndex } from "./threads-index.js";
|
||||
import { readThreadsIndex } from "./threads-index.js";
|
||||
import type { GcResult } from "./types.js";
|
||||
|
||||
async function listThreadDataJsonlPaths(storageRoot: string): Promise<Result<string[], string>> {
|
||||
const logsRoot = join(storageRoot, "logs");
|
||||
const paths: string[] = [];
|
||||
let hashes: string[];
|
||||
function isPlainObject(v: unknown): v is Record<string, unknown> {
|
||||
return v !== null && typeof v === "object" && !Array.isArray(v);
|
||||
}
|
||||
|
||||
function parseHistoryLine(jsonLine: string): ThreadHistoryEntry | null {
|
||||
let raw: unknown;
|
||||
try {
|
||||
hashes = await readdir(logsRoot);
|
||||
raw = JSON.parse(jsonLine) as unknown;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
if (!isPlainObject(raw)) {
|
||||
return null;
|
||||
}
|
||||
const threadId = raw.threadId;
|
||||
const head = raw.head;
|
||||
const start = raw.start;
|
||||
const completedAt = raw.completedAt;
|
||||
if (
|
||||
typeof threadId !== "string" ||
|
||||
typeof head !== "string" ||
|
||||
typeof start !== "string" ||
|
||||
typeof completedAt !== "number"
|
||||
) {
|
||||
return null;
|
||||
}
|
||||
return { threadId, head, start, completedAt };
|
||||
}
|
||||
|
||||
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: walks threads index + optional history dir
|
||||
async function collectGcRootsFromBundle(bundleDir: string): Promise<Result<string[], string>> {
|
||||
const roots: string[] = [];
|
||||
|
||||
let activeIndex: ThreadIndex;
|
||||
try {
|
||||
activeIndex = await readThreadsIndex(bundleDir);
|
||||
} catch (e) {
|
||||
return err(`failed to read threads.json under ${bundleDir}: ${String(e)}`);
|
||||
}
|
||||
|
||||
for (const entry of Object.values(activeIndex)) {
|
||||
roots.push(entry.head);
|
||||
roots.push(entry.start);
|
||||
}
|
||||
|
||||
const histDir = join(bundleDir, "history");
|
||||
let histFiles: string[];
|
||||
try {
|
||||
histFiles = await readdir(histDir);
|
||||
} catch (e) {
|
||||
const errObj = e as NodeJS.ErrnoException;
|
||||
if (errObj.code === "ENOENT") {
|
||||
return ok(roots);
|
||||
}
|
||||
return err(`failed to read history directory ${histDir}: ${String(e)}`);
|
||||
}
|
||||
|
||||
for (const name of histFiles) {
|
||||
if (!name.endsWith(".jsonl")) {
|
||||
continue;
|
||||
}
|
||||
let text: string;
|
||||
try {
|
||||
text = await readFile(join(histDir, name), "utf8");
|
||||
} catch (e) {
|
||||
return err(`failed to read history file ${name}: ${String(e)}`);
|
||||
}
|
||||
for (const line of text.split("\n")) {
|
||||
const trimmed = line.trim();
|
||||
if (trimmed === "") {
|
||||
continue;
|
||||
}
|
||||
const entry = parseHistoryLine(trimmed);
|
||||
if (entry === null) {
|
||||
continue;
|
||||
}
|
||||
roots.push(entry.head);
|
||||
roots.push(entry.start);
|
||||
}
|
||||
}
|
||||
|
||||
return ok(roots);
|
||||
}
|
||||
|
||||
async function collectAllGcRoots(storageRoot: string): Promise<Result<string[], string>> {
|
||||
const bundlesRoot = join(storageRoot, "bundles");
|
||||
let entries: string[];
|
||||
try {
|
||||
entries = await readdir(bundlesRoot);
|
||||
} catch (e) {
|
||||
const errObj = e as NodeJS.ErrnoException;
|
||||
if (errObj.code === "ENOENT") {
|
||||
return ok([]);
|
||||
}
|
||||
return err(`failed to read logs directory: ${String(e)}`);
|
||||
return err(`failed to read bundles directory: ${String(e)}`);
|
||||
}
|
||||
|
||||
for (const hash of hashes) {
|
||||
const dir = join(logsRoot, hash);
|
||||
let entries: string[];
|
||||
const roots: string[] = [];
|
||||
for (const name of entries) {
|
||||
const bundleDir = join(bundlesRoot, name);
|
||||
let st: Stats;
|
||||
try {
|
||||
entries = await readdir(dir);
|
||||
st = await stat(bundleDir);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
for (const fileName of entries) {
|
||||
if (fileName.endsWith(".data.jsonl")) {
|
||||
paths.push(join(dir, fileName));
|
||||
}
|
||||
if (!st.isDirectory()) {
|
||||
continue;
|
||||
}
|
||||
const chunk = await collectGcRootsFromBundle(bundleDir);
|
||||
if (!chunk.ok) {
|
||||
return chunk;
|
||||
}
|
||||
roots.push(...chunk.value);
|
||||
}
|
||||
|
||||
paths.sort();
|
||||
return ok(paths);
|
||||
return ok(roots);
|
||||
}
|
||||
|
||||
async function collectActiveRefsFromDataPaths(
|
||||
dataPaths: string[],
|
||||
): Promise<Result<Set<string>, string>> {
|
||||
const activeRefs = new Set<string>();
|
||||
for (const dataPath of dataPaths) {
|
||||
let text: string;
|
||||
try {
|
||||
text = await readFile(dataPath, "utf8");
|
||||
} catch (e) {
|
||||
return err(`failed to read ${dataPath}: ${String(e)}`);
|
||||
}
|
||||
const parsed = parseThreadDataJsonl(text);
|
||||
if (!parsed.ok) {
|
||||
return err(`${dataPath}: ${parsed.error}`);
|
||||
}
|
||||
for (const step of parsed.value.roleSteps) {
|
||||
for (const ref of step.refs) {
|
||||
activeRefs.add(ref);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ok(activeRefs);
|
||||
}
|
||||
|
||||
async function deleteCasNotInSet(
|
||||
cas: CasStore,
|
||||
activeRefs: Set<string>,
|
||||
): Promise<Result<string[], string>> {
|
||||
async function deleteCasNotMarked(cas: CasStore, marked: ReadonlySet<string>): Promise<string[]> {
|
||||
let listed: string[];
|
||||
try {
|
||||
listed = await cas.list();
|
||||
} catch (e) {
|
||||
return err(`failed to list cas entries: ${String(e)}`);
|
||||
throw new Error(`failed to list cas entries: ${String(e)}`);
|
||||
}
|
||||
|
||||
const deletedHashes: string[] = [];
|
||||
for (const hash of listed) {
|
||||
if (activeRefs.has(hash)) {
|
||||
if (marked.has(hash)) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
await cas.delete(hash);
|
||||
} catch (e) {
|
||||
return err(`failed to delete cas ${hash}: ${String(e)}`);
|
||||
throw new Error(`failed to delete cas ${hash}: ${String(e)}`);
|
||||
}
|
||||
deletedHashes.push(hash);
|
||||
}
|
||||
|
||||
deletedHashes.sort();
|
||||
return ok(deletedHashes);
|
||||
return deletedHashes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark-and-sweep CAS GC: collect `refs` from all thread `.data.jsonl` files under `storageRoot`,
|
||||
* then delete CAS blobs not referenced by any surviving thread data.
|
||||
* Mark-and-sweep CAS GC: roots are every `head` / `start` hash from `threads.json` and
|
||||
* `history/*.jsonl` across bundle dirs; marks closure via `refs[]`; deletes unreachable blobs.
|
||||
*/
|
||||
export async function garbageCollectCas(storageRoot: string): Promise<Result<GcResult, string>> {
|
||||
const pathsResult = await listThreadDataJsonlPaths(storageRoot);
|
||||
if (!pathsResult.ok) {
|
||||
return pathsResult;
|
||||
const rootsResult = await collectAllGcRoots(storageRoot);
|
||||
if (!rootsResult.ok) {
|
||||
return rootsResult;
|
||||
}
|
||||
const paths = pathsResult.value;
|
||||
|
||||
const refsResult = await collectActiveRefsFromDataPaths(paths);
|
||||
if (!refsResult.ok) {
|
||||
return refsResult;
|
||||
}
|
||||
const activeRefs = refsResult.value;
|
||||
const roots = rootsResult.value;
|
||||
|
||||
const cas = createCasStore(getGlobalCasDir(storageRoot));
|
||||
const deletedResult = await deleteCasNotInSet(cas, activeRefs);
|
||||
if (!deletedResult.ok) {
|
||||
return deletedResult;
|
||||
|
||||
const marked = await findReachableHashes(roots, cas);
|
||||
|
||||
let deletedHashes: string[];
|
||||
try {
|
||||
deletedHashes = await deleteCasNotMarked(cas, marked);
|
||||
} catch (e) {
|
||||
return err(String(e));
|
||||
}
|
||||
const deletedHashes = deletedResult.value;
|
||||
|
||||
return ok({
|
||||
scannedThreads: paths.length,
|
||||
activeRefs: activeRefs.size,
|
||||
scannedThreads: roots.length,
|
||||
activeRefs: marked.size,
|
||||
deletedEntries: deletedHashes.length,
|
||||
deletedHashes,
|
||||
});
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
export { createWorkflow } from "./create-workflow.js";
|
||||
export { executeThread } from "./engine.js";
|
||||
export {
|
||||
buildForkPlan,
|
||||
parseThreadDataJsonl,
|
||||
selectForkHistoricalSteps,
|
||||
tryParseRoleStepRecord,
|
||||
FORK_BRANCH_ROLE,
|
||||
prepareCasFork,
|
||||
tryParseWorkflowResultRecord,
|
||||
walkStateFramesNewestFirst,
|
||||
} from "./fork-thread.js";
|
||||
export { garbageCollectCas } from "./gc.js";
|
||||
export { createThreadPauseGate } from "./thread-pause-gate.js";
|
||||
@@ -13,18 +12,22 @@ export type { ThreadHistoryEntry, ThreadIndex, ThreadIndexEntry } from "./thread
|
||||
export {
|
||||
appendThreadHistoryEntry,
|
||||
getBundleDir,
|
||||
readThreadsIndex,
|
||||
removeThreadEntry,
|
||||
removeThreadHistoryEntries,
|
||||
upsertThreadEntry,
|
||||
writeThreadsIndex,
|
||||
} from "./threads-index.js";
|
||||
export type {
|
||||
CasForkPlan,
|
||||
ChainState,
|
||||
ExecuteThreadIo,
|
||||
ExecuteThreadOptions,
|
||||
ForkHistoricalStep,
|
||||
ForkPlan,
|
||||
ForkContinuationOptions,
|
||||
GcResult,
|
||||
ParsedThreadStartRecord,
|
||||
PrefilledDiskStep,
|
||||
SupervisorDecision,
|
||||
ThreadPauseGate,
|
||||
} from "./types.js";
|
||||
export { EMPTY_CHAIN_STATE } from "./types.js";
|
||||
export { getWorkerHostScriptPath } from "./worker-entry-path.js";
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import { appendFile, mkdir, readFile, rename, writeFile } from "node:fs/promises";
|
||||
import { appendFile, mkdir, readdir, readFile, rename, writeFile } from "node:fs/promises";
|
||||
import { dirname, join } from "node:path";
|
||||
|
||||
import { err, ok, type Result } from "@uncaged/workflow-util";
|
||||
|
||||
/**
|
||||
* Active-thread index entry stored in `<bundleDir>/threads.json`.
|
||||
*
|
||||
@@ -71,7 +73,8 @@ function parseThreadIndex(text: string): ThreadIndex {
|
||||
return out;
|
||||
}
|
||||
|
||||
async function readThreadIndex(bundleDir: string): Promise<ThreadIndex> {
|
||||
/** Read `<bundleDir>/threads.json` (empty object when missing or invalid). */
|
||||
export async function readThreadsIndex(bundleDir: string): Promise<ThreadIndex> {
|
||||
const path = threadsJsonPath(bundleDir);
|
||||
let text: string;
|
||||
try {
|
||||
@@ -86,7 +89,7 @@ async function readThreadIndex(bundleDir: string): Promise<ThreadIndex> {
|
||||
return parseThreadIndex(text);
|
||||
}
|
||||
|
||||
async function writeThreadIndex(bundleDir: string, index: ThreadIndex): Promise<void> {
|
||||
export async function writeThreadsIndex(bundleDir: string, index: ThreadIndex): Promise<void> {
|
||||
const path = threadsJsonPath(bundleDir);
|
||||
await mkdir(dirname(path), { recursive: true });
|
||||
const tmp = `${path}.tmp.${process.pid}.${Date.now()}`;
|
||||
@@ -101,19 +104,19 @@ export async function upsertThreadEntry(
|
||||
threadId: string,
|
||||
entry: ThreadIndexEntry,
|
||||
): Promise<void> {
|
||||
const index = await readThreadIndex(bundleDir);
|
||||
const index = await readThreadsIndex(bundleDir);
|
||||
index[threadId] = entry;
|
||||
await writeThreadIndex(bundleDir, index);
|
||||
await writeThreadsIndex(bundleDir, index);
|
||||
}
|
||||
|
||||
/** Remove a thread entry from `threads.json` (no-op when absent). */
|
||||
export async function removeThreadEntry(bundleDir: string, threadId: string): Promise<void> {
|
||||
const index = await readThreadIndex(bundleDir);
|
||||
const index = await readThreadsIndex(bundleDir);
|
||||
if (!(threadId in index)) {
|
||||
return;
|
||||
}
|
||||
delete index[threadId];
|
||||
await writeThreadIndex(bundleDir, index);
|
||||
await writeThreadsIndex(bundleDir, index);
|
||||
}
|
||||
|
||||
function dateKey(epochMs: number): string {
|
||||
@@ -134,3 +137,63 @@ export async function appendThreadHistoryEntry(
|
||||
const line = `${JSON.stringify(entry)}\n`;
|
||||
await appendFile(path, line, "utf8");
|
||||
}
|
||||
|
||||
/** Removes every `history/*.jsonl` line whose `threadId` matches (rewrite files in place). */
|
||||
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: per-file JSONL filtering keeps RM deterministic
|
||||
export async function removeThreadHistoryEntries(
|
||||
bundleDir: string,
|
||||
threadId: string,
|
||||
): Promise<Result<number, string>> {
|
||||
const histRoot = join(bundleDir, "history");
|
||||
let files: string[];
|
||||
try {
|
||||
files = await readdir(histRoot);
|
||||
} catch (e) {
|
||||
const errObj = e as NodeJS.ErrnoException;
|
||||
if (errObj.code === "ENOENT") {
|
||||
return ok(0);
|
||||
}
|
||||
return err(`failed to read history directory: ${String(e)}`);
|
||||
}
|
||||
|
||||
let removed = 0;
|
||||
for (const name of files) {
|
||||
if (!name.endsWith(".jsonl")) {
|
||||
continue;
|
||||
}
|
||||
const path = join(histRoot, name);
|
||||
let text: string;
|
||||
try {
|
||||
text = await readFile(path, "utf8");
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
const kept: string[] = [];
|
||||
for (const line of text.split("\n")) {
|
||||
const trimmed = line.trim();
|
||||
if (trimmed === "") {
|
||||
continue;
|
||||
}
|
||||
let rec: unknown;
|
||||
try {
|
||||
rec = JSON.parse(trimmed) as unknown;
|
||||
} catch {
|
||||
kept.push(`${trimmed}\n`);
|
||||
continue;
|
||||
}
|
||||
if (rec === null || typeof rec !== "object") {
|
||||
kept.push(`${trimmed}\n`);
|
||||
continue;
|
||||
}
|
||||
const id = (rec as Record<string, unknown>).threadId;
|
||||
if (id === threadId) {
|
||||
removed++;
|
||||
continue;
|
||||
}
|
||||
kept.push(`${trimmed}\n`);
|
||||
}
|
||||
await writeFile(path, kept.join(""), "utf8");
|
||||
}
|
||||
|
||||
return ok(removed);
|
||||
}
|
||||
|
||||
@@ -11,7 +11,25 @@ export type ExecuteThreadIo = {
|
||||
cas: CasStore;
|
||||
};
|
||||
|
||||
/** One persisted role line in `.data.jsonl` (engine adds these for fork replay before running the generator). */
|
||||
/** CAS chain tail state before the next appended {@link StateNode}. */
|
||||
export type ChainState = {
|
||||
parentStateHash: string | null;
|
||||
parentAncestors: readonly string[];
|
||||
};
|
||||
|
||||
export const EMPTY_CHAIN_STATE: ChainState = { parentStateHash: null, parentAncestors: [] };
|
||||
|
||||
/**
|
||||
* When forking, the worker continues from an existing {@link StartNode} plus an optional
|
||||
* branch marker {@link StateNode} instead of allocating a new start blob.
|
||||
*/
|
||||
export type ForkContinuationOptions = {
|
||||
startHash: string;
|
||||
forkHeadHash: string;
|
||||
initialChain: ChainState;
|
||||
};
|
||||
|
||||
/** One replayed role step (prefill) before the generator runs (same layout as disk replay rows). */
|
||||
export type PrefilledDiskStep = {
|
||||
role: string;
|
||||
contentHash: string;
|
||||
@@ -30,37 +48,36 @@ export type ExecuteThreadOptions = {
|
||||
/** When non-null, written into the start record so tooling can trace lineage. */
|
||||
forkSourceThreadId: string | null;
|
||||
/**
|
||||
* Written to `.data.jsonl` immediately after the start record, before the generator runs.
|
||||
* When non-null, replays these steps into CAS before the generator runs.
|
||||
* Must match `input.steps` length and order when present.
|
||||
*/
|
||||
prefilledDiskSteps: PrefilledDiskStep[] | null;
|
||||
/** When non-null, skip creating a new {@link StartNode} and continue this CAS chain. */
|
||||
forkContinuation: ForkContinuationOptions | null;
|
||||
/**
|
||||
* When non-null, must match `input.steps.length`; supplies persisted timestamps for
|
||||
* {@link ThreadContext.steps} (used when restoring history without prefilled CAS replay).
|
||||
*/
|
||||
replayTimestamps: readonly number[] | null;
|
||||
/** Workspace root containing `workflow.yaml`; used to resolve the `extract` scene for meta extraction. */
|
||||
storageRoot: string;
|
||||
};
|
||||
|
||||
/** Role steps replayed from `.data.jsonl`, including persisted timestamps. */
|
||||
export type ForkHistoricalStep = RoleOutput & { timestamp: number };
|
||||
|
||||
export type ParsedThreadStartRecord = {
|
||||
workflowName: string;
|
||||
hash: string;
|
||||
threadId: string;
|
||||
prompt: string;
|
||||
maxRounds: number;
|
||||
depth: number;
|
||||
};
|
||||
|
||||
export type ForkPlan = {
|
||||
export type CasForkPlan = {
|
||||
workflowName: string;
|
||||
hash: string;
|
||||
sourceThreadId: string;
|
||||
prompt: string;
|
||||
runOptions: { maxRounds: number; depth: number };
|
||||
historicalSteps: ForkHistoricalStep[];
|
||||
steps: RoleOutput[];
|
||||
stepTimestamps: number[];
|
||||
forkContinuation: ForkContinuationOptions;
|
||||
};
|
||||
|
||||
export type GcResult = {
|
||||
/** Count of root hashes seeded from thread indexes (`head`/`start` per entry). */
|
||||
scannedThreads: number;
|
||||
/** Reachable CAS blobs after the mark phase. */
|
||||
activeRefs: number;
|
||||
deletedEntries: number;
|
||||
deletedHashes: string[];
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { unlinkSync } from "node:fs";
|
||||
import { mkdir, unlink, writeFile } from "node:fs/promises";
|
||||
import { createServer, type Socket } from "node:net";
|
||||
import { dirname, join } from "node:path";
|
||||
@@ -17,7 +18,12 @@ import {
|
||||
} from "@uncaged/workflow-util";
|
||||
import { executeThread } from "./engine.js";
|
||||
import { createThreadPauseGate } from "./thread-pause-gate.js";
|
||||
import type { ExecuteThreadIo, PrefilledDiskStep, ThreadPauseGate } from "./types.js";
|
||||
import type {
|
||||
ExecuteThreadIo,
|
||||
ForkContinuationOptions,
|
||||
PrefilledDiskStep,
|
||||
ThreadPauseGate,
|
||||
} from "./types.js";
|
||||
|
||||
const bootLog = createLogger({ sink: { kind: "stderr" } });
|
||||
|
||||
@@ -28,9 +34,10 @@ type RunCommand = {
|
||||
prompt: string;
|
||||
options: { maxRounds: number; depth: number };
|
||||
steps: RoleOutput[];
|
||||
/** Timestamps aligned with `steps` for `.data.jsonl` replay; length must match `steps` when non-null. */
|
||||
/** Timestamps aligned with `steps` for replay / fork restore; length must match `steps` when steps are non-empty. */
|
||||
stepTimestamps: number[] | null;
|
||||
forkSourceThreadId: string | null;
|
||||
forkContinuation: ForkContinuationOptions | null;
|
||||
};
|
||||
|
||||
type KillCommand = {
|
||||
@@ -73,6 +80,7 @@ function parseRoleOutputRecord(obj: Record<string, unknown>): RoleOutput | null
|
||||
};
|
||||
}
|
||||
|
||||
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: mirrors permissive worker IPC decoding shape checks
|
||||
function parseRunStepsPayload(rec: Record<string, unknown>): {
|
||||
steps: RoleOutput[];
|
||||
stepTimestamps: number[] | null;
|
||||
@@ -107,12 +115,60 @@ function parseRunStepsPayload(rec: Record<string, unknown>): {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
const parallelTsRaw = rec.stepTimestamps;
|
||||
if (
|
||||
steps.length > 0 &&
|
||||
Array.isArray(parallelTsRaw) &&
|
||||
parallelTsRaw.length === steps.length &&
|
||||
parallelTsRaw.every((x): x is number => typeof x === "number")
|
||||
) {
|
||||
return { steps, stepTimestamps: [...parallelTsRaw] };
|
||||
}
|
||||
|
||||
return {
|
||||
steps,
|
||||
stepTimestamps: anyTimestamp ? timestamps : null,
|
||||
};
|
||||
}
|
||||
|
||||
function parseForkContinuation(rec: Record<string, unknown>): ForkContinuationOptions | null {
|
||||
const raw = rec.forkContinuation;
|
||||
if (raw === undefined || raw === null) {
|
||||
return null;
|
||||
}
|
||||
if (typeof raw !== "object") {
|
||||
return null;
|
||||
}
|
||||
const o = raw as Record<string, unknown>;
|
||||
const startHash = o.startHash;
|
||||
const forkHeadHash = o.forkHeadHash;
|
||||
const ic = o.initialChain;
|
||||
if (typeof startHash !== "string" || typeof forkHeadHash !== "string") {
|
||||
return null;
|
||||
}
|
||||
if (ic === null || typeof ic !== "object") {
|
||||
return null;
|
||||
}
|
||||
const ich = ic as Record<string, unknown>;
|
||||
const pph = ich.parentStateHash;
|
||||
const pa = ich.parentAncestors;
|
||||
if (!(pph === null || typeof pph === "string")) {
|
||||
return null;
|
||||
}
|
||||
if (!Array.isArray(pa) || !pa.every((x) => typeof x === "string")) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
startHash,
|
||||
forkHeadHash,
|
||||
initialChain: {
|
||||
parentStateHash: pph,
|
||||
parentAncestors: pa,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function parseRunControlPayload(rec: Record<string, unknown>): RunCommand | null {
|
||||
const threadId = rec.threadId;
|
||||
const workflowName = rec.workflowName;
|
||||
@@ -148,6 +204,7 @@ function parseRunControlPayload(rec: Record<string, unknown>): RunCommand | null
|
||||
}
|
||||
forkSourceThreadId = rawFork;
|
||||
}
|
||||
const forkContinuation = parseForkContinuation(rec);
|
||||
return {
|
||||
type: "run",
|
||||
threadId,
|
||||
@@ -157,6 +214,7 @@ function parseRunControlPayload(rec: Record<string, unknown>): RunCommand | null
|
||||
steps: parsedSteps.steps,
|
||||
stepTimestamps: parsedSteps.stepTimestamps,
|
||||
forkSourceThreadId,
|
||||
forkContinuation,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -325,6 +383,23 @@ async function main(): Promise<void> {
|
||||
let activeThreads = 0;
|
||||
let shutdownTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
|
||||
function cleanupAllRunningMarkersSync(): void {
|
||||
for (const threadId of threads.keys()) {
|
||||
try {
|
||||
unlinkSync(join(storageRoot, "logs", hash, `${threadId}.running`));
|
||||
} catch {
|
||||
// ignore missing file or other fs errors
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (const sig of ["SIGINT", "SIGTERM"] as const) {
|
||||
process.on(sig, () => {
|
||||
cleanupAllRunningMarkersSync();
|
||||
process.exit(sig === "SIGINT" ? 130 : 143);
|
||||
});
|
||||
}
|
||||
|
||||
const cas = createCasStore(getGlobalCasDir(storageRoot));
|
||||
|
||||
const workerCtlPath = join(storageRoot, "workers", `${hash}.json`);
|
||||
@@ -357,6 +432,7 @@ async function main(): Promise<void> {
|
||||
}
|
||||
}
|
||||
|
||||
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: TCP worker multiplexes lifecycle + runs
|
||||
async function dispatchCommand(cmd: ControlCommand, socket: Socket | null): Promise<void> {
|
||||
if (cmd.type !== "run") {
|
||||
dispatchThreadLifecycleCommand(threads, socket, cmd);
|
||||
@@ -394,7 +470,19 @@ async function main(): Promise<void> {
|
||||
|
||||
const baseTs = Date.now();
|
||||
let prefilledDiskSteps: PrefilledDiskStep[] | null = null;
|
||||
if (cmd.steps.length > 0) {
|
||||
let replayTimestamps: readonly number[] | null = null;
|
||||
|
||||
if (cmd.forkContinuation !== null) {
|
||||
if (
|
||||
cmd.steps.length > 0 &&
|
||||
(cmd.stepTimestamps === null || cmd.stepTimestamps.length !== cmd.steps.length)
|
||||
) {
|
||||
bootLog("J5WQ8NXT", "forkContinuation requires stepTimestamps aligned with steps");
|
||||
throw new Error("forkContinuation requires stepTimestamps aligned with steps");
|
||||
}
|
||||
replayTimestamps =
|
||||
cmd.steps.length === 0 ? null : (cmd.stepTimestamps as readonly number[]);
|
||||
} else if (cmd.steps.length > 0) {
|
||||
prefilledDiskSteps = cmd.steps.map((step, i) => {
|
||||
const ts = cmd.stepTimestamps?.[i];
|
||||
return {
|
||||
@@ -417,6 +505,8 @@ async function main(): Promise<void> {
|
||||
awaitAfterEachYield: () => pauseGate.awaitAfterYield(),
|
||||
forkSourceThreadId: cmd.forkSourceThreadId,
|
||||
prefilledDiskSteps,
|
||||
forkContinuation: cmd.forkContinuation,
|
||||
replayTimestamps,
|
||||
storageRoot,
|
||||
},
|
||||
io,
|
||||
@@ -426,8 +516,8 @@ async function main(): Promise<void> {
|
||||
const message = e instanceof Error ? e.message : String(e);
|
||||
bootLog("Q3MN8YKW", `thread ${threadId} failed: ${message}`);
|
||||
} finally {
|
||||
threads.delete(threadId);
|
||||
await unlink(runningPath).catch(() => {});
|
||||
threads.delete(threadId);
|
||||
bumpDone();
|
||||
socket?.end();
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@ const CAS_GET_TOOL_DEFINITION = {
|
||||
function: {
|
||||
name: "cas_get",
|
||||
description:
|
||||
"Read a Merkle DAG node from content-addressed storage by its hash. Returns YAML-formatted node with type, payload, and children fields.",
|
||||
"Read a Merkle DAG node from content-addressed storage by its hash. Returns YAML-formatted node with type, payload, and refs or children fields (content nodes use refs).",
|
||||
parameters: {
|
||||
type: "object",
|
||||
properties: {
|
||||
@@ -102,7 +102,7 @@ export function createExtract(provider: LlmProvider, deps: ExtractDeps): Extract
|
||||
};
|
||||
},
|
||||
systemPromptForStructuredTool: (structuredToolName) =>
|
||||
`You extract structured metadata from the agent output below. Use cas_get to read Merkle DAG nodes from CAS (YAML: type, payload, children) when the agent output references hashes you must traverse. When you have the complete structured object, call the ${structuredToolName} tool with JSON arguments matching the schema. You may instead reply with only a JSON object (no prose) when no tools are needed.`,
|
||||
`You extract structured metadata from the agent output below. Use cas_get to read Merkle DAG nodes from CAS (YAML: type, payload, refs for content nodes or children for step/thread legacy nodes) when the agent output references hashes you must traverse. When you have the complete structured object, call the ${structuredToolName} tool with JSON arguments matching the schema. You may instead reply with only a JSON object (no prose) when no tools are needed.`,
|
||||
toolHandler: async (call, thread) => {
|
||||
if (call.function.name !== "cas_get") {
|
||||
return `Unexpected tool routed to handler: ${call.function.name}`;
|
||||
|
||||
@@ -1,25 +1,39 @@
|
||||
export { createWorkflow } from "./engine/create-workflow.js";
|
||||
export { executeThread } from "./engine/engine.js";
|
||||
export {
|
||||
buildForkPlan,
|
||||
parseThreadDataJsonl,
|
||||
selectForkHistoricalSteps,
|
||||
tryParseRoleStepRecord,
|
||||
FORK_BRANCH_ROLE,
|
||||
prepareCasFork,
|
||||
tryParseWorkflowResultRecord,
|
||||
walkStateFramesNewestFirst,
|
||||
} from "./engine/fork-thread.js";
|
||||
export { garbageCollectCas } from "./engine/gc.js";
|
||||
export { createThreadPauseGate } from "./engine/thread-pause-gate.js";
|
||||
export type {
|
||||
ThreadHistoryEntry,
|
||||
ThreadIndex,
|
||||
ThreadIndexEntry,
|
||||
} from "./engine/threads-index.js";
|
||||
export {
|
||||
appendThreadHistoryEntry,
|
||||
getBundleDir,
|
||||
readThreadsIndex,
|
||||
removeThreadEntry,
|
||||
removeThreadHistoryEntries,
|
||||
upsertThreadEntry,
|
||||
writeThreadsIndex,
|
||||
} from "./engine/threads-index.js";
|
||||
export type {
|
||||
CasForkPlan,
|
||||
ChainState,
|
||||
ExecuteThreadIo,
|
||||
ExecuteThreadOptions,
|
||||
ForkHistoricalStep,
|
||||
ForkPlan,
|
||||
ForkContinuationOptions,
|
||||
GcResult,
|
||||
ParsedThreadStartRecord,
|
||||
PrefilledDiskStep,
|
||||
SupervisorDecision,
|
||||
ThreadPauseGate,
|
||||
} from "./engine/types.js";
|
||||
export { EMPTY_CHAIN_STATE } from "./engine/types.js";
|
||||
export { getWorkerHostScriptPath } from "./engine/worker-entry-path.js";
|
||||
export type { ExtractFn, LlmError, LlmExtractArgs } from "./extract/index.js";
|
||||
export {
|
||||
|
||||
@@ -101,6 +101,8 @@ export function workflowAsAgent(
|
||||
awaitAfterEachYield: async () => {},
|
||||
forkSourceThreadId: ctx.threadId,
|
||||
prefilledDiskSteps: null,
|
||||
forkContinuation: null,
|
||||
replayTimestamps: null,
|
||||
storageRoot,
|
||||
},
|
||||
io,
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
{
|
||||
"name": "@uncaged/workflow-gateway",
|
||||
"version": "0.1.0",
|
||||
"private": true,
|
||||
"type": "module",
|
||||
"scripts": {
|
||||
"dev": "wrangler dev",
|
||||
"deploy": "wrangler deploy"
|
||||
},
|
||||
"dependencies": {
|
||||
"hono": "^4.7.11"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@cloudflare/workers-types": "^4.20260425.1",
|
||||
"wrangler": "^4.20.0"
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,148 @@
|
||||
import { Hono } from "hono";
|
||||
import { cors } from "hono/cors";
|
||||
|
||||
type Env = {
|
||||
Bindings: {
|
||||
ENDPOINTS: KVNamespace;
|
||||
GATEWAY_SECRET: string;
|
||||
DASHBOARD_API_KEY: string;
|
||||
};
|
||||
};
|
||||
|
||||
type EndpointRecord = {
|
||||
name: string;
|
||||
url: string;
|
||||
agentToken: string;
|
||||
registeredAt: number;
|
||||
lastHeartbeat: number;
|
||||
};
|
||||
|
||||
const TTL_SECONDS = 300; // 5 min — offline if no heartbeat
|
||||
|
||||
const app = new Hono<Env>();
|
||||
|
||||
app.use("*", cors());
|
||||
|
||||
// ── Dashboard API key auth (skip healthz + register) ─────────────
|
||||
app.use("/endpoints", async (c, next) => {
|
||||
if (!checkDashboardAuth(c)) return c.json({ error: "unauthorized" }, 401);
|
||||
await next();
|
||||
});
|
||||
app.use("/api/*", async (c, next) => {
|
||||
if (!checkDashboardAuth(c)) return c.json({ error: "unauthorized" }, 401);
|
||||
await next();
|
||||
});
|
||||
|
||||
function checkDashboardAuth(c: { req: { header: (n: string) => string | undefined; query: (n: string) => string | undefined }; env: Env["Bindings"] }): boolean {
|
||||
const bearer = c.req.header("Authorization")?.replace("Bearer ", "");
|
||||
const query = c.req.query("key");
|
||||
const key = bearer ?? query;
|
||||
return key === c.env.DASHBOARD_API_KEY;
|
||||
}
|
||||
|
||||
// ── Health ──────────────────────────────────────────────────────────
|
||||
app.get("/healthz", (c) => c.json({ ok: true }));
|
||||
|
||||
// ── Register / heartbeat ────────────────────────────────────────────
|
||||
app.post("/register", async (c) => {
|
||||
const body = await c.req.json<{ name?: string; url?: string; secret?: string; agentToken?: string }>();
|
||||
const { name, url, secret, agentToken } = body;
|
||||
|
||||
if (!name || !url) {
|
||||
return c.json({ error: "name and url required" }, 400);
|
||||
}
|
||||
if (secret !== c.env.GATEWAY_SECRET) {
|
||||
return c.json({ error: "unauthorized" }, 401);
|
||||
}
|
||||
|
||||
const existing = await c.env.ENDPOINTS.get<EndpointRecord>(name, "json");
|
||||
const now = Date.now();
|
||||
|
||||
const record: EndpointRecord = {
|
||||
name,
|
||||
url: url.replace(/\/+$/, ""), // strip trailing slash
|
||||
agentToken: agentToken ?? existing?.agentToken ?? "",
|
||||
registeredAt: existing?.registeredAt ?? now,
|
||||
lastHeartbeat: now,
|
||||
};
|
||||
|
||||
await c.env.ENDPOINTS.put(name, JSON.stringify(record), {
|
||||
expirationTtl: TTL_SECONDS,
|
||||
});
|
||||
|
||||
const status = existing ? 200 : 201;
|
||||
return c.json({ registered: name }, status);
|
||||
});
|
||||
|
||||
// ── Unregister ──────────────────────────────────────────────────────
|
||||
app.delete("/register/:name", async (c) => {
|
||||
const auth = c.req.header("Authorization");
|
||||
if (auth !== `Bearer ${c.env.GATEWAY_SECRET}`) {
|
||||
return c.json({ error: "unauthorized" }, 401);
|
||||
}
|
||||
|
||||
const name = c.req.param("name");
|
||||
await c.env.ENDPOINTS.delete(name);
|
||||
return c.json({ unregistered: name });
|
||||
});
|
||||
|
||||
// ── List endpoints ──────────────────────────────────────────────────
|
||||
app.get("/endpoints", async (c) => {
|
||||
const list = await c.env.ENDPOINTS.list();
|
||||
const endpoints: Array<{ name: string; url: string; status: string; lastHeartbeat: number }> = [];
|
||||
|
||||
for (const key of list.keys) {
|
||||
const record = await c.env.ENDPOINTS.get<EndpointRecord>(key.name, "json");
|
||||
if (record) {
|
||||
const age = Date.now() - record.lastHeartbeat;
|
||||
endpoints.push({
|
||||
name: record.name,
|
||||
url: record.url,
|
||||
status: age < TTL_SECONDS * 1000 ? "online" : "offline",
|
||||
lastHeartbeat: record.lastHeartbeat,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return c.json(endpoints);
|
||||
});
|
||||
|
||||
// ── API proxy: /api/:agent/* → agent's tunnel URL ───────────────────
|
||||
app.all("/api/:agent/*", async (c) => {
|
||||
const agent = c.req.param("agent");
|
||||
const record = await c.env.ENDPOINTS.get<EndpointRecord>(agent, "json");
|
||||
|
||||
if (!record) {
|
||||
return c.json({ error: "agent not found" }, 404);
|
||||
}
|
||||
|
||||
// Build target URL: strip /api/:agent prefix, forward the rest
|
||||
const url = new URL(c.req.url);
|
||||
const pathAfterAgent = url.pathname.replace(`/api/${agent}`, "");
|
||||
const targetUrl = `${record.url}/api${pathAfterAgent}${url.search}`;
|
||||
|
||||
const headers = new Headers(c.req.raw.headers);
|
||||
headers.delete("host");
|
||||
headers.delete("Authorization"); // don't forward dashboard key to agent
|
||||
if (record.agentToken) {
|
||||
headers.set("X-Agent-Token", record.agentToken);
|
||||
}
|
||||
|
||||
try {
|
||||
const resp = await fetch(targetUrl, {
|
||||
method: c.req.method,
|
||||
headers,
|
||||
body: c.req.method !== "GET" && c.req.method !== "HEAD" ? c.req.raw.body : undefined,
|
||||
});
|
||||
|
||||
// Stream response back
|
||||
return new Response(resp.body, {
|
||||
status: resp.status,
|
||||
headers: resp.headers,
|
||||
});
|
||||
} catch (err) {
|
||||
return c.json({ error: "agent unreachable", detail: String(err) }, 502);
|
||||
}
|
||||
});
|
||||
|
||||
export default app;
|
||||
@@ -0,0 +1,12 @@
|
||||
{
|
||||
"compilerOptions": {
|
||||
"target": "ES2022",
|
||||
"module": "ES2022",
|
||||
"moduleResolution": "bundler",
|
||||
"types": ["@cloudflare/workers-types"],
|
||||
"strict": true,
|
||||
"noEmit": true,
|
||||
"skipLibCheck": true
|
||||
},
|
||||
"include": ["src"]
|
||||
}
|
||||
@@ -0,0 +1,9 @@
|
||||
name = "workflow-gateway"
|
||||
main = "src/index.ts"
|
||||
compatibility_date = "2025-04-01"
|
||||
|
||||
[[kv_namespaces]]
|
||||
binding = "ENDPOINTS"
|
||||
id = "88b118d1cfab4c049f9c1684848811a3"
|
||||
|
||||
# GATEWAY_SECRET is set via `wrangler secret put`
|
||||
@@ -40,7 +40,9 @@ function isAllowedImportSpecifier(spec: string): boolean {
|
||||
if (
|
||||
spec === "@uncaged/workflow" ||
|
||||
spec === "@uncaged/workflow-runtime" ||
|
||||
spec === "@uncaged/workflow-cas"
|
||||
spec === "@uncaged/workflow-protocol" ||
|
||||
spec === "@uncaged/workflow-cas" ||
|
||||
spec === "@uncaged/workflow-util"
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user