From e29d1bf34525e493182b9ba2581b17058ced8aaf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Sat, 9 May 2026 08:16:04 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20Phase=205=20=E2=80=94=20CLI=20+=20Dashb?= =?UTF-8?q?oard=20CAS=20adaptation,=20cleanup=20.data.jsonl?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Align REST API contracts for Dashboard (threads list, detail, SSE) - Add content resolution from CAS in thread show + API responses - Rename dataWatcher → threadsJsonWatcher in SSE routes - Update docs (CLAUDE.md, architecture.md, skill.ts) to reflect CAS storage - Zero .data.jsonl code paths in production code - All 166 tests pass, bun run check clean Refs #155, closes #160 小橘 --- CLAUDE.md | 2 +- README.md | 2 +- docs/architecture.md | 25 ++-- .../src/commands/serve/routes-live.ts | 7 +- .../src/commands/serve/routes-thread.ts | 107 +++++++++++------- .../cli-workflow/src/commands/thread/show.ts | 9 +- packages/cli-workflow/src/skill.ts | 4 +- packages/cli-workflow/src/thread-scan.ts | 20 +++- 8 files changed, 114 insertions(+), 62 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 70f48ad..9bbbdb8 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -10,7 +10,7 @@ This monorepo implements a workflow engine that executes single-file ESM bundles |---------|-----------| | **Workflow** | A single-file ESM module that exports `run` (workflow function) and `descriptor` (metadata). Identified by its XXH64 hash (Crockford Base32). | | **Bundle** | The physical `.esm.js` file stored in `~/.uncaged/workflow/bundles/`. | -| **Thread** | A single execution of a workflow, identified by a ULID. Persisted as `.data.jsonl` + `.info.jsonl`. | +| **Thread** | A single execution of a workflow, identified by a ULID. State lives in CAS (linked nodes); active threads indexed in `threads.json`; completed rows in `history/*.jsonl`. Debug logs use `.info.jsonl`. | | **Role** | A named actor within a workflow. Each role produces output with typed `meta`. | | **Registry** | `workflow.yaml` — maps workflow names to current/historical bundle hashes. | diff --git a/README.md b/README.md index 6bac153..b85cc44 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ A workflow engine that executes single-file ESM bundles. Each workflow is a self |---------|-------------| | **Workflow** | A single-file ESM module exporting `run` (workflow function) and `descriptor` (metadata). Identified by its XXH64 hash. | | **Bundle** | The physical `.esm.js` file stored in `~/.uncaged/workflow/bundles/`. | -| **Thread** | A single execution of a workflow, identified by a ULID. Persisted as `.data.jsonl` + `.info.jsonl`. | +| **Thread** | A single execution of a workflow, identified by a ULID. CAS-backed chain plus `threads.json` / `history/*.jsonl`; `.info.jsonl` for debug logs. | | **Role** | A named actor within a workflow. Each role produces output with typed `meta`. Roles live inside template packages (`src/roles/`). | | **Registry** | `workflow.yaml` — maps workflow names to current/historical bundle hashes. | | **CAS** | Content-Addressed Storage — bundles are immutable and addressed by hash. | diff --git a/docs/architecture.md b/docs/architecture.md index 0f06ef1..8639c4e 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -189,11 +189,15 @@ type WorkflowFn = ( ├── cas/ # Global content-addressed blobs (see getGlobalCasDir) ├── bundles/ │ ├── C9NMV6V2TQT81.esm.js # Crockford Base32 of XXH64 -│ └── C9NMV6V2TQT81.yaml # Role descriptor sidecar (when present) +│ ├── C9NMV6V2TQT81.yaml # Role descriptor sidecar (when present) +│ └── C9NMV6V2TQT81/ # Per-hash bundle dir (alongside or instead of loose files) +│ ├── threads.json # Active threads: threadId → { head, start, updatedAt } +│ └── history/ +│ └── 2026-05-09.jsonl # Completed threads (one JSON object per line) ├── logs/ # One folder per bundle hash │ └── C9NMV6V2TQT81/ -│ ├── 01KQXKW…YG.data.jsonl # Thread state -│ └── 01KQXKW…YG.info.jsonl # Debug log +│ ├── 01KQXKW…YG.running # Present while worker executes this thread (optional) +│ └── 01KQXKW…YG.info.jsonl # Debug log └── workflow.yaml # Registry ``` @@ -207,18 +211,13 @@ type WorkflowFn = ( Managed by `@uncaged/workflow-register` (`readWorkflowRegistry`, `writeWorkflowRegistry`, …). Shape includes workflow entries and a top-level `config` section used for extract/supervisor model resolution. -### Thread JSONL +### Thread storage (CAS + index) -**`.data.jsonl`** — Line 1: start record; following lines: role steps with CAS-backed content. +Thread execution state is a chain of immutable CAS nodes (`StartNode`, `StateNode`, content Merkle blobs). Per bundle: -```jsonc -// Start record -{ "name": "solve-issue", "hash": "C9NMV6V2TQT81", "threadId": "01KQXKW…", - "parameters": { "prompt": "Fix bug #3", "options": { "maxRounds": 5 } }, - "timestamp": 1714963200000 } -// Role output (engine persists contentHash + refs; body in ~/.uncaged/workflow/cas/) -{ "role": "planner", "contentHash": "…", "meta": { "phases": [...] }, "refs": ["…"], "timestamp": ... } -``` +- **`threads.json`** — only in-flight threads (`head`, `start`, `updatedAt`). +- **`history/{YYYY-MM-DD}.jsonl`** — completed threads (`threadId`, `head`, `start`, `completedAt`). +- **CAS (`cas/`)** — payloads and refs for replay, GC, and fork sharing. **`.info.jsonl`** — Structured debug log via `@uncaged/workflow-util` `createLogger`: diff --git a/packages/cli-workflow/src/commands/serve/routes-live.ts b/packages/cli-workflow/src/commands/serve/routes-live.ts index fa787b3..2bcd32b 100644 --- a/packages/cli-workflow/src/commands/serve/routes-live.ts +++ b/packages/cli-workflow/src/commands/serve/routes-live.ts @@ -136,6 +136,7 @@ async function emitRecordsForHead(params: { await params.stream.writeSSE({ event: "record", data: JSON.stringify({ + type: "role", role: fr.payload.role, contentHash: fr.payload.content, content, @@ -309,7 +310,7 @@ export function createLiveRoutes(storageRoot: string): Hono { const controller = new AbortController(); let completed = false; - const dataWatcher = watch(threadsJsonPath, async () => { + const threadsJsonWatcher = watch(threadsJsonPath, async () => { if (completed) { return; } @@ -334,7 +335,7 @@ export function createLiveRoutes(storageRoot: string): Hono { stream.onAbort(() => { completed = true; - dataWatcher.close(); + threadsJsonWatcher.close(); infoWatcher?.close(); }); @@ -347,7 +348,7 @@ export function createLiveRoutes(storageRoot: string): Hono { stream.onAbort(() => resolve()); }); - dataWatcher.close(); + threadsJsonWatcher.close(); infoWatcher?.close(); }); }); diff --git a/packages/cli-workflow/src/commands/serve/routes-thread.ts b/packages/cli-workflow/src/commands/serve/routes-thread.ts index 1ca76cc..36f2bac 100644 --- a/packages/cli-workflow/src/commands/serve/routes-thread.ts +++ b/packages/cli-workflow/src/commands/serve/routes-thread.ts @@ -1,9 +1,12 @@ -import { createCasStore } from "@uncaged/workflow-cas"; +import { join } from "node:path"; +import { createCasStore, getContentMerklePayload } from "@uncaged/workflow-cas"; import { FORK_BRANCH_ROLE, walkStateFramesNewestFirst } from "@uncaged/workflow-execute"; import { END } from "@uncaged/workflow-runtime"; import { getGlobalCasDir } from "@uncaged/workflow-util"; import { Hono } from "hono"; +import { pathExists } from "../../fs-utils.js"; +import type { ResolvedThreadRecord } from "../../thread-scan.js"; import { listHistoricalThreads, listRunningThreads, @@ -12,13 +15,76 @@ import { import { cmdKill, cmdPause, cmdResume } from "../thread/control.js"; import { cmdRun } from "../thread/run.js"; +async function buildThreadDetailRecords( + storageRoot: string, + resolved: ResolvedThreadRecord, +): Promise { + const cas = createCasStore(getGlobalCasDir(storageRoot)); + const frames = await walkStateFramesNewestFirst(cas, resolved.head); + const chronological = [...frames].reverse(); + + const records: unknown[] = [ + { + type: "thread-start", + threadId: resolved.threadId, + bundleHash: resolved.bundleHash, + head: resolved.head, + start: resolved.start, + source: resolved.source, + }, + ]; + + for (const fr of chronological) { + if (fr.payload.role === FORK_BRANCH_ROLE) { + continue; + } + if (fr.payload.role === END) { + const returnCode = fr.payload.meta.returnCode; + const summary = fr.payload.meta.summary; + if (typeof returnCode === "number" && typeof summary === "string") { + records.push({ type: "workflow-result", returnCode, summary }); + } + continue; + } + const payloadText = await getContentMerklePayload(cas, fr.payload.content); + const content = + payloadText !== null + ? payloadText + : `(content not in CAS; contentHash=${fr.payload.content})`; + records.push({ + type: "role", + role: fr.payload.role, + contentHash: fr.payload.content, + content, + meta: fr.payload.meta, + timestamp: fr.payload.timestamp, + }); + } + + return records; +} + export function createThreadRoutes(storageRoot: string): Hono { const app = new Hono(); app.get("/", async (c) => { const nameFilter = c.req.query("workflow") ?? null; const rows = await listHistoricalThreads(storageRoot, nameFilter); - return c.json({ threads: rows }); + const threads = await Promise.all( + rows.map(async (r) => { + const runningPath = join(storageRoot, "logs", r.hash, `${r.threadId}.running`); + const isRunning = await pathExists(runningPath); + const status = r.source === "history" ? "completed" : isRunning ? "running" : "active"; + return { + threadId: r.threadId, + workflow: r.workflowName, + hash: r.hash, + startedAt: new Date(r.activityTs).toISOString(), + status, + }; + }), + ); + return c.json({ threads }); }); app.get("/running", async (c) => { @@ -32,42 +98,7 @@ export function createThreadRoutes(storageRoot: string): Hono { if (resolved === null) { return c.json({ error: `thread not found: ${threadId}` }, 404); } - - const cas = createCasStore(getGlobalCasDir(storageRoot)); - const frames = await walkStateFramesNewestFirst(cas, resolved.head); - const chronological = [...frames].reverse(); - - const records: unknown[] = [ - { - type: "thread-start", - threadId: resolved.threadId, - bundleHash: resolved.bundleHash, - head: resolved.head, - start: resolved.start, - source: resolved.source, - }, - ]; - - for (const fr of chronological) { - if (fr.payload.role === FORK_BRANCH_ROLE) { - continue; - } - if (fr.payload.role === END) { - const returnCode = fr.payload.meta.returnCode; - const summary = fr.payload.meta.summary; - if (typeof returnCode === "number" && typeof summary === "string") { - records.push({ type: "workflow-result", returnCode, summary }); - } - continue; - } - records.push({ - role: fr.payload.role, - contentHash: fr.payload.content, - meta: fr.payload.meta, - timestamp: fr.payload.timestamp, - }); - } - + const records = await buildThreadDetailRecords(storageRoot, resolved); return c.json({ threadId, records }); }); diff --git a/packages/cli-workflow/src/commands/thread/show.ts b/packages/cli-workflow/src/commands/thread/show.ts index 0514bf4..fdc08a1 100644 --- a/packages/cli-workflow/src/commands/thread/show.ts +++ b/packages/cli-workflow/src/commands/thread/show.ts @@ -1,4 +1,4 @@ -import { createCasStore } from "@uncaged/workflow-cas"; +import { createCasStore, getContentMerklePayload } from "@uncaged/workflow-cas"; import { FORK_BRANCH_ROLE, walkStateFramesNewestFirst } from "@uncaged/workflow-execute"; import { err, ok, type Result } from "@uncaged/workflow-protocol"; import { END } from "@uncaged/workflow-runtime"; @@ -19,15 +19,20 @@ export async function cmdThreadShow( const frames = await walkStateFramesNewestFirst(cas, resolved.head); const chronological = [...frames].reverse(); - const steps: Array<{ role: string; hash: string; timestamp: number }> = []; + const steps: Array<{ role: string; hash: string; timestamp: number; content: string }> = []; for (const fr of chronological) { if (fr.payload.role === END || fr.payload.role === FORK_BRANCH_ROLE) { continue; } + const payloadText = await getContentMerklePayload(cas, fr.payload.content); steps.push({ role: fr.payload.role, hash: fr.hash, timestamp: fr.payload.timestamp, + content: + payloadText !== null + ? payloadText + : `(content not in CAS; contentHash=${fr.payload.content})`, }); } diff --git a/packages/cli-workflow/src/skill.ts b/packages/cli-workflow/src/skill.ts index a697641..8a2b076 100644 --- a/packages/cli-workflow/src/skill.ts +++ b/packages/cli-workflow/src/skill.ts @@ -70,8 +70,8 @@ function formatSkillCli(): string { |---------|-------------| | **Workflow** | A single-file ESM bundle (\`.esm.js\`) that exports \`run\` and \`descriptor\`. Identified by name and XXH64 hash. | | **Bundle** | The physical \`.esm.js\` file stored in the bundles directory. Immutable once written. | -| **Thread** | A single execution of a workflow, identified by a ULID. Persists state as JSONL files. | -| **CAS** | Content-Addressable Storage. Per-thread key-value store keyed by content hash. | +| **Thread** | A single execution of a workflow, identified by a ULID. CAS state chain; \`threads.json\` for active; \`history/*.jsonl\` when done; \`.info.jsonl\` for debug logs. | +| **CAS** | Global content-addressable blob store (\`cas/\`), keyed by hash. | | **Registry** | \`workflow.yaml\` — maps workflow names to their current and historical bundle hashes. | ## Commands diff --git a/packages/cli-workflow/src/thread-scan.ts b/packages/cli-workflow/src/thread-scan.ts index ab1889f..17b9659 100644 --- a/packages/cli-workflow/src/thread-scan.ts +++ b/packages/cli-workflow/src/thread-scan.ts @@ -94,6 +94,10 @@ export type HistoricalThreadRow = { threadId: string; hash: string; workflowName: string | null; + /** Active entry from `threads.json` vs completed line from `history/*.jsonl`. */ + source: "active" | "history"; + /** `updatedAt` for active threads; `completedAt` for history (ms since epoch). */ + activityTs: number; }; export type ResolvedThreadRecord = { @@ -243,7 +247,13 @@ export async function listHistoricalThreads( if (workflowNameFilter !== null && workflowName !== workflowNameFilter) { continue; } - out.push({ threadId, hash: bundleHash, workflowName }); + out.push({ + threadId, + hash: bundleHash, + workflowName, + source: "active", + activityTs: entry.updatedAt, + }); } const histDir = join(bundleDir, "history"); @@ -271,7 +281,13 @@ export async function listHistoricalThreads( if (workflowNameFilter !== null && workflowName !== workflowNameFilter) { continue; } - out.push({ threadId: e.threadId, hash: bundleHash, workflowName }); + out.push({ + threadId: e.threadId, + hash: bundleHash, + workflowName, + source: "history", + activityTs: e.completedAt, + }); } } }