feat: Phase 2 — Thread lifecycle, execution engine, worker, CLI

- types.ts: START/END, RoleMeta, ThreadContext, Role, Moderator, WorkflowDefinition
- engine.ts: executeThread with JSONL persistence + AbortSignal
- worker.ts: per-bundle process, TCP IPC, kill individual threads
- CLI: run/ps/kill/threads/thread/thread rm commands
- 32 tests pass, biome clean

小橘 <xiaoju@shazhou.work>
This commit is contained in:
2026-05-06 04:59:54 +00:00
parent 01e930df8f
commit 7582a88d6b
46 changed files with 2829 additions and 167 deletions
+18 -11
View File
@@ -1,11 +1,9 @@
{
"$schema": "https://biomejs.dev/schemas/1.9.0/schema.json",
"$schema": "https://biomejs.dev/schemas/2.4.14/schema.json",
"files": {
"ignore": ["**/dist/**", "**/node_modules/**"]
},
"organizeImports": {
"enabled": true
"includes": ["**", "!**/dist", "!**/node_modules"]
},
"assist": { "actions": { "source": { "organizeImports": "on" } } },
"formatter": {
"indentStyle": "space",
"indentWidth": 2,
@@ -19,7 +17,7 @@
},
"overrides": [
{
"include": ["**/__tests__/**"],
"includes": ["**/__tests__/**"],
"linter": {
"rules": {
"suspicious": {
@@ -30,6 +28,16 @@
}
}
}
},
{
"includes": ["**/*.d.ts"],
"linter": {
"rules": {
"style": {
"noDefaultExport": "off"
}
}
}
}
],
"linter": {
@@ -43,7 +51,6 @@
"noParameterProperties": "error",
"useImportType": "error",
"useShorthandFunctionType": "error",
"noVar": "error",
"useConst": "error",
"useEnumInitializers": "error"
},
@@ -60,15 +67,15 @@
}
},
"suspicious": {
"noExplicitAny": "error"
"noExplicitAny": "error",
"noVar": "error",
"noConsole": "error"
},
"correctness": {
"noUnusedVariables": "error",
"noUnusedImports": "error"
},
"nursery": {
"noConsole": "error"
}
"nursery": {}
}
}
}
+8 -1
View File
@@ -1,9 +1,16 @@
{
"name": "@uncaged/workflow-monorepo",
"private": true,
"workspaces": ["packages/*"],
"workspaces": [
"packages/*"
],
"scripts": {
"build": "bun run --filter '*' build",
"check": "biome check .",
"format": "biome format --write .",
"test": "bun run --filter '*' test"
},
"devDependencies": {
"@biomejs/biome": "^2.4.14"
}
}
@@ -0,0 +1,100 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { cmdAdd } from "../src/cmd-add.js";
import { cmdList, formatListLines } from "../src/cmd-list.js";
import { cmdRemove } from "../src/cmd-remove.js";
import { cmdShow } from "../src/cmd-show.js";
describe("cli workflow commands", () => {
let prevEnv: string | undefined;
let storageRoot: string;
beforeEach(async () => {
prevEnv = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-"));
process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = storageRoot;
});
afterEach(async () => {
if (prevEnv === undefined) {
delete process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
} else {
process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = prevEnv;
}
await rm(storageRoot, { recursive: true, force: true });
});
test("add / list / show / remove roundtrip", async () => {
const bundleDir = join(storageRoot, "src");
await mkdir(bundleDir, { recursive: true });
const bundlePath = join(bundleDir, "demo.esm.js");
await writeFile(
bundlePath,
`import fs from "node:fs";
export default {
name: "solve-issue",
roles: {
noop: async () => {
fs.existsSync(".");
return { content: "ok", meta: { done: true } };
},
},
moderator(ctx) {
if (ctx.steps.length === 0) {
return "noop";
}
return "__end__";
},
};
`,
"utf8",
);
const added = await cmdAdd(storageRoot, "solve-issue", bundlePath);
expect(added.ok).toBe(true);
const listed = await cmdList(storageRoot);
expect(listed.ok).toBe(true);
if (listed.ok) {
const lines = formatListLines(listed.value);
expect(lines.some((l) => l.startsWith("solve-issue\t"))).toBe(true);
}
const shown = await cmdShow(storageRoot, "solve-issue");
expect(shown.ok).toBe(true);
if (!shown.ok) {
return;
}
expect(shown.value.hash.length).toBe(13);
const bundleOnDisk = await readFile(
join(storageRoot, "bundles", `${shown.value.hash}.esm.js`),
"utf8",
);
expect(bundleOnDisk.length).toBeGreaterThan(0);
const removed = await cmdRemove(storageRoot, "solve-issue");
expect(removed.ok).toBe(true);
const listedAfter = await cmdList(storageRoot);
expect(listedAfter.ok).toBe(true);
if (listedAfter.ok) {
expect(formatListLines(listedAfter.value)[0]).toBe("(no workflows registered)");
}
});
test("add rejects invalid bundles", async () => {
const bundlePath = join(storageRoot, "bad.esm.js");
await writeFile(
bundlePath,
'import x from "./local";\nexport default async function run() { return { returnCode: 0, summary: "" }; }\n',
"utf8",
);
const r = await cmdAdd(storageRoot, "solve-issue", bundlePath);
expect(r.ok).toBe(false);
});
});
@@ -0,0 +1,214 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { spawnSync } from "node:child_process";
import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import { cmdAdd } from "../src/cmd-add.js";
import { cmdKill } from "../src/cmd-kill.js";
import { cmdPs } from "../src/cmd-ps.js";
import { cmdRun } from "../src/cmd-run.js";
import { cmdThreadRemove, cmdThreadShow } from "../src/cmd-thread.js";
import { cmdThreads } from "../src/cmd-threads.js";
import { pathExists } from "../src/fs-utils.js";
const fastBundleSource = `export default {
name: "solve-issue",
roles: {
planner: async () => ({ content: "plan", meta: { plan: "x" } }),
coder: async () => ({ content: "code", meta: { diff: "y" } }),
},
moderator(ctx) {
if (ctx.steps.length === 0) return "planner";
if (ctx.steps.length === 1) return "coder";
return "__end__";
},
};
`;
const slowPlannerBundleSource = `export default {
name: "solve-issue",
roles: {
planner: async () => {
await new Promise((r) => setTimeout(r, 400));
return { content: "plan", meta: { plan: "x" } };
},
coder: async () => ({ content: "code", meta: { diff: "y" } }),
},
moderator(ctx) {
if (ctx.steps.length === 0) return "planner";
if (ctx.steps.length === 1) return "coder";
return "__end__";
},
};
`;
const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url));
const abortablePlannerBundleSource = `export default {
name: "solve-issue",
roles: {
planner: async () => {
await new Promise((r) => setTimeout(r, 600));
return { content: "plan", meta: { plan: "x" } };
},
coder: async () => ({ content: "code", meta: { diff: "y" } }),
},
moderator(ctx) {
if (ctx.steps.length === 0) return "planner";
if (ctx.steps.length === 1) return "coder";
return "__end__";
},
};
`;
describe("cli thread commands", () => {
let prevEnv: string | undefined;
let storageRoot: string;
beforeEach(async () => {
prevEnv = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-thread-"));
process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = storageRoot;
});
afterEach(async () => {
if (prevEnv === undefined) {
delete process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
} else {
process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = prevEnv;
}
await rm(storageRoot, { recursive: true, force: true });
});
test("run / threads / thread / thread rm", async () => {
const bundleDir = join(storageRoot, "src");
await mkdir(bundleDir, { recursive: true });
const bundlePath = join(bundleDir, "demo.esm.js");
await writeFile(bundlePath, fastBundleSource, "utf8");
const added = await cmdAdd(storageRoot, "solve-issue", bundlePath);
expect(added.ok).toBe(true);
if (!added.ok) {
return;
}
const ran = await cmdRun(storageRoot, "solve-issue", "hello", false, 5);
expect(ran.ok).toBe(true);
if (!ran.ok) {
return;
}
const threadId = ran.value.threadId;
let threads = await cmdThreads(storageRoot, []);
for (
let attempt = 0;
attempt < 50 && threads.ok && !threads.value.some((l) => l.includes(threadId));
attempt++
) {
await new Promise((r) => setTimeout(r, 20));
threads = await cmdThreads(storageRoot, []);
}
expect(threads.ok).toBe(true);
if (!threads.ok) {
return;
}
expect(threads.value.some((l) => l.includes(threadId))).toBe(true);
const shown = await cmdThreadShow(storageRoot, threadId);
expect(shown.ok).toBe(true);
if (!shown.ok) {
return;
}
expect(shown.value.includes('"threadId"')).toBe(true);
const removed = await cmdThreadRemove(storageRoot, threadId);
expect(removed.ok).toBe(true);
const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`);
expect(await pathExists(dataPath)).toBe(false);
});
test("cli entrypoint dispatches threads / ps (spawn)", () => {
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const threads = spawnSync(process.execPath, [cliEntryPath, "threads"], {
env,
encoding: "utf8",
});
expect(threads.status).toBe(0);
const ps = spawnSync(process.execPath, [cliEntryPath, "ps"], { env, encoding: "utf8" });
expect(ps.status).toBe(0);
});
test("ps lists running threads while planner role is in-flight", async () => {
const bundleDir = join(storageRoot, "src");
await mkdir(bundleDir, { recursive: true });
const bundlePath = join(bundleDir, "demo.esm.js");
await writeFile(bundlePath, slowPlannerBundleSource, "utf8");
const added = await cmdAdd(storageRoot, "solve-issue", bundlePath);
expect(added.ok).toBe(true);
if (!added.ok) {
return;
}
const ran = await cmdRun(storageRoot, "solve-issue", "hello", false, 5);
expect(ran.ok).toBe(true);
if (!ran.ok) {
return;
}
const threadId = ran.value.threadId;
await new Promise((r) => setTimeout(r, 50));
const psEarly = await cmdPs(storageRoot);
expect(psEarly.some((l) => l.includes(threadId))).toBe(true);
await new Promise((r) => setTimeout(r, 900));
const psLate = await cmdPs(storageRoot);
expect(psLate).toEqual(["(no running threads)"]);
});
test("kill stops thread after the in-flight role (before subsequent roles)", async () => {
const bundleDir = join(storageRoot, "src");
await mkdir(bundleDir, { recursive: true });
const bundlePath = join(bundleDir, "demo.esm.js");
await writeFile(bundlePath, abortablePlannerBundleSource, "utf8");
const added = await cmdAdd(storageRoot, "solve-issue", bundlePath);
expect(added.ok).toBe(true);
if (!added.ok) {
return;
}
const ran = await cmdRun(storageRoot, "solve-issue", "hello", false, 5);
expect(ran.ok).toBe(true);
if (!ran.ok) {
return;
}
const threadId = ran.value.threadId;
await new Promise((r) => setTimeout(r, 50));
const killed = await cmdKill(storageRoot, threadId);
expect(killed.ok).toBe(true);
await new Promise((r) => setTimeout(r, 900));
const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`);
const text = await readFile(dataPath, "utf8");
const lines = text
.trim()
.split("\n")
.filter((l) => l !== "");
expect(lines.length).toBe(2);
const runningPath = join(dirname(dataPath), `${threadId}.running`);
expect(await pathExists(runningPath)).toBe(false);
});
});
+2 -1
View File
@@ -6,7 +6,8 @@
"uncaged-workflow": "src/cli.ts"
},
"dependencies": {
"@uncaged/workflow": "workspace:*"
"@uncaged/workflow": "workspace:*",
"yaml": "^2.8.4"
},
"scripts": {
"build": "echo 'TODO'",
+52
View File
@@ -0,0 +1,52 @@
import { copyFile, mkdir, readFile, stat } from "node:fs/promises";
import { join } from "node:path";
import { err, ok, type Result } from "@uncaged/workflow";
async function pathExists(path: string): Promise<boolean> {
try {
await stat(path);
return true;
} catch {
return false;
}
}
export async function storeWorkflowBundleCopy(
storageRoot: string,
hash: string,
resolvedSourcePath: string,
sourceText: string,
): Promise<Result<void, string>> {
const bundlesDir = join(storageRoot, "bundles");
const destPath = join(bundlesDir, `${hash}.esm.js`);
try {
await mkdir(bundlesDir, { recursive: true });
} catch (e) {
const message = e instanceof Error ? e.message : String(e);
return err(`failed to store bundle: ${message}`);
}
if (!(await pathExists(destPath))) {
try {
await copyFile(resolvedSourcePath, destPath);
} catch (e) {
const message = e instanceof Error ? e.message : String(e);
return err(`failed to store bundle: ${message}`);
}
return ok(undefined);
}
let existing: string;
try {
existing = await readFile(destPath, "utf8");
} catch (e) {
const message = e instanceof Error ? e.message : String(e);
return err(`failed to store bundle: ${message}`);
}
if (existing !== sourceText) {
return err(`bundle hash ${hash} already exists with different contents; refusing to overwrite`);
}
return ok(undefined);
}
+228
View File
@@ -0,0 +1,228 @@
import { printCliError, printCliLine } from "./cli-output.js";
import { cmdAdd, formatAddSuccess } from "./cmd-add.js";
import { cmdKill } from "./cmd-kill.js";
import { cmdList, formatListLines } from "./cmd-list.js";
import { cmdPs } from "./cmd-ps.js";
import { cmdRemove } from "./cmd-remove.js";
import { cmdRun } from "./cmd-run.js";
import { cmdShow, formatShowYaml } from "./cmd-show.js";
import { cmdThreadRemove, cmdThreadShow } from "./cmd-thread.js";
import { cmdThreads } from "./cmd-threads.js";
import { parseRunArgv } from "./run-argv.js";
function usage(): string {
return [
"Usage:",
" uncaged-workflow add <name> <file>",
" uncaged-workflow list",
" uncaged-workflow show <name>",
" uncaged-workflow remove <name>",
" uncaged-workflow run <name> [--prompt <text>] [--dry-run] [--max-rounds N]",
" uncaged-workflow ps",
" uncaged-workflow kill <thread-id>",
" uncaged-workflow threads [name]",
" uncaged-workflow thread <id>",
" uncaged-workflow thread rm <id>",
].join("\n");
}
async function dispatchAdd(storageRoot: string, argv: string[]): Promise<number> {
const name = argv[0];
const file = argv[1];
if (name === undefined || file === undefined || argv.length > 2) {
printCliError(`${usage()}\n\nerror: add requires <name> <file>`);
return 1;
}
const result = await cmdAdd(storageRoot, name, file);
if (!result.ok) {
printCliError(result.error);
return 1;
}
printCliLine(formatAddSuccess(name, file, result.value.hash));
return 0;
}
async function dispatchList(storageRoot: string, argv: string[]): Promise<number> {
if (argv.length > 0) {
printCliError(`${usage()}\n\nerror: list takes no arguments`);
return 1;
}
const result = await cmdList(storageRoot);
if (!result.ok) {
printCliError(result.error);
return 1;
}
for (const line of formatListLines(result.value)) {
printCliLine(line);
}
return 0;
}
async function dispatchShow(storageRoot: string, argv: string[]): Promise<number> {
const name = argv[0];
if (name === undefined || argv.length > 1) {
printCliError(`${usage()}\n\nerror: show requires <name>`);
return 1;
}
const result = await cmdShow(storageRoot, name);
if (!result.ok) {
printCliError(result.error);
return 1;
}
printCliLine(formatShowYaml(name, result.value));
return 0;
}
async function dispatchRemove(storageRoot: string, argv: string[]): Promise<number> {
const name = argv[0];
if (name === undefined || argv.length > 1) {
printCliError(`${usage()}\n\nerror: remove requires <name>`);
return 1;
}
const result = await cmdRemove(storageRoot, name);
if (!result.ok) {
printCliError(result.error);
return 1;
}
printCliLine(`removed workflow "${name}" from registry`);
return 0;
}
async function dispatchRun(storageRoot: string, argv: string[]): Promise<number> {
const parsed = parseRunArgv(argv);
if (!parsed.ok) {
printCliError(`${usage()}\n\nerror: ${parsed.error}`);
return 1;
}
const result = await cmdRun(
storageRoot,
parsed.value.name,
parsed.value.prompt,
parsed.value.dryRun,
parsed.value.maxRounds,
);
if (!result.ok) {
printCliError(result.error);
return 1;
}
printCliLine(result.value.threadId);
return 0;
}
async function dispatchPs(storageRoot: string, argv: string[]): Promise<number> {
if (argv.length > 0) {
printCliError(`${usage()}\n\nerror: ps takes no arguments`);
return 1;
}
for (const line of await cmdPs(storageRoot)) {
printCliLine(line);
}
return 0;
}
async function dispatchKill(storageRoot: string, argv: string[]): Promise<number> {
const threadId = argv[0];
if (threadId === undefined || argv.length > 1) {
printCliError(`${usage()}\n\nerror: kill requires <thread-id>`);
return 1;
}
const result = await cmdKill(storageRoot, threadId);
if (!result.ok) {
printCliError(result.error);
return 1;
}
printCliLine(`kill sent for thread ${threadId}`);
return 0;
}
async function dispatchThreads(storageRoot: string, argv: string[]): Promise<number> {
const result = await cmdThreads(storageRoot, argv);
if (!result.ok) {
printCliError(result.error);
return 1;
}
for (const line of result.value) {
printCliLine(line);
}
return 0;
}
async function dispatchThread(storageRoot: string, argv: string[]): Promise<number> {
const id = argv[0];
if (id === undefined || argv.length > 1) {
printCliError(`${usage()}\n\nerror: thread requires <id>`);
return 1;
}
const result = await cmdThreadShow(storageRoot, id);
if (!result.ok) {
printCliError(result.error);
return 1;
}
printCliLine(result.value);
return 0;
}
async function dispatchThreadRm(storageRoot: string, argv: string[]): Promise<number> {
const id = argv[0];
if (id === undefined || argv.length > 1) {
printCliError(`${usage()}\n\nerror: thread rm requires <id>`);
return 1;
}
const result = await cmdThreadRemove(storageRoot, id);
if (!result.ok) {
printCliError(result.error);
return 1;
}
printCliLine(`removed thread ${id}`);
return 0;
}
export async function runCli(storageRoot: string, argv: string[]): Promise<number> {
if (argv.length === 0) {
printCliError(usage());
return 1;
}
const command = argv[0];
if (command === undefined) {
printCliError(usage());
return 1;
}
const rest = argv.slice(1);
if (command === "add") {
return dispatchAdd(storageRoot, rest);
}
if (command === "list") {
return dispatchList(storageRoot, rest);
}
if (command === "show") {
return dispatchShow(storageRoot, rest);
}
if (command === "remove") {
return dispatchRemove(storageRoot, rest);
}
if (command === "run") {
return dispatchRun(storageRoot, rest);
}
if (command === "ps") {
return dispatchPs(storageRoot, rest);
}
if (command === "kill") {
return dispatchKill(storageRoot, rest);
}
if (command === "threads") {
return dispatchThreads(storageRoot, rest);
}
if (command === "thread") {
const sub = rest[0];
if (sub === "rm") {
return dispatchThreadRm(storageRoot, rest.slice(1));
}
return dispatchThread(storageRoot, rest);
}
printCliError(`${usage()}\n\nerror: unknown command ${command}`);
return 1;
}
+9
View File
@@ -0,0 +1,9 @@
export function printCliLine(line: string): void {
// biome-ignore lint/suspicious/noConsole: CLI user-facing output
console.log(line);
}
export function printCliError(line: string): void {
// biome-ignore lint/suspicious/noConsole: CLI user-facing errors
console.error(line);
}
+8 -2
View File
@@ -1,3 +1,9 @@
#!/usr/bin/env bun
// @uncaged/cli-workflow - uncaged-workflow CLI
console.log('uncaged-workflow');
import { runCli } from "./cli-dispatch.js";
import { resolveWorkflowStorageRoot } from "./storage-env.js";
const argv = process.argv.slice(2);
const storageRoot = resolveWorkflowStorageRoot();
const code = await runCli(storageRoot, argv);
process.exit(code);
+77
View File
@@ -0,0 +1,77 @@
import { readFile, stat } from "node:fs/promises";
import { basename, resolve } from "node:path";
import {
err,
hashWorkflowBundleBytes,
ok,
type Result,
readWorkflowRegistry,
registerWorkflowVersion,
validateWorkflowBundle,
writeWorkflowRegistry,
} from "@uncaged/workflow";
import { storeWorkflowBundleCopy } from "./bundle-store.js";
import { validateCliWorkflowName } from "./workflow-name.js";
export async function cmdAdd(
storageRoot: string,
name: string,
filePath: string,
): Promise<Result<{ hash: string }, string>> {
const nameOk = validateCliWorkflowName(name);
if (!nameOk.ok) {
return nameOk;
}
let resolvedPath: string;
try {
resolvedPath = resolve(filePath);
await stat(resolvedPath);
} catch {
return err(`bundle file not found: ${filePath}`);
}
let source: string;
try {
source = await readFile(resolvedPath, "utf8");
} catch (e) {
const message = e instanceof Error ? e.message : String(e);
return err(`failed to read bundle: ${message}`);
}
const validated = validateWorkflowBundle({
filePath: resolvedPath,
source,
});
if (!validated.ok) {
return validated;
}
const encoder = new TextEncoder();
const bytes = encoder.encode(source);
const hash = hashWorkflowBundleBytes(bytes);
const stored = await storeWorkflowBundleCopy(storageRoot, hash, resolvedPath, source);
if (!stored.ok) {
return stored;
}
const reg = await readWorkflowRegistry(storageRoot);
if (!reg.ok) {
return err(reg.error.message);
}
const next = registerWorkflowVersion(reg.value, name, hash, Date.now());
const written = await writeWorkflowRegistry(storageRoot, next);
if (!written.ok) {
return err(written.error.message);
}
return ok({ hash });
}
export function formatAddSuccess(name: string, filePath: string, hash: string): string {
return `registered workflow "${name}" from ${basename(filePath)} as ${hash}`;
}
+39
View File
@@ -0,0 +1,39 @@
import { join } from "node:path";
import { err, type Result } from "@uncaged/workflow";
import { readTextFileIfExists } from "./fs-utils.js";
import {
resolveRunningHashForThread,
sendWorkerTcpCommand,
type WorkerCtl,
} from "./worker-spawn.js";
export async function cmdKill(
storageRoot: string,
threadId: string,
): Promise<Result<void, string>> {
const hashResult = await resolveRunningHashForThread(storageRoot, threadId);
if (!hashResult.ok) {
return hashResult;
}
const ctlPath = join(storageRoot, "workers", `${hashResult.value}.json`);
const ctlText = await readTextFileIfExists(ctlPath);
if (ctlText === null) {
return err(`worker control file missing for bundle hash ${hashResult.value}`);
}
let ctl: WorkerCtl;
try {
ctl = JSON.parse(ctlText) as WorkerCtl;
} catch {
return err(`corrupt worker control file: ${ctlPath}`);
}
if (typeof ctl.port !== "number" || ctl.port <= 0) {
return err(`invalid worker control file: ${ctlPath}`);
}
return await sendWorkerTcpCommand(ctl.port, { type: "kill", threadId });
}
+32
View File
@@ -0,0 +1,32 @@
import {
err,
listRegisteredWorkflowNames,
ok,
type Result,
readWorkflowRegistry,
type WorkflowRegistryFile,
} from "@uncaged/workflow";
export async function cmdList(storageRoot: string): Promise<Result<WorkflowRegistryFile, string>> {
const reg = await readWorkflowRegistry(storageRoot);
if (!reg.ok) {
return err(reg.error.message);
}
return ok(reg.value);
}
export function formatListLines(registry: WorkflowRegistryFile): string[] {
const names = listRegisteredWorkflowNames(registry);
if (names.length === 0) {
return ["(no workflows registered)"];
}
const lines: string[] = [];
for (const name of names) {
const entry = registry.workflows[name];
if (entry === undefined) {
continue;
}
lines.push(`${name}\t${entry.hash}\t${entry.timestamp}`);
}
return lines;
}
+9
View File
@@ -0,0 +1,9 @@
import { listRunningThreads } from "./thread-scan.js";
export async function cmdPs(storageRoot: string): Promise<string[]> {
const rows = await listRunningThreads(storageRoot);
if (rows.length === 0) {
return ["(no running threads)"];
}
return rows.map((r) => `${r.threadId}\t${r.hash}\t${r.workflowName ?? "(unknown)"}`);
}
+34
View File
@@ -0,0 +1,34 @@
import {
err,
ok,
type Result,
readWorkflowRegistry,
unregisterWorkflow,
writeWorkflowRegistry,
} from "@uncaged/workflow";
import { validateCliWorkflowName } from "./workflow-name.js";
export async function cmdRemove(storageRoot: string, name: string): Promise<Result<void, string>> {
const nameOk = validateCliWorkflowName(name);
if (!nameOk.ok) {
return nameOk;
}
const reg = await readWorkflowRegistry(storageRoot);
if (!reg.ok) {
return err(reg.error.message);
}
const next = unregisterWorkflow(reg.value, name);
if (!next.ok) {
return err(next.error.message);
}
const written = await writeWorkflowRegistry(storageRoot, next.value);
if (!written.ok) {
return err(written.error.message);
}
return ok(undefined);
}
+54
View File
@@ -0,0 +1,54 @@
import { join } from "node:path";
import {
err,
generateUlid,
getRegisteredWorkflow,
ok,
type Result,
readWorkflowRegistry,
} from "@uncaged/workflow";
import { ensureWorkerForHash, sendWorkerTcpCommand } from "./worker-spawn.js";
import { validateCliWorkflowName } from "./workflow-name.js";
export async function cmdRun(
storageRoot: string,
name: string,
prompt: string,
isDryRun: boolean,
maxRounds: number,
): Promise<Result<{ threadId: string }, string>> {
const nameOk = validateCliWorkflowName(name);
if (!nameOk.ok) {
return nameOk;
}
const reg = await readWorkflowRegistry(storageRoot);
if (!reg.ok) {
return err(reg.error.message);
}
const entry = getRegisteredWorkflow(reg.value, name);
if (entry === null) {
return err(`workflow not registered: ${name}`);
}
const bundlePath = join(storageRoot, "bundles", `${entry.hash}.esm.js`);
const worker = await ensureWorkerForHash(storageRoot, entry.hash, bundlePath);
if (!worker.ok) {
return worker;
}
const threadId = generateUlid(Date.now());
const sent = await sendWorkerTcpCommand(worker.value.port, {
type: "run",
threadId,
prompt,
options: { isDryRun, maxRounds },
});
if (!sent.ok) {
return sent;
}
return ok({ threadId });
}
+43
View File
@@ -0,0 +1,43 @@
import {
err,
getRegisteredWorkflow,
ok,
type Result,
readWorkflowRegistry,
type WorkflowRegistryEntry,
} from "@uncaged/workflow";
import { stringify } from "yaml";
import { validateCliWorkflowName } from "./workflow-name.js";
export async function cmdShow(
storageRoot: string,
name: string,
): Promise<Result<WorkflowRegistryEntry, string>> {
const nameOk = validateCliWorkflowName(name);
if (!nameOk.ok) {
return nameOk;
}
const reg = await readWorkflowRegistry(storageRoot);
if (!reg.ok) {
return err(reg.error.message);
}
const entry = getRegisteredWorkflow(reg.value, name);
if (entry === null) {
return err(`workflow not found: ${name}`);
}
return ok(entry);
}
export function formatShowYaml(name: string, entry: WorkflowRegistryEntry): string {
const payload = {
[name]: {
hash: entry.hash,
timestamp: entry.timestamp,
history: entry.history,
},
};
return stringify(payload, { indent: 2, defaultStringType: "QUOTE_DOUBLE" });
}
+42
View File
@@ -0,0 +1,42 @@
import { unlink } from "node:fs/promises";
import { dirname, join } from "node:path";
import { err, ok, type Result } from "@uncaged/workflow";
import { readTextFileIfExists } from "./fs-utils.js";
import { resolveThreadDataPath } from "./thread-scan.js";
export async function cmdThreadShow(
storageRoot: string,
threadId: string,
): Promise<Result<string, string>> {
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
return err(`thread not found: ${threadId}`);
}
const text = await readTextFileIfExists(dataPath);
if (text === null) {
return err(`thread data missing: ${threadId}`);
}
return ok(text.endsWith("\n") ? text.slice(0, -1) : text);
}
export async function cmdThreadRemove(
storageRoot: string,
threadId: string,
): Promise<Result<void, string>> {
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
return err(`thread not found: ${threadId}`);
}
const dir = dirname(dataPath);
const infoPath = join(dir, `${threadId}.info.jsonl`);
const runningPath = join(dir, `${threadId}.running`);
await unlink(dataPath);
await unlink(infoPath).catch(() => {});
await unlink(runningPath).catch(() => {});
return ok(undefined);
}
+31
View File
@@ -0,0 +1,31 @@
import { err, ok, type Result } from "@uncaged/workflow";
import { listHistoricalThreads } from "./thread-scan.js";
import { validateCliWorkflowName } from "./workflow-name.js";
export async function cmdThreads(
storageRoot: string,
argv: string[],
): Promise<Result<string[], string>> {
const nameFilter = argv[0];
if (argv.length > 1) {
return err("threads expects at most one workflow name argument");
}
let workflowNameFilter: string | null = null;
if (nameFilter !== undefined) {
const nameOk = validateCliWorkflowName(nameFilter);
if (!nameOk.ok) {
return nameOk;
}
workflowNameFilter = nameFilter;
}
const rows = await listHistoricalThreads(storageRoot, workflowNameFilter);
if (rows.length === 0) {
return ok(["(no threads found)"]);
}
const lines = rows.map((r) => `${r.threadId}\t${r.hash}\t${r.workflowName ?? "(unknown)"}`);
return ok(lines);
}
+22
View File
@@ -0,0 +1,22 @@
import { readFile, stat } from "node:fs/promises";
export async function readTextFileIfExists(path: string): Promise<string | null> {
try {
return await readFile(path, "utf8");
} catch (e) {
const errObj = e as NodeJS.ErrnoException;
if (errObj.code === "ENOENT") {
return null;
}
throw e;
}
}
export async function pathExists(path: string): Promise<boolean> {
try {
await stat(path);
return true;
} catch {
return false;
}
}
+84
View File
@@ -0,0 +1,84 @@
import { err, ok, type Result } from "@uncaged/workflow";
export type ParsedRunArgv = {
name: string;
prompt: string;
dryRun: boolean;
maxRounds: number;
};
type FlagOk =
| { kind: "dry-run" }
| { kind: "prompt"; value: string }
| { kind: "max-rounds"; value: number };
function parseFlagAt(argv: string[], index: number): Result<FlagOk, string> | null {
const flag = argv[index];
if (flag === "--dry-run") {
return ok({ kind: "dry-run" });
}
if (flag === "--prompt") {
const value = argv[index + 1];
if (value === undefined) {
return err("missing value for --prompt");
}
return ok({ kind: "prompt", value });
}
if (flag === "--max-rounds") {
const value = argv[index + 1];
if (value === undefined) {
return err("missing value for --max-rounds");
}
const n = Number(value);
if (!Number.isFinite(n) || !Number.isInteger(n) || n < 0) {
return err("--max-rounds must be a non-negative integer");
}
return ok({ kind: "max-rounds", value: n });
}
return null;
}
export function parseRunArgv(argv: string[]): Result<ParsedRunArgv, string> {
let name: string | undefined;
let prompt = "";
let dryRun = false;
let maxRounds = 5;
let i = 0;
const first = argv[0];
if (first !== undefined && !first.startsWith("--")) {
name = first;
i = 1;
}
while (i < argv.length) {
const parsed = parseFlagAt(argv, i);
if (parsed === null) {
const unknown = argv[i];
return err(`unknown run flag: ${unknown}`);
}
if (!parsed.ok) {
return parsed;
}
const flag = parsed.value;
if (flag.kind === "dry-run") {
dryRun = true;
i += 1;
continue;
}
if (flag.kind === "prompt") {
prompt = flag.value;
i += 2;
continue;
}
maxRounds = flag.value;
i += 2;
}
if (name === undefined || name === "") {
return err("run requires <name>");
}
return ok({ name, prompt, dryRun, maxRounds });
}
+10
View File
@@ -0,0 +1,10 @@
import { getDefaultWorkflowStorageRoot } from "@uncaged/workflow";
/** Resolve storage root, honoring `UNCAGED_WORKFLOW_STORAGE_ROOT` for tests/tools. */
export function resolveWorkflowStorageRoot(): string {
const override = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
if (override !== undefined && override !== "") {
return override;
}
return getDefaultWorkflowStorageRoot();
}
+143
View File
@@ -0,0 +1,143 @@
import { readdir } from "node:fs/promises";
import { join } from "node:path";
import { pathExists, readTextFileIfExists } from "./fs-utils.js";
export type RunningThreadRow = {
threadId: string;
hash: string;
workflowName: string | null;
};
export type HistoricalThreadRow = {
threadId: string;
hash: string;
workflowName: string | null;
};
async function readWorkflowNameFromDataJsonl(dataPath: string): Promise<string | null> {
const text = await readTextFileIfExists(dataPath);
if (text === null) {
return null;
}
const firstLine = text.split("\n")[0];
if (firstLine === undefined || firstLine.trim() === "") {
return null;
}
let parsed: unknown;
try {
parsed = JSON.parse(firstLine) as unknown;
} catch {
return null;
}
if (parsed === null || typeof parsed !== "object") {
return null;
}
const name = (parsed as Record<string, unknown>).name;
return typeof name === "string" ? name : null;
}
/** Threads currently executing — identified via `<threadId>.running` markers. */
export async function listRunningThreads(storageRoot: string): Promise<RunningThreadRow[]> {
const logsRoot = join(storageRoot, "logs");
if (!(await pathExists(logsRoot))) {
return [];
}
const hashes = await readdir(logsRoot);
const out: RunningThreadRow[] = [];
for (const hash of hashes) {
const dir = join(logsRoot, hash);
let entries: string[];
try {
entries = await readdir(dir);
} catch {
continue;
}
for (const fileName of entries) {
if (!fileName.endsWith(".running")) {
continue;
}
const threadId = fileName.slice(0, -".running".length);
const dataPath = join(dir, `${threadId}.data.jsonl`);
const workflowName = await readWorkflowNameFromDataJsonl(dataPath);
out.push({ threadId, hash, workflowName });
}
}
out.sort((a, b) => {
const ha = `${a.hash}/${a.threadId}`;
const hb = `${b.hash}/${b.threadId}`;
return ha.localeCompare(hb);
});
return out;
}
/**
* Historical threads discovered via `*.data.jsonl`.
* When `workflowNameFilter` is non-null, only threads whose start record `name` matches are returned.
*/
export async function listHistoricalThreads(
storageRoot: string,
workflowNameFilter: string | null,
): Promise<HistoricalThreadRow[]> {
const logsRoot = join(storageRoot, "logs");
if (!(await pathExists(logsRoot))) {
return [];
}
const hashes = await readdir(logsRoot);
const out: HistoricalThreadRow[] = [];
for (const hash of hashes) {
const dir = join(logsRoot, hash);
let entries: string[];
try {
entries = await readdir(dir);
} catch {
continue;
}
for (const fileName of entries) {
if (!fileName.endsWith(".data.jsonl")) {
continue;
}
const threadId = fileName.slice(0, -".data.jsonl".length);
const dataPath = join(dir, fileName);
const workflowName = await readWorkflowNameFromDataJsonl(dataPath);
if (workflowNameFilter !== null && workflowName !== workflowNameFilter) {
continue;
}
out.push({ threadId, hash, workflowName });
}
}
out.sort((a, b) => {
const ha = `${a.hash}/${a.threadId}`;
const hb = `${b.hash}/${b.threadId}`;
return ha.localeCompare(hb);
});
return out;
}
export async function resolveThreadDataPath(
storageRoot: string,
threadId: string,
): Promise<string | null> {
const logsRoot = join(storageRoot, "logs");
if (!(await pathExists(logsRoot))) {
return null;
}
const hashes = await readdir(logsRoot);
for (const hash of hashes) {
const candidate = join(logsRoot, hash, `${threadId}.data.jsonl`);
if (await pathExists(candidate)) {
return candidate;
}
}
return null;
}
+190
View File
@@ -0,0 +1,190 @@
import { type ChildProcessWithoutNullStreams, spawn } from "node:child_process";
import { mkdir, readdir, unlink, writeFile } from "node:fs/promises";
import { createConnection } from "node:net";
import { join } from "node:path";
import { err, getWorkerHostScriptPath, ok, type Result } from "@uncaged/workflow";
import { pathExists, readTextFileIfExists } from "./fs-utils.js";
export type WorkerCtl = {
pid: number;
port: number;
};
function isProcessAlive(pid: number): boolean {
try {
process.kill(pid, 0);
return true;
} catch {
return false;
}
}
async function waitForReadyLine(
childStdout: NodeJS.ReadableStream,
child: ChildProcessWithoutNullStreams,
): Promise<Result<number, string>> {
return await new Promise((resolve) => {
let buf = "";
let settled = false;
function finish(result: Result<number, string>): void {
if (settled) {
return;
}
settled = true;
cleanup();
resolve(result);
}
function onData(chunk: Buffer | string): void {
buf += typeof chunk === "string" ? chunk : chunk.toString("utf8");
const nl = buf.indexOf("\n");
if (nl < 0) {
return;
}
const line = buf.slice(0, nl).trim();
const prefix = "READY ";
if (!line.startsWith(prefix)) {
finish(err(`worker did not emit READY line (got: ${line})`));
return;
}
const portText = line.slice(prefix.length);
const port = Number(portText);
if (!Number.isFinite(port) || port <= 0) {
finish(err(`worker READY line had invalid port: ${portText}`));
return;
}
finish(ok(port));
}
function onEnd(): void {
finish(err("worker stdout ended before READY line"));
}
function onExit(code: number | null): void {
finish(err(`worker exited before READY line (code ${code})`));
}
function cleanup(): void {
childStdout.off("data", onData);
childStdout.off("end", onEnd);
child.off("exit", onExit);
}
childStdout.on("data", onData);
childStdout.on("end", onEnd);
child.on("exit", onExit);
});
}
async function spawnWorkerProcess(
bundlePath: string,
storageRoot: string,
hash: string,
): Promise<Result<{ pid: number; port: number }, string>> {
const scriptPath = getWorkerHostScriptPath();
const child = spawn(process.execPath, [scriptPath, bundlePath, storageRoot, hash], {
stdio: ["ignore", "pipe", "inherit"],
});
if (child.stdout === null || child.pid === undefined) {
return err("failed to spawn worker process");
}
const pid = child.pid;
const ready = await waitForReadyLine(child.stdout, child);
if (!ready.ok) {
child.kill();
return ready;
}
child.unref();
child.stdout.destroy();
return ok({ pid, port: ready.value });
}
export async function ensureWorkerForHash(
storageRoot: string,
hash: string,
bundlePath: string,
): Promise<Result<{ port: number }, string>> {
const ctlPath = join(storageRoot, "workers", `${hash}.json`);
const existingText = await readTextFileIfExists(ctlPath);
if (existingText !== null) {
try {
const ctl = JSON.parse(existingText) as WorkerCtl;
if (
typeof ctl.pid === "number" &&
typeof ctl.port === "number" &&
ctl.pid > 0 &&
ctl.port > 0 &&
isProcessAlive(ctl.pid)
) {
return ok({ port: ctl.port });
}
} catch {
// Corrupt control file — ignore and respawn.
}
await unlink(ctlPath).catch(() => {});
}
const spawned = await spawnWorkerProcess(bundlePath, storageRoot, hash);
if (!spawned.ok) {
return spawned;
}
await mkdir(join(storageRoot, "workers"), { recursive: true });
const ctl: WorkerCtl = { pid: spawned.value.pid, port: spawned.value.port };
await writeFile(ctlPath, `${JSON.stringify(ctl)}\n`, "utf8");
return ok({ port: spawned.value.port });
}
export async function sendWorkerTcpCommand(
port: number,
payload: unknown,
): Promise<Result<void, string>> {
return await new Promise((resolve) => {
let settled = false;
const socket = createConnection({ host: "127.0.0.1", port }, () => {
socket.write(`${JSON.stringify(payload)}\n`);
socket.end();
});
socket.on("error", (e) => {
if (settled) {
return;
}
settled = true;
const message = e instanceof Error ? e.message : String(e);
resolve(err(`failed to send worker command: ${message}`));
});
socket.on("close", () => {
if (settled) {
return;
}
settled = true;
resolve(ok(undefined));
});
});
}
export async function resolveRunningHashForThread(
storageRoot: string,
threadId: string,
): Promise<Result<string, string>> {
const logsRoot = join(storageRoot, "logs");
if (!(await pathExists(logsRoot))) {
return err(`thread not running (no logs dir): ${threadId}`);
}
const hashes = await readdir(logsRoot);
for (const hash of hashes) {
const runningPath = join(logsRoot, hash, `${threadId}.running`);
if (await pathExists(runningPath)) {
return ok(hash);
}
}
return err(`thread not running: ${threadId}`);
}
@@ -0,0 +1,12 @@
import { err, ok, type Result } from "@uncaged/workflow";
const WORKFLOW_NAME_RE = /^[a-z][a-z0-9]*(-[a-z0-9]+)*$/;
export function validateCliWorkflowName(name: string): Result<void, string> {
if (!WORKFLOW_NAME_RE.test(name)) {
return err(
'invalid workflow name: use verb-first kebab-case (lowercase letters, digits, hyphens), e.g. "solve-issue"',
);
}
return ok(undefined);
}
@@ -0,0 +1,38 @@
import { describe, expect, test } from "bun:test";
import {
decodeCrockfordBase32Bits,
decodeCrockfordToUint64,
encodeCrockfordBase32Bits,
encodeUint64AsCrockford,
} from "../src/base32.js";
describe("Crockford Base32", () => {
test("roundtrip 64-bit hash encoding", () => {
const value = 0xef46_db37_51d8_e999n;
const encoded = encodeUint64AsCrockford(value);
expect(encoded.length).toBe(13);
const decoded = decodeCrockfordToUint64(encoded);
expect(decoded.ok).toBe(true);
if (decoded.ok) {
expect(decoded.value).toBe(value);
}
});
test("roundtrip arbitrary bit widths used by ULID (128-bit)", () => {
const rand = 0x1234567890abcdef12n & ((1n << 80n) - 1n);
const payload = (12345n << 80n) | rand;
const encoded = encodeCrockfordBase32Bits(payload, 128);
expect(encoded.length).toBe(26);
const decoded = decodeCrockfordBase32Bits(encoded, 128);
expect(decoded.ok).toBe(true);
if (decoded.ok) {
expect(decoded.value).toBe(payload);
}
});
test("reject invalid characters", () => {
const decoded = decodeCrockfordToUint64("!!!!!!!!!!!!!");
expect(decoded.ok).toBe(false);
});
});
@@ -0,0 +1,66 @@
import { describe, expect, test } from "bun:test";
import { validateWorkflowBundle } from "../src/bundle-validator.js";
describe("validateWorkflowBundle", () => {
test("accepts minimal valid builtin-only bundle", () => {
const source = `import fs from "node:fs";
export default async function run() {
fs.existsSync(".");
return { returnCode: 0, summary: "ok" };
}
`;
const r = validateWorkflowBundle({ filePath: "/tmp/w.esm.js", source });
expect(r.ok).toBe(true);
});
test("rejects wrong filename suffix", () => {
const r = validateWorkflowBundle({
filePath: "/tmp/w.js",
source: "export default async function run() { return { returnCode: 0, summary: '' }; }\n",
});
expect(r.ok).toBe(false);
});
test("rejects missing default export", () => {
const r = validateWorkflowBundle({
filePath: "/tmp/w.esm.js",
source: "export const x = 1;\n",
});
expect(r.ok).toBe(false);
if (!r.ok) {
expect(r.error).toContain("default export");
}
});
test("rejects non-builtin imports", () => {
const r = validateWorkflowBundle({
filePath: "/tmp/w.esm.js",
source:
'import x from "some-package";\nexport default async function run() { return { returnCode: 0, summary: "" }; }\n',
});
expect(r.ok).toBe(false);
});
test("rejects dynamic import", () => {
const r = validateWorkflowBundle({
filePath: "/tmp/w.esm.js",
source:
'export default async function run() { await import("fs"); return { returnCode: 0, summary: "" }; }\n',
});
expect(r.ok).toBe(false);
if (!r.ok) {
expect(r.error).toContain("dynamic import");
}
});
test("rejects require()", () => {
const r = validateWorkflowBundle({
filePath: "/tmp/w.esm.js",
source:
'export default async function run() { require("fs"); return { returnCode: 0, summary: "" }; }\n',
});
expect(r.ok).toBe(false);
});
});
+137
View File
@@ -0,0 +1,137 @@
import { describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, readFile, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { executeThread } from "../src/engine.js";
import { createLogger } from "../src/logger.js";
import { END, type WorkflowDefinition } from "../src/types.js";
type DemoMeta = {
planner: Record<string, unknown>;
coder: Record<string, unknown>;
};
const demoWorkflow: WorkflowDefinition<DemoMeta> = {
name: "demo-flow",
roles: {
planner: async () => ({
content: "plan-body",
meta: { plan: "do-it", files: ["a.ts"] },
}),
coder: async () => ({
content: "code-body",
meta: { diff: "+ok" },
}),
},
moderator: (ctx) => {
if (ctx.steps.length === 0) {
return "planner";
}
if (ctx.steps.length === 1) {
return "coder";
}
return END;
},
};
describe("executeThread", () => {
test("writes RFC-001 `.data.jsonl` start + role records and `.info.jsonl` logs", async () => {
const root = await mkdtemp(join(tmpdir(), "wf-engine-"));
try {
const threadId = "01KQXKW18CT8G75T53R8F4G7YG";
const hash = "C9NMV6V2TQT81";
const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`);
const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`);
await mkdir(join(root, "logs", hash), { recursive: true });
const logger = createLogger({ sink: { kind: "file", path: infoPath } });
const ac = new AbortController();
const result = await executeThread(
demoWorkflow,
"Fix the login redirect bug in #3",
{ isDryRun: false, maxRounds: 5, signal: ac.signal },
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath },
logger,
);
expect(result.returnCode).toBe(0);
const dataText = await readFile(dataPath, "utf8");
const lines = dataText
.trim()
.split("\n")
.filter((l) => l !== "");
expect(lines.length).toBe(3);
const start = JSON.parse(lines[0] ?? "{}") as Record<string, unknown>;
expect(start.name).toBe("demo-flow");
expect(start.hash).toBe(hash);
expect(start.threadId).toBe(threadId);
expect(typeof start.timestamp).toBe("number");
const params = start.parameters as Record<string, unknown>;
expect(params.prompt).toBe("Fix the login redirect bug in #3");
const opts = params.options as Record<string, unknown>;
expect(opts.isDryRun).toBe(false);
expect(opts.maxRounds).toBe(5);
const role1 = JSON.parse(lines[1] ?? "{}") as Record<string, unknown>;
expect(role1.role).toBe("planner");
expect(role1.content).toBe("plan-body");
expect(role1.meta).toEqual({ plan: "do-it", files: ["a.ts"] });
expect(typeof role1.timestamp).toBe("number");
const role2 = JSON.parse(lines[2] ?? "{}") as Record<string, unknown>;
expect(role2.role).toBe("coder");
const infoText = await readFile(infoPath, "utf8");
const infoLines = infoText
.trim()
.split("\n")
.filter((l) => l !== "");
expect(infoLines.length).toBeGreaterThan(0);
const log0 = JSON.parse(infoLines[0] ?? "{}") as Record<string, unknown>;
expect(typeof log0.tag).toBe("string");
expect(String(log0.tag).length).toBe(8);
expect(typeof log0.content).toBe("string");
expect(typeof log0.timestamp).toBe("number");
} finally {
await rm(root, { recursive: true, force: true });
}
});
test("respects maxRounds=0 (start record only)", async () => {
const root = await mkdtemp(join(tmpdir(), "wf-engine-max0-"));
try {
const threadId = "01KQXKW18CT8G75T53R8F4G7YG";
const hash = "C9NMV6V2TQT81";
const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`);
const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`);
await mkdir(join(root, "logs", hash), { recursive: true });
const logger = createLogger({ sink: { kind: "file", path: infoPath } });
const ac = new AbortController();
const result = await executeThread(
demoWorkflow,
"hello",
{ isDryRun: false, maxRounds: 0, signal: ac.signal },
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath },
logger,
);
expect(result.returnCode).toBe(0);
const dataText = await readFile(dataPath, "utf8");
const lines = dataText
.trim()
.split("\n")
.filter((l) => l !== "");
expect(lines.length).toBe(1);
} finally {
await rm(root, { recursive: true, force: true });
}
});
});
+24
View File
@@ -0,0 +1,24 @@
import { describe, expect, test } from "bun:test";
import { decodeCrockfordToUint64 } from "../src/base32.js";
import { hashWorkflowBundleBytes } from "../src/hash.js";
describe("hashWorkflowBundleBytes", () => {
test("matches XXH64 reference for empty input", () => {
const encoder = new TextEncoder();
const digest = hashWorkflowBundleBytes(encoder.encode(""));
const decoded = decodeCrockfordToUint64(digest);
expect(decoded.ok).toBe(true);
if (decoded.ok) {
expect(decoded.value).toBe(0xef46_db37_51d8_e999n);
}
});
test("stable for identical content", () => {
const encoder = new TextEncoder();
const data = encoder.encode(
"export default async function run() { return { returnCode: 0, summary: '' }; }\n",
);
expect(hashWorkflowBundleBytes(data)).toBe(hashWorkflowBundleBytes(data));
});
});
@@ -0,0 +1,31 @@
import { describe, expect, test } from "bun:test";
import { mkdir, readFile, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createLogger } from "../src/logger.js";
describe("createLogger", () => {
test("writes JSONL records to a file sink", async () => {
const dir = join(tmpdir(), `wf-log-${process.pid}-${Date.now()}`);
await mkdir(dir, { recursive: true });
const logPath = join(dir, "test.log");
const log = createLogger({ sink: { kind: "file", path: logPath } });
log("01ABCDEF", "hello");
const text = await readFile(logPath, "utf8");
const line = text.trim().split("\n")[0];
expect(line).toBeDefined();
const obj = JSON.parse(line ?? "{}") as { tag: string; content: string; timestamp: number };
expect(obj.tag).toBe("01ABCDEF");
expect(obj.content).toBe("hello");
expect(typeof obj.timestamp).toBe("number");
await rm(dir, { recursive: true, force: true });
});
test("rejects invalid tags", () => {
const log = createLogger({ sink: { kind: "stderr" } });
expect(() => log("BAD", "x")).toThrow();
expect(() => log("01abcdefg", "x")).toThrow();
expect(() => log("01ABCDEO", "x")).toThrow();
});
});
@@ -0,0 +1,77 @@
import { describe, expect, test } from "bun:test";
import { mkdir, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import {
readWorkflowRegistry,
registerWorkflowVersion,
unregisterWorkflow,
writeWorkflowRegistry,
} from "../src/registry.js";
describe("workflow registry", () => {
test("roundtrips through workflow.yaml", async () => {
const dir = join(tmpdir(), `wf-reg-${process.pid}-${Date.now()}`);
await mkdir(dir, { recursive: true });
const empty = await readWorkflowRegistry(dir);
expect(empty.ok).toBe(true);
if (!empty.ok) {
return;
}
const r1 = registerWorkflowVersion(empty.value, "solve-issue", "AAAAAAAAAAAAA", 100);
const w1 = await writeWorkflowRegistry(dir, r1);
expect(w1.ok).toBe(true);
const back = await readWorkflowRegistry(dir);
expect(back.ok).toBe(true);
if (!back.ok) {
await rm(dir, { recursive: true, force: true });
return;
}
expect(back.value.workflows["solve-issue"]?.hash).toBe("AAAAAAAAAAAAA");
const r2 = registerWorkflowVersion(back.value, "solve-issue", "BBBBBBBBBBBBB", 200);
expect(r2.workflows["solve-issue"]?.history[0]?.hash).toBe("AAAAAAAAAAAAA");
const removed = unregisterWorkflow(r2, "solve-issue");
expect(removed.ok).toBe(true);
if (!removed.ok) {
await rm(dir, { recursive: true, force: true });
return;
}
const w2 = await writeWorkflowRegistry(dir, removed.value);
expect(w2.ok).toBe(true);
const finalRead = await readWorkflowRegistry(dir);
expect(finalRead.ok).toBe(true);
if (finalRead.ok) {
expect(finalRead.value.workflows["solve-issue"]).toBeUndefined();
}
await rm(dir, { recursive: true, force: true });
});
test("treats missing registry as empty", async () => {
const dir = join(tmpdir(), `wf-reg2-${process.pid}-${Date.now()}`);
await mkdir(dir, { recursive: true });
const empty = await readWorkflowRegistry(dir);
expect(empty.ok).toBe(true);
if (empty.ok) {
expect(Object.keys(empty.value.workflows).length).toBe(0);
}
await rm(dir, { recursive: true, force: true });
});
test("parse errors on invalid shape", async () => {
const dir = join(tmpdir(), `wf-reg3-${process.pid}-${Date.now()}`);
await mkdir(dir, { recursive: true });
await writeFile(join(dir, "workflow.yaml"), 'workflows: "broken"\n', "utf8");
const bad = await readWorkflowRegistry(dir);
expect(bad.ok).toBe(false);
await rm(dir, { recursive: true, force: true });
});
});
@@ -0,0 +1,21 @@
import { describe, expect, test } from "bun:test";
import { err, ok } from "../src/result.js";
describe("result helpers", () => {
test("ok wraps value", () => {
const r = ok(42);
expect(r.ok).toBe(true);
if (r.ok) {
expect(r.value).toBe(42);
}
});
test("err wraps error", () => {
const r = err("nope");
expect(r.ok).toBe(false);
if (!r.ok) {
expect(r.error).toBe("nope");
}
});
});
@@ -0,0 +1,41 @@
import { describe, expect, test } from "bun:test";
describe("RFC-001 thread JSONL shapes", () => {
test("documents the `.data.jsonl` start record + role record keys", () => {
const startRecord = {
name: "solve-issue",
hash: "C9NMV6V2TQT81",
threadId: "01KQXKW18CT8G75T53R8F4G7YG",
parameters: {
prompt: "Fix the login redirect bug in #3",
options: {
isDryRun: false,
maxRounds: 5,
},
},
timestamp: 1714963200000,
};
const roleRecord = {
role: "planner",
content: "Plan: modify auth middleware...",
meta: { plan: "...", files: ["src/auth.ts"] },
timestamp: 1714963201000,
};
expect(Object.keys(startRecord).sort()).toEqual(
["hash", "name", "parameters", "threadId", "timestamp"].sort(),
);
expect(Object.keys(roleRecord).sort()).toEqual(["content", "meta", "role", "timestamp"].sort());
});
test("documents the `.info.jsonl` debug record keys", () => {
const infoRecord = {
tag: "4KNMR2PX",
content: "Loading workflow bundle...",
timestamp: 1714963200500,
};
expect(Object.keys(infoRecord).sort()).toEqual(["content", "tag", "timestamp"].sort());
});
});
+29
View File
@@ -0,0 +1,29 @@
import { describe, expect, test } from "bun:test";
import { decodeCrockfordBase32Bits } from "../src/base32.js";
import { generateUlid } from "../src/ulid.js";
describe("generateUlid", () => {
test("length and decodable Crockford payload", () => {
const id = generateUlid(1_714_963_200_000);
expect(id.length).toBe(26);
const decoded = decodeCrockfordBase32Bits(id, 128);
expect(decoded.ok).toBe(true);
});
test("embeds 48-bit timestamp at the MSB of the 128-bit payload", () => {
const ts = 9_999_888_777_666;
const id = generateUlid(ts);
const decoded = decodeCrockfordBase32Bits(id, 128);
expect(decoded.ok).toBe(true);
if (decoded.ok) {
const recoveredMs = decoded.value >> 80n;
expect(Number(recoveredMs)).toBe(ts);
}
});
test("rejects out-of-range timestamps", () => {
expect(() => generateUlid(-1)).toThrow();
expect(() => generateUlid(2 ** 48)).toThrow();
});
});
+120
View File
@@ -0,0 +1,120 @@
import { describe, expect, test } from "bun:test";
import { spawn } from "node:child_process";
import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { createConnection } from "node:net";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { getWorkerHostScriptPath } from "../src/worker-entry-path.js";
const bundleSource = `export default {
name: "demo-flow",
roles: {
planner: async () => ({ content: "p", meta: { plan: "x" } }),
coder: async () => ({ content: "c", meta: { diff: "y" } }),
},
moderator(ctx) {
if (ctx.steps.length === 0) return "planner";
if (ctx.steps.length === 1) return "coder";
return "__end__";
},
};
`;
async function readReadyPort(child: import("node:child_process").ChildProcess): Promise<number> {
return await new Promise((resolve, reject) => {
if (child.stdout === null) {
reject(new Error("missing stdout"));
return;
}
let buf = "";
function cleanup(): void {
child.stdout?.off("data", onData);
child.off("exit", onExit);
}
function onData(chunk: Buffer): void {
buf += chunk.toString("utf8");
const nl = buf.indexOf("\n");
if (nl < 0) {
return;
}
cleanup();
const line = buf.slice(0, nl).trim();
const prefix = "READY ";
if (!line.startsWith(prefix)) {
reject(new Error(`unexpected READY line: ${line}`));
return;
}
resolve(Number(line.slice(prefix.length)));
}
function onExit(code: number | null): void {
cleanup();
reject(new Error(`worker exited before READY (code ${code})`));
}
child.stdout.on("data", onData);
child.on("exit", onExit);
});
}
async function sendJson(port: number, payload: unknown): Promise<void> {
await new Promise<void>((resolve, reject) => {
const socket = createConnection({ host: "127.0.0.1", port }, () => {
socket.write(`${JSON.stringify(payload)}\n`);
socket.end();
});
socket.on("error", reject);
socket.on("close", () => resolve());
});
}
describe("worker process", () => {
test("loads bundle, runs a thread over TCP, then exits when idle", async () => {
const root = await mkdtemp(join(tmpdir(), "wf-worker-"));
try {
const hash = "C9NMV6V2TQT81";
await mkdir(join(root, "bundles"), { recursive: true });
const bundlePath = join(root, "bundles", `${hash}.esm.js`);
await writeFile(bundlePath, bundleSource, "utf8");
const scriptPath = getWorkerHostScriptPath();
const child = spawn(process.execPath, [scriptPath, bundlePath, root, hash], {
stdio: ["ignore", "pipe", "inherit"],
});
if (child.stdout === null) {
throw new Error("missing stdout");
}
const port = await readReadyPort(child);
const threadId = "01KQXKW18CT8G75T53R8F4G7YG";
await sendJson(port, {
type: "run",
threadId,
prompt: "hello",
options: { isDryRun: false, maxRounds: 5 },
});
const exitCode: number = await new Promise((resolve) => {
child.on("exit", (code) => resolve(code ?? 1));
});
expect(exitCode).toBe(0);
const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`);
const text = await readFile(dataPath, "utf8");
expect(
text
.trim()
.split("\n")
.filter((l) => l !== "").length,
).toBe(3);
} finally {
await rm(root, { recursive: true, force: true });
}
}, 15_000);
});
+8 -5
View File
@@ -1,7 +1,7 @@
import { err, ok, type Result } from "./result.js";
/** Crockford Base32 alphabet (no I, L, O, U). */
export const CROCKFORD_BASE32_ALPHABET = "0123456789ABCDEFGHJKMNPQRSTVWXZ";
/** Crockford Base32 alphabet (no I, L, O, U) — exactly 32 symbols. */
export const CROCKFORD_BASE32_ALPHABET = "0123456789ABCDEFGHJKMNPQRSTVWXYZ";
const DECODE_MAP: Record<string, number> = (() => {
const map: Record<string, number> = {};
@@ -31,13 +31,16 @@ export function encodeCrockfordBase32Bits(value: bigint, bitLength: number): str
let result = "";
for (let i = 0; i < charCount; i++) {
const shift = totalBits - 5 * (i + 1);
const quintet = Number((shifted >> BigInt(shift)) & 0x1fn);
const quintet = Number((shifted >> BigInt(shift)) & 31n);
result += CROCKFORD_BASE32_ALPHABET[quintet];
}
return result;
}
export function decodeCrockfordBase32Bits(encoded: string, bitLength: number): Result<bigint, Error> {
export function decodeCrockfordBase32Bits(
encoded: string,
bitLength: number,
): Result<bigint, Error> {
if (bitLength <= 0) {
return err(new Error("bitLength must be positive"));
}
@@ -57,7 +60,7 @@ export function decodeCrockfordBase32Bits(encoded: string, bitLength: number): R
if (val === undefined) {
return err(new Error(`invalid Crockford Base32 character: ${ch}`));
}
shifted = (shifted << 5n) | BigInt(val);
shifted = (shifted << 5n) | BigInt(val & 31);
}
return ok(shifted >> BigInt(padBits));
}
+123 -63
View File
@@ -1,7 +1,13 @@
import { isBuiltin } from "node:module";
import type {
CallExpression,
ExportAllDeclaration,
ExportNamedDeclaration,
ImportDeclaration,
Node,
Program,
} from "acorn";
import * as acorn from "acorn";
import type { Node, Program } from "acorn";
import { err, ok, type Result } from "./result.js";
@@ -26,22 +32,36 @@ function isAllowedImportSpecifier(spec: string): boolean {
return isBuiltin(spec);
}
function walk(node: Node, visit: (n: Node) => void): void {
visit(node);
function pushNestedAstNodes(value: unknown, out: Node[]): void {
if (value === null || value === undefined) {
return;
}
if (Array.isArray(value)) {
for (const item of value) {
if (item !== null && typeof item === "object" && "type" in item) {
out.push(item as Node);
}
}
return;
}
if (typeof value === "object" && "type" in value) {
out.push(value as Node);
}
}
function collectChildNodes(node: Node): Node[] {
const children: Node[] = [];
for (const key of Object.keys(node)) {
const val = (node as Record<string, unknown>)[key];
if (val === null || val === undefined) {
continue;
}
if (Array.isArray(val)) {
for (const item of val) {
if (item !== null && typeof item === "object" && "type" in item) {
walk(item as Node, visit);
}
}
} else if (typeof val === "object" && "type" in val) {
walk(val as Node, visit);
}
pushNestedAstNodes(val, children);
}
return children;
}
function walkAst(node: Node, visit: (n: Node) => void): void {
visit(node);
for (const child of collectChildNodes(node)) {
walkAst(child, visit);
}
}
@@ -54,6 +74,85 @@ function programHasDefaultExport(body: readonly Node[]): boolean {
return false;
}
function stringLiteralModuleSpecifier(src: Node): string | null {
if (src.type !== "Literal" || typeof src.value !== "string") {
return null;
}
return src.value;
}
function validateImportDeclaration(node: ImportDeclaration): string | null {
const spec = stringLiteralModuleSpecifier(node.source);
if (spec === null) {
return "only static string import specifiers are allowed";
}
if (!isAllowedImportSpecifier(spec)) {
return `disallowed import specifier "${spec}" (only Node built-ins are allowed)`;
}
return null;
}
function validateExportSource(
src: Node,
staticMessage: string,
disallowedPrefix: string,
): string | null {
const spec = stringLiteralModuleSpecifier(src);
if (spec === null) {
return staticMessage;
}
if (!isAllowedImportSpecifier(spec)) {
return `${disallowedPrefix} "${spec}" (only Node built-ins are allowed)`;
}
return null;
}
function validateExportNamedDeclaration(node: ExportNamedDeclaration): string | null {
if (node.source === null || node.source === undefined) {
return null;
}
return validateExportSource(
node.source,
"only static string re-export specifiers are allowed",
"disallowed re-export specifier",
);
}
function validateExportAllDeclaration(node: ExportAllDeclaration): string | null {
return validateExportSource(
node.source,
"only static string export-all specifiers are allowed",
"disallowed export-all specifier",
);
}
function validateRequireCall(node: CallExpression): string | null {
const callee = node.callee;
if (callee.type === "Identifier" && callee.name === "require") {
return "require() is not allowed in workflow bundles";
}
return null;
}
function bundleConstraintViolationForNode(node: Node): string | null {
if (node.type === "ImportExpression") {
return "dynamic import() is not allowed in workflow bundles";
}
if (node.type === "ImportDeclaration") {
return validateImportDeclaration(node);
}
if (node.type === "ExportNamedDeclaration") {
return validateExportNamedDeclaration(node);
}
if (node.type === "ExportAllDeclaration") {
return validateExportAllDeclaration(node);
}
if (node.type === "CallExpression") {
return validateRequireCall(node);
}
return null;
}
/**
* Validate RFC-001 bundle rules: single-file ESM shape, default export,
* no dynamic `import()`, static imports restricted to Node builtins.
@@ -84,58 +183,19 @@ export function validateWorkflowBundle(input: WorkflowBundleValidationInput): Re
return err("workflow bundle must have a default export");
}
let walkError: string | null = null;
walk(ast, (n) => {
if (walkError !== null) {
let violation: string | null = null;
walkAst(ast, (node) => {
if (violation !== null) {
return;
}
if (n.type === "ImportExpression") {
walkError = "dynamic import() is not allowed in workflow bundles";
return;
}
if (n.type === "ImportDeclaration") {
const src = n.source;
if (src.type !== "Literal" || typeof src.value !== "string") {
walkError = "only static string import specifiers are allowed";
return;
}
if (!isAllowedImportSpecifier(src.value)) {
walkError = `disallowed import specifier "${src.value}" (only Node built-ins are allowed)`;
}
return;
}
if (n.type === "ExportNamedDeclaration" && n.source !== null && n.source !== undefined) {
const src = n.source;
if (src.type !== "Literal" || typeof src.value !== "string") {
walkError = "only static string re-export specifiers are allowed";
return;
}
if (!isAllowedImportSpecifier(src.value)) {
walkError = `disallowed re-export specifier "${src.value}" (only Node built-ins are allowed)`;
}
return;
}
if (n.type === "ExportAllDeclaration") {
const src = n.source;
if (src.type !== "Literal" || typeof src.value !== "string") {
walkError = "only static string export-all specifiers are allowed";
return;
}
if (!isAllowedImportSpecifier(src.value)) {
walkError = `disallowed export-all specifier "${src.value}" (only Node built-ins are allowed)`;
}
return;
}
if (n.type === "CallExpression") {
const c = n.callee;
if (c.type === "Identifier" && c.name === "require") {
walkError = "require() is not allowed in workflow bundles";
}
const next = bundleConstraintViolationForNode(node);
if (next !== null) {
violation = next;
}
});
if (walkError !== null) {
return err(walkError);
if (violation !== null) {
return err(violation);
}
return ok(undefined);
+143
View File
@@ -0,0 +1,143 @@
import { appendFile, mkdir } from "node:fs/promises";
import { dirname } from "node:path";
import type { LogFn } from "./logger.js";
import {
END,
type RoleMeta,
type RoleStep,
START,
type ThreadContext,
type WorkflowDefinition,
} from "./types.js";
export type ExecuteThreadIo = {
threadId: string;
hash: string;
dataJsonlPath: string;
infoJsonlPath: string;
};
export type ExecuteThreadOptions = {
isDryRun: boolean;
maxRounds: number;
signal: AbortSignal;
};
function isRoleNext<M extends RoleMeta>(
next: (keyof M & string) | typeof END,
): next is keyof M & string {
return next !== END;
}
async function appendDataLine(path: string, record: unknown): Promise<void> {
const line = `${JSON.stringify(record)}\n`;
await appendFile(path, line, "utf8");
}
/**
* Execute a workflow thread: moderator loop, role steps, RFC-001 `.data.jsonl` records,
* debug lines via `logger` to `.info.jsonl`.
*/
export async function executeThread<M extends RoleMeta>(
def: WorkflowDefinition<M>,
prompt: string,
options: ExecuteThreadOptions,
io: ExecuteThreadIo,
logger: LogFn,
): Promise<{ returnCode: number; summary: string }> {
await mkdir(dirname(io.dataJsonlPath), { recursive: true });
await mkdir(dirname(io.infoJsonlPath), { recursive: true });
const nowMs = Date.now();
const start: ThreadContext<M>["start"] = {
role: START,
content: prompt,
meta: { maxRounds: options.maxRounds, threadId: io.threadId },
timestamp: nowMs,
};
const startRecord = {
name: def.name,
hash: io.hash,
threadId: io.threadId,
parameters: {
prompt,
options: {
isDryRun: options.isDryRun,
maxRounds: options.maxRounds,
},
},
timestamp: nowMs,
};
await appendDataLine(io.dataJsonlPath, startRecord);
let steps: RoleStep<M>[] = [];
logger("T9HQ2KHM", `thread ${io.threadId} started for workflow ${def.name}`);
while (true) {
if (options.signal.aborted) {
logger("V8JX4NP2", `thread ${io.threadId} aborted`);
return { returnCode: 130, summary: "thread aborted" };
}
if (steps.length >= options.maxRounds) {
logger("R3CW7YBQ", `thread ${io.threadId} stopped at maxRounds=${options.maxRounds}`);
return {
returnCode: 0,
summary: `completed: reached maxRounds (${options.maxRounds})`,
};
}
const ctx: ThreadContext<M> = {
threadId: io.threadId,
start,
steps,
};
const next = def.moderator(ctx);
if (!isRoleNext(next)) {
logger("M5FZ2K8H", `thread ${io.threadId} moderator returned END`);
return { returnCode: 0, summary: "completed: moderator returned END" };
}
const roleFn = def.roles[next];
if (roleFn === undefined) {
logger("K2P8QX9W", `thread ${io.threadId} unknown role ${next}`);
return { returnCode: 1, summary: `unknown role: ${next}` };
}
if (options.signal.aborted) {
logger("V8JX4NP3", `thread ${io.threadId} aborted`);
return { returnCode: 130, summary: "thread aborted" };
}
const result = await roleFn(ctx);
const ts = Date.now();
const step: RoleStep<M> = {
role: next,
content: result.content,
meta: result.meta,
timestamp: ts,
} as RoleStep<M>;
await appendDataLine(io.dataJsonlPath, {
role: step.role,
content: step.content,
meta: step.meta,
timestamp: step.timestamp,
});
steps = [...steps, step];
logger("N7BW4YHQ", `thread ${io.threadId} completed role ${next}`);
if (options.signal.aborted) {
logger("V8JX4NP4", `thread ${io.threadId} aborted`);
return { returnCode: 130, summary: "thread aborted" };
}
}
}
+22 -3
View File
@@ -6,10 +6,15 @@ export {
encodeUint64AsCrockford,
} from "./base32.js";
export { validateWorkflowBundle, type WorkflowBundleValidationInput } from "./bundle-validator.js";
export {
type ExecuteThreadIo,
type ExecuteThreadOptions,
executeThread,
} from "./engine.js";
export { hashWorkflowBundleBytes } from "./hash.js";
export {
createLogger,
type CreateLoggerOptions,
createLogger,
type LogFn,
type LoggerSink,
} from "./logger.js";
@@ -21,12 +26,26 @@ export {
registerWorkflowVersion,
stringifyWorkflowRegistryYaml,
unregisterWorkflow,
workflowRegistryPath,
writeWorkflowRegistry,
type WorkflowHistoryEntry,
type WorkflowRegistryEntry,
type WorkflowRegistryFile,
workflowRegistryPath,
writeWorkflowRegistry,
} from "./registry.js";
export { err, ok, type Result } from "./result.js";
export { getDefaultWorkflowStorageRoot } from "./storage-root.js";
export {
type AgentFn,
END,
type Moderator,
type Role,
type RoleMeta,
type RoleResult,
type RoleStep,
START,
type StartStep,
type ThreadContext,
type WorkflowDefinition,
} from "./types.js";
export { generateUlid } from "./ulid.js";
export { getWorkerHostScriptPath } from "./worker-entry-path.js";
+16 -16
View File
@@ -1,7 +1,11 @@
import { appendFileSync } from "node:fs";
import { CROCKFORD_BASE32_ALPHABET } from "./base32.js";
const TAG_LENGTH = 8;
const TAG_CHAR_SET: ReadonlySet<string> = new Set(CROCKFORD_BASE32_ALPHABET.split(""));
function assertValidLogTag(tag: string): void {
if (tag.length !== TAG_LENGTH) {
throw new Error(`log tag must be exactly ${TAG_LENGTH} characters`);
@@ -12,15 +16,13 @@ function assertValidLogTag(tag: string): void {
throw new Error("log tag validation failed");
}
const upper = ch.toUpperCase();
if (!/[0-9A-HJKMNP-TV-Z]/.test(upper)) {
if (!TAG_CHAR_SET.has(upper)) {
throw new Error(`invalid Crockford Base32 character in log tag: ${ch}`);
}
}
}
export type LoggerSink =
| { kind: "stderr" }
| { kind: "file"; path: string };
export type LoggerSink = { kind: "stderr" } | { kind: "file"; path: string };
export type CreateLoggerOptions = {
sink: LoggerSink;
@@ -33,12 +35,11 @@ export function createLogger(options: CreateLoggerOptions): LogFn {
if (options.sink.kind === "stderr") {
return (tag: string, content: string) => {
assertValidLogTag(tag);
const line =
`${JSON.stringify({
tag: tag.toUpperCase(),
content,
timestamp: Date.now(),
})}\n`;
const line = `${JSON.stringify({
tag: tag.toUpperCase(),
content,
timestamp: Date.now(),
})}\n`;
process.stderr.write(line);
};
}
@@ -46,12 +47,11 @@ export function createLogger(options: CreateLoggerOptions): LogFn {
const filePath = options.sink.path;
return (tag: string, content: string) => {
assertValidLogTag(tag);
const line =
`${JSON.stringify({
tag: tag.toUpperCase(),
content,
timestamp: Date.now(),
})}\n`;
const line = `${JSON.stringify({
tag: tag.toUpperCase(),
content,
timestamp: Date.now(),
})}\n`;
appendFileSync(filePath, line, "utf8");
};
}
@@ -0,0 +1,77 @@
import type {
WorkflowHistoryEntry,
WorkflowRegistryEntry,
WorkflowRegistryFile,
} from "./registry-types.js";
import { err, ok, type Result } from "./result.js";
export function normalizeWorkflowHistoryEntry(
workflowName: string,
index: number,
raw: unknown,
): Result<WorkflowHistoryEntry, Error> {
if (raw === null || typeof raw !== "object") {
return err(new Error(`workflow "${workflowName}" history[${index}] must be a mapping`));
}
const he = raw as Record<string, unknown>;
const hash = he.hash;
const timestamp = he.timestamp;
if (typeof hash !== "string" || typeof timestamp !== "number" || !Number.isFinite(timestamp)) {
return err(
new Error(`workflow "${workflowName}" history[${index}] must have hash and timestamp`),
);
}
return ok({ hash, timestamp });
}
export function normalizeWorkflowRegistryEntry(
workflowName: string,
raw: unknown,
): Result<WorkflowRegistryEntry, Error> {
if (raw === null || typeof raw !== "object") {
return err(new Error(`workflow "${workflowName}" must be a mapping`));
}
const e = raw as Record<string, unknown>;
const hash = e.hash;
const timestamp = e.timestamp;
const historyRaw = e.history;
if (typeof hash !== "string") {
return err(new Error(`workflow "${workflowName}" must have a string hash`));
}
if (typeof timestamp !== "number" || !Number.isFinite(timestamp)) {
return err(new Error(`workflow "${workflowName}" must have a finite numeric timestamp`));
}
if (!Array.isArray(historyRaw)) {
return err(new Error(`workflow "${workflowName}" must have a history array`));
}
const history: WorkflowHistoryEntry[] = [];
for (let i = 0; i < historyRaw.length; i++) {
const item = historyRaw[i];
const next = normalizeWorkflowHistoryEntry(workflowName, i, item);
if (!next.ok) {
return next;
}
history.push(next.value);
}
return ok({ hash, timestamp, history });
}
export function normalizeWorkflowRegistryRoot(raw: unknown): Result<WorkflowRegistryFile, Error> {
if (raw === null || typeof raw !== "object") {
return err(new Error("registry root must be a mapping"));
}
const root = raw as Record<string, unknown>;
const workflowsRaw = root.workflows;
if (workflowsRaw === null || workflowsRaw === undefined || typeof workflowsRaw !== "object") {
return err(new Error('registry must contain a "workflows" mapping'));
}
const workflows: Record<string, WorkflowRegistryEntry> = {};
for (const [name, entryRaw] of Object.entries(workflowsRaw)) {
const entryResult = normalizeWorkflowRegistryEntry(name, entryRaw);
if (!entryResult.ok) {
return entryResult;
}
workflows[name] = entryResult.value;
}
return ok({ workflows });
}
+14
View File
@@ -0,0 +1,14 @@
export type WorkflowHistoryEntry = {
hash: string;
timestamp: number;
};
export type WorkflowRegistryEntry = {
hash: string;
timestamp: number;
history: WorkflowHistoryEntry[];
};
export type WorkflowRegistryFile = {
workflows: Record<string, WorkflowRegistryEntry>;
};
+16 -61
View File
@@ -3,22 +3,19 @@ import { dirname, join } from "node:path";
import { parseDocument, stringify } from "yaml";
import { normalizeWorkflowRegistryRoot } from "./registry-normalize.js";
import type {
WorkflowHistoryEntry,
WorkflowRegistryEntry,
WorkflowRegistryFile,
} from "./registry-types.js";
import { err, ok, type Result } from "./result.js";
export type WorkflowHistoryEntry = {
hash: string;
timestamp: number;
};
export type WorkflowRegistryEntry = {
hash: string;
timestamp: number;
history: WorkflowHistoryEntry[];
};
export type WorkflowRegistryFile = {
workflows: Record<string, WorkflowRegistryEntry>;
};
export type {
WorkflowHistoryEntry,
WorkflowRegistryEntry,
WorkflowRegistryFile,
} from "./registry-types.js";
export function workflowRegistryPath(storageRoot: string): string {
return join(storageRoot, "workflow.yaml");
@@ -28,50 +25,6 @@ function emptyRegistry(): WorkflowRegistryFile {
return { workflows: {} };
}
function normalizeRegistry(raw: unknown): Result<WorkflowRegistryFile, Error> {
if (raw === null || typeof raw !== "object") {
return err(new Error("registry root must be a mapping"));
}
const root = raw as Record<string, unknown>;
const workflowsRaw = root.workflows;
if (workflowsRaw === null || workflowsRaw === undefined || typeof workflowsRaw !== "object") {
return err(new Error('registry must contain a "workflows" mapping'));
}
const workflows: Record<string, WorkflowRegistryEntry> = {};
for (const [name, entryRaw] of Object.entries(workflowsRaw)) {
if (entryRaw === null || typeof entryRaw !== "object") {
return err(new Error(`workflow "${name}" must be a mapping`));
}
const e = entryRaw as Record<string, unknown>;
const hash = e.hash;
const timestamp = e.timestamp;
const historyRaw = e.history;
if (typeof hash !== "string") {
return err(new Error(`workflow "${name}" must have a string hash`));
}
if (typeof timestamp !== "number" || !Number.isFinite(timestamp)) {
return err(new Error(`workflow "${name}" must have a finite numeric timestamp`));
}
if (!Array.isArray(historyRaw)) {
return err(new Error(`workflow "${name}" must have a history array`));
}
const history: WorkflowHistoryEntry[] = [];
for (let i = 0; i < historyRaw.length; i++) {
const h = historyRaw[i];
if (h === null || typeof h !== "object") {
return err(new Error(`workflow "${name}" history[${i}] must be a mapping`));
}
const he = h as Record<string, unknown>;
if (typeof he.hash !== "string" || typeof he.timestamp !== "number" || !Number.isFinite(he.timestamp)) {
return err(new Error(`workflow "${name}" history[${i}] must have hash and timestamp`));
}
history.push({ hash: he.hash, timestamp: he.timestamp });
}
workflows[name] = { hash, timestamp, history };
}
return ok({ workflows });
}
export function parseWorkflowRegistryYaml(text: string): Result<WorkflowRegistryFile, Error> {
if (text.trim() === "") {
return ok(emptyRegistry());
@@ -82,14 +35,16 @@ export function parseWorkflowRegistryYaml(text: string): Result<WorkflowRegistry
} catch (e) {
return err(e instanceof Error ? e : new Error(String(e)));
}
return normalizeRegistry(doc);
return normalizeWorkflowRegistryRoot(doc);
}
export function stringifyWorkflowRegistryYaml(registry: WorkflowRegistryFile): string {
return `${stringify(registry, { indent: 2 })}\n`;
return `${stringify(registry, { indent: 2, defaultStringType: "QUOTE_DOUBLE" })}\n`;
}
export async function readWorkflowRegistry(storageRoot: string): Promise<Result<WorkflowRegistryFile, Error>> {
export async function readWorkflowRegistry(
storageRoot: string,
): Promise<Result<WorkflowRegistryFile, Error>> {
const path = workflowRegistryPath(storageRoot);
let text: string;
try {
+63
View File
@@ -0,0 +1,63 @@
/** Sentinel values for automaton control flow. */
export const START = "__start__" as const;
export const END = "__end__" as const;
/** Maps role names → their meta types. Single generic drives all inference. */
export type RoleMeta = Record<string, Record<string, unknown>>;
/** Typed output of a Role execution. */
export type RoleResult<Meta extends Record<string, unknown>> = {
content: string;
meta: Meta;
};
/** Engine start frame: initial prompt + thread identity. */
export type StartStep = {
role: typeof START;
content: string;
meta: { maxRounds: number; threadId: string };
timestamp: number;
};
/** A completed role step in the thread. */
export type RoleStep<M extends RoleMeta> = {
[K in keyof M & string]: { role: K; meta: M[K]; content: string; timestamp: number };
}[keyof M & string];
/** Thread-scoped context passed to roles and moderator. */
export type ThreadContext<M extends RoleMeta = RoleMeta> = {
threadId: string;
start: StartStep;
steps: RoleStep<M>[];
};
/**
* A Role — receives full thread context, returns typed content + meta.
* Implementation can be an agent, LLM call, script, HTTP request, etc.
*/
export type Role<Meta extends Record<string, unknown>> = (
ctx: ThreadContext,
) => Promise<RoleResult<Meta>>;
/**
* An Agent — raw string output interface for LLM/CLI adapters.
* Structured meta is extracted by the role's extract layer.
*/
export type AgentFn = (ctx: ThreadContext, systemPrompt: string) => Promise<string>;
/**
* The Moderator — a pure routing function.
* Receives the full thread context (start + all prior steps).
* On initial call, `steps` is empty.
* Returns the next role name or END to terminate.
*/
export type Moderator<M extends RoleMeta> = (
ctx: ThreadContext<M>,
) => (keyof M & string) | typeof END;
/** Complete workflow definition as authored by users. */
export type WorkflowDefinition<M extends RoleMeta> = {
name: string;
roles: { [K in keyof M & string]: Role<M[K]> };
moderator: Moderator<M>;
};
@@ -0,0 +1,6 @@
import { fileURLToPath } from "node:url";
/** Absolute path to `worker-host.ts` for spawning bundle worker processes. */
export function getWorkerHostScriptPath(): string {
return fileURLToPath(new URL("./worker.ts", import.meta.url));
}
+295
View File
@@ -0,0 +1,295 @@
import { mkdir, unlink, writeFile } from "node:fs/promises";
import { createServer, type Socket } from "node:net";
import { dirname, join } from "node:path";
import { pathToFileURL } from "node:url";
import { type ExecuteThreadIo, executeThread } from "./engine.js";
import { createLogger } from "./logger.js";
import type { RoleMeta, WorkflowDefinition } from "./types.js";
const bootLog = createLogger({ sink: { kind: "stderr" } });
type RunCommand = {
type: "run";
threadId: string;
prompt: string;
options: { isDryRun: boolean; maxRounds: number };
};
type KillCommand = {
type: "kill";
threadId: string;
};
type ControlCommand = RunCommand | KillCommand;
function parseControlPayload(payload: unknown): ControlCommand | null {
if (payload === null || typeof payload !== "object") {
return null;
}
const rec = payload as Record<string, unknown>;
const type = rec.type;
if (type === "kill") {
const threadId = rec.threadId;
if (typeof threadId !== "string") {
return null;
}
return { type: "kill", threadId };
}
if (type === "run") {
const threadId = rec.threadId;
const prompt = rec.prompt;
const options = rec.options;
if (typeof threadId !== "string" || typeof prompt !== "string") {
return null;
}
if (options === null || typeof options !== "object") {
return null;
}
const optRec = options as Record<string, unknown>;
const isDryRun = optRec.isDryRun;
const maxRounds = optRec.maxRounds;
if (typeof isDryRun !== "boolean" || typeof maxRounds !== "number") {
return null;
}
return {
type: "run",
threadId,
prompt,
options: { isDryRun, maxRounds },
};
}
return null;
}
function parseCommandLine(line: string): ControlCommand | null {
const trimmed = line.trim();
if (trimmed === "") {
return null;
}
let parsed: unknown;
try {
parsed = JSON.parse(trimmed) as unknown;
} catch {
bootLog("S8KQ3WJP", "worker received invalid JSON control line");
return null;
}
return parseControlPayload(parsed);
}
function isWorkflowDefinitionLike(value: unknown): value is WorkflowDefinition<RoleMeta> {
if (value === null || typeof value !== "object") {
return false;
}
const rec = value as Record<string, unknown>;
if (typeof rec.name !== "string") {
return false;
}
if (rec.roles === null || typeof rec.roles !== "object") {
return false;
}
if (typeof rec.moderator !== "function") {
return false;
}
return true;
}
async function readLineFromSocket(socket: Socket): Promise<string | null> {
return await new Promise((resolve) => {
let buf = "";
function onData(chunk: Buffer): void {
buf += chunk.toString("utf8");
const nl = buf.indexOf("\n");
if (nl >= 0) {
cleanup();
resolve(buf.slice(0, nl));
}
}
function onEnd(): void {
cleanup();
resolve(buf === "" ? null : buf);
}
function onError(): void {
cleanup();
resolve(null);
}
function cleanup(): void {
socket.off("data", onData);
socket.off("end", onEnd);
socket.off("error", onError);
}
socket.on("data", onData);
socket.on("end", onEnd);
socket.on("error", onError);
});
}
async function main(): Promise<void> {
const bundlePath = process.argv[2];
const storageRoot = process.argv[3];
const hash = process.argv[4];
if (
bundlePath === undefined ||
storageRoot === undefined ||
hash === undefined ||
bundlePath === "" ||
storageRoot === "" ||
hash === ""
) {
bootLog("H7XN4MKQ", "worker usage: worker <bundlePath> <storageRoot> <hash>");
process.exit(2);
return;
}
// Dynamic import required: user bundle path resolved at runtime
const modUnknown: unknown = await import(pathToFileURL(bundlePath).href);
const modRec = modUnknown as Record<string, unknown>;
const defaultExport = modRec.default;
if (!isWorkflowDefinitionLike(defaultExport)) {
bootLog(
"T4BW9YJX",
"workflow bundle default export must be a WorkflowDefinition { name, roles, moderator }",
);
process.exit(2);
return;
}
const def = defaultExport;
const controllers = new Map<string, AbortController>();
let activeThreads = 0;
let shutdownTimer: ReturnType<typeof setTimeout> | null = null;
const workerCtlPath = join(storageRoot, "workers", `${hash}.json`);
function cancelShutdownTimer(): void {
if (shutdownTimer !== null) {
clearTimeout(shutdownTimer);
shutdownTimer = null;
}
}
function scheduleShutdown(): void {
cancelShutdownTimer();
shutdownTimer = setTimeout(() => {
void unlink(workerCtlPath).catch(() => {});
process.exit(0);
}, 150);
}
function bumpStart(): void {
cancelShutdownTimer();
activeThreads++;
}
function bumpDone(): void {
activeThreads--;
if (activeThreads <= 0) {
activeThreads = 0;
scheduleShutdown();
}
}
async function dispatchCommand(cmd: ControlCommand, socket: Socket | null): Promise<void> {
if (cmd.type === "kill") {
const ac = controllers.get(cmd.threadId);
if (ac !== undefined) {
ac.abort();
bootLog("P9XK2WNQ", `kill requested for thread ${cmd.threadId}`);
}
socket?.end();
return;
}
bumpStart();
const threadId = cmd.threadId;
const runningPath = join(storageRoot, "logs", hash, `${threadId}.running`);
const dataJsonlPath = join(storageRoot, "logs", hash, `${threadId}.data.jsonl`);
const infoJsonlPath = join(storageRoot, "logs", hash, `${threadId}.info.jsonl`);
const io: ExecuteThreadIo = {
threadId,
hash,
dataJsonlPath,
infoJsonlPath,
};
const existing = controllers.get(threadId);
if (existing !== undefined) {
existing.abort();
controllers.delete(threadId);
}
const ac = new AbortController();
controllers.set(threadId, ac);
try {
await mkdir(dirname(runningPath), { recursive: true });
await mkdir(dirname(dataJsonlPath), { recursive: true });
await writeFile(runningPath, "", "utf8");
const logger = createLogger({ sink: { kind: "file", path: infoJsonlPath } });
await executeThread(def, cmd.prompt, { ...cmd.options, signal: ac.signal }, io, logger);
} catch (e) {
const message = e instanceof Error ? e.message : String(e);
bootLog("Q3MN8YKW", `thread ${threadId} failed: ${message}`);
} finally {
controllers.delete(threadId);
await unlink(runningPath).catch(() => {});
bumpDone();
socket?.end();
}
}
if (typeof process.send === "function") {
process.on("message", (msg: unknown) => {
const cmd = parseControlPayload(msg);
if (cmd === null) {
return;
}
void dispatchCommand(cmd, null);
});
}
const server = createServer((socket) => {
void (async () => {
const line = await readLineFromSocket(socket);
if (line === null) {
socket.end();
return;
}
const cmd = parseCommandLine(line);
if (cmd === null) {
socket.end();
return;
}
await dispatchCommand(cmd, socket);
})();
});
server.on("error", (err) => {
bootLog("W8YK4NPX", `worker server error: ${err.message}`);
process.exit(1);
});
await new Promise<void>((resolve) => {
server.listen(0, "127.0.0.1", () => {
resolve();
});
});
const addr = server.address();
if (addr === null || typeof addr === "string") {
bootLog("R9XK4MNW", "worker failed to bind TCP address");
process.exit(1);
return;
}
process.stdout.write(`READY ${addr.port}\n`);
await new Promise<void>(() => {});
}
void main();
+1 -4
View File
@@ -15,8 +15,5 @@
"composite": true,
"outDir": "dist"
},
"references": [
{ "path": "packages/workflow" },
{ "path": "packages/cli-workflow" }
]
"references": [{ "path": "packages/workflow" }, { "path": "packages/cli-workflow" }]
}