Merge pull request 'feat: uwf thread resume command' (#595) from feat/590-thread-resume into main
CI / check (push) Failing after 1m37s

This commit was merged in pull request #595.
This commit is contained in:
2026-06-02 05:12:10 +00:00
19 changed files with 1202 additions and 121 deletions
@@ -720,7 +720,10 @@ defaultModel: default
describe("no legacy apiKeyEnv references", () => { describe("no legacy apiKeyEnv references", () => {
test("config.ts has no references to apiKeyEnv", () => { test("config.ts has no references to apiKeyEnv", () => {
const configSource = readFileSync(join(__dirname, "..", "..", "src", "commands", "config.ts"), "utf8"); const configSource = readFileSync(
join(__dirname, "..", "..", "src", "commands", "config.ts"),
"utf8",
);
expect(configSource).not.toContain("apiKeyEnv"); expect(configSource).not.toContain("apiKeyEnv");
}); });
@@ -175,8 +175,9 @@ async function insertStepNode(
): Promise<void> { ): Promise<void> {
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const head = index[threadId]; const headEntry = index[threadId];
if (head === undefined) throw new Error(`thread ${threadId} not in index`); if (headEntry === undefined) throw new Error(`thread ${threadId} not in index`);
const head = headEntry.head;
const outputSchemaHash = await putSchema(uwf.store, OUTPUT_SCHEMA); const outputSchemaHash = await putSchema(uwf.store, OUTPUT_SCHEMA);
const outputHash = await uwf.store.put(outputSchemaHash, outputPayload); const outputHash = await uwf.store.put(outputSchemaHash, outputPayload);
@@ -199,7 +200,7 @@ async function insertStepNode(
detail: detailHash, detail: detailHash,
})) as CasRef; })) as CasRef;
index[threadId] = stepHash; index[threadId] = { head: stepHash, suspendedRole: null, suspendMessage: null };
await saveThreadsIndex(storageRoot, index); await saveThreadsIndex(storageRoot, index);
} }
@@ -280,7 +281,7 @@ describe("currentRole field", () => {
const tid = thread as ThreadId; const tid = thread as ThreadId;
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const head = index[tid]!; const head = index[tid]!.head;
delete index[tid]; delete index[tid];
await saveThreadsIndex(storageRoot, index); await saveThreadsIndex(storageRoot, index);
await appendThreadHistory(storageRoot, { await appendThreadHistory(storageRoot, {
@@ -309,7 +310,7 @@ describe("currentRole field", () => {
const tid = thread as ThreadId; const tid = thread as ThreadId;
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const head = index[tid]!; const head = index[tid]!.head;
delete index[tid]; delete index[tid];
await saveThreadsIndex(storageRoot, index); await saveThreadsIndex(storageRoot, index);
await appendThreadHistory(storageRoot, { await appendThreadHistory(storageRoot, {
@@ -371,7 +372,7 @@ describe("currentRole field", () => {
const comp = await cmdThreadStart(storageRoot, wf, "completed", tmpDir); const comp = await cmdThreadStart(storageRoot, wf, "completed", tmpDir);
const compId = comp.thread as ThreadId; const compId = comp.thread as ThreadId;
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const compHead = index[compId]!; const compHead = index[compId]!.head;
delete index[compId]; delete index[compId];
await saveThreadsIndex(storageRoot, index); await saveThreadsIndex(storageRoot, index);
await appendThreadHistory(storageRoot, { await appendThreadHistory(storageRoot, {
@@ -47,9 +47,7 @@ async function createTestThread(
prompt: "test prompt", prompt: "test prompt",
}; };
const headHash = await uwf.store.put(uwf.schemas.startNode, startPayload); const headHash = await uwf.store.put(uwf.schemas.startNode, startPayload);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot)); await saveThreadsIndex(storageRoot, { [threadId]: headHash });
index[threadId] = headHash;
await saveThreadsIndex(storageRoot, index);
return threadId; return threadId;
} }
@@ -106,7 +104,7 @@ describe("cmdThreadList status filter", () => {
await markThreadRunning(tmpDir, thread2, workflowHash); await markThreadRunning(tmpDir, thread2, workflowHash);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir));
const thread3Head = index[thread3]; const thread3Head = index[thread3]!.head;
if (thread3Head === undefined) throw new Error("thread3 head not found"); if (thread3Head === undefined) throw new Error("thread3 head not found");
await completeThread(tmpDir, thread3, workflowHash, thread3Head); await completeThread(tmpDir, thread3, workflowHash, thread3Head);
@@ -130,7 +128,7 @@ describe("cmdThreadList status filter", () => {
await markThreadRunning(tmpDir, thread2, workflowHash); await markThreadRunning(tmpDir, thread2, workflowHash);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir));
const thread3Head = index[thread3]; const thread3Head = index[thread3]!.head;
if (thread3Head === undefined) throw new Error("thread3 head not found"); if (thread3Head === undefined) throw new Error("thread3 head not found");
await completeThread(tmpDir, thread3, workflowHash, thread3Head); await completeThread(tmpDir, thread3, workflowHash, thread3Head);
@@ -154,7 +152,7 @@ describe("cmdThreadList status filter", () => {
const thread3 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000); const thread3 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir));
const thread3Head = index[thread3]; const thread3Head = index[thread3]!.head;
if (thread3Head === undefined) throw new Error("thread3 head not found"); if (thread3Head === undefined) throw new Error("thread3 head not found");
await completeThread(tmpDir, thread3, workflowHash, thread3Head); await completeThread(tmpDir, thread3, workflowHash, thread3Head);
@@ -176,7 +174,7 @@ describe("cmdThreadList status filter", () => {
await markThreadRunning(tmpDir, thread2, workflowHash); await markThreadRunning(tmpDir, thread2, workflowHash);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir));
const thread3Head = index[thread3]; const thread3Head = index[thread3]!.head;
if (thread3Head === undefined) throw new Error("thread3 head not found"); if (thread3Head === undefined) throw new Error("thread3 head not found");
await completeThread(tmpDir, thread3, workflowHash, thread3Head); await completeThread(tmpDir, thread3, workflowHash, thread3Head);
@@ -348,7 +346,7 @@ describe("combined filters", () => {
await markThreadRunning(tmpDir, thread2, workflowHash); await markThreadRunning(tmpDir, thread2, workflowHash);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir));
const thread3Head = index[thread3]; const thread3Head = index[thread3]!.head;
if (thread3Head === undefined) throw new Error("thread3 head not found"); if (thread3Head === undefined) throw new Error("thread3 head not found");
await completeThread(tmpDir, thread3, workflowHash, thread3Head); await completeThread(tmpDir, thread3, workflowHash, thread3Head);
@@ -372,7 +370,7 @@ describe("combined filters", () => {
const thread = await createTestThread(uwf, tmpDir, workflowHash, Date.now() + i * 1000); const thread = await createTestThread(uwf, tmpDir, workflowHash, Date.now() + i * 1000);
threads.push(thread); threads.push(thread);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir));
const headHash = index[thread]; const headHash = index[thread]!.head;
if (headHash === undefined) throw new Error("head not found"); if (headHash === undefined) throw new Error("head not found");
await completeThread(tmpDir, thread, workflowHash, headHash); await completeThread(tmpDir, thread, workflowHash, headHash);
} }
@@ -421,7 +419,7 @@ describe("combined filters", () => {
if (i % 2 === 0) { if (i % 2 === 0) {
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir));
const headHash = index[thread]; const headHash = index[thread]!.head;
if (headHash === undefined) throw new Error("head not found"); if (headHash === undefined) throw new Error("head not found");
await completeThread(tmpDir, thread, workflowHash, headHash); await completeThread(tmpDir, thread, workflowHash, headHash);
} else { } else {
@@ -479,7 +477,11 @@ describe("edge cases", () => {
const thread2 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000); const thread2 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir));
index["INVALID_ULID_FORMAT_HERE" as ThreadId] = "01J6HMVRNQKJV2"; index["INVALID_ULID_FORMAT_HERE" as ThreadId] = {
head: "01J6HMVRNQKJV2",
suspendedRole: null,
suspendMessage: null,
};
await saveThreadsIndex(tmpDir, index); await saveThreadsIndex(tmpDir, index);
const afterMs = Date.now() - 3000; const afterMs = Date.now() - 3000;
@@ -80,7 +80,7 @@ graph:
// Verify StartNode has the cwd field // Verify StartNode has the cwd field
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot)); const index = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot));
const headHash = index[result.thread as ThreadId]; const headHash = index[result.thread as ThreadId]!.head;
expect(headHash).toBeDefined(); expect(headHash).toBeDefined();
const startNode = uwf.store.get(headHash as CasRef); const startNode = uwf.store.get(headHash as CasRef);
@@ -175,7 +175,7 @@ graph:
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot)); const index = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot));
const headHash = index[result.thread as ThreadId]; const headHash = index[result.thread as ThreadId]!.head;
const startNode = uwf.store.get(headHash as CasRef); const startNode = uwf.store.get(headHash as CasRef);
const startPayload = startNode?.payload as StartNodePayload; const startPayload = startNode?.payload as StartNodePayload;
@@ -0,0 +1,442 @@
import { execFileSync } from "node:child_process";
import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { putSchema } from "@ocas/core";
import { createFsStore } from "@ocas/fs";
import type { CasRef, StepNodePayload, ThreadId } from "@uncaged/workflow-protocol";
import { afterEach, beforeEach, describe, expect, test } from "vitest";
import { parse } from "yaml";
import { cmdThreadShow } from "../commands/thread.js";
import { registerUwfSchemas } from "../schemas.js";
import { saveThreadsIndex } from "../store.js";
const OUTPUT_SCHEMA = {
type: "object" as const,
properties: {
$status: { type: "string" as const },
question: { type: "string" as const },
},
required: ["$status"],
additionalProperties: false,
};
const THREAD_ID = "01RESUMESTEPTEST0000000" as ThreadId;
const SUSPEND_MESSAGE = "Please clarify: Which API?";
type MockAgentMode = "suspend" | "ok";
let tmpDir: string;
beforeEach(async () => {
tmpDir = await mkdtemp(join(tmpdir(), "cli-uwf-resume-test-"));
});
afterEach(async () => {
await rm(tmpDir, { recursive: true, force: true });
});
async function setupSuspendedThread(mode: MockAgentMode): Promise<{
casDir: string;
mockAgentPath: string;
promptCapturePath: string;
}> {
const casDir = join(tmpDir, "cas");
await mkdir(casDir, { recursive: true });
const store = createFsStore(casDir);
const schemas = await registerUwfSchemas(store);
const outputSchemaHash = await putSchema(store, OUTPUT_SCHEMA);
const workflowHash = await store.put(schemas.workflow, {
name: "test-resume",
description: "resume command integration test",
roles: {
worker: {
description: "Worker role",
goal: "Work",
capabilities: [],
procedure: "work",
output: "result",
frontmatter: outputSchemaHash,
},
reviewer: {
description: "Reviewer role",
goal: "Review",
capabilities: [],
procedure: "review",
output: "result",
frontmatter: outputSchemaHash,
},
},
graph: {
$START: { _: { role: "worker", prompt: "Start work", location: null } },
worker: {
needs_input: {
role: "$SUSPEND",
prompt: "Please clarify: {{{question}}}",
location: null,
},
ok: { role: "reviewer", prompt: "Review the work", location: null },
},
reviewer: { _: { role: "$END", prompt: "Done", location: null } },
},
});
const startHash = await store.put(schemas.startNode, {
workflow: workflowHash,
prompt: "Test resume task",
cwd: tmpDir,
});
await saveThreadsIndex(tmpDir, { [THREAD_ID]: startHash });
const outputHash = await store.put(outputSchemaHash, {
$status: "needs_input",
question: "Which API?",
});
const detailHash = await store.put(schemas.text, "mock detail");
const startedAtMs = 1716600000000;
const completedAtMs = 1716600001500;
const stepHash = await store.put(schemas.stepNode, {
start: startHash,
prev: null,
role: "worker",
output: outputHash,
detail: detailHash,
agent: "uwf-mock",
edgePrompt: "Start work",
startedAtMs,
completedAtMs,
cwd: tmpDir,
assembledPrompt: null,
});
await saveThreadsIndex(tmpDir, {
[THREAD_ID]: {
head: stepHash,
suspendedRole: "worker",
suspendMessage: SUSPEND_MESSAGE,
},
});
const promptCapturePath = join(tmpDir, "captured-prompt.txt");
const mockAgentPath = join(tmpDir, "mock-agent.sh");
const frontmatter =
mode === "suspend" ? { $status: "needs_input", question: "Which API?" } : { $status: "ok" };
const adapterJson = JSON.stringify({
stepHash: await store.put(schemas.stepNode, {
start: startHash,
prev: stepHash,
role: "worker",
output: await store.put(outputSchemaHash, frontmatter),
detail: detailHash,
agent: "uwf-mock",
edgePrompt: "resume prompt placeholder",
startedAtMs: completedAtMs + 1,
completedAtMs: completedAtMs + 2,
cwd: tmpDir,
assembledPrompt: null,
}),
detailHash,
role: "worker",
frontmatter,
body: "",
startedAtMs: completedAtMs + 1,
completedAtMs: completedAtMs + 2,
});
await writeFile(
mockAgentPath,
`#!/bin/sh
prompt=""
while [ $# -gt 0 ]; do
if [ "$1" = "--prompt" ]; then
prompt="$2"
shift 2
else
shift
fi
done
printf '%s' "$prompt" > '${promptCapturePath}'
echo '${adapterJson}'
`,
{ mode: 0o755 },
);
const configPath = join(tmpDir, "config.yaml");
await writeFile(
configPath,
`defaultAgent: uwf-hermes\ndefaultModel: test-model\nagentOverrides: null\nagents: {}\nproviders: {}\nmodels: {}\n`,
);
return { casDir, mockAgentPath, promptCapturePath };
}
function runUwf(
args: string[],
casDir: string,
): { stdout: string; stderr: string; status: number } {
const cliPath = join(import.meta.dirname, "..", "cli.js");
try {
const stdout = execFileSync("bun", ["run", cliPath, ...args], {
encoding: "utf8",
stdio: ["ignore", "pipe", "pipe"],
env: {
...process.env,
WORKFLOW_STORAGE_ROOT: tmpDir,
UNCAGED_CAS_DIR: casDir,
},
cwd: tmpDir,
timeout: 30000,
});
return { stdout, stderr: "", status: 0 };
} catch (error) {
const err = error as NodeJS.ErrnoException & {
stdout?: string | Buffer;
stderr?: string | Buffer;
status?: number;
};
return {
stdout: typeof err.stdout === "string" ? err.stdout : (err.stdout?.toString("utf8") ?? ""),
stderr: typeof err.stderr === "string" ? err.stderr : (err.stderr?.toString("utf8") ?? ""),
status: err.status ?? 1,
};
}
}
describe("uwf thread resume", () => {
test("resume non-suspended thread returns error", async () => {
const casDir = join(tmpDir, "cas");
await mkdir(casDir, { recursive: true });
const store = createFsStore(casDir);
const schemas = await registerUwfSchemas(store);
const workflowHash = await store.put(schemas.workflow, {
name: "idle-workflow",
description: "idle thread",
roles: {
worker: {
description: "Worker",
goal: "Work",
capabilities: [],
procedure: "work",
output: "result",
frontmatter: await putSchema(store, OUTPUT_SCHEMA),
},
},
graph: {
$START: { _: { role: "worker", prompt: "Start", location: null } },
worker: { _: { role: "$END", prompt: "Done", location: null } },
},
});
const startHash = await store.put(schemas.startNode, {
workflow: workflowHash,
prompt: "task",
cwd: tmpDir,
});
await saveThreadsIndex(tmpDir, { [THREAD_ID]: startHash });
const result = runUwf(["thread", "resume", THREAD_ID], casDir);
expect(result.status).not.toBe(0);
expect(result.stderr).toContain("thread is not suspended");
});
test("resume suspended thread executes step and becomes idle", async () => {
const originalCasDir = process.env.UNCAGED_CAS_DIR;
const { casDir, mockAgentPath } = await setupSuspendedThread("ok");
process.env.UNCAGED_CAS_DIR = casDir;
try {
const result = runUwf(["thread", "resume", THREAD_ID, "--agent", mockAgentPath], casDir);
expect(result.status).toBe(0);
const cliOutput = JSON.parse(result.stdout.trim());
expect(cliOutput.status).toBe("idle");
expect(cliOutput.currentRole).toBe("reviewer");
expect(cliOutput.suspendedRole).toBeNull();
expect(cliOutput.suspendMessage).toBeNull();
expect(cliOutput.done).toBe(false);
const threadsYaml = await readFile(join(tmpDir, "threads.yaml"), "utf8");
const threadsIndex = parse(threadsYaml) as Record<string, unknown>;
expect(threadsIndex[THREAD_ID]).toBe(cliOutput.head);
const showResult = await cmdThreadShow(tmpDir, THREAD_ID);
expect(showResult.status).toBe("idle");
expect(showResult.suspendedRole).toBeNull();
expect(showResult.suspendMessage).toBeNull();
} finally {
if (originalCasDir === undefined) {
delete process.env.UNCAGED_CAS_DIR;
} else {
process.env.UNCAGED_CAS_DIR = originalCasDir;
}
}
});
test("resume without -p uses suspend message as agent prompt", async () => {
const originalCasDir = process.env.UNCAGED_CAS_DIR;
const { casDir, mockAgentPath, promptCapturePath } = await setupSuspendedThread("ok");
process.env.UNCAGED_CAS_DIR = casDir;
try {
const result = runUwf(["thread", "resume", THREAD_ID, "--agent", mockAgentPath], casDir);
expect(result.status).toBe(0);
const capturedPrompt = await readFile(promptCapturePath, "utf8");
expect(capturedPrompt).toBe(SUSPEND_MESSAGE);
} finally {
if (originalCasDir === undefined) {
delete process.env.UNCAGED_CAS_DIR;
} else {
process.env.UNCAGED_CAS_DIR = originalCasDir;
}
}
});
test("resume with -p appends supplementary info to agent prompt", async () => {
const originalCasDir = process.env.UNCAGED_CAS_DIR;
const { casDir, mockAgentPath, promptCapturePath } = await setupSuspendedThread("ok");
process.env.UNCAGED_CAS_DIR = casDir;
try {
const supplement = "Use the REST API.";
const result = runUwf(
["thread", "resume", THREAD_ID, "-p", supplement, "--agent", mockAgentPath],
casDir,
);
expect(result.status).toBe(0);
const capturedPrompt = await readFile(promptCapturePath, "utf8");
expect(capturedPrompt).toBe(`${SUSPEND_MESSAGE}\n\n${supplement}`);
} finally {
if (originalCasDir === undefined) {
delete process.env.UNCAGED_CAS_DIR;
} else {
process.env.UNCAGED_CAS_DIR = originalCasDir;
}
}
});
test("multiple suspend/resume cycles", async () => {
const originalCasDir = process.env.UNCAGED_CAS_DIR;
const { casDir, mockAgentPath, promptCapturePath } = await setupSuspendedThread("suspend");
process.env.UNCAGED_CAS_DIR = casDir;
try {
const firstResult = runUwf(["thread", "resume", THREAD_ID, "--agent", mockAgentPath], casDir);
expect(firstResult.status).toBe(0);
const firstResume = JSON.parse(firstResult.stdout.trim());
expect(firstResume.status).toBe("suspended");
expect(firstResume.suspendedRole).toBe("worker");
expect(firstResume.suspendMessage).toBe(SUSPEND_MESSAGE);
const threadsAfterFirst = parse(
await readFile(join(tmpDir, "threads.yaml"), "utf8"),
) as Record<string, unknown>;
expect(threadsAfterFirst[THREAD_ID]).toEqual({
head: firstResume.head,
suspendedRole: "worker",
suspendMessage: SUSPEND_MESSAGE,
});
const { mockAgentPath: okMockAgentPath } = await setupOkMockAgent(
casDir,
firstResume.head as CasRef,
);
const secondResult = runUwf(
["thread", "resume", THREAD_ID, "--agent", okMockAgentPath],
casDir,
);
expect(secondResult.status).toBe(0);
const secondResume = JSON.parse(secondResult.stdout.trim());
expect(secondResume.status).toBe("idle");
expect(secondResume.currentRole).toBe("reviewer");
expect(secondResume.suspendedRole).toBeNull();
expect(secondResume.suspendMessage).toBeNull();
const capturedPrompt = await readFile(promptCapturePath, "utf8");
expect(capturedPrompt).toBe(SUSPEND_MESSAGE);
} finally {
if (originalCasDir === undefined) {
delete process.env.UNCAGED_CAS_DIR;
} else {
process.env.UNCAGED_CAS_DIR = originalCasDir;
}
}
});
});
async function setupOkMockAgent(
casDir: string,
prevHead: CasRef,
): Promise<{ mockAgentPath: string }> {
const store = createFsStore(casDir);
const schemas = await registerUwfSchemas(store);
const outputSchemaHash = await putSchema(store, OUTPUT_SCHEMA);
const prevNode = store.get(prevHead);
if (prevNode === null || prevNode.type !== schemas.stepNode) {
throw new Error(`expected StepNode at ${prevHead}`);
}
const prevPayload = prevNode.payload as StepNodePayload;
const outputHash = await store.put(outputSchemaHash, { $status: "ok" });
const detailHash = await store.put(schemas.text, "ok detail");
const startedAtMs = Date.now();
const completedAtMs = startedAtMs + 1;
const stepHash = await store.put(schemas.stepNode, {
start: prevPayload.start,
prev: prevHead,
role: "worker",
output: outputHash,
detail: detailHash,
agent: "uwf-mock",
edgePrompt: "resume",
startedAtMs,
completedAtMs,
cwd: tmpDir,
assembledPrompt: null,
});
const promptCapturePath = join(tmpDir, "captured-prompt.txt");
const mockAgentPath = join(tmpDir, "mock-agent-ok.sh");
const adapterJson = JSON.stringify({
stepHash,
detailHash,
role: "worker",
frontmatter: { $status: "ok" },
body: "",
startedAtMs,
completedAtMs,
});
await writeFile(
mockAgentPath,
`#!/bin/sh
prompt=""
while [ $# -gt 0 ]; do
if [ "$1" = "--prompt" ]; then
prompt="$2"
shift 2
else
shift
fi
done
printf '%s' "$prompt" > '${promptCapturePath}'
echo '${adapterJson}'
`,
{ mode: 0o755 },
);
return { mockAgentPath };
}
@@ -90,8 +90,9 @@ async function insertStepNode(
): Promise<void> { ): Promise<void> {
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const head = index[threadId]; const headEntry = index[threadId];
if (head === undefined) throw new Error(`thread ${threadId} not in index`); if (headEntry === undefined) throw new Error(`thread ${threadId} not in index`);
const head = headEntry.head;
const outputSchemaHash = await putSchema(uwf.store, OUTPUT_SCHEMA); const outputSchemaHash = await putSchema(uwf.store, OUTPUT_SCHEMA);
const outputHash = await uwf.store.put(outputSchemaHash, outputPayload); const outputHash = await uwf.store.put(outputSchemaHash, outputPayload);
@@ -116,7 +117,7 @@ async function insertStepNode(
assembledPrompt: null, assembledPrompt: null,
})) as CasRef; })) as CasRef;
index[threadId] = stepHash; index[threadId] = { head: stepHash, suspendedRole: null, suspendMessage: null };
await saveThreadsIndex(storageRoot, index); await saveThreadsIndex(storageRoot, index);
} }
@@ -203,7 +204,7 @@ describe("thread show status field", () => {
// Get the head hash before moving to history // Get the head hash before moving to history
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const head = index[threadId]; const head = index[threadId]!.head;
if (!head) throw new Error("Thread not found in index"); if (!head) throw new Error("Thread not found in index");
// Move thread to history with reason 'completed' // Move thread to history with reason 'completed'
@@ -243,7 +244,7 @@ describe("thread show status field", () => {
// Get the head hash before moving to history // Get the head hash before moving to history
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const head = index[threadId]; const head = index[threadId]!.head;
if (!head) throw new Error("Thread not found in index"); if (!head) throw new Error("Thread not found in index");
// Move thread to history with reason 'cancelled' // Move thread to history with reason 'cancelled'
@@ -283,7 +284,7 @@ describe("thread show status field", () => {
// Get the head hash before moving to history // Get the head hash before moving to history
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const head = index[threadId]; const head = index[threadId]!.head;
if (!head) throw new Error("Thread not found in index"); if (!head) throw new Error("Thread not found in index");
// Move thread to history with reason null (legacy format) // Move thread to history with reason null (legacy format)
@@ -333,6 +334,8 @@ describe("thread show status field", () => {
expect(result.status).toBe("suspended"); expect(result.status).toBe("suspended");
expect(result.done).toBe(false); expect(result.done).toBe(false);
expect(result.currentRole).toBe(null); expect(result.currentRole).toBe(null);
expect(result.suspendedRole).toBe("worker");
expect(result.suspendMessage).toBe("Please clarify: Which API?");
expect(result.background).toBe(null); expect(result.background).toBe(null);
expect(result.thread).toBe(threadId); expect(result.thread).toBe(threadId);
} finally { } finally {
@@ -75,7 +75,7 @@ graph:
async function getStartNodeCwd(threadId: string): Promise<string> { async function getStartNodeCwd(threadId: string): Promise<string> {
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const headHash = index[threadId as ThreadId]; const headHash = index[threadId as ThreadId]!.head;
expect(headHash).toBeDefined(); expect(headHash).toBeDefined();
const startNode = uwf.store.get(headHash as CasRef); const startNode = uwf.store.get(headHash as CasRef);
@@ -0,0 +1,179 @@
import { execFileSync } from "node:child_process";
import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { putSchema } from "@ocas/core";
import { createFsStore } from "@ocas/fs";
import type { CasRef, StepNodePayload, ThreadId } from "@uncaged/workflow-protocol";
import { afterEach, beforeEach, describe, expect, test } from "vitest";
import { parse } from "yaml";
import { cmdThreadShow } from "../commands/thread.js";
import { registerUwfSchemas } from "../schemas.js";
import { saveThreadsIndex } from "../store.js";
const OUTPUT_SCHEMA = {
type: "object" as const,
properties: {
$status: { type: "string" as const },
question: { type: "string" as const },
},
required: ["$status"],
additionalProperties: false,
};
let tmpDir: string;
beforeEach(async () => {
tmpDir = await mkdtemp(join(tmpdir(), "cli-uwf-suspend-step-test-"));
});
afterEach(async () => {
await rm(tmpDir, { recursive: true, force: true });
});
describe("suspend step CAS chain and threads.yaml metadata", () => {
test("thread exec records suspend step in CAS and suspend metadata in threads.yaml", async () => {
const casDir = join(tmpDir, "cas");
await mkdir(casDir, { recursive: true });
const originalCasDir = process.env.UNCAGED_CAS_DIR;
process.env.UNCAGED_CAS_DIR = casDir;
try {
const store = createFsStore(casDir);
const schemas = await registerUwfSchemas(store);
const outputSchemaHash = await putSchema(store, OUTPUT_SCHEMA);
const workflowHash = await store.put(schemas.workflow, {
name: "test-suspend-step",
description: "suspend step integration test",
roles: {
worker: {
description: "Worker role",
goal: "Work",
capabilities: [],
procedure: "work",
output: "result",
frontmatter: outputSchemaHash,
},
},
graph: {
$START: { _: { role: "worker", prompt: "Start work", location: null } },
worker: {
needs_input: {
role: "$SUSPEND",
prompt: "Please clarify: {{{question}}}",
location: null,
},
},
},
});
const startHash = await store.put(schemas.startNode, {
workflow: workflowHash,
prompt: "Test suspend task",
cwd: tmpDir,
});
const threadId = "01SUSPENDSTEPTEST0000000" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: startHash });
const outputHash = await store.put(outputSchemaHash, {
$status: "needs_input",
question: "Which API?",
});
const detailHash = await store.put(schemas.text, "mock detail");
const startedAtMs = 1716600000000;
const completedAtMs = 1716600001500;
const stepHash = await store.put(schemas.stepNode, {
start: startHash,
prev: null,
role: "worker",
output: outputHash,
detail: detailHash,
agent: "uwf-mock",
edgePrompt: "Start work",
startedAtMs,
completedAtMs,
cwd: tmpDir,
assembledPrompt: null,
});
const mockAgentPath = join(tmpDir, "mock-agent.sh");
const adapterJson = JSON.stringify({
stepHash,
detailHash,
role: "worker",
frontmatter: { $status: "needs_input", question: "Which API?" },
body: "",
startedAtMs,
completedAtMs,
});
await writeFile(mockAgentPath, `#!/bin/sh\necho '${adapterJson}'\n`, { mode: 0o755 });
const configPath = join(tmpDir, "config.yaml");
await writeFile(
configPath,
`defaultAgent: uwf-hermes\ndefaultModel: test-model\nagentOverrides: null\nagents: {}\nproviders: {}\nmodels: {}\n`,
);
const cliPath = join(import.meta.dirname, "..", "cli.js");
const stdout = execFileSync(
"bun",
["run", cliPath, "thread", "exec", threadId, "--agent", mockAgentPath],
{
encoding: "utf8",
stdio: ["ignore", "pipe", "pipe"],
env: {
...process.env,
WORKFLOW_STORAGE_ROOT: tmpDir,
UNCAGED_CAS_DIR: casDir,
},
cwd: tmpDir,
timeout: 30000,
},
);
const cliOutput = JSON.parse(stdout.trim());
expect(cliOutput.status).toBe("suspended");
expect(cliOutput.head).toBe(stepHash);
expect(cliOutput.suspendedRole).toBe("worker");
expect(cliOutput.suspendMessage).toBe("Please clarify: Which API?");
const storeAfter = createFsStore(casDir);
const stepNode = storeAfter.get(cliOutput.head as CasRef);
expect(stepNode).not.toBeNull();
const payload = stepNode!.payload as StepNodePayload;
expect(payload.role).toBe("worker");
expect(payload.output).toBe(outputHash);
const outputNode = storeAfter.get(outputHash);
expect(outputNode?.payload).toEqual({
$status: "needs_input",
question: "Which API?",
});
const threadsYaml = await readFile(join(tmpDir, "threads.yaml"), "utf8");
const threadsIndex = parse(threadsYaml) as Record<string, unknown>;
const threadEntry = threadsIndex[threadId];
expect(threadEntry).toEqual({
head: stepHash,
suspendedRole: "worker",
suspendMessage: "Please clarify: Which API?",
});
const showResult = await cmdThreadShow(tmpDir, threadId);
expect(showResult.status).toBe("suspended");
expect(showResult.suspendMessage).toBe("Please clarify: Which API?");
expect(showResult.suspendedRole).toBe("worker");
} finally {
if (originalCasDir === undefined) {
delete process.env.UNCAGED_CAS_DIR;
} else {
process.env.UNCAGED_CAS_DIR = originalCasDir;
}
}
});
});
+22
View File
@@ -30,6 +30,7 @@ import {
cmdThreadExec, cmdThreadExec,
cmdThreadList, cmdThreadList,
cmdThreadRead, cmdThreadRead,
cmdThreadResume,
cmdThreadShow, cmdThreadShow,
cmdThreadStart, cmdThreadStart,
cmdThreadStop, cmdThreadStop,
@@ -280,6 +281,27 @@ thread
}, },
); );
thread
.command("resume")
.description("Resume a suspended thread and re-run the suspended role")
.argument("<thread-id>", "Thread ULID")
.option("-p, --prompt <text>", "Supplementary info to append to the resume prompt")
.option("--agent <cmd>", "Override agent command")
.action((threadId: string, opts: { prompt: string | undefined; agent: string | undefined }) => {
const storageRoot = resolveStorageRoot();
runAction(async () => {
const supplement = opts.prompt ?? null;
const agentOverride = opts.agent ?? null;
const result = await cmdThreadResume(
storageRoot,
threadId as ThreadId,
supplement,
agentOverride,
);
writeOutput(result);
});
});
thread thread
.command("stop") .command("stop")
.description("Stop background execution of a thread (keep thread active)") .description("Stop background execution of a thread (keep thread active)")
+1 -1
View File
@@ -203,7 +203,7 @@ function collectOrderedSteps(
async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise<CasRef> { async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise<CasRef> {
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const activeHead = index[threadId]; const activeHead = index[threadId]?.head;
if (activeHead !== undefined) { if (activeHead !== undefined) {
return activeHead; return activeHead;
} }
+1 -1
View File
@@ -113,7 +113,7 @@ export async function cmdStepFork(
const newThreadId = generateUlid(Date.now()) as ThreadId; const newThreadId = generateUlid(Date.now()) as ThreadId;
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
index[newThreadId] = stepHash; index[newThreadId] = { head: stepHash, suspendedRole: null, suspendMessage: null };
await saveThreadsIndex(storageRoot, index); await saveThreadsIndex(storageRoot, index);
return { return {
+297 -65
View File
@@ -11,12 +11,18 @@ import type {
StepNodePayload, StepNodePayload,
StepOutput, StepOutput,
ThreadId, ThreadId,
ThreadIndexEntry,
ThreadListItem, ThreadListItem,
ThreadStatus, ThreadStatus,
ThreadsIndex, ThreadsIndex,
WorkflowConfig, WorkflowConfig,
WorkflowPayload, WorkflowPayload,
} from "@uncaged/workflow-protocol"; } from "@uncaged/workflow-protocol";
import {
createThreadIndexEntry,
markThreadSuspended,
updateThreadHead,
} from "@uncaged/workflow-protocol";
import { import {
createProcessLogger, createProcessLogger,
extractUlidTimestamp, extractUlidTimestamp,
@@ -68,8 +74,15 @@ function buildStepOutputFromEvaluation(
): StepOutput { ): StepOutput {
const done = status === "completed"; const done = status === "completed";
let currentRole: string | null = null; let currentRole: string | null = null;
if (evaluation.ok && !isSuspendResult(evaluation.value) && evaluation.value.role !== END_ROLE) { let suspendedRole: string | null = null;
currentRole = evaluation.value.role; let suspendMessage: string | null = null;
if (evaluation.ok) {
if (isSuspendResult(evaluation.value)) {
suspendedRole = evaluation.value.suspendedRole;
suspendMessage = evaluation.value.prompt;
} else if (evaluation.value.role !== END_ROLE) {
currentRole = evaluation.value.role;
}
} }
return { return {
workflow: workflowHash, workflow: workflowHash,
@@ -77,11 +90,68 @@ function buildStepOutputFromEvaluation(
head, head,
status, status,
currentRole, currentRole,
suspendedRole,
suspendMessage,
done, done,
background, background,
}; };
} }
function resolveSuspendFieldsFromGraph(
uwf: UwfStore,
head: CasRef,
workflowRef: CasRef,
): { suspendedRole: string | null; suspendMessage: string | null } {
const chain = walkChain(uwf, head);
const { lastRole, lastOutput } = resolveEvaluateArgs(uwf, chain);
const workflow = loadWorkflowPayload(uwf, workflowRef);
const result = evaluate(workflow.graph, lastRole, lastOutput);
if (result.ok && isSuspendResult(result.value)) {
return {
suspendedRole: result.value.suspendedRole,
suspendMessage: result.value.prompt,
};
}
return { suspendedRole: null, suspendMessage: null };
}
function resolveSuspendFieldsForShow(
entry: ThreadIndexEntry,
status: ThreadStatus,
uwf: UwfStore,
head: CasRef,
workflowRef: CasRef,
): { suspendedRole: string | null; suspendMessage: string | null } {
if (status !== "suspended") {
return { suspendedRole: null, suspendMessage: null };
}
if (entry.suspendedRole !== null && entry.suspendMessage !== null) {
return { suspendedRole: entry.suspendedRole, suspendMessage: entry.suspendMessage };
}
const fromGraph = resolveSuspendFieldsFromGraph(uwf, head, workflowRef);
return {
suspendedRole: entry.suspendedRole ?? fromGraph.suspendedRole,
suspendMessage: entry.suspendMessage ?? fromGraph.suspendMessage,
};
}
async function ensureThreadSuspendMetadata(
storageRoot: string,
threadId: ThreadId,
entry: ThreadIndexEntry,
suspendedRole: string,
suspendMessage: string,
): Promise<ThreadIndexEntry> {
if (entry.suspendedRole !== null && entry.suspendMessage !== null) {
return entry;
}
const updated = markThreadSuspended(entry, suspendedRole, suspendMessage);
const index = await loadThreadsIndex(storageRoot);
index[threadId] = updated;
await saveThreadsIndex(storageRoot, index);
return updated;
}
async function resolveActiveThreadStatus( async function resolveActiveThreadStatus(
storageRoot: string, storageRoot: string,
threadId: ThreadId, threadId: ThreadId,
@@ -130,6 +200,25 @@ const PL_AGENT_DONE = "C6P9E3H7";
const PL_THREAD_ARCHIVED = "F4D8Q2K5"; const PL_THREAD_ARCHIVED = "F4D8Q2K5";
const PL_STEP_ERROR = "B8T5N1V6"; const PL_STEP_ERROR = "B8T5N1V6";
const PL_BACKGROUND_START = "X7Q4W9M2"; const PL_BACKGROUND_START = "X7Q4W9M2";
const PL_THREAD_RESUME = "K2R7M4N8";
type ResumeStepConfig = {
role: string;
prompt: string;
};
type AgentStepTarget = {
role: string;
edgePrompt: string;
effectiveCwd: string;
};
function buildResumePrompt(graphPrompt: string, supplement: string | null): string {
if (supplement === null || supplement === "") {
return graphPrompt;
}
return `${graphPrompt}\n\n${supplement}`;
}
function failStep(plog: ProcessLogger, message: string): never { function failStep(plog: ProcessLogger, message: string): never {
plog.log(PL_STEP_ERROR, message, null); plog.log(PL_STEP_ERROR, message, null);
@@ -380,7 +469,7 @@ export async function cmdThreadStart(
} }
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
index[threadId] = headHash; index[threadId] = createThreadIndexEntry(headHash);
await saveThreadsIndex(storageRoot, index); await saveThreadsIndex(storageRoot, index);
plog.log( plog.log(
@@ -394,8 +483,9 @@ export async function cmdThreadStart(
export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Promise<StepOutput> { export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Promise<StepOutput> {
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const activeHead = index[threadId]; const entry = index[threadId];
if (activeHead !== undefined) { if (entry !== undefined) {
const activeHead = entry.head;
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const workflow = resolveWorkflowFromHead(uwf, activeHead); const workflow = resolveWorkflowFromHead(uwf, activeHead);
if (workflow === null) { if (workflow === null) {
@@ -410,6 +500,7 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr
workflow, workflow,
); );
const currentRole = resolveCurrentRole(uwf, activeHead, workflow); const currentRole = resolveCurrentRole(uwf, activeHead, workflow);
const suspendFields = resolveSuspendFieldsForShow(entry, status, uwf, activeHead, workflow);
return { return {
workflow, workflow,
@@ -417,6 +508,8 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr
head: activeHead, head: activeHead,
status, status,
currentRole, currentRole,
suspendedRole: suspendFields.suspendedRole,
suspendMessage: suspendFields.suspendMessage,
done: false, done: false,
background: null, background: null,
}; };
@@ -432,6 +525,8 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr
head: hist.head, head: hist.head,
status, status,
currentRole: null, currentRole: null,
suspendedRole: null,
suspendMessage: null,
done: true, done: true,
background: null, background: null,
}; };
@@ -473,13 +568,8 @@ async function collectActiveThreads(
index: ThreadsIndex, index: ThreadsIndex,
): Promise<ThreadListItemWithStatus[]> { ): Promise<ThreadListItemWithStatus[]> {
const items: ThreadListItemWithStatus[] = []; const items: ThreadListItemWithStatus[] = [];
for (const [threadId, head] of Object.entries(index)) { for (const [threadId, entry] of Object.entries(index)) {
const item = await threadListItemFromActive( const item = await threadListItemFromActive(storageRoot, uwf, threadId as ThreadId, entry.head);
storageRoot,
uwf,
threadId as ThreadId,
head as CasRef,
);
if (item !== null) { if (item !== null) {
items.push(item); items.push(item);
} }
@@ -945,6 +1035,65 @@ async function archiveThread(
}); });
} }
export async function cmdThreadResume(
storageRoot: string,
threadId: ThreadId,
supplement: string | null,
agentOverride: string | null,
): Promise<StepOutput> {
const runningMarker = await isThreadRunning(storageRoot, threadId);
if (runningMarker !== null) {
fail(`thread already executing in background (PID: ${runningMarker.pid})`);
}
const index = await loadThreadsIndex(storageRoot);
const entry = index[threadId];
if (entry === undefined) {
fail(`thread not active: ${threadId}`);
}
const uwf = await createUwfStore(storageRoot);
const headHash = entry.head;
const chain = walkChain(uwf, headHash);
const workflowHash = chain.start.workflow;
const status = await resolveActiveThreadStatus(
storageRoot,
threadId,
uwf,
headHash,
workflowHash,
);
if (status !== "suspended") {
fail(`thread is not suspended: ${threadId} (status: ${status})`);
}
const suspendFields = resolveSuspendFieldsForShow(entry, status, uwf, headHash, workflowHash);
if (suspendFields.suspendedRole === null) {
fail(`thread is suspended but suspendedRole is missing: ${threadId}`);
}
if (suspendFields.suspendMessage === null) {
fail(`thread is suspended but suspendMessage is missing: ${threadId}`);
}
const resumePrompt = buildResumePrompt(suspendFields.suspendMessage, supplement);
const plog = createProcessLogger({
storageRoot,
context: { thread: threadId, workflow: workflowHash },
});
plog.log(
PL_THREAD_RESUME,
`resume role=${suspendFields.suspendedRole} supplement=${supplement !== null}`,
null,
);
return cmdThreadStepOnce(storageRoot, threadId, agentOverride, plog, {
role: suspendFields.suspendedRole,
prompt: resumePrompt,
});
}
export async function cmdThreadExec( export async function cmdThreadExec(
storageRoot: string, storageRoot: string,
threadId: ThreadId, threadId: ThreadId,
@@ -1011,12 +1160,12 @@ async function resolveActiveThreadWorkflowHash(
threadId: ThreadId, threadId: ThreadId,
): Promise<CasRef> { ): Promise<CasRef> {
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const headHash = index[threadId]; const entry = index[threadId];
if (headHash === undefined) { if (entry === undefined) {
fail(`thread not active: ${threadId}`); fail(`thread not active: ${threadId}`);
} }
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const chain = walkChain(uwf, headHash); const chain = walkChain(uwf, entry.head);
return chain.start.workflow; return chain.start.workflow;
} }
@@ -1030,10 +1179,11 @@ async function cmdThreadStepBackground(
): Promise<StepOutput[]> { ): Promise<StepOutput[]> {
// Get current head to return to caller // Get current head to return to caller
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const headHash = index[threadId]; const entry = index[threadId];
if (headHash === undefined) { if (entry === undefined) {
failStep(plog, `thread not active: ${threadId}`); failStep(plog, `thread not active: ${threadId}`);
} }
const headHash = entry.head;
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
@@ -1069,30 +1219,42 @@ async function cmdThreadStepBackground(
head: headHash, head: headHash,
status: "running", status: "running",
currentRole: resolveCurrentRole(uwf, headHash, workflowHash), currentRole: resolveCurrentRole(uwf, headHash, workflowHash),
suspendedRole: null,
suspendMessage: null,
done: false, done: false,
background: true, background: true,
}, },
]; ];
} }
async function cmdThreadStepOnce( function resolveResumeStepTarget(
resume: ResumeStepConfig,
chain: ChainState,
threadCwd: string,
plog: ProcessLogger,
): AgentStepTarget {
const lastStep = chain.stepsNewestFirst[0];
plog.log(PL_MODERATOR, `resume role=${resume.role} prompt=${resume.prompt}`, null);
return {
role: resume.role,
edgePrompt: resume.prompt,
effectiveCwd: lastStep !== undefined && lastStep.cwd !== "" ? lastStep.cwd : threadCwd,
};
}
async function resolveModeratorStepTarget(
storageRoot: string, storageRoot: string,
threadId: ThreadId, threadId: ThreadId,
agentOverride: string | null, entry: ThreadIndexEntry,
headHash: CasRef,
workflowHash: CasRef,
workflow: WorkflowPayload,
uwf: UwfStore,
chain: ChainState,
threadCwd: string,
plog: ProcessLogger, plog: ProcessLogger,
): Promise<StepOutput> { ): Promise<StepOutput | AgentStepTarget> {
const index = await loadThreadsIndex(storageRoot);
const headHash = index[threadId];
if (headHash === undefined) {
failStep(plog, `thread not active: ${threadId}`);
}
const uwf = await createUwfStore(storageRoot);
const chain = walkChain(uwf, headHash);
const workflowHash = chain.start.workflow;
const workflow = loadWorkflowPayload(uwf, workflowHash);
const { lastRole, lastOutput } = resolveEvaluateArgs(uwf, chain); const { lastRole, lastOutput } = resolveEvaluateArgs(uwf, chain);
const nextResult = evaluate(workflow.graph, lastRole, lastOutput); const nextResult = evaluate(workflow.graph, lastRole, lastOutput);
if (!nextResult.ok) { if (!nextResult.ok) {
failStep(plog, `moderator evaluate failed: ${nextResult.error.message}`); failStep(plog, `moderator evaluate failed: ${nextResult.error.message}`);
@@ -1109,6 +1271,13 @@ async function cmdThreadStepOnce(
); );
if (isSuspendResult(nextResult.value)) { if (isSuspendResult(nextResult.value)) {
await ensureThreadSuspendMetadata(
storageRoot,
threadId,
entry,
nextResult.value.suspendedRole,
nextResult.value.prompt,
);
return buildStepOutputFromEvaluation( return buildStepOutputFromEvaluation(
workflowHash, workflowHash,
threadId, threadId,
@@ -1128,41 +1297,32 @@ async function cmdThreadStepOnce(
head: headHash, head: headHash,
status: "completed", status: "completed",
currentRole: null, currentRole: null,
suspendedRole: null,
suspendMessage: null,
done: true, done: true,
background: null, background: null,
}; };
} }
const role = nextResult.value.role; return {
const edgePrompt = nextResult.value.prompt; role: nextResult.value.role,
edgePrompt: nextResult.value.prompt,
effectiveCwd: nextResult.value.location !== null ? nextResult.value.location : threadCwd,
};
}
// Resolve cwd: use edge location if provided, otherwise inherit thread.cwd async function finalizeAgentStep(
const threadCwd = chain.start.cwd; storageRoot: string,
const effectiveCwd = nextResult.value.location !== null ? nextResult.value.location : threadCwd; threadId: ThreadId,
workflowHash: CasRef,
const config = await loadWorkflowConfig(storageRoot); workflow: WorkflowPayload,
const agent = resolveAgentConfig(config, workflow, role, agentOverride); newHead: CasRef,
uwfAfter: UwfStore,
plog.log(PL_AGENT_SPAWN, `spawning agent command=${agent.command}`, { plog: ProcessLogger,
args: [...agent.args, threadId, role].join(" "), ): Promise<StepOutput> {
});
loadDotenv({ path: getEnvPath(storageRoot) });
const agentResult = spawnAgent(plog, agent, threadId, role, edgePrompt, effectiveCwd);
const newHead = agentResult.stepHash as CasRef;
plog.log(PL_AGENT_DONE, `agent returned head=${newHead}`, null);
// Re-create store to pick up nodes written by the agent subprocess
const uwfAfter = await createUwfStore(storageRoot);
const newNode = uwfAfter.store.get(newHead);
if (newNode === null || newNode.type !== uwfAfter.schemas.stepNode) {
failStep(plog, `agent returned hash that is not a StepNode: ${newHead}`);
}
// Reload threads index to avoid overwriting changes made by the agent subprocess
const freshIndex = await loadThreadsIndex(storageRoot); const freshIndex = await loadThreadsIndex(storageRoot);
freshIndex[threadId] = newHead; const priorEntry = freshIndex[threadId] ?? createThreadIndexEntry(newHead);
freshIndex[threadId] = updateThreadHead(priorEntry, newHead);
await saveThreadsIndex(storageRoot, freshIndex); await saveThreadsIndex(storageRoot, freshIndex);
const chainAfter = walkChain(uwfAfter, newHead); const chainAfter = walkChain(uwfAfter, newHead);
@@ -1176,6 +1336,12 @@ async function cmdThreadStepOnce(
} }
if (isSuspendResult(afterResult.value)) { if (isSuspendResult(afterResult.value)) {
freshIndex[threadId] = markThreadSuspended(
freshIndex[threadId] ?? createThreadIndexEntry(newHead),
afterResult.value.suspendedRole,
afterResult.value.prompt,
);
await saveThreadsIndex(storageRoot, freshIndex);
return buildStepOutputFromEvaluation( return buildStepOutputFromEvaluation(
workflowHash, workflowHash,
threadId, threadId,
@@ -1192,7 +1358,6 @@ async function cmdThreadStepOnce(
await archiveThread(storageRoot, threadId, workflowHash, newHead); await archiveThread(storageRoot, threadId, workflowHash, newHead);
} }
// Determine status based on whether thread is done and running state
const status: ThreadStatus = done ? "completed" : "idle"; const status: ThreadStatus = done ? "completed" : "idle";
const currentRole = done ? null : afterResult.value.role; const currentRole = done ? null : afterResult.value.role;
@@ -1202,14 +1367,80 @@ async function cmdThreadStepOnce(
head: newHead, head: newHead,
status, status,
currentRole, currentRole,
suspendedRole: null,
suspendMessage: null,
done, done,
background: null, background: null,
}; };
} }
async function cmdThreadStepOnce(
storageRoot: string,
threadId: ThreadId,
agentOverride: string | null,
plog: ProcessLogger,
resume: ResumeStepConfig | null = null,
): Promise<StepOutput> {
const index = await loadThreadsIndex(storageRoot);
const entry = index[threadId];
if (entry === undefined) {
failStep(plog, `thread not active: ${threadId}`);
}
const headHash = entry.head;
const uwf = await createUwfStore(storageRoot);
const chain = walkChain(uwf, headHash);
const workflowHash = chain.start.workflow;
const workflow = loadWorkflowPayload(uwf, workflowHash);
const threadCwd = chain.start.cwd;
const targetOrOutput =
resume !== null
? resolveResumeStepTarget(resume, chain, threadCwd, plog)
: await resolveModeratorStepTarget(
storageRoot,
threadId,
entry,
headHash,
workflowHash,
workflow,
uwf,
chain,
threadCwd,
plog,
);
if ("status" in targetOrOutput) {
return targetOrOutput;
}
const { role, edgePrompt, effectiveCwd } = targetOrOutput;
const config = await loadWorkflowConfig(storageRoot);
const agent = resolveAgentConfig(config, workflow, role, agentOverride);
plog.log(PL_AGENT_SPAWN, `spawning agent command=${agent.command}`, {
args: [...agent.args, threadId, role].join(" "),
});
loadDotenv({ path: getEnvPath(storageRoot) });
const agentResult = spawnAgent(plog, agent, threadId, role, edgePrompt, effectiveCwd);
const newHead = agentResult.stepHash as CasRef;
plog.log(PL_AGENT_DONE, `agent returned head=${newHead}`, null);
const uwfAfter = await createUwfStore(storageRoot);
const newNode = uwfAfter.store.get(newHead);
if (newNode === null || newNode.type !== uwfAfter.schemas.stepNode) {
failStep(plog, `agent returned hash that is not a StepNode: ${newHead}`);
}
return finalizeAgentStep(storageRoot, threadId, workflowHash, workflow, newHead, uwfAfter, plog);
}
async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise<CasRef> { async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise<CasRef> {
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const activeHead = index[threadId]; const activeHead = index[threadId]?.head;
if (activeHead !== undefined) { if (activeHead !== undefined) {
return activeHead; return activeHead;
} }
@@ -1262,8 +1493,8 @@ export type CancelOutput = {
*/ */
export async function cmdThreadStop(storageRoot: string, threadId: ThreadId): Promise<StopOutput> { export async function cmdThreadStop(storageRoot: string, threadId: ThreadId): Promise<StopOutput> {
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const head = index[threadId]; const entry = index[threadId];
if (head === undefined) { if (entry === undefined) {
fail(`thread not active: ${threadId}`); fail(`thread not active: ${threadId}`);
} }
@@ -1292,10 +1523,11 @@ export async function cmdThreadCancel(
threadId: ThreadId, threadId: ThreadId,
): Promise<CancelOutput> { ): Promise<CancelOutput> {
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const head = index[threadId]; const entry = index[threadId];
if (head === undefined) { if (entry === undefined) {
fail(`thread not active: ${threadId}`); fail(`thread not active: ${threadId}`);
} }
const head = entry.head;
// Check if thread is running in background and terminate it // Check if thread is running in background and terminate it
const runningMarker = await isThreadRunning(storageRoot, threadId); const runningMarker = await isThreadRunning(storageRoot, threadId);
+30 -13
View File
@@ -5,7 +5,18 @@ import { join } from "node:path";
import type { BootstrapCapableStore, Hash } from "@ocas/core"; import type { BootstrapCapableStore, Hash } from "@ocas/core";
import { createFsStore } from "@ocas/fs"; import { createFsStore } from "@ocas/fs";
import type { CasRef, ThreadId, ThreadListItem, ThreadsIndex } from "@uncaged/workflow-protocol"; import type {
CasRef,
ThreadId,
ThreadIndexEntry,
ThreadListItem,
ThreadsIndex,
} from "@uncaged/workflow-protocol";
import {
createThreadIndexEntry,
parseThreadsIndex,
serializeThreadsIndex,
} from "@uncaged/workflow-protocol";
import { parse, stringify } from "yaml"; import { parse, stringify } from "yaml";
import { registerUwfSchemas, type UwfSchemaHashes } from "./schemas.js"; import { registerUwfSchemas, type UwfSchemaHashes } from "./schemas.js";
@@ -234,16 +245,7 @@ export async function loadThreadsIndex(storageRoot: string): Promise<ThreadsInde
try { try {
const text = await readFile(path, "utf8"); const text = await readFile(path, "utf8");
const raw = parse(text) as unknown; const raw = parse(text) as unknown;
if (raw === null || typeof raw !== "object" || Array.isArray(raw)) { return parseThreadsIndex(raw);
return {};
}
const index: ThreadsIndex = {};
for (const [threadId, head] of Object.entries(raw as Record<string, unknown>)) {
if (typeof head === "string") {
index[threadId as ThreadId] = head;
}
}
return index;
} catch (e) { } catch (e) {
const err = e as NodeJS.ErrnoException; const err = e as NodeJS.ErrnoException;
if (err.code === "ENOENT") { if (err.code === "ENOENT") {
@@ -253,10 +255,25 @@ export async function loadThreadsIndex(storageRoot: string): Promise<ThreadsInde
} }
} }
export async function saveThreadsIndex(storageRoot: string, index: ThreadsIndex): Promise<void> { /** Accept legacy CasRef values for test convenience. */
export type ThreadsIndexInput = Record<ThreadId, ThreadIndexEntry | CasRef>;
function normalizeThreadsIndexInput(index: ThreadsIndexInput): ThreadsIndex {
const normalized: ThreadsIndex = {};
for (const [threadId, value] of Object.entries(index)) {
normalized[threadId as ThreadId] =
typeof value === "string" ? createThreadIndexEntry(value as CasRef) : value;
}
return normalized;
}
export async function saveThreadsIndex(
storageRoot: string,
index: ThreadsIndexInput,
): Promise<void> {
const path = getThreadsPath(storageRoot); const path = getThreadsPath(storageRoot);
await mkdir(storageRoot, { recursive: true }); await mkdir(storageRoot, { recursive: true });
const text = stringify(index, { indent: 2 }); const text = stringify(serializeThreadsIndex(normalizeThreadsIndexInput(index)), { indent: 2 });
await writeFile(path, text, "utf8"); await writeFile(path, text, "utf8");
} }
@@ -0,0 +1,79 @@
import { describe, expect, test } from "vitest";
import {
createThreadIndexEntry,
markThreadSuspended,
normalizeThreadIndexEntry,
parseThreadsIndex,
serializeThreadIndexEntry,
serializeThreadsIndex,
updateThreadHead,
} from "../thread-index.js";
describe("thread-index", () => {
test("parse legacy string head hash", () => {
const entry = normalizeThreadIndexEntry("0123456789ABC");
expect(entry).toEqual({
head: "0123456789ABC",
suspendedRole: null,
suspendMessage: null,
});
});
test("parse suspended object entry", () => {
const entry = normalizeThreadIndexEntry({
head: "0123456789ABC",
suspendedRole: "worker",
suspendMessage: "Please clarify: Which API?",
});
expect(entry).toEqual({
head: "0123456789ABC",
suspendedRole: "worker",
suspendMessage: "Please clarify: Which API?",
});
});
test("serialize non-suspended entry as compact string", () => {
const entry = createThreadIndexEntry("0123456789ABC");
expect(serializeThreadIndexEntry(entry)).toBe("0123456789ABC");
});
test("serialize suspended entry as object", () => {
const entry = markThreadSuspended(
createThreadIndexEntry("0123456789ABC"),
"worker",
"Please clarify: Which API?",
);
expect(serializeThreadIndexEntry(entry)).toEqual({
head: "0123456789ABC",
suspendedRole: "worker",
suspendMessage: "Please clarify: Which API?",
});
});
test("updateThreadHead clears suspend metadata", () => {
const suspended = markThreadSuspended(
createThreadIndexEntry("OLDHEAD0123456"),
"worker",
"Waiting",
);
const resumed = updateThreadHead(suspended, "NEWHEAD01234567");
expect(resumed).toEqual({
head: "NEWHEAD01234567",
suspendedRole: null,
suspendMessage: null,
});
});
test("parseThreadsIndex round-trip", () => {
const raw = {
"01THREAD0000000000000001": "HEAD00000000001",
"01THREAD0000000000000002": {
head: "HEAD00000000002",
suspendedRole: "reviewer",
suspendMessage: "Need input",
},
};
const parsed = parseThreadsIndex(raw);
expect(serializeThreadsIndex(parsed)).toEqual(raw);
});
});
+10
View File
@@ -3,6 +3,15 @@ export {
STEP_NODE_SCHEMA, STEP_NODE_SCHEMA,
WORKFLOW_SCHEMA, WORKFLOW_SCHEMA,
} from "./schemas.js"; } from "./schemas.js";
export {
createThreadIndexEntry,
markThreadSuspended,
normalizeThreadIndexEntry,
parseThreadsIndex,
serializeThreadIndexEntry,
serializeThreadsIndex,
updateThreadHead,
} from "./thread-index.js";
export type { export type {
AgentAlias, AgentAlias,
AgentConfig, AgentConfig,
@@ -29,6 +38,7 @@ export type {
Target, Target,
ThreadForkOutput, ThreadForkOutput,
ThreadId, ThreadId,
ThreadIndexEntry,
ThreadListItem, ThreadListItem,
ThreadStatus, ThreadStatus,
ThreadStepsOutput, ThreadStepsOutput,
@@ -0,0 +1,89 @@
import type { CasRef, ThreadId, ThreadIndexEntry, ThreadsIndex } from "./types.js";
/** Normalize a legacy head hash or entry object into {@link ThreadIndexEntry}. */
export function normalizeThreadIndexEntry(raw: unknown): ThreadIndexEntry | null {
if (typeof raw === "string") {
return createThreadIndexEntry(raw as CasRef);
}
if (raw === null || typeof raw !== "object" || Array.isArray(raw)) {
return null;
}
const rec = raw as Record<string, unknown>;
const head = rec.head;
if (typeof head !== "string") {
return null;
}
const suspendedRole = rec.suspendedRole;
const suspendMessage = rec.suspendMessage;
return {
head: head as CasRef,
suspendedRole: typeof suspendedRole === "string" ? suspendedRole : null,
suspendMessage: typeof suspendMessage === "string" ? suspendMessage : null,
};
}
export function createThreadIndexEntry(head: CasRef): ThreadIndexEntry {
return {
head,
suspendedRole: null,
suspendMessage: null,
};
}
export function updateThreadHead(_entry: ThreadIndexEntry, head: CasRef): ThreadIndexEntry {
return {
head,
suspendedRole: null,
suspendMessage: null,
};
}
export function markThreadSuspended(
entry: ThreadIndexEntry,
suspendedRole: string,
suspendMessage: string,
): ThreadIndexEntry {
return {
head: entry.head,
suspendedRole,
suspendMessage,
};
}
/** Serialize for threads.yaml — compact string when not suspended. */
export function serializeThreadIndexEntry(
entry: ThreadIndexEntry,
): string | Record<string, string> {
if (entry.suspendedRole === null || entry.suspendMessage === null) {
return entry.head;
}
return {
head: entry.head,
suspendedRole: entry.suspendedRole,
suspendMessage: entry.suspendMessage,
};
}
export function parseThreadsIndex(raw: unknown): ThreadsIndex {
if (raw === null || typeof raw !== "object" || Array.isArray(raw)) {
return {};
}
const index: ThreadsIndex = {};
for (const [threadId, value] of Object.entries(raw as Record<string, unknown>)) {
const entry = normalizeThreadIndexEntry(value);
if (entry !== null) {
index[threadId as ThreadId] = entry;
}
}
return index;
}
export function serializeThreadsIndex(
index: ThreadsIndex,
): Record<string, string | Record<string, string>> {
const out: Record<string, string | Record<string, string>> = {};
for (const [threadId, entry] of Object.entries(index)) {
out[threadId] = serializeThreadIndexEntry(entry);
}
return out;
}
+12 -1
View File
@@ -105,10 +105,21 @@ export type StepOutput = {
status: ThreadStatus; status: ThreadStatus;
/** The current or next role. Null when completed, cancelled, suspended, or next is $END. */ /** The current or next role. Null when completed, cancelled, suspended, or next is $END. */
currentRole: string | null; currentRole: string | null;
/** Role whose output triggered suspension. Null when thread is not suspended. */
suspendedRole: string | null;
/** Rendered suspend prompt for the user. Null when thread is not suspended. */
suspendMessage: string | null;
done: boolean; done: boolean;
background: boolean | null; background: boolean | null;
}; };
/** Active thread entry in ~/.uncaged/workflow/threads.yaml */
export type ThreadIndexEntry = {
head: CasRef;
suspendedRole: string | null;
suspendMessage: string | null;
};
/** uwf thread steps — single step entry */ /** uwf thread steps — single step entry */
export type StepEntry = { export type StepEntry = {
hash: CasRef; hash: CasRef;
@@ -200,4 +211,4 @@ export type WorkflowConfig = {
}; };
/** ~/.uncaged/workflow/threads.yaml */ /** ~/.uncaged/workflow/threads.yaml */
export type ThreadsIndex = Record<ThreadId, CasRef>; export type ThreadsIndex = Record<ThreadId, ThreadIndexEntry>;
+2 -2
View File
@@ -163,7 +163,7 @@ export async function buildContext(
const { store, schemas } = agentStore; const { store, schemas } = agentStore;
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const headHash = index[threadId]; const headHash = index[threadId]?.head;
if (headHash === undefined) { if (headHash === undefined) {
fail(`thread not found in threads.yaml: ${threadId}`); fail(`thread not found in threads.yaml: ${threadId}`);
} }
@@ -212,7 +212,7 @@ export async function buildContextWithMeta(
const { store, schemas } = agentStore; const { store, schemas } = agentStore;
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const headHash = index[threadId]; const headHash = index[threadId]?.head;
if (headHash === undefined) { if (headHash === undefined) {
fail(`thread not found in threads.yaml: ${threadId}`); fail(`thread not found in threads.yaml: ${threadId}`);
} }
+2 -11
View File
@@ -12,11 +12,11 @@ import type {
ProviderAlias, ProviderAlias,
ProviderConfig, ProviderConfig,
Scenario, Scenario,
ThreadId,
ThreadsIndex, ThreadsIndex,
WorkflowConfig, WorkflowConfig,
WorkflowName, WorkflowName,
} from "@uncaged/workflow-protocol"; } from "@uncaged/workflow-protocol";
import { parseThreadsIndex } from "@uncaged/workflow-protocol";
import { parse } from "yaml"; import { parse } from "yaml";
import { registerAgentSchemas } from "./schemas.js"; import { registerAgentSchemas } from "./schemas.js";
@@ -207,16 +207,7 @@ export async function loadThreadsIndex(storageRoot: string): Promise<ThreadsInde
try { try {
const text = await readFile(path, "utf8"); const text = await readFile(path, "utf8");
const raw = parse(text) as unknown; const raw = parse(text) as unknown;
if (!isRecord(raw)) { return parseThreadsIndex(raw);
return {};
}
const index: ThreadsIndex = {};
for (const [threadId, head] of Object.entries(raw)) {
if (typeof head === "string") {
index[threadId as ThreadId] = head;
}
}
return index;
} catch (e) { } catch (e) {
const err = e as NodeJS.ErrnoException; const err = e as NodeJS.ErrnoException;
if (err.code === "ENOENT") { if (err.code === "ENOENT") {