From 0d5678c96170e65e80d8e93ac5fa8d9a819c7ed7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Mon, 18 May 2026 09:09:10 +0000 Subject: [PATCH] feat: add thread start/show/list/kill commands - thread start: ULID generation, StartNode to CAS, threads.yaml - thread show: active (done:false) or archived (done:true) - thread list: active threads, --all includes history - thread kill: archive to history.jsonl Refs #309, #313 --- packages/cli-uwf/package.json | 1 + packages/cli-uwf/src/cli.ts | 39 +++-- packages/cli-uwf/src/commands/thread.ts | 209 +++++++++++++++++++++++- packages/cli-uwf/src/store.ts | 114 ++++++++++++- 4 files changed, 347 insertions(+), 16 deletions(-) diff --git a/packages/cli-uwf/package.json b/packages/cli-uwf/package.json index 98ca5c7..a056665 100644 --- a/packages/cli-uwf/package.json +++ b/packages/cli-uwf/package.json @@ -14,6 +14,7 @@ "@uncaged/json-cas": "workspace:^", "@uncaged/json-cas-fs": "workspace:^", "@uncaged/uwf-protocol": "workspace:^", + "@uncaged/workflow-util": "workspace:^", "commander": "^14.0.3", "yaml": "^2.8.4" }, diff --git a/packages/cli-uwf/src/cli.ts b/packages/cli-uwf/src/cli.ts index 730b2df..c080175 100644 --- a/packages/cli-uwf/src/cli.ts +++ b/packages/cli-uwf/src/cli.ts @@ -2,7 +2,7 @@ import { Command } from "commander"; -import { cmdThreadPlaceholder } from "./commands/thread.js"; +import { cmdThreadKill, cmdThreadList, cmdThreadShow, cmdThreadStart } from "./commands/thread.js"; import { cmdWorkflowList, cmdWorkflowPut, cmdWorkflowShow } from "./commands/workflow.js"; import { resolveStorageRoot } from "./store.js"; @@ -59,15 +59,19 @@ workflow }); }); -const thread = program.command("thread").description("Thread execution (Phase 4)"); +const thread = program.command("thread").description("Thread lifecycle and execution"); thread .command("start") .description("Create a thread without executing") .argument("", "Workflow name or hash") .requiredOption("-p, --prompt ", "User prompt") - .action(() => { - cmdThreadPlaceholder("start"); + .action((workflow: string, opts: { prompt: string }) => { + const storageRoot = resolveStorageRoot(); + runAction(async () => { + const result = await cmdThreadStart(storageRoot, workflow, opts.prompt); + writeJson(result); + }); }); thread @@ -76,31 +80,44 @@ thread .argument("", "Thread ULID") .option("--agent ", "Override agent command") .action(() => { - cmdThreadPlaceholder("step"); + process.stderr.write("uwf thread step: not implemented\n"); + process.exit(1); }); thread .command("show") .description("Show thread head pointer") .argument("", "Thread ULID") - .action(() => { - cmdThreadPlaceholder("show"); + .action((threadId: string) => { + const storageRoot = resolveStorageRoot(); + runAction(async () => { + const result = await cmdThreadShow(storageRoot, threadId); + writeJson(result); + }); }); thread .command("list") .description("List active threads") .option("--all", "Include archived threads") - .action(() => { - cmdThreadPlaceholder("list"); + .action((opts: { all: boolean }) => { + const storageRoot = resolveStorageRoot(); + runAction(async () => { + const result = await cmdThreadList(storageRoot, opts.all); + writeJson(result); + }); }); thread .command("kill") .description("Terminate and archive a thread") .argument("", "Thread ULID") - .action(() => { - cmdThreadPlaceholder("kill"); + .action((threadId: string) => { + const storageRoot = resolveStorageRoot(); + runAction(async () => { + const result = await cmdThreadKill(storageRoot, threadId); + writeJson(result); + }); }); program.parseAsync(process.argv).catch((e: unknown) => { diff --git a/packages/cli-uwf/src/commands/thread.ts b/packages/cli-uwf/src/commands/thread.ts index 0265ebd..ade3ae8 100644 --- a/packages/cli-uwf/src/commands/thread.ts +++ b/packages/cli-uwf/src/commands/thread.ts @@ -1,9 +1,212 @@ +import { validate } from "@uncaged/json-cas"; +import type { + CasRef, + StartNodePayload, + StartOutput, + StepNodePayload, + StepOutput, + ThreadId, + ThreadListItem, +} from "@uncaged/uwf-protocol"; +import { generateUlid } from "@uncaged/workflow-util"; + +import { + appendThreadHistory, + createUwfStore, + findThreadInHistory, + loadThreadHistory, + loadThreadsIndex, + loadWorkflowRegistry, + resolveWorkflowHash, + saveThreadsIndex, + type ThreadHistoryLine, + type UwfStore, +} from "../store.js"; +import { isCasRef } from "../validate.js"; + +export type KillOutput = { + thread: ThreadId; + archived: boolean; +}; + function fail(message: string): never { process.stderr.write(`${message}\n`); process.exit(1); } -/** Phase 4 placeholder — thread commands are not implemented yet. */ -export function cmdThreadPlaceholder(command: string): never { - fail(`uwf thread ${command}: not implemented (Phase 4)`); +async function resolveWorkflowCasRef( + uwf: UwfStore, + storageRoot: string, + workflowId: string, +): Promise { + const registry = await loadWorkflowRegistry(storageRoot); + const hash = resolveWorkflowHash(registry, workflowId); + if (hash === null) { + fail(`workflow not found: ${workflowId}`); + } + if (!isCasRef(hash)) { + fail(`workflow not found: ${workflowId}`); + } + const node = uwf.store.get(hash); + if (node === null) { + fail(`CAS node not found: ${hash}`); + } + if (node.type !== uwf.schemas.workflow) { + fail(`node ${hash} is not a Workflow (type ${node.type})`); + } + return hash; +} + +function resolveWorkflowFromHead(uwf: UwfStore, head: CasRef): CasRef | null { + const node = uwf.store.get(head); + if (node === null) { + return null; + } + + if (node.type === uwf.schemas.startNode) { + const payload = node.payload as StartNodePayload; + return payload.workflow; + } + + const payload = node.payload as StepNodePayload; + if (typeof payload.start !== "string") { + return null; + } + + const startNode = uwf.store.get(payload.start); + if (startNode === null || startNode.type !== uwf.schemas.startNode) { + return null; + } + + return (startNode.payload as StartNodePayload).workflow; +} + +export async function cmdThreadStart( + storageRoot: string, + workflowId: string, + prompt: string, +): Promise { + const uwf = await createUwfStore(storageRoot); + const workflowHash = await resolveWorkflowCasRef(uwf, storageRoot, workflowId); + + const threadId = generateUlid(Date.now()) as ThreadId; + const startPayload: StartNodePayload = { + workflow: workflowHash, + prompt, + }; + + const headHash = await uwf.store.put(uwf.schemas.startNode, startPayload); + const node = uwf.store.get(headHash); + if (node === null || !validate(uwf.store, node)) { + fail("stored StartNode failed schema validation"); + } + + const index = await loadThreadsIndex(storageRoot); + index[threadId] = headHash; + await saveThreadsIndex(storageRoot, index); + + return { workflow: workflowHash, thread: threadId }; +} + +export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Promise { + const index = await loadThreadsIndex(storageRoot); + const activeHead = index[threadId]; + if (activeHead !== undefined) { + const uwf = await createUwfStore(storageRoot); + const workflow = resolveWorkflowFromHead(uwf, activeHead); + if (workflow === null) { + fail(`failed to resolve workflow from head: ${activeHead}`); + } + return { + workflow, + thread: threadId, + head: activeHead, + done: false, + }; + } + + const hist = await findThreadInHistory(storageRoot, threadId); + if (hist !== null) { + return { + workflow: hist.workflow, + thread: threadId, + head: hist.head, + done: true, + }; + } + + fail(`thread not found: ${threadId}`); +} + +async function threadListItemFromActive( + uwf: UwfStore, + threadId: ThreadId, + head: CasRef, +): Promise { + const workflow = resolveWorkflowFromHead(uwf, head); + if (workflow === null) { + return null; + } + return { thread: threadId, workflow, head }; +} + +export async function cmdThreadList( + storageRoot: string, + includeAll: boolean, +): Promise { + const uwf = await createUwfStore(storageRoot); + const index = await loadThreadsIndex(storageRoot); + const items: ThreadListItem[] = []; + + for (const [threadId, head] of Object.entries(index)) { + const item = await threadListItemFromActive(uwf, threadId as ThreadId, head); + if (item !== null) { + items.push(item); + } + } + + if (!includeAll) { + return items; + } + + const activeIds = new Set(items.map((i) => i.thread)); + const history = await loadThreadHistory(storageRoot); + for (const entry of history) { + if (!activeIds.has(entry.thread)) { + items.push({ + thread: entry.thread, + workflow: entry.workflow, + head: entry.head, + }); + } + } + + return items; +} + +export async function cmdThreadKill(storageRoot: string, threadId: ThreadId): Promise { + const index = await loadThreadsIndex(storageRoot); + const head = index[threadId]; + if (head === undefined) { + fail(`thread not active: ${threadId}`); + } + + const uwf = await createUwfStore(storageRoot); + const workflow = resolveWorkflowFromHead(uwf, head); + if (workflow === null) { + fail(`failed to resolve workflow from head: ${head}`); + } + + delete index[threadId]; + await saveThreadsIndex(storageRoot, index); + + const historyEntry: ThreadHistoryLine = { + thread: threadId, + workflow, + head, + completedAt: Date.now(), + }; + await appendThreadHistory(storageRoot, historyEntry); + + return { thread: threadId, archived: true }; } diff --git a/packages/cli-uwf/src/store.ts b/packages/cli-uwf/src/store.ts index f15870b..207234c 100644 --- a/packages/cli-uwf/src/store.ts +++ b/packages/cli-uwf/src/store.ts @@ -1,10 +1,10 @@ -import { mkdir, readFile, writeFile } from "node:fs/promises"; +import { appendFile, mkdir, readFile, writeFile } from "node:fs/promises"; import { homedir } from "node:os"; import { join } from "node:path"; import type { Hash, Store } from "@uncaged/json-cas"; import { createFsStore } from "@uncaged/json-cas-fs"; -import type { CasRef } from "@uncaged/uwf-protocol"; +import type { CasRef, ThreadId, ThreadListItem, ThreadsIndex } from "@uncaged/uwf-protocol"; import { parse, stringify } from "yaml"; import { registerUwfSchemas, type UwfSchemaHashes } from "./schemas.js"; @@ -40,6 +40,18 @@ export function getRegistryPath(storageRoot: string): string { return join(storageRoot, "workflows.yaml"); } +export function getThreadsPath(storageRoot: string): string { + return join(storageRoot, "threads.yaml"); +} + +export function getHistoryPath(storageRoot: string): string { + return join(storageRoot, "history.jsonl"); +} + +export type ThreadHistoryLine = ThreadListItem & { + completedAt: number; +}; + export type UwfStore = { storageRoot: string; store: Store; @@ -103,3 +115,101 @@ export function findRegistryName(registry: WorkflowRegistry, hash: Hash): string } return null; } + +export async function loadThreadsIndex(storageRoot: string): Promise { + const path = getThreadsPath(storageRoot); + try { + const text = await readFile(path, "utf8"); + const raw = parse(text) as unknown; + if (raw === null || typeof raw !== "object" || Array.isArray(raw)) { + return {}; + } + const index: ThreadsIndex = {}; + for (const [threadId, head] of Object.entries(raw as Record)) { + if (typeof head === "string") { + index[threadId as ThreadId] = head; + } + } + return index; + } catch (e) { + const err = e as NodeJS.ErrnoException; + if (err.code === "ENOENT") { + return {}; + } + throw e; + } +} + +export async function saveThreadsIndex(storageRoot: string, index: ThreadsIndex): Promise { + const path = getThreadsPath(storageRoot); + await mkdir(storageRoot, { recursive: true }); + const text = stringify(index, { indent: 2 }); + await writeFile(path, text, "utf8"); +} + +export async function loadThreadHistory(storageRoot: string): Promise { + const path = getHistoryPath(storageRoot); + try { + const text = await readFile(path, "utf8"); + const lines: ThreadHistoryLine[] = []; + for (const line of text.split("\n")) { + const trimmed = line.trim(); + if (trimmed === "") { + continue; + } + let raw: unknown; + try { + raw = JSON.parse(trimmed) as unknown; + } catch { + continue; + } + if (raw === null || typeof raw !== "object" || Array.isArray(raw)) { + continue; + } + const rec = raw as Record; + const thread = rec.thread; + const workflow = rec.workflow; + const head = rec.head; + const completedAt = rec.completedAt; + if ( + typeof thread === "string" && + typeof workflow === "string" && + typeof head === "string" && + typeof completedAt === "number" + ) { + lines.push({ thread: thread as ThreadId, workflow, head, completedAt }); + } + } + return lines; + } catch (e) { + const err = e as NodeJS.ErrnoException; + if (err.code === "ENOENT") { + return []; + } + throw e; + } +} + +export async function findThreadInHistory( + storageRoot: string, + threadId: ThreadId, +): Promise { + const history = await loadThreadHistory(storageRoot); + for (let i = history.length - 1; i >= 0; i--) { + const entry = history[i]; + if (entry !== undefined && entry.thread === threadId) { + return entry; + } + } + return null; +} + +export async function appendThreadHistory( + storageRoot: string, + entry: ThreadHistoryLine, +): Promise { + const path = getHistoryPath(storageRoot); + await mkdir(storageRoot, { recursive: true }); + const line = `${JSON.stringify(entry)}\n`; + await appendFile(path, line, "utf8"); +}