diff --git a/packages/cli-uwf/package.json b/packages/cli-uwf/package.json index a056665..5451a21 100644 --- a/packages/cli-uwf/package.json +++ b/packages/cli-uwf/package.json @@ -13,9 +13,12 @@ "dependencies": { "@uncaged/json-cas": "workspace:^", "@uncaged/json-cas-fs": "workspace:^", + "@uncaged/uwf-agent-kit": "workspace:^", + "@uncaged/uwf-moderator": "workspace:^", "@uncaged/uwf-protocol": "workspace:^", "@uncaged/workflow-util": "workspace:^", "commander": "^14.0.3", + "dotenv": "^16.6.1", "yaml": "^2.8.4" }, "scripts": { diff --git a/packages/cli-uwf/src/cli.ts b/packages/cli-uwf/src/cli.ts index c080175..159aecd 100644 --- a/packages/cli-uwf/src/cli.ts +++ b/packages/cli-uwf/src/cli.ts @@ -2,7 +2,13 @@ import { Command } from "commander"; -import { cmdThreadKill, cmdThreadList, cmdThreadShow, cmdThreadStart } from "./commands/thread.js"; +import { + cmdThreadKill, + cmdThreadList, + cmdThreadShow, + cmdThreadStart, + cmdThreadStep, +} from "./commands/thread.js"; import { cmdWorkflowList, cmdWorkflowPut, cmdWorkflowShow } from "./commands/workflow.js"; import { resolveStorageRoot } from "./store.js"; @@ -79,9 +85,13 @@ thread .description("Execute one step") .argument("", "Thread ULID") .option("--agent ", "Override agent command") - .action(() => { - process.stderr.write("uwf thread step: not implemented\n"); - process.exit(1); + .action((threadId: string, opts: { agent: string | undefined }) => { + const storageRoot = resolveStorageRoot(); + runAction(async () => { + const agentOverride = opts.agent ?? null; + const result = await cmdThreadStep(storageRoot, threadId, agentOverride); + writeJson(result); + }); }); thread diff --git a/packages/cli-uwf/src/commands/thread.ts b/packages/cli-uwf/src/commands/thread.ts index ade3ae8..47dba49 100644 --- a/packages/cli-uwf/src/commands/thread.ts +++ b/packages/cli-uwf/src/commands/thread.ts @@ -1,14 +1,25 @@ +import { execFileSync } from "node:child_process"; + import { validate } from "@uncaged/json-cas"; +import { getEnvPath, loadWorkflowConfig } from "@uncaged/uwf-agent-kit"; +import { evaluate } from "@uncaged/uwf-moderator"; import type { + AgentAlias, + AgentConfig, CasRef, + ModeratorContext, StartNodePayload, StartOutput, + StepContext, StepNodePayload, StepOutput, ThreadId, ThreadListItem, + WorkflowConfig, + WorkflowPayload, } from "@uncaged/uwf-protocol"; import { generateUlid } from "@uncaged/workflow-util"; +import { config as loadDotenv } from "dotenv"; import { appendThreadHistory, @@ -24,6 +35,15 @@ import { } from "../store.js"; import { isCasRef } from "../validate.js"; +const END_ROLE = "$END"; + +type ChainState = { + startHash: CasRef; + start: StartNodePayload; + stepsNewestFirst: StepNodePayload[]; + headIsStart: boolean; +}; + export type KillOutput = { thread: ThreadId; archived: boolean; @@ -184,6 +204,238 @@ export async function cmdThreadList( return items; } +function walkChain(uwf: UwfStore, headHash: CasRef): ChainState { + const headNode = uwf.store.get(headHash); + if (headNode === null) { + fail(`CAS node not found: ${headHash}`); + } + + if (headNode.type === uwf.schemas.startNode) { + return { + startHash: headHash, + start: headNode.payload as StartNodePayload, + stepsNewestFirst: [], + headIsStart: true, + }; + } + + if (headNode.type !== uwf.schemas.stepNode) { + fail(`head ${headHash} is not a StartNode or StepNode`); + } + + const stepsNewestFirst: StepNodePayload[] = []; + let hash: CasRef | null = headHash; + + while (hash !== null) { + const node = uwf.store.get(hash); + if (node === null) { + fail(`CAS node not found while walking chain: ${hash}`); + } + if (node.type !== uwf.schemas.stepNode) { + break; + } + const payload = node.payload as StepNodePayload; + stepsNewestFirst.push(payload); + hash = payload.prev; + } + + const newest = stepsNewestFirst[0]; + if (newest === undefined) { + fail(`empty step chain at head ${headHash}`); + } + + const startNode = uwf.store.get(newest.start); + if (startNode === null || startNode.type !== uwf.schemas.startNode) { + fail(`StartNode not found: ${newest.start}`); + } + + return { + startHash: newest.start, + start: startNode.payload as StartNodePayload, + stepsNewestFirst, + headIsStart: false, + }; +} + +function expandOutput(uwf: UwfStore, outputRef: CasRef): unknown { + const node = uwf.store.get(outputRef); + if (node === null) { + return {}; + } + return node.payload; +} + +function buildModeratorContext(uwf: UwfStore, chain: ChainState): ModeratorContext { + const chronological = [...chain.stepsNewestFirst].reverse(); + const steps: StepContext[] = chronological.map((step) => ({ + role: step.role, + output: expandOutput(uwf, step.output), + detail: step.detail, + agent: step.agent, + })); + return { start: chain.start, steps }; +} + +function loadWorkflowPayload(uwf: UwfStore, workflowRef: CasRef): WorkflowPayload { + const node = uwf.store.get(workflowRef); + if (node === null) { + fail(`workflow CAS node not found: ${workflowRef}`); + } + if (node.type !== uwf.schemas.workflow) { + fail(`node ${workflowRef} is not a Workflow`); + } + return node.payload as WorkflowPayload; +} + +function parseAgentOverride(override: string): AgentConfig { + const parts = override + .trim() + .split(/\s+/) + .filter((p) => p.length > 0); + const command = parts[0]; + if (command === undefined) { + fail("agent override must not be empty"); + } + return { command, args: parts.slice(1) }; +} + +function resolveAgentConfig( + config: WorkflowConfig, + workflow: WorkflowPayload, + role: string, + agentOverride: string | null, +): AgentConfig { + if (agentOverride !== null) { + return parseAgentOverride(agentOverride); + } + + let alias: AgentAlias = config.defaultAgent; + if (config.agentOverrides !== null) { + const roleOverrides = config.agentOverrides[workflow.name]; + if (roleOverrides !== undefined && roleOverrides[role] !== undefined) { + alias = roleOverrides[role]; + } + } + + const agentConfig = config.agents[alias]; + if (agentConfig === undefined) { + fail(`unknown agent alias in config: ${alias}`); + } + return agentConfig; +} + +function spawnAgent(agent: AgentConfig, threadId: ThreadId, role: string): CasRef { + const argv = [...agent.args, threadId, role]; + let stdout: string; + try { + stdout = execFileSync(agent.command, argv, { + encoding: "utf8", + env: process.env, + stdio: ["ignore", "pipe", "pipe"], + }); + } catch (e) { + const err = e as NodeJS.ErrnoException & { stderr?: Buffer | string }; + const stderr = + err.stderr === undefined + ? "" + : typeof err.stderr === "string" + ? err.stderr + : err.stderr.toString("utf8"); + const detail = stderr.trim() !== "" ? `: ${stderr.trim()}` : ""; + fail(`agent command failed (${agent.command})${detail}`); + } + + const line = stdout.trim().split("\n").pop()?.trim() ?? ""; + if (!isCasRef(line)) { + fail(`agent stdout is not a valid CAS hash: ${line || "(empty)"}`); + } + return line; +} + +async function archiveThread( + storageRoot: string, + threadId: ThreadId, + workflow: CasRef, + head: CasRef, +): Promise { + const index = await loadThreadsIndex(storageRoot); + delete index[threadId]; + await saveThreadsIndex(storageRoot, index); + await appendThreadHistory(storageRoot, { + thread: threadId, + workflow, + head, + completedAt: Date.now(), + }); +} + +export async function cmdThreadStep( + storageRoot: string, + threadId: ThreadId, + agentOverride: string | null, +): Promise { + const index = await loadThreadsIndex(storageRoot); + const headHash = index[threadId]; + if (headHash === undefined) { + fail(`thread not active: ${threadId}`); + } + + const uwf = await createUwfStore(storageRoot); + const chain = walkChain(uwf, headHash); + const workflowHash = chain.start.workflow; + const workflow = loadWorkflowPayload(uwf, workflowHash); + const context = buildModeratorContext(uwf, chain); + + const nextResult = evaluate(workflow, context); + if (!nextResult.ok) { + fail(nextResult.error.message); + } + + if (nextResult.value === END_ROLE) { + await archiveThread(storageRoot, threadId, workflowHash, headHash); + return { + workflow: workflowHash, + thread: threadId, + head: headHash, + done: true, + }; + } + + const role = nextResult.value; + const config = await loadWorkflowConfig(storageRoot); + const agent = resolveAgentConfig(config, workflow, role, agentOverride); + + loadDotenv({ path: getEnvPath(storageRoot) }); + const newHead = spawnAgent(agent, threadId, role); + + const newNode = uwf.store.get(newHead); + if (newNode === null || newNode.type !== uwf.schemas.stepNode) { + fail(`agent returned hash that is not a StepNode: ${newHead}`); + } + + index[threadId] = newHead; + await saveThreadsIndex(storageRoot, index); + + const chainAfter = walkChain(uwf, newHead); + const contextAfter = buildModeratorContext(uwf, chainAfter); + const afterResult = evaluate(workflow, contextAfter); + if (!afterResult.ok) { + fail(afterResult.error.message); + } + + const done = afterResult.value === END_ROLE; + if (done) { + await archiveThread(storageRoot, threadId, workflowHash, newHead); + } + + return { + workflow: workflowHash, + thread: threadId, + head: newHead, + done, + }; +} + export async function cmdThreadKill(storageRoot: string, threadId: ThreadId): Promise { const index = await loadThreadsIndex(storageRoot); const head = index[threadId]; diff --git a/packages/cli-uwf/src/schemas.ts b/packages/cli-uwf/src/schemas.ts index 70b7590..d3c3a49 100644 --- a/packages/cli-uwf/src/schemas.ts +++ b/packages/cli-uwf/src/schemas.ts @@ -67,19 +67,37 @@ const START_NODE: JSONSchema = { additionalProperties: false, }; +const STEP_NODE: JSONSchema = { + type: "object", + required: ["start", "prev", "role", "output", "detail", "agent"], + properties: { + start: { type: "string", format: "cas_ref" }, + prev: { + anyOf: [{ type: "string", format: "cas_ref" }, { type: "null" }], + }, + role: { type: "string" }, + output: { type: "string", format: "cas_ref" }, + detail: { type: "string", format: "cas_ref" }, + agent: { type: "string" }, + }, + additionalProperties: false, +}; + export type UwfSchemaHashes = { workflow: Hash; startNode: Hash; + stepNode: Hash; }; /** - * Register Workflow and StartNode JSON Schemas in the CAS store. + * Register Workflow, StartNode, and StepNode JSON Schemas in the CAS store. * Idempotent: safe to call on every CLI invocation. */ export async function registerUwfSchemas(store: Store): Promise { - const [workflow, startNode] = await Promise.all([ + const [workflow, startNode, stepNode] = await Promise.all([ putSchema(store, WORKFLOW), putSchema(store, START_NODE), + putSchema(store, STEP_NODE), ]); - return { workflow, startNode }; + return { workflow, startNode, stepNode }; } diff --git a/packages/cli-uwf/tsconfig.json b/packages/cli-uwf/tsconfig.json index a9954af..ceee718 100644 --- a/packages/cli-uwf/tsconfig.json +++ b/packages/cli-uwf/tsconfig.json @@ -5,5 +5,9 @@ "outDir": "dist" }, "include": ["src"], - "references": [{ "path": "../uwf-protocol" }] + "references": [ + { "path": "../uwf-protocol" }, + { "path": "../uwf-moderator" }, + { "path": "../uwf-agent-kit" } + ] } diff --git a/packages/uwf-agent-kit/src/index.ts b/packages/uwf-agent-kit/src/index.ts index 5506a31..acd6eb1 100644 --- a/packages/uwf-agent-kit/src/index.ts +++ b/packages/uwf-agent-kit/src/index.ts @@ -1,5 +1,6 @@ export type { BuildContextMeta } from "./context.js"; export { buildContext, buildContextWithMeta } from "./context.js"; +export { getConfigPath, getEnvPath, loadWorkflowConfig } from "./storage.js"; export type { ExtractResult, ResolvedLlmProvider } from "./extract.js"; export { extract,