diff --git a/packages/cli-workflow/src/cli.ts b/packages/cli-workflow/src/cli.ts index d6c8bf9..c7d7fa4 100755 --- a/packages/cli-workflow/src/cli.ts +++ b/packages/cli-workflow/src/cli.ts @@ -16,7 +16,7 @@ import { import { cmdLogClean, cmdLogList, cmdLogShow } from "./commands/log.js"; import { cmdSetup, cmdSetupInteractive } from "./commands/setup.js"; import { cmdSkillCli } from "./commands/skill.js"; -import { cmdStepFork, cmdStepList, cmdStepRead, cmdStepShow } from "./commands/step.js"; +import { cmdStepFork, cmdStepList, cmdStepShow } from "./commands/step.js"; import { cmdThreadCancel, cmdThreadExec, @@ -272,19 +272,7 @@ step }); }); -step - .command("read") - .description("Read a step's agent output as markdown") - .argument("", "CAS hash of the StepNode") - .option("--before ", "Show only first N turns") - .action((stepHash: string, opts: { before: string | undefined }) => { - const storageRoot = resolveStorageRoot(); - runAction(async () => { - const before = opts.before !== undefined ? Number.parseInt(opts.before, 10) : null; - const markdown = await cmdStepRead(storageRoot, stepHash as CasRef, before); - process.stdout.write(markdown.endsWith("\n") ? markdown : `${markdown}\n`); - }); - }); +// step read is not yet registered (half-baked, see step.ts cmdStepRead) step .command("fork") diff --git a/packages/cli-workflow/src/commands/shared.ts b/packages/cli-workflow/src/commands/shared.ts new file mode 100644 index 0000000..6579de6 --- /dev/null +++ b/packages/cli-workflow/src/commands/shared.ts @@ -0,0 +1,227 @@ +import type { Store as CasStore, JSONSchema } from "@uncaged/json-cas"; +import { getSchema } from "@uncaged/json-cas"; +import type { + CasRef, + StartNodePayload, + StepNodePayload, + ThreadId, +} from "@uncaged/workflow-protocol"; +import { loadThreadsIndex, type UwfStore } from "../store.js"; + +type ChainState = { + startHash: CasRef; + start: StartNodePayload; + stepsNewestFirst: StepNodePayload[]; + headIsStart: boolean; +}; + +type OrderedStepItem = { + hash: CasRef; + payload: StepNodePayload; + timestamp: number; +}; + +function fail(message: string): never { + process.stderr.write(`${message}\n`); + process.exit(1); +} + +function walkChain(uwf: UwfStore, headHash: CasRef): ChainState { + const headNode = uwf.store.get(headHash); + if (headNode === null) { + fail(`CAS node not found: ${headHash}`); + } + + if (headNode.type === uwf.schemas.startNode) { + return { + startHash: headHash, + start: headNode.payload as StartNodePayload, + stepsNewestFirst: [], + headIsStart: true, + }; + } + + if (headNode.type !== uwf.schemas.stepNode) { + fail(`head ${headHash} is not a StartNode or StepNode`); + } + + const stepsNewestFirst: StepNodePayload[] = []; + let hash: CasRef | null = headHash; + + while (hash !== null) { + const node = uwf.store.get(hash); + if (node === null) { + fail(`CAS node not found while walking chain: ${hash}`); + } + if (node.type !== uwf.schemas.stepNode) { + break; + } + const payload = node.payload as StepNodePayload; + stepsNewestFirst.push(payload); + hash = payload.prev; + } + + const newest = stepsNewestFirst[0]; + if (newest === undefined) { + fail(`empty step chain at head ${headHash}`); + } + + const startNode = uwf.store.get(newest.start); + if (startNode === null || startNode.type !== uwf.schemas.startNode) { + fail(`StartNode not found: ${newest.start}`); + } + + return { + startHash: newest.start, + start: startNode.payload as StartNodePayload, + stepsNewestFirst, + headIsStart: false, + }; +} + +function expandOutput(uwf: UwfStore, outputRef: CasRef): unknown { + const node = uwf.store.get(outputRef); + if (node === null) { + return {}; + } + return node.payload; +} + +/** + * Recursively expand all cas_ref fields in a CAS node's payload, + * replacing hash strings with the referenced node's expanded payload. + */ +function expandDeep(store: CasStore, hash: CasRef, visited?: Set): unknown { + const seen = visited ?? new Set(); + if (seen.has(hash)) return hash; // cycle guard + seen.add(hash); + + const node = store.get(hash); + if (node === null) return hash; + + const schema = getSchema(store, node.type); + if (schema === null) return node.payload; + + return expandValue(store, schema, node.payload, seen); +} + +function expandCasRefField(store: CasStore, value: unknown, visited: Set): unknown { + if (typeof value === "string") { + return expandDeep(store, value as CasRef, visited); + } + return value; +} + +function expandAnyOfField( + store: CasStore, + schema: JSONSchema, + value: unknown, + visited: Set, +): unknown { + if (!Array.isArray(schema.anyOf)) return value; + for (const sub of schema.anyOf as JSONSchema[]) { + if (sub.format === "cas_ref" && typeof value === "string") { + return expandDeep(store, value as CasRef, visited); + } + } + return value; +} + +function expandArrayField( + store: CasStore, + schema: JSONSchema, + value: unknown, + visited: Set, +): unknown { + if (!schema.items || !Array.isArray(value)) return value; + const itemSchema = schema.items as JSONSchema; + return (value as unknown[]).map((item) => expandValue(store, itemSchema, item, visited)); +} + +function expandObjectField( + store: CasStore, + schema: JSONSchema, + value: unknown, + visited: Set, +): unknown { + if (value === null || typeof value !== "object" || Array.isArray(value) || !schema.properties) { + return value; + } + const props = schema.properties as Record; + const obj = value as Record; + const result: Record = {}; + for (const [key, val] of Object.entries(obj)) { + const propSchema = props[key]; + result[key] = propSchema ? expandValue(store, propSchema, val, visited) : val; + } + return result; +} + +function expandValue( + store: CasStore, + schema: JSONSchema, + value: unknown, + visited: Set, +): unknown { + if (schema.format === "cas_ref") return expandCasRefField(store, value, visited); + if (Array.isArray(schema.anyOf)) return expandAnyOfField(store, schema, value, visited); + if (schema.type === "array") return expandArrayField(store, schema, value, visited); + return expandObjectField(store, schema, value, visited); +} + +function collectOrderedSteps( + uwf: UwfStore, + headHash: CasRef, + chain: ChainState, +): OrderedStepItem[] { + let hash: CasRef | null = headHash; + const hashToNode = new Map(); + while (hash !== null) { + const node = uwf.store.get(hash); + if (node === null || node.type !== uwf.schemas.stepNode) { + break; + } + const payload = node.payload as StepNodePayload; + hashToNode.set(hash, { payload, timestamp: node.timestamp }); + hash = payload.prev; + } + + let cur: CasRef | null = chain.headIsStart ? null : headHash; + const ordered: OrderedStepItem[] = []; + while (cur !== null) { + const entry = hashToNode.get(cur); + if (entry === undefined) { + break; + } + ordered.push({ hash: cur, ...entry }); + cur = entry.payload.prev; + } + + ordered.reverse(); + return ordered; +} + +async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise { + const index = await loadThreadsIndex(storageRoot); + const head = index[threadId]; + if (head === undefined) { + fail(`thread not active: ${threadId}`); + } + return head; +} + +export { + type ChainState, + collectOrderedSteps, + expandAnyOfField, + expandArrayField, + expandCasRefField, + expandDeep, + expandObjectField, + expandOutput, + expandValue, + fail, + type OrderedStepItem, + resolveHeadHash, + walkChain, +}; diff --git a/packages/cli-workflow/src/commands/step.ts b/packages/cli-workflow/src/commands/step.ts index 45ee014..33b5d47 100644 --- a/packages/cli-workflow/src/commands/step.ts +++ b/packages/cli-workflow/src/commands/step.ts @@ -1,9 +1,6 @@ -import type { Store as CasStore, JSONSchema } from "@uncaged/json-cas"; -import { getSchema } from "@uncaged/json-cas"; import type { CasRef, StartEntry, - StartNodePayload, StepEntry, StepNodePayload, ThreadForkOutput, @@ -11,213 +8,15 @@ import type { ThreadStepsOutput, } from "@uncaged/workflow-protocol"; import { generateUlid } from "@uncaged/workflow-util"; -import { createUwfStore, loadThreadsIndex, saveThreadsIndex, type UwfStore } from "../store.js"; - -function fail(message: string): never { - process.stderr.write(`${message}\n`); - process.exit(1); -} - -type ChainState = { - startHash: CasRef; - start: StartNodePayload; - stepsNewestFirst: StepNodePayload[]; - headIsStart: boolean; -}; - -type OrderedStepItem = { - hash: CasRef; - payload: StepNodePayload; - timestamp: number; -}; - -function walkChain(uwf: UwfStore, headHash: CasRef): ChainState { - const headNode = uwf.store.get(headHash); - if (headNode === null) { - fail(`CAS node not found: ${headHash}`); - } - - if (headNode.type === uwf.schemas.startNode) { - return { - startHash: headHash, - start: headNode.payload as StartNodePayload, - stepsNewestFirst: [], - headIsStart: true, - }; - } - - if (headNode.type !== uwf.schemas.stepNode) { - fail(`head ${headHash} is not a StartNode or StepNode`); - } - - const stepsNewestFirst: StepNodePayload[] = []; - let hash: CasRef | null = headHash; - - while (hash !== null) { - const node = uwf.store.get(hash); - if (node === null) { - fail(`CAS node not found while walking chain: ${hash}`); - } - if (node.type !== uwf.schemas.stepNode) { - break; - } - const payload = node.payload as StepNodePayload; - stepsNewestFirst.push(payload); - hash = payload.prev; - } - - const newest = stepsNewestFirst[0]; - if (newest === undefined) { - fail(`empty step chain at head ${headHash}`); - } - - const startNode = uwf.store.get(newest.start); - if (startNode === null || startNode.type !== uwf.schemas.startNode) { - fail(`StartNode not found: ${newest.start}`); - } - - return { - startHash: newest.start, - start: startNode.payload as StartNodePayload, - stepsNewestFirst, - headIsStart: false, - }; -} - -function expandOutput(uwf: UwfStore, outputRef: CasRef): unknown { - const node = uwf.store.get(outputRef); - if (node === null) { - return {}; - } - return node.payload; -} - -/** - * Recursively expand all cas_ref fields in a CAS node's payload, - * replacing hash strings with the referenced node's expanded payload. - */ -function expandDeep(store: CasStore, hash: CasRef, visited?: Set): unknown { - const seen = visited ?? new Set(); - if (seen.has(hash)) return hash; // cycle guard - seen.add(hash); - - const node = store.get(hash); - if (node === null) return hash; - - const schema = getSchema(store, node.type); - if (schema === null) return node.payload; - - return expandValue(store, schema, node.payload, seen); -} - -function expandCasRefField(store: CasStore, value: unknown, visited: Set): unknown { - if (typeof value === "string") { - return expandDeep(store, value as CasRef, visited); - } - return value; -} - -function expandAnyOfField( - store: CasStore, - schema: JSONSchema, - value: unknown, - visited: Set, -): unknown { - if (!Array.isArray(schema.anyOf)) return value; - for (const sub of schema.anyOf as JSONSchema[]) { - if (sub.format === "cas_ref" && typeof value === "string") { - return expandDeep(store, value as CasRef, visited); - } - } - return value; -} - -function expandArrayField( - store: CasStore, - schema: JSONSchema, - value: unknown, - visited: Set, -): unknown { - if (!Array.isArray(value)) return value; - const itemSchema = schema.items as JSONSchema | undefined; - if (itemSchema === undefined) return value; - return value.map((item) => expandValue(store, itemSchema, item, visited)); -} - -function expandObjectField( - store: CasStore, - schema: JSONSchema, - value: unknown, - visited: Set, -): unknown { - if (typeof value !== "object" || value === null || Array.isArray(value)) return value; - const props = schema.properties as Record | undefined; - if (props === undefined) return value; - const result: Record = {}; - for (const [key, val] of Object.entries(value)) { - const propSchema = props[key]; - result[key] = propSchema !== undefined ? expandValue(store, propSchema, val, visited) : val; - } - return result; -} - -function expandValue( - store: CasStore, - schema: JSONSchema, - value: unknown, - visited: Set, -): unknown { - if (schema.format === "cas_ref") { - return expandCasRefField(store, value, visited); - } - if (schema.anyOf !== undefined) { - return expandAnyOfField(store, schema, value, visited); - } - if (schema.type === "array") { - return expandArrayField(store, schema, value, visited); - } - if (schema.type === "object") { - return expandObjectField(store, schema, value, visited); - } - return value; -} - -function collectOrderedSteps( - uwf: UwfStore, - headHash: CasRef, - chain: ChainState, -): OrderedStepItem[] { - const reversed = chain.stepsNewestFirst.slice().reverse(); - const ordered: OrderedStepItem[] = []; - - let hash: CasRef | null = chain.headIsStart ? null : headHash; - for (const payload of reversed) { - if (hash === null) { - fail("unexpected null hash while collecting ordered steps"); - } - const node = uwf.store.get(hash); - if (node === null) { - fail(`CAS node not found: ${hash}`); - } - ordered.push({ - hash, - payload, - timestamp: node.timestamp, - }); - hash = payload.prev; - } - - return ordered; -} - -async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise { - const index = await loadThreadsIndex(storageRoot); - const head = index[threadId]; - if (head === undefined) { - fail(`thread not active: ${threadId}`); - } - return head; -} +import { createUwfStore, loadThreadsIndex, saveThreadsIndex } from "../store.js"; +import { + collectOrderedSteps, + expandDeep, + expandOutput, + fail, + resolveHeadHash, + walkChain, +} from "./shared.js"; /** * List all steps in a thread (previously: thread steps) @@ -250,7 +49,7 @@ export async function cmdStepList( hash: item.hash, role: item.payload.role, output: expandOutput(uwf, item.payload.output), - detail: item.payload.detail ? expandDeep(uwf.store, item.payload.detail) : null, + detail: item.payload.detail ?? null, agent: item.payload.agent, timestamp: item.timestamp, }); diff --git a/packages/cli-workflow/src/commands/thread.ts b/packages/cli-workflow/src/commands/thread.ts index 56ceac1..556801f 100644 --- a/packages/cli-workflow/src/commands/thread.ts +++ b/packages/cli-workflow/src/commands/thread.ts @@ -1,8 +1,7 @@ import { execFileSync, spawn } from "node:child_process"; import { access, readFile } from "node:fs/promises"; import { dirname, isAbsolute, resolve as resolvePath } from "node:path"; -import type { Store as CasStore, JSONSchema } from "@uncaged/json-cas"; -import { getSchema, validate } from "@uncaged/json-cas"; +import { validate } from "@uncaged/json-cas"; import { getEnvPath, loadWorkflowConfig } from "@uncaged/workflow-agent-kit"; import { evaluate } from "@uncaged/workflow-moderator"; import type { @@ -43,6 +42,14 @@ import { type UwfStore, } from "../store.js"; import { checkWorkflowFilenameConsistency, isCasRef, parseWorkflowPayload } from "../validate.js"; +import { + type ChainState, + collectOrderedSteps, + expandOutput, + fail, + type OrderedStepItem, + walkChain, +} from "./shared.js"; import { materializeWorkflowPayload } from "./workflow.js"; const END_ROLE = "$END"; @@ -61,29 +68,6 @@ function failStep(plog: ProcessLogger, message: string): never { fail(message); } -type ChainState = { - startHash: CasRef; - start: StartNodePayload; - stepsNewestFirst: StepNodePayload[]; - headIsStart: boolean; -}; - -type OrderedStepItem = { - hash: CasRef; - payload: StepNodePayload; - timestamp: number; -}; - -export type KillOutput = { - thread: ThreadId; - archived: boolean; -}; - -function fail(message: string): never { - process.stderr.write(`${message}\n`); - process.exit(1); -} - /** * Check if a string looks like a file path (contains path separators or has .yaml/.yml extension). */ @@ -406,180 +390,6 @@ export async function cmdThreadList( return items; } -function walkChain(uwf: UwfStore, headHash: CasRef): ChainState { - const headNode = uwf.store.get(headHash); - if (headNode === null) { - fail(`CAS node not found: ${headHash}`); - } - - if (headNode.type === uwf.schemas.startNode) { - return { - startHash: headHash, - start: headNode.payload as StartNodePayload, - stepsNewestFirst: [], - headIsStart: true, - }; - } - - if (headNode.type !== uwf.schemas.stepNode) { - fail(`head ${headHash} is not a StartNode or StepNode`); - } - - const stepsNewestFirst: StepNodePayload[] = []; - let hash: CasRef | null = headHash; - - while (hash !== null) { - const node = uwf.store.get(hash); - if (node === null) { - fail(`CAS node not found while walking chain: ${hash}`); - } - if (node.type !== uwf.schemas.stepNode) { - break; - } - const payload = node.payload as StepNodePayload; - stepsNewestFirst.push(payload); - hash = payload.prev; - } - - const newest = stepsNewestFirst[0]; - if (newest === undefined) { - fail(`empty step chain at head ${headHash}`); - } - - const startNode = uwf.store.get(newest.start); - if (startNode === null || startNode.type !== uwf.schemas.startNode) { - fail(`StartNode not found: ${newest.start}`); - } - - return { - startHash: newest.start, - start: startNode.payload as StartNodePayload, - stepsNewestFirst, - headIsStart: false, - }; -} - -function expandOutput(uwf: UwfStore, outputRef: CasRef): unknown { - const node = uwf.store.get(outputRef); - if (node === null) { - return {}; - } - return node.payload; -} - -/** - * Recursively expand all cas_ref fields in a CAS node's payload, - * replacing hash strings with the referenced node's expanded payload. - */ -function expandDeep(store: CasStore, hash: CasRef, visited?: Set): unknown { - const seen = visited ?? new Set(); - if (seen.has(hash)) return hash; // cycle guard - seen.add(hash); - - const node = store.get(hash); - if (node === null) return hash; - - const schema = getSchema(store, node.type); - if (schema === null) return node.payload; - - return expandValue(store, schema, node.payload, seen); -} - -function expandCasRefField(store: CasStore, value: unknown, visited: Set): unknown { - if (typeof value === "string") { - return expandDeep(store, value as CasRef, visited); - } - return value; -} - -function expandAnyOfField( - store: CasStore, - schema: JSONSchema, - value: unknown, - visited: Set, -): unknown { - if (!Array.isArray(schema.anyOf)) return value; - for (const sub of schema.anyOf as JSONSchema[]) { - if (sub.format === "cas_ref" && typeof value === "string") { - return expandDeep(store, value as CasRef, visited); - } - } - return value; -} - -function expandArrayField( - store: CasStore, - schema: JSONSchema, - value: unknown, - visited: Set, -): unknown { - if (!schema.items || !Array.isArray(value)) return value; - const itemSchema = schema.items as JSONSchema; - return (value as unknown[]).map((item) => expandValue(store, itemSchema, item, visited)); -} - -function expandObjectField( - store: CasStore, - schema: JSONSchema, - value: unknown, - visited: Set, -): unknown { - if (value === null || typeof value !== "object" || Array.isArray(value) || !schema.properties) { - return value; - } - const props = schema.properties as Record; - const obj = value as Record; - const result: Record = {}; - for (const [key, val] of Object.entries(obj)) { - const propSchema = props[key]; - result[key] = propSchema ? expandValue(store, propSchema, val, visited) : val; - } - return result; -} - -function expandValue( - store: CasStore, - schema: JSONSchema, - value: unknown, - visited: Set, -): unknown { - if (schema.format === "cas_ref") return expandCasRefField(store, value, visited); - if (Array.isArray(schema.anyOf)) return expandAnyOfField(store, schema, value, visited); - if (schema.type === "array") return expandArrayField(store, schema, value, visited); - return expandObjectField(store, schema, value, visited); -} - -function collectOrderedSteps( - uwf: UwfStore, - headHash: CasRef, - chain: ChainState, -): OrderedStepItem[] { - let hash: CasRef | null = headHash; - const hashToNode = new Map(); - while (hash !== null) { - const node = uwf.store.get(hash); - if (node === null || node.type !== uwf.schemas.stepNode) { - break; - } - const payload = node.payload as StepNodePayload; - hashToNode.set(hash, { payload, timestamp: node.timestamp }); - hash = payload.prev; - } - - let cur: CasRef | null = chain.headIsStart ? null : headHash; - const ordered: OrderedStepItem[] = []; - while (cur !== null) { - const entry = hashToNode.get(cur); - if (entry === undefined) { - break; - } - ordered.push({ hash: cur, ...entry }); - cur = entry.payload.prev; - } - ordered.reverse(); - return ordered; -} - function formatYaml(value: unknown): string { return stringify(value, { aliasDuplicateObjects: false }).trimEnd(); } @@ -1207,44 +1017,6 @@ export async function cmdThreadCancel( return { thread: threadId, cancelled: true }; } -export async function cmdThreadKill(storageRoot: string, threadId: ThreadId): Promise { - const index = await loadThreadsIndex(storageRoot); - const head = index[threadId]; - if (head === undefined) { - fail(`thread not active: ${threadId}`); - } - - // Check if thread is running in background and terminate it - const runningMarker = await isThreadRunning(storageRoot, threadId); - if (runningMarker !== null) { - try { - process.kill(runningMarker.pid, "SIGTERM"); - } catch { - // Process may have already exited, ignore error - } - await deleteMarker(storageRoot, threadId); - } - - const uwf = await createUwfStore(storageRoot); - const workflow = resolveWorkflowFromHead(uwf, head); - if (workflow === null) { - fail(`failed to resolve workflow from head: ${head}`); - } - - delete index[threadId]; - await saveThreadsIndex(storageRoot, index); - - const historyEntry: ThreadHistoryLine = { - thread: threadId, - workflow, - head, - completedAt: Date.now(), - }; - await appendThreadHistory(storageRoot, historyEntry); - - return { thread: threadId, archived: true }; -} - export async function cmdThreadRunning(storageRoot: string): Promise { const threads = await listRunningThreads(storageRoot); return { threads }; diff --git a/packages/workflow-agent-claude-code/package.json b/packages/workflow-agent-claude-code/package.json index 4a54820..5d8d0c2 100644 --- a/packages/workflow-agent-claude-code/package.json +++ b/packages/workflow-agent-claude-code/package.json @@ -22,8 +22,7 @@ }, "dependencies": { "@uncaged/json-cas": "^0.4.0", - "@uncaged/workflow-agent-kit": "workspace:^", - "@uncaged/workflow-util": "workspace:^" + "@uncaged/workflow-agent-kit": "workspace:^" }, "devDependencies": { "typescript": "^5.8.3" diff --git a/packages/workflow-protocol/src/types.ts b/packages/workflow-protocol/src/types.ts index 38a03d7..ddd7f32 100644 --- a/packages/workflow-protocol/src/types.ts +++ b/packages/workflow-protocol/src/types.ts @@ -92,7 +92,7 @@ export type StepEntry = { hash: CasRef; role: string; output: unknown; - detail: unknown; + detail: CasRef; agent: string; timestamp: number; };