feat: implement thread step — moderator → agent → update head

- Walk CAS chain to build ModeratorContext with expanded output
- Call uwf-moderator evaluate() for role decision
- Agent resolution: --agent > config overrides > default
- Spawn agent CLI, capture StepNode hash
- Update threads.yaml, check done via second evaluate
- Archive on $END

Refs #309, #315
This commit is contained in:
2026-05-18 09:19:37 +00:00
parent 4d477c67c0
commit b165049a13
6 changed files with 296 additions and 8 deletions
+3
View File
@@ -13,9 +13,12 @@
"dependencies": {
"@uncaged/json-cas": "workspace:^",
"@uncaged/json-cas-fs": "workspace:^",
"@uncaged/uwf-agent-kit": "workspace:^",
"@uncaged/uwf-moderator": "workspace:^",
"@uncaged/uwf-protocol": "workspace:^",
"@uncaged/workflow-util": "workspace:^",
"commander": "^14.0.3",
"dotenv": "^16.6.1",
"yaml": "^2.8.4"
},
"scripts": {
+14 -4
View File
@@ -2,7 +2,13 @@
import { Command } from "commander";
import { cmdThreadKill, cmdThreadList, cmdThreadShow, cmdThreadStart } from "./commands/thread.js";
import {
cmdThreadKill,
cmdThreadList,
cmdThreadShow,
cmdThreadStart,
cmdThreadStep,
} from "./commands/thread.js";
import { cmdWorkflowList, cmdWorkflowPut, cmdWorkflowShow } from "./commands/workflow.js";
import { resolveStorageRoot } from "./store.js";
@@ -79,9 +85,13 @@ thread
.description("Execute one step")
.argument("<thread-id>", "Thread ULID")
.option("--agent <cmd>", "Override agent command")
.action(() => {
process.stderr.write("uwf thread step: not implemented\n");
process.exit(1);
.action((threadId: string, opts: { agent: string | undefined }) => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const agentOverride = opts.agent ?? null;
const result = await cmdThreadStep(storageRoot, threadId, agentOverride);
writeJson(result);
});
});
thread
+252
View File
@@ -1,14 +1,25 @@
import { execFileSync } from "node:child_process";
import { validate } from "@uncaged/json-cas";
import { getEnvPath, loadWorkflowConfig } from "@uncaged/uwf-agent-kit";
import { evaluate } from "@uncaged/uwf-moderator";
import type {
AgentAlias,
AgentConfig,
CasRef,
ModeratorContext,
StartNodePayload,
StartOutput,
StepContext,
StepNodePayload,
StepOutput,
ThreadId,
ThreadListItem,
WorkflowConfig,
WorkflowPayload,
} from "@uncaged/uwf-protocol";
import { generateUlid } from "@uncaged/workflow-util";
import { config as loadDotenv } from "dotenv";
import {
appendThreadHistory,
@@ -24,6 +35,15 @@ import {
} from "../store.js";
import { isCasRef } from "../validate.js";
const END_ROLE = "$END";
type ChainState = {
startHash: CasRef;
start: StartNodePayload;
stepsNewestFirst: StepNodePayload[];
headIsStart: boolean;
};
export type KillOutput = {
thread: ThreadId;
archived: boolean;
@@ -184,6 +204,238 @@ export async function cmdThreadList(
return items;
}
function walkChain(uwf: UwfStore, headHash: CasRef): ChainState {
const headNode = uwf.store.get(headHash);
if (headNode === null) {
fail(`CAS node not found: ${headHash}`);
}
if (headNode.type === uwf.schemas.startNode) {
return {
startHash: headHash,
start: headNode.payload as StartNodePayload,
stepsNewestFirst: [],
headIsStart: true,
};
}
if (headNode.type !== uwf.schemas.stepNode) {
fail(`head ${headHash} is not a StartNode or StepNode`);
}
const stepsNewestFirst: StepNodePayload[] = [];
let hash: CasRef | null = headHash;
while (hash !== null) {
const node = uwf.store.get(hash);
if (node === null) {
fail(`CAS node not found while walking chain: ${hash}`);
}
if (node.type !== uwf.schemas.stepNode) {
break;
}
const payload = node.payload as StepNodePayload;
stepsNewestFirst.push(payload);
hash = payload.prev;
}
const newest = stepsNewestFirst[0];
if (newest === undefined) {
fail(`empty step chain at head ${headHash}`);
}
const startNode = uwf.store.get(newest.start);
if (startNode === null || startNode.type !== uwf.schemas.startNode) {
fail(`StartNode not found: ${newest.start}`);
}
return {
startHash: newest.start,
start: startNode.payload as StartNodePayload,
stepsNewestFirst,
headIsStart: false,
};
}
function expandOutput(uwf: UwfStore, outputRef: CasRef): unknown {
const node = uwf.store.get(outputRef);
if (node === null) {
return {};
}
return node.payload;
}
function buildModeratorContext(uwf: UwfStore, chain: ChainState): ModeratorContext {
const chronological = [...chain.stepsNewestFirst].reverse();
const steps: StepContext[] = chronological.map((step) => ({
role: step.role,
output: expandOutput(uwf, step.output),
detail: step.detail,
agent: step.agent,
}));
return { start: chain.start, steps };
}
function loadWorkflowPayload(uwf: UwfStore, workflowRef: CasRef): WorkflowPayload {
const node = uwf.store.get(workflowRef);
if (node === null) {
fail(`workflow CAS node not found: ${workflowRef}`);
}
if (node.type !== uwf.schemas.workflow) {
fail(`node ${workflowRef} is not a Workflow`);
}
return node.payload as WorkflowPayload;
}
function parseAgentOverride(override: string): AgentConfig {
const parts = override
.trim()
.split(/\s+/)
.filter((p) => p.length > 0);
const command = parts[0];
if (command === undefined) {
fail("agent override must not be empty");
}
return { command, args: parts.slice(1) };
}
function resolveAgentConfig(
config: WorkflowConfig,
workflow: WorkflowPayload,
role: string,
agentOverride: string | null,
): AgentConfig {
if (agentOverride !== null) {
return parseAgentOverride(agentOverride);
}
let alias: AgentAlias = config.defaultAgent;
if (config.agentOverrides !== null) {
const roleOverrides = config.agentOverrides[workflow.name];
if (roleOverrides !== undefined && roleOverrides[role] !== undefined) {
alias = roleOverrides[role];
}
}
const agentConfig = config.agents[alias];
if (agentConfig === undefined) {
fail(`unknown agent alias in config: ${alias}`);
}
return agentConfig;
}
function spawnAgent(agent: AgentConfig, threadId: ThreadId, role: string): CasRef {
const argv = [...agent.args, threadId, role];
let stdout: string;
try {
stdout = execFileSync(agent.command, argv, {
encoding: "utf8",
env: process.env,
stdio: ["ignore", "pipe", "pipe"],
});
} catch (e) {
const err = e as NodeJS.ErrnoException & { stderr?: Buffer | string };
const stderr =
err.stderr === undefined
? ""
: typeof err.stderr === "string"
? err.stderr
: err.stderr.toString("utf8");
const detail = stderr.trim() !== "" ? `: ${stderr.trim()}` : "";
fail(`agent command failed (${agent.command})${detail}`);
}
const line = stdout.trim().split("\n").pop()?.trim() ?? "";
if (!isCasRef(line)) {
fail(`agent stdout is not a valid CAS hash: ${line || "(empty)"}`);
}
return line;
}
async function archiveThread(
storageRoot: string,
threadId: ThreadId,
workflow: CasRef,
head: CasRef,
): Promise<void> {
const index = await loadThreadsIndex(storageRoot);
delete index[threadId];
await saveThreadsIndex(storageRoot, index);
await appendThreadHistory(storageRoot, {
thread: threadId,
workflow,
head,
completedAt: Date.now(),
});
}
export async function cmdThreadStep(
storageRoot: string,
threadId: ThreadId,
agentOverride: string | null,
): Promise<StepOutput> {
const index = await loadThreadsIndex(storageRoot);
const headHash = index[threadId];
if (headHash === undefined) {
fail(`thread not active: ${threadId}`);
}
const uwf = await createUwfStore(storageRoot);
const chain = walkChain(uwf, headHash);
const workflowHash = chain.start.workflow;
const workflow = loadWorkflowPayload(uwf, workflowHash);
const context = buildModeratorContext(uwf, chain);
const nextResult = evaluate(workflow, context);
if (!nextResult.ok) {
fail(nextResult.error.message);
}
if (nextResult.value === END_ROLE) {
await archiveThread(storageRoot, threadId, workflowHash, headHash);
return {
workflow: workflowHash,
thread: threadId,
head: headHash,
done: true,
};
}
const role = nextResult.value;
const config = await loadWorkflowConfig(storageRoot);
const agent = resolveAgentConfig(config, workflow, role, agentOverride);
loadDotenv({ path: getEnvPath(storageRoot) });
const newHead = spawnAgent(agent, threadId, role);
const newNode = uwf.store.get(newHead);
if (newNode === null || newNode.type !== uwf.schemas.stepNode) {
fail(`agent returned hash that is not a StepNode: ${newHead}`);
}
index[threadId] = newHead;
await saveThreadsIndex(storageRoot, index);
const chainAfter = walkChain(uwf, newHead);
const contextAfter = buildModeratorContext(uwf, chainAfter);
const afterResult = evaluate(workflow, contextAfter);
if (!afterResult.ok) {
fail(afterResult.error.message);
}
const done = afterResult.value === END_ROLE;
if (done) {
await archiveThread(storageRoot, threadId, workflowHash, newHead);
}
return {
workflow: workflowHash,
thread: threadId,
head: newHead,
done,
};
}
export async function cmdThreadKill(storageRoot: string, threadId: ThreadId): Promise<KillOutput> {
const index = await loadThreadsIndex(storageRoot);
const head = index[threadId];
+21 -3
View File
@@ -67,19 +67,37 @@ const START_NODE: JSONSchema = {
additionalProperties: false,
};
const STEP_NODE: JSONSchema = {
type: "object",
required: ["start", "prev", "role", "output", "detail", "agent"],
properties: {
start: { type: "string", format: "cas_ref" },
prev: {
anyOf: [{ type: "string", format: "cas_ref" }, { type: "null" }],
},
role: { type: "string" },
output: { type: "string", format: "cas_ref" },
detail: { type: "string", format: "cas_ref" },
agent: { type: "string" },
},
additionalProperties: false,
};
export type UwfSchemaHashes = {
workflow: Hash;
startNode: Hash;
stepNode: Hash;
};
/**
* Register Workflow and StartNode JSON Schemas in the CAS store.
* Register Workflow, StartNode, and StepNode JSON Schemas in the CAS store.
* Idempotent: safe to call on every CLI invocation.
*/
export async function registerUwfSchemas(store: Store): Promise<UwfSchemaHashes> {
const [workflow, startNode] = await Promise.all([
const [workflow, startNode, stepNode] = await Promise.all([
putSchema(store, WORKFLOW),
putSchema(store, START_NODE),
putSchema(store, STEP_NODE),
]);
return { workflow, startNode };
return { workflow, startNode, stepNode };
}
+5 -1
View File
@@ -5,5 +5,9 @@
"outDir": "dist"
},
"include": ["src"],
"references": [{ "path": "../uwf-protocol" }]
"references": [
{ "path": "../uwf-protocol" },
{ "path": "../uwf-moderator" },
{ "path": "../uwf-agent-kit" }
]
}
+1
View File
@@ -1,5 +1,6 @@
export type { BuildContextMeta } from "./context.js";
export { buildContext, buildContextWithMeta } from "./context.js";
export { getConfigPath, getEnvPath, loadWorkflowConfig } from "./storage.js";
export type { ExtractResult, ResolvedLlmProvider } from "./extract.js";
export {
extract,