Compare commits

...

11 Commits

Author SHA1 Message Date
xingyue 8fe26417cf feat(cli): add --latest, --debug, --role flags to live command (#37 Phase 2)
- --latest: auto-find most recent thread by start timestamp
- --debug: display .info.jsonl debug log with tags
- --role: filter output to specific role
- Add live-argv.ts for flag parsing
- Add fixtures and test coverage for all flags

Testing: #50
2026-05-07 21:44:19 +08:00
xingyue 990200230b feat(cli): add live command for real-time thread monitoring (#37 Phase 1)
- Add cmd-live.ts: tail .data.jsonl with formatted output
- Display role steps with timestamp, role name, truncated content, meta
- fs.watch for running threads, auto-exit on completion
- Write WorkflowResult to .data.jsonl in worker.ts for completion detection
- Add live.test.ts with JSONL fixtures

Testing: #49
2026-05-07 21:42:32 +08:00
xiaoju 4eaefd9974 Merge pull request 'feat: tester role + develop workflow template' (#61) from feat/58-develop-workflow into main 2026-05-07 13:42:16 +00:00
xiaoju 1a685583bd feat: tester role + develop workflow template
- New workflow-role-tester: runs tests/build/lint, reports pass/fail
- Committer: removed push, only creates branch and commits
- New workflow-template-develop: planner → coder ⟲ → reviewer ⟲ → tester → committer
- 173 tests passing

Fixes #58
2026-05-07 13:42:01 +00:00
xiaomo 19769efea6 Merge pull request 'feat(cli): init command — scaffold workflow workspace' (#56) from feat/36-init-command into main 2026-05-07 13:37:56 +00:00
xiaoju 7f64541c5b Merge pull request 'feat: ReAct ExtractFn with tool-use' (#53) from feat/44-react-extract into main 2026-05-07 13:28:22 +00:00
xiaoju 43a6600378 feat: ReAct ExtractFn with tool-use
- RoleDefinition.extractMode: "single" | "react"
- reactExtract: multi-turn LLM with cas_get tool for DAG traversal
- Max 10 tool-call rounds, schema validation on final output
- create-workflow routes to reactExtract when extractMode is "react"
- All existing roles set to "single" (no behavior change)
- 162 tests passing

Fixes #44
2026-05-07 13:28:00 +00:00
xingyue 74e3f5434c feat(cli): complete AGENTS.md generation (#36 Phase 3)
- Replace placeholder with comprehensive coding agent instructions
- Covers: project structure, core concepts, dev workflow, coding
  conventions, template reuse, build/test, common pitfalls
- Add test coverage for AGENTS.md sections and terms

Testing: #48
2026-05-07 21:23:41 +08:00
xiaoju 220c9c5224 Merge pull request 'feat: global extract provider config' (#52) from feat/43-extract-provider-config into main 2026-05-07 13:21:57 +00:00
xingyue 703ac9dfcc feat(cli): add init template command (#36 Phase 2)
- Implement cmdInitTemplate: find workspace root, generate template package
- Generate roles.ts, moderator.ts, index.ts with hello-world boilerplate
- Detect workspace by walking up to find package.json with workspaces
- Error on existing template dir or outside workspace
- Add init-template.test.ts

Testing: #47
2026-05-07 21:21:23 +08:00
xingyue 2df8accf2f feat(cli): add init workspace command (#36 Phase 1)
- Add cmd-init.ts with cmdInitWorkspace and stub cmdInitTemplate
- Wire init subcommands into cli-dispatch.ts
- Generate monorepo skeleton: package.json (bun workspace), biome.json,
  tsconfig.json, AGENTS.md placeholder, README.md, templates/, workflows/
- Error on existing directory
- Add init-workspace.test.ts (all passing)

Testing: #46
2026-05-07 21:18:58 +08:00
50 changed files with 3240 additions and 106 deletions
+2
View File
@@ -29,6 +29,7 @@ const greeter: RoleDefinition<Roles["greeter"]> = {
extractPrompt: "Extract the greeting string produced for the user.",
schema: greeterMetaSchema,
extractRefs: null,
extractMode: "single",
};
const extract = createExtract({
@@ -48,4 +49,5 @@ export const run = createWorkflow<Roles>(
agent: async (ctx) => `Hello, ${ctx.start.content}`,
},
extract,
null,
);
@@ -0,0 +1,4 @@
{"name":"demo-live","hash":"C9NMV6V2TQT81","threadId":"01LIVECMPLT01DDDDDDDDDDDDG","parameters":{"prompt":"hello","options":{"maxRounds":5,"depth":0}},"timestamp":1714963400000}
{"role":"planner","contentHash":"FF7YQ5W3S2EV6","meta":{"phase":"plan","flags":[1,2]},"refs":[],"timestamp":1714963201000}
{"role":"coder","contentHash":"EN34XX1W4WAFJ","meta":{},"refs":[],"timestamp":1714963202000}
{"returnCode":0,"summary":"fixture completed"}
@@ -0,0 +1,2 @@
{"tag":"DEBUGTAG1","content":"bundle loaded","timestamp":1714963400050}
{"tag":"DEBUGTAG2","content":"multi\nline","timestamp":1714963400500}
@@ -0,0 +1,2 @@
{"name":"demo-live","hash":"C9NMV6V2TQT81","threadId":"01LIVEINFLY01DDDDDDDDDDDDG","parameters":{"prompt":"hello","options":{"maxRounds":5,"depth":0}},"timestamp":1714963200000}
{"role":"planner","contentHash":"P6M9FHE1GSBN0","meta":{"x":1},"refs":[],"timestamp":1714963201000}
@@ -0,0 +1,2 @@
{"name":"demo-live-old","hash":"C9NMV6V2TQT81","threadId":"01LIVEOLDER01DDDDDDDDDDDDG","parameters":{"prompt":"old","options":{"maxRounds":5,"depth":0}},"timestamp":1714963000000}
{"returnCode":0,"summary":"older thread"}
@@ -111,7 +111,7 @@ describe("cli fork", () => {
const sourceData = join(storageRoot, "logs", hash, `${sourceId}.data.jsonl`);
const sourceRunning = join(storageRoot, "logs", hash, `${sourceId}.running`);
await waitUntilRunningAbsent(sourceRunning);
await waitUntilMinDataLines(sourceData, 4);
await waitUntilMinDataLines(sourceData, 5);
const forked = await cmdFork(storageRoot, sourceId, "planner");
expect(forked.ok).toBe(true);
@@ -122,22 +122,22 @@ describe("cli fork", () => {
const newData = join(storageRoot, "logs", hash, `${newId}.data.jsonl`);
const newRunning = join(storageRoot, "logs", hash, `${newId}.running`);
await waitUntilRunningAbsent(newRunning);
await waitUntilMinDataLines(newData, 4);
await waitUntilMinDataLines(newData, 5);
const text = await readFile(newData, "utf8");
const lines = text
.trim()
.split("\n")
.filter((l) => l !== "");
expect(lines.length).toBe(4);
expect(lines.length).toBe(5);
const start = JSON.parse(lines[0] ?? "{}") as Record<string, unknown>;
expect(start.threadId).toBe(newId);
expect(start.forkFrom).toEqual({ threadId: sourceId });
const last = JSON.parse(lines[lines.length - 1] ?? "{}") as Record<string, unknown>;
expect(last.role).toBe("reviewer");
const lastRoleLine = JSON.parse(lines[lines.length - 2] ?? "{}") as Record<string, unknown>;
expect(lastRoleLine.role).toBe("reviewer");
const cas = createCasStore(getGlobalCasDir(storageRoot));
expect(await getContentMerklePayload(cas, String(last.contentHash))).toBe("rev-1");
expect(await getContentMerklePayload(cas, String(lastRoleLine.contentHash))).toBe("rev-1");
});
test("fork without --from-role retries last role", async () => {
@@ -162,7 +162,7 @@ describe("cli fork", () => {
const sourceData = join(storageRoot, "logs", hash, `${sourceId}.data.jsonl`);
const sourceRunning = join(storageRoot, "logs", hash, `${sourceId}.running`);
await waitUntilRunningAbsent(sourceRunning);
await waitUntilMinDataLines(sourceData, 4);
await waitUntilMinDataLines(sourceData, 5);
const forked = await cmdFork(storageRoot, sourceId, null);
expect(forked.ok).toBe(true);
@@ -173,23 +173,23 @@ describe("cli fork", () => {
const newData = join(storageRoot, "logs", hash, `${newId}.data.jsonl`);
const newRunning = join(storageRoot, "logs", hash, `${newId}.running`);
await waitUntilRunningAbsent(newRunning);
await waitUntilMinDataLines(newData, 4);
await waitUntilMinDataLines(newData, 5);
const text = await readFile(newData, "utf8");
const lines = text
.trim()
.split("\n")
.filter((l) => l !== "");
expect(lines.length).toBe(4);
expect(lines.length).toBe(5);
const replayCoder = JSON.parse(lines[2] ?? "{}") as Record<string, unknown>;
expect(replayCoder.role).toBe("coder");
const cas = createCasStore(getGlobalCasDir(storageRoot));
expect(await getContentMerklePayload(cas, String(replayCoder.contentHash))).toBe("c1");
const last = JSON.parse(lines[lines.length - 1] ?? "{}") as Record<string, unknown>;
expect(last.role).toBe("reviewer");
expect(await getContentMerklePayload(cas, String(last.contentHash))).toBe("rev-2");
const lastRoleLine = JSON.parse(lines[lines.length - 2] ?? "{}") as Record<string, unknown>;
expect(lastRoleLine.role).toBe("reviewer");
expect(await getContentMerklePayload(cas, String(lastRoleLine.contentHash))).toBe("rev-2");
});
test("fork rejects unknown role with available names", async () => {
@@ -213,7 +213,7 @@ describe("cli fork", () => {
const sourceData = join(storageRoot, "logs", added.value.hash, `${sourceId}.data.jsonl`);
const sourceRunning = join(storageRoot, "logs", added.value.hash, `${sourceId}.running`);
await waitUntilRunningAbsent(sourceRunning);
await waitUntilMinDataLines(sourceData, 4);
await waitUntilMinDataLines(sourceData, 5);
const bad = await cmdFork(storageRoot, sourceId, "ghost-role");
expect(bad.ok).toBe(false);
@@ -0,0 +1,142 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdir, readFile, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { runCli } from "../src/cli-dispatch.js";
import { cmdInitTemplate, cmdInitWorkspace } from "../src/cmd-init.js";
import { pathExists } from "../src/fs-utils.js";
describe("init template", () => {
let parent: string;
beforeEach(async () => {
parent = join(
tmpdir(),
`wf-init-template-${Date.now()}-${Math.random().toString(36).slice(2)}`,
);
await mkdir(parent, { recursive: true });
});
afterEach(async () => {
await rm(parent, { recursive: true, force: true });
});
test("creates templates/<name> with expected files", async () => {
const ws = await cmdInitWorkspace(parent, "my-workflows");
expect(ws.ok).toBe(true);
if (!ws.ok) {
return;
}
const root = ws.value.rootPath;
const created = await cmdInitTemplate(root, "review-pr");
expect(created.ok).toBe(true);
if (!created.ok) {
return;
}
const tdir = join(root, "templates", "review-pr");
expect(created.value.templatePath).toBe(tdir);
expect(await pathExists(join(tdir, "package.json"))).toBe(true);
expect(await pathExists(join(tdir, "tsconfig.json"))).toBe(true);
expect(await pathExists(join(tdir, "src", "roles.ts"))).toBe(true);
expect(await pathExists(join(tdir, "src", "moderator.ts"))).toBe(true);
expect(await pathExists(join(tdir, "src", "index.ts"))).toBe(true);
const pkg = JSON.parse(await readFile(join(tdir, "package.json"), "utf8")) as {
name: string;
type: string;
dependencies: Record<string, string>;
};
expect(pkg.type).toBe("module");
expect(pkg.dependencies["@uncaged/workflow"]).toBeDefined();
expect(pkg.dependencies.zod).toBeDefined();
expect(pkg.name).toContain("review-pr");
const idx = await readFile(join(tdir, "src", "index.ts"), "utf8");
expect(idx).toContain("WorkflowDefinition");
const roles = await readFile(join(tdir, "src", "roles.ts"), "utf8");
expect(roles).not.toContain("interface ");
expect(roles).not.toContain("?:");
expect(roles).not.toContain("export default");
const moder = await readFile(join(tdir, "src", "moderator.ts"), "utf8");
expect(moder).not.toContain("export default");
});
test("finds workspace walking up from nested cwd", async () => {
const ws = await cmdInitWorkspace(parent, "ws");
expect(ws.ok).toBe(true);
if (!ws.ok) {
return;
}
const root = ws.value.rootPath;
const nested = join(root, "a", "b");
await mkdir(nested, { recursive: true });
const created = await cmdInitTemplate(nested, "nested-tpl");
expect(created.ok).toBe(true);
if (!created.ok) {
return;
}
expect(await pathExists(join(root, "templates", "nested-tpl", "src", "index.ts"))).toBe(true);
});
test("errors when not inside a workflow workspace", async () => {
const orphan = join(parent, "nowhere");
await mkdir(orphan, { recursive: true });
const r = await cmdInitTemplate(orphan, "x");
expect(r.ok).toBe(false);
if (!r.ok) {
expect(r.error).toContain("templates/*");
}
});
test("errors when template directory already exists", async () => {
const ws = await cmdInitWorkspace(parent, "ws");
expect(ws.ok).toBe(true);
if (!ws.ok) {
return;
}
const root = ws.value.rootPath;
const first = await cmdInitTemplate(root, "dup");
expect(first.ok).toBe(true);
const second = await cmdInitTemplate(root, "dup");
expect(second.ok).toBe(false);
if (!second.ok) {
expect(second.error).toContain("already exists");
}
});
test("errors on invalid template name", async () => {
const ws = await cmdInitWorkspace(parent, "ws");
expect(ws.ok).toBe(true);
if (!ws.ok) {
return;
}
const bad = await cmdInitTemplate(ws.value.rootPath, "a/b");
expect(bad.ok).toBe(false);
});
test.serial("runCli init template uses cwd and succeeds in workspace", async () => {
const ws = await cmdInitWorkspace(parent, "cli-ws");
expect(ws.ok).toBe(true);
if (!ws.ok) {
return;
}
const root = ws.value.rootPath;
const prev = process.cwd();
try {
process.chdir(root);
const code = await runCli(join(parent, "_storage"), ["init", "template", "from-cli"]);
expect(code).toBe(0);
expect(await pathExists(join(root, "templates", "from-cli", "package.json"))).toBe(true);
} finally {
process.chdir(prev);
}
});
});
@@ -0,0 +1,152 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdir, readFile, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { formatCliUsage, runCli } from "../src/cli-dispatch.js";
import { cmdInitWorkspace } from "../src/cmd-init.js";
import { pathExists } from "../src/fs-utils.js";
describe("init workspace", () => {
let parent: string;
beforeEach(async () => {
parent = join(tmpdir(), `wf-init-${Date.now()}-${Math.random().toString(36).slice(2)}`);
await mkdir(parent, { recursive: true });
});
afterEach(async () => {
await rm(parent, { recursive: true, force: true });
});
test("creates expected files and directories", async () => {
const created = await cmdInitWorkspace(parent, "my-workflows");
expect(created.ok).toBe(true);
if (!created.ok) {
return;
}
const root = created.value.rootPath;
expect(await pathExists(join(root, "package.json"))).toBe(true);
expect(await pathExists(join(root, "biome.json"))).toBe(true);
expect(await pathExists(join(root, "tsconfig.json"))).toBe(true);
expect(await pathExists(join(root, "AGENTS.md"))).toBe(true);
expect(await pathExists(join(root, "README.md"))).toBe(true);
expect(await pathExists(join(root, "templates"))).toBe(true);
expect(await pathExists(join(root, "templates", ".gitkeep"))).toBe(true);
expect(await pathExists(join(root, "workflows", "package.json"))).toBe(true);
const rootPkg = JSON.parse(await readFile(join(root, "package.json"), "utf8")) as {
workspaces: string[];
};
expect(rootPkg.workspaces).toEqual(["templates/*", "workflows"]);
const wfPkg = JSON.parse(await readFile(join(root, "workflows", "package.json"), "utf8")) as {
type: string;
dependencies: Record<string, string>;
};
expect(wfPkg.type).toBe("module");
expect(wfPkg.dependencies["@uncaged/workflow"]).toBeDefined();
expect(wfPkg.dependencies.zod).toBeDefined();
const tsconfig = JSON.parse(await readFile(join(root, "tsconfig.json"), "utf8")) as {
compilerOptions: { strict: boolean; module: string; target: string };
};
expect(tsconfig.compilerOptions.strict).toBe(true);
expect(tsconfig.compilerOptions.module).toBe("ESNext");
expect(tsconfig.compilerOptions.target).toBe("ESNext");
});
test("AGENTS.md contains coding agent guide sections and terms", async () => {
const created = await cmdInitWorkspace(parent, "my-workflows");
expect(created.ok).toBe(true);
if (!created.ok) {
return;
}
const agentsPath = join(created.value.rootPath, "AGENTS.md");
const body = await readFile(agentsPath, "utf8");
for (const section of [
"项目结构",
"核心概念",
"开发流程",
"编码规范",
"Template",
"Build",
"常见陷阱",
]) {
expect(body).toContain(section);
}
for (const term of [
"RoleDefinition",
"WorkflowDefinition",
"Moderator",
"AgentFn",
"ExtractFn",
"RoleMeta",
]) {
expect(body).toContain(term);
}
expect(body).toMatch(/type[\s\S]*interface/i);
expect(body).toMatch(/function[\s\S]*class/i);
expect(body).toContain("Crockford Base32");
expect(body).toMatch(/no[\s\S]*default export/i);
expect(body).toMatch(/no[\s\S]*console/i);
expect(body).toMatch(/no[\s\S]*dynamic import/i);
expect(body).toContain("bun run check");
expect(body).toContain("bun test");
expect(body).toContain("uncaged-workflow");
expect(body).toContain("bun build");
expect(body).toContain("CLAUDE.md");
expect(body).toContain("docs/architecture.md");
});
test("errors when directory already exists", async () => {
const first = await cmdInitWorkspace(parent, "dup");
expect(first.ok).toBe(true);
const second = await cmdInitWorkspace(parent, "dup");
expect(second.ok).toBe(false);
if (!second.ok) {
expect(second.error).toContain("already exists");
}
});
test("errors on invalid workspace name", async () => {
const slash = await cmdInitWorkspace(parent, "a/b");
expect(slash.ok).toBe(false);
const dots = await cmdInitWorkspace(parent, "..");
expect(dots.ok).toBe(false);
const empty = await cmdInitWorkspace(parent, "");
expect(empty.ok).toBe(false);
});
test("usage lists init subcommands", () => {
const u = formatCliUsage();
expect(u).toContain("uncaged-workflow init workspace <name>");
expect(u).toContain("uncaged-workflow init template <name>");
});
test("runCli rejects unknown init subcommand", async () => {
const code = await runCli(join(parent, "_storage"), ["init", "bogus", "name"]);
expect(code).toBe(1);
});
test.serial("runCli init workspace uses cwd", async () => {
const prev = process.cwd();
try {
process.chdir(parent);
const code = await runCli(join(parent, "_storage"), ["init", "workspace", "from-cli"]);
expect(code).toBe(0);
expect(await pathExists(join(parent, "from-cli", "workflows", "package.json"))).toBe(true);
} finally {
process.chdir(prev);
}
});
});
@@ -0,0 +1,369 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { spawn, spawnSync } from "node:child_process";
import { cp, mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { fileURLToPath } from "node:url";
import { createCasStore, getGlobalCasDir, putContentMerkleNode } from "@uncaged/workflow";
import {
formatLiveDebugLine,
formatLiveTimeLabel,
LIVE_CONTENT_MAX_LINES,
type LiveRoleRow,
renderLiveRoleStepLines,
} from "../src/cmd-live.js";
import { parseLiveArgv } from "../src/live-argv.js";
const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url));
const fixtureRoot = fileURLToPath(new URL("./fixtures/live", import.meta.url));
/** Bodies for Merkle content nodes; hashes must match `.data.jsonl` fixtures. */
const LIVE_FIXTURE_PLANNER_BODY =
"alpha\nbeta\ngamma\nLINE4\nLINE5\nLINE6\nLINE7\nLINE8\nLINE9\nLINE10\nLINE11";
describe("live helpers", () => {
test("formatLiveTimeLabel pads HH:MM:SS", () => {
const label = formatLiveTimeLabel(new Date("2024-06-01T09:08:07.000Z").getTime());
expect(label).toMatch(/^\d{2}:\d{2}:\d{2}$/);
});
test("formatLiveDebugLine flattens newlines in message", () => {
const line = formatLiveDebugLine(0, "TAG1", "a\nb");
expect(line).toContain("[TAG1]");
expect(line).toContain("a b");
expect(line).not.toContain("\n");
});
test("renderLiveRoleStepLines truncates content to LIVE_CONTENT_MAX_LINES", () => {
const lines = Array.from({ length: LIVE_CONTENT_MAX_LINES + 3 }, (_, i) => `L${i + 1}`);
const row: LiveRoleRow = {
role: "r",
content: lines.join("\n"),
meta: { k: "v" },
timestamp: 0,
};
const out = renderLiveRoleStepLines(row, "r");
const body = out.filter((l) => l.startsWith(" L"));
expect(body.length).toBe(LIVE_CONTENT_MAX_LINES);
expect(out.some((l) => l.includes("more line"))).toBe(true);
expect(out.some((l) => l.startsWith(" meta: "))).toBe(true);
});
});
describe("parseLiveArgv", () => {
test("parses thread id and flags in any order", () => {
const a = parseLiveArgv(["01ABC", "--debug", "--role", "planner"]);
expect(a.ok).toBe(true);
if (a.ok) {
expect(a.value.threadId).toBe("01ABC");
expect(a.value.latest).toBe(false);
expect(a.value.debug).toBe(true);
expect(a.value.role).toBe("planner");
}
const b = parseLiveArgv(["--latest", "--role", "x"]);
expect(b.ok).toBe(true);
if (b.ok) {
expect(b.value.latest).toBe(true);
expect(b.value.threadId).toBe(null);
expect(b.value.role).toBe("x");
}
});
test("rejects --latest with thread id", () => {
const r = parseLiveArgv(["--latest", "01ABC"]);
expect(r.ok).toBe(false);
});
});
describe("live CLI", () => {
let prevEnv: string | undefined;
let storageRoot: string;
beforeEach(async () => {
prevEnv = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-live-"));
process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = storageRoot;
await mkdir(join(storageRoot, "logs", "C9NMV6V2TQT81"), { recursive: true });
await cp(
join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl"),
join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.data.jsonl"),
);
await cp(
join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.info.jsonl"),
join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVECMPLT01DDDDDDDDDDDDG.info.jsonl"),
);
await cp(
join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl"),
join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl"),
);
await cp(
join(fixtureRoot, "logs", "C9NMV6V2TQT81", "01LIVEOLDER01DDDDDDDDDDDDG.data.jsonl"),
join(storageRoot, "logs", "C9NMV6V2TQT81", "01LIVEOLDER01DDDDDDDDDDDDG.data.jsonl"),
);
const cas = createCasStore(getGlobalCasDir(storageRoot));
await putContentMerkleNode(cas, LIVE_FIXTURE_PLANNER_BODY);
await putContentMerkleNode(cas, "patch");
await putContentMerkleNode(cas, "still running");
});
afterEach(async () => {
if (prevEnv === undefined) {
delete process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
} else {
process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = prevEnv;
}
await rm(storageRoot, { recursive: true, force: true });
});
test("prints role steps and summary for a completed thread", async () => {
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const proc = spawn(process.execPath, [cliEntryPath, "live", "01LIVECMPLT01DDDDDDDDDDDDG"], {
env,
stdio: ["ignore", "pipe", "pipe"],
});
const stdout = await new Promise<string>((resolve, reject) => {
let buf = "";
proc.stdout?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.stderr?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.on("error", reject);
proc.on("exit", (code: number | null) => {
if (code === 0) {
resolve(buf);
} else {
reject(new Error(`exit ${code}: ${buf}`));
}
});
});
expect(stdout).toContain("planner");
expect(stdout).toContain("coder");
expect(stdout).toContain("meta:");
expect(stdout).toContain('"phase":"plan"');
expect(stdout).toContain("LINE10");
expect(stdout).not.toContain("LINE11");
expect(stdout).toContain("more line");
expect(stdout).toContain("completed: returnCode=0");
expect(stdout).toContain("fixture completed");
});
test("--latest tails the newest thread by start timestamp", async () => {
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const proc = spawn(process.execPath, [cliEntryPath, "live", "--latest"], {
env,
stdio: ["ignore", "pipe", "pipe"],
});
const stdout = await new Promise<string>((resolve, reject) => {
let buf = "";
proc.stdout?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.stderr?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.on("error", reject);
proc.on("exit", (code: number | null) => {
if (code === 0) {
resolve(buf);
} else {
reject(new Error(`exit ${code}: ${buf}`));
}
});
});
expect(stdout).toContain("fixture completed");
expect(stdout).not.toContain("older thread");
});
test("--debug prints .info.jsonl records after data output", async () => {
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const proc = spawn(
process.execPath,
[cliEntryPath, "live", "01LIVECMPLT01DDDDDDDDDDDDG", "--debug"],
{
env,
stdio: ["ignore", "pipe", "pipe"],
},
);
const stdout = await new Promise<string>((resolve, reject) => {
let buf = "";
proc.stdout?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.stderr?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.on("error", reject);
proc.on("exit", (code: number | null) => {
if (code === 0) {
resolve(buf);
} else {
reject(new Error(`exit ${code}: ${buf}`));
}
});
});
expect(stdout).toContain("[DEBUGTAG1]");
expect(stdout).toContain("bundle loaded");
expect(stdout).toContain("[DEBUGTAG2]");
expect(stdout).toContain("multi line");
});
test("--role filters out non-matching roles", async () => {
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const proc = spawn(
process.execPath,
[cliEntryPath, "live", "01LIVECMPLT01DDDDDDDDDDDDG", "--role", "planner"],
{
env,
stdio: ["ignore", "pipe", "pipe"],
},
);
const stdout = await new Promise<string>((resolve, reject) => {
let buf = "";
proc.stdout?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.stderr?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.on("error", reject);
proc.on("exit", (code: number | null) => {
if (code === 0) {
resolve(buf);
} else {
reject(new Error(`exit ${code}: ${buf}`));
}
});
});
expect(stdout).toContain("planner");
expect(stdout).not.toContain("patch");
expect(stdout).toContain("completed: returnCode=0");
});
test("--latest --debug --role combine", async () => {
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const proc = spawn(
process.execPath,
[cliEntryPath, "live", "--latest", "--debug", "--role", "planner"],
{
env,
stdio: ["ignore", "pipe", "pipe"],
},
);
const stdout = await new Promise<string>((resolve, reject) => {
let buf = "";
proc.stdout?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.stderr?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.on("error", reject);
proc.on("exit", (code: number | null) => {
if (code === 0) {
resolve(buf);
} else {
reject(new Error(`exit ${code}: ${buf}`));
}
});
});
expect(stdout).toContain("[DEBUGTAG1]");
expect(stdout).toContain("planner");
expect(stdout).not.toContain("patch");
expect(stdout).toContain("fixture completed");
});
test("unknown thread id exits 1", () => {
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const r = spawnSync(process.execPath, [cliEntryPath, "live", "01UNKNOWNXXXXXXXXXXXXXXXXX"], {
env,
encoding: "utf8",
});
expect(r.status).toBe(1);
expect(String(r.stderr ?? "")).toContain("thread not found");
});
test("follows file until WorkflowResult is appended", async () => {
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const dataPath = join(
storageRoot,
"logs",
"C9NMV6V2TQT81",
"01LIVEINFLY01DDDDDDDDDDDDG.data.jsonl",
);
const proc = spawn(process.execPath, [cliEntryPath, "live", "01LIVEINFLY01DDDDDDDDDDDDG"], {
env,
stdio: ["ignore", "pipe", "pipe"],
});
await new Promise((r) => setTimeout(r, 120));
const prior = await readFile(dataPath, "utf8");
await writeFile(
dataPath,
`${prior.replace(/\s*$/, "")}\n${JSON.stringify({ returnCode: 0, summary: "caught up" })}\n`,
"utf8",
);
const stdout = await new Promise<string>((resolve, reject) => {
let buf = "";
proc.stdout?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.stderr?.on("data", (c: Buffer) => {
buf += c.toString("utf8");
});
proc.on("error", reject);
proc.on("exit", (code: number | null) => {
if (code === 0) {
resolve(buf);
} else {
reject(new Error(`exit ${code}: ${buf}`));
}
});
});
expect(stdout).toContain("planner");
expect(stdout).toContain("completed: returnCode=0");
expect(stdout).toContain("caught up");
});
});
describe("live --latest with empty storage", () => {
let prevEnv: string | undefined;
let emptyRoot: string;
beforeEach(async () => {
prevEnv = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
emptyRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-live-empty-"));
process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = emptyRoot;
});
afterEach(async () => {
if (prevEnv === undefined) {
delete process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
} else {
process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = prevEnv;
}
await rm(emptyRoot, { recursive: true, force: true });
});
test("exits 1 when no threads exist", () => {
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: emptyRoot };
const r = spawnSync(process.execPath, [cliEntryPath, "live", "--latest"], {
env,
encoding: "utf8",
});
expect(r.status).toBe(1);
expect(String(r.stderr ?? "")).toContain("no threads");
});
});
@@ -323,7 +323,7 @@ describe("cli thread commands", () => {
.trim()
.split("\n")
.filter((l) => l !== "");
expect(lines.length).toBe(2);
expect(lines.length).toBe(3);
const runningPath = join(dirname(dataPath), `${threadId}.running`);
expect(await pathExists(runningPath)).toBe(false);
@@ -362,8 +362,8 @@ describe("cli thread commands", () => {
const resumed = await cmdResume(storageRoot, threadId);
expect(resumed.ok).toBe(true);
await waitUntilMinDataLines(dataPath, 3, 120);
expect(await countDataJsonlLines(dataPath)).toBe(3);
await waitUntilMinDataLines(dataPath, 4, 120);
expect(await countDataJsonlLines(dataPath)).toBe(4);
const runningPath = join(dirname(dataPath), `${threadId}.running`);
await waitUntilRunningFileAbsent(runningPath, 100);
+75 -25
View File
@@ -4,8 +4,10 @@ import { cmdCasGet, cmdCasList, cmdCasPut, cmdCasRm } from "./cmd-cas.js";
import { cmdFork, parseForkArgv } from "./cmd-fork.js";
import { cmdGc } from "./cmd-gc.js";
import { cmdHistory } from "./cmd-history.js";
import { cmdInitTemplate, cmdInitWorkspace } from "./cmd-init.js";
import { cmdKill } from "./cmd-kill.js";
import { cmdList, formatListLines } from "./cmd-list.js";
import { cmdLive } from "./cmd-live.js";
import { cmdPause } from "./cmd-pause.js";
import { cmdPs } from "./cmd-ps.js";
import { cmdRemove } from "./cmd-remove.js";
@@ -15,9 +17,10 @@ import { cmdRun } from "./cmd-run.js";
import { cmdShow, formatShowYaml } from "./cmd-show.js";
import { cmdThreadRemove, cmdThreadShow } from "./cmd-thread.js";
import { cmdThreads } from "./cmd-threads.js";
import { parseLiveArgv } from "./live-argv.js";
import { parseRunArgv } from "./run-argv.js";
function usage(): string {
export function formatCliUsage(): string {
return [
"Usage:",
" uncaged-workflow add <name> <file.esm.js> [--types <path>]",
@@ -27,6 +30,8 @@ function usage(): string {
" uncaged-workflow run <name> [--prompt <text>] [--max-rounds N]",
" uncaged-workflow ps",
" uncaged-workflow kill <thread-id>",
" uncaged-workflow live <thread-id> [--debug] [--role <name>]",
" uncaged-workflow live --latest [--debug] [--role <name>]",
" uncaged-workflow history <name>",
" uncaged-workflow rollback <name> [hash]",
" uncaged-workflow pause <thread-id>",
@@ -40,13 +45,47 @@ function usage(): string {
" uncaged-workflow cas put <thread-id> <content>",
" uncaged-workflow cas list <thread-id>",
" uncaged-workflow cas rm <thread-id> <hash>",
" uncaged-workflow init workspace <name>",
" uncaged-workflow init template <name>",
].join("\n");
}
async function dispatchInit(_storageRoot: string, argv: string[]): Promise<number> {
const sub = argv[0];
const name = argv[1];
if (sub === undefined || name === undefined || argv.length > 2) {
printCliError(`${formatCliUsage()}\n\nerror: init requires workspace|template <name>`);
return 1;
}
if (sub === "workspace") {
const result = await cmdInitWorkspace(process.cwd(), name);
if (!result.ok) {
printCliError(result.error);
return 1;
}
printCliLine(`initialized workflow workspace at ${result.value.rootPath}`);
return 0;
}
if (sub === "template") {
const result = await cmdInitTemplate(process.cwd(), name);
if (!result.ok) {
printCliError(result.error);
return 1;
}
printCliLine(`initialized template at ${result.value.templatePath}`);
return 0;
}
printCliError(`${formatCliUsage()}\n\nerror: unknown init subcommand: ${sub}`);
return 1;
}
async function dispatchAdd(storageRoot: string, argv: string[]): Promise<number> {
const parsed = parseAddArgv(argv);
if (!parsed.ok) {
printCliError(`${usage()}\n\nerror: ${parsed.error}`);
printCliError(`${formatCliUsage()}\n\nerror: ${parsed.error}`);
return 1;
}
const result = await cmdAdd(storageRoot, parsed.value);
@@ -63,7 +102,7 @@ async function dispatchAdd(storageRoot: string, argv: string[]): Promise<number>
async function dispatchList(storageRoot: string, argv: string[]): Promise<number> {
if (argv.length > 0) {
printCliError(`${usage()}\n\nerror: list takes no arguments`);
printCliError(`${formatCliUsage()}\n\nerror: list takes no arguments`);
return 1;
}
const result = await cmdList(storageRoot);
@@ -80,7 +119,7 @@ async function dispatchList(storageRoot: string, argv: string[]): Promise<number
async function dispatchShow(storageRoot: string, argv: string[]): Promise<number> {
const name = argv[0];
if (name === undefined || argv.length > 1) {
printCliError(`${usage()}\n\nerror: show requires <name>`);
printCliError(`${formatCliUsage()}\n\nerror: show requires <name>`);
return 1;
}
const result = await cmdShow(storageRoot, name);
@@ -95,7 +134,7 @@ async function dispatchShow(storageRoot: string, argv: string[]): Promise<number
async function dispatchRemove(storageRoot: string, argv: string[]): Promise<number> {
const name = argv[0];
if (name === undefined || argv.length > 1) {
printCliError(`${usage()}\n\nerror: remove requires <name>`);
printCliError(`${formatCliUsage()}\n\nerror: remove requires <name>`);
return 1;
}
const result = await cmdRemove(storageRoot, name);
@@ -110,7 +149,7 @@ async function dispatchRemove(storageRoot: string, argv: string[]): Promise<numb
async function dispatchRun(storageRoot: string, argv: string[]): Promise<number> {
const parsed = parseRunArgv(argv);
if (!parsed.ok) {
printCliError(`${usage()}\n\nerror: ${parsed.error}`);
printCliError(`${formatCliUsage()}\n\nerror: ${parsed.error}`);
return 1;
}
@@ -131,7 +170,7 @@ async function dispatchRun(storageRoot: string, argv: string[]): Promise<number>
async function dispatchPs(storageRoot: string, argv: string[]): Promise<number> {
if (argv.length > 0) {
printCliError(`${usage()}\n\nerror: ps takes no arguments`);
printCliError(`${formatCliUsage()}\n\nerror: ps takes no arguments`);
return 1;
}
for (const line of await cmdPs(storageRoot)) {
@@ -143,7 +182,7 @@ async function dispatchPs(storageRoot: string, argv: string[]): Promise<number>
async function dispatchKill(storageRoot: string, argv: string[]): Promise<number> {
const threadId = argv[0];
if (threadId === undefined || argv.length > 1) {
printCliError(`${usage()}\n\nerror: kill requires <thread-id>`);
printCliError(`${formatCliUsage()}\n\nerror: kill requires <thread-id>`);
return 1;
}
const result = await cmdKill(storageRoot, threadId);
@@ -155,10 +194,19 @@ async function dispatchKill(storageRoot: string, argv: string[]): Promise<number
return 0;
}
async function dispatchLive(storageRoot: string, argv: string[]): Promise<number> {
const parsed = parseLiveArgv(argv);
if (!parsed.ok) {
printCliError(`${formatCliUsage()}\n\nerror: ${parsed.error}`);
return 1;
}
return cmdLive(storageRoot, parsed.value);
}
async function dispatchHistory(storageRoot: string, argv: string[]): Promise<number> {
const name = argv[0];
if (name === undefined || argv.length > 1) {
printCliError(`${usage()}\n\nerror: history requires <name>`);
printCliError(`${formatCliUsage()}\n\nerror: history requires <name>`);
return 1;
}
const result = await cmdHistory(storageRoot, name);
@@ -175,7 +223,7 @@ async function dispatchHistory(storageRoot: string, argv: string[]): Promise<num
async function dispatchRollback(storageRoot: string, argv: string[]): Promise<number> {
const name = argv[0];
if (name === undefined || argv.length > 2) {
printCliError(`${usage()}\n\nerror: rollback requires <name> [hash]`);
printCliError(`${formatCliUsage()}\n\nerror: rollback requires <name> [hash]`);
return 1;
}
const hashArg = argv[1];
@@ -191,7 +239,7 @@ async function dispatchRollback(storageRoot: string, argv: string[]): Promise<nu
async function dispatchPause(storageRoot: string, argv: string[]): Promise<number> {
const threadId = argv[0];
if (threadId === undefined || argv.length > 1) {
printCliError(`${usage()}\n\nerror: pause requires <thread-id>`);
printCliError(`${formatCliUsage()}\n\nerror: pause requires <thread-id>`);
return 1;
}
const result = await cmdPause(storageRoot, threadId);
@@ -206,7 +254,7 @@ async function dispatchPause(storageRoot: string, argv: string[]): Promise<numbe
async function dispatchResume(storageRoot: string, argv: string[]): Promise<number> {
const threadId = argv[0];
if (threadId === undefined || argv.length > 1) {
printCliError(`${usage()}\n\nerror: resume requires <thread-id>`);
printCliError(`${formatCliUsage()}\n\nerror: resume requires <thread-id>`);
return 1;
}
const result = await cmdResume(storageRoot, threadId);
@@ -233,7 +281,7 @@ async function dispatchThreads(storageRoot: string, argv: string[]): Promise<num
async function dispatchThread(storageRoot: string, argv: string[]): Promise<number> {
const id = argv[0];
if (id === undefined || argv.length > 1) {
printCliError(`${usage()}\n\nerror: thread requires <id>`);
printCliError(`${formatCliUsage()}\n\nerror: thread requires <id>`);
return 1;
}
const result = await cmdThreadShow(storageRoot, id);
@@ -248,7 +296,7 @@ async function dispatchThread(storageRoot: string, argv: string[]): Promise<numb
async function dispatchThreadRm(storageRoot: string, argv: string[]): Promise<number> {
const id = argv[0];
if (id === undefined || argv.length > 1) {
printCliError(`${usage()}\n\nerror: thread rm requires <id>`);
printCliError(`${formatCliUsage()}\n\nerror: thread rm requires <id>`);
return 1;
}
const result = await cmdThreadRemove(storageRoot, id);
@@ -270,7 +318,7 @@ async function dispatchThreadBranch(storageRoot: string, rest: string[]): Promis
async function dispatchGc(storageRoot: string, argv: string[]): Promise<number> {
if (argv.length > 0) {
printCliError(`${usage()}\n\nerror: gc takes no arguments`);
printCliError(`${formatCliUsage()}\n\nerror: gc takes no arguments`);
return 1;
}
const result = await cmdGc(storageRoot);
@@ -288,7 +336,7 @@ async function dispatchGc(storageRoot: string, argv: string[]): Promise<number>
async function dispatchFork(storageRoot: string, argv: string[]): Promise<number> {
const parsed = parseForkArgv(argv);
if (!parsed.ok) {
printCliError(`${usage()}\n\nerror: ${parsed.error}`);
printCliError(`${formatCliUsage()}\n\nerror: ${parsed.error}`);
return 1;
}
const result = await cmdFork(storageRoot, parsed.value.threadId, parsed.value.fromRole);
@@ -304,7 +352,7 @@ async function dispatchCasGet(storageRoot: string, rest: string[]): Promise<numb
const threadId = rest[0];
const hash = rest[1];
if (threadId === undefined || hash === undefined || rest.length > 2) {
printCliError(`${usage()}\n\nerror: cas get requires <thread-id> <hash>`);
printCliError(`${formatCliUsage()}\n\nerror: cas get requires <thread-id> <hash>`);
return 1;
}
const result = await cmdCasGet(storageRoot, threadId, hash);
@@ -320,7 +368,7 @@ async function dispatchCasPut(storageRoot: string, rest: string[]): Promise<numb
const threadId = rest[0];
const content = rest[1];
if (threadId === undefined || content === undefined || rest.length > 2) {
printCliError(`${usage()}\n\nerror: cas put requires <thread-id> <content>`);
printCliError(`${formatCliUsage()}\n\nerror: cas put requires <thread-id> <content>`);
return 1;
}
const result = await cmdCasPut(storageRoot, threadId, content);
@@ -335,7 +383,7 @@ async function dispatchCasPut(storageRoot: string, rest: string[]): Promise<numb
async function dispatchCasList(storageRoot: string, rest: string[]): Promise<number> {
const threadId = rest[0];
if (threadId === undefined || rest.length > 1) {
printCliError(`${usage()}\n\nerror: cas list requires <thread-id>`);
printCliError(`${formatCliUsage()}\n\nerror: cas list requires <thread-id>`);
return 1;
}
const result = await cmdCasList(storageRoot, threadId);
@@ -353,7 +401,7 @@ async function dispatchCasRm(storageRoot: string, rest: string[]): Promise<numbe
const threadId = rest[0];
const hash = rest[1];
if (threadId === undefined || hash === undefined || rest.length > 2) {
printCliError(`${usage()}\n\nerror: cas rm requires <thread-id> <hash>`);
printCliError(`${formatCliUsage()}\n\nerror: cas rm requires <thread-id> <hash>`);
return 1;
}
const result = await cmdCasRm(storageRoot, threadId, hash);
@@ -378,12 +426,12 @@ const CAS_SUBCOMMAND_TABLE: Record<
async function dispatchCas(storageRoot: string, argv: string[]): Promise<number> {
const sub = argv[0];
if (sub === undefined) {
printCliError(`${usage()}\n\nerror: unknown cas subcommand: (none)`);
printCliError(`${formatCliUsage()}\n\nerror: unknown cas subcommand: (none)`);
return 1;
}
const handler = CAS_SUBCOMMAND_TABLE[sub];
if (handler === undefined) {
printCliError(`${usage()}\n\nerror: unknown cas subcommand: ${sub}`);
printCliError(`${formatCliUsage()}\n\nerror: unknown cas subcommand: ${sub}`);
return 1;
}
return handler(storageRoot, argv.slice(1));
@@ -393,12 +441,14 @@ type DispatchFn = (storageRoot: string, argv: string[]) => Promise<number>;
const COMMAND_TABLE: Record<string, DispatchFn> = {
add: dispatchAdd,
init: dispatchInit,
list: dispatchList,
show: dispatchShow,
remove: dispatchRemove,
run: dispatchRun,
ps: dispatchPs,
kill: dispatchKill,
live: dispatchLive,
history: dispatchHistory,
rollback: dispatchRollback,
pause: dispatchPause,
@@ -412,18 +462,18 @@ const COMMAND_TABLE: Record<string, DispatchFn> = {
export async function runCli(storageRoot: string, argv: string[]): Promise<number> {
if (argv.length === 0) {
printCliError(usage());
printCliError(formatCliUsage());
return 1;
}
const command = argv[0];
if (command === undefined) {
printCliError(usage());
printCliError(formatCliUsage());
return 1;
}
const rest = argv.slice(1);
const dispatch = COMMAND_TABLE[command];
if (dispatch === undefined) {
printCliError(`${usage()}\n\nerror: unknown command ${command}`);
printCliError(`${formatCliUsage()}\n\nerror: unknown command ${command}`);
return 1;
}
return dispatch(storageRoot, rest);
Regular → Executable
View File
+415
View File
@@ -0,0 +1,415 @@
import { mkdir, readFile, writeFile } from "node:fs/promises";
import { dirname, join, resolve } from "node:path";
import { err, ok, type Result } from "@uncaged/workflow";
import { pathExists } from "./fs-utils.js";
export type CmdInitWorkspaceSuccess = {
rootPath: string;
};
export type CmdInitTemplateSuccess = {
templatePath: string;
};
function validateWorkspaceSegment(name: string): Result<void, string> {
if (name.length === 0) {
return err("workspace name must not be empty");
}
if (name === "." || name === "..") {
return err("invalid workspace name");
}
if (name.includes("/") || name.includes("\\")) {
return err("workspace name must not contain path separators");
}
return ok(undefined);
}
function rootPackageJson(workspaceName: string): string {
return `${JSON.stringify(
{
name: workspaceName,
private: true,
type: "module",
workspaces: ["templates/*", "workflows"],
},
null,
2,
)}\n`;
}
function workflowsPackageJson(): string {
return `${JSON.stringify(
{
name: "workflows",
version: "0.0.0",
private: true,
type: "module",
dependencies: {
"@uncaged/workflow": "^0.1.0",
zod: "^4.0.0",
},
},
null,
2,
)}\n`;
}
function biomeJson(): string {
return `${JSON.stringify(
{
$schema: "https://biomejs.dev/schemas/2.4.14/schema.json",
files: {
includes: ["**", "!**/node_modules", "!**/dist"],
},
formatter: {
indentWidth: 2,
},
linter: {
enabled: true,
rules: {
recommended: true,
},
},
},
null,
2,
)}\n`;
}
function tsconfigJson(): string {
return `${JSON.stringify(
{
compilerOptions: {
strict: true,
target: "ESNext",
module: "ESNext",
moduleResolution: "Bundler",
skipLibCheck: true,
},
},
null,
2,
)}\n`;
}
function agentsMd(): string {
return `# AGENTS — Workflow 工作区开发指南
面向在本仓库中编写 workflow 的 coding agent。引擎层术语与架构细节与 **@uncaged/workflow** 上游文档一致,编写时可对照 \`CLAUDE.md\`\`docs/architecture.md\`
## 1. 项目结构(workspace / template / workflow instance)
| 层级 | 目录 / 产物 | 职责 |
|------|----------------|------|
| **Workspace** | 仓库根(\`package.json\`\`workspaces: ["templates/*", "workflows"]\`) | Bun monorepo:统一管理本地模板包与 workflow 实例 |
| **Template** | \`templates/<name>/\`(如 \`src/roles.ts\`\`src/moderator.ts\`\`src/index.ts\`) | 纯数据:**WorkflowDefinition**(各 **RoleDefinition** + **Moderator**),**不绑定**具体 Agent |
| **Workflow instance** | \`workflows/\`(或单独包) | 把模板与运行时 **AgentFn** / **ExtractFn** 组合,产出可注册的 **单文件 ESM bundle**(\`run\` + \`descriptor\` 命名导出) |
Init 生成的骨架:\`templates/\` 下放可复用定义,\`workflows/\` 下放绑定与打包入口。
## 2. 核心概念
- **RoleMeta**:\`Record<string, Record<string, unknown>>\`,角色名 → 该角色结构化 meta 的形状约定。
- **RoleDefinition<Meta>**:纯数据——\`description\`\`systemPrompt\`\`extractPrompt\`\`schema\`(Zod v4)。不含执行逻辑。
- **WorkflowDefinition<M extends RoleMeta>**:\`description\` + \`roles\`(各角色定义)+ **Moderator**。
- **Moderator**:\`(ctx: ModeratorContext<M>) => (角色名) | END\`。同步、纯函数,只做路由。
- **AgentFn**:\`(ctx: AgentContext) => Promise<string>\`,原始文本输出;从上下文读取当前角色的 \`systemPrompt\`
- **ExtractFn**:从上下文与 prompt 解析结构化数据(引擎与 Agent 都可使用)。
引擎循环简述:**Moderator** → 选角色 → **Agent** 产出文本 → **Extract** 写入 **meta** → 追加 step,重复直至 **END**。详见 \`docs/architecture.md\` 中的三阶段说明。
## 3. 开发流程
1. **定义 RoleMeta**:为每个角色约定 meta 的 TypeScript 类型(与 Zod schema 对齐)。
2. **编写 RoleDefinition**:为每个角色写 Zod \`schema\`,补齐 \`systemPrompt\` / \`extractPrompt\` / \`description\`
3. **编写 Moderator**:根据 \`ctx.steps\` 与业务状态返回下一个角色名或 \`END\`
4. **组装 WorkflowDefinition**:在模板 \`index\` 中导出 definition(以及必要的角色 / moderator 导出)。
5. **实例化**:在 workflow 包中使用 \`createWorkflow(def, binding, extract)\`(或项目约定的封装)绑定 **AgentFn** / **ExtractFn**。
6. **构建**:打包为单个 **.esm.js** bundle,使用 **uncaged-workflow add** 注册。
## 4. 编码规范
与 **CLAUDE.md** 对齐,摘要如下:
- **Functional-first**:优先 \`function\` + \`type\`,避免面向对象业务模型。
- **type 而非 interface**:类型别名一律用 \`type\`,不要使用 \`interface\`
- **显式可空**:不要用 \`?:\`;可空字段写成 \`T | null\`
- **function 而非 class**:不用 class(第三方库要求或 \`Error\` 子类除外)。
- **Crockford Base32**:日志 tag、bundle hash、thread id 等标识约定(引擎侧);工作区内自定义日志若沿用引擎 logger,tag 为 8 字符 Crockford Base32,且每个调用点唯一。
- **Named exports only**:不要使用 **default export**;workflow bundle 须 **export const run** 与 **export const descriptor**。
- **No console.log**:库代码用结构化 logger;CLI 用户输出可按项目 Biome 规则例外标注。
- **No dynamic import**:业务与 bundle 内禁止 \`import()\`;例外仅限「运行时路径由用户提供的 bundle 加载器」(引擎内部)。
## 5. Template 复用
- **已发布模板**:可通过 npm 依赖 \`@uncaged/workflow-template-*\` 等包,在 workflow 实例中 import 其 **WorkflowDefinition** 再绑定 Agent。
- **本地模板**:放在本仓库 \`templates/<name>/\`,由 workspace 协议引用(如 \`"template-foo": "workspace:*"\` 或相对路径),便于同源修改与版本控制。
选择模板时保持 **definition 与 agent 绑定分离**:模板只描述「做什么、顺序如何」,实例决定「谁执行、如何抽取 meta」。
## 6. Build and Test
日常命令:
\`\`\`sh
bun install
bun run check # Biome:lint + format
bun test
bun build # 若包内配置了 build 脚本则用于产出 dist / bundle
uncaged-workflow add <name> <path/to/bundle.esm.js>
\`\`\`
提交前至少运行 **bun run check** 与 **bun test**;registry 与本地运行流参见 README 与 CLI 文档。
## 7. 常见陷阱
- **No dynamic import**:bundle 须静态可分析;动态 \`import()\` 会破坏哈希与加载约束。
- **No default export**:引擎只接受命名导出 \`run\` / \`descriptor\`
- **No console.log**:避免在可被 Biome \`noConsole\` 规则覆盖的代码路径直接使用 console。
- **Single-file ESM bundle**:交付物是单一 \`.esm.js\`;静态 import 仅限 Node 内置(见 architecture 文档中的 Bundle Contract)。
---
编写新 workflow 时,先对齐 **RoleMeta → RoleDefinition(Zod)→ Moderator → 绑定 → 单文件 bundle**,再对照本节规范自检。
`;
}
function readmeMd(workspaceName: string): string {
return `# ${workspaceName}
Local workflow development workspace (Bun monorepo).
## Layout
- \`templates/\` — reusable workflow definition packages (roles + moderator), no agent binding
- \`workflows/\` — workflow instances that bind templates to agents and export \`run\` + \`descriptor\`
## Commands
\`\`\`sh
bun install
bun run check # after you add scripts / Biome
uncaged-workflow add <name> <bundle.esm.js>
uncaged-workflow run <name>
\`\`\`
Create this skeleton with:
\`\`\`sh
uncaged-workflow init workspace ${workspaceName}
\`\`\`
`;
}
export async function cmdInitWorkspace(
parentDir: string,
workspaceName: string,
): Promise<Result<CmdInitWorkspaceSuccess, string>> {
const validated = validateWorkspaceSegment(workspaceName);
if (!validated.ok) {
return validated;
}
const rootPath = join(parentDir, workspaceName);
if (await pathExists(rootPath)) {
return err(`directory already exists: ${rootPath}`);
}
await mkdir(rootPath, { recursive: false });
await mkdir(join(rootPath, "templates"), { recursive: false });
await mkdir(join(rootPath, "workflows"), { recursive: false });
await Promise.all([
writeFile(join(rootPath, "package.json"), rootPackageJson(workspaceName), "utf8"),
writeFile(join(rootPath, "biome.json"), biomeJson(), "utf8"),
writeFile(join(rootPath, "tsconfig.json"), tsconfigJson(), "utf8"),
writeFile(join(rootPath, "AGENTS.md"), agentsMd(), "utf8"),
writeFile(join(rootPath, "README.md"), readmeMd(workspaceName), "utf8"),
writeFile(join(rootPath, "templates", ".gitkeep"), "", "utf8"),
writeFile(join(rootPath, "workflows", "package.json"), workflowsPackageJson(), "utf8"),
]);
return ok({ rootPath });
}
function hasTemplatesWorkspaceGlob(workspaces: unknown): boolean {
return Array.isArray(workspaces) && workspaces.includes("templates/*");
}
async function readPackageJsonWorkspaces(dir: string): Promise<unknown | null> {
const pkgPath = join(dir, "package.json");
if (!(await pathExists(pkgPath))) {
return null;
}
let raw: string;
try {
raw = await readFile(pkgPath, "utf8");
} catch {
return null;
}
let parsed: unknown;
try {
parsed = JSON.parse(raw) as unknown;
} catch {
return null;
}
if (typeof parsed !== "object" || parsed === null || !("workspaces" in parsed)) {
return null;
}
return (parsed as { workspaces: unknown }).workspaces;
}
/** Resolve uncaged-workflow workspace root (package.json with `templates/*` in `workspaces`). */
async function findWorkflowWorkspaceRoot(startDir: string): Promise<Result<string, string>> {
let dir = resolve(startDir);
for (;;) {
const workspaces = await readPackageJsonWorkspaces(dir);
if (workspaces !== null && hasTemplatesWorkspaceGlob(workspaces)) {
return ok(dir);
}
const parent = dirname(dir);
if (parent === dir) {
return err(
'not inside a workflow workspace (no package.json with workspaces containing "templates/*")',
);
}
dir = parent;
}
}
function templatePackageJson(templateName: string): string {
return `${JSON.stringify(
{
name: `template-${templateName}`,
version: "0.0.0",
private: true,
type: "module",
dependencies: {
"@uncaged/workflow": "^0.1.0",
zod: "^4.0.0",
},
},
null,
2,
)}\n`;
}
function templateTsconfigJson(): string {
return `${JSON.stringify(
{
extends: "../../tsconfig.json",
compilerOptions: {
rootDir: "src",
outDir: "dist",
},
include: ["src/**/*.ts"],
},
null,
2,
)}\n`;
}
function templateRolesTs(): string {
return `import type { RoleDefinition } from "@uncaged/workflow";
import * as z from "zod/v4";
export const HELLO_TEMPLATE_DESCRIPTION =
"Minimal starter template: one greeter role, then END.";
export type HelloTemplateMeta = {
greeter: {
message: string;
};
};
const greeterMetaSchema = z.object({
message: z.string(),
});
export const greeterRole: RoleDefinition<HelloTemplateMeta["greeter"]> = {
description: "Says hello — replace with your first role.",
systemPrompt: "You are a helpful assistant. Reply with one short friendly sentence.",
extractPrompt: "Extract the assistant's greeting as message.",
schema: greeterMetaSchema,
extractRefs: null,
};
`;
}
function templateModeratorTs(): string {
return `import { END, type Moderator, type ModeratorContext } from "@uncaged/workflow";
import type { HelloTemplateMeta } from "./roles.js";
export const helloTemplateModerator: Moderator<HelloTemplateMeta> = (
ctx: ModeratorContext<HelloTemplateMeta>,
) => {
if (ctx.steps.length === 0) {
return "greeter";
}
return END;
};
`;
}
function templateIndexTs(): string {
return `import type { WorkflowDefinition } from "@uncaged/workflow";
import { helloTemplateModerator } from "./moderator.js";
import {
HELLO_TEMPLATE_DESCRIPTION,
type HelloTemplateMeta,
greeterRole,
} from "./roles.js";
export {
HELLO_TEMPLATE_DESCRIPTION,
type HelloTemplateMeta,
greeterRole,
} from "./roles.js";
export { helloTemplateModerator } from "./moderator.js";
export const helloTemplateWorkflowDefinition: WorkflowDefinition<HelloTemplateMeta> = {
description: HELLO_TEMPLATE_DESCRIPTION,
roles: {
greeter: greeterRole,
},
moderator: helloTemplateModerator,
};
`;
}
export async function cmdInitTemplate(
startDir: string,
templateName: string,
): Promise<Result<CmdInitTemplateSuccess, string>> {
const validated = validateWorkspaceSegment(templateName);
if (!validated.ok) {
return validated;
}
const rootResult = await findWorkflowWorkspaceRoot(startDir);
if (!rootResult.ok) {
return rootResult;
}
const workspaceRoot = rootResult.value;
const templateDir = join(workspaceRoot, "templates", templateName);
if (await pathExists(templateDir)) {
return err(`template already exists: ${templateDir}`);
}
await mkdir(join(templateDir, "src"), { recursive: true });
await Promise.all([
writeFile(join(templateDir, "package.json"), templatePackageJson(templateName), "utf8"),
writeFile(join(templateDir, "tsconfig.json"), templateTsconfigJson(), "utf8"),
writeFile(join(templateDir, "src", "roles.ts"), templateRolesTs(), "utf8"),
writeFile(join(templateDir, "src", "moderator.ts"), templateModeratorTs(), "utf8"),
writeFile(join(templateDir, "src", "index.ts"), templateIndexTs(), "utf8"),
]);
return ok({ templatePath: templateDir });
}
+463
View File
@@ -0,0 +1,463 @@
import { watch } from "node:fs";
import { readFile } from "node:fs/promises";
import { dirname, join } from "node:path";
import {
type CasStore,
createCasStore,
getContentMerklePayload,
getGlobalCasDir,
tryParseRoleStepRecord,
tryParseWorkflowResultRecord,
type WorkflowCompletion,
} from "@uncaged/workflow";
import { printCliError, printCliLine } from "./cli-output.js";
import { pathExists } from "./fs-utils.js";
import type { ParsedLiveArgv } from "./live-argv.js";
import { findLatestThreadDataPath, resolveThreadDataPath } from "./thread-scan.js";
export const LIVE_CONTENT_MAX_LINES = 10;
export type LiveRoleRow = {
role: string;
content: string;
meta: Record<string, unknown>;
timestamp: number;
};
export function formatLiveTimeLabel(timestampMs: number): string {
const d = new Date(timestampMs);
const hh = String(d.getHours()).padStart(2, "0");
const mm = String(d.getMinutes()).padStart(2, "0");
const ss = String(d.getSeconds()).padStart(2, "0");
return `${hh}:${mm}:${ss}`;
}
function shouldUseColor(): boolean {
return process.stdout.isTTY === true && process.env.NO_COLOR === undefined;
}
function highlightLiveRole(name: string): string {
if (!shouldUseColor()) {
return name;
}
return `\x1b[1m\x1b[36m${name}\x1b[0m`;
}
function dimGreyLine(line: string): string {
if (!shouldUseColor()) {
return line;
}
return `\x1b[2m\x1b[90m${line}\x1b[0m`;
}
export function formatLiveDebugLine(timestampMs: number, tag: string, message: string): string {
const label = `[${formatLiveTimeLabel(timestampMs)}] [${tag}] ${message.replace(/\n/g, " ")}`;
return dimGreyLine(label);
}
export function renderLiveRoleStepLines(row: LiveRoleRow, roleDisplay: string): string[] {
const header = `[${formatLiveTimeLabel(row.timestamp)}] ▶ ${roleDisplay}`;
const lines: string[] = [header];
const parts = row.content.split("\n");
const shown = parts.slice(0, LIVE_CONTENT_MAX_LINES);
for (const ln of shown) {
lines.push(` ${ln}`);
}
const omitted = parts.length - shown.length;
if (omitted > 0) {
lines.push(` … (${omitted} more line${omitted === 1 ? "" : "s"})`);
}
lines.push(` meta: ${JSON.stringify(row.meta)}`);
return lines;
}
function printSummary(result: WorkflowCompletion): void {
printCliLine(`completed: returnCode=${result.returnCode}${result.summary}`);
}
type LiveSessionState = {
sawStart: boolean;
completed: boolean;
carry: string;
contentOffset: number;
};
type InfoLiveState = {
carry: string;
contentOffset: number;
};
function tryParseInfoRecord(obj: Record<string, unknown>): {
tag: string;
content: string;
timestamp: number;
} | null {
const tag = obj.tag;
const content = obj.content;
const timestamp = obj.timestamp;
if (
typeof tag !== "string" ||
typeof content !== "string" ||
typeof timestamp !== "number" ||
!Number.isFinite(timestamp)
) {
return null;
}
return { tag, content, timestamp };
}
async function handleJsonlLine(
rawLine: string,
state: LiveSessionState,
roleFilter: string | null,
cas: CasStore,
): Promise<{ parseError: string | null; workflowResult: WorkflowCompletion | null }> {
const trimmed = rawLine.trim();
if (trimmed === "") {
return { parseError: null, workflowResult: null };
}
let rec: unknown;
try {
rec = JSON.parse(trimmed) as unknown;
} catch {
return { parseError: "invalid JSON in thread data file", workflowResult: null };
}
if (rec === null || typeof rec !== "object") {
return { parseError: "invalid record in thread data file", workflowResult: null };
}
const obj = rec as Record<string, unknown>;
if (!state.sawStart) {
state.sawStart = true;
return { parseError: null, workflowResult: null };
}
const wf = tryParseWorkflowResultRecord(obj);
if (wf !== null) {
state.completed = true;
return { parseError: null, workflowResult: wf };
}
const roleRow = tryParseRoleStepRecord(obj);
if (roleRow === null) {
return {
parseError: "unrecognized record in thread data (expected role step or result)",
workflowResult: null,
};
}
if (roleFilter !== null && roleRow.role !== roleFilter) {
return { parseError: null, workflowResult: null };
}
const payload = await getContentMerklePayload(cas, roleRow.contentHash);
const content =
payload !== null ? payload : `(content not in CAS; contentHash=${roleRow.contentHash})`;
const row: LiveRoleRow = {
role: roleRow.role,
content,
meta: roleRow.meta,
timestamp: roleRow.timestamp,
};
for (const outLine of renderLiveRoleStepLines(row, highlightLiveRole(row.role))) {
printCliLine(outLine);
}
return { parseError: null, workflowResult: null };
}
async function pumpNewContent(
dataPath: string,
state: LiveSessionState,
roleFilter: string | null,
cas: CasStore,
): Promise<number | null> {
let text: string;
try {
text = await readFile(dataPath, "utf8");
} catch {
return null;
}
if (text.length < state.contentOffset) {
state.contentOffset = 0;
state.carry = "";
}
const chunk = text.slice(state.contentOffset);
state.contentOffset = text.length;
state.carry += chunk;
const parts = state.carry.split("\n");
state.carry = parts.pop() ?? "";
for (const line of parts) {
const { parseError, workflowResult } = await handleJsonlLine(line, state, roleFilter, cas);
if (parseError !== null) {
printCliError(parseError);
return 1;
}
if (workflowResult !== null) {
printSummary(workflowResult);
return 0;
}
}
return null;
}
async function pumpNewInfoContent(infoPath: string, state: InfoLiveState): Promise<void> {
let text: string;
try {
text = await readFile(infoPath, "utf8");
} catch {
return;
}
if (text.length < state.contentOffset) {
state.contentOffset = 0;
state.carry = "";
}
const chunk = text.slice(state.contentOffset);
state.contentOffset = text.length;
state.carry += chunk;
const parts = state.carry.split("\n");
state.carry = parts.pop() ?? "";
for (const line of parts) {
const trimmed = line.trim();
if (trimmed === "") {
continue;
}
let rec: unknown;
try {
rec = JSON.parse(trimmed) as unknown;
} catch {
continue;
}
if (rec === null || typeof rec !== "object") {
continue;
}
const parsed = tryParseInfoRecord(rec as Record<string, unknown>);
if (parsed === null) {
continue;
}
printCliLine(formatLiveDebugLine(parsed.timestamp, parsed.tag, parsed.content));
}
}
type WatchPumpTask = {
path: string;
pump: () => Promise<number | null>;
};
async function runWatchPumpStep(
settled: () => boolean,
pump: () => Promise<number | null>,
closeAll: () => void,
finish: (code: number) => void,
): Promise<void> {
if (settled()) {
return;
}
try {
const code = await pump();
if (code !== null) {
closeAll();
finish(code);
}
} catch (e) {
closeAll();
throw e instanceof Error ? e : new Error(String(e));
}
}
function watchLivePaths(params: { tasks: WatchPumpTask[]; signal: AbortSignal }): Promise<number> {
const { tasks, signal } = params;
return new Promise((resolve, reject) => {
let settled = false;
const finish = (code: number): void => {
if (settled) {
return;
}
settled = true;
resolve(code);
};
const pumpChains = new Map<string, Promise<void>>();
for (const t of tasks) {
pumpChains.set(t.path, Promise.resolve());
}
const watchers: ReturnType<typeof watch>[] = [];
const closeAll = (): void => {
for (const w of watchers) {
w.close();
}
};
function schedulePump(path: string, pump: () => Promise<number | null>): void {
const prev = pumpChains.get(path) ?? Promise.resolve();
const next = (async () => {
await prev;
await runWatchPumpStep(() => settled, pump, closeAll, finish);
})();
pumpChains.set(path, next);
}
for (const { path, pump } of tasks) {
const watcher = watch(path, (eventType) => {
if (eventType === "rename") {
return;
}
schedulePump(path, pump);
});
watchers.push(watcher);
watcher.on("error", (err: Error) => {
closeAll();
reject(err);
});
}
const onAbort = (): void => {
closeAll();
finish(0);
};
signal.addEventListener("abort", onAbort, { once: true });
for (const { path, pump } of tasks) {
schedulePump(path, pump);
}
});
}
type LiveThreadTarget = {
threadId: string;
dataPath: string;
};
async function resolveLiveThreadTarget(
storageRoot: string,
parsed: ParsedLiveArgv,
): Promise<LiveThreadTarget | null> {
if (parsed.latest) {
const found = await findLatestThreadDataPath(storageRoot);
if (found === null) {
printCliError("live: no threads found");
return null;
}
return found;
}
const id = parsed.threadId;
if (id === null) {
printCliError("live: internal error: missing thread id");
return null;
}
const resolved = await resolveThreadDataPath(storageRoot, id);
if (resolved === null) {
printCliError(`thread not found: ${id}`);
return null;
}
return { threadId: id, dataPath: resolved };
}
async function buildLiveWatchTasks(params: {
dataPath: string;
infoPath: string;
debug: boolean;
dataState: LiveSessionState;
infoState: InfoLiveState;
roleFilter: string | null;
cas: CasStore;
}): Promise<WatchPumpTask[]> {
const { dataPath, infoPath, debug, dataState, infoState, roleFilter, cas } = params;
const tasks: WatchPumpTask[] = [
{
path: dataPath,
pump: () => pumpNewContent(dataPath, dataState, roleFilter, cas),
},
];
if (debug && (await pathExists(infoPath))) {
tasks.push({
path: infoPath,
pump: async () => {
await pumpNewInfoContent(infoPath, infoState);
return null;
},
});
}
return tasks;
}
export async function cmdLive(storageRoot: string, parsed: ParsedLiveArgv): Promise<number> {
const target = await resolveLiveThreadTarget(storageRoot, parsed);
if (target === null) {
return 1;
}
const { threadId, dataPath } = target;
const roleFilter = parsed.role;
const infoPath = join(dirname(dataPath), `${threadId}.info.jsonl`);
const cas = createCasStore(getGlobalCasDir(storageRoot));
const dataState: LiveSessionState = {
sawStart: false,
completed: false,
carry: "",
contentOffset: 0,
};
const infoState: InfoLiveState = {
carry: "",
contentOffset: 0,
};
const controller = new AbortController();
const onSigInt = (): void => {
controller.abort();
};
process.on("SIGINT", onSigInt);
try {
const firstData = await pumpNewContent(dataPath, dataState, roleFilter, cas);
if (firstData === 1) {
return 1;
}
if (parsed.debug && (await pathExists(infoPath))) {
await pumpNewInfoContent(infoPath, infoState);
}
if (firstData === 0 || dataState.completed) {
return 0;
}
const tasks = await buildLiveWatchTasks({
dataPath,
infoPath,
debug: parsed.debug,
dataState,
infoState,
roleFilter,
cas,
});
return await watchLivePaths({ tasks, signal: controller.signal });
} catch (e) {
const message = e instanceof Error ? e.message : String(e);
printCliError(`live: ${message}`);
return 1;
} finally {
process.off("SIGINT", onSigInt);
}
}
+75
View File
@@ -0,0 +1,75 @@
import { err, ok, type Result } from "@uncaged/workflow";
export type ParsedLiveArgv = {
threadId: string | null;
latest: boolean;
debug: boolean;
role: string | null;
};
type LiveArgvScan = {
latest: boolean;
debug: boolean;
role: string | null;
threadId: string | null;
};
function applyLiveArgvToken(argv: string[], i: number, s: LiveArgvScan): Result<number, string> {
const a = argv[i];
if (a === "--latest") {
s.latest = true;
return ok(i + 1);
}
if (a === "--debug") {
s.debug = true;
return ok(i + 1);
}
if (a === "--role") {
const v = argv[i + 1];
if (v === undefined || v.startsWith("--")) {
return err("missing value for --role");
}
s.role = v;
return ok(i + 2);
}
if (a.startsWith("--")) {
return err(`unknown live flag: ${a}`);
}
if (s.threadId !== null) {
return err("unexpected extra argument");
}
s.threadId = a;
return ok(i + 1);
}
export function parseLiveArgv(argv: string[]): Result<ParsedLiveArgv, string> {
const s: LiveArgvScan = {
latest: false,
debug: false,
role: null,
threadId: null,
};
let i = 0;
while (i < argv.length) {
const step = applyLiveArgvToken(argv, i, s);
if (!step.ok) {
return step;
}
i = step.value;
}
if (s.latest && s.threadId !== null) {
return err("live --latest does not take <thread-id>");
}
if (!s.latest && s.threadId === null) {
return err("live requires <thread-id> or --latest");
}
return ok({
threadId: s.threadId,
latest: s.latest,
debug: s.debug,
role: s.role,
});
}
+67 -1
View File
@@ -1,4 +1,4 @@
import { readdir } from "node:fs/promises";
import { readdir, stat } from "node:fs/promises";
import { join } from "node:path";
import { pathExists, readTextFileIfExists } from "./fs-utils.js";
@@ -15,6 +15,28 @@ export type HistoricalThreadRow = {
workflowName: string | null;
};
async function readThreadStartTimestampMs(dataPath: string): Promise<number | null> {
const text = await readTextFileIfExists(dataPath);
if (text === null) {
return null;
}
const firstLine = text.split("\n")[0];
if (firstLine === undefined || firstLine.trim() === "") {
return null;
}
let parsed: unknown;
try {
parsed = JSON.parse(firstLine) as unknown;
} catch {
return null;
}
if (parsed === null || typeof parsed !== "object") {
return null;
}
const ts = (parsed as Record<string, unknown>).timestamp;
return typeof ts === "number" && Number.isFinite(ts) ? ts : null;
}
async function readWorkflowNameFromDataJsonl(dataPath: string): Promise<string | null> {
const text = await readTextFileIfExists(dataPath);
if (text === null) {
@@ -124,6 +146,50 @@ export async function listHistoricalThreads(
return out;
}
/**
* Picks the thread whose `.data.jsonl` is newest by start-record `timestamp`,
* falling back to file `mtime` when the timestamp is missing.
* Tie-breaker: larger `mtime` wins when start timestamps are equal.
*/
export async function findLatestThreadDataPath(
storageRoot: string,
): Promise<{ threadId: string; dataPath: string } | null> {
const threads = await listHistoricalThreads(storageRoot, null);
if (threads.length === 0) {
return null;
}
let best: {
threadId: string;
dataPath: string;
primary: number;
secondary: number;
} | null = null;
for (const t of threads) {
const dataPath = join(storageRoot, "logs", t.hash, `${t.threadId}.data.jsonl`);
let mtimeMs = 0;
try {
const st = await stat(dataPath);
mtimeMs = st.mtimeMs;
} catch {
continue;
}
const startTs = await readThreadStartTimestampMs(dataPath);
const primary = startTs !== null ? startTs : mtimeMs;
const secondary = mtimeMs;
if (
best === null ||
primary > best.primary ||
(primary === best.primary && secondary > best.secondary)
) {
best = { threadId: t.threadId, dataPath, primary, secondary };
}
}
return best === null ? null : { threadId: best.threadId, dataPath: best.dataPath };
}
export async function resolveThreadDataPath(
storageRoot: string,
threadId: string,
@@ -39,4 +39,5 @@ export const coderRole: RoleDefinition<CoderMeta> = {
"Extract completedPhase: the planner phase hash finished this round (exact hash string from the plan). If multiple phases were finished in one round, use the last finished phase hash. Extract filesChanged and a summary of the work.",
schema: coderMetaSchema,
extractRefs: (meta) => [meta.completedPhase],
extractMode: "single",
};
@@ -21,15 +21,16 @@ export const committerMetaSchema = z.discriminatedUnion("status", [
export type CommitterMeta = z.infer<typeof committerMetaSchema>;
const COMMITTER_SYSTEM = `You are the git committer. Create a branch, commit the changes, and push.
const COMMITTER_SYSTEM = `You are the git committer. Create a branch and commit the changes.
Report the branch name and commit SHA. On failure, classify as recoverable or unrecoverable.
Do not attempt to fix failures yourself.`;
export const committerRole: RoleDefinition<CommitterMeta> = {
description: "Creates branch, commits, and pushes when review passes.",
description: "Creates a branch and commits changes.",
systemPrompt: COMMITTER_SYSTEM,
extractPrompt:
"Extract the commit result: committed (with branch and SHA), recoverable failure, or unrecoverable failure. Include error details and log references if applicable.",
schema: committerMetaSchema,
extractRefs: null,
extractMode: "single",
};
@@ -50,4 +50,5 @@ export const plannerRole: RoleDefinition<PlannerMeta> = {
"Extract the implementation phases from the agent's output. Each phase has a hash (the CAS content-hash returned by the cas put command) and a title (one-line summary).",
schema: plannerMetaSchema,
extractRefs: (meta) => meta.phases.map((p) => p.hash),
extractMode: "single",
};
@@ -48,4 +48,5 @@ export const preparerRole: RoleDefinition<PreparerMeta> = {
"Extract repoPath (absolute path), defaultBranch, conventions (summary string or null), and toolchain (packageManager, testCommand, lintCommand, buildCommand — each string or null).",
schema: preparerMetaSchema,
extractRefs: null,
extractMode: "single",
};
@@ -22,4 +22,5 @@ export const reviewerRole: RoleDefinition<ReviewerMeta> = {
"Extract the review verdict: approved or rejected. If rejected, list the blocking issues.",
schema: reviewerMetaSchema,
extractRefs: null,
extractMode: "single",
};
@@ -0,0 +1,15 @@
{
"name": "@uncaged/workflow-role-tester",
"version": "0.1.0",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",
"scripts": {
"build": "echo 'TODO'",
"test": "bun test"
},
"dependencies": {
"@uncaged/workflow": "workspace:*",
"zod": "^4.0.0"
}
}
@@ -0,0 +1 @@
export { type TesterMeta, testerMetaSchema, testerRole } from "./tester.js";
@@ -0,0 +1,27 @@
import type { RoleDefinition } from "@uncaged/workflow";
import * as z from "zod/v4";
export const testerMetaSchema = z.discriminatedUnion("status", [
z.object({
status: z.literal("passed"),
details: z.string(),
}),
z.object({
status: z.literal("failed"),
details: z.string(),
}),
]);
export type TesterMeta = z.infer<typeof testerMetaSchema>;
const TESTER_SYSTEM = `You are a tester. Run the project's test suite, build, and lint commands. Check what commands are available from the preparer's output in the thread. Report pass/fail with details of what failed.`;
export const testerRole: RoleDefinition<TesterMeta> = {
description: "Runs test, build, and lint commands and reports pass or fail with details.",
systemPrompt: TESTER_SYSTEM,
extractPrompt:
"Extract the verification result: passed with summary details, or failed with details of what broke.",
schema: testerMetaSchema,
extractRefs: null,
extractMode: "single",
};
@@ -0,0 +1,10 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"rootDir": "src",
"outDir": "dist",
"composite": true
},
"include": ["src/**/*.ts"],
"references": [{ "path": "../workflow" }]
}
@@ -0,0 +1,260 @@
import { describe, expect, test } from "bun:test";
import {
END,
type ModeratorContext,
type RoleStep,
START,
validateWorkflowDescriptor,
} from "@uncaged/workflow";
import type { CommitterMeta } from "@uncaged/workflow-role-committer";
import type { PlannerMeta } from "@uncaged/workflow-role-planner";
import { buildDevelopDescriptor } from "../src/descriptor.js";
import { developModerator } from "../src/index.js";
import type { DevelopMeta } from "../src/roles.js";
const DEFAULT_PHASES: PlannerMeta["phases"] = [
{
hash: "4KNMR2PX",
title: "Do the work",
},
];
function makeStart(maxRounds: number): ModeratorContext<DevelopMeta>["start"] {
return {
role: START,
content: "Implement the feature",
meta: { maxRounds },
timestamp: 0,
};
}
function makeCtx(
maxRounds: number,
steps: ModeratorContext<DevelopMeta>["steps"],
): ModeratorContext<DevelopMeta> {
return {
threadId: "01TEST000000000000000000TR",
depth: 0,
start: makeStart(maxRounds),
steps,
};
}
function plannerStep(phases: PlannerMeta["phases"] = DEFAULT_PHASES): RoleStep<DevelopMeta> {
return {
role: "planner",
contentHash: "STUBHASHPLANNER001",
meta: { phases },
refs: phases.map((p) => p.hash),
timestamp: 1,
};
}
function coderStep(completedPhase = "4KNMR2PX"): RoleStep<DevelopMeta> {
return {
role: "coder",
contentHash: "STUBHASHCODER00001",
meta: { completedPhase, filesChanged: ["a.ts"], summary: "implemented" },
refs: [completedPhase],
timestamp: 2,
};
}
function reviewerStep(approved: boolean): RoleStep<DevelopMeta> {
return {
role: "reviewer",
contentHash: "STUBHASHREVIEWER01",
meta: approved
? { status: "approved" as const }
: { status: "rejected" as const, issues: ["needs fix"] },
refs: [],
timestamp: 3,
};
}
function testerStep(passed: boolean): RoleStep<DevelopMeta> {
return {
role: "tester",
contentHash: "STUBHASHTESTER01",
meta: passed
? { status: "passed" as const, details: "all checks passed" }
: { status: "failed" as const, details: "lint failed" },
refs: [],
timestamp: 4,
};
}
function committerStep(meta: CommitterMeta): RoleStep<DevelopMeta> {
return {
role: "committer",
contentHash: "STUBHASHCOMMITTER1",
meta,
refs: [],
timestamp: 5,
};
}
describe("developModerator", () => {
test("routes initial → planner → coder → reviewer → tester → committer → END", () => {
expect(developModerator(makeCtx(20, []))).toBe("planner");
expect(developModerator(makeCtx(20, [plannerStep()]))).toBe("coder");
expect(developModerator(makeCtx(20, [plannerStep(), coderStep()]))).toBe("reviewer");
expect(developModerator(makeCtx(20, [plannerStep(), coderStep(), reviewerStep(true)]))).toBe(
"tester",
);
expect(
developModerator(
makeCtx(20, [plannerStep(), coderStep(), reviewerStep(true), testerStep(true)]),
),
).toBe("committer");
expect(
developModerator(
makeCtx(20, [
plannerStep(),
coderStep(),
reviewerStep(true),
testerStep(true),
committerStep({ status: "committed", branch: "feat/x", commitSha: "abc1234" }),
]),
),
).toBe(END);
});
test("reviewer rejects → coder retry when budget allows", () => {
const steps: ModeratorContext<DevelopMeta>["steps"] = [
plannerStep(),
coderStep(),
reviewerStep(false),
];
expect(developModerator(makeCtx(20, steps))).toBe("coder");
});
test("reviewer rejects → END when max rounds exhausted", () => {
const steps: ModeratorContext<DevelopMeta>["steps"] = [
plannerStep(),
coderStep(),
reviewerStep(false),
];
expect(developModerator(makeCtx(4, steps))).toBe(END);
});
test("tester failed → coder retry when budget allows", () => {
const steps: ModeratorContext<DevelopMeta>["steps"] = [
plannerStep(),
coderStep(),
reviewerStep(true),
testerStep(false),
];
expect(developModerator(makeCtx(20, steps))).toBe("coder");
});
test("tester failed → END when max rounds exhausted", () => {
const steps: ModeratorContext<DevelopMeta>["steps"] = [
plannerStep(),
coderStep(),
reviewerStep(true),
testerStep(false),
];
expect(developModerator(makeCtx(5, steps))).toBe(END);
});
test("multiple planner phases → coder until all complete, then reviewer", () => {
const phases: PlannerMeta["phases"] = [
{ hash: "AA000001", title: "first phase" },
{ hash: "AA000002", title: "second phase" },
];
expect(developModerator(makeCtx(20, [plannerStep(phases)]))).toBe("coder");
expect(developModerator(makeCtx(20, [plannerStep(phases), coderStep("AA000001")]))).toBe(
"coder",
);
expect(
developModerator(
makeCtx(20, [plannerStep(phases), coderStep("AA000001"), coderStep("AA000002")]),
),
).toBe("reviewer");
});
test("one-shot coder reports only last phase hash → reviewer (moderator treats as all phases done)", () => {
const phases: PlannerMeta["phases"] = [
{ hash: "BB000001", title: "setup branch" },
{ hash: "BB000002", title: "write tests" },
{ hash: "BB000003", title: "verify" },
{ hash: "BB000004", title: "polish" },
];
expect(developModerator(makeCtx(20, [plannerStep(phases), coderStep("BB000004")]))).toBe(
"reviewer",
);
});
test("unrecognised completedPhase hash → coder retry when budget allows", () => {
const phases: PlannerMeta["phases"] = [
{ hash: "CC000001", title: "first phase" },
{ hash: "CC000002", title: "second phase" },
];
expect(developModerator(makeCtx(20, [plannerStep(phases), coderStep("all-done")]))).toBe(
"coder",
);
});
test("incomplete phases → END when max rounds exhausted", () => {
const phases: PlannerMeta["phases"] = [
{ hash: "DD000001", title: "first phase" },
{ hash: "DD000002", title: "second phase" },
];
const steps: ModeratorContext<DevelopMeta>["steps"] = [
plannerStep(phases),
coderStep("DD000001"),
];
expect(developModerator(makeCtx(3, steps))).toBe(END);
});
test("committer → END for any committer meta status", () => {
const committed = committerStep({ status: "committed", branch: "f", commitSha: "x" });
const recoverable = committerStep({
status: "recoverable",
error: "merge conflict",
logRef: null,
});
const unrecoverable = committerStep({
status: "unrecoverable",
error: "repo missing",
logRef: "log1",
});
const base: ModeratorContext<DevelopMeta>["steps"] = [
plannerStep(),
coderStep(),
reviewerStep(true),
testerStep(true),
];
expect(developModerator(makeCtx(20, [...base, committed]))).toBe(END);
expect(developModerator(makeCtx(20, [...base, recoverable]))).toBe(END);
expect(developModerator(makeCtx(20, [...base, unrecoverable]))).toBe(END);
});
});
describe("buildDevelopDescriptor", () => {
test("lists all roles with schemas that validate", () => {
const descriptor = buildDevelopDescriptor();
const validated = validateWorkflowDescriptor(descriptor);
expect(validated.ok).toBe(true);
if (!validated.ok) {
throw new Error(validated.error);
}
expect(Object.keys(validated.value.roles).sort()).toEqual([
"coder",
"committer",
"planner",
"reviewer",
"tester",
]);
for (const key of ["planner", "coder", "reviewer", "tester", "committer"] as const) {
const role = validated.value.roles[key];
expect(role).toBeDefined();
expect(typeof role.schema).toBe("object");
expect(role.schema).not.toBeNull();
expect(Array.isArray(role.schema)).toBe(false);
}
});
});
@@ -0,0 +1,19 @@
{
"name": "@uncaged/workflow-template-develop",
"version": "0.1.0",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",
"scripts": {
"build": "echo 'TODO'",
"test": "bun test"
},
"dependencies": {
"@uncaged/workflow": "workspace:*",
"@uncaged/workflow-role-coder": "workspace:*",
"@uncaged/workflow-role-committer": "workspace:*",
"@uncaged/workflow-role-planner": "workspace:*",
"@uncaged/workflow-role-reviewer": "workspace:*",
"@uncaged/workflow-role-tester": "workspace:*"
}
}
@@ -0,0 +1,12 @@
import { buildDescriptor } from "@uncaged/workflow";
import { developModerator } from "./moderator.js";
import { DEVELOP_WORKFLOW_DESCRIPTION, developRoles } from "./roles.js";
export function buildDevelopDescriptor() {
return buildDescriptor({
description: DEVELOP_WORKFLOW_DESCRIPTION,
roles: developRoles,
moderator: developModerator,
});
}
@@ -0,0 +1,60 @@
import {
type AgentBinding,
createWorkflow,
type ExtractFn,
type LlmProvider,
type WorkflowDefinition,
type WorkflowFn,
} from "@uncaged/workflow";
import { developModerator } from "./moderator.js";
import { DEVELOP_WORKFLOW_DESCRIPTION, type DevelopMeta, developRoles } from "./roles.js";
export {
type CoderMeta,
coderMetaSchema,
coderRole,
} from "@uncaged/workflow-role-coder";
export {
type CommitterMeta,
committerMetaSchema,
committerRole,
} from "@uncaged/workflow-role-committer";
export {
type PlannerMeta,
phaseSchema,
plannerMetaSchema,
plannerRole,
} from "@uncaged/workflow-role-planner";
export {
type ReviewerMeta,
reviewerMetaSchema,
reviewerRole,
} from "@uncaged/workflow-role-reviewer";
export {
type TesterMeta,
testerMetaSchema,
testerRole,
} from "@uncaged/workflow-role-tester";
export { buildDevelopDescriptor } from "./descriptor.js";
export { developModerator } from "./moderator.js";
export {
DEVELOP_WORKFLOW_DESCRIPTION,
type DevelopMeta,
type DevelopRoles,
developRoles,
} from "./roles.js";
export const developWorkflowDefinition: WorkflowDefinition<DevelopMeta> = {
description: DEVELOP_WORKFLOW_DESCRIPTION,
roles: developRoles,
moderator: developModerator,
};
export function createDevelopRun(
binding: AgentBinding,
extract: ExtractFn,
llmProvider: LlmProvider | null,
): WorkflowFn {
return createWorkflow(developWorkflowDefinition, binding, extract, llmProvider);
}
@@ -0,0 +1,89 @@
import type { Moderator, ModeratorContext } from "@uncaged/workflow";
import { END } from "@uncaged/workflow";
import type { DevelopMeta } from "./roles.js";
function coderFinishedAllPlannedPhases(
phases: ReadonlyArray<{ hash: string }>,
coderCompletedPhases: ReadonlyArray<string>,
): boolean {
if (phases.length === 0) {
return true;
}
const plannedHashes = new Set(phases.map((p) => p.hash));
const lastHash = phases[phases.length - 1].hash;
const explicit = new Set(coderCompletedPhases.filter((h) => plannedHashes.has(h)));
if (phases.every((p) => explicit.has(p.hash))) {
return true;
}
if (coderCompletedPhases.some((h) => h === lastHash)) {
return true;
}
return false;
}
function nextAfterCoder(
ctx: ModeratorContext<DevelopMeta>,
maxRounds: number,
): (keyof DevelopMeta & string) | typeof END {
const plannerStep = ctx.steps.find((s) => s.role === "planner");
if (plannerStep === undefined) {
return "reviewer";
}
const phases = plannerStep.meta.phases;
const coderCompletedPhases = ctx.steps
.filter((s) => s.role === "coder")
.map((s) => s.meta.completedPhase);
const allDone = coderFinishedAllPlannedPhases(phases, coderCompletedPhases);
if (allDone) {
return "reviewer";
}
if (ctx.steps.length < maxRounds - 1) {
return "coder";
}
return END;
}
export const developModerator: Moderator<DevelopMeta> = (ctx) => {
const maxRounds = ctx.start.meta.maxRounds;
if (ctx.steps.length === 0) {
return "planner";
}
const last = ctx.steps[ctx.steps.length - 1];
if (last.role === "planner") {
return "coder";
}
if (last.role === "coder") {
return nextAfterCoder(ctx, maxRounds);
}
if (last.role === "reviewer") {
if (last.meta.status === "approved") {
return "tester";
}
if (ctx.steps.length < maxRounds - 1) {
return "coder";
}
return END;
}
if (last.role === "tester") {
if (last.meta.status === "passed") {
return "committer";
}
if (ctx.steps.length < maxRounds - 1) {
return "coder";
}
return END;
}
if (last.role === "committer") {
return END;
}
return END;
};
@@ -0,0 +1,29 @@
import type { RoleDefinition } from "@uncaged/workflow";
import { type CoderMeta, coderRole } from "@uncaged/workflow-role-coder";
import { type CommitterMeta, committerRole } from "@uncaged/workflow-role-committer";
import { type PlannerMeta, plannerRole } from "@uncaged/workflow-role-planner";
import { type ReviewerMeta, reviewerRole } from "@uncaged/workflow-role-reviewer";
import { type TesterMeta, testerRole } from "@uncaged/workflow-role-tester";
export const DEVELOP_WORKFLOW_DESCRIPTION =
"Plan phases, implement incrementally, review, verify with tests/build/lint, and commit (planner → coder [repeat per phase] → reviewer → tester → committer).";
export type DevelopMeta = {
planner: PlannerMeta;
coder: CoderMeta;
reviewer: ReviewerMeta;
tester: TesterMeta;
committer: CommitterMeta;
};
export type DevelopRoles = {
[K in keyof DevelopMeta]: RoleDefinition<DevelopMeta[K]>;
};
export const developRoles: DevelopRoles = {
planner: plannerRole,
coder: coderRole,
reviewer: reviewerRole,
tester: testerRole,
committer: committerRole,
};
@@ -0,0 +1,17 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"rootDir": "src",
"outDir": "dist",
"composite": true
},
"include": ["src/**/*.ts"],
"references": [
{ "path": "../workflow" },
{ "path": "../workflow-role-coder" },
{ "path": "../workflow-role-committer" },
{ "path": "../workflow-role-planner" },
{ "path": "../workflow-role-reviewer" },
{ "path": "../workflow-role-tester" }
]
}
@@ -313,7 +313,7 @@ describe("createSolveIssueRun", () => {
casDir = await mkdtemp(join(tmpdir(), "solve-issue-cas-"));
const cas = createCasStore(casDir);
const run = createSolveIssueRun({ agent: async () => "" }, stubExtract);
const run = createSolveIssueRun({ agent: async () => "" }, stubExtract, null);
const gen = run(
{ prompt: "task", steps: [] },
{ threadId: "01TEST000000000000000000TR", maxRounds: 20, depth: 0, cas },
@@ -374,6 +374,7 @@ describe("createSolveIssueRun", () => {
},
},
stubExtract,
null,
);
const gen = run(
{ prompt: "task", steps: [] },
@@ -2,6 +2,7 @@ import {
type AgentBinding,
createWorkflow,
type ExtractFn,
type LlmProvider,
type WorkflowDefinition,
type WorkflowFn,
} from "@uncaged/workflow";
@@ -50,6 +51,10 @@ export const solveIssueWorkflowDefinition: WorkflowDefinition<SolveIssueMeta> =
moderator: solveIssueModerator,
};
export function createSolveIssueRun(binding: AgentBinding, extract: ExtractFn): WorkflowFn {
return createWorkflow(solveIssueWorkflowDefinition, binding, extract);
export function createSolveIssueRun(
binding: AgentBinding,
extract: ExtractFn,
llmProvider: LlmProvider | null,
): WorkflowFn {
return createWorkflow(solveIssueWorkflowDefinition, binding, extract, llmProvider);
}
@@ -23,6 +23,7 @@ describe("buildDescriptor", () => {
extractPrompt: "Extract title and count from the analysis.",
schema,
extractRefs: null,
extractMode: "single",
},
},
moderator: () => END,
+169 -1
View File
@@ -15,7 +15,7 @@ import {
parseMerkleNode,
serializeMerkleNode,
} from "../src/merkle.js";
import { END } from "../src/types.js";
import { END, type LlmProvider } from "../src/types.js";
const plannerMetaSchema = z.object({
plan: z.string(),
@@ -97,6 +97,7 @@ const demoWorkflow = createWorkflow<DemoMeta>(
extractPrompt: "Extract plan text and affected files list.",
schema: plannerMetaSchema,
extractRefs: null,
extractMode: "single",
},
coder: {
description: "Demo coder",
@@ -104,6 +105,7 @@ const demoWorkflow = createWorkflow<DemoMeta>(
extractPrompt: "Extract the code diff summary.",
schema: coderMetaSchema,
extractRefs: null,
extractMode: "single",
},
},
moderator: (ctx) => {
@@ -124,6 +126,7 @@ const demoWorkflow = createWorkflow<DemoMeta>(
},
},
demoExtract,
null,
);
describe("executeThread", () => {
@@ -445,4 +448,169 @@ describe("executeThread", () => {
await rm(root, { recursive: true, force: true });
}
});
test("extractMode react traverses CAS DAG via cas_get during extraction", async () => {
const dagMetaSchema = z.object({ leafPayload: z.string() });
type DagDemoMeta = { walker: z.infer<typeof dagMetaSchema> };
const origFetch = globalThis.fetch;
restoreFetch = () => {
globalThis.fetch = origFetch;
};
let fetchRound = 0;
const root = await mkdtemp(join(tmpdir(), "wf-engine-react-"));
try {
const cas = createCasStore(join(root, "cas"));
const leafYaml = serializeMerkleNode(createContentMerkleNode("needle-from-leaf"));
const leafHash = await cas.put(leafYaml);
const rootYaml = serializeMerkleNode({
type: "thread",
payload: {
workflow: "dag-demo",
threadId: "01DAG00000000000000000001",
result: { returnCode: 0, summary: "" },
},
children: [leafHash],
});
const dagRootHash = await cas.put(rootYaml);
globalThis.fetch = Object.assign(
async (_input: Parameters<typeof fetch>[0], _init?: RequestInit) => {
fetchRound += 1;
if (fetchRound === 1) {
return new Response(
JSON.stringify({
choices: [
{
message: {
tool_calls: [
{
id: "c1",
type: "function",
function: {
name: "cas_get",
arguments: JSON.stringify({ hash: dagRootHash }),
},
},
],
},
},
],
}),
{ status: 200, headers: { "Content-Type": "application/json" } },
);
}
if (fetchRound === 2) {
return new Response(
JSON.stringify({
choices: [
{
message: {
tool_calls: [
{
id: "c2",
type: "function",
function: {
name: "cas_get",
arguments: JSON.stringify({ hash: leafHash }),
},
},
],
},
},
],
}),
{ status: 200, headers: { "Content-Type": "application/json" } },
);
}
return new Response(
JSON.stringify({
choices: [
{
message: {
tool_calls: [
{
id: "c3",
type: "function",
function: {
name: "extract",
arguments: JSON.stringify({ leafPayload: "needle-from-leaf" }),
},
},
],
},
},
],
}),
{ status: 200, headers: { "Content-Type": "application/json" } },
);
},
{ preconnect: origFetch.preconnect.bind(origFetch) },
) as typeof fetch;
const llm: LlmProvider = { baseUrl: "http://127.0.0.1:9", apiKey: "test", model: "test" };
const extractFn = createExtract(llm);
const dagWorkflow = createWorkflow<DagDemoMeta>(
{
roles: {
walker: {
description: "DAG walker",
systemPrompt: "Output only the root CAS hash.",
extractPrompt:
"Set leafPayload to the string payload of the content Merkle node under the root.",
schema: dagMetaSchema,
extractRefs: null,
extractMode: "react",
},
},
moderator: (ctx) => (ctx.steps.length === 0 ? "walker" : END),
},
{ agent: async () => dagRootHash },
extractFn,
llm,
);
const threadId = "01KQXKW18CT8G75T53R8F4G7YG";
const hash = "C9NMV6V2TQT81";
const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`);
const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`);
await mkdir(join(root, "logs", hash), { recursive: true });
const logger = createLogger({ sink: { kind: "file", path: infoPath } });
const ac = new AbortController();
const result = await executeThread(
dagWorkflow,
"dag-demo",
{ prompt: "traverse", steps: [] },
{
maxRounds: 5,
depth: 0,
signal: ac.signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: null,
prefilledDiskSteps: null,
},
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath, cas },
logger,
);
expect(result.returnCode).toBe(0);
expect(fetchRound).toBe(3);
const dataText = await readFile(dataPath, "utf8");
const lines = dataText
.trim()
.split("\n")
.filter((l) => l !== "");
const roleRec = JSON.parse(lines[1] ?? "{}") as Record<string, unknown>;
expect(roleRec.role).toBe("walker");
expect(roleRec.meta).toEqual({ leafPayload: "needle-from-leaf" });
} finally {
globalThis.fetch = origFetch;
await rm(root, { recursive: true, force: true });
}
});
});
@@ -87,6 +87,26 @@ describe("fork-thread", () => {
expect(r.value.runOptions).toEqual({ maxRounds: 5, depth: 0 });
});
test("parseThreadDataJsonl ignores trailing WorkflowResult line", () => {
const text = `${sampleDataJsonl.trim()}\n{"returnCode":0,"summary":"done"}\n`;
const r = parseThreadDataJsonl(text);
expect(r.ok).toBe(true);
if (!r.ok) {
return;
}
expect(r.value.roleSteps.length).toBe(3);
expect(r.value.roleSteps[2]?.role).toBe("reviewer");
});
test("parseThreadDataJsonl errors when WorkflowResult is not last", () => {
const text = `{"name":"demo","hash":"H","threadId":"01ZZZZZZZZZZZZZZZZZZZZZZ","parameters":{"prompt":"p","options":{"maxRounds":3}},"timestamp":1}
{"returnCode":0,"summary":"early"}
{"role":"planner","content":"x","meta":{},"timestamp":2}
`;
const r = parseThreadDataJsonl(text);
expect(r.ok).toBe(false);
});
test("parseThreadDataJsonl reads explicit depth from start record", () => {
const text = `{"name":"demo","hash":"H","threadId":"01ZZZZZZZZZZZZZZZZZZZZZZ","parameters":{"prompt":"p","options":{"maxRounds":3,"depth":2}},"timestamp":1}
{"role":"planner","contentHash":"HP0000000000000000000099","meta":{},"refs":[],"timestamp":2}
@@ -0,0 +1,209 @@
import { afterEach, describe, expect, test } from "bun:test";
import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import * as z from "zod/v4";
import { createCasStore } from "../src/cas.js";
import { createContentMerkleNode, serializeMerkleNode } from "../src/merkle.js";
import { reactExtract } from "../src/react-extract.js";
import type { LlmProvider } from "../src/types.js";
const metaSchema = z.object({ seen: z.string() });
const provider: LlmProvider = {
baseUrl: "http://127.0.0.1:9",
apiKey: "test",
model: "test",
};
describe("reactExtract", () => {
let restoreFetch: (() => void) | null = null;
afterEach(() => {
restoreFetch?.();
restoreFetch = null;
});
test("cas_get rounds then extract tool yields validated meta", async () => {
const casDir = await mkdtemp(join(tmpdir(), "react-extract-"));
const cas = createCasStore(casDir);
try {
const blob = serializeMerkleNode(createContentMerkleNode("needle"));
const h = await cas.put(blob);
const origFetch = globalThis.fetch;
let round = 0;
restoreFetch = () => {
globalThis.fetch = origFetch;
};
globalThis.fetch = Object.assign(
async (_input: Parameters<typeof fetch>[0], _init?: RequestInit) => {
round += 1;
if (round === 1) {
return new Response(
JSON.stringify({
choices: [
{
message: {
tool_calls: [
{
id: "t1",
type: "function",
function: {
name: "cas_get",
arguments: JSON.stringify({ hash: h }),
},
},
],
},
},
],
}),
{ status: 200, headers: { "Content-Type": "application/json" } },
);
}
return new Response(
JSON.stringify({
choices: [
{
message: {
tool_calls: [
{
id: "t2",
type: "function",
function: {
name: "extract",
arguments: JSON.stringify({ seen: "needle" }),
},
},
],
},
},
],
}),
{ status: 200, headers: { "Content-Type": "application/json" } },
);
},
{ preconnect: origFetch.preconnect.bind(origFetch) },
) as typeof fetch;
const text = `## Agent Output\n${h}\n## Extraction Instruction\nExtract seen from CAS.`;
const result = await reactExtract({
text,
schema: metaSchema,
provider,
cas,
});
expect(result.ok).toBe(true);
if (!result.ok) {
return;
}
expect(result.value).toEqual({ seen: "needle" });
expect(round).toBe(2);
} finally {
await rm(casDir, { recursive: true, force: true });
}
});
test("stops after max tool rounds when model keeps calling cas_get", async () => {
const casDir = await mkdtemp(join(tmpdir(), "react-extract-max-"));
const cas = createCasStore(casDir);
try {
const blob = serializeMerkleNode(createContentMerkleNode("x"));
const h = await cas.put(blob);
const origFetch = globalThis.fetch;
let round = 0;
restoreFetch = () => {
globalThis.fetch = origFetch;
};
globalThis.fetch = Object.assign(
async (_input: Parameters<typeof fetch>[0], _init?: RequestInit) => {
round += 1;
return new Response(
JSON.stringify({
choices: [
{
message: {
tool_calls: [
{
id: `loop-${round}`,
type: "function",
function: {
name: "cas_get",
arguments: JSON.stringify({ hash: h }),
},
},
],
},
},
],
}),
{ status: 200, headers: { "Content-Type": "application/json" } },
);
},
{ preconnect: origFetch.preconnect.bind(origFetch) },
) as typeof fetch;
const result = await reactExtract({
text: "## Agent Output\nnoop\n## Extraction Instruction\nExtract seen.",
schema: metaSchema,
provider,
cas,
});
expect(result.ok).toBe(false);
if (result.ok) {
return;
}
expect(result.error).toBe("max_react_rounds_exceeded");
expect(round).toBe(10);
} finally {
await rm(casDir, { recursive: true, force: true });
}
});
test("passthrough JSON assistant message without tool calls", async () => {
const casDir = await mkdtemp(join(tmpdir(), "react-extract-pass-"));
const cas = createCasStore(casDir);
try {
const origFetch = globalThis.fetch;
restoreFetch = () => {
globalThis.fetch = origFetch;
};
globalThis.fetch = Object.assign(
async (_input: Parameters<typeof fetch>[0], _init?: RequestInit) =>
new Response(
JSON.stringify({
choices: [
{
message: {
content: '{"seen":"direct"}',
},
},
],
}),
{ status: 200, headers: { "Content-Type": "application/json" } },
),
{ preconnect: origFetch.preconnect.bind(origFetch) },
) as typeof fetch;
const result = await reactExtract({
text: "## Agent Output\nok\n## Extraction Instruction\nExtract.",
schema: metaSchema,
provider,
cas,
});
expect(result.ok).toBe(true);
if (!result.ok) {
return;
}
expect(result.value).toEqual({ seen: "direct" });
} finally {
await rm(casDir, { recursive: true, force: true });
}
});
});
@@ -91,6 +91,7 @@ const refsDemoWorkflow = createWorkflow<RefsDemoMeta>(
extractPrompt: "Extract phases with CAS hashes.",
schema: plannerMetaSchema,
extractRefs: (meta) => meta.phases.map((p) => p.hash),
extractMode: "single",
},
},
moderator: (ctx) => (ctx.steps.length === 0 ? "planner" : END),
@@ -99,6 +100,7 @@ const refsDemoWorkflow = createWorkflow<RefsDemoMeta>(
agent: async () => "plan-output",
},
refsDemoExtract,
null,
);
describe("RoleStep refs tracking", () => {
+4 -2
View File
@@ -125,7 +125,7 @@ describe("worker process", () => {
.trim()
.split("\n")
.filter((l) => l !== "").length,
).toBe(3);
).toBe(4);
} finally {
await rm(root, { recursive: true, force: true });
}
@@ -187,7 +187,7 @@ describe("worker process", () => {
.trim()
.split("\n")
.filter((l) => l !== "");
expect(lines.length).toBe(3);
expect(lines.length).toBe(4);
const start = JSON.parse(lines[0] ?? "{}") as Record<string, unknown>;
expect(start.forkFrom).toEqual({ threadId: srcId });
const replay = JSON.parse(lines[1] ?? "{}") as Record<string, unknown>;
@@ -195,6 +195,8 @@ describe("worker process", () => {
expect(replay.timestamp).toBe(555);
const coder = JSON.parse(lines[2] ?? "{}") as Record<string, unknown>;
expect(coder.role).toBe("coder");
const done = JSON.parse(lines[3] ?? "{}") as Record<string, unknown>;
expect(done.returnCode).toBe(0);
} finally {
await rm(root, { recursive: true, force: true });
}
@@ -142,12 +142,14 @@ describe("workflowAsAgent integration", () => {
extractPrompt: "extract done flag",
schema: callerMetaSchema,
extractRefs: null,
extractMode: "single",
},
},
moderator: (ctx) => (ctx.steps.length === 0 ? "caller" : END),
},
{ agent: workflowAsAgent("child-wf", { storageRoot: root }) },
parentExtract,
null,
);
const threadId = "01KQXKW18CT8G75T53R8F4G7YG";
+48 -6
View File
@@ -1,11 +1,14 @@
import type { ExtractFn } from "./extract-fn.js";
import type { CasStore } from "./cas.js";
import { buildExtractUserContent, type ExtractFn } from "./extract-fn.js";
import { putContentMerkleNode } from "./merkle.js";
import { reactExtract } from "./react-extract.js";
import { mergeRefsWithContentHash } from "./refs-field.js";
import {
type AgentBinding,
type AgentContext,
END,
type ExtractContext,
type LlmProvider,
type ModeratorContext,
type RoleDefinition,
type RoleMeta,
@@ -36,14 +39,51 @@ function resolveExtractedRefs(
return extractRefsFn(meta as Record<string, unknown>);
}
async function resolveRoleMeta<M extends RoleMeta>(
roleDef: RoleDefinition<Record<string, unknown>>,
extractCtx: ExtractContext<M>,
extract: ExtractFn,
llmProvider: LlmProvider | null,
cas: CasStore,
): Promise<Record<string, unknown>> {
if (roleDef.extractMode === "react") {
if (llmProvider === null) {
throw new Error(
'createWorkflow: llmProvider is required when a role uses extractMode "react"',
);
}
const text = await buildExtractUserContent(
extractCtx as unknown as ExtractContext,
roleDef.extractPrompt,
);
const reactResult = await reactExtract({
text,
schema: roleDef.schema,
provider: llmProvider,
cas,
});
if (!reactResult.ok) {
throw new Error(`react extract failed: ${reactResult.error}`);
}
return reactResult.value as Record<string, unknown>;
}
return (await extract(
roleDef.schema,
roleDef.extractPrompt,
extractCtx as unknown as ExtractContext,
)) as Record<string, unknown>;
}
/**
* Binds pure role definitions + moderator to runtime agents and structured extraction.
* Assign with `export const run = createWorkflow(def, binding, extract)`.
* Assign with `export const run = createWorkflow(def, binding, extract, llmProvider)`.
* Pass the same {@link LlmProvider} as {@link createExtract} when any role uses `extractMode: "react"`.
*/
export function createWorkflow<M extends RoleMeta>(
def: Pick<WorkflowDefinition<M>, "roles" | "moderator">,
binding: AgentBinding,
extract: ExtractFn,
llmProvider: LlmProvider | null,
): WorkflowFn {
return async function* workflowLoop(
input: ThreadInput,
@@ -107,10 +147,12 @@ export function createWorkflow<M extends RoleMeta>(
agentContent: raw,
};
const meta = await extract(
roleDef.schema,
roleDef.extractPrompt,
extractCtx as unknown as ExtractContext,
const meta = await resolveRoleMeta(
roleDef as unknown as RoleDefinition<Record<string, unknown>>,
extractCtx,
extract,
llmProvider,
options.cas,
);
const contentHash = await putContentMerkleNode(options.cas, raw);
+35 -27
View File
@@ -10,6 +10,40 @@ export type ExtractFn = <T extends Record<string, unknown>>(
ctx: ExtractContext,
) => Promise<T>;
/** Builds the user-side extraction prompt (thread + agent output + instruction). */
export async function buildExtractUserContent(
ctx: ExtractContext,
prompt: string,
): Promise<string> {
const lines: string[] = [];
lines.push(`## Role: ${ctx.currentRole.name}`);
lines.push(ctx.currentRole.systemPrompt);
lines.push("");
lines.push("## Task");
lines.push(ctx.start.content);
lines.push("");
if (ctx.steps.length > 0) {
lines.push("## Thread History");
for (const step of ctx.steps) {
const body = await getContentMerklePayload(ctx.cas, step.contentHash);
if (body === null) {
throw new Error(`extract: missing CAS blob for step ${step.role}: ${step.contentHash}`);
}
lines.push(`### ${step.role}`);
lines.push(body);
lines.push(`Meta: ${JSON.stringify(step.meta)}`);
lines.push("");
}
}
lines.push("## Agent Output");
lines.push(ctx.agentContent);
lines.push("");
lines.push("## Extraction Instruction");
lines.push(prompt);
return lines.join("\n");
}
/**
* Create an ExtractFn backed by an LLM provider.
* Builds prompt text from {@link ExtractContext} plus `prompt` and calls structured extraction.
@@ -20,33 +54,7 @@ export function createExtract(provider: LlmProvider): ExtractFn {
prompt: string,
ctx: ExtractContext,
): Promise<T> => {
const lines: string[] = [];
lines.push(`## Role: ${ctx.currentRole.name}`);
lines.push(ctx.currentRole.systemPrompt);
lines.push("");
lines.push("## Task");
lines.push(ctx.start.content);
lines.push("");
if (ctx.steps.length > 0) {
lines.push("## Thread History");
for (const step of ctx.steps) {
const body = await getContentMerklePayload(ctx.cas, step.contentHash);
if (body === null) {
throw new Error(`extract: missing CAS blob for step ${step.role}: ${step.contentHash}`);
}
lines.push(`### ${step.role}`);
lines.push(body);
lines.push(`Meta: ${JSON.stringify(step.meta)}`);
lines.push("");
}
}
lines.push("## Agent Output");
lines.push(ctx.agentContent);
lines.push("");
lines.push("## Extraction Instruction");
lines.push(prompt);
const text = lines.join("\n");
const text = await buildExtractUserContent(ctx, prompt);
const result = await llmExtractWithRetry({ text, schema, provider });
if (!result.ok) {
throw new Error(`extract failed: ${JSON.stringify(result.error)}`);
+42 -11
View File
@@ -1,6 +1,6 @@
import { normalizeRefsField } from "./refs-field.js";
import { err, ok, type Result } from "./result.js";
import type { RoleOutput } from "./types.js";
import type { RoleOutput, WorkflowCompletion } from "./types.js";
/** Role steps replayed from `.data.jsonl`, including persisted timestamps. */
export type ForkHistoricalStep = RoleOutput & { timestamp: number };
@@ -14,33 +14,56 @@ export type ParsedThreadStartRecord = {
depth: number;
};
function parseRoleLine(
/** Recognizes a persisted workflow completion line (no `role`; has numeric `returnCode` and string `summary`). Omits `rootHash` when absent. */
export function tryParseWorkflowResultRecord(
obj: Record<string, unknown>,
lineIndex: number,
): Result<ForkHistoricalStep, string> {
): WorkflowCompletion | null {
if (obj.role !== undefined) {
return null;
}
const returnCode = obj.returnCode;
const summary = obj.summary;
if (typeof returnCode !== "number" || typeof summary !== "string") {
return null;
}
return { returnCode, summary };
}
export function tryParseRoleStepRecord(obj: Record<string, unknown>): ForkHistoricalStep | null {
const role = obj.role;
const contentHash = obj.contentHash;
const meta = obj.meta;
const timestamp = obj.timestamp;
if (typeof role !== "string") {
return err(`invalid role record at line ${lineIndex}: missing role`);
return null;
}
if (typeof contentHash !== "string") {
return err(`invalid role record at line ${lineIndex}: missing contentHash`);
return null;
}
if (meta === null || typeof meta !== "object") {
return err(`invalid role record at line ${lineIndex}: missing meta`);
return null;
}
if (typeof timestamp !== "number") {
return err(`invalid role record at line ${lineIndex}: missing timestamp`);
return null;
}
return ok({
return {
role,
contentHash,
meta: meta as Record<string, unknown>,
refs: normalizeRefsField(obj.refs),
timestamp,
});
};
}
function parseRoleLine(
obj: Record<string, unknown>,
lineIndex: number,
): Result<ForkHistoricalStep, string> {
const parsed = tryParseRoleStepRecord(obj);
if (parsed === null) {
return err(`invalid role record at line ${lineIndex}`);
}
return ok(parsed);
}
function parseStartRecordLine(firstLine: string): Result<ParsedThreadStartRecord, string> {
@@ -109,7 +132,15 @@ function parseFollowingRoleLines(lines: string[]): Result<ForkHistoricalStep[],
if (rec === null || typeof rec !== "object") {
return err(`invalid record at line ${i + 1}`);
}
const parsed = parseRoleLine(rec as Record<string, unknown>, i + 1);
const recObj = rec as Record<string, unknown>;
const wf = tryParseWorkflowResultRecord(recObj);
if (wf !== null) {
if (i !== lines.length - 1) {
return err("WorkflowResult record must be the final line in `.data.jsonl`");
}
break;
}
const parsed = parseRoleLine(recObj, i + 1);
if (!parsed.ok) {
return parsed;
}
+4
View File
@@ -25,6 +25,8 @@ export {
type ParsedThreadStartRecord,
parseThreadDataJsonl,
selectForkHistoricalSteps,
tryParseRoleStepRecord,
tryParseWorkflowResultRecord,
} from "./fork-thread.js";
export { type GcResult, garbageCollectCas } from "./gc.js";
export { stringifyWorkflowDescriptor } from "./generate-descriptor.js";
@@ -54,6 +56,7 @@ export {
serializeMerkleNode,
type ThreadMerklePayload,
} from "./merkle.js";
export { type ReactExtractArgs, reactExtract } from "./react-extract.js";
export {
type ExtractProviderConfig,
getRegisteredWorkflow,
@@ -80,6 +83,7 @@ export {
type AgentFn,
END,
type ExtractContext,
type ExtractMode,
type LlmProvider,
type Moderator,
type ModeratorContext,
+20 -8
View File
@@ -47,6 +47,21 @@ function readToolDescription(parametersSchema: Record<string, unknown>): string
return "Extract structured data from the input text.";
}
/** Builds OpenAI function-tool metadata from a Zod meta schema (same naming rules as single-shot extract). */
export function extractFunctionToolFromZodSchema(schema: z.ZodType<unknown>): {
name: string;
description: string;
parameters: Record<string, unknown>;
} {
const rawJsonSchema = z.toJSONSchema(schema) as Record<string, unknown>;
const parameters = stripJsonSchemaMeta(rawJsonSchema);
return {
name: readToolName(parameters),
description: readToolDescription(parameters),
parameters,
};
}
function readToolArgumentsJson(parsed: unknown, previewSource: string): Result<string, LlmError> {
if (!isRecord(parsed)) {
return err({ kind: "invalid_response_json", message: "Top-level JSON is not an object" });
@@ -124,10 +139,7 @@ export function llmErrorToCause(error: LlmError): Error {
async function performLlmExtract<T>(
options: LlmExtractArgs<T> & { userContent: string },
): Promise<Result<T, LlmError>> {
const rawJsonSchema = z.toJSONSchema(options.schema) as Record<string, unknown>;
const parameters = stripJsonSchemaMeta(rawJsonSchema);
const toolName = readToolName(parameters);
const toolDescription = readToolDescription(parameters);
const extractTool = extractFunctionToolFromZodSchema(options.schema);
const body = {
model: options.provider.model,
@@ -142,13 +154,13 @@ async function performLlmExtract<T>(
{
type: "function" as const,
function: {
name: toolName,
description: toolDescription,
parameters,
name: extractTool.name,
description: extractTool.description,
parameters: extractTool.parameters,
},
},
],
tool_choice: { type: "function" as const, function: { name: toolName } },
tool_choice: { type: "function" as const, function: { name: extractTool.name } },
};
let response: Response;
+330
View File
@@ -0,0 +1,330 @@
import type * as z from "zod/v4";
import type { CasStore } from "./cas.js";
import { extractFunctionToolFromZodSchema } from "./llm-extract.js";
import { err, ok, type Result } from "./result.js";
import type { LlmProvider } from "./types.js";
export type ReactExtractArgs<T extends Record<string, unknown>> = {
text: string;
schema: z.ZodType<T>;
provider: LlmProvider;
cas: CasStore;
};
const MAX_REACT_ROUNDS = 10;
const CAS_GET_TOOL_DEFINITION = {
type: "function" as const,
function: {
name: "cas_get",
description:
"Read a Merkle DAG node from content-addressed storage by its hash. Returns YAML-formatted node with type, payload, and children fields.",
parameters: {
type: "object",
properties: {
hash: { type: "string", description: "The CAS hash to retrieve" },
},
required: ["hash"],
},
},
};
function chatCompletionsUrl(baseUrl: string): string {
const trimmed = baseUrl.replace(/\/+$/, "");
return `${trimmed}/chat/completions`;
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
function tryParseJsonContent(content: string): unknown | null {
const trimmed = content.trim();
const fenceMatch = /^```(?:json)?\s*([\s\S]*?)```$/m.exec(trimmed);
const payload = fenceMatch !== null ? fenceMatch[1].trim() : trimmed;
try {
return JSON.parse(payload) as unknown;
} catch {
return null;
}
}
type ToolCall = {
id: string;
type: "function";
function: { name: string; arguments: string };
};
type ChatMessage =
| { role: "system"; content: string }
| { role: "user"; content: string }
| {
role: "assistant";
content: string | null;
tool_calls: ToolCall[];
}
| { role: "tool"; tool_call_id: string; content: string };
type AssistantTurn<T> =
| { kind: "plain_json"; value: T }
| { kind: "tool_calls"; calls: ToolCall[]; assistantContent: string | null };
function firstAssistantMessage(responseText: string): Result<Record<string, unknown>, string> {
let parsed: unknown;
try {
parsed = JSON.parse(responseText) as unknown;
} catch (cause) {
const message = cause instanceof Error ? cause.message : String(cause);
return err(`invalid_response_json:${message}`);
}
if (!isRecord(parsed)) {
return err("invalid_response_top_level");
}
const choices = parsed.choices;
if (!Array.isArray(choices) || choices.length === 0) {
return err("no_choices_in_response");
}
const firstChoice = choices[0];
if (!isRecord(firstChoice)) {
return err("invalid_choice");
}
const messageObj = firstChoice.message;
if (!isRecord(messageObj)) {
return err("invalid_message");
}
return ok(messageObj);
}
function normalizeToolCalls(toolCallsRaw: unknown[]): Result<ToolCall[], string> {
const toolCalls: ToolCall[] = [];
for (const tc of toolCallsRaw) {
if (!isRecord(tc)) {
return err("invalid_tool_call");
}
const id = tc.id;
const tcType = tc.type;
const fn = tc.function;
if (typeof id !== "string" || tcType !== "function" || !isRecord(fn)) {
return err("invalid_tool_call_shape");
}
const name = fn.name;
const argumentsStr = fn.arguments;
if (typeof name !== "string" || typeof argumentsStr !== "string") {
return err("invalid_tool_call_function");
}
toolCalls.push({ id, type: "function", function: { name, arguments: argumentsStr } });
}
return ok(toolCalls);
}
function classifyAssistantTurn<T extends Record<string, unknown>>(
messageObj: Record<string, unknown>,
schema: z.ZodType<T>,
): Result<AssistantTurn<T>, string> {
const toolCallsRaw = messageObj.tool_calls;
if (!Array.isArray(toolCallsRaw) || toolCallsRaw.length === 0) {
const content = messageObj.content;
if (typeof content !== "string") {
return err("no_tool_calls_and_no_string_content");
}
const jsonParsed = tryParseJsonContent(content);
if (jsonParsed === null) {
return err("no_tool_calls_and_content_not_json");
}
const validated = schema.safeParse(jsonParsed);
if (!validated.success) {
return err(`schema_validation_failed:${validated.error.message}`);
}
return ok({ kind: "plain_json", value: validated.data });
}
const callsResult = normalizeToolCalls(toolCallsRaw);
if (!callsResult.ok) {
return err(callsResult.error);
}
const assistantContent = messageObj.content;
return ok({
kind: "tool_calls",
calls: callsResult.value,
assistantContent: typeof assistantContent === "string" ? assistantContent : null,
});
}
async function appendCasGetToolResult(
tc: ToolCall,
cas: CasStore,
messages: ChatMessage[],
): Promise<Result<null, string>> {
let hash: string;
try {
const ta = JSON.parse(tc.function.arguments) as unknown;
if (!isRecord(ta) || typeof ta.hash !== "string") {
return err("cas_get_invalid_arguments");
}
hash = ta.hash;
} catch {
return err("cas_get_arguments_not_json");
}
const blob = await cas.get(hash);
const toolContent = blob === null ? "null" : blob;
messages.push({
role: "tool",
tool_call_id: tc.id,
content: toolContent,
});
return ok(null);
}
async function appendExtractToolResult<T extends Record<string, unknown>>(
tc: ToolCall,
schema: z.ZodType<T>,
messages: ChatMessage[],
): Promise<Result<T, string>> {
let parsedArgs: unknown;
try {
parsedArgs = JSON.parse(tc.function.arguments) as unknown;
} catch {
return err("extract_tool_arguments_not_json");
}
const validated = schema.safeParse(parsedArgs);
if (!validated.success) {
return err(`schema_validation_failed:${validated.error.message}`);
}
messages.push({
role: "tool",
tool_call_id: tc.id,
content: '{"ok":true}',
});
return ok(validated.data);
}
async function appendToolResults<T extends Record<string, unknown>>(
toolCalls: ToolCall[],
extractToolName: string,
schema: z.ZodType<T>,
cas: CasStore,
messages: ChatMessage[],
): Promise<Result<T | null, string>> {
let extracted: T | null = null;
for (const tc of toolCalls) {
if (tc.function.name === "cas_get") {
const casRes = await appendCasGetToolResult(tc, cas, messages);
if (!casRes.ok) {
return casRes;
}
continue;
}
if (tc.function.name === extractToolName) {
const exRes = await appendExtractToolResult(tc, schema, messages);
if (!exRes.ok) {
return exRes;
}
extracted = exRes.value;
continue;
}
return err(`unknown_tool:${tc.function.name}`);
}
return ok(extracted);
}
async function postChatCompletion(
provider: LlmProvider,
messages: ChatMessage[],
tools: readonly Record<string, unknown>[],
): Promise<Result<string, string>> {
try {
const response = await fetch(chatCompletionsUrl(provider.baseUrl), {
method: "POST",
headers: {
Authorization: `Bearer ${provider.apiKey}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
model: provider.model,
messages,
tools,
tool_choice: "auto",
}),
});
const responseText = await response.text();
if (!response.ok) {
return err(`http_error:${String(response.status)}:${responseText.slice(0, 4000)}`);
}
return ok(responseText);
} catch (cause) {
const message = cause instanceof Error ? cause.message : String(cause);
return err(`network_error:${message}`);
}
}
/**
* Multi-turn ReAct extraction with `cas_get` plus a schema-shaped extract tool (OpenAI-compatible).
* Final meta comes from a successful extract tool call or from plain JSON in the assistant message.
*/
export async function reactExtract<T extends Record<string, unknown>>(
args: ReactExtractArgs<T>,
): Promise<Result<T, string>> {
const extractTool = extractFunctionToolFromZodSchema(args.schema);
const tools = [
CAS_GET_TOOL_DEFINITION,
{
type: "function" as const,
function: {
name: extractTool.name,
description: extractTool.description,
parameters: extractTool.parameters,
},
},
];
const systemContent = `You extract structured metadata from the agent output below. Use cas_get to read Merkle DAG nodes from CAS (YAML: type, payload, children) when the agent output references hashes you must traverse. When you have the complete structured object, call the ${extractTool.name} tool with JSON arguments matching the schema. You may instead reply with only a JSON object (no prose) when no tools are needed.`;
const messages: ChatMessage[] = [
{ role: "system", content: systemContent },
{ role: "user", content: args.text },
];
for (let round = 0; round < MAX_REACT_ROUNDS; round++) {
const bodyResult = await postChatCompletion(args.provider, messages, tools);
if (!bodyResult.ok) {
return bodyResult;
}
const msgResult = firstAssistantMessage(bodyResult.value);
if (!msgResult.ok) {
return msgResult;
}
const classified = classifyAssistantTurn(msgResult.value, args.schema);
if (!classified.ok) {
return classified;
}
const turn = classified.value;
if (turn.kind === "plain_json") {
return ok(turn.value);
}
messages.push({
role: "assistant",
content: turn.assistantContent,
tool_calls: turn.calls,
});
const toolsRound = await appendToolResults(
turn.calls,
extractTool.name,
args.schema,
args.cas,
messages,
);
if (!toolsRound.ok) {
return toolsRound;
}
if (toolsRound.value !== null) {
return ok(toolsRound.value);
}
}
return err("max_react_rounds_exceeded");
}
+4
View File
@@ -16,6 +16,9 @@ export type LlmProvider = {
model: string;
};
/** How the engine runs meta extraction for a role after the agent phase. */
export type ExtractMode = "single" | "react";
/** What each generator yield produces — one role's output (engine adds `timestamp` when persisting). */
export type RoleOutput = {
role: string;
@@ -121,6 +124,7 @@ export type RoleDefinition<Meta extends Record<string, unknown>> = {
schema: z.ZodType<Meta>;
/** When non-null, produces CAS hashes to persist on this role's steps (see `RoleOutput.refs`). */
extractRefs: ((meta: Meta) => string[]) | null;
extractMode: ExtractMode;
};
/**
+6 -3
View File
@@ -1,4 +1,4 @@
import { mkdir, unlink, writeFile } from "node:fs/promises";
import { appendFile, mkdir, unlink, writeFile } from "node:fs/promises";
import { createServer, type Socket } from "node:net";
import { dirname, join } from "node:path";
import { importWorkflowBundleModule } from "./bundle-import-env.js";
@@ -11,7 +11,7 @@ import { normalizeRefsField } from "./refs-field.js";
import { err, ok, type Result } from "./result.js";
import { getGlobalCasDir } from "./storage-root.js";
import { createThreadPauseGate, type ThreadPauseGate } from "./thread-pause-gate.js";
import type { RoleOutput, WorkflowFn } from "./types.js";
import type { RoleOutput, WorkflowFn, WorkflowResult } from "./types.js";
const bootLog = createLogger({ sink: { kind: "stderr" } });
@@ -404,7 +404,7 @@ async function main(): Promise<void> {
});
}
await executeThread(
const runResult = await executeThread(
workflowFn,
cmd.workflowName,
{ prompt: cmd.prompt, steps: cmd.steps },
@@ -418,9 +418,12 @@ async function main(): Promise<void> {
io,
logger,
);
await appendFile(dataJsonlPath, `${JSON.stringify(runResult)}\n`, "utf8");
} catch (e) {
const message = e instanceof Error ? e.message : String(e);
bootLog("Q3MN8YKW", `thread ${threadId} failed: ${message}`);
const failure: WorkflowResult = { returnCode: 1, summary: message, rootHash: "" };
await appendFile(dataJsonlPath, `${JSON.stringify(failure)}\n`, "utf8").catch(() => {});
} finally {
threads.delete(threadId);
await unlink(runningPath).catch(() => {});
+3 -1
View File
@@ -23,10 +23,12 @@
{ "path": "packages/workflow-role-coder" },
{ "path": "packages/workflow-role-planner" },
{ "path": "packages/workflow-role-reviewer" },
{ "path": "packages/workflow-role-tester" },
{ "path": "packages/workflow-agent-cursor" },
{ "path": "packages/workflow-agent-hermes" },
{ "path": "packages/workflow-util-agent" },
{ "path": "packages/cli-workflow" },
{ "path": "packages/workflow-template-solve-issue" }
{ "path": "packages/workflow-template-solve-issue" },
{ "path": "packages/workflow-template-develop" }
]
}