Compare commits

...

31 Commits

Author SHA1 Message Date
xiaoju b8b557baf6 fix: migrate template test imports from @uncaged/workflow to new packages
小橘 🍊(NEKO Team)
2026-05-09 03:51:48 +00:00
xingyue 727b4bb3ed refactor(workflow): fix tsconfig references, template imports, delete old packages/workflow
- Update root tsconfig.json references: replace packages/workflow with 6 new packages
- Update cli-workflow tsconfig references to new packages
- Add tsconfig references to workflow-util, workflow-runtime, workflow-execute
- Fix workflow-agent-llm, workflow-agent-cursor, workflow-agent-hermes, workflow-util-agent
  tsconfig references (../workflow -> ../workflow-runtime)
- Remove stale @uncaged/workflow deps from agent package.json files
- Change template packages to import buildDescriptor from @uncaged/workflow-register
- Normalize package.json exports field across all new packages
- Delete old packages/workflow/ directory
2026-05-09 11:46:57 +08:00
xingyue 9bbdfc41bd feat(execute): create @uncaged/workflow-execute + CLI migration
Phase 7: Engine + extract + workflow-as-agent merged into execute package.
All CLI imports migrated from @uncaged/workflow to specific packages.
105 CLI tests pass, 0 failures.

Changes:
- New @uncaged/workflow-execute package (engine/, extract/, workflow-as-agent)
- CLI src/ and __tests__/ rewritten to import from split packages
- bundle-validator updated to allow @uncaged/workflow-cas imports
- ensure-uncaged-workflow-symlink creates symlinks for all new packages

Ref: #143, closes #150
2026-05-09 11:35:03 +08:00
xingyue b07f8cf166 feat(register): create @uncaged/workflow-register package
Merges bundle/ + registry/ + config/ modules. The config↔registry
circular dependency is resolved: ProviderConfig and WorkflowConfig
now come from @uncaged/workflow-protocol.

Ref: #143, closes #149
2026-05-09 11:16:27 +08:00
xingyue 1a1e8b3398 feat(cas,reactor): create @uncaged/workflow-cas and @uncaged/workflow-reactor
Phase 4: CAS module extracted with Merkle types, hash functions,
and fs-backed store. Imports CasStore type from protocol.

Phase 5: Reactor (ReAct loop) extracted as independent package.
Only depends on protocol — no cas or engine dependency.

Ref: #143, closes #147, closes #148
2026-05-09 11:11:33 +08:00
xingyue 39d2a61686 refactor(runtime): types down to @uncaged/workflow-protocol
All type definitions now originate from @uncaged/workflow-protocol.
Runtime re-exports them for backward compatibility. Local AdvanceOutcome
duplicate in create-workflow.ts removed (now imported from protocol).

Ref: #143, closes #146
2026-05-09 11:09:38 +08:00
xingyue bf0bc47a3f feat(util): create @uncaged/workflow-util package
Extract pure utility functions from workflow/src/util/ into standalone package.
Types (Result, ok, err) now come from @uncaged/workflow-protocol.

Contains: base32 encoding, ULID generation, structured logger,
storage-root helpers, refs-field normalization.

