feat: Phase 5 — CLI + Dashboard CAS adaptation, cleanup .data.jsonl
- Align REST API contracts for Dashboard (threads list, detail, SSE) - Add content resolution from CAS in thread show + API responses - Rename dataWatcher → threadsJsonWatcher in SSE routes - Update docs (CLAUDE.md, architecture.md, skill.ts) to reflect CAS storage - Zero .data.jsonl code paths in production code - All 166 tests pass, bun run check clean Refs #155, closes #160 小橘 <xiaoju@shazhou.work>
This commit is contained in:
@@ -10,7 +10,7 @@ This monorepo implements a workflow engine that executes single-file ESM bundles
|
|||||||
|---------|-----------|
|
|---------|-----------|
|
||||||
| **Workflow** | A single-file ESM module that exports `run` (workflow function) and `descriptor` (metadata). Identified by its XXH64 hash (Crockford Base32). |
|
| **Workflow** | A single-file ESM module that exports `run` (workflow function) and `descriptor` (metadata). Identified by its XXH64 hash (Crockford Base32). |
|
||||||
| **Bundle** | The physical `.esm.js` file stored in `~/.uncaged/workflow/bundles/`. |
|
| **Bundle** | The physical `.esm.js` file stored in `~/.uncaged/workflow/bundles/`. |
|
||||||
| **Thread** | A single execution of a workflow, identified by a ULID. Persisted as `.data.jsonl` + `.info.jsonl`. |
|
| **Thread** | A single execution of a workflow, identified by a ULID. State lives in CAS (linked nodes); active threads indexed in `threads.json`; completed rows in `history/*.jsonl`. Debug logs use `.info.jsonl`. |
|
||||||
| **Role** | A named actor within a workflow. Each role produces output with typed `meta`. |
|
| **Role** | A named actor within a workflow. Each role produces output with typed `meta`. |
|
||||||
| **Registry** | `workflow.yaml` — maps workflow names to current/historical bundle hashes. |
|
| **Registry** | `workflow.yaml` — maps workflow names to current/historical bundle hashes. |
|
||||||
|
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ A workflow engine that executes single-file ESM bundles. Each workflow is a self
|
|||||||
|---------|-------------|
|
|---------|-------------|
|
||||||
| **Workflow** | A single-file ESM module exporting `run` (workflow function) and `descriptor` (metadata). Identified by its XXH64 hash. |
|
| **Workflow** | A single-file ESM module exporting `run` (workflow function) and `descriptor` (metadata). Identified by its XXH64 hash. |
|
||||||
| **Bundle** | The physical `.esm.js` file stored in `~/.uncaged/workflow/bundles/`. |
|
| **Bundle** | The physical `.esm.js` file stored in `~/.uncaged/workflow/bundles/`. |
|
||||||
| **Thread** | A single execution of a workflow, identified by a ULID. Persisted as `.data.jsonl` + `.info.jsonl`. |
|
| **Thread** | A single execution of a workflow, identified by a ULID. CAS-backed chain plus `threads.json` / `history/*.jsonl`; `.info.jsonl` for debug logs. |
|
||||||
| **Role** | A named actor within a workflow. Each role produces output with typed `meta`. Roles live inside template packages (`src/roles/`). |
|
| **Role** | A named actor within a workflow. Each role produces output with typed `meta`. Roles live inside template packages (`src/roles/`). |
|
||||||
| **Registry** | `workflow.yaml` — maps workflow names to current/historical bundle hashes. |
|
| **Registry** | `workflow.yaml` — maps workflow names to current/historical bundle hashes. |
|
||||||
| **CAS** | Content-Addressed Storage — bundles are immutable and addressed by hash. |
|
| **CAS** | Content-Addressed Storage — bundles are immutable and addressed by hash. |
|
||||||
|
|||||||
+12
-13
@@ -189,11 +189,15 @@ type WorkflowFn = (
|
|||||||
├── cas/ # Global content-addressed blobs (see getGlobalCasDir)
|
├── cas/ # Global content-addressed blobs (see getGlobalCasDir)
|
||||||
├── bundles/
|
├── bundles/
|
||||||
│ ├── C9NMV6V2TQT81.esm.js # Crockford Base32 of XXH64
|
│ ├── C9NMV6V2TQT81.esm.js # Crockford Base32 of XXH64
|
||||||
│ └── C9NMV6V2TQT81.yaml # Role descriptor sidecar (when present)
|
│ ├── C9NMV6V2TQT81.yaml # Role descriptor sidecar (when present)
|
||||||
|
│ └── C9NMV6V2TQT81/ # Per-hash bundle dir (alongside or instead of loose files)
|
||||||
|
│ ├── threads.json # Active threads: threadId → { head, start, updatedAt }
|
||||||
|
│ └── history/
|
||||||
|
│ └── 2026-05-09.jsonl # Completed threads (one JSON object per line)
|
||||||
├── logs/ # One folder per bundle hash
|
├── logs/ # One folder per bundle hash
|
||||||
│ └── C9NMV6V2TQT81/
|
│ └── C9NMV6V2TQT81/
|
||||||
│ ├── 01KQXKW…YG.data.jsonl # Thread state
|
│ ├── 01KQXKW…YG.running # Present while worker executes this thread (optional)
|
||||||
│ └── 01KQXKW…YG.info.jsonl # Debug log
|
│ └── 01KQXKW…YG.info.jsonl # Debug log
|
||||||
└── workflow.yaml # Registry
|
└── workflow.yaml # Registry
|
||||||
```
|
```
|
||||||
|
|
||||||
@@ -207,18 +211,13 @@ type WorkflowFn = (
|
|||||||
|
|
||||||
Managed by `@uncaged/workflow-register` (`readWorkflowRegistry`, `writeWorkflowRegistry`, …). Shape includes workflow entries and a top-level `config` section used for extract/supervisor model resolution.
|
Managed by `@uncaged/workflow-register` (`readWorkflowRegistry`, `writeWorkflowRegistry`, …). Shape includes workflow entries and a top-level `config` section used for extract/supervisor model resolution.
|
||||||
|
|
||||||
### Thread JSONL
|
### Thread storage (CAS + index)
|
||||||
|
|
||||||
**`.data.jsonl`** — Line 1: start record; following lines: role steps with CAS-backed content.
|
Thread execution state is a chain of immutable CAS nodes (`StartNode`, `StateNode`, content Merkle blobs). Per bundle:
|
||||||
|
|
||||||
```jsonc
|
- **`threads.json`** — only in-flight threads (`head`, `start`, `updatedAt`).
|
||||||
// Start record
|
- **`history/{YYYY-MM-DD}.jsonl`** — completed threads (`threadId`, `head`, `start`, `completedAt`).
|
||||||
{ "name": "solve-issue", "hash": "C9NMV6V2TQT81", "threadId": "01KQXKW…",
|
- **CAS (`cas/`)** — payloads and refs for replay, GC, and fork sharing.
|
||||||
"parameters": { "prompt": "Fix bug #3", "options": { "maxRounds": 5 } },
|
|
||||||
"timestamp": 1714963200000 }
|
|
||||||
// Role output (engine persists contentHash + refs; body in ~/.uncaged/workflow/cas/)
|
|
||||||
{ "role": "planner", "contentHash": "…", "meta": { "phases": [...] }, "refs": ["…"], "timestamp": ... }
|
|
||||||
```
|
|
||||||
|
|
||||||
**`.info.jsonl`** — Structured debug log via `@uncaged/workflow-util` `createLogger`:
|
**`.info.jsonl`** — Structured debug log via `@uncaged/workflow-util` `createLogger`:
|
||||||
|
|
||||||
|
|||||||
@@ -136,6 +136,7 @@ async function emitRecordsForHead(params: {
|
|||||||
await params.stream.writeSSE({
|
await params.stream.writeSSE({
|
||||||
event: "record",
|
event: "record",
|
||||||
data: JSON.stringify({
|
data: JSON.stringify({
|
||||||
|
type: "role",
|
||||||
role: fr.payload.role,
|
role: fr.payload.role,
|
||||||
contentHash: fr.payload.content,
|
contentHash: fr.payload.content,
|
||||||
content,
|
content,
|
||||||
@@ -309,7 +310,7 @@ export function createLiveRoutes(storageRoot: string): Hono {
|
|||||||
const controller = new AbortController();
|
const controller = new AbortController();
|
||||||
let completed = false;
|
let completed = false;
|
||||||
|
|
||||||
const dataWatcher = watch(threadsJsonPath, async () => {
|
const threadsJsonWatcher = watch(threadsJsonPath, async () => {
|
||||||
if (completed) {
|
if (completed) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -334,7 +335,7 @@ export function createLiveRoutes(storageRoot: string): Hono {
|
|||||||
|
|
||||||
stream.onAbort(() => {
|
stream.onAbort(() => {
|
||||||
completed = true;
|
completed = true;
|
||||||
dataWatcher.close();
|
threadsJsonWatcher.close();
|
||||||
infoWatcher?.close();
|
infoWatcher?.close();
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -347,7 +348,7 @@ export function createLiveRoutes(storageRoot: string): Hono {
|
|||||||
stream.onAbort(() => resolve());
|
stream.onAbort(() => resolve());
|
||||||
});
|
});
|
||||||
|
|
||||||
dataWatcher.close();
|
threadsJsonWatcher.close();
|
||||||
infoWatcher?.close();
|
infoWatcher?.close();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -1,9 +1,12 @@
|
|||||||
import { createCasStore } from "@uncaged/workflow-cas";
|
import { join } from "node:path";
|
||||||
|
import { createCasStore, getContentMerklePayload } from "@uncaged/workflow-cas";
|
||||||
import { FORK_BRANCH_ROLE, walkStateFramesNewestFirst } from "@uncaged/workflow-execute";
|
import { FORK_BRANCH_ROLE, walkStateFramesNewestFirst } from "@uncaged/workflow-execute";
|
||||||
import { END } from "@uncaged/workflow-runtime";
|
import { END } from "@uncaged/workflow-runtime";
|
||||||
import { getGlobalCasDir } from "@uncaged/workflow-util";
|
import { getGlobalCasDir } from "@uncaged/workflow-util";
|
||||||
import { Hono } from "hono";
|
import { Hono } from "hono";
|
||||||
|
|
||||||
|
import { pathExists } from "../../fs-utils.js";
|
||||||
|
import type { ResolvedThreadRecord } from "../../thread-scan.js";
|
||||||
import {
|
import {
|
||||||
listHistoricalThreads,
|
listHistoricalThreads,
|
||||||
listRunningThreads,
|
listRunningThreads,
|
||||||
@@ -12,13 +15,76 @@ import {
|
|||||||
import { cmdKill, cmdPause, cmdResume } from "../thread/control.js";
|
import { cmdKill, cmdPause, cmdResume } from "../thread/control.js";
|
||||||
import { cmdRun } from "../thread/run.js";
|
import { cmdRun } from "../thread/run.js";
|
||||||
|
|
||||||
|
async function buildThreadDetailRecords(
|
||||||
|
storageRoot: string,
|
||||||
|
resolved: ResolvedThreadRecord,
|
||||||
|
): Promise<unknown[]> {
|
||||||
|
const cas = createCasStore(getGlobalCasDir(storageRoot));
|
||||||
|
const frames = await walkStateFramesNewestFirst(cas, resolved.head);
|
||||||
|
const chronological = [...frames].reverse();
|
||||||
|
|
||||||
|
const records: unknown[] = [
|
||||||
|
{
|
||||||
|
type: "thread-start",
|
||||||
|
threadId: resolved.threadId,
|
||||||
|
bundleHash: resolved.bundleHash,
|
||||||
|
head: resolved.head,
|
||||||
|
start: resolved.start,
|
||||||
|
source: resolved.source,
|
||||||
|
},
|
||||||
|
];
|
||||||
|
|
||||||
|
for (const fr of chronological) {
|
||||||
|
if (fr.payload.role === FORK_BRANCH_ROLE) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (fr.payload.role === END) {
|
||||||
|
const returnCode = fr.payload.meta.returnCode;
|
||||||
|
const summary = fr.payload.meta.summary;
|
||||||
|
if (typeof returnCode === "number" && typeof summary === "string") {
|
||||||
|
records.push({ type: "workflow-result", returnCode, summary });
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
const payloadText = await getContentMerklePayload(cas, fr.payload.content);
|
||||||
|
const content =
|
||||||
|
payloadText !== null
|
||||||
|
? payloadText
|
||||||
|
: `(content not in CAS; contentHash=${fr.payload.content})`;
|
||||||
|
records.push({
|
||||||
|
type: "role",
|
||||||
|
role: fr.payload.role,
|
||||||
|
contentHash: fr.payload.content,
|
||||||
|
content,
|
||||||
|
meta: fr.payload.meta,
|
||||||
|
timestamp: fr.payload.timestamp,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return records;
|
||||||
|
}
|
||||||
|
|
||||||
export function createThreadRoutes(storageRoot: string): Hono {
|
export function createThreadRoutes(storageRoot: string): Hono {
|
||||||
const app = new Hono();
|
const app = new Hono();
|
||||||
|
|
||||||
app.get("/", async (c) => {
|
app.get("/", async (c) => {
|
||||||
const nameFilter = c.req.query("workflow") ?? null;
|
const nameFilter = c.req.query("workflow") ?? null;
|
||||||
const rows = await listHistoricalThreads(storageRoot, nameFilter);
|
const rows = await listHistoricalThreads(storageRoot, nameFilter);
|
||||||
return c.json({ threads: rows });
|
const threads = await Promise.all(
|
||||||
|
rows.map(async (r) => {
|
||||||
|
const runningPath = join(storageRoot, "logs", r.hash, `${r.threadId}.running`);
|
||||||
|
const isRunning = await pathExists(runningPath);
|
||||||
|
const status = r.source === "history" ? "completed" : isRunning ? "running" : "active";
|
||||||
|
return {
|
||||||
|
threadId: r.threadId,
|
||||||
|
workflow: r.workflowName,
|
||||||
|
hash: r.hash,
|
||||||
|
startedAt: new Date(r.activityTs).toISOString(),
|
||||||
|
status,
|
||||||
|
};
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
return c.json({ threads });
|
||||||
});
|
});
|
||||||
|
|
||||||
app.get("/running", async (c) => {
|
app.get("/running", async (c) => {
|
||||||
@@ -32,42 +98,7 @@ export function createThreadRoutes(storageRoot: string): Hono {
|
|||||||
if (resolved === null) {
|
if (resolved === null) {
|
||||||
return c.json({ error: `thread not found: ${threadId}` }, 404);
|
return c.json({ error: `thread not found: ${threadId}` }, 404);
|
||||||
}
|
}
|
||||||
|
const records = await buildThreadDetailRecords(storageRoot, resolved);
|
||||||
const cas = createCasStore(getGlobalCasDir(storageRoot));
|
|
||||||
const frames = await walkStateFramesNewestFirst(cas, resolved.head);
|
|
||||||
const chronological = [...frames].reverse();
|
|
||||||
|
|
||||||
const records: unknown[] = [
|
|
||||||
{
|
|
||||||
type: "thread-start",
|
|
||||||
threadId: resolved.threadId,
|
|
||||||
bundleHash: resolved.bundleHash,
|
|
||||||
head: resolved.head,
|
|
||||||
start: resolved.start,
|
|
||||||
source: resolved.source,
|
|
||||||
},
|
|
||||||
];
|
|
||||||
|
|
||||||
for (const fr of chronological) {
|
|
||||||
if (fr.payload.role === FORK_BRANCH_ROLE) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (fr.payload.role === END) {
|
|
||||||
const returnCode = fr.payload.meta.returnCode;
|
|
||||||
const summary = fr.payload.meta.summary;
|
|
||||||
if (typeof returnCode === "number" && typeof summary === "string") {
|
|
||||||
records.push({ type: "workflow-result", returnCode, summary });
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
records.push({
|
|
||||||
role: fr.payload.role,
|
|
||||||
contentHash: fr.payload.content,
|
|
||||||
meta: fr.payload.meta,
|
|
||||||
timestamp: fr.payload.timestamp,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
return c.json({ threadId, records });
|
return c.json({ threadId, records });
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import { createCasStore } from "@uncaged/workflow-cas";
|
import { createCasStore, getContentMerklePayload } from "@uncaged/workflow-cas";
|
||||||
import { FORK_BRANCH_ROLE, walkStateFramesNewestFirst } from "@uncaged/workflow-execute";
|
import { FORK_BRANCH_ROLE, walkStateFramesNewestFirst } from "@uncaged/workflow-execute";
|
||||||
import { err, ok, type Result } from "@uncaged/workflow-protocol";
|
import { err, ok, type Result } from "@uncaged/workflow-protocol";
|
||||||
import { END } from "@uncaged/workflow-runtime";
|
import { END } from "@uncaged/workflow-runtime";
|
||||||
@@ -19,15 +19,20 @@ export async function cmdThreadShow(
|
|||||||
const frames = await walkStateFramesNewestFirst(cas, resolved.head);
|
const frames = await walkStateFramesNewestFirst(cas, resolved.head);
|
||||||
const chronological = [...frames].reverse();
|
const chronological = [...frames].reverse();
|
||||||
|
|
||||||
const steps: Array<{ role: string; hash: string; timestamp: number }> = [];
|
const steps: Array<{ role: string; hash: string; timestamp: number; content: string }> = [];
|
||||||
for (const fr of chronological) {
|
for (const fr of chronological) {
|
||||||
if (fr.payload.role === END || fr.payload.role === FORK_BRANCH_ROLE) {
|
if (fr.payload.role === END || fr.payload.role === FORK_BRANCH_ROLE) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
const payloadText = await getContentMerklePayload(cas, fr.payload.content);
|
||||||
steps.push({
|
steps.push({
|
||||||
role: fr.payload.role,
|
role: fr.payload.role,
|
||||||
hash: fr.hash,
|
hash: fr.hash,
|
||||||
timestamp: fr.payload.timestamp,
|
timestamp: fr.payload.timestamp,
|
||||||
|
content:
|
||||||
|
payloadText !== null
|
||||||
|
? payloadText
|
||||||
|
: `(content not in CAS; contentHash=${fr.payload.content})`,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -70,8 +70,8 @@ function formatSkillCli(): string {
|
|||||||
|---------|-------------|
|
|---------|-------------|
|
||||||
| **Workflow** | A single-file ESM bundle (\`.esm.js\`) that exports \`run\` and \`descriptor\`. Identified by name and XXH64 hash. |
|
| **Workflow** | A single-file ESM bundle (\`.esm.js\`) that exports \`run\` and \`descriptor\`. Identified by name and XXH64 hash. |
|
||||||
| **Bundle** | The physical \`.esm.js\` file stored in the bundles directory. Immutable once written. |
|
| **Bundle** | The physical \`.esm.js\` file stored in the bundles directory. Immutable once written. |
|
||||||
| **Thread** | A single execution of a workflow, identified by a ULID. Persists state as JSONL files. |
|
| **Thread** | A single execution of a workflow, identified by a ULID. CAS state chain; \`threads.json\` for active; \`history/*.jsonl\` when done; \`.info.jsonl\` for debug logs. |
|
||||||
| **CAS** | Content-Addressable Storage. Per-thread key-value store keyed by content hash. |
|
| **CAS** | Global content-addressable blob store (\`cas/\`), keyed by hash. |
|
||||||
| **Registry** | \`workflow.yaml\` — maps workflow names to their current and historical bundle hashes. |
|
| **Registry** | \`workflow.yaml\` — maps workflow names to their current and historical bundle hashes. |
|
||||||
|
|
||||||
## Commands
|
## Commands
|
||||||
|
|||||||
@@ -94,6 +94,10 @@ export type HistoricalThreadRow = {
|
|||||||
threadId: string;
|
threadId: string;
|
||||||
hash: string;
|
hash: string;
|
||||||
workflowName: string | null;
|
workflowName: string | null;
|
||||||
|
/** Active entry from `threads.json` vs completed line from `history/*.jsonl`. */
|
||||||
|
source: "active" | "history";
|
||||||
|
/** `updatedAt` for active threads; `completedAt` for history (ms since epoch). */
|
||||||
|
activityTs: number;
|
||||||
};
|
};
|
||||||
|
|
||||||
export type ResolvedThreadRecord = {
|
export type ResolvedThreadRecord = {
|
||||||
@@ -243,7 +247,13 @@ export async function listHistoricalThreads(
|
|||||||
if (workflowNameFilter !== null && workflowName !== workflowNameFilter) {
|
if (workflowNameFilter !== null && workflowName !== workflowNameFilter) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
out.push({ threadId, hash: bundleHash, workflowName });
|
out.push({
|
||||||
|
threadId,
|
||||||
|
hash: bundleHash,
|
||||||
|
workflowName,
|
||||||
|
source: "active",
|
||||||
|
activityTs: entry.updatedAt,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
const histDir = join(bundleDir, "history");
|
const histDir = join(bundleDir, "history");
|
||||||
@@ -271,7 +281,13 @@ export async function listHistoricalThreads(
|
|||||||
if (workflowNameFilter !== null && workflowName !== workflowNameFilter) {
|
if (workflowNameFilter !== null && workflowName !== workflowNameFilter) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
out.push({ threadId: e.threadId, hash: bundleHash, workflowName });
|
out.push({
|
||||||
|
threadId: e.threadId,
|
||||||
|
hash: bundleHash,
|
||||||
|
workflowName,
|
||||||
|
source: "history",
|
||||||
|
activityTs: e.completedAt,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user