From 7582a88d6b0f6e4ae6e165da903d9311cfb7e086 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Wed, 6 May 2026 04:59:54 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20Phase=202=20=E2=80=94=20Thread=20lifecy?= =?UTF-8?q?cle,=20execution=20engine,=20worker,=20CLI?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - types.ts: START/END, RoleMeta, ThreadContext, Role, Moderator, WorkflowDefinition - engine.ts: executeThread with JSONL persistence + AbortSignal - worker.ts: per-bundle process, TCP IPC, kill individual threads - CLI: run/ps/kill/threads/thread/thread rm commands - 32 tests pass, biome clean 小橘 --- biome.json | 29 +- package.json | 9 +- .../cli-workflow/__tests__/commands.test.ts | 100 ++++++ .../cli-workflow/__tests__/thread-cli.test.ts | 214 +++++++++++++ packages/cli-workflow/package.json | 3 +- packages/cli-workflow/src/bundle-store.ts | 52 +++ packages/cli-workflow/src/cli-dispatch.ts | 228 ++++++++++++++ packages/cli-workflow/src/cli-output.ts | 9 + packages/cli-workflow/src/cli.ts | 10 +- packages/cli-workflow/src/cmd-add.ts | 77 +++++ packages/cli-workflow/src/cmd-kill.ts | 39 +++ packages/cli-workflow/src/cmd-list.ts | 32 ++ packages/cli-workflow/src/cmd-ps.ts | 9 + packages/cli-workflow/src/cmd-remove.ts | 34 ++ packages/cli-workflow/src/cmd-run.ts | 54 ++++ packages/cli-workflow/src/cmd-show.ts | 43 +++ packages/cli-workflow/src/cmd-thread.ts | 42 +++ packages/cli-workflow/src/cmd-threads.ts | 31 ++ packages/cli-workflow/src/fs-utils.ts | 22 ++ packages/cli-workflow/src/run-argv.ts | 84 +++++ packages/cli-workflow/src/storage-env.ts | 10 + packages/cli-workflow/src/thread-scan.ts | 143 +++++++++ packages/cli-workflow/src/worker-spawn.ts | 190 +++++++++++ packages/cli-workflow/src/workflow-name.ts | 12 + packages/workflow/__tests__/base32.test.ts | 38 +++ .../__tests__/bundle-validator.test.ts | 66 ++++ packages/workflow/__tests__/engine.test.ts | 137 ++++++++ packages/workflow/__tests__/hash.test.ts | 24 ++ packages/workflow/__tests__/logger.test.ts | 31 ++ packages/workflow/__tests__/registry.test.ts | 77 +++++ packages/workflow/__tests__/result.test.ts | 21 ++ .../__tests__/thread-jsonl-format.test.ts | 41 +++ packages/workflow/__tests__/ulid.test.ts | 29 ++ packages/workflow/__tests__/worker.test.ts | 120 +++++++ packages/workflow/src/base32.ts | 13 +- packages/workflow/src/bundle-validator.ts | 186 +++++++---- packages/workflow/src/engine.ts | 143 +++++++++ packages/workflow/src/index.ts | 25 +- packages/workflow/src/logger.ts | 32 +- packages/workflow/src/registry-normalize.ts | 77 +++++ packages/workflow/src/registry-types.ts | 14 + packages/workflow/src/registry.ts | 77 +---- packages/workflow/src/types.ts | 63 ++++ packages/workflow/src/worker-entry-path.ts | 6 + packages/workflow/src/worker.ts | 295 ++++++++++++++++++ tsconfig.json | 5 +- 46 files changed, 2829 insertions(+), 167 deletions(-) create mode 100644 packages/cli-workflow/__tests__/commands.test.ts create mode 100644 packages/cli-workflow/__tests__/thread-cli.test.ts create mode 100644 packages/cli-workflow/src/bundle-store.ts create mode 100644 packages/cli-workflow/src/cli-dispatch.ts create mode 100644 packages/cli-workflow/src/cli-output.ts create mode 100644 packages/cli-workflow/src/cmd-add.ts create mode 100644 packages/cli-workflow/src/cmd-kill.ts create mode 100644 packages/cli-workflow/src/cmd-list.ts create mode 100644 packages/cli-workflow/src/cmd-ps.ts create mode 100644 packages/cli-workflow/src/cmd-remove.ts create mode 100644 packages/cli-workflow/src/cmd-run.ts create mode 100644 packages/cli-workflow/src/cmd-show.ts create mode 100644 packages/cli-workflow/src/cmd-thread.ts create mode 100644 packages/cli-workflow/src/cmd-threads.ts create mode 100644 packages/cli-workflow/src/fs-utils.ts create mode 100644 packages/cli-workflow/src/run-argv.ts create mode 100644 packages/cli-workflow/src/storage-env.ts create mode 100644 packages/cli-workflow/src/thread-scan.ts create mode 100644 packages/cli-workflow/src/worker-spawn.ts create mode 100644 packages/cli-workflow/src/workflow-name.ts create mode 100644 packages/workflow/__tests__/base32.test.ts create mode 100644 packages/workflow/__tests__/bundle-validator.test.ts create mode 100644 packages/workflow/__tests__/engine.test.ts create mode 100644 packages/workflow/__tests__/hash.test.ts create mode 100644 packages/workflow/__tests__/logger.test.ts create mode 100644 packages/workflow/__tests__/registry.test.ts create mode 100644 packages/workflow/__tests__/result.test.ts create mode 100644 packages/workflow/__tests__/thread-jsonl-format.test.ts create mode 100644 packages/workflow/__tests__/ulid.test.ts create mode 100644 packages/workflow/__tests__/worker.test.ts create mode 100644 packages/workflow/src/engine.ts create mode 100644 packages/workflow/src/registry-normalize.ts create mode 100644 packages/workflow/src/registry-types.ts create mode 100644 packages/workflow/src/types.ts create mode 100644 packages/workflow/src/worker-entry-path.ts create mode 100644 packages/workflow/src/worker.ts diff --git a/biome.json b/biome.json index bffcdba..61030ca 100644 --- a/biome.json +++ b/biome.json @@ -1,11 +1,9 @@ { - "$schema": "https://biomejs.dev/schemas/1.9.0/schema.json", + "$schema": "https://biomejs.dev/schemas/2.4.14/schema.json", "files": { - "ignore": ["**/dist/**", "**/node_modules/**"] - }, - "organizeImports": { - "enabled": true + "includes": ["**", "!**/dist", "!**/node_modules"] }, + "assist": { "actions": { "source": { "organizeImports": "on" } } }, "formatter": { "indentStyle": "space", "indentWidth": 2, @@ -19,7 +17,7 @@ }, "overrides": [ { - "include": ["**/__tests__/**"], + "includes": ["**/__tests__/**"], "linter": { "rules": { "suspicious": { @@ -30,6 +28,16 @@ } } } + }, + { + "includes": ["**/*.d.ts"], + "linter": { + "rules": { + "style": { + "noDefaultExport": "off" + } + } + } } ], "linter": { @@ -43,7 +51,6 @@ "noParameterProperties": "error", "useImportType": "error", "useShorthandFunctionType": "error", - "noVar": "error", "useConst": "error", "useEnumInitializers": "error" }, @@ -60,15 +67,15 @@ } }, "suspicious": { - "noExplicitAny": "error" + "noExplicitAny": "error", + "noVar": "error", + "noConsole": "error" }, "correctness": { "noUnusedVariables": "error", "noUnusedImports": "error" }, - "nursery": { - "noConsole": "error" - } + "nursery": {} } } } diff --git a/package.json b/package.json index 01edf45..e79f496 100644 --- a/package.json +++ b/package.json @@ -1,9 +1,16 @@ { "name": "@uncaged/workflow-monorepo", "private": true, - "workspaces": ["packages/*"], + "workspaces": [ + "packages/*" + ], "scripts": { "build": "bun run --filter '*' build", + "check": "biome check .", + "format": "biome format --write .", "test": "bun run --filter '*' test" + }, + "devDependencies": { + "@biomejs/biome": "^2.4.14" } } diff --git a/packages/cli-workflow/__tests__/commands.test.ts b/packages/cli-workflow/__tests__/commands.test.ts new file mode 100644 index 0000000..366877f --- /dev/null +++ b/packages/cli-workflow/__tests__/commands.test.ts @@ -0,0 +1,100 @@ +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; + +import { cmdAdd } from "../src/cmd-add.js"; +import { cmdList, formatListLines } from "../src/cmd-list.js"; +import { cmdRemove } from "../src/cmd-remove.js"; +import { cmdShow } from "../src/cmd-show.js"; + +describe("cli workflow commands", () => { + let prevEnv: string | undefined; + let storageRoot: string; + + beforeEach(async () => { + prevEnv = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT; + storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-")); + process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = storageRoot; + }); + + afterEach(async () => { + if (prevEnv === undefined) { + delete process.env.UNCAGED_WORKFLOW_STORAGE_ROOT; + } else { + process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = prevEnv; + } + await rm(storageRoot, { recursive: true, force: true }); + }); + + test("add / list / show / remove roundtrip", async () => { + const bundleDir = join(storageRoot, "src"); + await mkdir(bundleDir, { recursive: true }); + const bundlePath = join(bundleDir, "demo.esm.js"); + await writeFile( + bundlePath, + `import fs from "node:fs"; + +export default { + name: "solve-issue", + roles: { + noop: async () => { + fs.existsSync("."); + return { content: "ok", meta: { done: true } }; + }, + }, + moderator(ctx) { + if (ctx.steps.length === 0) { + return "noop"; + } + return "__end__"; + }, +}; +`, + "utf8", + ); + + const added = await cmdAdd(storageRoot, "solve-issue", bundlePath); + expect(added.ok).toBe(true); + + const listed = await cmdList(storageRoot); + expect(listed.ok).toBe(true); + if (listed.ok) { + const lines = formatListLines(listed.value); + expect(lines.some((l) => l.startsWith("solve-issue\t"))).toBe(true); + } + + const shown = await cmdShow(storageRoot, "solve-issue"); + expect(shown.ok).toBe(true); + if (!shown.ok) { + return; + } + expect(shown.value.hash.length).toBe(13); + + const bundleOnDisk = await readFile( + join(storageRoot, "bundles", `${shown.value.hash}.esm.js`), + "utf8", + ); + expect(bundleOnDisk.length).toBeGreaterThan(0); + + const removed = await cmdRemove(storageRoot, "solve-issue"); + expect(removed.ok).toBe(true); + + const listedAfter = await cmdList(storageRoot); + expect(listedAfter.ok).toBe(true); + if (listedAfter.ok) { + expect(formatListLines(listedAfter.value)[0]).toBe("(no workflows registered)"); + } + }); + + test("add rejects invalid bundles", async () => { + const bundlePath = join(storageRoot, "bad.esm.js"); + await writeFile( + bundlePath, + 'import x from "./local";\nexport default async function run() { return { returnCode: 0, summary: "" }; }\n', + "utf8", + ); + const r = await cmdAdd(storageRoot, "solve-issue", bundlePath); + expect(r.ok).toBe(false); + }); +}); diff --git a/packages/cli-workflow/__tests__/thread-cli.test.ts b/packages/cli-workflow/__tests__/thread-cli.test.ts new file mode 100644 index 0000000..7a2255d --- /dev/null +++ b/packages/cli-workflow/__tests__/thread-cli.test.ts @@ -0,0 +1,214 @@ +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import { spawnSync } from "node:child_process"; +import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { dirname, join } from "node:path"; +import { fileURLToPath } from "node:url"; + +import { cmdAdd } from "../src/cmd-add.js"; +import { cmdKill } from "../src/cmd-kill.js"; +import { cmdPs } from "../src/cmd-ps.js"; +import { cmdRun } from "../src/cmd-run.js"; +import { cmdThreadRemove, cmdThreadShow } from "../src/cmd-thread.js"; +import { cmdThreads } from "../src/cmd-threads.js"; +import { pathExists } from "../src/fs-utils.js"; + +const fastBundleSource = `export default { + name: "solve-issue", + roles: { + planner: async () => ({ content: "plan", meta: { plan: "x" } }), + coder: async () => ({ content: "code", meta: { diff: "y" } }), + }, + moderator(ctx) { + if (ctx.steps.length === 0) return "planner"; + if (ctx.steps.length === 1) return "coder"; + return "__end__"; + }, +}; +`; + +const slowPlannerBundleSource = `export default { + name: "solve-issue", + roles: { + planner: async () => { + await new Promise((r) => setTimeout(r, 400)); + return { content: "plan", meta: { plan: "x" } }; + }, + coder: async () => ({ content: "code", meta: { diff: "y" } }), + }, + moderator(ctx) { + if (ctx.steps.length === 0) return "planner"; + if (ctx.steps.length === 1) return "coder"; + return "__end__"; + }, +}; +`; + +const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url)); + +const abortablePlannerBundleSource = `export default { + name: "solve-issue", + roles: { + planner: async () => { + await new Promise((r) => setTimeout(r, 600)); + return { content: "plan", meta: { plan: "x" } }; + }, + coder: async () => ({ content: "code", meta: { diff: "y" } }), + }, + moderator(ctx) { + if (ctx.steps.length === 0) return "planner"; + if (ctx.steps.length === 1) return "coder"; + return "__end__"; + }, +}; +`; + +describe("cli thread commands", () => { + let prevEnv: string | undefined; + let storageRoot: string; + + beforeEach(async () => { + prevEnv = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT; + storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-thread-")); + process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = storageRoot; + }); + + afterEach(async () => { + if (prevEnv === undefined) { + delete process.env.UNCAGED_WORKFLOW_STORAGE_ROOT; + } else { + process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = prevEnv; + } + await rm(storageRoot, { recursive: true, force: true }); + }); + + test("run / threads / thread / thread rm", async () => { + const bundleDir = join(storageRoot, "src"); + await mkdir(bundleDir, { recursive: true }); + const bundlePath = join(bundleDir, "demo.esm.js"); + await writeFile(bundlePath, fastBundleSource, "utf8"); + + const added = await cmdAdd(storageRoot, "solve-issue", bundlePath); + expect(added.ok).toBe(true); + if (!added.ok) { + return; + } + + const ran = await cmdRun(storageRoot, "solve-issue", "hello", false, 5); + expect(ran.ok).toBe(true); + if (!ran.ok) { + return; + } + + const threadId = ran.value.threadId; + + let threads = await cmdThreads(storageRoot, []); + for ( + let attempt = 0; + attempt < 50 && threads.ok && !threads.value.some((l) => l.includes(threadId)); + attempt++ + ) { + await new Promise((r) => setTimeout(r, 20)); + threads = await cmdThreads(storageRoot, []); + } + expect(threads.ok).toBe(true); + if (!threads.ok) { + return; + } + expect(threads.value.some((l) => l.includes(threadId))).toBe(true); + + const shown = await cmdThreadShow(storageRoot, threadId); + expect(shown.ok).toBe(true); + if (!shown.ok) { + return; + } + expect(shown.value.includes('"threadId"')).toBe(true); + + const removed = await cmdThreadRemove(storageRoot, threadId); + expect(removed.ok).toBe(true); + + const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`); + expect(await pathExists(dataPath)).toBe(false); + }); + + test("cli entrypoint dispatches threads / ps (spawn)", () => { + const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; + const threads = spawnSync(process.execPath, [cliEntryPath, "threads"], { + env, + encoding: "utf8", + }); + expect(threads.status).toBe(0); + + const ps = spawnSync(process.execPath, [cliEntryPath, "ps"], { env, encoding: "utf8" }); + expect(ps.status).toBe(0); + }); + + test("ps lists running threads while planner role is in-flight", async () => { + const bundleDir = join(storageRoot, "src"); + await mkdir(bundleDir, { recursive: true }); + const bundlePath = join(bundleDir, "demo.esm.js"); + await writeFile(bundlePath, slowPlannerBundleSource, "utf8"); + + const added = await cmdAdd(storageRoot, "solve-issue", bundlePath); + expect(added.ok).toBe(true); + if (!added.ok) { + return; + } + + const ran = await cmdRun(storageRoot, "solve-issue", "hello", false, 5); + expect(ran.ok).toBe(true); + if (!ran.ok) { + return; + } + + const threadId = ran.value.threadId; + + await new Promise((r) => setTimeout(r, 50)); + const psEarly = await cmdPs(storageRoot); + expect(psEarly.some((l) => l.includes(threadId))).toBe(true); + + await new Promise((r) => setTimeout(r, 900)); + + const psLate = await cmdPs(storageRoot); + expect(psLate).toEqual(["(no running threads)"]); + }); + + test("kill stops thread after the in-flight role (before subsequent roles)", async () => { + const bundleDir = join(storageRoot, "src"); + await mkdir(bundleDir, { recursive: true }); + const bundlePath = join(bundleDir, "demo.esm.js"); + await writeFile(bundlePath, abortablePlannerBundleSource, "utf8"); + + const added = await cmdAdd(storageRoot, "solve-issue", bundlePath); + expect(added.ok).toBe(true); + if (!added.ok) { + return; + } + + const ran = await cmdRun(storageRoot, "solve-issue", "hello", false, 5); + expect(ran.ok).toBe(true); + if (!ran.ok) { + return; + } + + const threadId = ran.value.threadId; + + await new Promise((r) => setTimeout(r, 50)); + + const killed = await cmdKill(storageRoot, threadId); + expect(killed.ok).toBe(true); + + await new Promise((r) => setTimeout(r, 900)); + + const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`); + const text = await readFile(dataPath, "utf8"); + const lines = text + .trim() + .split("\n") + .filter((l) => l !== ""); + expect(lines.length).toBe(2); + + const runningPath = join(dirname(dataPath), `${threadId}.running`); + expect(await pathExists(runningPath)).toBe(false); + }); +}); diff --git a/packages/cli-workflow/package.json b/packages/cli-workflow/package.json index ba38880..80f997e 100644 --- a/packages/cli-workflow/package.json +++ b/packages/cli-workflow/package.json @@ -6,7 +6,8 @@ "uncaged-workflow": "src/cli.ts" }, "dependencies": { - "@uncaged/workflow": "workspace:*" + "@uncaged/workflow": "workspace:*", + "yaml": "^2.8.4" }, "scripts": { "build": "echo 'TODO'", diff --git a/packages/cli-workflow/src/bundle-store.ts b/packages/cli-workflow/src/bundle-store.ts new file mode 100644 index 0000000..a376127 --- /dev/null +++ b/packages/cli-workflow/src/bundle-store.ts @@ -0,0 +1,52 @@ +import { copyFile, mkdir, readFile, stat } from "node:fs/promises"; +import { join } from "node:path"; + +import { err, ok, type Result } from "@uncaged/workflow"; + +async function pathExists(path: string): Promise { + try { + await stat(path); + return true; + } catch { + return false; + } +} + +export async function storeWorkflowBundleCopy( + storageRoot: string, + hash: string, + resolvedSourcePath: string, + sourceText: string, +): Promise> { + const bundlesDir = join(storageRoot, "bundles"); + const destPath = join(bundlesDir, `${hash}.esm.js`); + + try { + await mkdir(bundlesDir, { recursive: true }); + } catch (e) { + const message = e instanceof Error ? e.message : String(e); + return err(`failed to store bundle: ${message}`); + } + + if (!(await pathExists(destPath))) { + try { + await copyFile(resolvedSourcePath, destPath); + } catch (e) { + const message = e instanceof Error ? e.message : String(e); + return err(`failed to store bundle: ${message}`); + } + return ok(undefined); + } + + let existing: string; + try { + existing = await readFile(destPath, "utf8"); + } catch (e) { + const message = e instanceof Error ? e.message : String(e); + return err(`failed to store bundle: ${message}`); + } + if (existing !== sourceText) { + return err(`bundle hash ${hash} already exists with different contents; refusing to overwrite`); + } + return ok(undefined); +} diff --git a/packages/cli-workflow/src/cli-dispatch.ts b/packages/cli-workflow/src/cli-dispatch.ts new file mode 100644 index 0000000..5671d57 --- /dev/null +++ b/packages/cli-workflow/src/cli-dispatch.ts @@ -0,0 +1,228 @@ +import { printCliError, printCliLine } from "./cli-output.js"; +import { cmdAdd, formatAddSuccess } from "./cmd-add.js"; +import { cmdKill } from "./cmd-kill.js"; +import { cmdList, formatListLines } from "./cmd-list.js"; +import { cmdPs } from "./cmd-ps.js"; +import { cmdRemove } from "./cmd-remove.js"; +import { cmdRun } from "./cmd-run.js"; +import { cmdShow, formatShowYaml } from "./cmd-show.js"; +import { cmdThreadRemove, cmdThreadShow } from "./cmd-thread.js"; +import { cmdThreads } from "./cmd-threads.js"; +import { parseRunArgv } from "./run-argv.js"; + +function usage(): string { + return [ + "Usage:", + " uncaged-workflow add ", + " uncaged-workflow list", + " uncaged-workflow show ", + " uncaged-workflow remove ", + " uncaged-workflow run [--prompt ] [--dry-run] [--max-rounds N]", + " uncaged-workflow ps", + " uncaged-workflow kill ", + " uncaged-workflow threads [name]", + " uncaged-workflow thread ", + " uncaged-workflow thread rm ", + ].join("\n"); +} + +async function dispatchAdd(storageRoot: string, argv: string[]): Promise { + const name = argv[0]; + const file = argv[1]; + if (name === undefined || file === undefined || argv.length > 2) { + printCliError(`${usage()}\n\nerror: add requires `); + return 1; + } + const result = await cmdAdd(storageRoot, name, file); + if (!result.ok) { + printCliError(result.error); + return 1; + } + printCliLine(formatAddSuccess(name, file, result.value.hash)); + return 0; +} + +async function dispatchList(storageRoot: string, argv: string[]): Promise { + if (argv.length > 0) { + printCliError(`${usage()}\n\nerror: list takes no arguments`); + return 1; + } + const result = await cmdList(storageRoot); + if (!result.ok) { + printCliError(result.error); + return 1; + } + for (const line of formatListLines(result.value)) { + printCliLine(line); + } + return 0; +} + +async function dispatchShow(storageRoot: string, argv: string[]): Promise { + const name = argv[0]; + if (name === undefined || argv.length > 1) { + printCliError(`${usage()}\n\nerror: show requires `); + return 1; + } + const result = await cmdShow(storageRoot, name); + if (!result.ok) { + printCliError(result.error); + return 1; + } + printCliLine(formatShowYaml(name, result.value)); + return 0; +} + +async function dispatchRemove(storageRoot: string, argv: string[]): Promise { + const name = argv[0]; + if (name === undefined || argv.length > 1) { + printCliError(`${usage()}\n\nerror: remove requires `); + return 1; + } + const result = await cmdRemove(storageRoot, name); + if (!result.ok) { + printCliError(result.error); + return 1; + } + printCliLine(`removed workflow "${name}" from registry`); + return 0; +} + +async function dispatchRun(storageRoot: string, argv: string[]): Promise { + const parsed = parseRunArgv(argv); + if (!parsed.ok) { + printCliError(`${usage()}\n\nerror: ${parsed.error}`); + return 1; + } + + const result = await cmdRun( + storageRoot, + parsed.value.name, + parsed.value.prompt, + parsed.value.dryRun, + parsed.value.maxRounds, + ); + if (!result.ok) { + printCliError(result.error); + return 1; + } + + printCliLine(result.value.threadId); + return 0; +} + +async function dispatchPs(storageRoot: string, argv: string[]): Promise { + if (argv.length > 0) { + printCliError(`${usage()}\n\nerror: ps takes no arguments`); + return 1; + } + for (const line of await cmdPs(storageRoot)) { + printCliLine(line); + } + return 0; +} + +async function dispatchKill(storageRoot: string, argv: string[]): Promise { + const threadId = argv[0]; + if (threadId === undefined || argv.length > 1) { + printCliError(`${usage()}\n\nerror: kill requires `); + return 1; + } + const result = await cmdKill(storageRoot, threadId); + if (!result.ok) { + printCliError(result.error); + return 1; + } + printCliLine(`kill sent for thread ${threadId}`); + return 0; +} + +async function dispatchThreads(storageRoot: string, argv: string[]): Promise { + const result = await cmdThreads(storageRoot, argv); + if (!result.ok) { + printCliError(result.error); + return 1; + } + for (const line of result.value) { + printCliLine(line); + } + return 0; +} + +async function dispatchThread(storageRoot: string, argv: string[]): Promise { + const id = argv[0]; + if (id === undefined || argv.length > 1) { + printCliError(`${usage()}\n\nerror: thread requires `); + return 1; + } + const result = await cmdThreadShow(storageRoot, id); + if (!result.ok) { + printCliError(result.error); + return 1; + } + printCliLine(result.value); + return 0; +} + +async function dispatchThreadRm(storageRoot: string, argv: string[]): Promise { + const id = argv[0]; + if (id === undefined || argv.length > 1) { + printCliError(`${usage()}\n\nerror: thread rm requires `); + return 1; + } + const result = await cmdThreadRemove(storageRoot, id); + if (!result.ok) { + printCliError(result.error); + return 1; + } + printCliLine(`removed thread ${id}`); + return 0; +} + +export async function runCli(storageRoot: string, argv: string[]): Promise { + if (argv.length === 0) { + printCliError(usage()); + return 1; + } + const command = argv[0]; + if (command === undefined) { + printCliError(usage()); + return 1; + } + const rest = argv.slice(1); + + if (command === "add") { + return dispatchAdd(storageRoot, rest); + } + if (command === "list") { + return dispatchList(storageRoot, rest); + } + if (command === "show") { + return dispatchShow(storageRoot, rest); + } + if (command === "remove") { + return dispatchRemove(storageRoot, rest); + } + if (command === "run") { + return dispatchRun(storageRoot, rest); + } + if (command === "ps") { + return dispatchPs(storageRoot, rest); + } + if (command === "kill") { + return dispatchKill(storageRoot, rest); + } + if (command === "threads") { + return dispatchThreads(storageRoot, rest); + } + if (command === "thread") { + const sub = rest[0]; + if (sub === "rm") { + return dispatchThreadRm(storageRoot, rest.slice(1)); + } + return dispatchThread(storageRoot, rest); + } + + printCliError(`${usage()}\n\nerror: unknown command ${command}`); + return 1; +} diff --git a/packages/cli-workflow/src/cli-output.ts b/packages/cli-workflow/src/cli-output.ts new file mode 100644 index 0000000..e41fc33 --- /dev/null +++ b/packages/cli-workflow/src/cli-output.ts @@ -0,0 +1,9 @@ +export function printCliLine(line: string): void { + // biome-ignore lint/suspicious/noConsole: CLI user-facing output + console.log(line); +} + +export function printCliError(line: string): void { + // biome-ignore lint/suspicious/noConsole: CLI user-facing errors + console.error(line); +} diff --git a/packages/cli-workflow/src/cli.ts b/packages/cli-workflow/src/cli.ts index 1f29714..120ad9e 100644 --- a/packages/cli-workflow/src/cli.ts +++ b/packages/cli-workflow/src/cli.ts @@ -1,3 +1,9 @@ #!/usr/bin/env bun -// @uncaged/cli-workflow - uncaged-workflow CLI -console.log('uncaged-workflow'); + +import { runCli } from "./cli-dispatch.js"; +import { resolveWorkflowStorageRoot } from "./storage-env.js"; + +const argv = process.argv.slice(2); +const storageRoot = resolveWorkflowStorageRoot(); +const code = await runCli(storageRoot, argv); +process.exit(code); diff --git a/packages/cli-workflow/src/cmd-add.ts b/packages/cli-workflow/src/cmd-add.ts new file mode 100644 index 0000000..c4d2df0 --- /dev/null +++ b/packages/cli-workflow/src/cmd-add.ts @@ -0,0 +1,77 @@ +import { readFile, stat } from "node:fs/promises"; +import { basename, resolve } from "node:path"; + +import { + err, + hashWorkflowBundleBytes, + ok, + type Result, + readWorkflowRegistry, + registerWorkflowVersion, + validateWorkflowBundle, + writeWorkflowRegistry, +} from "@uncaged/workflow"; + +import { storeWorkflowBundleCopy } from "./bundle-store.js"; +import { validateCliWorkflowName } from "./workflow-name.js"; + +export async function cmdAdd( + storageRoot: string, + name: string, + filePath: string, +): Promise> { + const nameOk = validateCliWorkflowName(name); + if (!nameOk.ok) { + return nameOk; + } + + let resolvedPath: string; + try { + resolvedPath = resolve(filePath); + await stat(resolvedPath); + } catch { + return err(`bundle file not found: ${filePath}`); + } + + let source: string; + try { + source = await readFile(resolvedPath, "utf8"); + } catch (e) { + const message = e instanceof Error ? e.message : String(e); + return err(`failed to read bundle: ${message}`); + } + + const validated = validateWorkflowBundle({ + filePath: resolvedPath, + source, + }); + if (!validated.ok) { + return validated; + } + + const encoder = new TextEncoder(); + const bytes = encoder.encode(source); + const hash = hashWorkflowBundleBytes(bytes); + + const stored = await storeWorkflowBundleCopy(storageRoot, hash, resolvedPath, source); + if (!stored.ok) { + return stored; + } + + const reg = await readWorkflowRegistry(storageRoot); + if (!reg.ok) { + return err(reg.error.message); + } + + const next = registerWorkflowVersion(reg.value, name, hash, Date.now()); + const written = await writeWorkflowRegistry(storageRoot, next); + if (!written.ok) { + return err(written.error.message); + } + + return ok({ hash }); +} + +export function formatAddSuccess(name: string, filePath: string, hash: string): string { + return `registered workflow "${name}" from ${basename(filePath)} as ${hash}`; +} diff --git a/packages/cli-workflow/src/cmd-kill.ts b/packages/cli-workflow/src/cmd-kill.ts new file mode 100644 index 0000000..75f23b9 --- /dev/null +++ b/packages/cli-workflow/src/cmd-kill.ts @@ -0,0 +1,39 @@ +import { join } from "node:path"; + +import { err, type Result } from "@uncaged/workflow"; + +import { readTextFileIfExists } from "./fs-utils.js"; +import { + resolveRunningHashForThread, + sendWorkerTcpCommand, + type WorkerCtl, +} from "./worker-spawn.js"; + +export async function cmdKill( + storageRoot: string, + threadId: string, +): Promise> { + const hashResult = await resolveRunningHashForThread(storageRoot, threadId); + if (!hashResult.ok) { + return hashResult; + } + + const ctlPath = join(storageRoot, "workers", `${hashResult.value}.json`); + const ctlText = await readTextFileIfExists(ctlPath); + if (ctlText === null) { + return err(`worker control file missing for bundle hash ${hashResult.value}`); + } + + let ctl: WorkerCtl; + try { + ctl = JSON.parse(ctlText) as WorkerCtl; + } catch { + return err(`corrupt worker control file: ${ctlPath}`); + } + + if (typeof ctl.port !== "number" || ctl.port <= 0) { + return err(`invalid worker control file: ${ctlPath}`); + } + + return await sendWorkerTcpCommand(ctl.port, { type: "kill", threadId }); +} diff --git a/packages/cli-workflow/src/cmd-list.ts b/packages/cli-workflow/src/cmd-list.ts new file mode 100644 index 0000000..82d60d5 --- /dev/null +++ b/packages/cli-workflow/src/cmd-list.ts @@ -0,0 +1,32 @@ +import { + err, + listRegisteredWorkflowNames, + ok, + type Result, + readWorkflowRegistry, + type WorkflowRegistryFile, +} from "@uncaged/workflow"; + +export async function cmdList(storageRoot: string): Promise> { + const reg = await readWorkflowRegistry(storageRoot); + if (!reg.ok) { + return err(reg.error.message); + } + return ok(reg.value); +} + +export function formatListLines(registry: WorkflowRegistryFile): string[] { + const names = listRegisteredWorkflowNames(registry); + if (names.length === 0) { + return ["(no workflows registered)"]; + } + const lines: string[] = []; + for (const name of names) { + const entry = registry.workflows[name]; + if (entry === undefined) { + continue; + } + lines.push(`${name}\t${entry.hash}\t${entry.timestamp}`); + } + return lines; +} diff --git a/packages/cli-workflow/src/cmd-ps.ts b/packages/cli-workflow/src/cmd-ps.ts new file mode 100644 index 0000000..e24a0ab --- /dev/null +++ b/packages/cli-workflow/src/cmd-ps.ts @@ -0,0 +1,9 @@ +import { listRunningThreads } from "./thread-scan.js"; + +export async function cmdPs(storageRoot: string): Promise { + const rows = await listRunningThreads(storageRoot); + if (rows.length === 0) { + return ["(no running threads)"]; + } + return rows.map((r) => `${r.threadId}\t${r.hash}\t${r.workflowName ?? "(unknown)"}`); +} diff --git a/packages/cli-workflow/src/cmd-remove.ts b/packages/cli-workflow/src/cmd-remove.ts new file mode 100644 index 0000000..95185a5 --- /dev/null +++ b/packages/cli-workflow/src/cmd-remove.ts @@ -0,0 +1,34 @@ +import { + err, + ok, + type Result, + readWorkflowRegistry, + unregisterWorkflow, + writeWorkflowRegistry, +} from "@uncaged/workflow"; + +import { validateCliWorkflowName } from "./workflow-name.js"; + +export async function cmdRemove(storageRoot: string, name: string): Promise> { + const nameOk = validateCliWorkflowName(name); + if (!nameOk.ok) { + return nameOk; + } + + const reg = await readWorkflowRegistry(storageRoot); + if (!reg.ok) { + return err(reg.error.message); + } + + const next = unregisterWorkflow(reg.value, name); + if (!next.ok) { + return err(next.error.message); + } + + const written = await writeWorkflowRegistry(storageRoot, next.value); + if (!written.ok) { + return err(written.error.message); + } + + return ok(undefined); +} diff --git a/packages/cli-workflow/src/cmd-run.ts b/packages/cli-workflow/src/cmd-run.ts new file mode 100644 index 0000000..0113ddd --- /dev/null +++ b/packages/cli-workflow/src/cmd-run.ts @@ -0,0 +1,54 @@ +import { join } from "node:path"; + +import { + err, + generateUlid, + getRegisteredWorkflow, + ok, + type Result, + readWorkflowRegistry, +} from "@uncaged/workflow"; +import { ensureWorkerForHash, sendWorkerTcpCommand } from "./worker-spawn.js"; +import { validateCliWorkflowName } from "./workflow-name.js"; + +export async function cmdRun( + storageRoot: string, + name: string, + prompt: string, + isDryRun: boolean, + maxRounds: number, +): Promise> { + const nameOk = validateCliWorkflowName(name); + if (!nameOk.ok) { + return nameOk; + } + + const reg = await readWorkflowRegistry(storageRoot); + if (!reg.ok) { + return err(reg.error.message); + } + + const entry = getRegisteredWorkflow(reg.value, name); + if (entry === null) { + return err(`workflow not registered: ${name}`); + } + + const bundlePath = join(storageRoot, "bundles", `${entry.hash}.esm.js`); + const worker = await ensureWorkerForHash(storageRoot, entry.hash, bundlePath); + if (!worker.ok) { + return worker; + } + + const threadId = generateUlid(Date.now()); + const sent = await sendWorkerTcpCommand(worker.value.port, { + type: "run", + threadId, + prompt, + options: { isDryRun, maxRounds }, + }); + if (!sent.ok) { + return sent; + } + + return ok({ threadId }); +} diff --git a/packages/cli-workflow/src/cmd-show.ts b/packages/cli-workflow/src/cmd-show.ts new file mode 100644 index 0000000..ce54a45 --- /dev/null +++ b/packages/cli-workflow/src/cmd-show.ts @@ -0,0 +1,43 @@ +import { + err, + getRegisteredWorkflow, + ok, + type Result, + readWorkflowRegistry, + type WorkflowRegistryEntry, +} from "@uncaged/workflow"; +import { stringify } from "yaml"; + +import { validateCliWorkflowName } from "./workflow-name.js"; + +export async function cmdShow( + storageRoot: string, + name: string, +): Promise> { + const nameOk = validateCliWorkflowName(name); + if (!nameOk.ok) { + return nameOk; + } + + const reg = await readWorkflowRegistry(storageRoot); + if (!reg.ok) { + return err(reg.error.message); + } + + const entry = getRegisteredWorkflow(reg.value, name); + if (entry === null) { + return err(`workflow not found: ${name}`); + } + return ok(entry); +} + +export function formatShowYaml(name: string, entry: WorkflowRegistryEntry): string { + const payload = { + [name]: { + hash: entry.hash, + timestamp: entry.timestamp, + history: entry.history, + }, + }; + return stringify(payload, { indent: 2, defaultStringType: "QUOTE_DOUBLE" }); +} diff --git a/packages/cli-workflow/src/cmd-thread.ts b/packages/cli-workflow/src/cmd-thread.ts new file mode 100644 index 0000000..af5c6b1 --- /dev/null +++ b/packages/cli-workflow/src/cmd-thread.ts @@ -0,0 +1,42 @@ +import { unlink } from "node:fs/promises"; +import { dirname, join } from "node:path"; + +import { err, ok, type Result } from "@uncaged/workflow"; + +import { readTextFileIfExists } from "./fs-utils.js"; +import { resolveThreadDataPath } from "./thread-scan.js"; + +export async function cmdThreadShow( + storageRoot: string, + threadId: string, +): Promise> { + const dataPath = await resolveThreadDataPath(storageRoot, threadId); + if (dataPath === null) { + return err(`thread not found: ${threadId}`); + } + const text = await readTextFileIfExists(dataPath); + if (text === null) { + return err(`thread data missing: ${threadId}`); + } + return ok(text.endsWith("\n") ? text.slice(0, -1) : text); +} + +export async function cmdThreadRemove( + storageRoot: string, + threadId: string, +): Promise> { + const dataPath = await resolveThreadDataPath(storageRoot, threadId); + if (dataPath === null) { + return err(`thread not found: ${threadId}`); + } + + const dir = dirname(dataPath); + const infoPath = join(dir, `${threadId}.info.jsonl`); + const runningPath = join(dir, `${threadId}.running`); + + await unlink(dataPath); + await unlink(infoPath).catch(() => {}); + await unlink(runningPath).catch(() => {}); + + return ok(undefined); +} diff --git a/packages/cli-workflow/src/cmd-threads.ts b/packages/cli-workflow/src/cmd-threads.ts new file mode 100644 index 0000000..ac55cca --- /dev/null +++ b/packages/cli-workflow/src/cmd-threads.ts @@ -0,0 +1,31 @@ +import { err, ok, type Result } from "@uncaged/workflow"; + +import { listHistoricalThreads } from "./thread-scan.js"; +import { validateCliWorkflowName } from "./workflow-name.js"; + +export async function cmdThreads( + storageRoot: string, + argv: string[], +): Promise> { + const nameFilter = argv[0]; + if (argv.length > 1) { + return err("threads expects at most one workflow name argument"); + } + + let workflowNameFilter: string | null = null; + if (nameFilter !== undefined) { + const nameOk = validateCliWorkflowName(nameFilter); + if (!nameOk.ok) { + return nameOk; + } + workflowNameFilter = nameFilter; + } + + const rows = await listHistoricalThreads(storageRoot, workflowNameFilter); + if (rows.length === 0) { + return ok(["(no threads found)"]); + } + + const lines = rows.map((r) => `${r.threadId}\t${r.hash}\t${r.workflowName ?? "(unknown)"}`); + return ok(lines); +} diff --git a/packages/cli-workflow/src/fs-utils.ts b/packages/cli-workflow/src/fs-utils.ts new file mode 100644 index 0000000..db3682b --- /dev/null +++ b/packages/cli-workflow/src/fs-utils.ts @@ -0,0 +1,22 @@ +import { readFile, stat } from "node:fs/promises"; + +export async function readTextFileIfExists(path: string): Promise { + try { + return await readFile(path, "utf8"); + } catch (e) { + const errObj = e as NodeJS.ErrnoException; + if (errObj.code === "ENOENT") { + return null; + } + throw e; + } +} + +export async function pathExists(path: string): Promise { + try { + await stat(path); + return true; + } catch { + return false; + } +} diff --git a/packages/cli-workflow/src/run-argv.ts b/packages/cli-workflow/src/run-argv.ts new file mode 100644 index 0000000..21276e7 --- /dev/null +++ b/packages/cli-workflow/src/run-argv.ts @@ -0,0 +1,84 @@ +import { err, ok, type Result } from "@uncaged/workflow"; + +export type ParsedRunArgv = { + name: string; + prompt: string; + dryRun: boolean; + maxRounds: number; +}; + +type FlagOk = + | { kind: "dry-run" } + | { kind: "prompt"; value: string } + | { kind: "max-rounds"; value: number }; + +function parseFlagAt(argv: string[], index: number): Result | null { + const flag = argv[index]; + if (flag === "--dry-run") { + return ok({ kind: "dry-run" }); + } + if (flag === "--prompt") { + const value = argv[index + 1]; + if (value === undefined) { + return err("missing value for --prompt"); + } + return ok({ kind: "prompt", value }); + } + if (flag === "--max-rounds") { + const value = argv[index + 1]; + if (value === undefined) { + return err("missing value for --max-rounds"); + } + const n = Number(value); + if (!Number.isFinite(n) || !Number.isInteger(n) || n < 0) { + return err("--max-rounds must be a non-negative integer"); + } + return ok({ kind: "max-rounds", value: n }); + } + return null; +} + +export function parseRunArgv(argv: string[]): Result { + let name: string | undefined; + let prompt = ""; + let dryRun = false; + let maxRounds = 5; + + let i = 0; + const first = argv[0]; + if (first !== undefined && !first.startsWith("--")) { + name = first; + i = 1; + } + + while (i < argv.length) { + const parsed = parseFlagAt(argv, i); + if (parsed === null) { + const unknown = argv[i]; + return err(`unknown run flag: ${unknown}`); + } + if (!parsed.ok) { + return parsed; + } + + const flag = parsed.value; + if (flag.kind === "dry-run") { + dryRun = true; + i += 1; + continue; + } + if (flag.kind === "prompt") { + prompt = flag.value; + i += 2; + continue; + } + maxRounds = flag.value; + i += 2; + } + + if (name === undefined || name === "") { + return err("run requires "); + } + + return ok({ name, prompt, dryRun, maxRounds }); +} diff --git a/packages/cli-workflow/src/storage-env.ts b/packages/cli-workflow/src/storage-env.ts new file mode 100644 index 0000000..0a5721c --- /dev/null +++ b/packages/cli-workflow/src/storage-env.ts @@ -0,0 +1,10 @@ +import { getDefaultWorkflowStorageRoot } from "@uncaged/workflow"; + +/** Resolve storage root, honoring `UNCAGED_WORKFLOW_STORAGE_ROOT` for tests/tools. */ +export function resolveWorkflowStorageRoot(): string { + const override = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT; + if (override !== undefined && override !== "") { + return override; + } + return getDefaultWorkflowStorageRoot(); +} diff --git a/packages/cli-workflow/src/thread-scan.ts b/packages/cli-workflow/src/thread-scan.ts new file mode 100644 index 0000000..df06731 --- /dev/null +++ b/packages/cli-workflow/src/thread-scan.ts @@ -0,0 +1,143 @@ +import { readdir } from "node:fs/promises"; +import { join } from "node:path"; + +import { pathExists, readTextFileIfExists } from "./fs-utils.js"; + +export type RunningThreadRow = { + threadId: string; + hash: string; + workflowName: string | null; +}; + +export type HistoricalThreadRow = { + threadId: string; + hash: string; + workflowName: string | null; +}; + +async function readWorkflowNameFromDataJsonl(dataPath: string): Promise { + const text = await readTextFileIfExists(dataPath); + if (text === null) { + return null; + } + const firstLine = text.split("\n")[0]; + if (firstLine === undefined || firstLine.trim() === "") { + return null; + } + let parsed: unknown; + try { + parsed = JSON.parse(firstLine) as unknown; + } catch { + return null; + } + if (parsed === null || typeof parsed !== "object") { + return null; + } + const name = (parsed as Record).name; + return typeof name === "string" ? name : null; +} + +/** Threads currently executing — identified via `.running` markers. */ +export async function listRunningThreads(storageRoot: string): Promise { + const logsRoot = join(storageRoot, "logs"); + if (!(await pathExists(logsRoot))) { + return []; + } + + const hashes = await readdir(logsRoot); + const out: RunningThreadRow[] = []; + + for (const hash of hashes) { + const dir = join(logsRoot, hash); + let entries: string[]; + try { + entries = await readdir(dir); + } catch { + continue; + } + + for (const fileName of entries) { + if (!fileName.endsWith(".running")) { + continue; + } + const threadId = fileName.slice(0, -".running".length); + const dataPath = join(dir, `${threadId}.data.jsonl`); + const workflowName = await readWorkflowNameFromDataJsonl(dataPath); + out.push({ threadId, hash, workflowName }); + } + } + + out.sort((a, b) => { + const ha = `${a.hash}/${a.threadId}`; + const hb = `${b.hash}/${b.threadId}`; + return ha.localeCompare(hb); + }); + + return out; +} + +/** + * Historical threads discovered via `*.data.jsonl`. + * When `workflowNameFilter` is non-null, only threads whose start record `name` matches are returned. + */ +export async function listHistoricalThreads( + storageRoot: string, + workflowNameFilter: string | null, +): Promise { + const logsRoot = join(storageRoot, "logs"); + if (!(await pathExists(logsRoot))) { + return []; + } + + const hashes = await readdir(logsRoot); + const out: HistoricalThreadRow[] = []; + + for (const hash of hashes) { + const dir = join(logsRoot, hash); + let entries: string[]; + try { + entries = await readdir(dir); + } catch { + continue; + } + + for (const fileName of entries) { + if (!fileName.endsWith(".data.jsonl")) { + continue; + } + const threadId = fileName.slice(0, -".data.jsonl".length); + const dataPath = join(dir, fileName); + const workflowName = await readWorkflowNameFromDataJsonl(dataPath); + if (workflowNameFilter !== null && workflowName !== workflowNameFilter) { + continue; + } + out.push({ threadId, hash, workflowName }); + } + } + + out.sort((a, b) => { + const ha = `${a.hash}/${a.threadId}`; + const hb = `${b.hash}/${b.threadId}`; + return ha.localeCompare(hb); + }); + + return out; +} + +export async function resolveThreadDataPath( + storageRoot: string, + threadId: string, +): Promise { + const logsRoot = join(storageRoot, "logs"); + if (!(await pathExists(logsRoot))) { + return null; + } + const hashes = await readdir(logsRoot); + for (const hash of hashes) { + const candidate = join(logsRoot, hash, `${threadId}.data.jsonl`); + if (await pathExists(candidate)) { + return candidate; + } + } + return null; +} diff --git a/packages/cli-workflow/src/worker-spawn.ts b/packages/cli-workflow/src/worker-spawn.ts new file mode 100644 index 0000000..677f85d --- /dev/null +++ b/packages/cli-workflow/src/worker-spawn.ts @@ -0,0 +1,190 @@ +import { type ChildProcessWithoutNullStreams, spawn } from "node:child_process"; +import { mkdir, readdir, unlink, writeFile } from "node:fs/promises"; +import { createConnection } from "node:net"; +import { join } from "node:path"; + +import { err, getWorkerHostScriptPath, ok, type Result } from "@uncaged/workflow"; + +import { pathExists, readTextFileIfExists } from "./fs-utils.js"; + +export type WorkerCtl = { + pid: number; + port: number; +}; + +function isProcessAlive(pid: number): boolean { + try { + process.kill(pid, 0); + return true; + } catch { + return false; + } +} + +async function waitForReadyLine( + childStdout: NodeJS.ReadableStream, + child: ChildProcessWithoutNullStreams, +): Promise> { + return await new Promise((resolve) => { + let buf = ""; + let settled = false; + + function finish(result: Result): void { + if (settled) { + return; + } + settled = true; + cleanup(); + resolve(result); + } + + function onData(chunk: Buffer | string): void { + buf += typeof chunk === "string" ? chunk : chunk.toString("utf8"); + const nl = buf.indexOf("\n"); + if (nl < 0) { + return; + } + const line = buf.slice(0, nl).trim(); + const prefix = "READY "; + if (!line.startsWith(prefix)) { + finish(err(`worker did not emit READY line (got: ${line})`)); + return; + } + const portText = line.slice(prefix.length); + const port = Number(portText); + if (!Number.isFinite(port) || port <= 0) { + finish(err(`worker READY line had invalid port: ${portText}`)); + return; + } + finish(ok(port)); + } + + function onEnd(): void { + finish(err("worker stdout ended before READY line")); + } + + function onExit(code: number | null): void { + finish(err(`worker exited before READY line (code ${code})`)); + } + + function cleanup(): void { + childStdout.off("data", onData); + childStdout.off("end", onEnd); + child.off("exit", onExit); + } + + childStdout.on("data", onData); + childStdout.on("end", onEnd); + child.on("exit", onExit); + }); +} + +async function spawnWorkerProcess( + bundlePath: string, + storageRoot: string, + hash: string, +): Promise> { + const scriptPath = getWorkerHostScriptPath(); + const child = spawn(process.execPath, [scriptPath, bundlePath, storageRoot, hash], { + stdio: ["ignore", "pipe", "inherit"], + }); + + if (child.stdout === null || child.pid === undefined) { + return err("failed to spawn worker process"); + } + + const pid = child.pid; + const ready = await waitForReadyLine(child.stdout, child); + if (!ready.ok) { + child.kill(); + return ready; + } + + child.unref(); + child.stdout.destroy(); + + return ok({ pid, port: ready.value }); +} + +export async function ensureWorkerForHash( + storageRoot: string, + hash: string, + bundlePath: string, +): Promise> { + const ctlPath = join(storageRoot, "workers", `${hash}.json`); + const existingText = await readTextFileIfExists(ctlPath); + if (existingText !== null) { + try { + const ctl = JSON.parse(existingText) as WorkerCtl; + if ( + typeof ctl.pid === "number" && + typeof ctl.port === "number" && + ctl.pid > 0 && + ctl.port > 0 && + isProcessAlive(ctl.pid) + ) { + return ok({ port: ctl.port }); + } + } catch { + // Corrupt control file — ignore and respawn. + } + await unlink(ctlPath).catch(() => {}); + } + + const spawned = await spawnWorkerProcess(bundlePath, storageRoot, hash); + if (!spawned.ok) { + return spawned; + } + + await mkdir(join(storageRoot, "workers"), { recursive: true }); + const ctl: WorkerCtl = { pid: spawned.value.pid, port: spawned.value.port }; + await writeFile(ctlPath, `${JSON.stringify(ctl)}\n`, "utf8"); + + return ok({ port: spawned.value.port }); +} + +export async function sendWorkerTcpCommand( + port: number, + payload: unknown, +): Promise> { + return await new Promise((resolve) => { + let settled = false; + const socket = createConnection({ host: "127.0.0.1", port }, () => { + socket.write(`${JSON.stringify(payload)}\n`); + socket.end(); + }); + socket.on("error", (e) => { + if (settled) { + return; + } + settled = true; + const message = e instanceof Error ? e.message : String(e); + resolve(err(`failed to send worker command: ${message}`)); + }); + socket.on("close", () => { + if (settled) { + return; + } + settled = true; + resolve(ok(undefined)); + }); + }); +} + +export async function resolveRunningHashForThread( + storageRoot: string, + threadId: string, +): Promise> { + const logsRoot = join(storageRoot, "logs"); + if (!(await pathExists(logsRoot))) { + return err(`thread not running (no logs dir): ${threadId}`); + } + const hashes = await readdir(logsRoot); + for (const hash of hashes) { + const runningPath = join(logsRoot, hash, `${threadId}.running`); + if (await pathExists(runningPath)) { + return ok(hash); + } + } + return err(`thread not running: ${threadId}`); +} diff --git a/packages/cli-workflow/src/workflow-name.ts b/packages/cli-workflow/src/workflow-name.ts new file mode 100644 index 0000000..ae27666 --- /dev/null +++ b/packages/cli-workflow/src/workflow-name.ts @@ -0,0 +1,12 @@ +import { err, ok, type Result } from "@uncaged/workflow"; + +const WORKFLOW_NAME_RE = /^[a-z][a-z0-9]*(-[a-z0-9]+)*$/; + +export function validateCliWorkflowName(name: string): Result { + if (!WORKFLOW_NAME_RE.test(name)) { + return err( + 'invalid workflow name: use verb-first kebab-case (lowercase letters, digits, hyphens), e.g. "solve-issue"', + ); + } + return ok(undefined); +} diff --git a/packages/workflow/__tests__/base32.test.ts b/packages/workflow/__tests__/base32.test.ts new file mode 100644 index 0000000..1087be7 --- /dev/null +++ b/packages/workflow/__tests__/base32.test.ts @@ -0,0 +1,38 @@ +import { describe, expect, test } from "bun:test"; + +import { + decodeCrockfordBase32Bits, + decodeCrockfordToUint64, + encodeCrockfordBase32Bits, + encodeUint64AsCrockford, +} from "../src/base32.js"; + +describe("Crockford Base32", () => { + test("roundtrip 64-bit hash encoding", () => { + const value = 0xef46_db37_51d8_e999n; + const encoded = encodeUint64AsCrockford(value); + expect(encoded.length).toBe(13); + const decoded = decodeCrockfordToUint64(encoded); + expect(decoded.ok).toBe(true); + if (decoded.ok) { + expect(decoded.value).toBe(value); + } + }); + + test("roundtrip arbitrary bit widths used by ULID (128-bit)", () => { + const rand = 0x1234567890abcdef12n & ((1n << 80n) - 1n); + const payload = (12345n << 80n) | rand; + const encoded = encodeCrockfordBase32Bits(payload, 128); + expect(encoded.length).toBe(26); + const decoded = decodeCrockfordBase32Bits(encoded, 128); + expect(decoded.ok).toBe(true); + if (decoded.ok) { + expect(decoded.value).toBe(payload); + } + }); + + test("reject invalid characters", () => { + const decoded = decodeCrockfordToUint64("!!!!!!!!!!!!!"); + expect(decoded.ok).toBe(false); + }); +}); diff --git a/packages/workflow/__tests__/bundle-validator.test.ts b/packages/workflow/__tests__/bundle-validator.test.ts new file mode 100644 index 0000000..278c411 --- /dev/null +++ b/packages/workflow/__tests__/bundle-validator.test.ts @@ -0,0 +1,66 @@ +import { describe, expect, test } from "bun:test"; + +import { validateWorkflowBundle } from "../src/bundle-validator.js"; + +describe("validateWorkflowBundle", () => { + test("accepts minimal valid builtin-only bundle", () => { + const source = `import fs from "node:fs"; + +export default async function run() { + fs.existsSync("."); + return { returnCode: 0, summary: "ok" }; +} +`; + const r = validateWorkflowBundle({ filePath: "/tmp/w.esm.js", source }); + expect(r.ok).toBe(true); + }); + + test("rejects wrong filename suffix", () => { + const r = validateWorkflowBundle({ + filePath: "/tmp/w.js", + source: "export default async function run() { return { returnCode: 0, summary: '' }; }\n", + }); + expect(r.ok).toBe(false); + }); + + test("rejects missing default export", () => { + const r = validateWorkflowBundle({ + filePath: "/tmp/w.esm.js", + source: "export const x = 1;\n", + }); + expect(r.ok).toBe(false); + if (!r.ok) { + expect(r.error).toContain("default export"); + } + }); + + test("rejects non-builtin imports", () => { + const r = validateWorkflowBundle({ + filePath: "/tmp/w.esm.js", + source: + 'import x from "some-package";\nexport default async function run() { return { returnCode: 0, summary: "" }; }\n', + }); + expect(r.ok).toBe(false); + }); + + test("rejects dynamic import", () => { + const r = validateWorkflowBundle({ + filePath: "/tmp/w.esm.js", + source: + 'export default async function run() { await import("fs"); return { returnCode: 0, summary: "" }; }\n', + }); + expect(r.ok).toBe(false); + if (!r.ok) { + expect(r.error).toContain("dynamic import"); + } + }); + + test("rejects require()", () => { + const r = validateWorkflowBundle({ + filePath: "/tmp/w.esm.js", + source: + 'export default async function run() { require("fs"); return { returnCode: 0, summary: "" }; }\n', + }); + expect(r.ok).toBe(false); + }); +}); diff --git a/packages/workflow/__tests__/engine.test.ts b/packages/workflow/__tests__/engine.test.ts new file mode 100644 index 0000000..b48efa0 --- /dev/null +++ b/packages/workflow/__tests__/engine.test.ts @@ -0,0 +1,137 @@ +import { describe, expect, test } from "bun:test"; +import { mkdir, mkdtemp, readFile, rm } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; + +import { executeThread } from "../src/engine.js"; +import { createLogger } from "../src/logger.js"; +import { END, type WorkflowDefinition } from "../src/types.js"; + +type DemoMeta = { + planner: Record; + coder: Record; +}; + +const demoWorkflow: WorkflowDefinition = { + name: "demo-flow", + roles: { + planner: async () => ({ + content: "plan-body", + meta: { plan: "do-it", files: ["a.ts"] }, + }), + coder: async () => ({ + content: "code-body", + meta: { diff: "+ok" }, + }), + }, + moderator: (ctx) => { + if (ctx.steps.length === 0) { + return "planner"; + } + if (ctx.steps.length === 1) { + return "coder"; + } + return END; + }, +}; + +describe("executeThread", () => { + test("writes RFC-001 `.data.jsonl` start + role records and `.info.jsonl` logs", async () => { + const root = await mkdtemp(join(tmpdir(), "wf-engine-")); + try { + const threadId = "01KQXKW18CT8G75T53R8F4G7YG"; + const hash = "C9NMV6V2TQT81"; + const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`); + const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`); + await mkdir(join(root, "logs", hash), { recursive: true }); + + const logger = createLogger({ sink: { kind: "file", path: infoPath } }); + const ac = new AbortController(); + + const result = await executeThread( + demoWorkflow, + "Fix the login redirect bug in #3", + { isDryRun: false, maxRounds: 5, signal: ac.signal }, + { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath }, + logger, + ); + + expect(result.returnCode).toBe(0); + + const dataText = await readFile(dataPath, "utf8"); + const lines = dataText + .trim() + .split("\n") + .filter((l) => l !== ""); + expect(lines.length).toBe(3); + + const start = JSON.parse(lines[0] ?? "{}") as Record; + expect(start.name).toBe("demo-flow"); + expect(start.hash).toBe(hash); + expect(start.threadId).toBe(threadId); + expect(typeof start.timestamp).toBe("number"); + + const params = start.parameters as Record; + expect(params.prompt).toBe("Fix the login redirect bug in #3"); + const opts = params.options as Record; + expect(opts.isDryRun).toBe(false); + expect(opts.maxRounds).toBe(5); + + const role1 = JSON.parse(lines[1] ?? "{}") as Record; + expect(role1.role).toBe("planner"); + expect(role1.content).toBe("plan-body"); + expect(role1.meta).toEqual({ plan: "do-it", files: ["a.ts"] }); + expect(typeof role1.timestamp).toBe("number"); + + const role2 = JSON.parse(lines[2] ?? "{}") as Record; + expect(role2.role).toBe("coder"); + + const infoText = await readFile(infoPath, "utf8"); + const infoLines = infoText + .trim() + .split("\n") + .filter((l) => l !== ""); + expect(infoLines.length).toBeGreaterThan(0); + const log0 = JSON.parse(infoLines[0] ?? "{}") as Record; + expect(typeof log0.tag).toBe("string"); + expect(String(log0.tag).length).toBe(8); + expect(typeof log0.content).toBe("string"); + expect(typeof log0.timestamp).toBe("number"); + } finally { + await rm(root, { recursive: true, force: true }); + } + }); + + test("respects maxRounds=0 (start record only)", async () => { + const root = await mkdtemp(join(tmpdir(), "wf-engine-max0-")); + try { + const threadId = "01KQXKW18CT8G75T53R8F4G7YG"; + const hash = "C9NMV6V2TQT81"; + const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`); + const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`); + await mkdir(join(root, "logs", hash), { recursive: true }); + + const logger = createLogger({ sink: { kind: "file", path: infoPath } }); + const ac = new AbortController(); + + const result = await executeThread( + demoWorkflow, + "hello", + { isDryRun: false, maxRounds: 0, signal: ac.signal }, + { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath }, + logger, + ); + + expect(result.returnCode).toBe(0); + + const dataText = await readFile(dataPath, "utf8"); + const lines = dataText + .trim() + .split("\n") + .filter((l) => l !== ""); + expect(lines.length).toBe(1); + } finally { + await rm(root, { recursive: true, force: true }); + } + }); +}); diff --git a/packages/workflow/__tests__/hash.test.ts b/packages/workflow/__tests__/hash.test.ts new file mode 100644 index 0000000..0c44e52 --- /dev/null +++ b/packages/workflow/__tests__/hash.test.ts @@ -0,0 +1,24 @@ +import { describe, expect, test } from "bun:test"; + +import { decodeCrockfordToUint64 } from "../src/base32.js"; +import { hashWorkflowBundleBytes } from "../src/hash.js"; + +describe("hashWorkflowBundleBytes", () => { + test("matches XXH64 reference for empty input", () => { + const encoder = new TextEncoder(); + const digest = hashWorkflowBundleBytes(encoder.encode("")); + const decoded = decodeCrockfordToUint64(digest); + expect(decoded.ok).toBe(true); + if (decoded.ok) { + expect(decoded.value).toBe(0xef46_db37_51d8_e999n); + } + }); + + test("stable for identical content", () => { + const encoder = new TextEncoder(); + const data = encoder.encode( + "export default async function run() { return { returnCode: 0, summary: '' }; }\n", + ); + expect(hashWorkflowBundleBytes(data)).toBe(hashWorkflowBundleBytes(data)); + }); +}); diff --git a/packages/workflow/__tests__/logger.test.ts b/packages/workflow/__tests__/logger.test.ts new file mode 100644 index 0000000..213f4bf --- /dev/null +++ b/packages/workflow/__tests__/logger.test.ts @@ -0,0 +1,31 @@ +import { describe, expect, test } from "bun:test"; +import { mkdir, readFile, rm } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; + +import { createLogger } from "../src/logger.js"; + +describe("createLogger", () => { + test("writes JSONL records to a file sink", async () => { + const dir = join(tmpdir(), `wf-log-${process.pid}-${Date.now()}`); + await mkdir(dir, { recursive: true }); + const logPath = join(dir, "test.log"); + const log = createLogger({ sink: { kind: "file", path: logPath } }); + log("01ABCDEF", "hello"); + const text = await readFile(logPath, "utf8"); + const line = text.trim().split("\n")[0]; + expect(line).toBeDefined(); + const obj = JSON.parse(line ?? "{}") as { tag: string; content: string; timestamp: number }; + expect(obj.tag).toBe("01ABCDEF"); + expect(obj.content).toBe("hello"); + expect(typeof obj.timestamp).toBe("number"); + await rm(dir, { recursive: true, force: true }); + }); + + test("rejects invalid tags", () => { + const log = createLogger({ sink: { kind: "stderr" } }); + expect(() => log("BAD", "x")).toThrow(); + expect(() => log("01abcdefg", "x")).toThrow(); + expect(() => log("01ABCDEO", "x")).toThrow(); + }); +}); diff --git a/packages/workflow/__tests__/registry.test.ts b/packages/workflow/__tests__/registry.test.ts new file mode 100644 index 0000000..c5612d8 --- /dev/null +++ b/packages/workflow/__tests__/registry.test.ts @@ -0,0 +1,77 @@ +import { describe, expect, test } from "bun:test"; +import { mkdir, rm, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; + +import { + readWorkflowRegistry, + registerWorkflowVersion, + unregisterWorkflow, + writeWorkflowRegistry, +} from "../src/registry.js"; + +describe("workflow registry", () => { + test("roundtrips through workflow.yaml", async () => { + const dir = join(tmpdir(), `wf-reg-${process.pid}-${Date.now()}`); + await mkdir(dir, { recursive: true }); + + const empty = await readWorkflowRegistry(dir); + expect(empty.ok).toBe(true); + if (!empty.ok) { + return; + } + + const r1 = registerWorkflowVersion(empty.value, "solve-issue", "AAAAAAAAAAAAA", 100); + const w1 = await writeWorkflowRegistry(dir, r1); + expect(w1.ok).toBe(true); + + const back = await readWorkflowRegistry(dir); + expect(back.ok).toBe(true); + if (!back.ok) { + await rm(dir, { recursive: true, force: true }); + return; + } + expect(back.value.workflows["solve-issue"]?.hash).toBe("AAAAAAAAAAAAA"); + + const r2 = registerWorkflowVersion(back.value, "solve-issue", "BBBBBBBBBBBBB", 200); + expect(r2.workflows["solve-issue"]?.history[0]?.hash).toBe("AAAAAAAAAAAAA"); + + const removed = unregisterWorkflow(r2, "solve-issue"); + expect(removed.ok).toBe(true); + if (!removed.ok) { + await rm(dir, { recursive: true, force: true }); + return; + } + + const w2 = await writeWorkflowRegistry(dir, removed.value); + expect(w2.ok).toBe(true); + + const finalRead = await readWorkflowRegistry(dir); + expect(finalRead.ok).toBe(true); + if (finalRead.ok) { + expect(finalRead.value.workflows["solve-issue"]).toBeUndefined(); + } + + await rm(dir, { recursive: true, force: true }); + }); + + test("treats missing registry as empty", async () => { + const dir = join(tmpdir(), `wf-reg2-${process.pid}-${Date.now()}`); + await mkdir(dir, { recursive: true }); + const empty = await readWorkflowRegistry(dir); + expect(empty.ok).toBe(true); + if (empty.ok) { + expect(Object.keys(empty.value.workflows).length).toBe(0); + } + await rm(dir, { recursive: true, force: true }); + }); + + test("parse errors on invalid shape", async () => { + const dir = join(tmpdir(), `wf-reg3-${process.pid}-${Date.now()}`); + await mkdir(dir, { recursive: true }); + await writeFile(join(dir, "workflow.yaml"), 'workflows: "broken"\n', "utf8"); + const bad = await readWorkflowRegistry(dir); + expect(bad.ok).toBe(false); + await rm(dir, { recursive: true, force: true }); + }); +}); diff --git a/packages/workflow/__tests__/result.test.ts b/packages/workflow/__tests__/result.test.ts new file mode 100644 index 0000000..310c028 --- /dev/null +++ b/packages/workflow/__tests__/result.test.ts @@ -0,0 +1,21 @@ +import { describe, expect, test } from "bun:test"; + +import { err, ok } from "../src/result.js"; + +describe("result helpers", () => { + test("ok wraps value", () => { + const r = ok(42); + expect(r.ok).toBe(true); + if (r.ok) { + expect(r.value).toBe(42); + } + }); + + test("err wraps error", () => { + const r = err("nope"); + expect(r.ok).toBe(false); + if (!r.ok) { + expect(r.error).toBe("nope"); + } + }); +}); diff --git a/packages/workflow/__tests__/thread-jsonl-format.test.ts b/packages/workflow/__tests__/thread-jsonl-format.test.ts new file mode 100644 index 0000000..05bfce5 --- /dev/null +++ b/packages/workflow/__tests__/thread-jsonl-format.test.ts @@ -0,0 +1,41 @@ +import { describe, expect, test } from "bun:test"; + +describe("RFC-001 thread JSONL shapes", () => { + test("documents the `.data.jsonl` start record + role record keys", () => { + const startRecord = { + name: "solve-issue", + hash: "C9NMV6V2TQT81", + threadId: "01KQXKW18CT8G75T53R8F4G7YG", + parameters: { + prompt: "Fix the login redirect bug in #3", + options: { + isDryRun: false, + maxRounds: 5, + }, + }, + timestamp: 1714963200000, + }; + + const roleRecord = { + role: "planner", + content: "Plan: modify auth middleware...", + meta: { plan: "...", files: ["src/auth.ts"] }, + timestamp: 1714963201000, + }; + + expect(Object.keys(startRecord).sort()).toEqual( + ["hash", "name", "parameters", "threadId", "timestamp"].sort(), + ); + expect(Object.keys(roleRecord).sort()).toEqual(["content", "meta", "role", "timestamp"].sort()); + }); + + test("documents the `.info.jsonl` debug record keys", () => { + const infoRecord = { + tag: "4KNMR2PX", + content: "Loading workflow bundle...", + timestamp: 1714963200500, + }; + + expect(Object.keys(infoRecord).sort()).toEqual(["content", "tag", "timestamp"].sort()); + }); +}); diff --git a/packages/workflow/__tests__/ulid.test.ts b/packages/workflow/__tests__/ulid.test.ts new file mode 100644 index 0000000..f1998f1 --- /dev/null +++ b/packages/workflow/__tests__/ulid.test.ts @@ -0,0 +1,29 @@ +import { describe, expect, test } from "bun:test"; + +import { decodeCrockfordBase32Bits } from "../src/base32.js"; +import { generateUlid } from "../src/ulid.js"; + +describe("generateUlid", () => { + test("length and decodable Crockford payload", () => { + const id = generateUlid(1_714_963_200_000); + expect(id.length).toBe(26); + const decoded = decodeCrockfordBase32Bits(id, 128); + expect(decoded.ok).toBe(true); + }); + + test("embeds 48-bit timestamp at the MSB of the 128-bit payload", () => { + const ts = 9_999_888_777_666; + const id = generateUlid(ts); + const decoded = decodeCrockfordBase32Bits(id, 128); + expect(decoded.ok).toBe(true); + if (decoded.ok) { + const recoveredMs = decoded.value >> 80n; + expect(Number(recoveredMs)).toBe(ts); + } + }); + + test("rejects out-of-range timestamps", () => { + expect(() => generateUlid(-1)).toThrow(); + expect(() => generateUlid(2 ** 48)).toThrow(); + }); +}); diff --git a/packages/workflow/__tests__/worker.test.ts b/packages/workflow/__tests__/worker.test.ts new file mode 100644 index 0000000..5f13817 --- /dev/null +++ b/packages/workflow/__tests__/worker.test.ts @@ -0,0 +1,120 @@ +import { describe, expect, test } from "bun:test"; +import { spawn } from "node:child_process"; +import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; +import { createConnection } from "node:net"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; + +import { getWorkerHostScriptPath } from "../src/worker-entry-path.js"; + +const bundleSource = `export default { + name: "demo-flow", + roles: { + planner: async () => ({ content: "p", meta: { plan: "x" } }), + coder: async () => ({ content: "c", meta: { diff: "y" } }), + }, + moderator(ctx) { + if (ctx.steps.length === 0) return "planner"; + if (ctx.steps.length === 1) return "coder"; + return "__end__"; + }, +}; +`; + +async function readReadyPort(child: import("node:child_process").ChildProcess): Promise { + return await new Promise((resolve, reject) => { + if (child.stdout === null) { + reject(new Error("missing stdout")); + return; + } + + let buf = ""; + function cleanup(): void { + child.stdout?.off("data", onData); + child.off("exit", onExit); + } + + function onData(chunk: Buffer): void { + buf += chunk.toString("utf8"); + const nl = buf.indexOf("\n"); + if (nl < 0) { + return; + } + cleanup(); + const line = buf.slice(0, nl).trim(); + const prefix = "READY "; + if (!line.startsWith(prefix)) { + reject(new Error(`unexpected READY line: ${line}`)); + return; + } + resolve(Number(line.slice(prefix.length))); + } + + function onExit(code: number | null): void { + cleanup(); + reject(new Error(`worker exited before READY (code ${code})`)); + } + + child.stdout.on("data", onData); + child.on("exit", onExit); + }); +} + +async function sendJson(port: number, payload: unknown): Promise { + await new Promise((resolve, reject) => { + const socket = createConnection({ host: "127.0.0.1", port }, () => { + socket.write(`${JSON.stringify(payload)}\n`); + socket.end(); + }); + socket.on("error", reject); + socket.on("close", () => resolve()); + }); +} + +describe("worker process", () => { + test("loads bundle, runs a thread over TCP, then exits when idle", async () => { + const root = await mkdtemp(join(tmpdir(), "wf-worker-")); + try { + const hash = "C9NMV6V2TQT81"; + await mkdir(join(root, "bundles"), { recursive: true }); + const bundlePath = join(root, "bundles", `${hash}.esm.js`); + await writeFile(bundlePath, bundleSource, "utf8"); + + const scriptPath = getWorkerHostScriptPath(); + const child = spawn(process.execPath, [scriptPath, bundlePath, root, hash], { + stdio: ["ignore", "pipe", "inherit"], + }); + + if (child.stdout === null) { + throw new Error("missing stdout"); + } + + const port = await readReadyPort(child); + + const threadId = "01KQXKW18CT8G75T53R8F4G7YG"; + await sendJson(port, { + type: "run", + threadId, + prompt: "hello", + options: { isDryRun: false, maxRounds: 5 }, + }); + + const exitCode: number = await new Promise((resolve) => { + child.on("exit", (code) => resolve(code ?? 1)); + }); + + expect(exitCode).toBe(0); + + const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`); + const text = await readFile(dataPath, "utf8"); + expect( + text + .trim() + .split("\n") + .filter((l) => l !== "").length, + ).toBe(3); + } finally { + await rm(root, { recursive: true, force: true }); + } + }, 15_000); +}); diff --git a/packages/workflow/src/base32.ts b/packages/workflow/src/base32.ts index 683abe7..f7cd939 100644 --- a/packages/workflow/src/base32.ts +++ b/packages/workflow/src/base32.ts @@ -1,7 +1,7 @@ import { err, ok, type Result } from "./result.js"; -/** Crockford Base32 alphabet (no I, L, O, U). */ -export const CROCKFORD_BASE32_ALPHABET = "0123456789ABCDEFGHJKMNPQRSTVWXZ"; +/** Crockford Base32 alphabet (no I, L, O, U) — exactly 32 symbols. */ +export const CROCKFORD_BASE32_ALPHABET = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"; const DECODE_MAP: Record = (() => { const map: Record = {}; @@ -31,13 +31,16 @@ export function encodeCrockfordBase32Bits(value: bigint, bitLength: number): str let result = ""; for (let i = 0; i < charCount; i++) { const shift = totalBits - 5 * (i + 1); - const quintet = Number((shifted >> BigInt(shift)) & 0x1fn); + const quintet = Number((shifted >> BigInt(shift)) & 31n); result += CROCKFORD_BASE32_ALPHABET[quintet]; } return result; } -export function decodeCrockfordBase32Bits(encoded: string, bitLength: number): Result { +export function decodeCrockfordBase32Bits( + encoded: string, + bitLength: number, +): Result { if (bitLength <= 0) { return err(new Error("bitLength must be positive")); } @@ -57,7 +60,7 @@ export function decodeCrockfordBase32Bits(encoded: string, bitLength: number): R if (val === undefined) { return err(new Error(`invalid Crockford Base32 character: ${ch}`)); } - shifted = (shifted << 5n) | BigInt(val); + shifted = (shifted << 5n) | BigInt(val & 31); } return ok(shifted >> BigInt(padBits)); } diff --git a/packages/workflow/src/bundle-validator.ts b/packages/workflow/src/bundle-validator.ts index 7f16a36..849ac76 100644 --- a/packages/workflow/src/bundle-validator.ts +++ b/packages/workflow/src/bundle-validator.ts @@ -1,7 +1,13 @@ import { isBuiltin } from "node:module"; - +import type { + CallExpression, + ExportAllDeclaration, + ExportNamedDeclaration, + ImportDeclaration, + Node, + Program, +} from "acorn"; import * as acorn from "acorn"; -import type { Node, Program } from "acorn"; import { err, ok, type Result } from "./result.js"; @@ -26,22 +32,36 @@ function isAllowedImportSpecifier(spec: string): boolean { return isBuiltin(spec); } -function walk(node: Node, visit: (n: Node) => void): void { - visit(node); +function pushNestedAstNodes(value: unknown, out: Node[]): void { + if (value === null || value === undefined) { + return; + } + if (Array.isArray(value)) { + for (const item of value) { + if (item !== null && typeof item === "object" && "type" in item) { + out.push(item as Node); + } + } + return; + } + if (typeof value === "object" && "type" in value) { + out.push(value as Node); + } +} + +function collectChildNodes(node: Node): Node[] { + const children: Node[] = []; for (const key of Object.keys(node)) { const val = (node as Record)[key]; - if (val === null || val === undefined) { - continue; - } - if (Array.isArray(val)) { - for (const item of val) { - if (item !== null && typeof item === "object" && "type" in item) { - walk(item as Node, visit); - } - } - } else if (typeof val === "object" && "type" in val) { - walk(val as Node, visit); - } + pushNestedAstNodes(val, children); + } + return children; +} + +function walkAst(node: Node, visit: (n: Node) => void): void { + visit(node); + for (const child of collectChildNodes(node)) { + walkAst(child, visit); } } @@ -54,6 +74,85 @@ function programHasDefaultExport(body: readonly Node[]): boolean { return false; } +function stringLiteralModuleSpecifier(src: Node): string | null { + if (src.type !== "Literal" || typeof src.value !== "string") { + return null; + } + return src.value; +} + +function validateImportDeclaration(node: ImportDeclaration): string | null { + const spec = stringLiteralModuleSpecifier(node.source); + if (spec === null) { + return "only static string import specifiers are allowed"; + } + if (!isAllowedImportSpecifier(spec)) { + return `disallowed import specifier "${spec}" (only Node built-ins are allowed)`; + } + return null; +} + +function validateExportSource( + src: Node, + staticMessage: string, + disallowedPrefix: string, +): string | null { + const spec = stringLiteralModuleSpecifier(src); + if (spec === null) { + return staticMessage; + } + if (!isAllowedImportSpecifier(spec)) { + return `${disallowedPrefix} "${spec}" (only Node built-ins are allowed)`; + } + return null; +} + +function validateExportNamedDeclaration(node: ExportNamedDeclaration): string | null { + if (node.source === null || node.source === undefined) { + return null; + } + return validateExportSource( + node.source, + "only static string re-export specifiers are allowed", + "disallowed re-export specifier", + ); +} + +function validateExportAllDeclaration(node: ExportAllDeclaration): string | null { + return validateExportSource( + node.source, + "only static string export-all specifiers are allowed", + "disallowed export-all specifier", + ); +} + +function validateRequireCall(node: CallExpression): string | null { + const callee = node.callee; + if (callee.type === "Identifier" && callee.name === "require") { + return "require() is not allowed in workflow bundles"; + } + return null; +} + +function bundleConstraintViolationForNode(node: Node): string | null { + if (node.type === "ImportExpression") { + return "dynamic import() is not allowed in workflow bundles"; + } + if (node.type === "ImportDeclaration") { + return validateImportDeclaration(node); + } + if (node.type === "ExportNamedDeclaration") { + return validateExportNamedDeclaration(node); + } + if (node.type === "ExportAllDeclaration") { + return validateExportAllDeclaration(node); + } + if (node.type === "CallExpression") { + return validateRequireCall(node); + } + return null; +} + /** * Validate RFC-001 bundle rules: single-file ESM shape, default export, * no dynamic `import()`, static imports restricted to Node builtins. @@ -84,58 +183,19 @@ export function validateWorkflowBundle(input: WorkflowBundleValidationInput): Re return err("workflow bundle must have a default export"); } - let walkError: string | null = null; - walk(ast, (n) => { - if (walkError !== null) { + let violation: string | null = null; + walkAst(ast, (node) => { + if (violation !== null) { return; } - if (n.type === "ImportExpression") { - walkError = "dynamic import() is not allowed in workflow bundles"; - return; - } - if (n.type === "ImportDeclaration") { - const src = n.source; - if (src.type !== "Literal" || typeof src.value !== "string") { - walkError = "only static string import specifiers are allowed"; - return; - } - if (!isAllowedImportSpecifier(src.value)) { - walkError = `disallowed import specifier "${src.value}" (only Node built-ins are allowed)`; - } - return; - } - if (n.type === "ExportNamedDeclaration" && n.source !== null && n.source !== undefined) { - const src = n.source; - if (src.type !== "Literal" || typeof src.value !== "string") { - walkError = "only static string re-export specifiers are allowed"; - return; - } - if (!isAllowedImportSpecifier(src.value)) { - walkError = `disallowed re-export specifier "${src.value}" (only Node built-ins are allowed)`; - } - return; - } - if (n.type === "ExportAllDeclaration") { - const src = n.source; - if (src.type !== "Literal" || typeof src.value !== "string") { - walkError = "only static string export-all specifiers are allowed"; - return; - } - if (!isAllowedImportSpecifier(src.value)) { - walkError = `disallowed export-all specifier "${src.value}" (only Node built-ins are allowed)`; - } - return; - } - if (n.type === "CallExpression") { - const c = n.callee; - if (c.type === "Identifier" && c.name === "require") { - walkError = "require() is not allowed in workflow bundles"; - } + const next = bundleConstraintViolationForNode(node); + if (next !== null) { + violation = next; } }); - if (walkError !== null) { - return err(walkError); + if (violation !== null) { + return err(violation); } return ok(undefined); diff --git a/packages/workflow/src/engine.ts b/packages/workflow/src/engine.ts new file mode 100644 index 0000000..7855f4d --- /dev/null +++ b/packages/workflow/src/engine.ts @@ -0,0 +1,143 @@ +import { appendFile, mkdir } from "node:fs/promises"; +import { dirname } from "node:path"; + +import type { LogFn } from "./logger.js"; +import { + END, + type RoleMeta, + type RoleStep, + START, + type ThreadContext, + type WorkflowDefinition, +} from "./types.js"; + +export type ExecuteThreadIo = { + threadId: string; + hash: string; + dataJsonlPath: string; + infoJsonlPath: string; +}; + +export type ExecuteThreadOptions = { + isDryRun: boolean; + maxRounds: number; + signal: AbortSignal; +}; + +function isRoleNext( + next: (keyof M & string) | typeof END, +): next is keyof M & string { + return next !== END; +} + +async function appendDataLine(path: string, record: unknown): Promise { + const line = `${JSON.stringify(record)}\n`; + await appendFile(path, line, "utf8"); +} + +/** + * Execute a workflow thread: moderator loop, role steps, RFC-001 `.data.jsonl` records, + * debug lines via `logger` to `.info.jsonl`. + */ +export async function executeThread( + def: WorkflowDefinition, + prompt: string, + options: ExecuteThreadOptions, + io: ExecuteThreadIo, + logger: LogFn, +): Promise<{ returnCode: number; summary: string }> { + await mkdir(dirname(io.dataJsonlPath), { recursive: true }); + await mkdir(dirname(io.infoJsonlPath), { recursive: true }); + + const nowMs = Date.now(); + const start: ThreadContext["start"] = { + role: START, + content: prompt, + meta: { maxRounds: options.maxRounds, threadId: io.threadId }, + timestamp: nowMs, + }; + + const startRecord = { + name: def.name, + hash: io.hash, + threadId: io.threadId, + parameters: { + prompt, + options: { + isDryRun: options.isDryRun, + maxRounds: options.maxRounds, + }, + }, + timestamp: nowMs, + }; + + await appendDataLine(io.dataJsonlPath, startRecord); + + let steps: RoleStep[] = []; + + logger("T9HQ2KHM", `thread ${io.threadId} started for workflow ${def.name}`); + + while (true) { + if (options.signal.aborted) { + logger("V8JX4NP2", `thread ${io.threadId} aborted`); + return { returnCode: 130, summary: "thread aborted" }; + } + + if (steps.length >= options.maxRounds) { + logger("R3CW7YBQ", `thread ${io.threadId} stopped at maxRounds=${options.maxRounds}`); + return { + returnCode: 0, + summary: `completed: reached maxRounds (${options.maxRounds})`, + }; + } + + const ctx: ThreadContext = { + threadId: io.threadId, + start, + steps, + }; + + const next = def.moderator(ctx); + + if (!isRoleNext(next)) { + logger("M5FZ2K8H", `thread ${io.threadId} moderator returned END`); + return { returnCode: 0, summary: "completed: moderator returned END" }; + } + + const roleFn = def.roles[next]; + if (roleFn === undefined) { + logger("K2P8QX9W", `thread ${io.threadId} unknown role ${next}`); + return { returnCode: 1, summary: `unknown role: ${next}` }; + } + + if (options.signal.aborted) { + logger("V8JX4NP3", `thread ${io.threadId} aborted`); + return { returnCode: 130, summary: "thread aborted" }; + } + + const result = await roleFn(ctx); + + const ts = Date.now(); + const step: RoleStep = { + role: next, + content: result.content, + meta: result.meta, + timestamp: ts, + } as RoleStep; + + await appendDataLine(io.dataJsonlPath, { + role: step.role, + content: step.content, + meta: step.meta, + timestamp: step.timestamp, + }); + + steps = [...steps, step]; + logger("N7BW4YHQ", `thread ${io.threadId} completed role ${next}`); + + if (options.signal.aborted) { + logger("V8JX4NP4", `thread ${io.threadId} aborted`); + return { returnCode: 130, summary: "thread aborted" }; + } + } +} diff --git a/packages/workflow/src/index.ts b/packages/workflow/src/index.ts index 93e00a4..b9c365d 100644 --- a/packages/workflow/src/index.ts +++ b/packages/workflow/src/index.ts @@ -6,10 +6,15 @@ export { encodeUint64AsCrockford, } from "./base32.js"; export { validateWorkflowBundle, type WorkflowBundleValidationInput } from "./bundle-validator.js"; +export { + type ExecuteThreadIo, + type ExecuteThreadOptions, + executeThread, +} from "./engine.js"; export { hashWorkflowBundleBytes } from "./hash.js"; export { - createLogger, type CreateLoggerOptions, + createLogger, type LogFn, type LoggerSink, } from "./logger.js"; @@ -21,12 +26,26 @@ export { registerWorkflowVersion, stringifyWorkflowRegistryYaml, unregisterWorkflow, - workflowRegistryPath, - writeWorkflowRegistry, type WorkflowHistoryEntry, type WorkflowRegistryEntry, type WorkflowRegistryFile, + workflowRegistryPath, + writeWorkflowRegistry, } from "./registry.js"; export { err, ok, type Result } from "./result.js"; export { getDefaultWorkflowStorageRoot } from "./storage-root.js"; +export { + type AgentFn, + END, + type Moderator, + type Role, + type RoleMeta, + type RoleResult, + type RoleStep, + START, + type StartStep, + type ThreadContext, + type WorkflowDefinition, +} from "./types.js"; export { generateUlid } from "./ulid.js"; +export { getWorkerHostScriptPath } from "./worker-entry-path.js"; diff --git a/packages/workflow/src/logger.ts b/packages/workflow/src/logger.ts index 7f70316..26a019c 100644 --- a/packages/workflow/src/logger.ts +++ b/packages/workflow/src/logger.ts @@ -1,7 +1,11 @@ import { appendFileSync } from "node:fs"; +import { CROCKFORD_BASE32_ALPHABET } from "./base32.js"; + const TAG_LENGTH = 8; +const TAG_CHAR_SET: ReadonlySet = new Set(CROCKFORD_BASE32_ALPHABET.split("")); + function assertValidLogTag(tag: string): void { if (tag.length !== TAG_LENGTH) { throw new Error(`log tag must be exactly ${TAG_LENGTH} characters`); @@ -12,15 +16,13 @@ function assertValidLogTag(tag: string): void { throw new Error("log tag validation failed"); } const upper = ch.toUpperCase(); - if (!/[0-9A-HJKMNP-TV-Z]/.test(upper)) { + if (!TAG_CHAR_SET.has(upper)) { throw new Error(`invalid Crockford Base32 character in log tag: ${ch}`); } } } -export type LoggerSink = - | { kind: "stderr" } - | { kind: "file"; path: string }; +export type LoggerSink = { kind: "stderr" } | { kind: "file"; path: string }; export type CreateLoggerOptions = { sink: LoggerSink; @@ -33,12 +35,11 @@ export function createLogger(options: CreateLoggerOptions): LogFn { if (options.sink.kind === "stderr") { return (tag: string, content: string) => { assertValidLogTag(tag); - const line = - `${JSON.stringify({ - tag: tag.toUpperCase(), - content, - timestamp: Date.now(), - })}\n`; + const line = `${JSON.stringify({ + tag: tag.toUpperCase(), + content, + timestamp: Date.now(), + })}\n`; process.stderr.write(line); }; } @@ -46,12 +47,11 @@ export function createLogger(options: CreateLoggerOptions): LogFn { const filePath = options.sink.path; return (tag: string, content: string) => { assertValidLogTag(tag); - const line = - `${JSON.stringify({ - tag: tag.toUpperCase(), - content, - timestamp: Date.now(), - })}\n`; + const line = `${JSON.stringify({ + tag: tag.toUpperCase(), + content, + timestamp: Date.now(), + })}\n`; appendFileSync(filePath, line, "utf8"); }; } diff --git a/packages/workflow/src/registry-normalize.ts b/packages/workflow/src/registry-normalize.ts new file mode 100644 index 0000000..d76d7dc --- /dev/null +++ b/packages/workflow/src/registry-normalize.ts @@ -0,0 +1,77 @@ +import type { + WorkflowHistoryEntry, + WorkflowRegistryEntry, + WorkflowRegistryFile, +} from "./registry-types.js"; +import { err, ok, type Result } from "./result.js"; + +export function normalizeWorkflowHistoryEntry( + workflowName: string, + index: number, + raw: unknown, +): Result { + if (raw === null || typeof raw !== "object") { + return err(new Error(`workflow "${workflowName}" history[${index}] must be a mapping`)); + } + const he = raw as Record; + const hash = he.hash; + const timestamp = he.timestamp; + if (typeof hash !== "string" || typeof timestamp !== "number" || !Number.isFinite(timestamp)) { + return err( + new Error(`workflow "${workflowName}" history[${index}] must have hash and timestamp`), + ); + } + return ok({ hash, timestamp }); +} + +export function normalizeWorkflowRegistryEntry( + workflowName: string, + raw: unknown, +): Result { + if (raw === null || typeof raw !== "object") { + return err(new Error(`workflow "${workflowName}" must be a mapping`)); + } + const e = raw as Record; + const hash = e.hash; + const timestamp = e.timestamp; + const historyRaw = e.history; + if (typeof hash !== "string") { + return err(new Error(`workflow "${workflowName}" must have a string hash`)); + } + if (typeof timestamp !== "number" || !Number.isFinite(timestamp)) { + return err(new Error(`workflow "${workflowName}" must have a finite numeric timestamp`)); + } + if (!Array.isArray(historyRaw)) { + return err(new Error(`workflow "${workflowName}" must have a history array`)); + } + const history: WorkflowHistoryEntry[] = []; + for (let i = 0; i < historyRaw.length; i++) { + const item = historyRaw[i]; + const next = normalizeWorkflowHistoryEntry(workflowName, i, item); + if (!next.ok) { + return next; + } + history.push(next.value); + } + return ok({ hash, timestamp, history }); +} + +export function normalizeWorkflowRegistryRoot(raw: unknown): Result { + if (raw === null || typeof raw !== "object") { + return err(new Error("registry root must be a mapping")); + } + const root = raw as Record; + const workflowsRaw = root.workflows; + if (workflowsRaw === null || workflowsRaw === undefined || typeof workflowsRaw !== "object") { + return err(new Error('registry must contain a "workflows" mapping')); + } + const workflows: Record = {}; + for (const [name, entryRaw] of Object.entries(workflowsRaw)) { + const entryResult = normalizeWorkflowRegistryEntry(name, entryRaw); + if (!entryResult.ok) { + return entryResult; + } + workflows[name] = entryResult.value; + } + return ok({ workflows }); +} diff --git a/packages/workflow/src/registry-types.ts b/packages/workflow/src/registry-types.ts new file mode 100644 index 0000000..9789426 --- /dev/null +++ b/packages/workflow/src/registry-types.ts @@ -0,0 +1,14 @@ +export type WorkflowHistoryEntry = { + hash: string; + timestamp: number; +}; + +export type WorkflowRegistryEntry = { + hash: string; + timestamp: number; + history: WorkflowHistoryEntry[]; +}; + +export type WorkflowRegistryFile = { + workflows: Record; +}; diff --git a/packages/workflow/src/registry.ts b/packages/workflow/src/registry.ts index d9f073b..5516bc8 100644 --- a/packages/workflow/src/registry.ts +++ b/packages/workflow/src/registry.ts @@ -3,22 +3,19 @@ import { dirname, join } from "node:path"; import { parseDocument, stringify } from "yaml"; +import { normalizeWorkflowRegistryRoot } from "./registry-normalize.js"; +import type { + WorkflowHistoryEntry, + WorkflowRegistryEntry, + WorkflowRegistryFile, +} from "./registry-types.js"; import { err, ok, type Result } from "./result.js"; -export type WorkflowHistoryEntry = { - hash: string; - timestamp: number; -}; - -export type WorkflowRegistryEntry = { - hash: string; - timestamp: number; - history: WorkflowHistoryEntry[]; -}; - -export type WorkflowRegistryFile = { - workflows: Record; -}; +export type { + WorkflowHistoryEntry, + WorkflowRegistryEntry, + WorkflowRegistryFile, +} from "./registry-types.js"; export function workflowRegistryPath(storageRoot: string): string { return join(storageRoot, "workflow.yaml"); @@ -28,50 +25,6 @@ function emptyRegistry(): WorkflowRegistryFile { return { workflows: {} }; } -function normalizeRegistry(raw: unknown): Result { - if (raw === null || typeof raw !== "object") { - return err(new Error("registry root must be a mapping")); - } - const root = raw as Record; - const workflowsRaw = root.workflows; - if (workflowsRaw === null || workflowsRaw === undefined || typeof workflowsRaw !== "object") { - return err(new Error('registry must contain a "workflows" mapping')); - } - const workflows: Record = {}; - for (const [name, entryRaw] of Object.entries(workflowsRaw)) { - if (entryRaw === null || typeof entryRaw !== "object") { - return err(new Error(`workflow "${name}" must be a mapping`)); - } - const e = entryRaw as Record; - const hash = e.hash; - const timestamp = e.timestamp; - const historyRaw = e.history; - if (typeof hash !== "string") { - return err(new Error(`workflow "${name}" must have a string hash`)); - } - if (typeof timestamp !== "number" || !Number.isFinite(timestamp)) { - return err(new Error(`workflow "${name}" must have a finite numeric timestamp`)); - } - if (!Array.isArray(historyRaw)) { - return err(new Error(`workflow "${name}" must have a history array`)); - } - const history: WorkflowHistoryEntry[] = []; - for (let i = 0; i < historyRaw.length; i++) { - const h = historyRaw[i]; - if (h === null || typeof h !== "object") { - return err(new Error(`workflow "${name}" history[${i}] must be a mapping`)); - } - const he = h as Record; - if (typeof he.hash !== "string" || typeof he.timestamp !== "number" || !Number.isFinite(he.timestamp)) { - return err(new Error(`workflow "${name}" history[${i}] must have hash and timestamp`)); - } - history.push({ hash: he.hash, timestamp: he.timestamp }); - } - workflows[name] = { hash, timestamp, history }; - } - return ok({ workflows }); -} - export function parseWorkflowRegistryYaml(text: string): Result { if (text.trim() === "") { return ok(emptyRegistry()); @@ -82,14 +35,16 @@ export function parseWorkflowRegistryYaml(text: string): Result> { +export async function readWorkflowRegistry( + storageRoot: string, +): Promise> { const path = workflowRegistryPath(storageRoot); let text: string; try { diff --git a/packages/workflow/src/types.ts b/packages/workflow/src/types.ts new file mode 100644 index 0000000..66aa229 --- /dev/null +++ b/packages/workflow/src/types.ts @@ -0,0 +1,63 @@ +/** Sentinel values for automaton control flow. */ +export const START = "__start__" as const; +export const END = "__end__" as const; + +/** Maps role names → their meta types. Single generic drives all inference. */ +export type RoleMeta = Record>; + +/** Typed output of a Role execution. */ +export type RoleResult> = { + content: string; + meta: Meta; +}; + +/** Engine start frame: initial prompt + thread identity. */ +export type StartStep = { + role: typeof START; + content: string; + meta: { maxRounds: number; threadId: string }; + timestamp: number; +}; + +/** A completed role step in the thread. */ +export type RoleStep = { + [K in keyof M & string]: { role: K; meta: M[K]; content: string; timestamp: number }; +}[keyof M & string]; + +/** Thread-scoped context passed to roles and moderator. */ +export type ThreadContext = { + threadId: string; + start: StartStep; + steps: RoleStep[]; +}; + +/** + * A Role — receives full thread context, returns typed content + meta. + * Implementation can be an agent, LLM call, script, HTTP request, etc. + */ +export type Role> = ( + ctx: ThreadContext, +) => Promise>; + +/** + * An Agent — raw string output interface for LLM/CLI adapters. + * Structured meta is extracted by the role's extract layer. + */ +export type AgentFn = (ctx: ThreadContext, systemPrompt: string) => Promise; + +/** + * The Moderator — a pure routing function. + * Receives the full thread context (start + all prior steps). + * On initial call, `steps` is empty. + * Returns the next role name or END to terminate. + */ +export type Moderator = ( + ctx: ThreadContext, +) => (keyof M & string) | typeof END; + +/** Complete workflow definition as authored by users. */ +export type WorkflowDefinition = { + name: string; + roles: { [K in keyof M & string]: Role }; + moderator: Moderator; +}; diff --git a/packages/workflow/src/worker-entry-path.ts b/packages/workflow/src/worker-entry-path.ts new file mode 100644 index 0000000..5690bd2 --- /dev/null +++ b/packages/workflow/src/worker-entry-path.ts @@ -0,0 +1,6 @@ +import { fileURLToPath } from "node:url"; + +/** Absolute path to `worker-host.ts` for spawning bundle worker processes. */ +export function getWorkerHostScriptPath(): string { + return fileURLToPath(new URL("./worker.ts", import.meta.url)); +} diff --git a/packages/workflow/src/worker.ts b/packages/workflow/src/worker.ts new file mode 100644 index 0000000..23e2356 --- /dev/null +++ b/packages/workflow/src/worker.ts @@ -0,0 +1,295 @@ +import { mkdir, unlink, writeFile } from "node:fs/promises"; +import { createServer, type Socket } from "node:net"; +import { dirname, join } from "node:path"; +import { pathToFileURL } from "node:url"; + +import { type ExecuteThreadIo, executeThread } from "./engine.js"; +import { createLogger } from "./logger.js"; +import type { RoleMeta, WorkflowDefinition } from "./types.js"; + +const bootLog = createLogger({ sink: { kind: "stderr" } }); + +type RunCommand = { + type: "run"; + threadId: string; + prompt: string; + options: { isDryRun: boolean; maxRounds: number }; +}; + +type KillCommand = { + type: "kill"; + threadId: string; +}; + +type ControlCommand = RunCommand | KillCommand; + +function parseControlPayload(payload: unknown): ControlCommand | null { + if (payload === null || typeof payload !== "object") { + return null; + } + const rec = payload as Record; + const type = rec.type; + if (type === "kill") { + const threadId = rec.threadId; + if (typeof threadId !== "string") { + return null; + } + return { type: "kill", threadId }; + } + if (type === "run") { + const threadId = rec.threadId; + const prompt = rec.prompt; + const options = rec.options; + if (typeof threadId !== "string" || typeof prompt !== "string") { + return null; + } + if (options === null || typeof options !== "object") { + return null; + } + const optRec = options as Record; + const isDryRun = optRec.isDryRun; + const maxRounds = optRec.maxRounds; + if (typeof isDryRun !== "boolean" || typeof maxRounds !== "number") { + return null; + } + return { + type: "run", + threadId, + prompt, + options: { isDryRun, maxRounds }, + }; + } + return null; +} + +function parseCommandLine(line: string): ControlCommand | null { + const trimmed = line.trim(); + if (trimmed === "") { + return null; + } + let parsed: unknown; + try { + parsed = JSON.parse(trimmed) as unknown; + } catch { + bootLog("S8KQ3WJP", "worker received invalid JSON control line"); + return null; + } + return parseControlPayload(parsed); +} + +function isWorkflowDefinitionLike(value: unknown): value is WorkflowDefinition { + if (value === null || typeof value !== "object") { + return false; + } + const rec = value as Record; + if (typeof rec.name !== "string") { + return false; + } + if (rec.roles === null || typeof rec.roles !== "object") { + return false; + } + if (typeof rec.moderator !== "function") { + return false; + } + return true; +} + +async function readLineFromSocket(socket: Socket): Promise { + return await new Promise((resolve) => { + let buf = ""; + function onData(chunk: Buffer): void { + buf += chunk.toString("utf8"); + const nl = buf.indexOf("\n"); + if (nl >= 0) { + cleanup(); + resolve(buf.slice(0, nl)); + } + } + function onEnd(): void { + cleanup(); + resolve(buf === "" ? null : buf); + } + function onError(): void { + cleanup(); + resolve(null); + } + function cleanup(): void { + socket.off("data", onData); + socket.off("end", onEnd); + socket.off("error", onError); + } + socket.on("data", onData); + socket.on("end", onEnd); + socket.on("error", onError); + }); +} + +async function main(): Promise { + const bundlePath = process.argv[2]; + const storageRoot = process.argv[3]; + const hash = process.argv[4]; + + if ( + bundlePath === undefined || + storageRoot === undefined || + hash === undefined || + bundlePath === "" || + storageRoot === "" || + hash === "" + ) { + bootLog("H7XN4MKQ", "worker usage: worker "); + process.exit(2); + return; + } + + // Dynamic import required: user bundle path resolved at runtime + const modUnknown: unknown = await import(pathToFileURL(bundlePath).href); + const modRec = modUnknown as Record; + const defaultExport = modRec.default; + if (!isWorkflowDefinitionLike(defaultExport)) { + bootLog( + "T4BW9YJX", + "workflow bundle default export must be a WorkflowDefinition { name, roles, moderator }", + ); + process.exit(2); + return; + } + const def = defaultExport; + + const controllers = new Map(); + let activeThreads = 0; + let shutdownTimer: ReturnType | null = null; + + const workerCtlPath = join(storageRoot, "workers", `${hash}.json`); + + function cancelShutdownTimer(): void { + if (shutdownTimer !== null) { + clearTimeout(shutdownTimer); + shutdownTimer = null; + } + } + + function scheduleShutdown(): void { + cancelShutdownTimer(); + shutdownTimer = setTimeout(() => { + void unlink(workerCtlPath).catch(() => {}); + process.exit(0); + }, 150); + } + + function bumpStart(): void { + cancelShutdownTimer(); + activeThreads++; + } + + function bumpDone(): void { + activeThreads--; + if (activeThreads <= 0) { + activeThreads = 0; + scheduleShutdown(); + } + } + + async function dispatchCommand(cmd: ControlCommand, socket: Socket | null): Promise { + if (cmd.type === "kill") { + const ac = controllers.get(cmd.threadId); + if (ac !== undefined) { + ac.abort(); + bootLog("P9XK2WNQ", `kill requested for thread ${cmd.threadId}`); + } + socket?.end(); + return; + } + + bumpStart(); + + const threadId = cmd.threadId; + const runningPath = join(storageRoot, "logs", hash, `${threadId}.running`); + const dataJsonlPath = join(storageRoot, "logs", hash, `${threadId}.data.jsonl`); + const infoJsonlPath = join(storageRoot, "logs", hash, `${threadId}.info.jsonl`); + + const io: ExecuteThreadIo = { + threadId, + hash, + dataJsonlPath, + infoJsonlPath, + }; + + const existing = controllers.get(threadId); + if (existing !== undefined) { + existing.abort(); + controllers.delete(threadId); + } + + const ac = new AbortController(); + controllers.set(threadId, ac); + + try { + await mkdir(dirname(runningPath), { recursive: true }); + await mkdir(dirname(dataJsonlPath), { recursive: true }); + await writeFile(runningPath, "", "utf8"); + + const logger = createLogger({ sink: { kind: "file", path: infoJsonlPath } }); + + await executeThread(def, cmd.prompt, { ...cmd.options, signal: ac.signal }, io, logger); + } catch (e) { + const message = e instanceof Error ? e.message : String(e); + bootLog("Q3MN8YKW", `thread ${threadId} failed: ${message}`); + } finally { + controllers.delete(threadId); + await unlink(runningPath).catch(() => {}); + bumpDone(); + socket?.end(); + } + } + + if (typeof process.send === "function") { + process.on("message", (msg: unknown) => { + const cmd = parseControlPayload(msg); + if (cmd === null) { + return; + } + void dispatchCommand(cmd, null); + }); + } + + const server = createServer((socket) => { + void (async () => { + const line = await readLineFromSocket(socket); + if (line === null) { + socket.end(); + return; + } + const cmd = parseCommandLine(line); + if (cmd === null) { + socket.end(); + return; + } + await dispatchCommand(cmd, socket); + })(); + }); + + server.on("error", (err) => { + bootLog("W8YK4NPX", `worker server error: ${err.message}`); + process.exit(1); + }); + + await new Promise((resolve) => { + server.listen(0, "127.0.0.1", () => { + resolve(); + }); + }); + + const addr = server.address(); + if (addr === null || typeof addr === "string") { + bootLog("R9XK4MNW", "worker failed to bind TCP address"); + process.exit(1); + return; + } + + process.stdout.write(`READY ${addr.port}\n`); + + await new Promise(() => {}); +} + +void main(); diff --git a/tsconfig.json b/tsconfig.json index e8e18ff..1cd58fd 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -15,8 +15,5 @@ "composite": true, "outDir": "dist" }, - "references": [ - { "path": "packages/workflow" }, - { "path": "packages/cli-workflow" } - ] + "references": [{ "path": "packages/workflow" }, { "path": "packages/cli-workflow" }] }