diff --git a/packages/cli-workflow/src/__tests__/config.test.ts b/packages/cli-workflow/src/__tests__/config.test.ts index c8cb0c9..6303aeb 100644 --- a/packages/cli-workflow/src/__tests__/config.test.ts +++ b/packages/cli-workflow/src/__tests__/config.test.ts @@ -720,7 +720,10 @@ defaultModel: default describe("no legacy apiKeyEnv references", () => { test("config.ts has no references to apiKeyEnv", () => { - const configSource = readFileSync(join(__dirname, "..", "..", "src", "commands", "config.ts"), "utf8"); + const configSource = readFileSync( + join(__dirname, "..", "..", "src", "commands", "config.ts"), + "utf8", + ); expect(configSource).not.toContain("apiKeyEnv"); }); diff --git a/packages/cli-workflow/src/__tests__/current-role.test.ts b/packages/cli-workflow/src/__tests__/current-role.test.ts index 25f206c..23e941f 100644 --- a/packages/cli-workflow/src/__tests__/current-role.test.ts +++ b/packages/cli-workflow/src/__tests__/current-role.test.ts @@ -175,8 +175,9 @@ async function insertStepNode( ): Promise { const uwf = await createUwfStore(storageRoot); const index = await loadThreadsIndex(storageRoot); - const head = index[threadId]; - if (head === undefined) throw new Error(`thread ${threadId} not in index`); + const headEntry = index[threadId]; + if (headEntry === undefined) throw new Error(`thread ${threadId} not in index`); + const head = headEntry.head; const outputSchemaHash = await putSchema(uwf.store, OUTPUT_SCHEMA); const outputHash = await uwf.store.put(outputSchemaHash, outputPayload); @@ -199,7 +200,7 @@ async function insertStepNode( detail: detailHash, })) as CasRef; - index[threadId] = stepHash; + index[threadId] = { head: stepHash, suspendedRole: null, suspendMessage: null }; await saveThreadsIndex(storageRoot, index); } @@ -280,7 +281,7 @@ describe("currentRole field", () => { const tid = thread as ThreadId; const index = await loadThreadsIndex(storageRoot); - const head = index[tid]!; + const head = index[tid]!.head; delete index[tid]; await saveThreadsIndex(storageRoot, index); await appendThreadHistory(storageRoot, { @@ -309,7 +310,7 @@ describe("currentRole field", () => { const tid = thread as ThreadId; const index = await loadThreadsIndex(storageRoot); - const head = index[tid]!; + const head = index[tid]!.head; delete index[tid]; await saveThreadsIndex(storageRoot, index); await appendThreadHistory(storageRoot, { @@ -371,7 +372,7 @@ describe("currentRole field", () => { const comp = await cmdThreadStart(storageRoot, wf, "completed", tmpDir); const compId = comp.thread as ThreadId; const index = await loadThreadsIndex(storageRoot); - const compHead = index[compId]!; + const compHead = index[compId]!.head; delete index[compId]; await saveThreadsIndex(storageRoot, index); await appendThreadHistory(storageRoot, { diff --git a/packages/cli-workflow/src/__tests__/thread-list-filters.test.ts b/packages/cli-workflow/src/__tests__/thread-list-filters.test.ts index e69328d..0a29d83 100644 --- a/packages/cli-workflow/src/__tests__/thread-list-filters.test.ts +++ b/packages/cli-workflow/src/__tests__/thread-list-filters.test.ts @@ -47,9 +47,7 @@ async function createTestThread( prompt: "test prompt", }; const headHash = await uwf.store.put(uwf.schemas.startNode, startPayload); - const index = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot)); - index[threadId] = headHash; - await saveThreadsIndex(storageRoot, index); + await saveThreadsIndex(storageRoot, { [threadId]: headHash }); return threadId; } @@ -106,7 +104,7 @@ describe("cmdThreadList status filter", () => { await markThreadRunning(tmpDir, thread2, workflowHash); const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); - const thread3Head = index[thread3]; + const thread3Head = index[thread3]!.head; if (thread3Head === undefined) throw new Error("thread3 head not found"); await completeThread(tmpDir, thread3, workflowHash, thread3Head); @@ -130,7 +128,7 @@ describe("cmdThreadList status filter", () => { await markThreadRunning(tmpDir, thread2, workflowHash); const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); - const thread3Head = index[thread3]; + const thread3Head = index[thread3]!.head; if (thread3Head === undefined) throw new Error("thread3 head not found"); await completeThread(tmpDir, thread3, workflowHash, thread3Head); @@ -154,7 +152,7 @@ describe("cmdThreadList status filter", () => { const thread3 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000); const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); - const thread3Head = index[thread3]; + const thread3Head = index[thread3]!.head; if (thread3Head === undefined) throw new Error("thread3 head not found"); await completeThread(tmpDir, thread3, workflowHash, thread3Head); @@ -176,7 +174,7 @@ describe("cmdThreadList status filter", () => { await markThreadRunning(tmpDir, thread2, workflowHash); const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); - const thread3Head = index[thread3]; + const thread3Head = index[thread3]!.head; if (thread3Head === undefined) throw new Error("thread3 head not found"); await completeThread(tmpDir, thread3, workflowHash, thread3Head); @@ -348,7 +346,7 @@ describe("combined filters", () => { await markThreadRunning(tmpDir, thread2, workflowHash); const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); - const thread3Head = index[thread3]; + const thread3Head = index[thread3]!.head; if (thread3Head === undefined) throw new Error("thread3 head not found"); await completeThread(tmpDir, thread3, workflowHash, thread3Head); @@ -372,7 +370,7 @@ describe("combined filters", () => { const thread = await createTestThread(uwf, tmpDir, workflowHash, Date.now() + i * 1000); threads.push(thread); const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); - const headHash = index[thread]; + const headHash = index[thread]!.head; if (headHash === undefined) throw new Error("head not found"); await completeThread(tmpDir, thread, workflowHash, headHash); } @@ -421,7 +419,7 @@ describe("combined filters", () => { if (i % 2 === 0) { const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); - const headHash = index[thread]; + const headHash = index[thread]!.head; if (headHash === undefined) throw new Error("head not found"); await completeThread(tmpDir, thread, workflowHash, headHash); } else { @@ -479,7 +477,11 @@ describe("edge cases", () => { const thread2 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000); const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); - index["INVALID_ULID_FORMAT_HERE" as ThreadId] = "01J6HMVRNQKJV2"; + index["INVALID_ULID_FORMAT_HERE" as ThreadId] = { + head: "01J6HMVRNQKJV2", + suspendedRole: null, + suspendMessage: null, + }; await saveThreadsIndex(tmpDir, index); const afterMs = Date.now() - 3000; diff --git a/packages/cli-workflow/src/__tests__/thread-location.test.ts b/packages/cli-workflow/src/__tests__/thread-location.test.ts index bf297bb..f0ec957 100644 --- a/packages/cli-workflow/src/__tests__/thread-location.test.ts +++ b/packages/cli-workflow/src/__tests__/thread-location.test.ts @@ -80,7 +80,7 @@ graph: // Verify StartNode has the cwd field const uwf = await createUwfStore(storageRoot); const index = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot)); - const headHash = index[result.thread as ThreadId]; + const headHash = index[result.thread as ThreadId]!.head; expect(headHash).toBeDefined(); const startNode = uwf.store.get(headHash as CasRef); @@ -175,7 +175,7 @@ graph: const uwf = await createUwfStore(storageRoot); const index = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot)); - const headHash = index[result.thread as ThreadId]; + const headHash = index[result.thread as ThreadId]!.head; const startNode = uwf.store.get(headHash as CasRef); const startPayload = startNode?.payload as StartNodePayload; diff --git a/packages/cli-workflow/src/__tests__/thread-resume.test.ts b/packages/cli-workflow/src/__tests__/thread-resume.test.ts new file mode 100644 index 0000000..b539244 --- /dev/null +++ b/packages/cli-workflow/src/__tests__/thread-resume.test.ts @@ -0,0 +1,442 @@ +import { execFileSync } from "node:child_process"; +import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { putSchema } from "@ocas/core"; +import { createFsStore } from "@ocas/fs"; +import type { CasRef, StepNodePayload, ThreadId } from "@uncaged/workflow-protocol"; +import { afterEach, beforeEach, describe, expect, test } from "vitest"; +import { parse } from "yaml"; +import { cmdThreadShow } from "../commands/thread.js"; +import { registerUwfSchemas } from "../schemas.js"; +import { saveThreadsIndex } from "../store.js"; + +const OUTPUT_SCHEMA = { + type: "object" as const, + properties: { + $status: { type: "string" as const }, + question: { type: "string" as const }, + }, + required: ["$status"], + additionalProperties: false, +}; + +const THREAD_ID = "01RESUMESTEPTEST0000000" as ThreadId; +const SUSPEND_MESSAGE = "Please clarify: Which API?"; + +type MockAgentMode = "suspend" | "ok"; + +let tmpDir: string; + +beforeEach(async () => { + tmpDir = await mkdtemp(join(tmpdir(), "cli-uwf-resume-test-")); +}); + +afterEach(async () => { + await rm(tmpDir, { recursive: true, force: true }); +}); + +async function setupSuspendedThread(mode: MockAgentMode): Promise<{ + casDir: string; + mockAgentPath: string; + promptCapturePath: string; +}> { + const casDir = join(tmpDir, "cas"); + await mkdir(casDir, { recursive: true }); + + const store = createFsStore(casDir); + const schemas = await registerUwfSchemas(store); + const outputSchemaHash = await putSchema(store, OUTPUT_SCHEMA); + + const workflowHash = await store.put(schemas.workflow, { + name: "test-resume", + description: "resume command integration test", + roles: { + worker: { + description: "Worker role", + goal: "Work", + capabilities: [], + procedure: "work", + output: "result", + frontmatter: outputSchemaHash, + }, + reviewer: { + description: "Reviewer role", + goal: "Review", + capabilities: [], + procedure: "review", + output: "result", + frontmatter: outputSchemaHash, + }, + }, + graph: { + $START: { _: { role: "worker", prompt: "Start work", location: null } }, + worker: { + needs_input: { + role: "$SUSPEND", + prompt: "Please clarify: {{{question}}}", + location: null, + }, + ok: { role: "reviewer", prompt: "Review the work", location: null }, + }, + reviewer: { _: { role: "$END", prompt: "Done", location: null } }, + }, + }); + + const startHash = await store.put(schemas.startNode, { + workflow: workflowHash, + prompt: "Test resume task", + cwd: tmpDir, + }); + + await saveThreadsIndex(tmpDir, { [THREAD_ID]: startHash }); + + const outputHash = await store.put(outputSchemaHash, { + $status: "needs_input", + question: "Which API?", + }); + const detailHash = await store.put(schemas.text, "mock detail"); + + const startedAtMs = 1716600000000; + const completedAtMs = 1716600001500; + + const stepHash = await store.put(schemas.stepNode, { + start: startHash, + prev: null, + role: "worker", + output: outputHash, + detail: detailHash, + agent: "uwf-mock", + edgePrompt: "Start work", + startedAtMs, + completedAtMs, + cwd: tmpDir, + assembledPrompt: null, + }); + + await saveThreadsIndex(tmpDir, { + [THREAD_ID]: { + head: stepHash, + suspendedRole: "worker", + suspendMessage: SUSPEND_MESSAGE, + }, + }); + + const promptCapturePath = join(tmpDir, "captured-prompt.txt"); + const mockAgentPath = join(tmpDir, "mock-agent.sh"); + + const frontmatter = + mode === "suspend" ? { $status: "needs_input", question: "Which API?" } : { $status: "ok" }; + + const adapterJson = JSON.stringify({ + stepHash: await store.put(schemas.stepNode, { + start: startHash, + prev: stepHash, + role: "worker", + output: await store.put(outputSchemaHash, frontmatter), + detail: detailHash, + agent: "uwf-mock", + edgePrompt: "resume prompt placeholder", + startedAtMs: completedAtMs + 1, + completedAtMs: completedAtMs + 2, + cwd: tmpDir, + assembledPrompt: null, + }), + detailHash, + role: "worker", + frontmatter, + body: "", + startedAtMs: completedAtMs + 1, + completedAtMs: completedAtMs + 2, + }); + + await writeFile( + mockAgentPath, + `#!/bin/sh +prompt="" +while [ $# -gt 0 ]; do + if [ "$1" = "--prompt" ]; then + prompt="$2" + shift 2 + else + shift + fi +done +printf '%s' "$prompt" > '${promptCapturePath}' +echo '${adapterJson}' +`, + { mode: 0o755 }, + ); + + const configPath = join(tmpDir, "config.yaml"); + await writeFile( + configPath, + `defaultAgent: uwf-hermes\ndefaultModel: test-model\nagentOverrides: null\nagents: {}\nproviders: {}\nmodels: {}\n`, + ); + + return { casDir, mockAgentPath, promptCapturePath }; +} + +function runUwf( + args: string[], + casDir: string, +): { stdout: string; stderr: string; status: number } { + const cliPath = join(import.meta.dirname, "..", "cli.js"); + try { + const stdout = execFileSync("bun", ["run", cliPath, ...args], { + encoding: "utf8", + stdio: ["ignore", "pipe", "pipe"], + env: { + ...process.env, + WORKFLOW_STORAGE_ROOT: tmpDir, + UNCAGED_CAS_DIR: casDir, + }, + cwd: tmpDir, + timeout: 30000, + }); + return { stdout, stderr: "", status: 0 }; + } catch (error) { + const err = error as NodeJS.ErrnoException & { + stdout?: string | Buffer; + stderr?: string | Buffer; + status?: number; + }; + return { + stdout: typeof err.stdout === "string" ? err.stdout : (err.stdout?.toString("utf8") ?? ""), + stderr: typeof err.stderr === "string" ? err.stderr : (err.stderr?.toString("utf8") ?? ""), + status: err.status ?? 1, + }; + } +} + +describe("uwf thread resume", () => { + test("resume non-suspended thread returns error", async () => { + const casDir = join(tmpDir, "cas"); + await mkdir(casDir, { recursive: true }); + const store = createFsStore(casDir); + const schemas = await registerUwfSchemas(store); + + const workflowHash = await store.put(schemas.workflow, { + name: "idle-workflow", + description: "idle thread", + roles: { + worker: { + description: "Worker", + goal: "Work", + capabilities: [], + procedure: "work", + output: "result", + frontmatter: await putSchema(store, OUTPUT_SCHEMA), + }, + }, + graph: { + $START: { _: { role: "worker", prompt: "Start", location: null } }, + worker: { _: { role: "$END", prompt: "Done", location: null } }, + }, + }); + + const startHash = await store.put(schemas.startNode, { + workflow: workflowHash, + prompt: "task", + cwd: tmpDir, + }); + + await saveThreadsIndex(tmpDir, { [THREAD_ID]: startHash }); + + const result = runUwf(["thread", "resume", THREAD_ID], casDir); + expect(result.status).not.toBe(0); + expect(result.stderr).toContain("thread is not suspended"); + }); + + test("resume suspended thread executes step and becomes idle", async () => { + const originalCasDir = process.env.UNCAGED_CAS_DIR; + const { casDir, mockAgentPath } = await setupSuspendedThread("ok"); + process.env.UNCAGED_CAS_DIR = casDir; + + try { + const result = runUwf(["thread", "resume", THREAD_ID, "--agent", mockAgentPath], casDir); + expect(result.status).toBe(0); + + const cliOutput = JSON.parse(result.stdout.trim()); + expect(cliOutput.status).toBe("idle"); + expect(cliOutput.currentRole).toBe("reviewer"); + expect(cliOutput.suspendedRole).toBeNull(); + expect(cliOutput.suspendMessage).toBeNull(); + expect(cliOutput.done).toBe(false); + + const threadsYaml = await readFile(join(tmpDir, "threads.yaml"), "utf8"); + const threadsIndex = parse(threadsYaml) as Record; + expect(threadsIndex[THREAD_ID]).toBe(cliOutput.head); + + const showResult = await cmdThreadShow(tmpDir, THREAD_ID); + expect(showResult.status).toBe("idle"); + expect(showResult.suspendedRole).toBeNull(); + expect(showResult.suspendMessage).toBeNull(); + } finally { + if (originalCasDir === undefined) { + delete process.env.UNCAGED_CAS_DIR; + } else { + process.env.UNCAGED_CAS_DIR = originalCasDir; + } + } + }); + + test("resume without -p uses suspend message as agent prompt", async () => { + const originalCasDir = process.env.UNCAGED_CAS_DIR; + const { casDir, mockAgentPath, promptCapturePath } = await setupSuspendedThread("ok"); + process.env.UNCAGED_CAS_DIR = casDir; + + try { + const result = runUwf(["thread", "resume", THREAD_ID, "--agent", mockAgentPath], casDir); + expect(result.status).toBe(0); + + const capturedPrompt = await readFile(promptCapturePath, "utf8"); + expect(capturedPrompt).toBe(SUSPEND_MESSAGE); + } finally { + if (originalCasDir === undefined) { + delete process.env.UNCAGED_CAS_DIR; + } else { + process.env.UNCAGED_CAS_DIR = originalCasDir; + } + } + }); + + test("resume with -p appends supplementary info to agent prompt", async () => { + const originalCasDir = process.env.UNCAGED_CAS_DIR; + const { casDir, mockAgentPath, promptCapturePath } = await setupSuspendedThread("ok"); + process.env.UNCAGED_CAS_DIR = casDir; + + try { + const supplement = "Use the REST API."; + const result = runUwf( + ["thread", "resume", THREAD_ID, "-p", supplement, "--agent", mockAgentPath], + casDir, + ); + expect(result.status).toBe(0); + + const capturedPrompt = await readFile(promptCapturePath, "utf8"); + expect(capturedPrompt).toBe(`${SUSPEND_MESSAGE}\n\n${supplement}`); + } finally { + if (originalCasDir === undefined) { + delete process.env.UNCAGED_CAS_DIR; + } else { + process.env.UNCAGED_CAS_DIR = originalCasDir; + } + } + }); + + test("multiple suspend/resume cycles", async () => { + const originalCasDir = process.env.UNCAGED_CAS_DIR; + const { casDir, mockAgentPath, promptCapturePath } = await setupSuspendedThread("suspend"); + process.env.UNCAGED_CAS_DIR = casDir; + + try { + const firstResult = runUwf(["thread", "resume", THREAD_ID, "--agent", mockAgentPath], casDir); + expect(firstResult.status).toBe(0); + const firstResume = JSON.parse(firstResult.stdout.trim()); + expect(firstResume.status).toBe("suspended"); + expect(firstResume.suspendedRole).toBe("worker"); + expect(firstResume.suspendMessage).toBe(SUSPEND_MESSAGE); + + const threadsAfterFirst = parse( + await readFile(join(tmpDir, "threads.yaml"), "utf8"), + ) as Record; + expect(threadsAfterFirst[THREAD_ID]).toEqual({ + head: firstResume.head, + suspendedRole: "worker", + suspendMessage: SUSPEND_MESSAGE, + }); + + const { mockAgentPath: okMockAgentPath } = await setupOkMockAgent( + casDir, + firstResume.head as CasRef, + ); + + const secondResult = runUwf( + ["thread", "resume", THREAD_ID, "--agent", okMockAgentPath], + casDir, + ); + expect(secondResult.status).toBe(0); + const secondResume = JSON.parse(secondResult.stdout.trim()); + expect(secondResume.status).toBe("idle"); + expect(secondResume.currentRole).toBe("reviewer"); + expect(secondResume.suspendedRole).toBeNull(); + expect(secondResume.suspendMessage).toBeNull(); + + const capturedPrompt = await readFile(promptCapturePath, "utf8"); + expect(capturedPrompt).toBe(SUSPEND_MESSAGE); + } finally { + if (originalCasDir === undefined) { + delete process.env.UNCAGED_CAS_DIR; + } else { + process.env.UNCAGED_CAS_DIR = originalCasDir; + } + } + }); +}); + +async function setupOkMockAgent( + casDir: string, + prevHead: CasRef, +): Promise<{ mockAgentPath: string }> { + const store = createFsStore(casDir); + const schemas = await registerUwfSchemas(store); + const outputSchemaHash = await putSchema(store, OUTPUT_SCHEMA); + + const prevNode = store.get(prevHead); + if (prevNode === null || prevNode.type !== schemas.stepNode) { + throw new Error(`expected StepNode at ${prevHead}`); + } + const prevPayload = prevNode.payload as StepNodePayload; + + const outputHash = await store.put(outputSchemaHash, { $status: "ok" }); + const detailHash = await store.put(schemas.text, "ok detail"); + const startedAtMs = Date.now(); + const completedAtMs = startedAtMs + 1; + + const stepHash = await store.put(schemas.stepNode, { + start: prevPayload.start, + prev: prevHead, + role: "worker", + output: outputHash, + detail: detailHash, + agent: "uwf-mock", + edgePrompt: "resume", + startedAtMs, + completedAtMs, + cwd: tmpDir, + assembledPrompt: null, + }); + + const promptCapturePath = join(tmpDir, "captured-prompt.txt"); + const mockAgentPath = join(tmpDir, "mock-agent-ok.sh"); + const adapterJson = JSON.stringify({ + stepHash, + detailHash, + role: "worker", + frontmatter: { $status: "ok" }, + body: "", + startedAtMs, + completedAtMs, + }); + + await writeFile( + mockAgentPath, + `#!/bin/sh +prompt="" +while [ $# -gt 0 ]; do + if [ "$1" = "--prompt" ]; then + prompt="$2" + shift 2 + else + shift + fi +done +printf '%s' "$prompt" > '${promptCapturePath}' +echo '${adapterJson}' +`, + { mode: 0o755 }, + ); + + return { mockAgentPath }; +} diff --git a/packages/cli-workflow/src/__tests__/thread-show-status.test.ts b/packages/cli-workflow/src/__tests__/thread-show-status.test.ts index 80137f7..eb24764 100644 --- a/packages/cli-workflow/src/__tests__/thread-show-status.test.ts +++ b/packages/cli-workflow/src/__tests__/thread-show-status.test.ts @@ -90,8 +90,9 @@ async function insertStepNode( ): Promise { const uwf = await createUwfStore(storageRoot); const index = await loadThreadsIndex(storageRoot); - const head = index[threadId]; - if (head === undefined) throw new Error(`thread ${threadId} not in index`); + const headEntry = index[threadId]; + if (headEntry === undefined) throw new Error(`thread ${threadId} not in index`); + const head = headEntry.head; const outputSchemaHash = await putSchema(uwf.store, OUTPUT_SCHEMA); const outputHash = await uwf.store.put(outputSchemaHash, outputPayload); @@ -116,7 +117,7 @@ async function insertStepNode( assembledPrompt: null, })) as CasRef; - index[threadId] = stepHash; + index[threadId] = { head: stepHash, suspendedRole: null, suspendMessage: null }; await saveThreadsIndex(storageRoot, index); } @@ -203,7 +204,7 @@ describe("thread show status field", () => { // Get the head hash before moving to history const index = await loadThreadsIndex(storageRoot); - const head = index[threadId]; + const head = index[threadId]!.head; if (!head) throw new Error("Thread not found in index"); // Move thread to history with reason 'completed' @@ -243,7 +244,7 @@ describe("thread show status field", () => { // Get the head hash before moving to history const index = await loadThreadsIndex(storageRoot); - const head = index[threadId]; + const head = index[threadId]!.head; if (!head) throw new Error("Thread not found in index"); // Move thread to history with reason 'cancelled' @@ -283,7 +284,7 @@ describe("thread show status field", () => { // Get the head hash before moving to history const index = await loadThreadsIndex(storageRoot); - const head = index[threadId]; + const head = index[threadId]!.head; if (!head) throw new Error("Thread not found in index"); // Move thread to history with reason null (legacy format) @@ -333,6 +334,8 @@ describe("thread show status field", () => { expect(result.status).toBe("suspended"); expect(result.done).toBe(false); expect(result.currentRole).toBe(null); + expect(result.suspendedRole).toBe("worker"); + expect(result.suspendMessage).toBe("Please clarify: Which API?"); expect(result.background).toBe(null); expect(result.thread).toBe(threadId); } finally { diff --git a/packages/cli-workflow/src/__tests__/thread-start-cwd-cli.test.ts b/packages/cli-workflow/src/__tests__/thread-start-cwd-cli.test.ts index 2da3ff0..c324ec5 100644 --- a/packages/cli-workflow/src/__tests__/thread-start-cwd-cli.test.ts +++ b/packages/cli-workflow/src/__tests__/thread-start-cwd-cli.test.ts @@ -75,7 +75,7 @@ graph: async function getStartNodeCwd(threadId: string): Promise { const uwf = await createUwfStore(storageRoot); const index = await loadThreadsIndex(storageRoot); - const headHash = index[threadId as ThreadId]; + const headHash = index[threadId as ThreadId]!.head; expect(headHash).toBeDefined(); const startNode = uwf.store.get(headHash as CasRef); diff --git a/packages/cli-workflow/src/__tests__/thread-suspend-step.test.ts b/packages/cli-workflow/src/__tests__/thread-suspend-step.test.ts new file mode 100644 index 0000000..74ce2b3 --- /dev/null +++ b/packages/cli-workflow/src/__tests__/thread-suspend-step.test.ts @@ -0,0 +1,179 @@ +import { execFileSync } from "node:child_process"; +import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { putSchema } from "@ocas/core"; +import { createFsStore } from "@ocas/fs"; +import type { CasRef, StepNodePayload, ThreadId } from "@uncaged/workflow-protocol"; +import { afterEach, beforeEach, describe, expect, test } from "vitest"; +import { parse } from "yaml"; +import { cmdThreadShow } from "../commands/thread.js"; +import { registerUwfSchemas } from "../schemas.js"; +import { saveThreadsIndex } from "../store.js"; + +const OUTPUT_SCHEMA = { + type: "object" as const, + properties: { + $status: { type: "string" as const }, + question: { type: "string" as const }, + }, + required: ["$status"], + additionalProperties: false, +}; + +let tmpDir: string; + +beforeEach(async () => { + tmpDir = await mkdtemp(join(tmpdir(), "cli-uwf-suspend-step-test-")); +}); + +afterEach(async () => { + await rm(tmpDir, { recursive: true, force: true }); +}); + +describe("suspend step CAS chain and threads.yaml metadata", () => { + test("thread exec records suspend step in CAS and suspend metadata in threads.yaml", async () => { + const casDir = join(tmpDir, "cas"); + await mkdir(casDir, { recursive: true }); + const originalCasDir = process.env.UNCAGED_CAS_DIR; + process.env.UNCAGED_CAS_DIR = casDir; + + try { + const store = createFsStore(casDir); + const schemas = await registerUwfSchemas(store); + + const outputSchemaHash = await putSchema(store, OUTPUT_SCHEMA); + + const workflowHash = await store.put(schemas.workflow, { + name: "test-suspend-step", + description: "suspend step integration test", + roles: { + worker: { + description: "Worker role", + goal: "Work", + capabilities: [], + procedure: "work", + output: "result", + frontmatter: outputSchemaHash, + }, + }, + graph: { + $START: { _: { role: "worker", prompt: "Start work", location: null } }, + worker: { + needs_input: { + role: "$SUSPEND", + prompt: "Please clarify: {{{question}}}", + location: null, + }, + }, + }, + }); + + const startHash = await store.put(schemas.startNode, { + workflow: workflowHash, + prompt: "Test suspend task", + cwd: tmpDir, + }); + + const threadId = "01SUSPENDSTEPTEST0000000" as ThreadId; + await saveThreadsIndex(tmpDir, { [threadId]: startHash }); + + const outputHash = await store.put(outputSchemaHash, { + $status: "needs_input", + question: "Which API?", + }); + const detailHash = await store.put(schemas.text, "mock detail"); + + const startedAtMs = 1716600000000; + const completedAtMs = 1716600001500; + + const stepHash = await store.put(schemas.stepNode, { + start: startHash, + prev: null, + role: "worker", + output: outputHash, + detail: detailHash, + agent: "uwf-mock", + edgePrompt: "Start work", + startedAtMs, + completedAtMs, + cwd: tmpDir, + assembledPrompt: null, + }); + + const mockAgentPath = join(tmpDir, "mock-agent.sh"); + const adapterJson = JSON.stringify({ + stepHash, + detailHash, + role: "worker", + frontmatter: { $status: "needs_input", question: "Which API?" }, + body: "", + startedAtMs, + completedAtMs, + }); + await writeFile(mockAgentPath, `#!/bin/sh\necho '${adapterJson}'\n`, { mode: 0o755 }); + + const configPath = join(tmpDir, "config.yaml"); + await writeFile( + configPath, + `defaultAgent: uwf-hermes\ndefaultModel: test-model\nagentOverrides: null\nagents: {}\nproviders: {}\nmodels: {}\n`, + ); + + const cliPath = join(import.meta.dirname, "..", "cli.js"); + const stdout = execFileSync( + "bun", + ["run", cliPath, "thread", "exec", threadId, "--agent", mockAgentPath], + { + encoding: "utf8", + stdio: ["ignore", "pipe", "pipe"], + env: { + ...process.env, + WORKFLOW_STORAGE_ROOT: tmpDir, + UNCAGED_CAS_DIR: casDir, + }, + cwd: tmpDir, + timeout: 30000, + }, + ); + + const cliOutput = JSON.parse(stdout.trim()); + expect(cliOutput.status).toBe("suspended"); + expect(cliOutput.head).toBe(stepHash); + expect(cliOutput.suspendedRole).toBe("worker"); + expect(cliOutput.suspendMessage).toBe("Please clarify: Which API?"); + + const storeAfter = createFsStore(casDir); + const stepNode = storeAfter.get(cliOutput.head as CasRef); + expect(stepNode).not.toBeNull(); + const payload = stepNode!.payload as StepNodePayload; + expect(payload.role).toBe("worker"); + expect(payload.output).toBe(outputHash); + + const outputNode = storeAfter.get(outputHash); + expect(outputNode?.payload).toEqual({ + $status: "needs_input", + question: "Which API?", + }); + + const threadsYaml = await readFile(join(tmpDir, "threads.yaml"), "utf8"); + const threadsIndex = parse(threadsYaml) as Record; + const threadEntry = threadsIndex[threadId]; + expect(threadEntry).toEqual({ + head: stepHash, + suspendedRole: "worker", + suspendMessage: "Please clarify: Which API?", + }); + + const showResult = await cmdThreadShow(tmpDir, threadId); + expect(showResult.status).toBe("suspended"); + expect(showResult.suspendMessage).toBe("Please clarify: Which API?"); + expect(showResult.suspendedRole).toBe("worker"); + } finally { + if (originalCasDir === undefined) { + delete process.env.UNCAGED_CAS_DIR; + } else { + process.env.UNCAGED_CAS_DIR = originalCasDir; + } + } + }); +}); diff --git a/packages/cli-workflow/src/cli.ts b/packages/cli-workflow/src/cli.ts index 26dc0d8..1ba0fed 100755 --- a/packages/cli-workflow/src/cli.ts +++ b/packages/cli-workflow/src/cli.ts @@ -30,6 +30,7 @@ import { cmdThreadExec, cmdThreadList, cmdThreadRead, + cmdThreadResume, cmdThreadShow, cmdThreadStart, cmdThreadStop, @@ -280,6 +281,27 @@ thread }, ); +thread + .command("resume") + .description("Resume a suspended thread and re-run the suspended role") + .argument("", "Thread ULID") + .option("-p, --prompt ", "Supplementary info to append to the resume prompt") + .option("--agent ", "Override agent command") + .action((threadId: string, opts: { prompt: string | undefined; agent: string | undefined }) => { + const storageRoot = resolveStorageRoot(); + runAction(async () => { + const supplement = opts.prompt ?? null; + const agentOverride = opts.agent ?? null; + const result = await cmdThreadResume( + storageRoot, + threadId as ThreadId, + supplement, + agentOverride, + ); + writeOutput(result); + }); + }); + thread .command("stop") .description("Stop background execution of a thread (keep thread active)") diff --git a/packages/cli-workflow/src/commands/shared.ts b/packages/cli-workflow/src/commands/shared.ts index fbb3df9..b39a74a 100644 --- a/packages/cli-workflow/src/commands/shared.ts +++ b/packages/cli-workflow/src/commands/shared.ts @@ -203,7 +203,7 @@ function collectOrderedSteps( async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise { const index = await loadThreadsIndex(storageRoot); - const activeHead = index[threadId]; + const activeHead = index[threadId]?.head; if (activeHead !== undefined) { return activeHead; } diff --git a/packages/cli-workflow/src/commands/step.ts b/packages/cli-workflow/src/commands/step.ts index e304c79..01ff291 100644 --- a/packages/cli-workflow/src/commands/step.ts +++ b/packages/cli-workflow/src/commands/step.ts @@ -113,7 +113,7 @@ export async function cmdStepFork( const newThreadId = generateUlid(Date.now()) as ThreadId; const index = await loadThreadsIndex(storageRoot); - index[newThreadId] = stepHash; + index[newThreadId] = { head: stepHash, suspendedRole: null, suspendMessage: null }; await saveThreadsIndex(storageRoot, index); return { diff --git a/packages/cli-workflow/src/commands/thread.ts b/packages/cli-workflow/src/commands/thread.ts index 92e4c43..018ce87 100644 --- a/packages/cli-workflow/src/commands/thread.ts +++ b/packages/cli-workflow/src/commands/thread.ts @@ -11,12 +11,18 @@ import type { StepNodePayload, StepOutput, ThreadId, + ThreadIndexEntry, ThreadListItem, ThreadStatus, ThreadsIndex, WorkflowConfig, WorkflowPayload, } from "@uncaged/workflow-protocol"; +import { + createThreadIndexEntry, + markThreadSuspended, + updateThreadHead, +} from "@uncaged/workflow-protocol"; import { createProcessLogger, extractUlidTimestamp, @@ -68,8 +74,15 @@ function buildStepOutputFromEvaluation( ): StepOutput { const done = status === "completed"; let currentRole: string | null = null; - if (evaluation.ok && !isSuspendResult(evaluation.value) && evaluation.value.role !== END_ROLE) { - currentRole = evaluation.value.role; + let suspendedRole: string | null = null; + let suspendMessage: string | null = null; + if (evaluation.ok) { + if (isSuspendResult(evaluation.value)) { + suspendedRole = evaluation.value.suspendedRole; + suspendMessage = evaluation.value.prompt; + } else if (evaluation.value.role !== END_ROLE) { + currentRole = evaluation.value.role; + } } return { workflow: workflowHash, @@ -77,11 +90,68 @@ function buildStepOutputFromEvaluation( head, status, currentRole, + suspendedRole, + suspendMessage, done, background, }; } +function resolveSuspendFieldsFromGraph( + uwf: UwfStore, + head: CasRef, + workflowRef: CasRef, +): { suspendedRole: string | null; suspendMessage: string | null } { + const chain = walkChain(uwf, head); + const { lastRole, lastOutput } = resolveEvaluateArgs(uwf, chain); + const workflow = loadWorkflowPayload(uwf, workflowRef); + const result = evaluate(workflow.graph, lastRole, lastOutput); + if (result.ok && isSuspendResult(result.value)) { + return { + suspendedRole: result.value.suspendedRole, + suspendMessage: result.value.prompt, + }; + } + return { suspendedRole: null, suspendMessage: null }; +} + +function resolveSuspendFieldsForShow( + entry: ThreadIndexEntry, + status: ThreadStatus, + uwf: UwfStore, + head: CasRef, + workflowRef: CasRef, +): { suspendedRole: string | null; suspendMessage: string | null } { + if (status !== "suspended") { + return { suspendedRole: null, suspendMessage: null }; + } + if (entry.suspendedRole !== null && entry.suspendMessage !== null) { + return { suspendedRole: entry.suspendedRole, suspendMessage: entry.suspendMessage }; + } + const fromGraph = resolveSuspendFieldsFromGraph(uwf, head, workflowRef); + return { + suspendedRole: entry.suspendedRole ?? fromGraph.suspendedRole, + suspendMessage: entry.suspendMessage ?? fromGraph.suspendMessage, + }; +} + +async function ensureThreadSuspendMetadata( + storageRoot: string, + threadId: ThreadId, + entry: ThreadIndexEntry, + suspendedRole: string, + suspendMessage: string, +): Promise { + if (entry.suspendedRole !== null && entry.suspendMessage !== null) { + return entry; + } + const updated = markThreadSuspended(entry, suspendedRole, suspendMessage); + const index = await loadThreadsIndex(storageRoot); + index[threadId] = updated; + await saveThreadsIndex(storageRoot, index); + return updated; +} + async function resolveActiveThreadStatus( storageRoot: string, threadId: ThreadId, @@ -130,6 +200,25 @@ const PL_AGENT_DONE = "C6P9E3H7"; const PL_THREAD_ARCHIVED = "F4D8Q2K5"; const PL_STEP_ERROR = "B8T5N1V6"; const PL_BACKGROUND_START = "X7Q4W9M2"; +const PL_THREAD_RESUME = "K2R7M4N8"; + +type ResumeStepConfig = { + role: string; + prompt: string; +}; + +type AgentStepTarget = { + role: string; + edgePrompt: string; + effectiveCwd: string; +}; + +function buildResumePrompt(graphPrompt: string, supplement: string | null): string { + if (supplement === null || supplement === "") { + return graphPrompt; + } + return `${graphPrompt}\n\n${supplement}`; +} function failStep(plog: ProcessLogger, message: string): never { plog.log(PL_STEP_ERROR, message, null); @@ -380,7 +469,7 @@ export async function cmdThreadStart( } const index = await loadThreadsIndex(storageRoot); - index[threadId] = headHash; + index[threadId] = createThreadIndexEntry(headHash); await saveThreadsIndex(storageRoot, index); plog.log( @@ -394,8 +483,9 @@ export async function cmdThreadStart( export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Promise { const index = await loadThreadsIndex(storageRoot); - const activeHead = index[threadId]; - if (activeHead !== undefined) { + const entry = index[threadId]; + if (entry !== undefined) { + const activeHead = entry.head; const uwf = await createUwfStore(storageRoot); const workflow = resolveWorkflowFromHead(uwf, activeHead); if (workflow === null) { @@ -410,6 +500,7 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr workflow, ); const currentRole = resolveCurrentRole(uwf, activeHead, workflow); + const suspendFields = resolveSuspendFieldsForShow(entry, status, uwf, activeHead, workflow); return { workflow, @@ -417,6 +508,8 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr head: activeHead, status, currentRole, + suspendedRole: suspendFields.suspendedRole, + suspendMessage: suspendFields.suspendMessage, done: false, background: null, }; @@ -432,6 +525,8 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr head: hist.head, status, currentRole: null, + suspendedRole: null, + suspendMessage: null, done: true, background: null, }; @@ -473,13 +568,8 @@ async function collectActiveThreads( index: ThreadsIndex, ): Promise { const items: ThreadListItemWithStatus[] = []; - for (const [threadId, head] of Object.entries(index)) { - const item = await threadListItemFromActive( - storageRoot, - uwf, - threadId as ThreadId, - head as CasRef, - ); + for (const [threadId, entry] of Object.entries(index)) { + const item = await threadListItemFromActive(storageRoot, uwf, threadId as ThreadId, entry.head); if (item !== null) { items.push(item); } @@ -945,6 +1035,65 @@ async function archiveThread( }); } +export async function cmdThreadResume( + storageRoot: string, + threadId: ThreadId, + supplement: string | null, + agentOverride: string | null, +): Promise { + const runningMarker = await isThreadRunning(storageRoot, threadId); + if (runningMarker !== null) { + fail(`thread already executing in background (PID: ${runningMarker.pid})`); + } + + const index = await loadThreadsIndex(storageRoot); + const entry = index[threadId]; + if (entry === undefined) { + fail(`thread not active: ${threadId}`); + } + + const uwf = await createUwfStore(storageRoot); + const headHash = entry.head; + const chain = walkChain(uwf, headHash); + const workflowHash = chain.start.workflow; + + const status = await resolveActiveThreadStatus( + storageRoot, + threadId, + uwf, + headHash, + workflowHash, + ); + if (status !== "suspended") { + fail(`thread is not suspended: ${threadId} (status: ${status})`); + } + + const suspendFields = resolveSuspendFieldsForShow(entry, status, uwf, headHash, workflowHash); + if (suspendFields.suspendedRole === null) { + fail(`thread is suspended but suspendedRole is missing: ${threadId}`); + } + if (suspendFields.suspendMessage === null) { + fail(`thread is suspended but suspendMessage is missing: ${threadId}`); + } + + const resumePrompt = buildResumePrompt(suspendFields.suspendMessage, supplement); + const plog = createProcessLogger({ + storageRoot, + context: { thread: threadId, workflow: workflowHash }, + }); + + plog.log( + PL_THREAD_RESUME, + `resume role=${suspendFields.suspendedRole} supplement=${supplement !== null}`, + null, + ); + + return cmdThreadStepOnce(storageRoot, threadId, agentOverride, plog, { + role: suspendFields.suspendedRole, + prompt: resumePrompt, + }); +} + export async function cmdThreadExec( storageRoot: string, threadId: ThreadId, @@ -1011,12 +1160,12 @@ async function resolveActiveThreadWorkflowHash( threadId: ThreadId, ): Promise { const index = await loadThreadsIndex(storageRoot); - const headHash = index[threadId]; - if (headHash === undefined) { + const entry = index[threadId]; + if (entry === undefined) { fail(`thread not active: ${threadId}`); } const uwf = await createUwfStore(storageRoot); - const chain = walkChain(uwf, headHash); + const chain = walkChain(uwf, entry.head); return chain.start.workflow; } @@ -1030,10 +1179,11 @@ async function cmdThreadStepBackground( ): Promise { // Get current head to return to caller const index = await loadThreadsIndex(storageRoot); - const headHash = index[threadId]; - if (headHash === undefined) { + const entry = index[threadId]; + if (entry === undefined) { failStep(plog, `thread not active: ${threadId}`); } + const headHash = entry.head; const uwf = await createUwfStore(storageRoot); @@ -1069,30 +1219,42 @@ async function cmdThreadStepBackground( head: headHash, status: "running", currentRole: resolveCurrentRole(uwf, headHash, workflowHash), + suspendedRole: null, + suspendMessage: null, done: false, background: true, }, ]; } -async function cmdThreadStepOnce( +function resolveResumeStepTarget( + resume: ResumeStepConfig, + chain: ChainState, + threadCwd: string, + plog: ProcessLogger, +): AgentStepTarget { + const lastStep = chain.stepsNewestFirst[0]; + plog.log(PL_MODERATOR, `resume role=${resume.role} prompt=${resume.prompt}`, null); + return { + role: resume.role, + edgePrompt: resume.prompt, + effectiveCwd: lastStep !== undefined && lastStep.cwd !== "" ? lastStep.cwd : threadCwd, + }; +} + +async function resolveModeratorStepTarget( storageRoot: string, threadId: ThreadId, - agentOverride: string | null, + entry: ThreadIndexEntry, + headHash: CasRef, + workflowHash: CasRef, + workflow: WorkflowPayload, + uwf: UwfStore, + chain: ChainState, + threadCwd: string, plog: ProcessLogger, -): Promise { - const index = await loadThreadsIndex(storageRoot); - const headHash = index[threadId]; - if (headHash === undefined) { - failStep(plog, `thread not active: ${threadId}`); - } - - const uwf = await createUwfStore(storageRoot); - const chain = walkChain(uwf, headHash); - const workflowHash = chain.start.workflow; - const workflow = loadWorkflowPayload(uwf, workflowHash); +): Promise { const { lastRole, lastOutput } = resolveEvaluateArgs(uwf, chain); - const nextResult = evaluate(workflow.graph, lastRole, lastOutput); if (!nextResult.ok) { failStep(plog, `moderator evaluate failed: ${nextResult.error.message}`); @@ -1109,6 +1271,13 @@ async function cmdThreadStepOnce( ); if (isSuspendResult(nextResult.value)) { + await ensureThreadSuspendMetadata( + storageRoot, + threadId, + entry, + nextResult.value.suspendedRole, + nextResult.value.prompt, + ); return buildStepOutputFromEvaluation( workflowHash, threadId, @@ -1128,41 +1297,32 @@ async function cmdThreadStepOnce( head: headHash, status: "completed", currentRole: null, + suspendedRole: null, + suspendMessage: null, done: true, background: null, }; } - const role = nextResult.value.role; - const edgePrompt = nextResult.value.prompt; + return { + role: nextResult.value.role, + edgePrompt: nextResult.value.prompt, + effectiveCwd: nextResult.value.location !== null ? nextResult.value.location : threadCwd, + }; +} - // Resolve cwd: use edge location if provided, otherwise inherit thread.cwd - const threadCwd = chain.start.cwd; - const effectiveCwd = nextResult.value.location !== null ? nextResult.value.location : threadCwd; - - const config = await loadWorkflowConfig(storageRoot); - const agent = resolveAgentConfig(config, workflow, role, agentOverride); - - plog.log(PL_AGENT_SPAWN, `spawning agent command=${agent.command}`, { - args: [...agent.args, threadId, role].join(" "), - }); - - loadDotenv({ path: getEnvPath(storageRoot) }); - const agentResult = spawnAgent(plog, agent, threadId, role, edgePrompt, effectiveCwd); - const newHead = agentResult.stepHash as CasRef; - - plog.log(PL_AGENT_DONE, `agent returned head=${newHead}`, null); - - // Re-create store to pick up nodes written by the agent subprocess - const uwfAfter = await createUwfStore(storageRoot); - const newNode = uwfAfter.store.get(newHead); - if (newNode === null || newNode.type !== uwfAfter.schemas.stepNode) { - failStep(plog, `agent returned hash that is not a StepNode: ${newHead}`); - } - - // Reload threads index to avoid overwriting changes made by the agent subprocess +async function finalizeAgentStep( + storageRoot: string, + threadId: ThreadId, + workflowHash: CasRef, + workflow: WorkflowPayload, + newHead: CasRef, + uwfAfter: UwfStore, + plog: ProcessLogger, +): Promise { const freshIndex = await loadThreadsIndex(storageRoot); - freshIndex[threadId] = newHead; + const priorEntry = freshIndex[threadId] ?? createThreadIndexEntry(newHead); + freshIndex[threadId] = updateThreadHead(priorEntry, newHead); await saveThreadsIndex(storageRoot, freshIndex); const chainAfter = walkChain(uwfAfter, newHead); @@ -1176,6 +1336,12 @@ async function cmdThreadStepOnce( } if (isSuspendResult(afterResult.value)) { + freshIndex[threadId] = markThreadSuspended( + freshIndex[threadId] ?? createThreadIndexEntry(newHead), + afterResult.value.suspendedRole, + afterResult.value.prompt, + ); + await saveThreadsIndex(storageRoot, freshIndex); return buildStepOutputFromEvaluation( workflowHash, threadId, @@ -1192,7 +1358,6 @@ async function cmdThreadStepOnce( await archiveThread(storageRoot, threadId, workflowHash, newHead); } - // Determine status based on whether thread is done and running state const status: ThreadStatus = done ? "completed" : "idle"; const currentRole = done ? null : afterResult.value.role; @@ -1202,14 +1367,80 @@ async function cmdThreadStepOnce( head: newHead, status, currentRole, + suspendedRole: null, + suspendMessage: null, done, background: null, }; } +async function cmdThreadStepOnce( + storageRoot: string, + threadId: ThreadId, + agentOverride: string | null, + plog: ProcessLogger, + resume: ResumeStepConfig | null = null, +): Promise { + const index = await loadThreadsIndex(storageRoot); + const entry = index[threadId]; + if (entry === undefined) { + failStep(plog, `thread not active: ${threadId}`); + } + const headHash = entry.head; + + const uwf = await createUwfStore(storageRoot); + const chain = walkChain(uwf, headHash); + const workflowHash = chain.start.workflow; + const workflow = loadWorkflowPayload(uwf, workflowHash); + const threadCwd = chain.start.cwd; + + const targetOrOutput = + resume !== null + ? resolveResumeStepTarget(resume, chain, threadCwd, plog) + : await resolveModeratorStepTarget( + storageRoot, + threadId, + entry, + headHash, + workflowHash, + workflow, + uwf, + chain, + threadCwd, + plog, + ); + + if ("status" in targetOrOutput) { + return targetOrOutput; + } + + const { role, edgePrompt, effectiveCwd } = targetOrOutput; + + const config = await loadWorkflowConfig(storageRoot); + const agent = resolveAgentConfig(config, workflow, role, agentOverride); + + plog.log(PL_AGENT_SPAWN, `spawning agent command=${agent.command}`, { + args: [...agent.args, threadId, role].join(" "), + }); + + loadDotenv({ path: getEnvPath(storageRoot) }); + const agentResult = spawnAgent(plog, agent, threadId, role, edgePrompt, effectiveCwd); + const newHead = agentResult.stepHash as CasRef; + + plog.log(PL_AGENT_DONE, `agent returned head=${newHead}`, null); + + const uwfAfter = await createUwfStore(storageRoot); + const newNode = uwfAfter.store.get(newHead); + if (newNode === null || newNode.type !== uwfAfter.schemas.stepNode) { + failStep(plog, `agent returned hash that is not a StepNode: ${newHead}`); + } + + return finalizeAgentStep(storageRoot, threadId, workflowHash, workflow, newHead, uwfAfter, plog); +} + async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise { const index = await loadThreadsIndex(storageRoot); - const activeHead = index[threadId]; + const activeHead = index[threadId]?.head; if (activeHead !== undefined) { return activeHead; } @@ -1262,8 +1493,8 @@ export type CancelOutput = { */ export async function cmdThreadStop(storageRoot: string, threadId: ThreadId): Promise { const index = await loadThreadsIndex(storageRoot); - const head = index[threadId]; - if (head === undefined) { + const entry = index[threadId]; + if (entry === undefined) { fail(`thread not active: ${threadId}`); } @@ -1292,10 +1523,11 @@ export async function cmdThreadCancel( threadId: ThreadId, ): Promise { const index = await loadThreadsIndex(storageRoot); - const head = index[threadId]; - if (head === undefined) { + const entry = index[threadId]; + if (entry === undefined) { fail(`thread not active: ${threadId}`); } + const head = entry.head; // Check if thread is running in background and terminate it const runningMarker = await isThreadRunning(storageRoot, threadId); diff --git a/packages/cli-workflow/src/store.ts b/packages/cli-workflow/src/store.ts index 48cf232..7544cc1 100644 --- a/packages/cli-workflow/src/store.ts +++ b/packages/cli-workflow/src/store.ts @@ -5,7 +5,18 @@ import { join } from "node:path"; import type { BootstrapCapableStore, Hash } from "@ocas/core"; import { createFsStore } from "@ocas/fs"; -import type { CasRef, ThreadId, ThreadListItem, ThreadsIndex } from "@uncaged/workflow-protocol"; +import type { + CasRef, + ThreadId, + ThreadIndexEntry, + ThreadListItem, + ThreadsIndex, +} from "@uncaged/workflow-protocol"; +import { + createThreadIndexEntry, + parseThreadsIndex, + serializeThreadsIndex, +} from "@uncaged/workflow-protocol"; import { parse, stringify } from "yaml"; import { registerUwfSchemas, type UwfSchemaHashes } from "./schemas.js"; @@ -234,16 +245,7 @@ export async function loadThreadsIndex(storageRoot: string): Promise)) { - if (typeof head === "string") { - index[threadId as ThreadId] = head; - } - } - return index; + return parseThreadsIndex(raw); } catch (e) { const err = e as NodeJS.ErrnoException; if (err.code === "ENOENT") { @@ -253,10 +255,25 @@ export async function loadThreadsIndex(storageRoot: string): Promise { +/** Accept legacy CasRef values for test convenience. */ +export type ThreadsIndexInput = Record; + +function normalizeThreadsIndexInput(index: ThreadsIndexInput): ThreadsIndex { + const normalized: ThreadsIndex = {}; + for (const [threadId, value] of Object.entries(index)) { + normalized[threadId as ThreadId] = + typeof value === "string" ? createThreadIndexEntry(value as CasRef) : value; + } + return normalized; +} + +export async function saveThreadsIndex( + storageRoot: string, + index: ThreadsIndexInput, +): Promise { const path = getThreadsPath(storageRoot); await mkdir(storageRoot, { recursive: true }); - const text = stringify(index, { indent: 2 }); + const text = stringify(serializeThreadsIndex(normalizeThreadsIndexInput(index)), { indent: 2 }); await writeFile(path, text, "utf8"); } diff --git a/packages/workflow-protocol/src/__tests__/thread-index.test.ts b/packages/workflow-protocol/src/__tests__/thread-index.test.ts new file mode 100644 index 0000000..2d58e7e --- /dev/null +++ b/packages/workflow-protocol/src/__tests__/thread-index.test.ts @@ -0,0 +1,79 @@ +import { describe, expect, test } from "vitest"; +import { + createThreadIndexEntry, + markThreadSuspended, + normalizeThreadIndexEntry, + parseThreadsIndex, + serializeThreadIndexEntry, + serializeThreadsIndex, + updateThreadHead, +} from "../thread-index.js"; + +describe("thread-index", () => { + test("parse legacy string head hash", () => { + const entry = normalizeThreadIndexEntry("0123456789ABC"); + expect(entry).toEqual({ + head: "0123456789ABC", + suspendedRole: null, + suspendMessage: null, + }); + }); + + test("parse suspended object entry", () => { + const entry = normalizeThreadIndexEntry({ + head: "0123456789ABC", + suspendedRole: "worker", + suspendMessage: "Please clarify: Which API?", + }); + expect(entry).toEqual({ + head: "0123456789ABC", + suspendedRole: "worker", + suspendMessage: "Please clarify: Which API?", + }); + }); + + test("serialize non-suspended entry as compact string", () => { + const entry = createThreadIndexEntry("0123456789ABC"); + expect(serializeThreadIndexEntry(entry)).toBe("0123456789ABC"); + }); + + test("serialize suspended entry as object", () => { + const entry = markThreadSuspended( + createThreadIndexEntry("0123456789ABC"), + "worker", + "Please clarify: Which API?", + ); + expect(serializeThreadIndexEntry(entry)).toEqual({ + head: "0123456789ABC", + suspendedRole: "worker", + suspendMessage: "Please clarify: Which API?", + }); + }); + + test("updateThreadHead clears suspend metadata", () => { + const suspended = markThreadSuspended( + createThreadIndexEntry("OLDHEAD0123456"), + "worker", + "Waiting", + ); + const resumed = updateThreadHead(suspended, "NEWHEAD01234567"); + expect(resumed).toEqual({ + head: "NEWHEAD01234567", + suspendedRole: null, + suspendMessage: null, + }); + }); + + test("parseThreadsIndex round-trip", () => { + const raw = { + "01THREAD0000000000000001": "HEAD00000000001", + "01THREAD0000000000000002": { + head: "HEAD00000000002", + suspendedRole: "reviewer", + suspendMessage: "Need input", + }, + }; + const parsed = parseThreadsIndex(raw); + expect(serializeThreadsIndex(parsed)).toEqual(raw); + }); +}); diff --git a/packages/workflow-protocol/src/index.ts b/packages/workflow-protocol/src/index.ts index bd9a358..9f8bffa 100644 --- a/packages/workflow-protocol/src/index.ts +++ b/packages/workflow-protocol/src/index.ts @@ -3,6 +3,15 @@ export { STEP_NODE_SCHEMA, WORKFLOW_SCHEMA, } from "./schemas.js"; +export { + createThreadIndexEntry, + markThreadSuspended, + normalizeThreadIndexEntry, + parseThreadsIndex, + serializeThreadIndexEntry, + serializeThreadsIndex, + updateThreadHead, +} from "./thread-index.js"; export type { AgentAlias, AgentConfig, @@ -29,6 +38,7 @@ export type { Target, ThreadForkOutput, ThreadId, + ThreadIndexEntry, ThreadListItem, ThreadStatus, ThreadStepsOutput, diff --git a/packages/workflow-protocol/src/thread-index.ts b/packages/workflow-protocol/src/thread-index.ts new file mode 100644 index 0000000..5fcf7d9 --- /dev/null +++ b/packages/workflow-protocol/src/thread-index.ts @@ -0,0 +1,89 @@ +import type { CasRef, ThreadId, ThreadIndexEntry, ThreadsIndex } from "./types.js"; + +/** Normalize a legacy head hash or entry object into {@link ThreadIndexEntry}. */ +export function normalizeThreadIndexEntry(raw: unknown): ThreadIndexEntry | null { + if (typeof raw === "string") { + return createThreadIndexEntry(raw as CasRef); + } + if (raw === null || typeof raw !== "object" || Array.isArray(raw)) { + return null; + } + const rec = raw as Record; + const head = rec.head; + if (typeof head !== "string") { + return null; + } + const suspendedRole = rec.suspendedRole; + const suspendMessage = rec.suspendMessage; + return { + head: head as CasRef, + suspendedRole: typeof suspendedRole === "string" ? suspendedRole : null, + suspendMessage: typeof suspendMessage === "string" ? suspendMessage : null, + }; +} + +export function createThreadIndexEntry(head: CasRef): ThreadIndexEntry { + return { + head, + suspendedRole: null, + suspendMessage: null, + }; +} + +export function updateThreadHead(_entry: ThreadIndexEntry, head: CasRef): ThreadIndexEntry { + return { + head, + suspendedRole: null, + suspendMessage: null, + }; +} + +export function markThreadSuspended( + entry: ThreadIndexEntry, + suspendedRole: string, + suspendMessage: string, +): ThreadIndexEntry { + return { + head: entry.head, + suspendedRole, + suspendMessage, + }; +} + +/** Serialize for threads.yaml — compact string when not suspended. */ +export function serializeThreadIndexEntry( + entry: ThreadIndexEntry, +): string | Record { + if (entry.suspendedRole === null || entry.suspendMessage === null) { + return entry.head; + } + return { + head: entry.head, + suspendedRole: entry.suspendedRole, + suspendMessage: entry.suspendMessage, + }; +} + +export function parseThreadsIndex(raw: unknown): ThreadsIndex { + if (raw === null || typeof raw !== "object" || Array.isArray(raw)) { + return {}; + } + const index: ThreadsIndex = {}; + for (const [threadId, value] of Object.entries(raw as Record)) { + const entry = normalizeThreadIndexEntry(value); + if (entry !== null) { + index[threadId as ThreadId] = entry; + } + } + return index; +} + +export function serializeThreadsIndex( + index: ThreadsIndex, +): Record> { + const out: Record> = {}; + for (const [threadId, entry] of Object.entries(index)) { + out[threadId] = serializeThreadIndexEntry(entry); + } + return out; +} diff --git a/packages/workflow-protocol/src/types.ts b/packages/workflow-protocol/src/types.ts index 87259e1..a5e8d95 100644 --- a/packages/workflow-protocol/src/types.ts +++ b/packages/workflow-protocol/src/types.ts @@ -105,10 +105,21 @@ export type StepOutput = { status: ThreadStatus; /** The current or next role. Null when completed, cancelled, suspended, or next is $END. */ currentRole: string | null; + /** Role whose output triggered suspension. Null when thread is not suspended. */ + suspendedRole: string | null; + /** Rendered suspend prompt for the user. Null when thread is not suspended. */ + suspendMessage: string | null; done: boolean; background: boolean | null; }; +/** Active thread entry in ~/.uncaged/workflow/threads.yaml */ +export type ThreadIndexEntry = { + head: CasRef; + suspendedRole: string | null; + suspendMessage: string | null; +}; + /** uwf thread steps — single step entry */ export type StepEntry = { hash: CasRef; @@ -200,4 +211,4 @@ export type WorkflowConfig = { }; /** ~/.uncaged/workflow/threads.yaml */ -export type ThreadsIndex = Record; +export type ThreadsIndex = Record; diff --git a/packages/workflow-util-agent/src/context.ts b/packages/workflow-util-agent/src/context.ts index 462665a..a2e0a51 100644 --- a/packages/workflow-util-agent/src/context.ts +++ b/packages/workflow-util-agent/src/context.ts @@ -163,7 +163,7 @@ export async function buildContext( const { store, schemas } = agentStore; const index = await loadThreadsIndex(storageRoot); - const headHash = index[threadId]; + const headHash = index[threadId]?.head; if (headHash === undefined) { fail(`thread not found in threads.yaml: ${threadId}`); } @@ -212,7 +212,7 @@ export async function buildContextWithMeta( const { store, schemas } = agentStore; const index = await loadThreadsIndex(storageRoot); - const headHash = index[threadId]; + const headHash = index[threadId]?.head; if (headHash === undefined) { fail(`thread not found in threads.yaml: ${threadId}`); } diff --git a/packages/workflow-util-agent/src/storage.ts b/packages/workflow-util-agent/src/storage.ts index 573d632..9a407dd 100644 --- a/packages/workflow-util-agent/src/storage.ts +++ b/packages/workflow-util-agent/src/storage.ts @@ -12,11 +12,11 @@ import type { ProviderAlias, ProviderConfig, Scenario, - ThreadId, ThreadsIndex, WorkflowConfig, WorkflowName, } from "@uncaged/workflow-protocol"; +import { parseThreadsIndex } from "@uncaged/workflow-protocol"; import { parse } from "yaml"; import { registerAgentSchemas } from "./schemas.js"; @@ -207,16 +207,7 @@ export async function loadThreadsIndex(storageRoot: string): Promise