Compare commits

...

16 Commits

Author SHA1 Message Date
xiaoju a222db6f2d fix(workflow): bundle build & register workflow
- Add missing agent deps to template package.json so bun workspace
  resolution works during `bun build` (workflow-agent-cursor for develop,
  workflow-agent-hermes + workflow-execute for solve-issue)
- Create scripts/build-bundles.sh that builds both template bundles into
  dist/*.esm.js with correct --external flags for runtime-symlinked packages
- Add build:bundles script to root package.json
- Extend ensureUncagedWorkflowSymlink to also symlink workflow-util,
  workflow-execute, and workflow-register (needed by externalized bundles)
- Defer agent creation in develop bundle-entry.ts via lazy init so
  requireEnv('WORKFLOW_LLM_API_KEY') only throws at first invocation,
  not at import() time (fixes workflowAsAgent crash)
- Document that env vars must be set before the first worker spawn
  (workers are persistent and don't inherit later env changes)

Fixes #206

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-12 02:58:45 +00:00
xiaomo 1c68ce6217 Merge pull request 'feat: Phase 3 — agent observability for Merkle call stack' (#203) from feat/197-agent-observability into main 2026-05-12 02:34:47 +00:00
xingyue 7265603b55 feat: click graph node to scroll and highlight RecordCard (#198 Phase 3) 2026-05-12 10:34:06 +08:00
xiaoju 74cea09ac0 fix: bundle validator accepts Identifier init and wildcard @uncaged/workflow-* imports
- bindingInitializerIsCallable: accept Identifier (e.g. var run = wf)
- import allowlist: startsWith('@uncaged/workflow') instead of exact match list

小橘 🍊(NEKO Team)
2026-05-12 02:33:28 +00:00
xingyue b1e66fa7a4 fix: use async/await instead of .then() in getWorkflowDescriptor 2026-05-12 10:29:50 +08:00
xiaomo 81a7a8c7c1 Merge pull request 'feat: Dashboard workflow graph visualization (React Flow)' (#204) from feat/198-dashboard-workflow-graph into main 2026-05-12 02:28:40 +00:00
xingyue 9cb7d68abe feat: Dashboard workflow graph visualization with React Flow (#198)
Phase 1: API + static graph rendering

Backend:
- GET /workflows/:name now returns descriptor (with graph) from bundle YAML
- Graceful fallback to null if YAML missing/invalid

Frontend:
- New workflow-graph/ component module (7 files)
- React Flow + dagre auto-layout (TB direction)
- Custom nodes: RoleNode (rounded rect) + TerminalNode (circle for START/END)
- Custom edges: dashed for FALLBACK, solid with label for conditions
- Self-loop edges supported (e.g. coder → coder)
- Node states: default/completed/active with color-coded borders
- Active node pulse animation
- Collapsible graph panel (300px) above thread records
- Dark theme using existing CSS variables

Integration:
- ThreadDetail extracts workflow name → fetches descriptor → computes node states → renders graph
- Node states derived from ThreadRecord[] (completed/active/default)
2026-05-12 10:27:07 +08:00
xiaoju 98122b446d feat: Phase 3 — agent observability for Merkle call stack
- StartStep gains parentState: string | null (from StartNodePayload)
- buildAgentPrompt injects Parent Context section when parentState is set
- CLI thread show outputs parentState (top-level) and childThread (per step)
- 2 new prompt tests + thread show assertion updates

Refs #197, #194

小橘 🍊(NEKO Team)
2026-05-12 02:23:15 +00:00
xiaoju 4a31cf9d63 fix: workflowAsAgent error paths return AgentFnResult instead of plain string
Nit from PR #202 review — all error returns now use { output, childThread: null }
for type consistency with the success path.

小橘 🍊(NEKO Team)
2026-05-12 02:15:06 +00:00
xingyue 2c26be6ec6 docs: update graph visualization RFC — Phase 0 done (#198) 2026-05-12 10:13:39 +08:00
xiaomo f723daa014 Merge pull request 'feat(#194): Phase 2 — Engine layer Merkle call stack' (#202) from feat/194-merkle-call-stack-phase2 into main 2026-05-12 02:11:24 +00:00
xiaoju 1e9900bed3 feat(#194): Phase 2 — Engine layer Merkle call stack wiring
- Protocol: AgentFnResult = string | { output, childThread }, RoleOutput.childThread,
  ThreadContext.bundleHash for parent state lookup
- Runtime: create-workflow normalizes AgentFnResult, propagates childThread in RoleOutput
- Engine: ExecuteThreadOptions.parentStateHash, appendStateForStep writes childThread,
  putStartNode uses parentStateHash
- workflowAsAgent: reads parent head state from threads.json, passes parentStateHash
  to child, returns { output, childThread: rootHash }
- Integration test: 4 cases verifying bidirectional Merkle links (306 lines)

Phase 2 of #194 (Merkle Call Stack). Closes #196.

小橘 <xiaoju@shazhou.work>
2026-05-12 02:10:06 +00:00
xiaomo aebff8b906 Merge pull request 'refactor: replace Moderator function with ModeratorTable in WorkflowDefinition' (#201) from refactor/200-moderator-table into main 2026-05-12 02:06:03 +00:00
xiaomo 9c1b018ffa Merge pull request 'feat(#194): Phase 1 — Merkle Call Stack protocol + CAS layer' (#199) from feat/194-merkle-call-stack-phase1 into main 2026-05-12 01:50:05 +00:00
xiaoju a98431a12a feat(#194): Phase 1 — parentState / childThread in CAS nodes
- Protocol: StartNodePayload.parentState, StateNodePayload.childThread
- CAS: putStartNode refs include parentState, collectRefs includes childThread
- Parsing: legacy nodes without new fields default to null
- Engine + fork: all callers pass parentState: null / childThread: null
- Tests: 8 new cases for refs, parsing, collect-refs (+208 lines)

Phase 1 of #194 (Merkle Call Stack). Closes #195.

小橘 <xiaoju@shazhou.work>
2026-05-12 01:42:10 +00:00
xiaoju e37dbc3f35 wip: Phase 1 protocol + CAS types for Merkle call stack
小橘 <xiaoju@shazhou.work>
2026-05-12 01:35:45 +00:00
57 changed files with 1565 additions and 126 deletions
@@ -1,68 +1,53 @@
# Dashboard Workflow Graph Visualization
**Issue**: #198
**Status**: Draft
**Status**: In Progress
**Author**: xingyue
## Overview
在 Dashboard 的 ThreadDetail 页面中嵌入一个交互式流程图,将 workflow 的 `ModeratorTable` 可视化为有向图。用户可以一眼看到角色流转结构和当前执行进度。
## 数据
## 数据层(✅ 已完成 — PR #201)
### 问题:ModeratorTable 在 bundle 端,Dashboard 在前端
### WorkflowGraph 类型
`ModeratorTable` 是运行时数据结构(包含 JS 函数引用如 `check`),无法直接序列化给前端。需要一个**静态图描述格式**作为中间层。
### 方案:扩展 WorkflowDescriptor
当前 `WorkflowDescriptor` 只有 roles + description,不包含转换图信息:
`WorkflowDefinition.moderator`(函数)已替换为 `WorkflowDefinition.table`(声明式 `ModeratorTable`),`buildDescriptor` 自动从 table 提取 graph:
```ts
type WorkflowDescriptor = {
description: string;
roles: Record<string, WorkflowRoleDescriptor>;
};
```
**扩展为**
```ts
type TransitionEdge = {
condition: string; // condition.name,或 "FALLBACK"
description: string | null; // condition.description,FALLBACK 为 null
target: string; // role name 或 "__end__"
type WorkflowGraphEdge = {
from: string; // source role 或 "__start__"
to: string; // target role 或 "__end__"
condition: string; // condition.name 或 "FALLBACK"
conditionDescription: string | null;
};
type WorkflowGraph = Record<string, TransitionEdge[]>;
// key = source role name 或 "__start__"
type WorkflowGraph = {
edges: readonly WorkflowGraphEdge[];
};
type WorkflowDescriptor = {
description: string;
roles: Record<string, WorkflowRoleDescriptor>;
graph: WorkflowGraph | null; // null = legacy bundles without graph
graph: WorkflowGraph; // 必填,新 bundle 自动生成
};
```
`buildDescriptor``workflow-register`)中,从 `ModeratorTable` 提取静态图结构。`condition.check` 函数不序列化,只保留 `name``description`
### 数据暴露路径
### 数据流
```
ModeratorTable (runtime)
→ buildDescriptor() 提取 graph
→ descriptor.yaml 持久化
→ CLI serve /workflows API 返回
ModeratorTable (WorkflowDefinition.table)
→ buildDescriptor() 自动提取 graph
→ descriptor.yaml 持久化(hash.yaml)
→ CLI serve /workflows/:name API 返回 descriptor
→ Dashboard 前端拿到 graph
```
同时需要新增或扩展一个 API,让 Dashboard 能拿到指定 workflow 的 descriptor(含 graph):
### 剩余数据层工作
```
GET /workflows/:name → { descriptor: WorkflowDescriptor }
```
**serve API 需要返回 descriptor**:当前 `GET /workflows/:name` 只返回 registry entry(hash + timestamp),不含 descriptor。需要从 `bundles/{hash}.yaml` 读取 descriptor 并返回给前端。
或者在现有 `listWorkflows` 返回中附带。
方案:在 `routes-workflow.ts``GET /workflows/:name` 响应中附带 `descriptor` 字段。或者:thread-detail 发现 workflow name 后,请求 `GET /workflows/:name/descriptor` 拿到 graph
## 前端渲染
@@ -87,17 +72,17 @@ GET /workflows/:name → { descriptor: WorkflowDescriptor }
### 图结构映射
```
WorkflowGraph → React Flow nodes + edges
WorkflowGraph.edges → React Flow nodes + edges
节点:
节点(自动从 edges 推导):
- __start__ → 圆形小节点(入口)
- role → 圆角矩形,显示 role name + description
- __end__ → 圆形小节点(终止)
边:
- FALLBACK → 虚线(dashed),无 label
- condition → 实线,label = condition.name
hover tooltip = condition.description
- condition → 实线,label = condition
hover tooltip = conditionDescription
```
### 布局
@@ -172,7 +157,7 @@ function getNodeStates(records: ThreadRecord[]): Map<string, "completed" | "acti
workflow-dashboard/src/
components/
workflow-graph/
types.ts — TransitionEdge, NodeState 等前端类型
types.ts — NodeState 等前端类型
index.ts — export { WorkflowGraph }
workflow-graph.tsx — 主组件,React Flow canvas
role-node.tsx — 自定义 role 节点
@@ -196,15 +181,22 @@ workflow-dashboard/src/
图高度固定 300px,React Flow 支持 pan + zoom,不影响下方 records 滚动。
## 分阶段实施
## 实施计划
### Phase 1: 数据层 + 静态图
### ~~Phase 0: 数据层~~ ✅ Done (PR #201)
1. `workflow-protocol` 中新增 `TransitionEdge` / `WorkflowGraph` 类型
2. `workflow-register``buildDescriptor` 中从 `ModeratorTable` 提取 graph
3. `stringifyWorkflowDescriptor` / `validateWorkflowDescriptor` 支持 graph 字段
4. CLI serve 的 `/workflows` API 返回 descriptor(含 graph
5. Dashboard 新增 `WorkflowGraph` 组件,静态渲染图
- [x] `WorkflowDefinition.moderator` `table` (ModeratorTable)
- [x] `WorkflowDescriptor` 新增 `graph: WorkflowGraph`
- [x] `buildDescriptor` 自动提取 graph
- [x] `validateWorkflowDescriptor` 校验 graph
### Phase 1: API + 静态图渲染
1. serve API:`GET /workflows/:name` 返回 descriptor(含 graph),或新增 `GET /workflows/:name/descriptor`
2. Dashboard `api.ts` 新增 `getWorkflowDescriptor(agent, name)` 函数
3. 安装 `@xyflow/react` + `@dagrejs/dagre`
4. 实现 `workflow-graph/` 组件集
5. ThreadDetail 中集成:从 thread-start record 拿 workflow name → 请求 descriptor → 渲染图
**产出**:打开 ThreadDetail 看到 workflow 流程图,无高亮。
@@ -219,21 +211,14 @@ workflow-dashboard/src/
### Phase 3: 交互增强
1. 点击节点滚动到对应 role 的 RecordCard
2. 边 hover 显示 condition description tooltip
2. 边 hover 显示 conditionDescription tooltip
3. 节点 hover 显示 role description + schema summary
**产出**:图和记录列表联动。
## 注意事项
- **向后兼容**:`graph` 字段为 `null` 时(旧 bundle),不渲染图,只显示 records
- **自循环边**:如 `coder → coder (FALLBACK)`,React Flow 支持自循环,dagre 需要特殊处理(self-edge 用 loop 路径)
- **大图性能**:dagre 在 <50 节点时性能无忧,workflow 通常 <10 个 role
- **暗色主题**:Dashboard 已使用 CSS variables,节点/边样式复用现有色板
- **不提交 pnpm-lock.yaml**
## 开放问题
1. **graph 放 descriptor 还是独立字段?** — 建议放 descriptor,因为它描述的就是 workflow 结构
2. **是否需要 WorkflowList 页也展示图?** — Phase 1 先只在 ThreadDetail,后续按需扩展
3. **`buildDescriptor` 需要 `ModeratorTable` 参数** — 当前 `buildDescriptor` 只接收 roles,需要扩展签名或在 bundle 注册时额外传入 table
+1
View File
@@ -6,6 +6,7 @@
],
"scripts": {
"build": "bunx tsc --build",
"build:bundles": "bash scripts/build-bundles.sh",
"check": "bunx tsc --build && biome check .",
"typecheck": "bunx tsc --build",
"format": "biome format --write .",
@@ -45,8 +45,8 @@ describe("gc cli and garbageCollectCas", () => {
{
name: "demo",
hash: bundleHash,
depth: 0,
parentState: null,
},
promptHash,
);
@@ -100,8 +100,8 @@ describe("gc cli and garbageCollectCas", () => {
{
name: "demo",
hash: bundleHash,
depth: 0,
parentState: null,
},
promptHash,
);
@@ -135,8 +135,8 @@ describe("gc cli and garbageCollectCas", () => {
{
name: "demo",
hash: bundleHash,
depth: 0,
parentState: null,
},
promptHash,
);
@@ -187,6 +187,14 @@ describe("cli thread commands", () => {
}
expect(shown.value.includes('"threadId"')).toBe(true);
const parsed = JSON.parse(shown.value) as Record<string, unknown>;
expect(parsed.parentState).toBeNull();
const parsedSteps = parsed.steps as Array<Record<string, unknown>>;
for (const step of parsedSteps) {
expect(step).toHaveProperty("childThread");
expect(step.childThread).toBeNull();
}
const removed = await cmdThreadRemove(storageRoot, threadId);
expect(removed.ok).toBe(true);
@@ -1,9 +1,14 @@
import { readFile } from "node:fs/promises";
import { join } from "node:path";
import type { WorkflowDescriptor } from "@uncaged/workflow-protocol";
import {
getRegisteredWorkflow,
listRegisteredWorkflowNames,
readWorkflowRegistry,
validateWorkflowDescriptor,
} from "@uncaged/workflow-register";
import { Hono } from "hono";
import { parse as parseYaml } from "yaml";
export function createWorkflowRoutes(storageRoot: string): Hono {
const app = new Hono();
@@ -35,7 +40,17 @@ export function createWorkflowRoutes(storageRoot: string): Hono {
if (entry === null) {
return c.json({ error: `workflow not found: ${name}` }, 404);
}
return c.json({ name, ...entry });
let descriptor: WorkflowDescriptor | null = null;
try {
const yamlPath = join(storageRoot, "bundles", `${entry.hash}.yaml`);
const yamlText = await readFile(yamlPath, "utf8");
const parsed: unknown = parseYaml(yamlText);
const validated = validateWorkflowDescriptor(parsed);
descriptor = validated.ok ? validated.value : null;
} catch {
descriptor = null;
}
return c.json({ name, ...entry, descriptor });
});
app.get("/:name/history", async (c) => {
@@ -1,4 +1,4 @@
import { createCasStore, getContentMerklePayload } from "@uncaged/workflow-cas";
import { createCasStore, getContentMerklePayload, parseCasThreadNode } from "@uncaged/workflow-cas";
import { FORK_BRANCH_ROLE, walkStateFramesNewestFirst } from "@uncaged/workflow-execute";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { END } from "@uncaged/workflow-runtime";
@@ -6,6 +6,21 @@ import { getGlobalCasDir } from "@uncaged/workflow-util";
import { resolveThreadRecord } from "../../thread-scan.js";
async function readParentStateFromStartNode(
cas: { get(hash: string): Promise<string | null> },
startHash: string,
): Promise<string | null> {
const yamlText = await cas.get(startHash);
if (yamlText === null) {
return null;
}
const parsed = parseCasThreadNode(yamlText);
if (parsed === null || parsed.kind !== "start") {
return null;
}
return parsed.node.payload.parentState;
}
export async function cmdThreadShow(
storageRoot: string,
threadId: string,
@@ -19,7 +34,15 @@ export async function cmdThreadShow(
const frames = await walkStateFramesNewestFirst(cas, resolved.head);
const chronological = [...frames].reverse();
const steps: Array<{ role: string; hash: string; timestamp: number; content: string }> = [];
const parentState = await readParentStateFromStartNode(cas, resolved.start);
const steps: Array<{
role: string;
hash: string;
timestamp: number;
content: string;
childThread: string | null;
}> = [];
for (const fr of chronological) {
if (fr.payload.role === END || fr.payload.role === FORK_BRANCH_ROLE) {
continue;
@@ -33,6 +56,7 @@ export async function cmdThreadShow(
payloadText !== null
? payloadText
: `(content not in CAS; contentHash=${fr.payload.content})`,
childThread: fr.payload.childThread,
});
}
@@ -41,6 +65,7 @@ export async function cmdThreadShow(
bundleHash: resolved.bundleHash,
head: resolved.head,
start: resolved.start,
parentState,
source: resolved.source,
steps,
};
@@ -10,8 +10,10 @@ function makeCtx(userContent: string): AgentContext {
content: userContent,
meta: {},
timestamp: 1,
parentState: null,
},
depth: 0,
bundleHash: "TESTHASH00001",
steps: [],
threadId: "01TEST000000000000000000TR",
currentRole: { name: "planner", systemPrompt: "system instructions" },
+1
View File
@@ -0,0 +1 @@
workflow-cas
@@ -14,6 +14,7 @@ function payload(
ancestors: partial.ancestors ?? [],
compact: partial.compact ?? null,
timestamp: partial.timestamp ?? 0,
childThread: partial.childThread ?? null,
};
}
@@ -62,4 +63,32 @@ describe("collectRefs", () => {
);
expect(refs).toEqual(["S2", "C2"]);
});
test("includes childThread hash when childThread is non-null", () => {
const refs = collectRefs(
payload({
role: "developer",
start: "S3",
content: "C3",
ancestors: ["A3"],
compact: null,
childThread: "CHILDEND000000000000001",
}),
);
expect(refs).toEqual(["S3", "C3", "A3", "CHILDEND000000000000001"]);
});
test("does not include childThread when childThread is null", () => {
const refs = collectRefs(
payload({
role: "developer",
start: "S4",
content: "C4",
ancestors: [],
compact: null,
childThread: null,
}),
);
expect(refs).toEqual(["S4", "C4"]);
});
});
@@ -0,0 +1,161 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { stringify } from "yaml";
import { createCasStore } from "../src/cas.js";
import { parseCasThreadNode, putStartNode, putStateNode } from "../src/nodes.js";
describe("putStartNode — parentState in refs", () => {
let dir: string;
beforeEach(async () => {
dir = await mkdtemp(join(tmpdir(), "wf-cas-nodes-"));
});
afterEach(async () => {
await rm(dir, { recursive: true, force: true });
});
test("refs contains only promptHash when parentState is null", async () => {
const cas = createCasStore(join(dir, "cas"));
const promptHash = await cas.put("hello");
const startHash = await putStartNode(
cas,
{ name: "demo", hash: "BUNDLEAAAAAAAAA", depth: 0, parentState: null },
promptHash,
);
const blob = await cas.get(startHash);
expect(blob).not.toBeNull();
const parsed = parseCasThreadNode(blob ?? "");
expect(parsed).not.toBeNull();
expect(parsed?.kind).toBe("start");
if (parsed?.kind !== "start") return;
expect(parsed.node.refs).toEqual([promptHash]);
expect(parsed.node.payload.parentState).toBeNull();
});
test("refs contains [promptHash, parentStateHash] when parentState is set", async () => {
const cas = createCasStore(join(dir, "cas"));
const parentStateHash = await cas.put("fake-parent-state");
const promptHash = await cas.put("child-prompt");
const startHash = await putStartNode(
cas,
{ name: "develop", hash: "BUNDLEBBBBBBBBB", depth: 1, parentState: parentStateHash },
promptHash,
);
const blob = await cas.get(startHash);
expect(blob).not.toBeNull();
const parsed = parseCasThreadNode(blob ?? "");
expect(parsed).not.toBeNull();
expect(parsed?.kind).toBe("start");
if (parsed?.kind !== "start") return;
expect(parsed.node.refs).toEqual([promptHash, parentStateHash]);
expect(parsed.node.payload.parentState).toBe(parentStateHash);
});
});
describe("putStateNode — childThread in refs", () => {
let dir: string;
beforeEach(async () => {
dir = await mkdtemp(join(tmpdir(), "wf-cas-nodes-state-"));
});
afterEach(async () => {
await rm(dir, { recursive: true, force: true });
});
test("refs does not include childThread when childThread is null", async () => {
const cas = createCasStore(join(dir, "cas"));
const startHash = await cas.put("start");
const contentHash = await cas.put("content");
const stateHash = await putStateNode(cas, {
role: "planner",
meta: {},
start: startHash,
content: contentHash,
ancestors: [],
compact: null,
timestamp: 1000,
childThread: null,
});
const blob = await cas.get(stateHash);
expect(blob).not.toBeNull();
const parsed = parseCasThreadNode(blob ?? "");
expect(parsed?.kind).toBe("state");
if (parsed?.kind !== "state") return;
expect(parsed.node.refs).not.toContain("anything-else");
expect(parsed.node.refs).toEqual([startHash, contentHash]);
expect(parsed.node.payload.childThread).toBeNull();
});
test("refs includes childThread hash when childThread is set", async () => {
const cas = createCasStore(join(dir, "cas"));
const startHash = await cas.put("start");
const contentHash = await cas.put("content");
const childEndHash = await cas.put("child-end-state");
const stateHash = await putStateNode(cas, {
role: "developer",
meta: { pr: 42 },
start: startHash,
content: contentHash,
ancestors: [],
compact: null,
timestamp: 2000,
childThread: childEndHash,
});
const blob = await cas.get(stateHash);
expect(blob).not.toBeNull();
const parsed = parseCasThreadNode(blob ?? "");
expect(parsed?.kind).toBe("state");
if (parsed?.kind !== "state") return;
expect(parsed.node.refs).toContain(childEndHash);
expect(parsed.node.payload.childThread).toBe(childEndHash);
});
});
describe("parseCasThreadNode — legacy node compatibility", () => {
test("start node without parentState field defaults to null", () => {
const yaml = stringify({
type: "start",
payload: { name: "demo", hash: "BUNDLEAAAAAAAAA", depth: 0 },
refs: ["PROMPTHASH00001"],
});
const parsed = parseCasThreadNode(yaml);
expect(parsed).not.toBeNull();
expect(parsed?.kind).toBe("start");
if (parsed?.kind !== "start") return;
expect(parsed.node.payload.parentState).toBeNull();
});
test("state node without childThread field defaults to null", () => {
const yaml = stringify({
type: "state",
payload: {
role: "planner",
meta: {},
start: "STARTHASH00001",
content: "CONTENTHASH0001",
ancestors: [],
compact: null,
timestamp: 1000,
},
refs: ["STARTHASH00001", "CONTENTHASH0001"],
});
const parsed = parseCasThreadNode(yaml);
expect(parsed).not.toBeNull();
expect(parsed?.kind).toBe("state");
if (parsed?.kind !== "state") return;
expect(parsed.node.payload.childThread).toBeNull();
});
});
@@ -9,5 +9,8 @@ export function collectRefs(payload: StateNode["payload"]): string[] {
if (payload.compact !== null) {
out.push(payload.compact);
}
if (payload.childThread !== null) {
out.push(payload.childThread);
}
return out;
}
+47 -3
View File
@@ -18,6 +18,10 @@ function isStartPayload(value: unknown): value is StartNodePayload {
if (!isRecord(value)) {
return false;
}
const parentState = value.parentState;
if (parentState !== undefined && parentState !== null && typeof parentState !== "string") {
return false;
}
return (
typeof value.name === "string" &&
typeof value.hash === "string" &&
@@ -25,6 +29,16 @@ function isStartPayload(value: unknown): value is StartNodePayload {
);
}
/** Normalizes a raw start payload, defaulting `parentState` to `null` for legacy nodes. */
function normalizeStartPayload(raw: StartNodePayload): StartNodePayload {
return {
name: raw.name,
hash: raw.hash,
depth: raw.depth,
parentState: raw.parentState ?? null,
};
}
function isStatePayload(value: unknown): value is StateNodePayload {
if (!isRecord(value)) {
return false;
@@ -41,6 +55,10 @@ function isStatePayload(value: unknown): value is StateNodePayload {
if (!isRecord(meta)) {
return false;
}
const childThread = value.childThread;
if (childThread !== undefined && childThread !== null && typeof childThread !== "string") {
return false;
}
return (
typeof value.role === "string" &&
typeof value.start === "string" &&
@@ -49,6 +67,20 @@ function isStatePayload(value: unknown): value is StateNodePayload {
);
}
/** Normalizes a raw state payload, defaulting `childThread` to `null` for legacy nodes. */
function normalizeStatePayload(raw: StateNodePayload): StateNodePayload {
return {
role: raw.role,
meta: raw.meta,
start: raw.start,
content: raw.content,
ancestors: raw.ancestors,
compact: raw.compact,
timestamp: raw.timestamp,
childThread: raw.childThread ?? null,
};
}
/** Parses a YAML CAS blob into a typed RFC v3 thread node (or legacy content layout with `children`). */
export function parseCasThreadNode(yamlText: string): ParsedCasThreadNode | null {
let raw: unknown;
@@ -86,14 +118,22 @@ export function parseCasThreadNode(yamlText: string): ParsedCasThreadNode | null
if (!isStartPayload(raw.payload)) {
return null;
}
const node: StartNode = { type: "start", payload: raw.payload, refs: [...refs] };
const node: StartNode = {
type: "start",
payload: normalizeStartPayload(raw.payload),
refs: [...refs],
};
return { kind: "start", node };
}
if (!isStatePayload(raw.payload)) {
return null;
}
const node: StateNode = { type: "state", payload: raw.payload, refs: [...refs] };
const node: StateNode = {
type: "state",
payload: normalizeStatePayload(raw.payload),
refs: [...refs],
};
return { kind: "state", node };
}
@@ -143,10 +183,14 @@ export async function putStartNode(
payload: StartNode["payload"],
promptHash: string,
): Promise<string> {
const refs = [promptHash];
if (payload.parentState !== null) {
refs.push(payload.parentState);
}
const node: StartNode = {
type: "start",
payload,
refs: [promptHash],
refs,
};
return store.put(serializeCasNode(node));
}
+2
View File
@@ -9,6 +9,8 @@
"preview": "vite preview"
},
"dependencies": {
"@dagrejs/dagre": "^3.0.0",
"@xyflow/react": "^12.10.2",
"react": "^19.2.6",
"react-dom": "^19.2.6",
"react-markdown": "^10.1.0",
+41
View File
@@ -104,6 +104,36 @@ export type WorkflowResultRecord = {
export type ThreadRecord = ThreadStartRecord | RoleRecord | WorkflowResultRecord;
export type WorkflowGraphEdge = {
from: string;
to: string;
condition: string;
conditionDescription: string | null;
};
export type WorkflowGraph = {
edges: readonly WorkflowGraphEdge[];
};
export type WorkflowRoleDescriptor = {
description: string;
schema: Record<string, unknown>;
};
export type WorkflowDescriptor = {
description: string;
roles: Record<string, WorkflowRoleDescriptor>;
graph: WorkflowGraph;
};
export type WorkflowDetail = {
name: string;
hash: string;
timestamp: number;
history: unknown[];
descriptor: WorkflowDescriptor | null;
};
// ── Gateway endpoints ───────────────────────────────────────────────
export function listAgents(): Promise<AgentEndpoint[]> {
@@ -117,6 +147,17 @@ export function listWorkflows(agent: string): Promise<{ workflows: WorkflowSumma
return fetchJson(agentBase(agent), "/workflows");
}
export async function getWorkflowDescriptor(
agent: string,
name: string,
): Promise<WorkflowDescriptor | null> {
const res = await fetchJson<WorkflowDetail>(
agentBase(agent),
`/workflows/${encodeURIComponent(name)}`,
);
return res.descriptor;
}
export function listThreads(agent: string): Promise<{ threads: ThreadSummary[] }> {
return fetchJson(agentBase(agent), "/threads");
}
@@ -56,11 +56,11 @@ function StartCard({ record }: { record: ThreadStartRecord }) {
);
}
function RoleMessage({ record }: { record: RoleRecord }) {
function RoleMessage({ record, highlighted }: { record: RoleRecord; highlighted: boolean }) {
const color = roleColor(record.role);
return (
<div
className="p-3 rounded-lg border text-sm"
className={`p-3 rounded-lg border text-sm ${highlighted ? "wf-record-card-highlight" : ""}`}
style={{ background: "var(--color-surface)", borderColor: "var(--color-border)" }}
>
<div className="flex items-center gap-2 mb-2">
@@ -114,12 +114,17 @@ function ResultCard({ record }: { record: WorkflowResultRecord }) {
);
}
export function RecordCard({ record }: { record: ThreadRecord }) {
type RecordCardProps = {
record: ThreadRecord;
highlighted: boolean;
};
export function RecordCard({ record, highlighted }: RecordCardProps) {
switch (record.type) {
case "thread-start":
return <StartCard record={record} />;
case "role":
return <RoleMessage record={record} />;
return <RoleMessage record={record} highlighted={highlighted} />;
case "workflow-result":
return <ResultCard record={record} />;
}
@@ -1,8 +1,17 @@
import { useEffect, useRef, useState } from "react";
import { getThread, killThread, pauseThread, resumeThread } from "../api.ts";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import {
getThread,
getWorkflowDescriptor,
killThread,
pauseThread,
resumeThread,
type ThreadRecord,
type WorkflowDescriptor,
} from "../api.ts";
import { useFetch } from "../hooks.ts";
import { useSSE } from "../use-sse.ts";
import { RecordCard } from "./record-card.tsx";
import { type NodeState, WorkflowGraph } from "./workflow-graph/index.ts";
type Props = {
agent: string;
@@ -10,11 +19,94 @@ type Props = {
onBack: () => void;
};
function extractWorkflowName(records: readonly ThreadRecord[]): string | null {
for (const r of records) {
if (r.type === "thread-start") return r.workflow;
}
return null;
}
type GraphPanelProps = {
descriptor: WorkflowDescriptor;
workflowName: string | null;
nodeStates: Map<string, NodeState>;
onNodeClick: ((roleName: string) => void) | null;
};
function GraphPanel({ descriptor, workflowName, nodeStates, onNodeClick }: GraphPanelProps) {
const [open, setOpen] = useState(true);
const edgeCount = descriptor.graph.edges.length;
return (
<div
className="mb-4 rounded-lg border overflow-hidden"
style={{ borderColor: "var(--color-border)", background: "var(--color-surface)" }}
>
<button
type="button"
onClick={() => setOpen((v) => !v)}
className="w-full flex items-center justify-between px-3 py-2 text-xs"
style={{ color: "var(--color-text-muted)" }}
>
<span className="font-mono">
{open ? "▼" : "▶"} Workflow graph
{workflowName !== null && (
<span className="ml-2" style={{ color: "var(--color-text)" }}>
{workflowName}
</span>
)}
</span>
<span>
{edgeCount} edge{edgeCount === 1 ? "" : "s"}
</span>
</button>
{open && (
<div style={{ height: 300, width: "100%" }}>
<WorkflowGraph
graph={descriptor.graph}
roles={descriptor.roles}
nodeStates={nodeStates}
onNodeClick={onNodeClick}
/>
</div>
)}
</div>
);
}
function computeNodeStates(records: readonly ThreadRecord[]): Map<string, NodeState> {
const states = new Map<string, NodeState>();
const roleRecords = records.filter(
(r): r is Extract<ThreadRecord, { type: "role" }> => r.type === "role",
);
const hasResult = records.some((r) => r.type === "workflow-result");
for (let i = 0; i < roleRecords.length; i++) {
const role = roleRecords[i].role;
const isLast = i === roleRecords.length - 1;
states.set(role, !hasResult && isLast ? "active" : "completed");
}
if (roleRecords.length > 0) {
states.set("__start__", "completed");
}
if (hasResult) {
states.set("__end__", "completed");
for (const [k, v] of states) {
if (v === "active") states.set(k, "completed");
}
}
return states;
}
export function ThreadDetail({ agent, threadId, onBack }: Props) {
const sse = useSSE(agent, threadId);
const { status, data, error } = useFetch(() => getThread(agent, threadId), [agent, threadId]);
const [actionStatus, setActionStatus] = useState<string | null>(null);
const recordsEndRef = useRef<HTMLDivElement>(null);
const firstCardByRoleRef = useRef<Map<string, HTMLDivElement>>(new Map());
const highlightTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null);
const [highlightedRole, setHighlightedRole] = useState<string | null>(null);
const liveActive = sse.connected && !sse.completed;
const records = liveActive
@@ -23,6 +115,46 @@ export function ThreadDetail({ agent, threadId, onBack }: Props) {
? data.records
: ([] as typeof sse.records);
const workflowName = useMemo(() => extractWorkflowName(records), [records]);
const descriptorFetch = useFetch<WorkflowDescriptor | null>(
() =>
workflowName === null ? Promise.resolve(null) : getWorkflowDescriptor(agent, workflowName),
[agent, workflowName],
);
const descriptor = descriptorFetch.status === "ok" ? descriptorFetch.data : null;
const nodeStates = useMemo(() => computeNodeStates(records), [records]);
const firstIndexByRole = useMemo(() => {
const m = new Map<string, number>();
for (let i = 0; i < records.length; i++) {
const r = records[i];
if (r.type === "role" && !m.has(r.role)) {
m.set(r.role, i);
}
}
return m;
}, [records]);
const handleGraphNodeClick = useCallback((roleName: string) => {
const el = firstCardByRoleRef.current.get(roleName);
if (el == null) return;
el.scrollIntoView({ behavior: "smooth", block: "center" });
if (highlightTimerRef.current !== null) clearTimeout(highlightTimerRef.current);
setHighlightedRole(roleName);
highlightTimerRef.current = setTimeout(() => {
setHighlightedRole(null);
highlightTimerRef.current = null;
}, 1500);
}, []);
useEffect(() => {
return () => {
if (highlightTimerRef.current !== null) clearTimeout(highlightTimerRef.current);
};
}, []);
// biome-ignore lint/correctness/useExhaustiveDependencies: scroll when the rendered record list grows
useEffect(() => {
recordsEndRef.current?.scrollIntoView({ behavior: "smooth" });
@@ -95,6 +227,15 @@ export function ThreadDetail({ agent, threadId, onBack }: Props) {
</p>
)}
{descriptor !== null && descriptor.graph.edges.length > 0 && (
<GraphPanel
descriptor={descriptor}
workflowName={workflowName}
nodeStates={nodeStates}
onNodeClick={handleGraphNodeClick}
/>
)}
{status === "loading" && !liveActive && records.length === 0 && (
<p style={{ color: "var(--color-text-muted)" }}>Loading...</p>
)}
@@ -103,9 +244,26 @@ export function ThreadDetail({ agent, threadId, onBack }: Props) {
)}
{(status === "ok" || liveActive || records.length > 0) && (
<div className="space-y-3">
{records.map((r, i) => (
<RecordCard key={`${threadId}-${i}`} record={r} />
))}
{records.map((r, i) => {
const key = `${threadId}-${i}`;
if (r.type === "role") {
const isFirstForRole = firstIndexByRole.get(r.role) === i;
const flash = highlightedRole === r.role;
return (
<div
key={key}
ref={(el) => {
if (!isFirstForRole) return;
if (el !== null) firstCardByRoleRef.current.set(r.role, el);
else firstCardByRoleRef.current.delete(r.role);
}}
>
<RecordCard record={r} highlighted={flash} />
</div>
);
}
return <RecordCard key={key} record={r} highlighted={false} />;
})}
<div ref={recordsEndRef} aria-hidden />
</div>
)}
@@ -0,0 +1,76 @@
import {
BaseEdge,
EdgeLabelRenderer,
type EdgeProps,
getBezierPath,
getSmoothStepPath,
} from "@xyflow/react";
import type { ConditionEdgeData } from "./types.ts";
export function ConditionEdge(props: EdgeProps) {
const {
id,
source,
target,
sourceX,
sourceY,
targetX,
targetY,
sourcePosition,
targetPosition,
data,
markerEnd,
} = props;
const edgeData = data as ConditionEdgeData | undefined;
const isFallback = edgeData?.isFallback ?? false;
const isSelfLoop = source === target;
const [path, labelX, labelY] = isSelfLoop
? getSmoothStepPath({
sourceX,
sourceY,
targetX,
targetY,
sourcePosition,
targetPosition,
borderRadius: 20,
})
: getBezierPath({
sourceX,
sourceY,
targetX,
targetY,
sourcePosition,
targetPosition,
});
const stroke = isFallback ? "var(--color-text-muted)" : "var(--color-text)";
const strokeDasharray = isFallback ? "5 4" : undefined;
return (
<>
<BaseEdge
id={id}
path={path}
markerEnd={markerEnd}
style={{ stroke, strokeWidth: 1.5, strokeDasharray }}
/>
{edgeData && !isFallback && edgeData.condition !== "" && (
<EdgeLabelRenderer>
<div
className="absolute px-1.5 py-0.5 rounded text-[10px] font-mono pointer-events-auto"
style={{
transform: `translate(-50%, -50%) translate(${labelX}px, ${labelY}px)`,
background: "var(--color-bg)",
border: "1px solid var(--color-border)",
color: "var(--color-text)",
}}
title={edgeData.conditionDescription ?? undefined}
>
{edgeData.condition}
</div>
</EdgeLabelRenderer>
)}
</>
);
}
@@ -0,0 +1,2 @@
export type { NodeState } from "./types.ts";
export { WorkflowGraph } from "./workflow-graph.tsx";
@@ -0,0 +1,69 @@
import { Handle, type NodeProps, Position } from "@xyflow/react";
import type { RoleNodeData } from "./types.ts";
function borderColor(state: RoleNodeData["state"]): string {
switch (state) {
case "completed":
return "var(--color-success)";
case "active":
return "var(--color-accent)";
default:
return "var(--color-border)";
}
}
function stateIcon(state: RoleNodeData["state"]): string | null {
if (state === "completed") return "✓";
if (state === "active") return "●";
return null;
}
export function RoleNode(props: NodeProps) {
const data = props.data as RoleNodeData;
const icon = stateIcon(data.state);
const isActive = data.state === "active";
const handleStyle = {
background: "var(--color-text-muted)",
width: 6,
height: 6,
border: "none",
} as const;
return (
<div
className={`px-3 py-2 rounded-md border-2 text-xs font-medium cursor-pointer ${isActive ? "wf-node-pulse" : ""}`}
style={{
width: 180,
height: 60,
background: "var(--color-surface)",
borderColor: borderColor(data.state),
color: "var(--color-text)",
display: "flex",
flexDirection: "column",
justifyContent: "center",
boxSizing: "border-box",
}}
title={data.description}
>
<Handle type="target" position={Position.Top} style={handleStyle} isConnectable={false} />
<div className="flex items-center gap-1.5 font-mono">
{icon !== null && (
<span
style={{
color: data.state === "active" ? "var(--color-accent)" : "var(--color-success)",
}}
>
{icon}
</span>
)}
<span className="truncate">{data.label}</span>
</div>
{data.description !== "" && (
<div className="text-[10px] truncate mt-0.5" style={{ color: "var(--color-text-muted)" }}>
{data.description}
</div>
)}
<Handle type="source" position={Position.Bottom} style={handleStyle} isConnectable={false} />
</div>
);
}
@@ -0,0 +1,57 @@
import { Handle, type NodeProps, Position } from "@xyflow/react";
import type { TerminalNodeData } from "./types.ts";
function borderColor(state: TerminalNodeData["state"]): string {
switch (state) {
case "completed":
return "var(--color-success)";
case "active":
return "var(--color-accent)";
default:
return "var(--color-border)";
}
}
function bgColor(state: TerminalNodeData["state"]): string {
if (state === "completed") return "var(--color-success)";
if (state === "active") return "var(--color-accent)";
return "var(--color-surface)";
}
export function TerminalNode(props: NodeProps) {
const data = props.data as TerminalNodeData;
const isStart = data.kind === "start";
const isActive = data.state === "active";
const handleStyle = {
background: "var(--color-text-muted)",
width: 6,
height: 6,
border: "none",
} as const;
return (
<div
className={`rounded-full border-2 flex items-center justify-center text-[10px] font-bold ${isActive ? "wf-node-pulse" : ""}`}
style={{
width: 40,
height: 40,
background: bgColor(data.state),
borderColor: borderColor(data.state),
color: data.state === "default" ? "var(--color-text-muted)" : "var(--color-bg)",
}}
title={isStart ? "Start" : "End"}
>
{isStart ? (
<Handle
type="source"
position={Position.Bottom}
style={handleStyle}
isConnectable={false}
/>
) : (
<Handle type="target" position={Position.Top} style={handleStyle} isConnectable={false} />
)}
{isStart ? "▶" : "■"}
</div>
);
}
@@ -0,0 +1,29 @@
import type { WorkflowGraphEdge } from "../../api.ts";
export type NodeState = "default" | "completed" | "active";
export type TerminalKind = "start" | "end";
export type RoleNodeData = {
label: string;
description: string;
state: NodeState;
[key: string]: unknown;
};
export type TerminalNodeData = {
kind: TerminalKind;
state: NodeState;
[key: string]: unknown;
};
export type ConditionEdgeData = {
condition: string;
conditionDescription: string | null;
isFallback: boolean;
[key: string]: unknown;
};
export type GraphInput = {
edges: readonly WorkflowGraphEdge[];
};
@@ -0,0 +1,127 @@
import Dagre from "@dagrejs/dagre";
import type { Edge, Node } from "@xyflow/react";
import { useMemo } from "react";
import type { WorkflowGraphEdge } from "../../api.ts";
import type { ConditionEdgeData, NodeState, RoleNodeData, TerminalNodeData } from "./types.ts";
const START_ID = "__start__";
const END_ID = "__end__";
const ROLE_NODE_WIDTH = 180;
const ROLE_NODE_HEIGHT = 60;
const TERMINAL_NODE_SIZE = 40;
type LayoutInput = {
edges: readonly WorkflowGraphEdge[];
roles: Record<string, { description: string }>;
nodeStates: Map<string, NodeState>;
};
type LayoutResult = {
nodes: Node[];
edges: Edge[];
};
function collectNodeIds(edges: readonly WorkflowGraphEdge[]): Set<string> {
const ids = new Set<string>();
for (const e of edges) {
ids.add(e.from);
ids.add(e.to);
}
return ids;
}
function nodeSize(id: string): { width: number; height: number } {
if (id === START_ID || id === END_ID) {
return { width: TERMINAL_NODE_SIZE, height: TERMINAL_NODE_SIZE };
}
return { width: ROLE_NODE_WIDTH, height: ROLE_NODE_HEIGHT };
}
function buildRoleNode(
id: string,
pos: { x: number; y: number },
roles: Record<string, { description: string }>,
state: NodeState,
): Node<RoleNodeData> {
const description = roles[id]?.description ?? "";
return {
id,
type: "role",
position: pos,
data: { label: id, description, state },
draggable: false,
};
}
function buildTerminalNode(
id: string,
pos: { x: number; y: number },
state: NodeState,
): Node<TerminalNodeData> {
return {
id,
type: "terminal",
position: pos,
data: { kind: id === START_ID ? "start" : "end", state },
draggable: false,
selectable: false,
};
}
function edgeKey(e: WorkflowGraphEdge): string {
return `${e.from}->${e.to}::${e.condition}`;
}
function buildEdge(e: WorkflowGraphEdge): Edge<ConditionEdgeData> {
const isFallback = e.condition === "FALLBACK";
return {
id: edgeKey(e),
source: e.from,
target: e.to,
type: "condition",
data: {
condition: e.condition,
conditionDescription: e.conditionDescription,
isFallback,
},
};
}
export function useLayout(input: LayoutInput): LayoutResult {
return useMemo(() => {
const ids = collectNodeIds(input.edges);
const g = new Dagre.graphlib.Graph({ multigraph: true }).setDefaultEdgeLabel(() => ({}));
g.setGraph({ rankdir: "TB", nodesep: 60, ranksep: 80 });
for (const id of ids) {
const size = nodeSize(id);
g.setNode(id, { width: size.width, height: size.height });
}
for (const e of input.edges) {
if (e.from === e.to) {
continue;
}
g.setEdge(e.from, e.to, {}, edgeKey(e));
}
Dagre.layout(g);
const nodes: Node[] = [];
for (const id of ids) {
const dagNode = g.node(id);
const size = nodeSize(id);
const pos = { x: dagNode.x - size.width / 2, y: dagNode.y - size.height / 2 };
const state = input.nodeStates.get(id) ?? "default";
if (id === START_ID || id === END_ID) {
nodes.push(buildTerminalNode(id, pos, state));
} else {
nodes.push(buildRoleNode(id, pos, input.roles, state));
}
}
const edges: Edge[] = input.edges.map(buildEdge);
return { nodes, edges };
}, [input.edges, input.roles, input.nodeStates]);
}
@@ -0,0 +1,79 @@
import {
Background,
type EdgeTypes,
MarkerType,
type Node,
type NodeTypes,
type OnNodeClick,
ReactFlow,
} from "@xyflow/react";
import "@xyflow/react/dist/style.css";
import { useMemo } from "react";
import type { WorkflowGraph as WorkflowGraphData } from "../../api.ts";
import { ConditionEdge } from "./condition-edge.tsx";
import { RoleNode } from "./role-node.tsx";
import { TerminalNode } from "./terminal-node.tsx";
import type { NodeState } from "./types.ts";
import { useLayout } from "./use-layout.ts";
type Props = {
graph: WorkflowGraphData;
roles: Record<string, { description: string }>;
nodeStates: Map<string, NodeState>;
onNodeClick: ((roleName: string) => void) | null;
};
const nodeTypes: NodeTypes = {
role: RoleNode,
terminal: TerminalNode,
};
const edgeTypes: EdgeTypes = {
condition: ConditionEdge,
};
function handleRoleNodeClick(onRoleClick: (roleName: string) => void, node: Node): void {
if (node.type !== "role") return;
onRoleClick(node.id);
}
export function WorkflowGraph({ graph, roles, nodeStates, onNodeClick }: Props) {
const layout = useLayout({ edges: graph.edges, roles, nodeStates });
const onNodeClickHandler: OnNodeClick | undefined =
onNodeClick !== null ? (_e, node) => handleRoleNodeClick(onNodeClick, node) : undefined;
const styledEdges = useMemo(
() =>
layout.edges.map((e) => ({
...e,
markerEnd: {
type: MarkerType.ArrowClosed,
width: 14,
height: 14,
color: "var(--color-text)",
},
})),
[layout.edges],
);
return (
<ReactFlow
nodes={layout.nodes}
edges={styledEdges}
nodeTypes={nodeTypes}
edgeTypes={edgeTypes}
onNodeClick={onNodeClickHandler}
fitView
fitViewOptions={{ padding: 0.15 }}
nodesDraggable={false}
nodesConnectable={false}
elementsSelectable={false}
proOptions={{ hideAttribution: true }}
colorMode="dark"
style={{ background: "var(--color-bg)" }}
>
<Background color="var(--color-border)" gap={20} size={1} />
</ReactFlow>
);
}
+30
View File
@@ -19,3 +19,33 @@ body {
color: var(--color-text);
font-family: "Inter", system-ui, -apple-system, sans-serif;
}
@keyframes wf-node-pulse {
0%,
100% {
box-shadow: 0 0 0 0 rgba(124, 109, 240, 0.55);
}
50% {
box-shadow: 0 0 0 6px rgba(124, 109, 240, 0);
}
}
.wf-node-pulse {
animation: wf-node-pulse 1.6s ease-in-out infinite;
}
@keyframes wf-record-card-highlight {
0% {
border-color: var(--color-accent);
}
35% {
border-color: var(--color-accent);
}
100% {
border-color: var(--color-border);
}
}
.wf-record-card-highlight {
animation: wf-record-card-highlight 1.5s ease-out forwards;
}
+1
View File
@@ -0,0 +1 @@
workflow-execute
@@ -35,6 +35,7 @@ function noLogger(): (tag: string, content: string) => void {
function makeOptions(overrides: Partial<ExecuteThreadOptions>): ExecuteThreadOptions {
return {
depth: 0,
parentStateHash: null,
signal: new AbortController().signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: null,
@@ -144,9 +145,9 @@ describe("executeThread (Phase 2 — CAS thread storage)", () => {
runtime: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
const h1 = await runtime.cas.put("plan-text");
yield { role: "planner", contentHash: h1, meta: { plan: 1 }, refs: [h1] };
yield { role: "planner", contentHash: h1, meta: { plan: 1 }, refs: [h1], childThread: null };
const h2 = await runtime.cas.put("code-text");
yield { role: "coder", contentHash: h2, meta: { diff: "y" }, refs: [h2] };
yield { role: "coder", contentHash: h2, meta: { diff: "y" }, refs: [h2], childThread: null };
return { returnCode: 0, summary: "done" };
};
@@ -210,7 +211,7 @@ describe("executeThread (Phase 2 — CAS thread storage)", () => {
runtime: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
const h = await runtime.cas.put("only-step");
yield { role: "only", contentHash: h, meta: {}, refs: [h] };
yield { role: "only", contentHash: h, meta: {}, refs: [h], childThread: null };
return { returnCode: 0, summary: "completed" };
};
@@ -261,7 +262,7 @@ describe("executeThread (Phase 2 — CAS thread storage)", () => {
runtime: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
const h = await runtime.cas.put("step");
yield { role: "only", contentHash: h, meta: {}, refs: [h] };
yield { role: "only", contentHash: h, meta: {}, refs: [h], childThread: null };
return { returnCode: 0, summary: "done" };
};
@@ -46,6 +46,7 @@ describe("garbageCollectCas (mark-and-sweep)", () => {
name: "demo",
hash: bundleHash,
depth: 0,
parentState: null,
},
promptHash,
);
@@ -59,6 +60,7 @@ describe("garbageCollectCas (mark-and-sweep)", () => {
ancestors: [],
compact: null,
timestamp: 1,
childThread: null,
} satisfies StateNodePayload);
const c2 = await putContentNodeWithRefs(cas, "c1", []);
@@ -70,6 +72,7 @@ describe("garbageCollectCas (mark-and-sweep)", () => {
ancestors: [h1],
compact: null,
timestamp: 2,
childThread: null,
} satisfies StateNodePayload);
const ec = await putContentNodeWithRefs(cas, "", []);
@@ -81,6 +84,7 @@ describe("garbageCollectCas (mark-and-sweep)", () => {
ancestors: [h1],
compact: null,
timestamp: 3,
childThread: null,
} satisfies StateNodePayload);
await upsertThreadEntry(bundleDir, "THREAD_AAAAAAA", {
@@ -0,0 +1,306 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import type { CasStore } from "@uncaged/workflow-cas";
import { createCasStore, parseCasThreadNode } from "@uncaged/workflow-cas";
import type { StartNode, StateNode } from "@uncaged/workflow-protocol";
import type {
RoleOutput,
ThreadContext,
WorkflowCompletion,
WorkflowFn,
WorkflowRuntime,
} from "@uncaged/workflow-runtime";
import { executeThread } from "../src/engine/engine.js";
import type { ExecuteThreadIo, ExecuteThreadOptions } from "../src/engine/types.js";
const TEST_REGISTRY_YAML = `config:
maxDepth: 3
supervisorInterval: 0
providers:
stub:
baseUrl: http://127.0.0.1:9
apiKey: test
models:
default: stub/m
workflows: {}
`;
function noLogger(): (tag: string, content: string) => void {
return () => {};
}
function makeOptions(overrides: Partial<ExecuteThreadOptions>): ExecuteThreadOptions {
return {
depth: 0,
parentStateHash: null,
signal: new AbortController().signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: null,
prefilledDiskSteps: null,
forkContinuation: null,
replayTimestamps: null,
storageRoot: "/tmp/never",
...overrides,
};
}
async function setupStorage(): Promise<{
storageRoot: string;
casDir: string;
}> {
const storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-merkle-"));
await writeFile(join(storageRoot, "workflow.yaml"), TEST_REGISTRY_YAML, "utf8");
const casDir = join(storageRoot, "cas");
await mkdir(casDir, { recursive: true });
return { storageRoot, casDir };
}
async function loadStartNode(cas: CasStore, endHash: string): Promise<StartNode> {
const endBlob = await cas.get(endHash);
const endParsed = parseCasThreadNode(endBlob ?? "");
if (endParsed?.kind !== "state") throw new Error("expected state node");
const startBlob = await cas.get(endParsed.node.payload.start);
const startParsed = parseCasThreadNode(startBlob ?? "");
if (startParsed?.kind !== "start") throw new Error("expected start node");
return startParsed.node;
}
async function loadStateNode(cas: CasStore, hash: string): Promise<StateNode> {
const blob = await cas.get(hash);
const parsed = parseCasThreadNode(blob ?? "");
if (parsed?.kind !== "state") throw new Error("expected state node");
return parsed.node;
}
describe("Merkle call stack — cross-thread DAG linking (Phase 2)", () => {
let storageRoot: string;
let casDir: string;
beforeEach(async () => {
const setup = await setupStorage();
storageRoot = setup.storageRoot;
casDir = setup.casDir;
});
afterEach(async () => {
await rm(storageRoot, { recursive: true, force: true });
});
test("parentStateHash is written into child start node's parentState and refs", async () => {
const cas = createCasStore(casDir);
// biome-ignore lint/correctness/useYield: testing start-only path
const parentWf: WorkflowFn = async function* (
_thread: ThreadContext,
_runtime: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
return { returnCode: 0, summary: "parent done" };
};
const parentResult = await executeThread(
parentWf,
"parent-wf",
{ prompt: "parent task", steps: [] },
makeOptions({ storageRoot }),
{
threadId: "P_THREAD_01",
hash: "PARENTHASH0001",
infoJsonlPath: join(storageRoot, "logs", "PARENTHASH0001", "P1.info.jsonl"),
cas,
},
noLogger(),
);
// biome-ignore lint/correctness/useYield: testing start-only path
const childWf: WorkflowFn = async function* (
_thread: ThreadContext,
_runtime: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
return { returnCode: 0, summary: "child done" };
};
const childResult = await executeThread(
childWf,
"child-wf",
{ prompt: "child task", steps: [] },
makeOptions({ storageRoot, depth: 1, parentStateHash: parentResult.rootHash }),
{
threadId: "C_THREAD_01",
hash: "CHILDHASH00001",
infoJsonlPath: join(storageRoot, "logs", "CHILDHASH00001", "C1.info.jsonl"),
cas,
},
noLogger(),
);
const childStart = await loadStartNode(cas, childResult.rootHash);
expect(childStart.payload.parentState).toBe(parentResult.rootHash);
expect(childStart.refs).toContain(parentResult.rootHash);
});
test("childThread on parent state node points to child's final state and is in refs", async () => {
const cas = createCasStore(casDir);
const childFinalHash = "CHILD_FINAL_001";
const parentWf: WorkflowFn = async function* (
_thread: ThreadContext,
runtime: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
const h = await runtime.cas.put("developer output");
yield {
role: "developer",
contentHash: h,
meta: { action: "delegate" },
refs: [h],
childThread: childFinalHash,
};
return { returnCode: 0, summary: "parent complete" };
};
const result = await executeThread(
parentWf,
"parent-wf",
{ prompt: "parent task", steps: [] },
makeOptions({ storageRoot }),
{
threadId: "P_THREAD_02",
hash: "CTHREAD_TEST01",
infoJsonlPath: join(storageRoot, "logs", "CTHREAD_TEST01", "P2.info.jsonl"),
cas,
},
noLogger(),
);
const endNode = await loadStateNode(cas, result.rootHash);
const devStateHash = endNode.payload.ancestors[0] ?? "";
const devNode = await loadStateNode(cas, devStateHash);
expect(devNode.payload.role).toBe("developer");
expect(devNode.payload.childThread).toBe(childFinalHash);
expect(devNode.refs).toContain(childFinalHash);
});
test("parent state with no child has childThread: null", async () => {
const cas = createCasStore(casDir);
const wf: WorkflowFn = async function* (
_thread: ThreadContext,
runtime: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
const h = await runtime.cas.put("prep output");
yield { role: "preparer", contentHash: h, meta: {}, refs: [h], childThread: null };
return { returnCode: 0, summary: "done" };
};
const result = await executeThread(
wf,
"test-wf",
{ prompt: "task", steps: [] },
makeOptions({ storageRoot }),
{
threadId: "NULL_CT_01",
hash: "NULLCT_TEST001",
infoJsonlPath: join(storageRoot, "logs", "NULLCT_TEST001", "N1.info.jsonl"),
cas,
},
noLogger(),
);
const endNode = await loadStateNode(cas, result.rootHash);
const prepHash = endNode.payload.ancestors[0] ?? "";
const prepNode = await loadStateNode(cas, prepHash);
expect(prepNode.payload.childThread).toBeNull();
expect(prepNode.refs).not.toContain(null);
});
test("full bidirectional: child parentState is traversable to parent's context", async () => {
const cas = createCasStore(casDir);
const parentHash = "BIDIR_PARENT01";
const parentWf: WorkflowFn = async function* (
_thread: ThreadContext,
runtime: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
const h1 = await runtime.cas.put("preparation output");
yield {
role: "preparer",
contentHash: h1,
meta: { repoPath: "/test" },
refs: [h1],
childThread: null,
};
const h2 = await runtime.cas.put("developer output");
yield {
role: "developer",
contentHash: h2,
meta: { action: "code" },
refs: [h2],
childThread: "CHILD_END_HASH1",
};
return { returnCode: 0, summary: "all done" };
};
const observedHeads: string[] = [];
const opts = makeOptions({
storageRoot,
awaitAfterEachYield: async () => {
const bundleDir = join(storageRoot, "bundles", parentHash);
const text = await readFile(join(bundleDir, "threads.json"), "utf8");
const parsed = JSON.parse(text) as Record<string, { head: string }>;
const head = parsed.BIDIR_T_001?.head ?? null;
if (head !== null) observedHeads.push(head);
},
});
await executeThread(
parentWf,
"bidir-wf",
{ prompt: "bidir test", steps: [] },
opts,
{
threadId: "BIDIR_T_001",
hash: parentHash,
infoJsonlPath: join(storageRoot, "logs", parentHash, "BD1.info.jsonl"),
cas,
},
noLogger(),
);
expect(observedHeads.length).toBe(2);
const preparerStateHash = observedHeads[0] ?? "";
// Execute child with parentState pointing to parent's preparer state
// biome-ignore lint/correctness/useYield: testing start-only path
const childWf: WorkflowFn = async function* (
_t: ThreadContext,
_r: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
return { returnCode: 0, summary: "child ok" };
};
const childResult = await executeThread(
childWf,
"bidir-child",
{ prompt: "child bidir", steps: [] },
makeOptions({ storageRoot, depth: 1, parentStateHash: preparerStateHash }),
{
threadId: "BIDIR_C_001",
hash: "BIDIR_CHILD001",
infoJsonlPath: join(storageRoot, "logs", "BIDIR_CHILD001", "BC1.info.jsonl"),
cas,
},
noLogger(),
);
// Upward traversal: child start → parentState → preparer state → meta.repoPath
const childStart = await loadStartNode(cas, childResult.rootHash);
expect(childStart.payload.parentState).toBe(preparerStateHash);
const parentPrep = await loadStateNode(cas, preparerStateHash);
expect(parentPrep.payload.meta.repoPath).toBe("/test");
});
});
@@ -94,6 +94,7 @@ async function appendStateForStep(params: {
meta: Record<string, unknown>;
refs: readonly string[];
timestamp: number;
childThread: string | null;
}): Promise<{ stateHash: string; chain: ChainState }> {
const text = await getContentMerklePayload(params.cas, params.contentHash);
if (text === null) {
@@ -112,6 +113,7 @@ async function appendStateForStep(params: {
ancestors,
compact: null,
timestamp: params.timestamp,
childThread: params.childThread,
};
const stateHash = await putStateNode(params.cas, payload);
return {
@@ -137,6 +139,7 @@ async function appendEndState(params: {
ancestors,
compact: null,
timestamp: params.timestamp,
childThread: null,
};
return putStateNode(params.cas, payload);
}
@@ -329,6 +332,7 @@ async function driveWorkflowGenerator(params: {
meta: step.meta,
refs: step.refs,
timestamp: ts,
childThread: step.childThread ?? null,
});
chain = written_.chain;
await publishHead({ bundleDir, threadId, startHash, headHash: written_.stateHash });
@@ -439,6 +443,7 @@ export async function executeThread(
name: workflowName,
hash: io.hash,
depth: options.depth,
parentState: options.parentStateHash,
},
promptHash,
);
@@ -466,6 +471,7 @@ export async function executeThread(
meta: row.meta,
refs: row.refs,
timestamp: row.timestamp,
childThread: null,
});
chain = written.chain;
await publishHead({
@@ -487,11 +493,13 @@ export async function executeThread(
const thread: ThreadContext = {
threadId: io.threadId,
depth: options.depth,
bundleHash: io.hash,
start: {
role: START,
content: input.prompt,
meta: {},
timestamp: nowMs,
parentState: options.parentStateHash,
},
steps: input.steps.map((out, i) => ({
role: out.role,
@@ -144,6 +144,7 @@ async function payloadToRoleOutput(cas: CasStore, payload: StateNodePayload): Pr
contentHash: payload.content,
meta: payload.meta,
refs,
childThread: payload.childThread,
};
}
@@ -240,6 +241,7 @@ async function buildForkContinuation(params: {
ancestors: ancestorsMarker,
compact: null,
timestamp: Date.now(),
childThread: null,
};
const markerHash = await putStateNode(cas, markerPayload);
@@ -41,6 +41,8 @@ export type PrefilledDiskStep = {
export type ExecuteThreadOptions = {
/** Passed to the bundle thread context as `ThreadContext.depth`. */
depth: number;
/** Parent thread's head state hash at spawn time; `null` for top-level threads. */
parentStateHash: string | null;
signal: AbortSignal;
/** Invoked after each successful yield (and outer-loop checks); used for pause/resume. */
awaitAfterEachYield: () => Promise<void>;
@@ -72,11 +72,13 @@ function parseRoleOutputRecord(obj: Record<string, unknown>): RoleOutput | null
if (meta === null || typeof meta !== "object") {
return null;
}
const childThread = obj.childThread;
return {
role,
contentHash,
meta: meta as Record<string, unknown>,
refs: normalizeRefsField(obj.refs),
childThread: typeof childThread === "string" ? childThread : null,
};
}
@@ -497,6 +499,7 @@ async function main(): Promise<void> {
{ prompt: cmd.prompt, steps: cmd.steps },
{
...cmd.options,
parentStateHash: null,
signal: ac.signal,
awaitAfterEachYield: () => pauseGate.awaitAfterYield(),
forkSourceThreadId: cmd.forkSourceThreadId,
@@ -6,7 +6,7 @@ import {
getRegisteredWorkflow,
readWorkflowRegistry,
} from "@uncaged/workflow-register";
import type { AgentContext, AgentFn } from "@uncaged/workflow-runtime";
import type { AgentContext, AgentFn, AgentFnResult } from "@uncaged/workflow-runtime";
import {
createLogger,
generateUlid,
@@ -14,7 +14,7 @@ import {
getGlobalCasDir,
} from "@uncaged/workflow-util";
import type { ExecuteThreadIo } from "./engine/index.js";
import { executeThread } from "./engine/index.js";
import { executeThread, getBundleDir, readThreadsIndex } from "./engine/index.js";
const DEFAULT_WORKFLOW_AS_AGENT_MAX_DEPTH = 3;
@@ -37,6 +37,13 @@ function resolveWorkflowAsAgentStorageRoot(options: WorkflowAsAgentOptions | nul
return getDefaultWorkflowStorageRoot();
}
async function readParentHeadState(storageRoot: string, ctx: AgentContext): Promise<string | null> {
const bundleDir = getBundleDir(storageRoot, ctx.bundleHash);
const index = await readThreadsIndex(bundleDir);
const entry = index[ctx.threadId] ?? null;
return entry !== null ? entry.head : null;
}
/**
* Returns an {@link AgentFn} that runs another registered workflow in a new thread,
* using the parent thread's initial prompt (`ctx.start.content`) as the child prompt.
@@ -45,30 +52,30 @@ export function workflowAsAgent(
workflowName: string,
options: WorkflowAsAgentOptions | null = null,
): AgentFn {
return async (ctx: AgentContext): Promise<string> => {
return async (ctx: AgentContext): Promise<AgentFnResult> => {
const nextDepth = ctx.depth + 1;
const storageRoot = resolveWorkflowAsAgentStorageRoot(options);
const registryResult = await readWorkflowRegistry(storageRoot);
if (!registryResult.ok) {
return `ERROR: failed to read workflow registry: ${registryResult.error.message}`;
return { output: `ERROR: failed to read workflow registry: ${registryResult.error.message}`, childThread: null };
}
const maxDepth = workflowAsAgentMaxDepth(registryResult.value.config);
if (nextDepth > maxDepth) {
return `ERROR: workflow-as-agent depth limit exceeded (max ${maxDepth})`;
return { output: `ERROR: workflow-as-agent depth limit exceeded (max ${maxDepth})`, childThread: null };
}
const entry = getRegisteredWorkflow(registryResult.value, workflowName);
if (entry === null) {
return `ERROR: workflow "${workflowName}" not found in registry`;
return { output: `ERROR: workflow "${workflowName}" not found in registry`, childThread: null };
}
const bundlePath = join(storageRoot, "bundles", `${entry.hash}.esm.js`);
const bundleExportsResult = await extractBundleExports(bundlePath, { storageRoot });
if (!bundleExportsResult.ok) {
return `ERROR: ${bundleExportsResult.error}`;
return { output: `ERROR: ${bundleExportsResult.error}`, childThread: null };
}
const input = {
@@ -89,6 +96,8 @@ export function workflowAsAgent(
const logger = createLogger({ sink: { kind: "file", path: infoJsonlPath } });
const signalNever = new AbortController();
const parentHeadState = await readParentHeadState(storageRoot, ctx);
try {
const result = await executeThread(
bundleExportsResult.value.run,
@@ -96,6 +105,7 @@ export function workflowAsAgent(
input,
{
depth: nextDepth,
parentStateHash: parentHeadState,
signal: signalNever.signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: ctx.threadId,
@@ -107,10 +117,11 @@ export function workflowAsAgent(
io,
logger,
);
return `Child workflow "${workflowName}" completed (returnCode=${result.returnCode}).\n\nSummary: ${result.summary}\n\nChild thread root hash: ${result.rootHash}`;
const summary = `Child workflow "${workflowName}" completed (returnCode=${result.returnCode}).\n\nSummary: ${result.summary}\n\nChild thread root hash: ${result.rootHash}`;
return { output: summary, childThread: result.rootHash };
} catch (e) {
const message = e instanceof Error ? e.message : String(e);
return `ERROR: ${message}`;
return { output: `ERROR: ${message}`, childThread: null };
}
};
}
+1
View File
@@ -0,0 +1 @@
workflow-protocol
@@ -21,11 +21,13 @@ function makeCtx(roles: (keyof TestMeta & string)[]): ModeratorContext<TestMeta>
return {
threadId: "test-thread",
depth: 0,
bundleHash: "TESTHASH00001",
start: {
role: START,
content: "test",
meta: {},
timestamp: Date.now(),
parentState: null,
} as StartStep,
steps,
};
@@ -4,6 +4,8 @@ export type StartNodePayload = {
name: string;
hash: string;
depth: number;
/** Parent thread's head state hash at spawn time. `null` for top-level workflows. */
parentState: string | null;
};
export type StartNode = {
@@ -20,6 +22,8 @@ export type StateNodePayload = {
ancestors: string[];
compact: string | null;
timestamp: number;
/** Child thread's final state hash (workflow-as-agent). `null` when no child spawned. */
childThread: string | null;
};
export type StateNode = {
+1
View File
@@ -13,6 +13,7 @@ export type {
AgentBinding,
AgentContext,
AgentFn,
AgentFnResult,
CasStore,
ExtractFn,
ExtractResult,
+6 -1
View File
@@ -54,6 +54,7 @@ export type RoleOutput = {
contentHash: string;
meta: Record<string, unknown>;
refs: string[];
childThread: string | null;
};
export type StartStep = {
@@ -61,6 +62,7 @@ export type StartStep = {
content: string;
meta: Record<string, never>;
timestamp: number;
parentState: string | null;
};
export type RoleStep<M extends RoleMeta> = {
@@ -76,6 +78,7 @@ export type RoleStep<M extends RoleMeta> = {
export type ThreadContext<M extends RoleMeta = RoleMeta> = {
threadId: string;
depth: number;
bundleHash: string;
start: StartStep;
steps: RoleStep<M>[];
};
@@ -140,7 +143,9 @@ export type ExtractFn = <T extends Record<string, unknown>>(
contentHash: string,
) => Promise<ExtractResult<T>>;
export type AgentFn = (ctx: AgentContext) => Promise<string>;
export type AgentFnResult = string | { output: string; childThread: string | null };
export type AgentFn = (ctx: AgentContext) => Promise<AgentFnResult>;
export type AgentBinding = {
agent: AgentFn;
+1
View File
@@ -0,0 +1 @@
workflow-register
@@ -37,13 +37,7 @@ function isAllowedImportSpecifier(spec: string): boolean {
if (spec.startsWith(".") || spec.startsWith("/") || spec.startsWith("file:")) {
return false;
}
if (
spec === "@uncaged/workflow" ||
spec === "@uncaged/workflow-runtime" ||
spec === "@uncaged/workflow-protocol" ||
spec === "@uncaged/workflow-cas" ||
spec === "@uncaged/workflow-util"
) {
if (spec.startsWith("@uncaged/workflow")) {
return true;
}
return isBuiltin(spec);
@@ -114,7 +108,8 @@ function bindingInitializerIsCallable(init: Node): boolean {
return (
init.type === "FunctionExpression" ||
init.type === "ArrowFunctionExpression" ||
init.type === "CallExpression"
init.type === "CallExpression" ||
init.type === "Identifier"
);
}
@@ -48,6 +48,9 @@ export async function ensureUncagedWorkflowSymlink(storageRoot: string): Promise
{ name: "workflow-runtime", dir: siblingPackageDir("workflow-runtime") },
{ name: "workflow-cas", dir: siblingPackageDir("workflow-cas") },
{ name: "workflow-protocol", dir: siblingPackageDir("workflow-protocol") },
{ name: "workflow-util", dir: siblingPackageDir("workflow-util") },
{ name: "workflow-execute", dir: siblingPackageDir("workflow-execute") },
{ name: "workflow-register", dir: siblingPackageDir("workflow-register") },
];
for (const pkg of packages) {
+1
View File
@@ -0,0 +1 @@
workflow-runtime
@@ -27,7 +27,7 @@ describe("buildThreadContext", () => {
const bundleHash = "BHAAAAAAAAAAA";
const startHash = await putStartNode(
cas,
{ name: "demo", hash: bundleHash, depth: 2 },
{ name: "demo", hash: bundleHash, depth: 2, parentState: null },
promptHash,
);
@@ -41,6 +41,7 @@ describe("buildThreadContext", () => {
ancestors: [],
compact: null,
timestamp: 1000,
childThread: null,
});
const chCode = await putContentNodeWithRefs(cas, "code body", []);
@@ -52,6 +53,7 @@ describe("buildThreadContext", () => {
ancestors: [statePlan],
compact: null,
timestamp: 2000,
childThread: null,
});
const ctx = await buildThreadContext(stateCode, cas);
@@ -71,7 +73,7 @@ describe("buildThreadContext", () => {
const promptHash = await cas.put("only-prompt");
const startHash = await putStartNode(
cas,
{ name: "solo", hash: "BHBBBBBBBBBBB", depth: 1 },
{ name: "solo", hash: "BHBBBBBBBBBBB", depth: 1, parentState: null },
promptHash,
);
@@ -87,7 +89,7 @@ describe("buildThreadContext", () => {
const bundleHash = "BHCCCCCCCCCCC";
const startHash = await putStartNode(
cas,
{ name: "demo", hash: bundleHash, depth: 0 },
{ name: "demo", hash: bundleHash, depth: 0, parentState: null },
promptHash,
);
@@ -100,6 +102,7 @@ describe("buildThreadContext", () => {
ancestors: [],
compact: null,
timestamp: 500,
childThread: null,
});
const endContent = await putContentNodeWithRefs(cas, "finished", []);
@@ -111,6 +114,7 @@ describe("buildThreadContext", () => {
ancestors: [state1],
compact: null,
timestamp: 600,
childThread: null,
});
const ctx = await buildThreadContext(endState, cas);
@@ -54,11 +54,13 @@ async function threadFromStartHead<M extends RoleMeta>(
return {
threadId: "",
depth: p.depth,
bundleHash: p.hash,
start: {
role: START,
content: prompt,
meta: {},
timestamp: 0,
parentState: p.parentState,
},
steps: [],
};
@@ -113,11 +115,13 @@ async function threadFromStateHead<M extends RoleMeta>(
return {
threadId: "",
depth: sp.depth,
bundleHash: sp.hash,
start: {
role: START,
content: prompt,
meta: {},
timestamp: firstTs,
parentState: sp.parentState,
},
steps,
};
@@ -7,6 +7,7 @@ import {
type AgentBinding,
type AgentContext,
type AgentFn,
type AgentFnResult,
END,
type ModeratorContext,
type RoleDefinition,
@@ -50,6 +51,16 @@ function mergeUniqueHashes(a: readonly string[], b: readonly string[]): string[]
return out;
}
function normalizeAgentResult(result: AgentFnResult): {
output: string;
childThread: string | null;
} {
if (typeof result === "string") {
return { output: result, childThread: null };
}
return result;
}
function agentForRole(binding: AgentBinding, roleName: string): AgentFn {
const overrides = binding.overrides;
const overrideFn: AgentFn | undefined =
@@ -89,9 +100,9 @@ async function advanceOneRound<M extends RoleMeta>(
};
const agent = agentForRole(binding, next);
const raw = await agent(agentCtx as unknown as AgentContext);
const agentResult = normalizeAgentResult(await agent(agentCtx as unknown as AgentContext));
const agentContentHash = await putContentNodeWithRefs(runtime.cas, raw, []);
const agentContentHash = await putContentNodeWithRefs(runtime.cas, agentResult.output, []);
const extracted = await runtime.extract(
roleDef.schema as z.ZodType<Record<string, unknown>>,
@@ -125,6 +136,7 @@ async function advanceOneRound<M extends RoleMeta>(
contentHash: step.contentHash,
meta: step.meta,
refs: step.refs,
childThread: agentResult.childThread,
},
step,
};
+1
View File
@@ -5,6 +5,7 @@ export type {
AgentBinding,
AgentContext,
AgentFn,
AgentFnResult,
CasStore,
ExtractFn,
ExtractResult,
+1
View File
@@ -7,6 +7,7 @@ export type {
AgentBinding,
AgentContext,
AgentFn,
AgentFnResult,
CasStore,
ExtractFn,
ExtractResult,
@@ -22,6 +22,7 @@ function makeStart(): ModeratorContext<DevelopMeta>["start"] {
content: "Implement the feature",
meta: {},
timestamp: 0,
parentState: null,
};
}
@@ -29,6 +30,7 @@ function makeCtx(steps: ModeratorContext<DevelopMeta>["steps"]): ModeratorContex
return {
threadId: "01TEST000000000000000000TR",
depth: 0,
bundleHash: "TESTHASH00001",
start: makeStart(),
steps,
};
@@ -2,19 +2,17 @@
* develop bundle entry — 小橘 🍊
*
* All roles use cursor-agent with workspace auto-extracted from context.
*
* ENV VARS: WORKFLOW_LLM_API_KEY (required), WORKFLOW_LLM_BASE_URL,
* WORKFLOW_LLM_MODEL, WORKFLOW_CURSOR_MODEL, WORKFLOW_CURSOR_TIMEOUT.
* Must be set before the first worker spawn — workers are persistent and
* do not pick up env changes after initial import.
*/
import { createCursorAgent } from "@uncaged/workflow-agent-cursor";
import type { AgentContext, AgentFn, AgentFnResult } from "@uncaged/workflow-runtime";
import { createWorkflow } from "@uncaged/workflow-runtime";
import { buildDevelopDescriptor, developWorkflowDefinition } from "./src/index.js";
function requireEnv(name: string): string {
const value = process.env[name];
if (value === undefined || value === "") {
throw new Error(`missing required env var: ${name}`);
}
return value;
}
function optionalEnv(name: string): string | null {
const value = process.env[name];
if (value === undefined || value === "") {
@@ -23,23 +21,39 @@ function optionalEnv(name: string): string | null {
return value;
}
const llmProvider = {
baseUrl:
optionalEnv("WORKFLOW_LLM_BASE_URL") ?? "https://dashscope.aliyuncs.com/compatible-mode/v1",
apiKey: requireEnv("WORKFLOW_LLM_API_KEY"),
model: optionalEnv("WORKFLOW_LLM_MODEL") ?? "qwen-plus",
};
function requireEnv(name: string): string {
const value = process.env[name];
if (value === undefined || value === "") {
throw new Error(`missing required env var: ${name}`);
}
return value;
}
const agent = createCursorAgent({
model: optionalEnv("WORKFLOW_CURSOR_MODEL"),
timeout: optionalEnv("WORKFLOW_CURSOR_TIMEOUT")
? Number(optionalEnv("WORKFLOW_CURSOR_TIMEOUT"))
: 0,
workspace: null,
llmProvider,
});
function createLazyAgent(): AgentFn {
let cached: AgentFn | null = null;
return (ctx: AgentContext): Promise<AgentFnResult> => {
if (cached === null) {
const llmProvider = {
baseUrl:
optionalEnv("WORKFLOW_LLM_BASE_URL") ??
"https://dashscope.aliyuncs.com/compatible-mode/v1",
apiKey: requireEnv("WORKFLOW_LLM_API_KEY"),
model: optionalEnv("WORKFLOW_LLM_MODEL") ?? "qwen-plus",
};
cached = createCursorAgent({
model: optionalEnv("WORKFLOW_CURSOR_MODEL"),
timeout: optionalEnv("WORKFLOW_CURSOR_TIMEOUT")
? Number(optionalEnv("WORKFLOW_CURSOR_TIMEOUT"))
: 0,
workspace: null,
llmProvider,
});
}
return cached(ctx);
};
}
const wf = createWorkflow(developWorkflowDefinition, { agent, overrides: null });
const wf = createWorkflow(developWorkflowDefinition, { agent: createLazyAgent(), overrides: null });
export const descriptor = buildDevelopDescriptor();
export const run = wf;
@@ -12,6 +12,7 @@
"test": "bun test"
},
"dependencies": {
"@uncaged/workflow-agent-cursor": "workspace:*",
"@uncaged/workflow-register": "workspace:*",
"@uncaged/workflow-runtime": "workspace:*",
"zod": "^4.0.0"
@@ -107,6 +107,7 @@ function makeStart(): ModeratorContext<SolveIssueMeta>["start"] {
content: "Fix the flaky login test",
meta: {},
timestamp: 0,
parentState: null,
};
}
@@ -116,6 +117,7 @@ function makeCtx(
return {
threadId: "01TEST000000000000000000TR",
depth: 0,
bundleHash: "TESTHASH00001",
start: makeStart(),
steps,
};
@@ -181,11 +183,13 @@ function makeThread(prompt: string) {
return {
threadId: "01TEST000000000000000000TR",
depth: 0,
bundleHash: "TESTHASH00001",
start: {
role: START,
content: prompt,
meta: {},
timestamp: Date.now(),
parentState: null,
},
steps: [],
};
@@ -3,6 +3,10 @@
*
* preparer + submitter → hermes agent
* developer → workflow-as-agent (delegates to "develop" workflow)
*
* ENV VARS: WORKFLOW_HERMES_MODEL, WORKFLOW_HERMES_TIMEOUT.
* Must be set before the first worker spawn — workers are persistent and
* do not pick up env changes after initial import.
*/
import { createHermesAgent } from "@uncaged/workflow-agent-hermes";
import { workflowAsAgent } from "@uncaged/workflow-execute";
@@ -12,13 +12,14 @@
"test": "bun test"
},
"dependencies": {
"@uncaged/workflow-agent-hermes": "workspace:*",
"@uncaged/workflow-execute": "workspace:*",
"@uncaged/workflow-register": "workspace:*",
"@uncaged/workflow-runtime": "workspace:*",
"zod": "^4.0.0"
},
"devDependencies": {
"@uncaged/workflow-cas": "workspace:*",
"@uncaged/workflow-execute": "workspace:*",
"@uncaged/workflow-protocol": "workspace:*"
}
}
@@ -3,12 +3,13 @@ import { type AgentContext, START } from "@uncaged/workflow-runtime";
import { buildAgentPrompt } from "../src/index.js";
function startTask(content: string): AgentContext["start"] {
function startTask(content: string, parentState: string | null = null): AgentContext["start"] {
return {
role: START,
content,
meta: {},
timestamp: 1,
parentState,
};
}
@@ -17,6 +18,7 @@ describe("buildAgentPrompt", () => {
const ctx: AgentContext = {
start: startTask("fix the bug"),
depth: 0,
bundleHash: "TESTHASH00001",
steps: [],
threadId: "01TEST000000000000000000TR",
currentRole: { name: START, systemPrompt: "You are an agent." },
@@ -33,6 +35,7 @@ describe("buildAgentPrompt", () => {
const ctx: AgentContext = {
start: startTask("user task"),
depth: 0,
bundleHash: "TESTHASH00001",
threadId: "01TEST000000000000000000TR",
currentRole: { name: "coder", systemPrompt: "Be helpful." },
steps: [
@@ -61,6 +64,7 @@ describe("buildAgentPrompt", () => {
const ctx: AgentContext = {
start: startTask("first message full: task content here"),
depth: 0,
bundleHash: "TESTHASH00001",
threadId: "01TEST000000000000000000TR",
currentRole: { name: "coder", systemPrompt: "System." },
steps: [
@@ -92,6 +96,35 @@ describe("buildAgentPrompt", () => {
expect(text).toContain("uncaged-workflow thread 01TEST000000000000000000TR");
});
test("parentState null omits Parent Context section", async () => {
const ctx: AgentContext = {
start: startTask("top-level task"),
depth: 0,
bundleHash: "TESTHASH00001",
steps: [],
threadId: "01TEST000000000000000000TR",
currentRole: { name: START, systemPrompt: "You are an agent." },
};
const text = await buildAgentPrompt(ctx);
expect(text).not.toContain("## Parent Context");
});
test("parentState non-null includes Parent Context section with hash", async () => {
const parentHash = "01PARENTSTATE0000000000001";
const ctx: AgentContext = {
start: startTask("child task", parentHash),
depth: 1,
bundleHash: "TESTHASH00001",
steps: [],
threadId: "01TEST000000000000000000TR",
currentRole: { name: START, systemPrompt: "You are an agent." },
};
const text = await buildAgentPrompt(ctx);
expect(text).toContain("## Parent Context");
expect(text).toContain(parentHash);
expect(text).toContain(`uncaged-workflow cas get ${parentHash}`);
});
test("middle steps show meta summary only and latest shows hash", async () => {
const ha = "01HASHA00000000000000000001";
const hb = "01HASHB00000000000000000001";
@@ -99,6 +132,7 @@ describe("buildAgentPrompt", () => {
const ctx: AgentContext = {
start: startTask("start"),
depth: 0,
bundleHash: "TESTHASH00001",
threadId: "01TEST000000000000000000TR",
currentRole: { name: "c", systemPrompt: "S" },
steps: [
@@ -5,6 +5,19 @@ export async function buildAgentPrompt(ctx: AgentContext): Promise<string> {
const lines: string[] = [];
lines.push(ctx.currentRole.systemPrompt);
lines.push("");
if (ctx.start.parentState !== null) {
lines.push("## Parent Context");
lines.push(
"This workflow was spawned by a parent workflow. The parent's state at spawn time is available at hash: " +
ctx.start.parentState,
);
lines.push(
`Use \`uncaged-workflow cas get ${ctx.start.parentState}\` to inspect the parent's context and trace back through its steps.`,
);
lines.push("");
}
lines.push("## Task");
lines.push(ctx.start.content);
+1
View File
@@ -0,0 +1 @@
workflow-util
+43
View File
@@ -0,0 +1,43 @@
#!/usr/bin/env bash
set -euo pipefail
# Packages externalized from bundles — resolved at runtime via symlinks
# created by ensureUncagedWorkflowSymlink in workflow-register.
EXTERNAL=(
--external @uncaged/workflow-runtime
--external @uncaged/workflow-protocol
--external @uncaged/workflow-cas
--external @uncaged/workflow-util
--external @uncaged/workflow-execute
--external @uncaged/workflow-register
)
REPO_ROOT="$(cd "$(dirname "$0")/.." && pwd)"
cd "$REPO_ROOT"
mkdir -p dist
echo "Building develop bundle..."
bun build packages/workflow-template-develop/bundle-entry.ts \
--bundle --format esm --target bun \
"${EXTERNAL[@]}" \
--outfile dist/develop.esm.js
echo "Building solve-issue bundle..."
bun build packages/workflow-template-solve-issue/bundle-entry.ts \
--bundle --format esm --target bun \
"${EXTERNAL[@]}" \
--outfile dist/solve-issue.esm.js
echo "Done. Bundles written to dist/"
# Register bundles if --register flag is passed
if [[ "${1:-}" == "--register" ]]; then
STORAGE_ROOT="${UNCAGED_WORKFLOW_STORAGE_ROOT:-${WORKFLOW_STORAGE_ROOT:-$HOME/.uncaged/workflow}}"
echo "Registering bundles..."
for bundle in develop solve-issue; do
cp "dist/${bundle}.esm.js" "$STORAGE_ROOT/${bundle}.esm.js"
uncaged-workflow workflow add "$bundle" "$STORAGE_ROOT/${bundle}.esm.js"
done
echo "Bundles registered."
fi