Ref: #143, closes #145
2026-05-09 11:08:04 +08:00
xingyue 2cffaad127 feat(protocol): create @uncaged/workflow-protocol package
Extract all cross-package type definitions and constructor functions
into a dedicated protocol layer. This is the foundation for the
seven-package split (RFC #143).

Contains:
- Result<T,E>, ok(), err()
- START, END constants
- CasStore, WorkflowFn, RoleOutput, WorkflowCompletion
- WorkflowDescriptor, WorkflowRoleDescriptor
- ProviderConfig, WorkflowConfig, ResolvedModel (fixes config↔registry cycle)
- RoleDefinition, Moderator, WorkflowDefinition
- AgentFn, ExtractFn, and all thread context types

Ref: #143, closes #144
2026-05-09 11:06:10 +08:00
xiaomo 9a3daac657 Merge pull request 'feat(workflow): ThreadReactor — generic ReAct loop + extract/supervisor migration' (#142) from feat/139-thread-reactor into main 2026-05-09 02:28:09 +00:00
xiaoju b8f9ffcb59 feat(workflow): migrate supervisor to ThreadReactor (Phase 2)
- Rewrite supervisor to use createThreadReactor + createLlmFn
- No direct fetch/HTTP calls in supervisor
- All 266 tests passing

Refs #139, relates #141
2026-05-09 02:26:39 +00:00
xiaoju a7171f05f6 feat(workflow): add ThreadReactor generic ReAct loop + migrate extract (Phase 1)
- New src/reactor/ module: createThreadReactor, createLlmFn, types
- Two-stage API: config (llm, systemPrompt, tools, toolHandler) + per-call (thread, input, schema)
- All tool failures are recoverable (returned to LLM as error message)
- Rewrite createExtract to use createThreadReactor
- Delete reactExtract old implementation
- Fix template test imports (START/END from runtime, validateWorkflowDescriptor from engine)

268 tests passing.

Refs #139, relates #140
2026-05-09 02:15:38 +00:00
xiaoju b53667a2aa Merge pull request 'refactor(workflow): move descriptor validation out of runtime' (#135) from refactor/runtime-descriptor-boundary into main 2026-05-08 15:05:24 +00:00
Scott Wei 5b60fa6454 refactor(workflow-runtime): flatten package layout and centralize types
Collapse bundle/cas/extract/util stubs into types.ts; move createWorkflow and Result helpers to src root.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-08 23:03:53 +08:00
xiaomo 2c0e744ebf Merge pull request 'perf(serve): SSE live pump reads incrementally' (#137) from fix/130-sse-incremental into main 2026-05-08 15:00:44 +00:00
xiaomo ae16f09688 Merge pull request 'feat(dashboard): hash routing + health check polling' (#138) from fix/128-dashboard-enhancements into main 2026-05-08 15:00:34 +00:00
xiaomo 73a3638ad9 Merge pull request 'fix(serve): error handling, CORS, body limit, CAS store reuse' (#136) from fix/120-serve-hardening into main 2026-05-08 15:00:32 +00:00
xingyue 7b0260cedd feat(dashboard): hash routing + health check polling
- Hash-based URL routing (#threads, #threads/{id}, #workflows)
  for bookmarkable/shareable thread links
- Health check polls every 10s with reconnecting state
- useHashRoute hook for clean route management

Closes #128
2026-05-08 18:16:09 +08:00
xingyue 61fc1cfe1b perf(serve): SSE live pump reads incrementally instead of full file
Use Bun.file().slice() to read only new bytes from the last known
offset instead of re-reading the entire JSONL file on every fs watch.

- readNewBytes() helper with byte-offset tracking
- Handles file truncation (resets offset)
- Early return when no new data

Closes #130
2026-05-08 18:14:04 +08:00
xingyue 6b1e728700 fix(serve): error handling, CORS, body limit, CAS store reuse
- Global error handler (app.onError → 500 JSON)
- JSON parse validation on POST routes (400)
- CORS restricted to localhost origins
- 1MB body size limit on POST (413)
- CAS store created once per route group, not per-request
- 6 new tests covering all changes

Closes #120
2026-05-08 18:11:59 +08:00
xiaomo dedab62c49 Merge pull request 'feat(dashboard): connect thread detail to SSE live stream' (#134) from feat/131-dashboard-sse into main 2026-05-08 09:53:10 +00:00
xingyue a44f1f34a8 feat(dashboard): connect thread detail to SSE live stream
Add useSSE hook that connects to /api/threads/:id/live via EventSource.
Thread detail page now shows records in real-time with auto-scroll.

- useSSE hook: EventSource connection, record accumulation, auto-reconnect
  with exponential backoff, cleanup on unmount
- Thread detail: Live badge, SSE-first with fetch fallback, smooth scroll
- Records clear on reconnect (server replays full file)

Closes #131, testing verified per #133
Refs: #118
2026-05-08 17:52:10 +08:00
Scott Wei 8ff6f7e778 refactor(workflow): move descriptor validation out of runtime
Keep @uncaged/workflow-runtime focused on bundle runtime capabilities by relocating descriptor validation implementation to @uncaged/workflow.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-08 17:45:15 +08:00
xiaoju e04e75bdee chore: remove stale self-referencing symlink
小橘 🍊(NEKO Team)
2026-05-08 09:35:32 +00:00
xiaoju c65c29c1b5 Merge pull request 'refactor(workflow): simplify extraction + thread runtime contract' (#132) from refactor/thread-context-runtime into main 2026-05-08 09:34:26 +00:00
Scott Wei cc3f2b576c refactor(workflow): decouple agent context from CAS and fix monorepo checks
Move CAS access into extract dependencies so AgentContext stays state-only, and clean up type/lint/check regressions across CLI/dashboard to keep full check green.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-08 17:30:07 +08:00
Scott Wei 884ff85205 refactor(workflow): remove dead extract retry export
Drop unused llmExtractWithRetry implementation and public exports.

Add solve-issue template coverage for tool_calls extraction path.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-08 17:10:31 +08:00
Scott Wei a11cc62a81 refactor(workflow-runtime): use full ThreadContext in WorkflowFn
Redefine WorkflowFn to accept a complete ThreadContext plus WorkflowRuntime dependencies, removing ThreadInput and WorkflowFnOptions.

Move thread context construction into engine executeThread, update runtime loop/agent paths, and align templates/docs/tests with template-only definition exports.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-08 17:08:01 +08:00
Scott Wei 34f5e655d1 refactor(workflow): unify extraction behind ExtractFn
Route createExtract through reactExtract with plain-JSON correction retry.

Remove WorkflowFnOptions.llmProvider, ExtractMode, RoleDefinition.extractMode, ResolveRoleMetaFn.

Runtime createWorkflow calls options.extract directly; engine passes extract only.

Update templates, CLI skill docs, and tests.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-08 17:08:01 +08:00
xiaomo 44fb0694aa Merge pull request 'feat(serve+dashboard): write endpoints, SSE live, run dialog' (#129) from feat/118-serve-write-sse into main 2026-05-08 09:01:09 +00:00
xingyue cdcaff15ab feat(serve+dashboard): write endpoints, SSE live, run dialog
Serve API:
- POST /api/threads — run a new thread
- POST /api/threads/:id/kill — kill thread
- POST /api/threads/:id/pause — pause thread
- POST /api/threads/:id/resume — resume thread
- GET /api/threads/:id/live — SSE stream of thread records

Dashboard:
- Run Thread dialog (select workflow, enter prompt, set maxRounds)
- Thread detail controls (pause/resume/kill buttons)
- postJson API helper for write operations

262 tests pass. Refs: #118
2026-05-08 16:07:02 +08:00
xiaomo 402479ddef Merge pull request 'feat(dashboard): workflow dashboard' (#127) from feat/118-dashboard into main 2026-05-08 07:22:00 +00:00
209 changed files with 2655 additions and 4908 deletions
+1 -1
View File
@@ -1,7 +1,7 @@
{
"$schema": "https://biomejs.dev/schemas/2.4.14/schema.json",
"files": {
"includes": ["**", "!**/dist", "!**/node_modules"]
"includes": ["**", "!**/dist", "!**/node_modules", "!packages/workflow/workflow"]
},
"assist": { "actions": { "source": { "organizeImports": "on" } } },
"formatter": {
@@ -3,13 +3,9 @@ import { mkdir, mkdtemp, readFile, rm, unlink, writeFile } from "node:fs/promise
import { tmpdir } from "node:os";
import { join } from "node:path";
import {
createContentMerkleNode,
getGlobalCasDir,
getRegisteredWorkflow,
readWorkflowRegistry,
serializeMerkleNode,
} from "@uncaged/workflow";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { createContentMerkleNode, serializeMerkleNode } from "@uncaged/workflow-cas";
import { getRegisteredWorkflow, readWorkflowRegistry } from "@uncaged/workflow-register";
import { cmdCasGet, cmdCasList, cmdCasPut, cmdCasRm } from "../src/commands/cas/index.js";
import {
cmdAdd,
@@ -25,7 +21,7 @@ import { addCliArgs } from "./bundle-fixture.js";
const fixtureDescriptor = `export const descriptor = { description: "fixture", roles: {} };
`;
const wfPutImport = `import { putContentMerkleNode } from "@uncaged/workflow";
const wfPutImport = `import { putContentMerkleNode } from "@uncaged/workflow-cas";
`;
function casStoredForm(raw: string): string {
@@ -2,7 +2,8 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createCasStore, getContentMerklePayload, getGlobalCasDir } from "@uncaged/workflow";
import { createCasStore, getContentMerklePayload } from "@uncaged/workflow-cas";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { cmdFork, cmdRun } from "../src/commands/thread/index.js";
import { cmdAdd } from "../src/commands/workflow/index.js";
import { pathExists } from "../src/fs-utils.js";
@@ -10,7 +11,7 @@ import { addCliArgs } from "./bundle-fixture.js";
import { ensureTestWorkflowRegistryConfig } from "./workflow-registry-fixture.js";
/** Three-role workflow that respects `input.steps` for fork/resume. */
const threeRoleBundleSource = `import { putContentMerkleNode } from "@uncaged/workflow";
const threeRoleBundleSource = `import { putContentMerkleNode } from "@uncaged/workflow-cas";
export const descriptor = {
description: "fork-cli",
@@ -4,12 +4,9 @@ import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { fileURLToPath } from "node:url";
import {
createCasStore,
garbageCollectCas,
getGlobalCasDir,
putContentMerkleNode,
} from "@uncaged/workflow";
import { createCasStore, putContentMerkleNode } from "@uncaged/workflow-cas";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { garbageCollectCas } from "@uncaged/workflow-execute";
import { cmdThreadRemove } from "../src/commands/thread/index.js";
import { pathExists } from "../src/fs-utils.js";
@@ -50,7 +50,6 @@ describe("init template", () => {
dependencies: Record<string, string>;
};
expect(pkg.type).toBe("module");
expect(pkg.dependencies["@uncaged/workflow"]).toBeDefined();
expect(pkg.dependencies["@uncaged/workflow-runtime"]).toBeDefined();
expect(pkg.dependencies.zod).toBeDefined();
expect(pkg.name).toContain("review-pr");
@@ -46,7 +46,7 @@ describe("init workspace", () => {
dependencies: Record<string, string>;
};
expect(wfPkg.type).toBe("module");
expect(wfPkg.dependencies["@uncaged/workflow"]).toBeDefined();
expect(wfPkg.dependencies["@uncaged/workflow-runtime"]).toBeDefined();
expect(wfPkg.dependencies.zod).toBeDefined();
const tsconfig = JSON.parse(await readFile(join(root, "tsconfig.json"), "utf8")) as {
+2 -1
View File
@@ -5,7 +5,8 @@ import { tmpdir } from "node:os";
import { join } from "node:path";
import { fileURLToPath } from "node:url";
import { createCasStore, getGlobalCasDir, putContentMerkleNode } from "@uncaged/workflow";
import { createCasStore, putContentMerkleNode } from "@uncaged/workflow-cas";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import {
formatLiveDebugLine,
+78 -1
View File
@@ -1,6 +1,6 @@
import { describe, expect, test } from "bun:test";
import { createContentMerkleNode, serializeMerkleNode } from "@uncaged/workflow";
import { createContentMerkleNode, serializeMerkleNode } from "@uncaged/workflow-cas";
import { createApp } from "../src/commands/serve/app.js";
@@ -77,6 +77,83 @@ describe("serve /api/cas", () => {
});
});
describe("serve error handling", () => {
test("POST /api/threads with invalid JSON body → 400", async () => {
const { fetch } = buildApp("/tmp/uncaged-serve-test-nonexistent");
const res = await fetch("/api/threads", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: "not json",
});
expect(res.status).toBe(400);
const body = (await res.json()) as { error: string };
expect(body.error).toBe("invalid JSON body");
});
test("POST /api/cas with invalid JSON body → 400", async () => {
const { fetch } = buildApp("/tmp/uncaged-serve-test-nonexistent");
const res = await fetch("/api/cas", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: "not json",
});
expect(res.status).toBe(400);
const body = (await res.json()) as { error: string };
expect(body.error).toBe("invalid JSON body");
});
test("POST /api/threads with missing required fields → 400", async () => {
const { fetch } = buildApp("/tmp/uncaged-serve-test-nonexistent");
const res = await fetch("/api/threads", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ foo: "bar" }),
});
expect(res.status).toBe(400);
const body = (await res.json()) as { error: string };
expect(body.error).toContain("required");
});
test("global error handler returns 500 with JSON", async () => {
const app = createApp("/tmp/uncaged-serve-test-nonexistent");
app.get("/test-error", () => {
throw new Error("boom");
});
const res = await app.fetch(new Request("http://localhost/test-error"));
expect(res.status).toBe(500);
const body = (await res.json()) as { error: string };
expect(body.error).toBe("Internal server error");
});
});
describe("serve security", () => {
test("CORS headers present on responses", async () => {
const app = createApp("/tmp/uncaged-serve-test-nonexistent");
const res2 = await app.fetch(
new Request("http://localhost/healthz", {
headers: { Origin: "http://localhost:5173" },
}),
);
expect(res2.headers.get("Access-Control-Allow-Origin")).toBe("http://localhost:5173");
});
test("POST with body > 1MB → 413", async () => {
const { fetch } = buildApp("/tmp/uncaged-serve-test-nonexistent");
const largeBody = "x".repeat(1_048_577);
const res = await fetch("/api/cas", {
method: "POST",
headers: {
"Content-Type": "application/json",
"Content-Length": String(largeBody.length),
},
body: largeBody,
});
expect(res.status).toBe(413);
const body = (await res.json()) as { error: string };
expect(body.error).toBe("Payload too large");
});
});
describe("serve CAS round-trip", () => {
const tmpDir = `/tmp/uncaged-serve-cas-test-${Date.now()}`;
@@ -1,5 +1,5 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { getDefaultWorkflowStorageRoot } from "@uncaged/workflow";
import { getDefaultWorkflowStorageRoot } from "@uncaged/workflow-util";
import { resolveWorkflowStorageRoot } from "../src/storage-env.js";
describe("resolveWorkflowStorageRoot", () => {
@@ -4,7 +4,7 @@ import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import { getGlobalCasDir } from "@uncaged/workflow";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { cmdCasPut } from "../src/commands/cas/index.js";
import {
cmdKill,
@@ -21,7 +21,7 @@ import { pathExists, readTextFileIfExists } from "../src/fs-utils.js";
import { addCliArgs } from "./bundle-fixture.js";
import { ensureTestWorkflowRegistryConfig } from "./workflow-registry-fixture.js";
const wfPutImport = `import { putContentMerkleNode } from "@uncaged/workflow";
const wfPutImport = `import { putContentMerkleNode } from "@uncaged/workflow-cas";
`;
const threadFixtureDescriptor = `export const descriptor = {
+5 -1
View File
@@ -6,8 +6,12 @@
"uncaged-workflow": "src/cli.ts"
},
"dependencies": {
"@uncaged/workflow-protocol": "workspace:*",
"@uncaged/workflow-util": "workspace:*",
"@uncaged/workflow-cas": "workspace:*",
"@uncaged/workflow-execute": "workspace:*",
"@uncaged/workflow-register": "workspace:*",
"@uncaged/workflow-runtime": "workspace:*",
"@uncaged/workflow": "workspace:*",
"hono": "^4.12.18",
"yaml": "^2.8.4"
},
+1 -1
View File
@@ -1,7 +1,7 @@
import { mkdir, readFile, writeFile } from "node:fs/promises";
import { join } from "node:path";
import { err, ok, type Result } from "@uncaged/workflow";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { pathExists } from "./fs-utils.js";
+2 -1
View File
@@ -1,4 +1,5 @@
import { type GcResult, garbageCollectCas, type Result } from "@uncaged/workflow";
import type { Result } from "@uncaged/workflow-protocol";
import { type GcResult, garbageCollectCas } from "@uncaged/workflow-execute";
export async function cmdGc(storageRoot: string): Promise<Result<GcResult, string>> {
return garbageCollectCas(storageRoot);
@@ -1,4 +1,6 @@
import { createCasStore, err, getGlobalCasDir, ok, type Result } from "@uncaged/workflow";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { createCasStore } from "@uncaged/workflow-cas";
export async function cmdCasGet(
storageRoot: string,
@@ -1,4 +1,6 @@
import { createCasStore, getGlobalCasDir, ok, type Result } from "@uncaged/workflow";
import { ok, type Result } from "@uncaged/workflow-protocol";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { createCasStore } from "@uncaged/workflow-cas";
export async function cmdCasList(storageRoot: string): Promise<Result<string[], string>> {
const cas = createCasStore(getGlobalCasDir(storageRoot));
@@ -1,4 +1,6 @@
import { createCasStore, getGlobalCasDir, ok, type Result } from "@uncaged/workflow";
import { ok, type Result } from "@uncaged/workflow-protocol";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { createCasStore } from "@uncaged/workflow-cas";
export async function cmdCasPut(
storageRoot: string,
+3 -1
View File
@@ -1,4 +1,6 @@
import { createCasStore, getGlobalCasDir, ok, type Result } from "@uncaged/workflow";
import { ok, type Result } from "@uncaged/workflow-protocol";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { createCasStore } from "@uncaged/workflow-cas";
export async function cmdCasRm(storageRoot: string, hash: string): Promise<Result<void, string>> {
const cas = createCasStore(getGlobalCasDir(storageRoot));
@@ -1,7 +1,7 @@
import { mkdir, readFile, writeFile } from "node:fs/promises";
import { dirname, join, resolve } from "node:path";
import { err, ok, type Result } from "@uncaged/workflow";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { pathExists } from "../../fs-utils.js";
@@ -6,7 +6,6 @@ export function templatePackageJson(templateName: string): string {
private: true,
type: "module",
dependencies: {
"@uncaged/workflow": "^0.1.0",
"@uncaged/workflow-runtime": "^0.1.0",
zod: "^4.0.0",
},
@@ -1,4 +1,4 @@
import { err, ok, type Result } from "@uncaged/workflow";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
/** Validates a single path segment for workspace / template names (no separators, not `.` / `..`). */
export function validateWorkspaceSegment(name: string): Result<void, string> {
@@ -1,7 +1,7 @@
import { mkdir, writeFile } from "node:fs/promises";
import { join } from "node:path";
import { err, ok, type Result } from "@uncaged/workflow";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { pathExists } from "../../fs-utils.js";
import type { CmdInitWorkspaceSuccess } from "./types.js";
@@ -28,7 +28,7 @@ function workflowsPackageJson(): string {
private: true,
type: "module",
dependencies: {
"@uncaged/workflow": "^0.1.0",
"@uncaged/workflow-runtime": "^0.1.0",
zod: "^4.0.0",
},
},
@@ -107,7 +107,7 @@ Init 生成的骨架:\`templates/\` 下放可复用定义,\`workflows/\` 下
2. **编写 RoleDefinition**:为每个角色写 Zod \`schema\`,补齐 \`systemPrompt\` / \`extractPrompt\` / \`description\`
3. **编写 Moderator**:根据 \`ctx.steps\` 与业务状态返回下一个角色名或 \`END\`
4. **组装 WorkflowDefinition**:在模板 \`index\` 中导出 definition(以及必要的角色 / moderator 导出)。
5. **实例化**:在 workflow 包中使用 \`createWorkflow(def, binding)\`(或项目约定的封装)绑定 **AgentFn**;**ExtractFn** 由引擎从 **workflow.yaml** 注入 \`WorkflowFnOptions\`
5. **实例化**:在 workflow 包中使用 \`createWorkflow(def, binding)\`(或项目约定的封装)绑定 **AgentFn**;**ExtractFn** 由引擎从 **workflow.yaml** 注入 \`WorkflowRuntime\`
6. **构建**:打包为单个 **.esm.js** bundle,使用 **uncaged-workflow add** 注册。
## 4. 编码规范
@@ -2,18 +2,46 @@ import { Hono } from "hono";
import { cors } from "hono/cors";
import { createCasRoutes } from "./routes-cas.js";
import { createLiveRoutes } from "./routes-live.js";
import { createThreadRoutes } from "./routes-thread.js";
import { createWorkflowRoutes } from "./routes-workflow.js";
const MAX_BODY_SIZE = 1_048_576; // 1 MB
export function createApp(storageRoot: string): Hono {
const app = new Hono();
app.use("*", cors());
app.onError((_err, c) => {
return c.json({ error: "Internal server error" }, 500);
});
app.use(
"*",
cors({
origin: [
"http://localhost:5173",
"http://127.0.0.1:5173",
"http://localhost:7860",
"http://127.0.0.1:7860",
],
}),
);
app.use("*", async (c, next) => {
if (c.req.method === "POST") {
const contentLength = c.req.header("content-length");
if (contentLength !== undefined && Number(contentLength) > MAX_BODY_SIZE) {
return c.json({ error: "Payload too large" }, 413);
}
}
await next();
});
app.get("/healthz", (c) => c.json({ ok: true }));
app.route("/api/workflows", createWorkflowRoutes(storageRoot));
app.route("/api/threads", createThreadRoutes(storageRoot));
app.route("/api/threads", createLiveRoutes(storageRoot));
app.route("/api/cas", createCasRoutes(storageRoot));
return app;
@@ -1,19 +1,19 @@
import { createCasStore, garbageCollectCas, getGlobalCasDir } from "@uncaged/workflow";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { createCasStore } from "@uncaged/workflow-cas";
import { garbageCollectCas } from "@uncaged/workflow-execute";
import { Hono } from "hono";
export function createCasRoutes(storageRoot: string): Hono {
const app = new Hono();
const casDir = getGlobalCasDir(storageRoot);
const cas = createCasStore(casDir);
app.get("/", async (c) => {
const casDir = getGlobalCasDir(storageRoot);
const cas = createCasStore(casDir);
const hashes = await cas.list();
return c.json({ hashes });
});
app.get("/:hash", async (c) => {
const casDir = getGlobalCasDir(storageRoot);
const cas = createCasStore(casDir);
const content = await cas.get(c.req.param("hash"));
if (content === null) {
return c.json({ error: "not found" }, 404);
@@ -22,19 +22,20 @@ export function createCasRoutes(storageRoot: string): Hono {
});
app.post("/", async (c) => {
const body = await c.req.json<{ content: string }>();
let body: { content: string };
try {
body = (await c.req.json()) as { content: string };
} catch {
return c.json({ error: "invalid JSON body" }, 400);
}
if (typeof body.content !== "string") {
return c.json({ error: "content field required" }, 400);
}
const casDir = getGlobalCasDir(storageRoot);
const cas = createCasStore(casDir);
const hash = await cas.put(body.content);
return c.json({ hash }, 201);
});
app.delete("/:hash", async (c) => {
const casDir = getGlobalCasDir(storageRoot);
const cas = createCasStore(casDir);
const hash = c.req.param("hash");
const content = await cas.get(hash);
if (content === null) {
@@ -0,0 +1,198 @@
import { statSync, watch } from "node:fs";
import { dirname, join } from "node:path";
import { Hono } from "hono";
import { streamSSE } from "hono/streaming";
import { resolveThreadDataPath } from "../../thread-scan.js";
type PumpState = {
contentOffset: number;
carry: string;
};
function fileSize(path: string): number {
try {
return statSync(path).size;
} catch {
return 0;
}
}
async function readNewBytes(path: string, state: PumpState): Promise<string | null> {
const size = fileSize(path);
if (size < state.contentOffset) {
// File was truncated — reset
state.contentOffset = 0;
state.carry = "";
}
if (size <= state.contentOffset) {
return null;
}
const blob = Bun.file(path).slice(state.contentOffset, size);
const chunk = await blob.text();
state.contentOffset = size;
return chunk;
}
function parseJsonLine(line: string): unknown {
try {
return JSON.parse(line) as unknown;
} catch {
return { raw: line };
}
}
function isWorkflowResult(record: unknown): boolean {
return (
record !== null &&
typeof record === "object" &&
"type" in (record as Record<string, unknown>) &&
(record as Record<string, unknown>).type === "workflow-result"
);
}
function parseNewLines(chunk: string, state: PumpState): string[] {
state.carry += chunk;
const parts = state.carry.split("\n");
state.carry = parts.pop() ?? "";
const lines: string[] = [];
for (const line of parts) {
const trimmed = line.trim();
if (trimmed !== "") {
lines.push(trimmed);
}
}
return lines;
}
export function createLiveRoutes(storageRoot: string): Hono {
const app = new Hono();
app.get("/:threadId/live", async (c) => {
const threadId = c.req.param("threadId");
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
return c.json({ error: `thread not found: ${threadId}` }, 404);
}
const resolvedDataPath = dataPath;
const infoPath = join(dirname(resolvedDataPath), `${threadId}.info.jsonl`);
return streamSSE(c, async (stream) => {
const dataState: PumpState = { contentOffset: 0, carry: "" };
const infoState: PumpState = { contentOffset: 0, carry: "" };
let eventId = 0;
async function pumpData(): Promise<boolean> {
let chunk: string | null;
try {
chunk = await readNewBytes(resolvedDataPath, dataState);
} catch {
return false;
}
if (chunk === null) {
return false;
}
const lines = parseNewLines(chunk, dataState);
for (const line of lines) {
const record = parseJsonLine(line);
eventId++;
await stream.writeSSE({
event: "record",
data: JSON.stringify(record),
id: String(eventId),
});
if (isWorkflowResult(record)) {
return true;
}
}
return false;
}
async function pumpInfo(): Promise<void> {
let chunk: string | null;
try {
chunk = await readNewBytes(infoPath, infoState);
} catch {
return;
}
if (chunk === null) {
return;
}
const lines = parseNewLines(chunk, infoState);
for (const line of lines) {
const record = parseJsonLine(line);
if (
typeof record === "object" &&
record !== null &&
"raw" in (record as Record<string, unknown>)
) {
continue;
}
eventId++;
await stream.writeSSE({
event: "info",
data: JSON.stringify(record),
id: String(eventId),
});
}
}
// Initial pump
const done = await pumpData();
await pumpInfo();
if (done) {
return;
}
// Watch for changes
const controller = new AbortController();
let completed = false;
const dataWatcher = watch(resolvedDataPath, async () => {
if (completed) return;
const finished = await pumpData();
if (finished) {
completed = true;
controller.abort();
}
});
let infoWatcher: ReturnType<typeof watch> | null = null;
try {
infoWatcher = watch(infoPath, async () => {
if (completed) return;
await pumpInfo();
});
} catch {
// info file may not exist
}
stream.onAbort(() => {
completed = true;
dataWatcher.close();
infoWatcher?.close();
});
// Keep stream alive until completion or client disconnect
await new Promise<void>((resolve) => {
if (completed) {
resolve();
return;
}
controller.signal.addEventListener("abort", () => resolve(), { once: true });
stream.onAbort(() => resolve());
});
dataWatcher.close();
infoWatcher?.close();
});
});
return app;
}
@@ -6,6 +6,8 @@ import {
listRunningThreads,
resolveThreadDataPath,
} from "../../thread-scan.js";
import { cmdKill, cmdPause, cmdResume } from "../thread/control.js";
import { cmdRun } from "../thread/run.js";
export function createThreadRoutes(storageRoot: string): Hono {
const app = new Hono();
@@ -42,5 +44,55 @@ export function createThreadRoutes(storageRoot: string): Hono {
return c.json({ threadId, records });
});
app.post("/", async (c) => {
let body: Record<string, unknown>;
try {
body = (await c.req.json()) as Record<string, unknown>;
} catch {
return c.json({ error: "invalid JSON body" }, 400);
}
const name = body.workflow;
const prompt = body.prompt;
const maxRounds = typeof body.maxRounds === "number" ? body.maxRounds : 10;
if (typeof name !== "string" || typeof prompt !== "string") {
return c.json({ error: "workflow (string) and prompt (string) are required" }, 400);
}
const result = await cmdRun(storageRoot, name, prompt, maxRounds);
if (!result.ok) {
return c.json({ error: result.error }, 400);
}
return c.json({ threadId: result.value.threadId }, 201);
});
app.post("/:threadId/kill", async (c) => {
const threadId = c.req.param("threadId");
const result = await cmdKill(storageRoot, threadId);
if (!result.ok) {
return c.json({ error: result.error }, 400);
}
return c.json({ ok: true });
});
app.post("/:threadId/pause", async (c) => {
const threadId = c.req.param("threadId");
const result = await cmdPause(storageRoot, threadId);
if (!result.ok) {
return c.json({ error: result.error }, 400);
}
return c.json({ ok: true });
});
app.post("/:threadId/resume", async (c) => {
const threadId = c.req.param("threadId");
const result = await cmdResume(storageRoot, threadId);
if (!result.ok) {
return c.json({ error: result.error }, 400);
}
return c.json({ ok: true });
});
return app;
}
@@ -2,7 +2,7 @@ import {
getRegisteredWorkflow,
listRegisteredWorkflowNames,
readWorkflowRegistry,
} from "@uncaged/workflow";
} from "@uncaged/workflow-register";
import { Hono } from "hono";
export function createWorkflowRoutes(storageRoot: string): Hono {
@@ -1,4 +1,4 @@
import { err, ok, type Result } from "@uncaged/workflow";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { serve } from "bun";
import { printCliLine } from "../../cli-output.js";
@@ -1,4 +1,4 @@
import type { Result } from "@uncaged/workflow";
import type { Result } from "@uncaged/workflow-protocol";
import {
readWorkerCtl,
@@ -1,4 +1,4 @@
import { err, ok, type Result } from "@uncaged/workflow";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import type { ParsedForkArgv } from "./types.js";
@@ -1,6 +1,8 @@
import { join } from "node:path";
import { buildForkPlan, err, generateUlid, ok, type Result } from "@uncaged/workflow";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { generateUlid } from "@uncaged/workflow-util";
import { buildForkPlan } from "@uncaged/workflow-execute";
import { pathExists, readTextFileIfExists } from "../../fs-utils.js";
import { resolveThreadDataPath } from "../../thread-scan.js";
@@ -1,4 +1,4 @@
import { err, ok, type Result } from "@uncaged/workflow";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { listHistoricalThreads } from "../../thread-scan.js";
import { validateCliWorkflowName } from "../../workflow-name.js";
@@ -2,15 +2,10 @@ import { watch } from "node:fs";
import { readFile } from "node:fs/promises";
import { dirname, join } from "node:path";
import {
type CasStore,
createCasStore,
getContentMerklePayload,
getGlobalCasDir,
tryParseRoleStepRecord,
tryParseWorkflowResultRecord,
} from "@uncaged/workflow";
import type { WorkflowCompletion } from "@uncaged/workflow-runtime";
import type { CasStore, WorkflowCompletion } from "@uncaged/workflow-protocol";
import { getGlobalCasDir } from "@uncaged/workflow-util";
import { createCasStore, getContentMerklePayload } from "@uncaged/workflow-cas";
import { tryParseRoleStepRecord, tryParseWorkflowResultRecord } from "@uncaged/workflow-execute";
import { dimGreyLine, highlightLiveRole } from "../../cli-color.js";
import { printCliError, printCliLine } from "../../cli-output.js";
@@ -1,7 +1,8 @@
import { unlink } from "node:fs/promises";
import { dirname, join } from "node:path";
import { err, garbageCollectCas, ok, type Result } from "@uncaged/workflow";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { garbageCollectCas } from "@uncaged/workflow-execute";
import { resolveThreadDataPath } from "../../thread-scan.js";
@@ -1,13 +1,8 @@
import { join } from "node:path";
import {
err,
generateUlid,
getRegisteredWorkflow,
ok,
type Result,
readWorkflowRegistry,
} from "@uncaged/workflow";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { generateUlid } from "@uncaged/workflow-util";
import { getRegisteredWorkflow, readWorkflowRegistry } from "@uncaged/workflow-register";
import { ensureWorkerForHash, sendWorkerTcpCommand } from "../../worker-spawn.js";
import { validateCliWorkflowName } from "../../workflow-name.js";
@@ -1,4 +1,4 @@
import { err, ok, type Result } from "@uncaged/workflow";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { readTextFileIfExists } from "../../fs-utils.js";
import { resolveThreadDataPath } from "../../thread-scan.js";
@@ -1,4 +1,4 @@
import { err, ok, type Result } from "@uncaged/workflow";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import type { ParsedAddArgv } from "./types.js";
@@ -1,18 +1,16 @@
import { readFile, stat } from "node:fs/promises";
import { basename, resolve } from "node:path";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { hashWorkflowBundleBytes } from "@uncaged/workflow-cas";
import {
err,
extractBundleExports,
hashWorkflowBundleBytes,
ok,
type Result,
readWorkflowRegistry,
registerWorkflowVersion,
stringifyWorkflowDescriptor,
validateWorkflowBundle,
writeWorkflowRegistry,
} from "@uncaged/workflow";
} from "@uncaged/workflow-register";
import { storeWorkflowBundleArtifacts } from "../../bundle-store.js";
import { validateCliWorkflowName } from "../../workflow-name.js";
@@ -1,10 +1,5 @@
import {
err,
getRegisteredWorkflow,
ok,
type Result,
readWorkflowRegistry,
} from "@uncaged/workflow";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { getRegisteredWorkflow, readWorkflowRegistry } from "@uncaged/workflow-register";
import { validateCliWorkflowName } from "../../workflow-name.js";
@@ -1,11 +1,9 @@
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import {
err,
listRegisteredWorkflowNames,
ok,
type Result,
readWorkflowRegistry,
type WorkflowRegistryFile,
} from "@uncaged/workflow";
} from "@uncaged/workflow-register";
export async function cmdList(storageRoot: string): Promise<Result<WorkflowRegistryFile, string>> {
const reg = await readWorkflowRegistry(storageRoot);
@@ -1,11 +1,9 @@
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import {
err,
ok,
type Result,
readWorkflowRegistry,
unregisterWorkflow,
writeWorkflowRegistry,
} from "@uncaged/workflow";
} from "@uncaged/workflow-register";
import { validateCliWorkflowName } from "../../workflow-name.js";
@@ -1,14 +1,12 @@
import { join } from "node:path";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import {
err,
getRegisteredWorkflow,
ok,
type Result,
readWorkflowRegistry,
rollbackWorkflowToHistoryHash,
writeWorkflowRegistry,
} from "@uncaged/workflow";
} from "@uncaged/workflow-register";
import { pathExists } from "../../fs-utils.js";
import { validateCliWorkflowName } from "../../workflow-name.js";
@@ -1,11 +1,9 @@
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import {
err,
getRegisteredWorkflow,
ok,
type Result,
readWorkflowRegistry,
type WorkflowRegistryEntry,
} from "@uncaged/workflow";
} from "@uncaged/workflow-register";
import { stringify } from "yaml";
import { validateCliWorkflowName } from "../../workflow-name.js";
+1 -1
View File
@@ -1,4 +1,4 @@
import { err, ok, type Result } from "@uncaged/workflow";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
export type ParsedLiveArgv = {
threadId: string | null;
+1 -1
View File
@@ -1,4 +1,4 @@
import { err, ok, type Result } from "@uncaged/workflow";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
export type ParsedRunArgv = {
name: string;
-1
View File
@@ -203,7 +203,6 @@ Each role has:
| \`extractPrompt\` | string | Instruction for extracting structured meta |
| \`schema\` | ZodSchema | Validates the extracted meta |
| \`extractRefs\` | fn or null | Extracts CAS hashes from meta for DAG linking |
| \`extractMode\` | "single" | Extraction mode |
## Development Workflow
+1 -1
View File
@@ -1,4 +1,4 @@
import { getDefaultWorkflowStorageRoot } from "@uncaged/workflow";
import { getDefaultWorkflowStorageRoot } from "@uncaged/workflow-util";
/**
* Resolve storage root with env var override support.
+2 -1
View File
@@ -3,7 +3,8 @@ import { mkdir, readdir, unlink, writeFile } from "node:fs/promises";
import { createConnection } from "node:net";
import { join } from "node:path";
import { err, getWorkerHostScriptPath, ok, type Result } from "@uncaged/workflow";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import { getWorkerHostScriptPath } from "@uncaged/workflow-execute";
import { pathExists, readTextFileIfExists } from "./fs-utils.js";
+1 -1
View File
@@ -1,4 +1,4 @@
import { err, ok, type Result } from "@uncaged/workflow";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
const WORKFLOW_NAME_RE = /^[a-z][a-z0-9]*(-[a-z0-9]+)*$/;
+8 -1
View File
@@ -17,6 +17,13 @@
"rootDir": "src",
"types": ["bun-types"]
},
"references": [{ "path": "../workflow-runtime" }, { "path": "../workflow" }],
"references": [
{ "path": "../workflow-runtime" },
{ "path": "../workflow-protocol" },
{ "path": "../workflow-util" },
{ "path": "../workflow-cas" },
{ "path": "../workflow-execute" },
{ "path": "../workflow-register" }
],
"include": ["src/**/*.ts"]
}
+33
View File
@@ -1,5 +1,18 @@
const BASE = "/api";
async function postJson<T>(path: string, body: unknown): Promise<T> {
const res = await fetch(`${BASE}${path}`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(body),
});
if (!res.ok) {
const err = (await res.json().catch(() => ({ error: res.statusText }))) as { error: string };
throw new Error(err.error || `API ${res.status}`);
}
return res.json() as Promise<T>;
}
async function fetchJson<T>(path: string): Promise<T> {
const res = await fetch(`${BASE}${path}`);
if (!res.ok) {
@@ -46,6 +59,26 @@ export function getThread(id: string): Promise<{ records: ThreadRecord[] }> {
return fetchJson(`/threads/${id}`);
}
export function runThread(
workflow: string,
prompt: string,
maxRounds: number = 10,
): Promise<{ threadId: string }> {
return postJson("/threads", { workflow, prompt, maxRounds });
}
export function killThread(threadId: string): Promise<{ ok: boolean }> {
return postJson(`/threads/${threadId}/kill`, {});
}
export function pauseThread(threadId: string): Promise<{ ok: boolean }> {
return postJson(`/threads/${threadId}/pause`, {});
}
export function resumeThread(threadId: string): Promise<{ ok: boolean }> {
return postJson(`/threads/${threadId}/resume`, {});
}
export function getHealth(): Promise<{ ok: boolean }> {
return fetchJson("/healthz");
}
+20 -13
View File
@@ -1,31 +1,38 @@
import { useState } from "react";
import { RunDialog } from "./components/run-dialog.tsx";
import { Sidebar } from "./components/sidebar.tsx";
import { ThreadList } from "./components/thread-list.tsx";
import { ThreadDetail } from "./components/thread-detail.tsx";
import { WorkflowList } from "./components/workflow-list.tsx";
import { StatusBar } from "./components/status-bar.tsx";
type View = "threads" | "workflows";
import { ThreadDetail } from "./components/thread-detail.tsx";
import { ThreadList } from "./components/thread-list.tsx";
import { WorkflowList } from "./components/workflow-list.tsx";
import { useHashRoute } from "./use-hash-route.ts";
export function App() {
const [view, setView] = useState<View>("threads");
const [selectedThread, setSelectedThread] = useState<string | null>(null);
const { view, threadId, setView, setThreadId } = useHashRoute();
const [showRun, setShowRun] = useState(false);
return (
<div className="flex h-screen">
<Sidebar view={view} onViewChange={setView} />
<main className="flex-1 overflow-hidden flex flex-col">
<StatusBar />
<StatusBar onRun={() => setShowRun(true)} />
<div className="flex-1 overflow-auto p-6">
{view === "threads" && !selectedThread && (
<ThreadList onSelect={setSelectedThread} />
)}
{view === "threads" && selectedThread && (
<ThreadDetail threadId={selectedThread} onBack={() => setSelectedThread(null)} />
{view === "threads" && threadId === null && <ThreadList onSelect={setThreadId} />}
{view === "threads" && threadId !== null && (
<ThreadDetail threadId={threadId} onBack={() => setThreadId(null)} />
)}
{view === "workflows" && <WorkflowList />}
</div>
</main>
{showRun && (
<RunDialog
onClose={() => setShowRun(false)}
onCreated={(id) => {
setShowRun(false);
setThreadId(id);
}}
/>
)}
</div>
);
}
@@ -0,0 +1,147 @@
import { useState } from "react";
import { listWorkflows, runThread } from "../api.ts";
import { useFetch } from "../hooks.ts";
type Props = {
onClose: () => void;
onCreated: (threadId: string) => void;
};
export function RunDialog({ onClose, onCreated }: Props) {
const workflows = useFetch(() => listWorkflows(), []);
const [workflow, setWorkflow] = useState("");
const [prompt, setPrompt] = useState("");
const [maxRounds, setMaxRounds] = useState(10);
const [submitting, setSubmitting] = useState(false);
const [error, setError] = useState<string | null>(null);
async function handleSubmit(e: React.FormEvent) {
e.preventDefault();
if (!workflow || !prompt) return;
setSubmitting(true);
setError(null);
try {
const result = await runThread(workflow, prompt, maxRounds);
onCreated(result.threadId);
} catch (err) {
setError(err instanceof Error ? err.message : String(err));
setSubmitting(false);
}
}
return (
<div
className="fixed inset-0 flex items-center justify-center z-50"
style={{ background: "rgba(0,0,0,0.6)" }}
>
<div
className="w-full max-w-lg p-6 rounded-lg border"
style={{ background: "var(--color-surface)", borderColor: "var(--color-border)" }}
>
<h3 className="text-lg font-semibold mb-4">Run Thread</h3>
<form onSubmit={handleSubmit} className="space-y-4">
<div>
<label
htmlFor="run-workflow"
className="text-sm block mb-1"
style={{ color: "var(--color-text-muted)" }}
>
Workflow
</label>
<select
id="run-workflow"
value={workflow}
onChange={(e) => setWorkflow(e.target.value)}
className="w-full px-3 py-2 rounded border text-sm"
style={{
background: "var(--color-bg)",
borderColor: "var(--color-border)",
color: "var(--color-text)",
}}
>
<option value="">Select a workflow...</option>
{workflows.status === "ok" &&
workflows.data.workflows.map((w) => (
<option key={w.name} value={w.name}>
{w.name}
</option>
))}
</select>
</div>
<div>
<label
htmlFor="run-prompt"
className="text-sm block mb-1"
style={{ color: "var(--color-text-muted)" }}
>
Prompt
</label>
<textarea
id="run-prompt"
value={prompt}
onChange={(e) => setPrompt(e.target.value)}
rows={4}
className="w-full px-3 py-2 rounded border text-sm"
style={{
background: "var(--color-bg)",
borderColor: "var(--color-border)",
color: "var(--color-text)",
}}
placeholder="Enter the task prompt..."
/>
</div>
<div>
<label
htmlFor="run-max-rounds"
className="text-sm block mb-1"
style={{ color: "var(--color-text-muted)" }}
>
Max Rounds
</label>
<input
id="run-max-rounds"
type="number"
value={maxRounds}
onChange={(e) => setMaxRounds(Number(e.target.value))}
min={1}
max={100}
className="w-24 px-3 py-2 rounded border text-sm"
style={{
background: "var(--color-bg)",
borderColor: "var(--color-border)",
color: "var(--color-text)",
}}
/>
</div>
{error && (
<p className="text-sm" style={{ color: "var(--color-error)" }}>
{error}
</p>
)}
<div className="flex gap-2 justify-end">
<button
type="button"
onClick={onClose}
className="px-4 py-2 text-sm rounded border"
style={{ borderColor: "var(--color-border)", color: "var(--color-text-muted)" }}
>
Cancel
</button>
<button
type="submit"
disabled={submitting || !workflow || !prompt}
className="px-4 py-2 text-sm rounded"
style={{
background: submitting ? "var(--color-accent-dim)" : "var(--color-accent)",
color: "#fff",
opacity: !workflow || !prompt ? 0.5 : 1,
}}
>
{submitting ? "Starting..." : "Run"}
</button>
</div>
</form>
</div>
</div>
);
}
@@ -10,16 +10,22 @@ export function Sidebar({ view, onViewChange }: Props) {
];
return (
<aside className="w-56 border-r flex flex-col" style={{ borderColor: "var(--color-border)", background: "var(--color-surface)" }}>
<aside
className="w-56 border-r flex flex-col"
style={{ borderColor: "var(--color-border)", background: "var(--color-surface)" }}
>
<div className="p-4 border-b" style={{ borderColor: "var(--color-border)" }}>
<h1 className="text-lg font-semibold" style={{ color: "var(--color-accent)" }}>
Workflow
</h1>
<p className="text-xs mt-1" style={{ color: "var(--color-text-muted)" }}>Dashboard</p>
<p className="text-xs mt-1" style={{ color: "var(--color-text-muted)" }}>
Dashboard
</p>
</div>
<nav className="flex-1 p-2 space-y-1">
{items.map((item) => (
<button
type="button"
key={item.key}
onClick={() => onViewChange(item.key)}
className="w-full text-left px-3 py-2 rounded text-sm transition-colors"
@@ -1,24 +1,65 @@
import { useCallback, useEffect, useRef, useState } from "react";
import { getHealth } from "../api.ts";
import { useFetch } from "../hooks.ts";
export function StatusBar() {
const health = useFetch(() => getHealth(), []);
type HealthStatus = "connected" | "disconnected" | "reconnecting";
type Props = {
onRun: () => void;
};
function statusLabel(status: HealthStatus): { text: string; color: string } {
if (status === "connected") {
return { text: "● Connected", color: "var(--color-success)" };
}
if (status === "reconnecting") {
return { text: "● Reconnecting...", color: "var(--color-warning, #f59e0b)" };
}
return { text: "● Offline", color: "var(--color-error)" };
}
export function StatusBar({ onRun }: Props) {
const [status, setStatus] = useState<HealthStatus>("disconnected");
const wasConnectedRef = useRef(false);
const checkHealth = useCallback(async () => {
try {
await getHealth();
wasConnectedRef.current = true;
setStatus("connected");
} catch {
if (wasConnectedRef.current) {
setStatus("reconnecting");
} else {
setStatus("disconnected");
}
}
}, []);
useEffect(() => {
checkHealth();
const interval = setInterval(checkHealth, 10_000);
return () => clearInterval(interval);
}, [checkHealth]);
const label = statusLabel(status);
return (
<div
className="flex items-center justify-between px-6 py-2 text-xs border-b"
style={{ borderColor: "var(--color-border)", background: "var(--color-surface)" }}
>
<span style={{ color: "var(--color-text-muted)" }}>Local API: 127.0.0.1:7860</span>
<span>
{health.status === "loading" && "⏳ Connecting..."}
{health.status === "ok" && (
<span style={{ color: "var(--color-success)" }}> Connected</span>
)}
{health.status === "error" && (
<span style={{ color: "var(--color-error)" }}> Offline</span>
)}
</span>
<div className="flex items-center gap-4">
<span style={{ color: "var(--color-text-muted)" }}>Local API: 127.0.0.1:7860</span>
<button
type="button"
onClick={onRun}
className="px-3 py-1 rounded text-xs font-medium"
style={{ background: "var(--color-accent)", color: "#fff" }}
>
Run Thread
</button>
</div>
<span style={{ color: label.color }}>{label.text}</span>
</div>
);
}
@@ -1,5 +1,7 @@
import { getThread } from "../api.ts";
import { useEffect, useRef, useState } from "react";
import { getThread, killThread, pauseThread, resumeThread } from "../api.ts";
import { useFetch } from "../hooks.ts";
import { useSSE } from "../use-sse.ts";
type Props = {
threadId: string;
@@ -7,26 +9,101 @@ type Props = {
};
export function ThreadDetail({ threadId, onBack }: Props) {
const sse = useSSE(threadId);
const { status, data, error } = useFetch(() => getThread(threadId), [threadId]);
const [actionStatus, setActionStatus] = useState<string | null>(null);
const recordsEndRef = useRef<HTMLDivElement>(null);
const liveActive = sse.connected && !sse.completed;
const records = liveActive
? sse.records
: status === "ok"
? data.records
: ([] as typeof sse.records);
// biome-ignore lint/correctness/useExhaustiveDependencies: scroll when the rendered record list grows
useEffect(() => {
recordsEndRef.current?.scrollIntoView({ behavior: "smooth" });
}, [records.length]);
async function handleAction(action: "kill" | "pause" | "resume") {
setActionStatus(`${action}ing...`);
try {
const fn = action === "kill" ? killThread : action === "pause" ? pauseThread : resumeThread;
await fn(threadId);
setActionStatus(`${action} sent ✓`);
} catch (e) {
setActionStatus(`${action} failed: ${e instanceof Error ? e.message : String(e)}`);
}
}
return (
<div>
<button
onClick={onBack}
className="text-sm mb-4 hover:underline"
style={{ color: "var(--color-accent)" }}
>
Back to threads
</button>
<h2 className="text-xl font-semibold mb-4 font-mono">{threadId}</h2>
<div className="flex items-center justify-between mb-4">
<button
type="button"
onClick={onBack}
className="text-sm hover:underline"
style={{ color: "var(--color-accent)" }}
>
Back to threads
</button>
<div className="flex gap-2">
<button
type="button"
onClick={() => handleAction("pause")}
className="px-3 py-1 text-xs rounded border"
style={{ borderColor: "var(--color-warning)", color: "var(--color-warning)" }}
>
Pause
</button>
<button
type="button"
onClick={() => handleAction("resume")}
className="px-3 py-1 text-xs rounded border"
style={{ borderColor: "var(--color-success)", color: "var(--color-success)" }}
>
Resume
</button>
<button
type="button"
onClick={() => handleAction("kill")}
className="px-3 py-1 text-xs rounded border"
style={{ borderColor: "var(--color-error)", color: "var(--color-error)" }}
>
Kill
</button>
</div>
</div>
{status === "loading" && <p style={{ color: "var(--color-text-muted)" }}>Loading...</p>}
{status === "error" && <p style={{ color: "var(--color-error)" }}>Error: {error}</p>}
{status === "ok" && (
<h2 className="text-xl font-semibold mb-2 font-mono flex items-center gap-2 flex-wrap">
<span>{threadId}</span>
{sse.connected && (
<span
className="text-xs font-medium px-2 py-0.5 rounded"
style={{ background: "var(--color-success)", color: "var(--color-bg)" }}
>
Live
</span>
)}
</h2>
{actionStatus && (
<p className="text-xs mb-4" style={{ color: "var(--color-text-muted)" }}>
{actionStatus}
</p>
)}
{status === "loading" && !liveActive && records.length === 0 && (
<p style={{ color: "var(--color-text-muted)" }}>Loading...</p>
)}
{status === "error" && !liveActive && (
<p style={{ color: "var(--color-error)" }}>Error: {error}</p>
)}
{(status === "ok" || liveActive || records.length > 0) && (
<div className="space-y-3">
{data.records.map((r, i) => (
{records.map((r) => (
<div
key={i}
key={`${threadId}-${r.type}-${String(r.timestamp)}-${r.role ?? ""}-${r.content ?? ""}`}
className="p-3 rounded border text-sm"
style={{ background: "var(--color-surface)", borderColor: "var(--color-border)" }}
>
@@ -42,19 +119,23 @@ export function ThreadDetail({ threadId, onBack }: Props) {
{r.role}
</span>
)}
{r.timestamp && (
{r.timestamp !== null && (
<span className="text-xs ml-auto" style={{ color: "var(--color-text-muted)" }}>
{new Date(r.timestamp).toLocaleTimeString()}
</span>
)}
</div>
{r.content && (
<pre className="whitespace-pre-wrap text-xs mt-1" style={{ color: "var(--color-text)" }}>
<pre
className="whitespace-pre-wrap text-xs mt-1"
style={{ color: "var(--color-text)" }}
>
{typeof r.content === "string" ? r.content : JSON.stringify(r.content, null, 2)}
</pre>
)}
</div>
))}
<div ref={recordsEndRef} aria-hidden />
</div>
)}
</div>
@@ -8,7 +8,8 @@ type Props = {
export function ThreadList({ onSelect }: Props) {
const { status, data, error } = useFetch(() => listThreads(), []);
if (status === "loading") return <p style={{ color: "var(--color-text-muted)" }}>Loading threads...</p>;
if (status === "loading")
return <p style={{ color: "var(--color-text-muted)" }}>Loading threads...</p>;
if (status === "error") return <p style={{ color: "var(--color-error)" }}>Error: {error}</p>;
const threads = data.threads;
@@ -22,6 +23,7 @@ export function ThreadList({ onSelect }: Props) {
<div className="space-y-2">
{threads.map((t) => (
<button
type="button"
key={t.threadId}
onClick={() => onSelect(t.threadId)}
className="w-full text-left p-4 rounded-lg border transition-colors hover:border-[var(--color-accent-dim)]"
@@ -4,7 +4,8 @@ import { useFetch } from "../hooks.ts";
export function WorkflowList() {
const { status, data, error } = useFetch(() => listWorkflows(), []);
if (status === "loading") return <p style={{ color: "var(--color-text-muted)" }}>Loading workflows...</p>;
if (status === "loading")
return <p style={{ color: "var(--color-text-muted)" }}>Loading workflows...</p>;
if (status === "error") return <p style={{ color: "var(--color-error)" }}>Error: {error}</p>;
const workflows = data.workflows;
@@ -28,7 +29,10 @@ export function WorkflowList() {
{w.versions} version{w.versions !== 1 ? "s" : ""}
</span>
</div>
<code className="text-xs mt-1 block font-mono" style={{ color: "var(--color-accent)" }}>
<code
className="text-xs mt-1 block font-mono"
style={{ color: "var(--color-accent)" }}
>
{w.currentHash}
</code>
</div>
+1
View File
@@ -30,6 +30,7 @@ export function useFetch<T>(fetcher: () => Promise<T>, deps: unknown[] = []): Fe
return () => {
cancelled = true;
};
// biome-ignore lint/correctness/useExhaustiveDependencies: this helper intentionally accepts caller-provided dependency arrays
}, deps);
return state;
+64
View File
@@ -0,0 +1,64 @@
import { useCallback, useEffect, useState } from "react";
type View = "threads" | "workflows";
type HashRoute = {
view: View;
threadId: string | null;
};
function parseHash(hash: string): HashRoute {
const raw = hash.replace(/^#\/?/, "");
if (raw.startsWith("threads/")) {
const id = raw.slice("threads/".length);
if (id.length > 0) {
return { view: "threads", threadId: id };
}
}
if (raw === "workflows") {
return { view: "workflows", threadId: null };
}
return { view: "threads", threadId: null };
}
function buildHash(route: HashRoute): string {
if (route.view === "workflows") {
return "#workflows";
}
if (route.threadId !== null) {
return `#threads/${route.threadId}`;
}
return "#threads";
}
export function useHashRoute(): {
view: View;
threadId: string | null;
setView: (v: View) => void;
setThreadId: (id: string | null) => void;
} {
const [route, setRoute] = useState<HashRoute>(() => parseHash(window.location.hash));
useEffect(() => {
function onHashChange(): void {
setRoute(parseHash(window.location.hash));
}
window.addEventListener("hashchange", onHashChange);
return () => window.removeEventListener("hashchange", onHashChange);
}, []);
const navigate = useCallback((next: HashRoute) => {
const hash = buildHash(next);
window.location.hash = hash;
setRoute(next);
}, []);
const setView = useCallback((v: View) => navigate({ view: v, threadId: null }), [navigate]);
const setThreadId = useCallback(
(id: string | null) => navigate({ view: "threads", threadId: id }),
[navigate],
);
return { view: route.view, threadId: route.threadId, setView, setThreadId };
}
+161
View File
@@ -0,0 +1,161 @@
import {
type Dispatch,
type MutableRefObject,
type SetStateAction,
useEffect,
useRef,
useState,
} from "react";
import type { ThreadRecord } from "./api.ts";
export type UseSSEReturn = {
records: ThreadRecord[];
connected: boolean;
completed: boolean;
};
function isWorkflowResult(record: ThreadRecord): boolean {
return record.type === "workflow-result";
}
function parseRecord(data: string): ThreadRecord | null {
try {
return JSON.parse(data) as ThreadRecord;
} catch {
return null;
}
}
type RecordEventContext = {
cancelled: boolean;
completedRef: MutableRefObject<boolean>;
setRecords: Dispatch<SetStateAction<ThreadRecord[]>>;
setCompleted: (value: boolean) => void;
setConnected: (value: boolean) => void;
cleanupEs: () => void;
};
function handleRecordEvent(ev: Event, ctx: RecordEventContext): void {
if (ctx.cancelled) {
return;
}
const msg = ev as MessageEvent;
const raw = typeof msg.data === "string" ? msg.data : "";
const parsed = parseRecord(raw);
if (parsed === null) {
return;
}
ctx.setRecords((prev) => [...prev, parsed]);
if (!isWorkflowResult(parsed)) {
return;
}
ctx.completedRef.current = true;
ctx.setCompleted(true);
ctx.setConnected(false);
ctx.cleanupEs();
}
export function useSSE(threadId: string | null): UseSSEReturn {
const [records, setRecords] = useState<ThreadRecord[]>([]);
const [connected, setConnected] = useState(false);
const [completed, setCompleted] = useState(false);
const completedRef = useRef(false);
const reconnectAttemptsRef = useRef(0);
useEffect(() => {
if (threadId === null) {
completedRef.current = false;
reconnectAttemptsRef.current = 0;
setRecords([]);
setConnected(false);
setCompleted(false);
return;
}
const tid = threadId;
completedRef.current = false;
reconnectAttemptsRef.current = 0;
setRecords([]);
setConnected(false);
setCompleted(false);
let es: EventSource | null = null;
let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
let cancelled = false;
function cleanupEs(): void {
if (es !== null) {
es.close();
es = null;
}
}
function scheduleReconnect(): void {
if (cancelled || completedRef.current) {
return;
}
const delayMs = Math.min(1000 * 2 ** reconnectAttemptsRef.current, 8000);
reconnectAttemptsRef.current += 1;
reconnectTimer = setTimeout(() => {
reconnectTimer = null;
if (!cancelled && !completedRef.current) {
connect();
}
}, delayMs);
}
function connect(): void {
if (cancelled || completedRef.current) {
return;
}
cleanupEs();
const url = `/api/threads/${encodeURIComponent(tid)}/live`;
es = new EventSource(url);
es.onopen = () => {
if (cancelled) {
return;
}
reconnectAttemptsRef.current = 0;
setConnected(true);
setRecords([]);
};
es.addEventListener("record", (ev: Event) =>
handleRecordEvent(ev, {
cancelled,
completedRef,
setRecords,
setCompleted,
setConnected,
cleanupEs,
}),
);
es.onerror = () => {
if (cancelled || completedRef.current) {
return;
}
setConnected(false);
cleanupEs();
scheduleReconnect();
};
}
connect();
return () => {
cancelled = true;
if (reconnectTimer !== null) {
clearTimeout(reconnectTimer);
}
cleanupEs();
};
}, [threadId]);
return { records, connected, completed };
}
+1
View File
@@ -2,6 +2,7 @@ import tailwindcss from "@tailwindcss/vite";
import react from "@vitejs/plugin-react";
import { defineConfig } from "vite";
// biome-ignore lint/style/noDefaultExport: Vite loads config from default export.
export default defineConfig({
plugins: [react(), tailwindcss()],
server: {
+1 -1
View File
@@ -6,5 +6,5 @@
"composite": true
},
"include": ["src/**/*.ts"],
"references": [{ "path": "../workflow" }, { "path": "../workflow-util-agent" }]
"references": [{ "path": "../workflow-runtime" }, { "path": "../workflow-util-agent" }]
}
+1 -1
View File
@@ -6,5 +6,5 @@
"composite": true
},
"include": ["src/**/*.ts"],
"references": [{ "path": "../workflow" }, { "path": "../workflow-util-agent" }]
"references": [{ "path": "../workflow-runtime" }, { "path": "../workflow-util-agent" }]
}
@@ -1,16 +1,9 @@
import { describe, expect, test } from "bun:test";
import { mkdtempSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createCasStore } from "@uncaged/workflow";
import { START, type ThreadContext } from "@uncaged/workflow-runtime";
import { type AgentContext, START } from "@uncaged/workflow-runtime";
import { createLlmAdapter } from "../src/create-llm-adapter.js";
const casDir = mkdtempSync(join(tmpdir(), "wf-llm-adapter-cas-"));
const testCas = createCasStore(casDir);
function makeCtx(userContent: string): ThreadContext {
function makeCtx(userContent: string): AgentContext {
return {
start: {
role: START,
@@ -22,7 +15,6 @@ function makeCtx(userContent: string): ThreadContext {
steps: [],
threadId: "01TEST000000000000000000TR",
currentRole: { name: "planner", systemPrompt: "system instructions" },
cas: testCas,
};
}
-1
View File
@@ -8,7 +8,6 @@
"test": "bun test"
},
"dependencies": {
"@uncaged/workflow": "workspace:*",
"@uncaged/workflow-runtime": "workspace:*"
}
}
+1 -1
View File
@@ -6,5 +6,5 @@
"composite": true
},
"include": ["src/**/*.ts"],
"references": [{ "path": "../workflow" }]
"references": [{ "path": "../workflow-runtime" }]
}
+20
View File
@@ -0,0 +1,20 @@
{
"name": "@uncaged/workflow-cas",
"version": "0.1.0",
"type": "module",
"exports": {
".": {
"types": "./dist/index.d.ts",
"import": "./src/index.ts"
}
},
"dependencies": {
"@uncaged/workflow-protocol": "workspace:*",
"@uncaged/workflow-util": "workspace:*",
"xxhashjs": "^0.2.2",
"yaml": "^2.7.1"
},
"devDependencies": {
"@types/bun": "latest"
}
}
@@ -2,7 +2,7 @@ import { Buffer } from "node:buffer";
import XXH from "xxhashjs";
import { encodeUint64AsCrockford } from "../util/index.js";
import { encodeUint64AsCrockford } from "@uncaged/workflow-util";
function digestToUint64(digest: { toString(radix?: number): string }): bigint {
const hex = digest.toString(16).padStart(16, "0");
@@ -1,4 +1,4 @@
export type { CasStore } from "@uncaged/workflow-runtime";
export type { CasStore } from "@uncaged/workflow-protocol";
export type MerkleNodeType = "content" | "step" | "thread";
+12
View File
@@ -0,0 +1,12 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"rootDir": "src",
"outDir": "dist"
},
"include": ["src"],
"references": [
{ "path": "../workflow-protocol" },
{ "path": "../workflow-util" }
]
}
+29
View File
@@ -0,0 +1,29 @@
{
"name": "@uncaged/workflow-execute",
"version": "0.2.0",
"type": "module",
"exports": {
".": {
"types": "./dist/index.d.ts",
"import": "./src/index.ts"
}
},
"scripts": {
"test": "bun test"
},
"dependencies": {
"@uncaged/workflow-protocol": "workspace:*",
"@uncaged/workflow-runtime": "workspace:*",
"@uncaged/workflow-util": "workspace:*",
"@uncaged/workflow-cas": "workspace:*",
"@uncaged/workflow-reactor": "workspace:*",
"@uncaged/workflow-register": "workspace:*",
"yaml": "^2.7.1"
},
"peerDependencies": {
"zod": "^4.0.0"
},
"devDependencies": {
"zod": "^4.0.0"
}
}
@@ -0,0 +1,8 @@
/**
* Re-export of {@link createWorkflow} from `@uncaged/workflow-runtime`.
*
* The runtime's `createWorkflow` already binds role definitions + agents to a workflow loop
* and delegates structured meta extraction to `WorkflowRuntime.extract`, which the engine
* supplies (resolved from the `extract` scene in workflow.yaml).
*/
export { createWorkflow } from "@uncaged/workflow-runtime";
@@ -2,31 +2,35 @@ import { appendFile, mkdir } from "node:fs/promises";
import { dirname } from "node:path";
import type {
LlmProvider,
ThreadInput,
RoleOutput,
ThreadContext,
WorkflowCompletion,
WorkflowFn,
WorkflowFnOptions,
WorkflowResult,
WorkflowRuntime,
} from "@uncaged/workflow-runtime";
import { START } from "@uncaged/workflow-runtime";
import {
type CasStore,
getContentMerklePayload,
putStepMerkleNode,
putThreadMerkleNode,
} from "../cas/index.js";
import { resolveModel } from "../config/index.js";
} from "@uncaged/workflow-cas";
import { resolveModel } from "@uncaged/workflow-register";
import { createExtract } from "../extract/index.js";
import { readWorkflowRegistry, type WorkflowConfig } from "../registry/index.js";
import { err, type LogFn, normalizeRefsField, ok, type Result } from "../util/index.js";
import { readWorkflowRegistry, type WorkflowConfig } from "@uncaged/workflow-register";
import { err, type LogFn, normalizeRefsField, ok, type Result } from "@uncaged/workflow-util";
import { runSupervisor } from "./supervisor.js";
import type { ExecuteThreadIo, ExecuteThreadOptions } from "./types.js";
async function resolveEngineRegistryRuntime(storageRoot: string): Promise<
async function resolveEngineRegistryRuntime(
storageRoot: string,
cas: CasStore,
): Promise<
Result<
{
extract: ReturnType<typeof createExtract>;
llmProvider: LlmProvider;
workflowConfig: WorkflowConfig;
},
string
@@ -50,7 +54,7 @@ async function resolveEngineRegistryRuntime(storageRoot: string): Promise<
apiKey: ex.apiKey,
model: ex.model,
};
return ok({ extract: createExtract(llmProvider), llmProvider, workflowConfig: cfg });
return ok({ extract: createExtract(llmProvider, { cas }), workflowConfig: cfg });
}
async function appendDataLine(path: string, record: unknown): Promise<void> {
@@ -104,7 +108,7 @@ async function finalizeAbortedThread(params: {
async function maybeSupervisorHaltsThread(params: {
workflowConfig: WorkflowConfig;
input: ThreadInput;
thread: ThreadContext;
written: number;
recentSupervisorSteps: readonly { role: string; summary: string }[];
logger: LogFn;
@@ -119,7 +123,7 @@ async function maybeSupervisorHaltsThread(params: {
}
const sup = await runSupervisor({
config: params.workflowConfig,
prompt: params.input.prompt,
prompt: params.thread.start.content,
recentSteps: params.recentSupervisorSteps,
logger: params.logger,
});
@@ -144,8 +148,8 @@ async function driveWorkflowGenerator(params: {
fn: WorkflowFn;
workflowName: string;
workflowConfig: WorkflowConfig;
input: ThreadInput;
bundleOptions: WorkflowFnOptions;
thread: ThreadContext;
runtime: WorkflowRuntime;
executeOptions: ExecuteThreadOptions;
dataJsonlPath: string;
threadId: string;
@@ -157,8 +161,8 @@ async function driveWorkflowGenerator(params: {
fn,
workflowName,
workflowConfig,
input,
bundleOptions,
thread,
runtime,
executeOptions,
dataJsonlPath,
threadId,
@@ -166,9 +170,9 @@ async function driveWorkflowGenerator(params: {
cas,
stepMerkleHashes,
} = params;
const gen = fn(input, bundleOptions);
const gen = fn(thread, runtime);
let written = 0;
const recentSupervisorSteps: { role: string; summary: string }[] = input.steps.map((s) => ({
const recentSupervisorSteps: { role: string; summary: string }[] = thread.steps.map((s) => ({
role: s.role,
summary: JSON.stringify(s.meta),
}));
@@ -268,7 +272,7 @@ async function driveWorkflowGenerator(params: {
const supervised = await maybeSupervisorHaltsThread({
workflowConfig,
input,
thread,
written,
recentSupervisorSteps,
logger,
@@ -290,7 +294,7 @@ async function driveWorkflowGenerator(params: {
export async function executeThread(
fn: WorkflowFn,
workflowName: string,
input: ThreadInput,
input: { prompt: string; steps: RoleOutput[] },
options: ExecuteThreadOptions,
io: ExecuteThreadIo,
logger: LogFn,
@@ -367,26 +371,40 @@ export async function executeThread(
});
}
const registryRuntime = await resolveEngineRegistryRuntime(options.storageRoot);
const registryRuntime = await resolveEngineRegistryRuntime(options.storageRoot, io.cas);
if (!registryRuntime.ok) {
throw new Error(registryRuntime.error);
}
const bundleOptions: WorkflowFnOptions = {
const thread: ThreadContext = {
threadId: io.threadId,
maxRounds: options.maxRounds,
depth: options.depth,
start: {
role: START,
content: input.prompt,
meta: { maxRounds: options.maxRounds },
timestamp: nowMs,
},
steps: input.steps.map((out, i) => ({
role: out.role,
contentHash: out.contentHash,
meta: out.meta,
refs: out.refs,
timestamp: prefilled?.[i]?.timestamp ?? nowMs + i,
})),
};
const runtime: WorkflowRuntime = {
cas: io.cas,
extract: registryRuntime.value.extract,
llmProvider: registryRuntime.value.llmProvider,
};
return await driveWorkflowGenerator({
fn,
workflowName,
workflowConfig: registryRuntime.value.workflowConfig,
input,
bundleOptions,
thread,
runtime,
executeOptions: options,
dataJsonlPath: io.dataJsonlPath,
threadId: io.threadId,
@@ -1,5 +1,5 @@
import type { WorkflowCompletion } from "@uncaged/workflow-runtime";
import { err, normalizeRefsField, ok, type Result } from "../util/index.js";
import { err, normalizeRefsField, ok, type Result } from "@uncaged/workflow-util";
import type { ForkHistoricalStep, ForkPlan, ParsedThreadStartRecord } from "./types.js";
@@ -1,7 +1,7 @@
import { readdir, readFile } from "node:fs/promises";
import { join } from "node:path";
import { type CasStore, createCasStore } from "../cas/index.js";
import { err, getGlobalCasDir, ok, type Result } from "../util/index.js";
import { type CasStore, createCasStore } from "@uncaged/workflow-cas";
import { err, getGlobalCasDir, ok, type Result } from "@uncaged/workflow-util";
import { parseThreadDataJsonl } from "./fork-thread.js";
import type { GcResult } from "./types.js";
@@ -0,0 +1,85 @@
import * as z from "zod/v4";
import { resolveModel } from "@uncaged/workflow-register";
import { extractFunctionToolFromZodSchema } from "../extract/index.js";
import { createLlmFn, createThreadReactor } from "@uncaged/workflow-reactor";
import type { WorkflowConfig } from "@uncaged/workflow-register";
import { err, type LogFn, ok, type Result } from "@uncaged/workflow-util";
import type { SupervisorDecision } from "./types.js";
const SUPERVISOR_RECENT_STEP_LIMIT = 12;
const SUPERVISOR_MAX_REACT_ROUNDS = 4;
const supervisorDecisionSchema = z
.object({
decision: z.enum(["continue", "stop"]),
})
.meta({
title: "supervisor_decision",
description:
'Workflow supervisor decision. "continue" when the thread is making progress; "stop" when done, looping, or stuck.',
});
type SupervisorThreadContext = Record<string, never>;
type RunSupervisorArgs = {
config: WorkflowConfig;
prompt: string;
recentSteps: readonly { role: string; summary: string }[];
logger: LogFn;
};
function buildSupervisorInput(args: RunSupervisorArgs): string {
const recent = args.recentSteps.slice(-SUPERVISOR_RECENT_STEP_LIMIT);
const stepsBlock = recent.map((s, index) => `${index + 1}. [${s.role}] ${s.summary}`).join("\n");
return `Original task:\n${args.prompt}\n\nRecent steps (oldest first):\n${stepsBlock === "" ? "(none)" : stepsBlock}`;
}
/** Calls the `supervisor` scene via {@link createThreadReactor}; opt-out when {@link resolveModel} fails (returns ok(`continue`)). */
export async function runSupervisor(
args: RunSupervisorArgs,
): Promise<Result<SupervisorDecision, string>> {
const resolved = resolveModel(args.config, "supervisor");
if (!resolved.ok) {
return ok("continue");
}
const reactor = createThreadReactor<SupervisorThreadContext>({
llm: createLlmFn(resolved.value),
maxRounds: SUPERVISOR_MAX_REACT_ROUNDS,
staticTools: [],
structuredToolFromSchema: (schema) => {
const t = extractFunctionToolFromZodSchema(schema);
return {
name: t.name,
tool: {
type: "function" as const,
function: {
name: t.name,
description: t.description,
parameters: t.parameters,
},
},
};
},
systemPromptForStructuredTool: (structuredToolName) =>
`You supervise a multi-step workflow. Decide whether the thread should keep running or halt. Reply with "continue" when the thread is making progress toward the task, or "stop" when it is finished, looping, or no longer making progress. Call the ${structuredToolName} tool with JSON arguments matching the schema, or reply with only a JSON object such as {"decision":"stop"}.`,
toolHandler: async (call) => `Unknown tool: ${call.function.name}`,
});
const result = await reactor({
thread: {} as SupervisorThreadContext,
input: buildSupervisorInput(args),
schema: supervisorDecisionSchema,
});
if (!result.ok) {
args.logger("R9CW4PLM", `supervisor failed: ${result.error}`);
return err(`supervisor: ${result.error}`);
}
const decision: SupervisorDecision = result.value.decision;
args.logger("Z8KM5QWT", `supervisor says ${decision}`);
return ok(decision);
}
@@ -1,4 +1,4 @@
import { err, ok, type Result } from "../util/index.js";
import { err, ok, type Result } from "@uncaged/workflow-util";
import type { ThreadPauseGate } from "./types.js";
@@ -1,6 +1,6 @@
import type { RoleOutput } from "@uncaged/workflow-runtime";
import type { CasStore } from "../cas/index.js";
import type { Result } from "../util/index.js";
import type { CasStore } from "@uncaged/workflow-cas";
import type { Result } from "@uncaged/workflow-util";
export type SupervisorDecision = "continue" | "stop";
@@ -23,7 +23,7 @@ export type PrefilledDiskStep = {
export type ExecuteThreadOptions = {
maxRounds: number;
/** Passed to the bundle as `WorkflowFnOptions.depth`. */
/** Passed to the bundle thread context as `ThreadContext.depth`. */
depth: number;
signal: AbortSignal;
/** Invoked after each successful yield (and outer-loop checks); used for pause/resume. */
@@ -2,8 +2,8 @@ import { appendFile, mkdir, unlink, writeFile } from "node:fs/promises";
import { createServer, type Socket } from "node:net";
import { dirname, join } from "node:path";
import type { RoleOutput, WorkflowFn, WorkflowResult } from "@uncaged/workflow-runtime";
import { ensureUncagedWorkflowSymlink, importWorkflowBundleModule } from "../bundle/index.js";
import { createCasStore } from "../cas/index.js";
import { ensureUncagedWorkflowSymlink, importWorkflowBundleModule } from "@uncaged/workflow-register";
import { createCasStore } from "@uncaged/workflow-cas";
import {
createLogger,
err,
@@ -11,7 +11,7 @@ import {
normalizeRefsField,
ok,
type Result,
} from "../util/index.js";
} from "@uncaged/workflow-util";
import { executeThread } from "./engine.js";
import { createThreadPauseGate } from "./thread-pause-gate.js";
import type { ExecuteThreadIo, PrefilledDiskStep, ThreadPauseGate } from "./types.js";
@@ -0,0 +1,136 @@
import type { ExtractContext, ExtractFn, LlmProvider } from "@uncaged/workflow-runtime";
import type * as z from "zod/v4";
import { type CasStore, getContentMerklePayload } from "@uncaged/workflow-cas";
import { createLlmFn, createThreadReactor } from "@uncaged/workflow-reactor";
import { extractFunctionToolFromZodSchema } from "./llm-extract.js";
export type ExtractDeps = {
cas: CasStore;
};
const MAX_REACT_ROUNDS = 10;
const CAS_GET_TOOL_DEFINITION = {
type: "function" as const,
function: {
name: "cas_get",
description:
"Read a Merkle DAG node from content-addressed storage by its hash. Returns YAML-formatted node with type, payload, and children fields.",
parameters: {
type: "object",
properties: {
hash: { type: "string", description: "The CAS hash to retrieve" },
},
required: ["hash"],
},
},
};
export type ExtractThreadContext = {
cas: CasStore;
};
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
/** Builds the user-side extraction prompt (thread + agent output + instruction). */
export async function buildExtractUserContent(
ctx: ExtractContext,
prompt: string,
deps: ExtractDeps,
): Promise<string> {
const lines: string[] = [];
lines.push(`## Role: ${ctx.currentRole.name}`);
lines.push(ctx.currentRole.systemPrompt);
lines.push("");
lines.push("## Task");
lines.push(ctx.start.content);
lines.push("");
if (ctx.steps.length > 0) {
lines.push("## Thread History");
for (const step of ctx.steps) {
const body = await getContentMerklePayload(deps.cas, step.contentHash);
if (body === null) {
throw new Error(`extract: missing CAS blob for step ${step.role}: ${step.contentHash}`);
}
lines.push(`### ${step.role}`);
lines.push(body);
lines.push(`Meta: ${JSON.stringify(step.meta)}`);
lines.push("");
}
}
lines.push("## Agent Output");
lines.push(ctx.agentContent);
lines.push("");
lines.push("## Extraction Instruction");
lines.push(prompt);
return lines.join("\n");
}
/**
* Create an ExtractFn backed by an LLM provider.
*
* Internally runs a multi-turn ReAct loop with two tools (`cas_get` for traversing the
* Merkle DAG and a schema-shaped extract tool); the loop also accepts a plain-JSON
* assistant reply as a short-circuit, which covers the legacy "single" extraction path.
*/
export function createExtract(provider: LlmProvider, deps: ExtractDeps): ExtractFn {
const llm = createLlmFn(provider);
const reactor = createThreadReactor<ExtractThreadContext>({
llm,
maxRounds: MAX_REACT_ROUNDS,
staticTools: [CAS_GET_TOOL_DEFINITION],
structuredToolFromSchema: (schema) => {
const t = extractFunctionToolFromZodSchema(schema);
return {
name: t.name,
tool: {
type: "function" as const,
function: {
name: t.name,
description: t.description,
parameters: t.parameters,
},
},
};
},
systemPromptForStructuredTool: (structuredToolName) =>
`You extract structured metadata from the agent output below. Use cas_get to read Merkle DAG nodes from CAS (YAML: type, payload, children) when the agent output references hashes you must traverse. When you have the complete structured object, call the ${structuredToolName} tool with JSON arguments matching the schema. You may instead reply with only a JSON object (no prose) when no tools are needed.`,
toolHandler: async (call, thread) => {
if (call.function.name !== "cas_get") {
return `Unexpected tool routed to handler: ${call.function.name}`;
}
let hash: string;
try {
const ta = JSON.parse(call.function.arguments) as unknown;
if (!isRecord(ta) || typeof ta.hash !== "string") {
return 'cas_get requires a JSON object with a string "hash" field.';
}
hash = ta.hash;
} catch {
return 'cas_get arguments were not valid JSON. Provide {"hash": "<cas-hash>"}.';
}
const blob = await thread.cas.get(hash);
return blob === null ? "null" : blob;
},
});
return async <T extends Record<string, unknown>>(
schema: z.ZodType<T>,
prompt: string,
ctx: ExtractContext,
): Promise<T> => {
const text = await buildExtractUserContent(ctx, prompt, deps);
const result = await reactor({
thread: { cas: deps.cas },
input: text,
schema,
});
if (!result.ok) {
throw new Error(`extract failed: ${result.error}`);
}
return result.value;
};
}
@@ -1,17 +1,11 @@
export {
buildExtractUserContent,
createExtract,
type ExtractThreadContext,
} from "./extract-fn.js";
export {
extractFunctionToolFromZodSchema,
llmErrorToCause,
llmExtract,
llmExtractWithRetry,
} from "./llm-extract.js";
export { reactExtract } from "./react-extract.js";
export type {
ExtractFn,
LlmError,
LlmExtractArgs,
ReactExtractArgs,
} from "./types.js";
export type { ExtractFn, LlmError, LlmExtractArgs } from "./types.js";
@@ -1,6 +1,6 @@
import * as z from "zod/v4";
import { err, ok, type Result } from "../util/index.js";
import { err, ok, type Result } from "@uncaged/workflow-util";
import type { LlmError, LlmExtractArgs } from "./types.js";
@@ -92,20 +92,6 @@ function readToolArgumentsJson(parsed: unknown, previewSource: string): Result<s
return ok(argsRaw);
}
function isRetryableExtractError(error: LlmError): boolean {
return error.kind === "schema_validation_failed" || error.kind === "tool_arguments_invalid_json";
}
function describeRetryHint(error: LlmError): string {
if (error.kind === "schema_validation_failed") {
return `Schema validation failed: ${error.message}`;
}
if (error.kind === "tool_arguments_invalid_json") {
return `Tool arguments were not valid JSON: ${error.message}`;
}
return JSON.stringify(error);
}
export function llmErrorToCause(error: LlmError): Error {
switch (error.kind) {
case "http_error":
@@ -206,40 +192,3 @@ async function performLlmExtract<T>(
export async function llmExtract<T>(options: LlmExtractArgs<T>): Promise<Result<T, LlmError>> {
return performLlmExtract({ ...options, userContent: options.text });
}
/**
* Runs extract up to two times: on the first schema/tool-args parse failure, resends the agent
* output plus the error so the model can correct the tool call.
*/
export async function llmExtractWithRetry<T>(
options: LlmExtractArgs<T>,
): Promise<Result<T, LlmError>> {
const first = await performLlmExtract({
...options,
userContent: options.text,
});
if (first.ok) {
return first;
}
if (!isRetryableExtractError(first.error)) {
return first;
}
const hint = describeRetryHint(first.error);
const correction = `The previous extraction attempt failed.
${hint}
Respond again with a single tool call whose \`arguments\` JSON strictly matches the schema.`;
const secondContent = `${options.text}
---
${correction}`;
return performLlmExtract({
...options,
userContent: secondContent,
});
}
@@ -1,15 +1,8 @@
import type { CasStore, LlmProvider } from "@uncaged/workflow-runtime";
import type { LlmProvider } from "@uncaged/workflow-runtime";
import type * as z from "zod/v4";
export type { ExtractFn } from "@uncaged/workflow-runtime";
export type ReactExtractArgs<T extends Record<string, unknown>> = {
text: string;
schema: z.ZodType<T>;
provider: LlmProvider;
cas: CasStore;
};
export type LlmExtractArgs<T> = {
text: string;
schema: z.ZodType<T>;
+35
View File
@@ -0,0 +1,35 @@
export { createWorkflow } from "./engine/create-workflow.js";
export { executeThread } from "./engine/engine.js";
export {
buildForkPlan,
parseThreadDataJsonl,
selectForkHistoricalSteps,
tryParseRoleStepRecord,
tryParseWorkflowResultRecord,
} from "./engine/fork-thread.js";
export { garbageCollectCas } from "./engine/gc.js";
export { createThreadPauseGate } from "./engine/thread-pause-gate.js";
export type {
ExecuteThreadIo,
ExecuteThreadOptions,
ForkHistoricalStep,
ForkPlan,
GcResult,
ParsedThreadStartRecord,
PrefilledDiskStep,
SupervisorDecision,
ThreadPauseGate,
} from "./engine/types.js";
export { getWorkerHostScriptPath } from "./engine/worker-entry-path.js";
export {
buildExtractUserContent,
createExtract,
type ExtractThreadContext,
} from "./extract/index.js";
export {
extractFunctionToolFromZodSchema,
llmErrorToCause,
llmExtract,
} from "./extract/index.js";
export type { ExtractFn, LlmError, LlmExtractArgs } from "./extract/index.js";
export { workflowAsAgent, type WorkflowAsAgentOptions } from "./workflow-as-agent.js";
@@ -1,17 +1,17 @@
import { join } from "node:path";
import type { AgentContext, AgentFn, ThreadInput } from "@uncaged/workflow-runtime";
import { extractBundleExports } from "./bundle/index.js";
import { createCasStore } from "./cas/index.js";
import type { AgentContext, AgentFn } from "@uncaged/workflow-runtime";
import { extractBundleExports } from "@uncaged/workflow-register";
import { createCasStore } from "@uncaged/workflow-cas";
import type { ExecuteThreadIo } from "./engine/index.js";
import { executeThread } from "./engine/index.js";
import type { WorkflowConfig } from "./registry/index.js";
import { getRegisteredWorkflow, readWorkflowRegistry } from "./registry/index.js";
import type { WorkflowConfig } from "@uncaged/workflow-register";
import { getRegisteredWorkflow, readWorkflowRegistry } from "@uncaged/workflow-register";
import {
createLogger,
generateUlid,
getDefaultWorkflowStorageRoot,
getGlobalCasDir,
} from "./util/index.js";
} from "@uncaged/workflow-util";
const DEFAULT_WORKFLOW_AS_AGENT_MAX_DEPTH = 3;
@@ -36,7 +36,7 @@ function resolveWorkflowAsAgentStorageRoot(options: WorkflowAsAgentOptions | nul
/**
* Returns an {@link AgentFn} that runs another registered workflow in a new thread,
* using the parent thread's initial prompt (`ctx.start.content`) as the child {@link ThreadInput.prompt}.
* using the parent thread's initial prompt (`ctx.start.content`) as the child prompt.
*/
export function workflowAsAgent(
workflowName: string,
@@ -68,7 +68,7 @@ export function workflowAsAgent(
return `ERROR: ${bundleExportsResult.error}`;
}
const input: ThreadInput = {
const input = {
prompt: ctx.start.content,
steps: [],
};
+16
View File
@@ -0,0 +1,16 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"rootDir": "src",
"outDir": "dist"
},
"include": ["src"],
"references": [
{ "path": "../workflow-protocol" },
{ "path": "../workflow-runtime" },
{ "path": "../workflow-util" },
{ "path": "../workflow-cas" },
{ "path": "../workflow-reactor" },
{ "path": "../workflow-register" }
]
}
+18
View File
@@ -0,0 +1,18 @@
{
"name": "@uncaged/workflow-protocol",
"version": "0.1.0",
"type": "module",
"exports": {
".": {
"types": "./dist/index.d.ts",
"import": "./src/index.ts"
}
},
"peerDependencies": {
"zod": "^4.0.0"
},
"devDependencies": {
"zod": "^4.0.0",
"typescript": "^5.8.3"
}
}
+40
View File
@@ -0,0 +1,40 @@
// ── Types ──────────────────────────────────────────────────────────
export type {
Result,
CasStore,
WorkflowRoleSchema,
WorkflowRoleDescriptor,
WorkflowDescriptor,
RoleMeta,
RoleOutput,
StartStep,
RoleStep,
ThreadContext,
ModeratorContext,
AgentContext,
ExtractContext,
WorkflowCompletion,
WorkflowResult,
LlmProvider,
ProviderConfig,
ResolvedModel,
WorkflowConfig,
ExtractFn,
AgentFn,
AgentBinding,
WorkflowRuntime,
WorkflowFn,
RoleDefinition,
Moderator,
WorkflowDefinition,
AdvanceOutcome,
} from "./types.js";
// ── Constants ──────────────────────────────────────────────────────
export { START, END } from "./types.js";
// ── Constructor functions ──────────────────────────────────────────
export { ok, err } from "./result.js";
@@ -1,9 +1,9 @@
import type { Result } from "./types.js";
export function ok<T>(value: T): Result<T, never> {
return { ok: true, value };
return { ok: true, value };
}
export function err<E>(error: E): Result<never, E> {
return { ok: false, error };
return { ok: false, error };
}
+167
View File
@@ -0,0 +1,167 @@
import type * as z from "zod/v4";
// ── Constants ──────────────────────────────────────────────────────
export const START = "__start__" as const;
export const END = "__end__" as const;
// ── Result ─────────────────────────────────────────────────────────
export type Result<T, E = Error> = { ok: true; value: T } | { ok: false; error: E };
// ── CAS ────────────────────────────────────────────────────────────
export type CasStore = {
put(content: string): Promise<string>;
get(hash: string): Promise<string | null>;
delete(hash: string): Promise<void>;
list(): Promise<string[]>;
};
// ── Workflow Descriptor ────────────────────────────────────────────
export type WorkflowRoleSchema = Record<string, unknown>;
export type WorkflowRoleDescriptor = {
description: string;
schema: WorkflowRoleSchema;
};
export type WorkflowDescriptor = {
description: string;
roles: Record<string, WorkflowRoleDescriptor>;
};
// ── Role & Thread ──────────────────────────────────────────────────
export type RoleMeta = Record<string, Record<string, unknown>>;
export type RoleOutput = {
role: string;
contentHash: string;
meta: Record<string, unknown>;
refs: string[];
};
export type StartStep = {
role: typeof START;
content: string;
meta: { maxRounds: number };
timestamp: number;
};
export type RoleStep<M extends RoleMeta> = {
[K in keyof M & string]: {
role: K;
meta: M[K];
contentHash: string;
refs: string[];
timestamp: number;
};
}[keyof M & string];
export type ThreadContext<M extends RoleMeta = RoleMeta> = {
threadId: string;
depth: number;
start: StartStep;
steps: RoleStep<M>[];
};
export type ModeratorContext<M extends RoleMeta = RoleMeta> = ThreadContext<M>;
export type AgentContext<M extends RoleMeta = RoleMeta> = ModeratorContext<M> & {
currentRole: {
name: string;
systemPrompt: string;
};
};
export type ExtractContext<M extends RoleMeta = RoleMeta> = AgentContext<M> & {
agentContent: string;
};
// ── Workflow Completion ────────────────────────────────────────────
export type WorkflowCompletion = {
returnCode: number;
summary: string;
};
export type WorkflowResult = WorkflowCompletion & {
rootHash: string;
};
// ── LLM Provider ───────────────────────────────────────────────────
export type LlmProvider = {
baseUrl: string;
apiKey: string;
model: string;
};
export type ProviderConfig = {
baseUrl: string;
apiKey: string;
};
export type ResolvedModel = {
baseUrl: string;
apiKey: string;
model: string;
};
export type WorkflowConfig = {
maxDepth: number;
supervisorInterval: number;
providers: Record<string, ProviderConfig>;
models: Record<string, string>;
};
// ── Functions ──────────────────────────────────────────────────────
export type ExtractFn = <T extends Record<string, unknown>>(
schema: z.ZodType<T>,
prompt: string,
ctx: ExtractContext,
) => Promise<T>;
export type AgentFn = (ctx: AgentContext) => Promise<string>;
export type AgentBinding = {
agent: AgentFn;
overrides: Partial<Record<string, AgentFn>> | null;
};
// ── Workflow Runtime & Definition ──────────────────────────────────
export type WorkflowRuntime = {
cas: CasStore;
extract: ExtractFn;
};
export type WorkflowFn = (
thread: ThreadContext,
runtime: WorkflowRuntime,
) => AsyncGenerator<RoleOutput, WorkflowCompletion>;
export type RoleDefinition<Meta extends Record<string, unknown>> = {
description: string;
systemPrompt: string;
extractPrompt: string;
schema: z.ZodType<Meta>;
extractRefs: ((meta: Meta) => string[]) | null;
};
export type Moderator<M extends RoleMeta> = (
ctx: ModeratorContext<M>,
) => (keyof M & string) | typeof END;
export type WorkflowDefinition<M extends RoleMeta> = {
description: string;
roles: { [K in keyof M & string]: RoleDefinition<M[K]> };
moderator: Moderator<M>;
};
export type AdvanceOutcome<M extends RoleMeta> =
| { kind: "complete"; completion: WorkflowCompletion }
| { kind: "yield"; output: RoleOutput; step: RoleStep<M> };
+8
View File
@@ -0,0 +1,8 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"rootDir": "src",
"outDir": "dist"
},
"include": ["src"]
}
+21
View File
@@ -0,0 +1,21 @@
{
"name": "@uncaged/workflow-reactor",
"version": "0.1.0",
"type": "module",
"exports": {
".": {
"types": "./dist/index.d.ts",
"import": "./src/index.ts"
}
},
"dependencies": {
"@uncaged/workflow-protocol": "workspace:*"
},
"peerDependencies": {
"zod": "^4.0.0"
},
"devDependencies": {
"zod": "^4.0.0",
"typescript": "^5.8.3"
}
}
+12
View File
@@ -0,0 +1,12 @@
export { createLlmFn } from "./llm-fn.js";
export { createThreadReactor } from "./thread-reactor.js";
export type {
ChatMessage,
LlmFn,
StructuredToolSpec,
ThreadReactorConfig,
ThreadReactorFn,
ThreadReactorInvokeArgs,
ToolCall,
ToolDefinition,
} from "./types.js";
+48
View File
@@ -0,0 +1,48 @@
import type { LlmProvider } from "@uncaged/workflow-protocol";
import { err, ok } from "@uncaged/workflow-protocol";
import type { ChatMessage, LlmFn, ToolDefinition } from "./types.js";
function chatCompletionsUrl(baseUrl: string): string {
const trimmed = baseUrl.replace(/\/+$/, "");
return `${trimmed}/chat/completions`;
}
/**
* Wraps provider credentials into an {@link LlmFn}: single POST to chat/completions,
* returns raw JSON body text or a {@link Result} error. Callers parse assistant messages.
*/
export function createLlmFn(provider: LlmProvider): LlmFn {
return async ({
messages,
tools,
}: {
messages: ChatMessage[];
tools: readonly ToolDefinition[];
}) => {
try {
const response = await fetch(chatCompletionsUrl(provider.baseUrl), {
method: "POST",
headers: {
Authorization: `Bearer ${provider.apiKey}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
model: provider.model,
messages,
tools,
tool_choice: "auto",
}),
});
const responseText = await response.text();
if (!response.ok) {
return err(`http_error:${String(response.status)}:${responseText.slice(0, 4000)}`);
}
return ok(responseText);
} catch (cause) {
const message = cause instanceof Error ? cause.message : String(cause);
return err(`network_error:${message}`);
}
};
}
@@ -0,0 +1,317 @@
import type * as z from "zod/v4";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import type {
ChatMessage,
StructuredToolSpec,
ThreadReactorConfig,
ThreadReactorFn,
ToolCall,
ToolDefinition,
} from "./types.js";
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
function tryParseJsonContent(content: string): unknown | null {
const trimmed = content.trim();
const fenceMatch = /^```(?:json)?\s*([\s\S]*?)```$/m.exec(trimmed);
const payload = fenceMatch !== null ? fenceMatch[1].trim() : trimmed;
try {
return JSON.parse(payload) as unknown;
} catch {
return null;
}
}
function firstAssistantMessage(responseText: string): Result<Record<string, unknown>, string> {
let parsed: unknown;
try {
parsed = JSON.parse(responseText) as unknown;
} catch (cause) {
const message = cause instanceof Error ? cause.message : String(cause);
return err(`invalid_response_json:${message}`);
}
if (!isRecord(parsed)) {
return err("invalid_response_top_level");
}
const choices = parsed.choices;
if (!Array.isArray(choices) || choices.length === 0) {
return err("no_choices_in_response");
}
const firstChoice = choices[0];
if (!isRecord(firstChoice)) {
return err("invalid_choice");
}
const messageObj = firstChoice.message;
if (!isRecord(messageObj)) {
return err("invalid_message");
}
return ok(messageObj);
}
function normalizeToolCalls(toolCallsRaw: unknown[]): Result<ToolCall[], string> {
const toolCalls: ToolCall[] = [];
for (const tc of toolCallsRaw) {
if (!isRecord(tc)) {
return err("invalid_tool_call");
}
const id = tc.id;
const tcType = tc.type;
const fn = tc.function;
if (typeof id !== "string" || tcType !== "function" || !isRecord(fn)) {
return err("invalid_tool_call_shape");
}
const name = fn.name;
const argumentsStr = fn.arguments;
if (typeof name !== "string" || typeof argumentsStr !== "string") {
return err("invalid_tool_call_function");
}
toolCalls.push({ id, type: "function", function: { name, arguments: argumentsStr } });
}
return ok(toolCalls);
}
type AssistantTurn<T> =
| { kind: "plain_json"; value: T }
| { kind: "tool_calls"; calls: ToolCall[]; assistantContent: string | null };
type AssistantTurnOrCorrection<T> =
| AssistantTurn<T>
| { kind: "plain_json_invalid"; rawContent: string; correction: string };
function classifyAssistantTurn<T>(
messageObj: Record<string, unknown>,
schema: z.ZodType<T>,
structuredToolName: string,
): Result<AssistantTurnOrCorrection<T>, string> {
const toolCallsRaw = messageObj.tool_calls;
if (!Array.isArray(toolCallsRaw) || toolCallsRaw.length === 0) {
const content = messageObj.content;
if (typeof content !== "string") {
return err("no_tool_calls_and_no_string_content");
}
const jsonParsed = tryParseJsonContent(content);
if (jsonParsed === null) {
return ok({
kind: "plain_json_invalid",
rawContent: content,
correction: `Your previous reply was not valid JSON and contained no tool calls. Reply with a single JSON object that matches the schema, or call the ${structuredToolName} tool with the structured arguments.`,
});
}
const validated = schema.safeParse(jsonParsed);
if (!validated.success) {
return ok({
kind: "plain_json_invalid",
rawContent: content,
correction: `Your previous JSON reply did not satisfy the schema: ${validated.error.message}. Reply again with a JSON object that matches the schema, or call the ${structuredToolName} tool with the structured arguments.`,
});
}
return ok({ kind: "plain_json", value: validated.data });
}
const callsResult = normalizeToolCalls(toolCallsRaw);
if (!callsResult.ok) {
return err(callsResult.error);
}
const assistantContent = messageObj.content;
return ok({
kind: "tool_calls",
calls: callsResult.value,
assistantContent: typeof assistantContent === "string" ? assistantContent : null,
});
}
function toolNamesFromDefinitions(tools: readonly { function: { name: string } }[]): Set<string> {
return new Set(tools.map((t) => t.function.name));
}
function appendStructuredToolResult<T>(
tc: ToolCall,
schema: z.ZodType<T>,
messages: ChatMessage[],
): T | null {
let parsedArgs: unknown;
try {
parsedArgs = JSON.parse(tc.function.arguments) as unknown;
} catch {
messages.push({
role: "tool",
tool_call_id: tc.id,
content:
"Tool arguments were not valid JSON. Provide valid JSON object arguments matching the schema.",
});
return null;
}
const validated = schema.safeParse(parsedArgs);
if (!validated.success) {
messages.push({
role: "tool",
tool_call_id: tc.id,
content: `Schema validation failed: ${validated.error.message}. Fix the arguments and call the tool again with a JSON object that matches the schema.`,
});
return null;
}
messages.push({
role: "tool",
tool_call_id: tc.id,
content: '{"ok":true}',
});
return validated.data;
}
async function dispatchToolCall<T, TThread>(
tc: ToolCall,
spec: StructuredToolSpec,
knownNames: Set<string>,
schema: z.ZodType<T>,
thread: TThread,
toolHandler: ThreadReactorConfig<TThread>["toolHandler"],
messages: ChatMessage[],
): Promise<T | null> {
if (!knownNames.has(tc.function.name)) {
messages.push({
role: "tool",
tool_call_id: tc.id,
content: `Unknown tool: ${tc.function.name}. Use one of the declared tools only.`,
});
return null;
}
if (tc.function.name === spec.name) {
return appendStructuredToolResult(tc, schema, messages);
}
let toolContent: string;
try {
toolContent = await toolHandler(tc, thread);
} catch (cause) {
const message = cause instanceof Error ? cause.message : String(cause);
toolContent = `Tool execution failed: ${message}`;
}
messages.push({
role: "tool",
tool_call_id: tc.id,
content: toolContent,
});
return null;
}
async function resolveToolCallRound<T, TThread>(
turn: Extract<AssistantTurn<T>, { kind: "tool_calls" }>,
spec: StructuredToolSpec,
knownNames: Set<string>,
schema: z.ZodType<T>,
thread: TThread,
toolHandler: ThreadReactorConfig<TThread>["toolHandler"],
messages: ChatMessage[],
): Promise<Result<T, string> | null> {
messages.push({
role: "assistant",
content: turn.assistantContent,
tool_calls: turn.calls,
});
let extractedRound: T | null = null;
for (const tc of turn.calls) {
const extracted = await dispatchToolCall(
tc,
spec,
knownNames,
schema,
thread,
toolHandler,
messages,
);
if (extracted !== null) {
extractedRound = extracted;
}
}
if (extractedRound !== null) {
return ok(extractedRound);
}
return null;
}
async function runOneReactRound<T, TThread>(
config: ThreadReactorConfig<TThread>,
args: { thread: TThread; schema: z.ZodType<T> },
tools: readonly ToolDefinition[],
knownNames: Set<string>,
spec: StructuredToolSpec,
messages: ChatMessage[],
): Promise<Result<T, string> | null> {
const bodyResult = await config.llm({ messages, tools });
if (!bodyResult.ok) {
return bodyResult;
}
const msgResult = firstAssistantMessage(bodyResult.value);
if (!msgResult.ok) {
return msgResult;
}
const classified = classifyAssistantTurn(msgResult.value, args.schema, spec.name);
if (!classified.ok) {
return classified;
}
const turn = classified.value;
if (turn.kind === "plain_json") {
return ok(turn.value);
}
if (turn.kind === "plain_json_invalid") {
messages.push({ role: "assistant", content: turn.rawContent });
messages.push({ role: "user", content: turn.correction });
return null;
}
return resolveToolCallRound(
turn,
spec,
knownNames,
args.schema,
args.thread,
config.toolHandler,
messages,
);
}
/**
* Generic ReAct loop: LLM round-trips with tools until structured output validates,
* plain JSON matches schema, or {@link ThreadReactorConfig.maxRounds} is exceeded.
*/
export function createThreadReactor<TThread>(
config: ThreadReactorConfig<TThread>,
): ThreadReactorFn<TThread> {
return async <T>(args: {
thread: TThread;
input: string;
schema: z.ZodType<T>;
}): Promise<Result<T, string>> => {
const spec = config.structuredToolFromSchema(args.schema);
const tools = [...config.staticTools, spec.tool];
const knownNames = toolNamesFromDefinitions(tools);
const systemPrompt = config.systemPromptForStructuredTool(spec.name);
const messages: ChatMessage[] = [
{ role: "system", content: systemPrompt },
{ role: "user", content: args.input },
];
for (let round = 0; round < config.maxRounds; round++) {
const step = await runOneReactRound(
config,
{ thread: args.thread, schema: args.schema },
tools,
knownNames,
spec,
messages,
);
if (step !== null) {
return step;
}
}
return err("max_react_rounds_exceeded");
};
}

Some files were not shown because too many files have changed in this diff Show More