feat: record suspend event as StepNode in CAS chain

- ThreadIndexEntry supports suspendedRole + suspendMessage metadata
- threads.yaml: suspended threads serialize as objects (backward compat)
- cmdThreadStepOnce writes step before marking thread suspended
- StepOutput extended with suspendedRole/suspendMessage fields
- thread show displays suspend message

Closes #589
This commit is contained in:
2026-06-02 04:44:05 +00:00
parent b0ef9c55a9
commit 10b478640d
17 changed files with 560 additions and 81 deletions
@@ -720,7 +720,10 @@ defaultModel: default
describe("no legacy apiKeyEnv references", () => { describe("no legacy apiKeyEnv references", () => {
test("config.ts has no references to apiKeyEnv", () => { test("config.ts has no references to apiKeyEnv", () => {
const configSource = readFileSync(join(__dirname, "..", "..", "src", "commands", "config.ts"), "utf8"); const configSource = readFileSync(
join(__dirname, "..", "..", "src", "commands", "config.ts"),
"utf8",
);
expect(configSource).not.toContain("apiKeyEnv"); expect(configSource).not.toContain("apiKeyEnv");
}); });
@@ -175,8 +175,9 @@ async function insertStepNode(
): Promise<void> { ): Promise<void> {
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const head = index[threadId]; const headEntry = index[threadId];
if (head === undefined) throw new Error(`thread ${threadId} not in index`); if (headEntry === undefined) throw new Error(`thread ${threadId} not in index`);
const head = headEntry.head;
const outputSchemaHash = await putSchema(uwf.store, OUTPUT_SCHEMA); const outputSchemaHash = await putSchema(uwf.store, OUTPUT_SCHEMA);
const outputHash = await uwf.store.put(outputSchemaHash, outputPayload); const outputHash = await uwf.store.put(outputSchemaHash, outputPayload);
@@ -199,7 +200,7 @@ async function insertStepNode(
detail: detailHash, detail: detailHash,
})) as CasRef; })) as CasRef;
index[threadId] = stepHash; index[threadId] = { head: stepHash, suspendedRole: null, suspendMessage: null };
await saveThreadsIndex(storageRoot, index); await saveThreadsIndex(storageRoot, index);
} }
@@ -280,7 +281,7 @@ describe("currentRole field", () => {
const tid = thread as ThreadId; const tid = thread as ThreadId;
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const head = index[tid]!; const head = index[tid]!.head;
delete index[tid]; delete index[tid];
await saveThreadsIndex(storageRoot, index); await saveThreadsIndex(storageRoot, index);
await appendThreadHistory(storageRoot, { await appendThreadHistory(storageRoot, {
@@ -309,7 +310,7 @@ describe("currentRole field", () => {
const tid = thread as ThreadId; const tid = thread as ThreadId;
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const head = index[tid]!; const head = index[tid]!.head;
delete index[tid]; delete index[tid];
await saveThreadsIndex(storageRoot, index); await saveThreadsIndex(storageRoot, index);
await appendThreadHistory(storageRoot, { await appendThreadHistory(storageRoot, {
@@ -371,7 +372,7 @@ describe("currentRole field", () => {
const comp = await cmdThreadStart(storageRoot, wf, "completed", tmpDir); const comp = await cmdThreadStart(storageRoot, wf, "completed", tmpDir);
const compId = comp.thread as ThreadId; const compId = comp.thread as ThreadId;
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const compHead = index[compId]!; const compHead = index[compId]!.head;
delete index[compId]; delete index[compId];
await saveThreadsIndex(storageRoot, index); await saveThreadsIndex(storageRoot, index);
await appendThreadHistory(storageRoot, { await appendThreadHistory(storageRoot, {
@@ -47,9 +47,7 @@ async function createTestThread(
prompt: "test prompt", prompt: "test prompt",
}; };
const headHash = await uwf.store.put(uwf.schemas.startNode, startPayload); const headHash = await uwf.store.put(uwf.schemas.startNode, startPayload);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot)); await saveThreadsIndex(storageRoot, { [threadId]: headHash });
index[threadId] = headHash;
await saveThreadsIndex(storageRoot, index);
return threadId; return threadId;
} }
@@ -106,7 +104,7 @@ describe("cmdThreadList status filter", () => {
await markThreadRunning(tmpDir, thread2, workflowHash); await markThreadRunning(tmpDir, thread2, workflowHash);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir));
const thread3Head = index[thread3]; const thread3Head = index[thread3]!.head;
if (thread3Head === undefined) throw new Error("thread3 head not found"); if (thread3Head === undefined) throw new Error("thread3 head not found");
await completeThread(tmpDir, thread3, workflowHash, thread3Head); await completeThread(tmpDir, thread3, workflowHash, thread3Head);
@@ -130,7 +128,7 @@ describe("cmdThreadList status filter", () => {
await markThreadRunning(tmpDir, thread2, workflowHash); await markThreadRunning(tmpDir, thread2, workflowHash);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir));
const thread3Head = index[thread3]; const thread3Head = index[thread3]!.head;
if (thread3Head === undefined) throw new Error("thread3 head not found"); if (thread3Head === undefined) throw new Error("thread3 head not found");
await completeThread(tmpDir, thread3, workflowHash, thread3Head); await completeThread(tmpDir, thread3, workflowHash, thread3Head);
@@ -154,7 +152,7 @@ describe("cmdThreadList status filter", () => {
const thread3 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000); const thread3 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir));
const thread3Head = index[thread3]; const thread3Head = index[thread3]!.head;
if (thread3Head === undefined) throw new Error("thread3 head not found"); if (thread3Head === undefined) throw new Error("thread3 head not found");
await completeThread(tmpDir, thread3, workflowHash, thread3Head); await completeThread(tmpDir, thread3, workflowHash, thread3Head);
@@ -176,7 +174,7 @@ describe("cmdThreadList status filter", () => {
await markThreadRunning(tmpDir, thread2, workflowHash); await markThreadRunning(tmpDir, thread2, workflowHash);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir));
const thread3Head = index[thread3]; const thread3Head = index[thread3]!.head;
if (thread3Head === undefined) throw new Error("thread3 head not found"); if (thread3Head === undefined) throw new Error("thread3 head not found");
await completeThread(tmpDir, thread3, workflowHash, thread3Head); await completeThread(tmpDir, thread3, workflowHash, thread3Head);
@@ -348,7 +346,7 @@ describe("combined filters", () => {
await markThreadRunning(tmpDir, thread2, workflowHash); await markThreadRunning(tmpDir, thread2, workflowHash);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir));
const thread3Head = index[thread3]; const thread3Head = index[thread3]!.head;
if (thread3Head === undefined) throw new Error("thread3 head not found"); if (thread3Head === undefined) throw new Error("thread3 head not found");
await completeThread(tmpDir, thread3, workflowHash, thread3Head); await completeThread(tmpDir, thread3, workflowHash, thread3Head);
@@ -372,7 +370,7 @@ describe("combined filters", () => {
const thread = await createTestThread(uwf, tmpDir, workflowHash, Date.now() + i * 1000); const thread = await createTestThread(uwf, tmpDir, workflowHash, Date.now() + i * 1000);
threads.push(thread); threads.push(thread);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir));
const headHash = index[thread]; const headHash = index[thread]!.head;
if (headHash === undefined) throw new Error("head not found"); if (headHash === undefined) throw new Error("head not found");
await completeThread(tmpDir, thread, workflowHash, headHash); await completeThread(tmpDir, thread, workflowHash, headHash);
} }
@@ -421,7 +419,7 @@ describe("combined filters", () => {
if (i % 2 === 0) { if (i % 2 === 0) {
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir));
const headHash = index[thread]; const headHash = index[thread]!.head;
if (headHash === undefined) throw new Error("head not found"); if (headHash === undefined) throw new Error("head not found");
await completeThread(tmpDir, thread, workflowHash, headHash); await completeThread(tmpDir, thread, workflowHash, headHash);
} else { } else {
@@ -479,7 +477,11 @@ describe("edge cases", () => {
const thread2 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000); const thread2 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir));
index["INVALID_ULID_FORMAT_HERE" as ThreadId] = "01J6HMVRNQKJV2"; index["INVALID_ULID_FORMAT_HERE" as ThreadId] = {
head: "01J6HMVRNQKJV2",
suspendedRole: null,
suspendMessage: null,
};
await saveThreadsIndex(tmpDir, index); await saveThreadsIndex(tmpDir, index);
const afterMs = Date.now() - 3000; const afterMs = Date.now() - 3000;
@@ -80,7 +80,7 @@ graph:
// Verify StartNode has the cwd field // Verify StartNode has the cwd field
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot)); const index = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot));
const headHash = index[result.thread as ThreadId]; const headHash = index[result.thread as ThreadId]!.head;
expect(headHash).toBeDefined(); expect(headHash).toBeDefined();
const startNode = uwf.store.get(headHash as CasRef); const startNode = uwf.store.get(headHash as CasRef);
@@ -175,7 +175,7 @@ graph:
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const index = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot)); const index = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot));
const headHash = index[result.thread as ThreadId]; const headHash = index[result.thread as ThreadId]!.head;
const startNode = uwf.store.get(headHash as CasRef); const startNode = uwf.store.get(headHash as CasRef);
const startPayload = startNode?.payload as StartNodePayload; const startPayload = startNode?.payload as StartNodePayload;
@@ -90,8 +90,9 @@ async function insertStepNode(
): Promise<void> { ): Promise<void> {
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const head = index[threadId]; const headEntry = index[threadId];
if (head === undefined) throw new Error(`thread ${threadId} not in index`); if (headEntry === undefined) throw new Error(`thread ${threadId} not in index`);
const head = headEntry.head;
const outputSchemaHash = await putSchema(uwf.store, OUTPUT_SCHEMA); const outputSchemaHash = await putSchema(uwf.store, OUTPUT_SCHEMA);
const outputHash = await uwf.store.put(outputSchemaHash, outputPayload); const outputHash = await uwf.store.put(outputSchemaHash, outputPayload);
@@ -116,7 +117,7 @@ async function insertStepNode(
assembledPrompt: null, assembledPrompt: null,
})) as CasRef; })) as CasRef;
index[threadId] = stepHash; index[threadId] = { head: stepHash, suspendedRole: null, suspendMessage: null };
await saveThreadsIndex(storageRoot, index); await saveThreadsIndex(storageRoot, index);
} }
@@ -203,7 +204,7 @@ describe("thread show status field", () => {
// Get the head hash before moving to history // Get the head hash before moving to history
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const head = index[threadId]; const head = index[threadId]!.head;
if (!head) throw new Error("Thread not found in index"); if (!head) throw new Error("Thread not found in index");
// Move thread to history with reason 'completed' // Move thread to history with reason 'completed'
@@ -243,7 +244,7 @@ describe("thread show status field", () => {
// Get the head hash before moving to history // Get the head hash before moving to history
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const head = index[threadId]; const head = index[threadId]!.head;
if (!head) throw new Error("Thread not found in index"); if (!head) throw new Error("Thread not found in index");
// Move thread to history with reason 'cancelled' // Move thread to history with reason 'cancelled'
@@ -283,7 +284,7 @@ describe("thread show status field", () => {
// Get the head hash before moving to history // Get the head hash before moving to history
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const head = index[threadId]; const head = index[threadId]!.head;
if (!head) throw new Error("Thread not found in index"); if (!head) throw new Error("Thread not found in index");
// Move thread to history with reason null (legacy format) // Move thread to history with reason null (legacy format)
@@ -333,6 +334,8 @@ describe("thread show status field", () => {
expect(result.status).toBe("suspended"); expect(result.status).toBe("suspended");
expect(result.done).toBe(false); expect(result.done).toBe(false);
expect(result.currentRole).toBe(null); expect(result.currentRole).toBe(null);
expect(result.suspendedRole).toBe("worker");
expect(result.suspendMessage).toBe("Please clarify: Which API?");
expect(result.background).toBe(null); expect(result.background).toBe(null);
expect(result.thread).toBe(threadId); expect(result.thread).toBe(threadId);
} finally { } finally {
@@ -75,7 +75,7 @@ graph:
async function getStartNodeCwd(threadId: string): Promise<string> { async function getStartNodeCwd(threadId: string): Promise<string> {
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const headHash = index[threadId as ThreadId]; const headHash = index[threadId as ThreadId]!.head;
expect(headHash).toBeDefined(); expect(headHash).toBeDefined();
const startNode = uwf.store.get(headHash as CasRef); const startNode = uwf.store.get(headHash as CasRef);
@@ -0,0 +1,179 @@
import { execFileSync } from "node:child_process";
import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { putSchema } from "@ocas/core";
import { createFsStore } from "@ocas/fs";
import type { CasRef, StepNodePayload, ThreadId } from "@uncaged/workflow-protocol";
import { afterEach, beforeEach, describe, expect, test } from "vitest";
import { parse } from "yaml";
import { cmdThreadShow } from "../commands/thread.js";
import { registerUwfSchemas } from "../schemas.js";
import { saveThreadsIndex } from "../store.js";
const OUTPUT_SCHEMA = {
type: "object" as const,
properties: {
$status: { type: "string" as const },
question: { type: "string" as const },
},
required: ["$status"],
additionalProperties: false,
};
let tmpDir: string;
beforeEach(async () => {
tmpDir = await mkdtemp(join(tmpdir(), "cli-uwf-suspend-step-test-"));
});
afterEach(async () => {
await rm(tmpDir, { recursive: true, force: true });
});
describe("suspend step CAS chain and threads.yaml metadata", () => {
test("thread exec records suspend step in CAS and suspend metadata in threads.yaml", async () => {
const casDir = join(tmpDir, "cas");
await mkdir(casDir, { recursive: true });
const originalCasDir = process.env.UNCAGED_CAS_DIR;
process.env.UNCAGED_CAS_DIR = casDir;
try {
const store = createFsStore(casDir);
const schemas = await registerUwfSchemas(store);
const outputSchemaHash = await putSchema(store, OUTPUT_SCHEMA);
const workflowHash = await store.put(schemas.workflow, {
name: "test-suspend-step",
description: "suspend step integration test",
roles: {
worker: {
description: "Worker role",
goal: "Work",
capabilities: [],
procedure: "work",
output: "result",
frontmatter: outputSchemaHash,
},
},
graph: {
$START: { _: { role: "worker", prompt: "Start work", location: null } },
worker: {
needs_input: {
role: "$SUSPEND",
prompt: "Please clarify: {{{question}}}",
location: null,
},
},
},
});
const startHash = await store.put(schemas.startNode, {
workflow: workflowHash,
prompt: "Test suspend task",
cwd: tmpDir,
});
const threadId = "01SUSPENDSTEPTEST0000000" as ThreadId;
await saveThreadsIndex(tmpDir, { [threadId]: startHash });
const outputHash = await store.put(outputSchemaHash, {
$status: "needs_input",
question: "Which API?",
});
const detailHash = await store.put(schemas.text, "mock detail");
const startedAtMs = 1716600000000;
const completedAtMs = 1716600001500;
const stepHash = await store.put(schemas.stepNode, {
start: startHash,
prev: null,
role: "worker",
output: outputHash,
detail: detailHash,
agent: "uwf-mock",
edgePrompt: "Start work",
startedAtMs,
completedAtMs,
cwd: tmpDir,
assembledPrompt: null,
});
const mockAgentPath = join(tmpDir, "mock-agent.sh");
const adapterJson = JSON.stringify({
stepHash,
detailHash,
role: "worker",
frontmatter: { $status: "needs_input", question: "Which API?" },
body: "",
startedAtMs,
completedAtMs,
});
await writeFile(mockAgentPath, `#!/bin/sh\necho '${adapterJson}'\n`, { mode: 0o755 });
const configPath = join(tmpDir, "config.yaml");
await writeFile(
configPath,
`defaultAgent: uwf-hermes\ndefaultModel: test-model\nagentOverrides: null\nagents: {}\nproviders: {}\nmodels: {}\n`,
);
const cliPath = join(import.meta.dirname, "..", "cli.js");
const stdout = execFileSync(
"bun",
["run", cliPath, "thread", "exec", threadId, "--agent", mockAgentPath],
{
encoding: "utf8",
stdio: ["ignore", "pipe", "pipe"],
env: {
...process.env,
WORKFLOW_STORAGE_ROOT: tmpDir,
UNCAGED_CAS_DIR: casDir,
},
cwd: tmpDir,
timeout: 30000,
},
);
const cliOutput = JSON.parse(stdout.trim());
expect(cliOutput.status).toBe("suspended");
expect(cliOutput.head).toBe(stepHash);
expect(cliOutput.suspendedRole).toBe("worker");
expect(cliOutput.suspendMessage).toBe("Please clarify: Which API?");
const storeAfter = createFsStore(casDir);
const stepNode = storeAfter.get(cliOutput.head as CasRef);
expect(stepNode).not.toBeNull();
const payload = stepNode!.payload as StepNodePayload;
expect(payload.role).toBe("worker");
expect(payload.output).toBe(outputHash);
const outputNode = storeAfter.get(outputHash);
expect(outputNode?.payload).toEqual({
$status: "needs_input",
question: "Which API?",
});
const threadsYaml = await readFile(join(tmpDir, "threads.yaml"), "utf8");
const threadsIndex = parse(threadsYaml) as Record<string, unknown>;
const threadEntry = threadsIndex[threadId];
expect(threadEntry).toEqual({
head: stepHash,
suspendedRole: "worker",
suspendMessage: "Please clarify: Which API?",
});
const showResult = await cmdThreadShow(tmpDir, threadId);
expect(showResult.status).toBe("suspended");
expect(showResult.suspendMessage).toBe("Please clarify: Which API?");
expect(showResult.suspendedRole).toBe("worker");
} finally {
if (originalCasDir === undefined) {
delete process.env.UNCAGED_CAS_DIR;
} else {
process.env.UNCAGED_CAS_DIR = originalCasDir;
}
}
});
});
+1 -1
View File
@@ -203,7 +203,7 @@ function collectOrderedSteps(
async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise<CasRef> { async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise<CasRef> {
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const activeHead = index[threadId]; const activeHead = index[threadId]?.head;
if (activeHead !== undefined) { if (activeHead !== undefined) {
return activeHead; return activeHead;
} }
+1 -1
View File
@@ -113,7 +113,7 @@ export async function cmdStepFork(
const newThreadId = generateUlid(Date.now()) as ThreadId; const newThreadId = generateUlid(Date.now()) as ThreadId;
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
index[newThreadId] = stepHash; index[newThreadId] = { head: stepHash, suspendedRole: null, suspendMessage: null };
await saveThreadsIndex(storageRoot, index); await saveThreadsIndex(storageRoot, index);
return { return {
+118 -24
View File
@@ -11,12 +11,18 @@ import type {
StepNodePayload, StepNodePayload,
StepOutput, StepOutput,
ThreadId, ThreadId,
ThreadIndexEntry,
ThreadListItem, ThreadListItem,
ThreadStatus, ThreadStatus,
ThreadsIndex, ThreadsIndex,
WorkflowConfig, WorkflowConfig,
WorkflowPayload, WorkflowPayload,
} from "@uncaged/workflow-protocol"; } from "@uncaged/workflow-protocol";
import {
createThreadIndexEntry,
markThreadSuspended,
updateThreadHead,
} from "@uncaged/workflow-protocol";
import { import {
createProcessLogger, createProcessLogger,
extractUlidTimestamp, extractUlidTimestamp,
@@ -68,20 +74,84 @@ function buildStepOutputFromEvaluation(
): StepOutput { ): StepOutput {
const done = status === "completed"; const done = status === "completed";
let currentRole: string | null = null; let currentRole: string | null = null;
if (evaluation.ok && !isSuspendResult(evaluation.value) && evaluation.value.role !== END_ROLE) { let suspendedRole: string | null = null;
let suspendMessage: string | null = null;
if (evaluation.ok) {
if (isSuspendResult(evaluation.value)) {
suspendedRole = evaluation.value.suspendedRole;
suspendMessage = evaluation.value.prompt;
} else if (evaluation.value.role !== END_ROLE) {
currentRole = evaluation.value.role; currentRole = evaluation.value.role;
} }
}
return { return {
workflow: workflowHash, workflow: workflowHash,
thread: threadId, thread: threadId,
head, head,
status, status,
currentRole, currentRole,
suspendedRole,
suspendMessage,
done, done,
background, background,
}; };
} }
function resolveSuspendFieldsFromGraph(
uwf: UwfStore,
head: CasRef,
workflowRef: CasRef,
): { suspendedRole: string | null; suspendMessage: string | null } {
const chain = walkChain(uwf, head);
const { lastRole, lastOutput } = resolveEvaluateArgs(uwf, chain);
const workflow = loadWorkflowPayload(uwf, workflowRef);
const result = evaluate(workflow.graph, lastRole, lastOutput);
if (result.ok && isSuspendResult(result.value)) {
return {
suspendedRole: result.value.suspendedRole,
suspendMessage: result.value.prompt,
};
}
return { suspendedRole: null, suspendMessage: null };
}
function resolveSuspendFieldsForShow(
entry: ThreadIndexEntry,
status: ThreadStatus,
uwf: UwfStore,
head: CasRef,
workflowRef: CasRef,
): { suspendedRole: string | null; suspendMessage: string | null } {
if (status !== "suspended") {
return { suspendedRole: null, suspendMessage: null };
}
if (entry.suspendedRole !== null && entry.suspendMessage !== null) {
return { suspendedRole: entry.suspendedRole, suspendMessage: entry.suspendMessage };
}
const fromGraph = resolveSuspendFieldsFromGraph(uwf, head, workflowRef);
return {
suspendedRole: entry.suspendedRole ?? fromGraph.suspendedRole,
suspendMessage: entry.suspendMessage ?? fromGraph.suspendMessage,
};
}
async function ensureThreadSuspendMetadata(
storageRoot: string,
threadId: ThreadId,
entry: ThreadIndexEntry,
suspendedRole: string,
suspendMessage: string,
): Promise<ThreadIndexEntry> {
if (entry.suspendedRole !== null && entry.suspendMessage !== null) {
return entry;
}
const updated = markThreadSuspended(entry, suspendedRole, suspendMessage);
const index = await loadThreadsIndex(storageRoot);
index[threadId] = updated;
await saveThreadsIndex(storageRoot, index);
return updated;
}
async function resolveActiveThreadStatus( async function resolveActiveThreadStatus(
storageRoot: string, storageRoot: string,
threadId: ThreadId, threadId: ThreadId,
@@ -380,7 +450,7 @@ export async function cmdThreadStart(
} }
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
index[threadId] = headHash; index[threadId] = createThreadIndexEntry(headHash);
await saveThreadsIndex(storageRoot, index); await saveThreadsIndex(storageRoot, index);
plog.log( plog.log(
@@ -394,8 +464,9 @@ export async function cmdThreadStart(
export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Promise<StepOutput> { export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Promise<StepOutput> {
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const activeHead = index[threadId]; const entry = index[threadId];
if (activeHead !== undefined) { if (entry !== undefined) {
const activeHead = entry.head;
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const workflow = resolveWorkflowFromHead(uwf, activeHead); const workflow = resolveWorkflowFromHead(uwf, activeHead);
if (workflow === null) { if (workflow === null) {
@@ -410,6 +481,7 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr
workflow, workflow,
); );
const currentRole = resolveCurrentRole(uwf, activeHead, workflow); const currentRole = resolveCurrentRole(uwf, activeHead, workflow);
const suspendFields = resolveSuspendFieldsForShow(entry, status, uwf, activeHead, workflow);
return { return {
workflow, workflow,
@@ -417,6 +489,8 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr
head: activeHead, head: activeHead,
status, status,
currentRole, currentRole,
suspendedRole: suspendFields.suspendedRole,
suspendMessage: suspendFields.suspendMessage,
done: false, done: false,
background: null, background: null,
}; };
@@ -432,6 +506,8 @@ export async function cmdThreadShow(storageRoot: string, threadId: ThreadId): Pr
head: hist.head, head: hist.head,
status, status,
currentRole: null, currentRole: null,
suspendedRole: null,
suspendMessage: null,
done: true, done: true,
background: null, background: null,
}; };
@@ -473,13 +549,8 @@ async function collectActiveThreads(
index: ThreadsIndex, index: ThreadsIndex,
): Promise<ThreadListItemWithStatus[]> { ): Promise<ThreadListItemWithStatus[]> {
const items: ThreadListItemWithStatus[] = []; const items: ThreadListItemWithStatus[] = [];
for (const [threadId, head] of Object.entries(index)) { for (const [threadId, entry] of Object.entries(index)) {
const item = await threadListItemFromActive( const item = await threadListItemFromActive(storageRoot, uwf, threadId as ThreadId, entry.head);
storageRoot,
uwf,
threadId as ThreadId,
head as CasRef,
);
if (item !== null) { if (item !== null) {
items.push(item); items.push(item);
} }
@@ -1011,12 +1082,12 @@ async function resolveActiveThreadWorkflowHash(
threadId: ThreadId, threadId: ThreadId,
): Promise<CasRef> { ): Promise<CasRef> {
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const headHash = index[threadId]; const entry = index[threadId];
if (headHash === undefined) { if (entry === undefined) {
fail(`thread not active: ${threadId}`); fail(`thread not active: ${threadId}`);
} }
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const chain = walkChain(uwf, headHash); const chain = walkChain(uwf, entry.head);
return chain.start.workflow; return chain.start.workflow;
} }
@@ -1030,10 +1101,11 @@ async function cmdThreadStepBackground(
): Promise<StepOutput[]> { ): Promise<StepOutput[]> {
// Get current head to return to caller // Get current head to return to caller
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const headHash = index[threadId]; const entry = index[threadId];
if (headHash === undefined) { if (entry === undefined) {
failStep(plog, `thread not active: ${threadId}`); failStep(plog, `thread not active: ${threadId}`);
} }
const headHash = entry.head;
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
@@ -1069,6 +1141,8 @@ async function cmdThreadStepBackground(
head: headHash, head: headHash,
status: "running", status: "running",
currentRole: resolveCurrentRole(uwf, headHash, workflowHash), currentRole: resolveCurrentRole(uwf, headHash, workflowHash),
suspendedRole: null,
suspendMessage: null,
done: false, done: false,
background: true, background: true,
}, },
@@ -1082,10 +1156,11 @@ async function cmdThreadStepOnce(
plog: ProcessLogger, plog: ProcessLogger,
): Promise<StepOutput> { ): Promise<StepOutput> {
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const headHash = index[threadId]; const entry = index[threadId];
if (headHash === undefined) { if (entry === undefined) {
failStep(plog, `thread not active: ${threadId}`); failStep(plog, `thread not active: ${threadId}`);
} }
const headHash = entry.head;
const uwf = await createUwfStore(storageRoot); const uwf = await createUwfStore(storageRoot);
const chain = walkChain(uwf, headHash); const chain = walkChain(uwf, headHash);
@@ -1109,6 +1184,13 @@ async function cmdThreadStepOnce(
); );
if (isSuspendResult(nextResult.value)) { if (isSuspendResult(nextResult.value)) {
await ensureThreadSuspendMetadata(
storageRoot,
threadId,
entry,
nextResult.value.suspendedRole,
nextResult.value.prompt,
);
return buildStepOutputFromEvaluation( return buildStepOutputFromEvaluation(
workflowHash, workflowHash,
threadId, threadId,
@@ -1128,6 +1210,8 @@ async function cmdThreadStepOnce(
head: headHash, head: headHash,
status: "completed", status: "completed",
currentRole: null, currentRole: null,
suspendedRole: null,
suspendMessage: null,
done: true, done: true,
background: null, background: null,
}; };
@@ -1162,7 +1246,8 @@ async function cmdThreadStepOnce(
// Reload threads index to avoid overwriting changes made by the agent subprocess // Reload threads index to avoid overwriting changes made by the agent subprocess
const freshIndex = await loadThreadsIndex(storageRoot); const freshIndex = await loadThreadsIndex(storageRoot);
freshIndex[threadId] = newHead; const priorEntry = freshIndex[threadId] ?? createThreadIndexEntry(newHead);
freshIndex[threadId] = updateThreadHead(priorEntry, newHead);
await saveThreadsIndex(storageRoot, freshIndex); await saveThreadsIndex(storageRoot, freshIndex);
const chainAfter = walkChain(uwfAfter, newHead); const chainAfter = walkChain(uwfAfter, newHead);
@@ -1176,6 +1261,12 @@ async function cmdThreadStepOnce(
} }
if (isSuspendResult(afterResult.value)) { if (isSuspendResult(afterResult.value)) {
freshIndex[threadId] = markThreadSuspended(
freshIndex[threadId] ?? createThreadIndexEntry(newHead),
afterResult.value.suspendedRole,
afterResult.value.prompt,
);
await saveThreadsIndex(storageRoot, freshIndex);
return buildStepOutputFromEvaluation( return buildStepOutputFromEvaluation(
workflowHash, workflowHash,
threadId, threadId,
@@ -1202,6 +1293,8 @@ async function cmdThreadStepOnce(
head: newHead, head: newHead,
status, status,
currentRole, currentRole,
suspendedRole: null,
suspendMessage: null,
done, done,
background: null, background: null,
}; };
@@ -1209,7 +1302,7 @@ async function cmdThreadStepOnce(
async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise<CasRef> { async function resolveHeadHash(storageRoot: string, threadId: ThreadId): Promise<CasRef> {
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const activeHead = index[threadId]; const activeHead = index[threadId]?.head;
if (activeHead !== undefined) { if (activeHead !== undefined) {
return activeHead; return activeHead;
} }
@@ -1262,8 +1355,8 @@ export type CancelOutput = {
*/ */
export async function cmdThreadStop(storageRoot: string, threadId: ThreadId): Promise<StopOutput> { export async function cmdThreadStop(storageRoot: string, threadId: ThreadId): Promise<StopOutput> {
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const head = index[threadId]; const entry = index[threadId];
if (head === undefined) { if (entry === undefined) {
fail(`thread not active: ${threadId}`); fail(`thread not active: ${threadId}`);
} }
@@ -1292,10 +1385,11 @@ export async function cmdThreadCancel(
threadId: ThreadId, threadId: ThreadId,
): Promise<CancelOutput> { ): Promise<CancelOutput> {
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const head = index[threadId]; const entry = index[threadId];
if (head === undefined) { if (entry === undefined) {
fail(`thread not active: ${threadId}`); fail(`thread not active: ${threadId}`);
} }
const head = entry.head;
// Check if thread is running in background and terminate it // Check if thread is running in background and terminate it
const runningMarker = await isThreadRunning(storageRoot, threadId); const runningMarker = await isThreadRunning(storageRoot, threadId);
+30 -13
View File
@@ -5,7 +5,18 @@ import { join } from "node:path";
import type { BootstrapCapableStore, Hash } from "@ocas/core"; import type { BootstrapCapableStore, Hash } from "@ocas/core";
import { createFsStore } from "@ocas/fs"; import { createFsStore } from "@ocas/fs";
import type { CasRef, ThreadId, ThreadListItem, ThreadsIndex } from "@uncaged/workflow-protocol"; import type {
CasRef,
ThreadId,
ThreadIndexEntry,
ThreadListItem,
ThreadsIndex,
} from "@uncaged/workflow-protocol";
import {
createThreadIndexEntry,
parseThreadsIndex,
serializeThreadsIndex,
} from "@uncaged/workflow-protocol";
import { parse, stringify } from "yaml"; import { parse, stringify } from "yaml";
import { registerUwfSchemas, type UwfSchemaHashes } from "./schemas.js"; import { registerUwfSchemas, type UwfSchemaHashes } from "./schemas.js";
@@ -234,16 +245,7 @@ export async function loadThreadsIndex(storageRoot: string): Promise<ThreadsInde
try { try {
const text = await readFile(path, "utf8"); const text = await readFile(path, "utf8");
const raw = parse(text) as unknown; const raw = parse(text) as unknown;
if (raw === null || typeof raw !== "object" || Array.isArray(raw)) { return parseThreadsIndex(raw);
return {};
}
const index: ThreadsIndex = {};
for (const [threadId, head] of Object.entries(raw as Record<string, unknown>)) {
if (typeof head === "string") {
index[threadId as ThreadId] = head;
}
}
return index;
} catch (e) { } catch (e) {
const err = e as NodeJS.ErrnoException; const err = e as NodeJS.ErrnoException;
if (err.code === "ENOENT") { if (err.code === "ENOENT") {
@@ -253,10 +255,25 @@ export async function loadThreadsIndex(storageRoot: string): Promise<ThreadsInde
} }
} }
export async function saveThreadsIndex(storageRoot: string, index: ThreadsIndex): Promise<void> { /** Accept legacy CasRef values for test convenience. */
export type ThreadsIndexInput = Record<ThreadId, ThreadIndexEntry | CasRef>;
function normalizeThreadsIndexInput(index: ThreadsIndexInput): ThreadsIndex {
const normalized: ThreadsIndex = {};
for (const [threadId, value] of Object.entries(index)) {
normalized[threadId as ThreadId] =
typeof value === "string" ? createThreadIndexEntry(value as CasRef) : value;
}
return normalized;
}
export async function saveThreadsIndex(
storageRoot: string,
index: ThreadsIndexInput,
): Promise<void> {
const path = getThreadsPath(storageRoot); const path = getThreadsPath(storageRoot);
await mkdir(storageRoot, { recursive: true }); await mkdir(storageRoot, { recursive: true });
const text = stringify(index, { indent: 2 }); const text = stringify(serializeThreadsIndex(normalizeThreadsIndexInput(index)), { indent: 2 });
await writeFile(path, text, "utf8"); await writeFile(path, text, "utf8");
} }
@@ -0,0 +1,79 @@
import { describe, expect, test } from "vitest";
import {
createThreadIndexEntry,
markThreadSuspended,
normalizeThreadIndexEntry,
parseThreadsIndex,
serializeThreadIndexEntry,
serializeThreadsIndex,
updateThreadHead,
} from "../thread-index.js";
describe("thread-index", () => {
test("parse legacy string head hash", () => {
const entry = normalizeThreadIndexEntry("0123456789ABC");
expect(entry).toEqual({
head: "0123456789ABC",
suspendedRole: null,
suspendMessage: null,
});
});
test("parse suspended object entry", () => {
const entry = normalizeThreadIndexEntry({
head: "0123456789ABC",
suspendedRole: "worker",
suspendMessage: "Please clarify: Which API?",
});
expect(entry).toEqual({
head: "0123456789ABC",
suspendedRole: "worker",
suspendMessage: "Please clarify: Which API?",
});
});
test("serialize non-suspended entry as compact string", () => {
const entry = createThreadIndexEntry("0123456789ABC");
expect(serializeThreadIndexEntry(entry)).toBe("0123456789ABC");
});
test("serialize suspended entry as object", () => {
const entry = markThreadSuspended(
createThreadIndexEntry("0123456789ABC"),
"worker",
"Please clarify: Which API?",
);
expect(serializeThreadIndexEntry(entry)).toEqual({
head: "0123456789ABC",
suspendedRole: "worker",
suspendMessage: "Please clarify: Which API?",
});
});
test("updateThreadHead clears suspend metadata", () => {
const suspended = markThreadSuspended(
createThreadIndexEntry("OLDHEAD0123456"),
"worker",
"Waiting",
);
const resumed = updateThreadHead(suspended, "NEWHEAD01234567");
expect(resumed).toEqual({
head: "NEWHEAD01234567",
suspendedRole: null,
suspendMessage: null,
});
});
test("parseThreadsIndex round-trip", () => {
const raw = {
"01THREAD0000000000000001": "HEAD00000000001",
"01THREAD0000000000000002": {
head: "HEAD00000000002",
suspendedRole: "reviewer",
suspendMessage: "Need input",
},
};
const parsed = parseThreadsIndex(raw);
expect(serializeThreadsIndex(parsed)).toEqual(raw);
});
});
+10
View File
@@ -3,6 +3,15 @@ export {
STEP_NODE_SCHEMA, STEP_NODE_SCHEMA,
WORKFLOW_SCHEMA, WORKFLOW_SCHEMA,
} from "./schemas.js"; } from "./schemas.js";
export {
createThreadIndexEntry,
markThreadSuspended,
normalizeThreadIndexEntry,
parseThreadsIndex,
serializeThreadIndexEntry,
serializeThreadsIndex,
updateThreadHead,
} from "./thread-index.js";
export type { export type {
AgentAlias, AgentAlias,
AgentConfig, AgentConfig,
@@ -29,6 +38,7 @@ export type {
Target, Target,
ThreadForkOutput, ThreadForkOutput,
ThreadId, ThreadId,
ThreadIndexEntry,
ThreadListItem, ThreadListItem,
ThreadStatus, ThreadStatus,
ThreadStepsOutput, ThreadStepsOutput,
@@ -0,0 +1,89 @@
import type { CasRef, ThreadId, ThreadIndexEntry, ThreadsIndex } from "./types.js";
/** Normalize a legacy head hash or entry object into {@link ThreadIndexEntry}. */
export function normalizeThreadIndexEntry(raw: unknown): ThreadIndexEntry | null {
if (typeof raw === "string") {
return createThreadIndexEntry(raw as CasRef);
}
if (raw === null || typeof raw !== "object" || Array.isArray(raw)) {
return null;
}
const rec = raw as Record<string, unknown>;
const head = rec.head;
if (typeof head !== "string") {
return null;
}
const suspendedRole = rec.suspendedRole;
const suspendMessage = rec.suspendMessage;
return {
head: head as CasRef,
suspendedRole: typeof suspendedRole === "string" ? suspendedRole : null,
suspendMessage: typeof suspendMessage === "string" ? suspendMessage : null,
};
}
export function createThreadIndexEntry(head: CasRef): ThreadIndexEntry {
return {
head,
suspendedRole: null,
suspendMessage: null,
};
}
export function updateThreadHead(_entry: ThreadIndexEntry, head: CasRef): ThreadIndexEntry {
return {
head,
suspendedRole: null,
suspendMessage: null,
};
}
export function markThreadSuspended(
entry: ThreadIndexEntry,
suspendedRole: string,
suspendMessage: string,
): ThreadIndexEntry {
return {
head: entry.head,
suspendedRole,
suspendMessage,
};
}
/** Serialize for threads.yaml — compact string when not suspended. */
export function serializeThreadIndexEntry(
entry: ThreadIndexEntry,
): string | Record<string, string> {
if (entry.suspendedRole === null || entry.suspendMessage === null) {
return entry.head;
}
return {
head: entry.head,
suspendedRole: entry.suspendedRole,
suspendMessage: entry.suspendMessage,
};
}
export function parseThreadsIndex(raw: unknown): ThreadsIndex {
if (raw === null || typeof raw !== "object" || Array.isArray(raw)) {
return {};
}
const index: ThreadsIndex = {};
for (const [threadId, value] of Object.entries(raw as Record<string, unknown>)) {
const entry = normalizeThreadIndexEntry(value);
if (entry !== null) {
index[threadId as ThreadId] = entry;
}
}
return index;
}
export function serializeThreadsIndex(
index: ThreadsIndex,
): Record<string, string | Record<string, string>> {
const out: Record<string, string | Record<string, string>> = {};
for (const [threadId, entry] of Object.entries(index)) {
out[threadId] = serializeThreadIndexEntry(entry);
}
return out;
}
+12 -1
View File
@@ -105,10 +105,21 @@ export type StepOutput = {
status: ThreadStatus; status: ThreadStatus;
/** The current or next role. Null when completed, cancelled, suspended, or next is $END. */ /** The current or next role. Null when completed, cancelled, suspended, or next is $END. */
currentRole: string | null; currentRole: string | null;
/** Role whose output triggered suspension. Null when thread is not suspended. */
suspendedRole: string | null;
/** Rendered suspend prompt for the user. Null when thread is not suspended. */
suspendMessage: string | null;
done: boolean; done: boolean;
background: boolean | null; background: boolean | null;
}; };
/** Active thread entry in ~/.uncaged/workflow/threads.yaml */
export type ThreadIndexEntry = {
head: CasRef;
suspendedRole: string | null;
suspendMessage: string | null;
};
/** uwf thread steps — single step entry */ /** uwf thread steps — single step entry */
export type StepEntry = { export type StepEntry = {
hash: CasRef; hash: CasRef;
@@ -200,4 +211,4 @@ export type WorkflowConfig = {
}; };
/** ~/.uncaged/workflow/threads.yaml */ /** ~/.uncaged/workflow/threads.yaml */
export type ThreadsIndex = Record<ThreadId, CasRef>; export type ThreadsIndex = Record<ThreadId, ThreadIndexEntry>;
+2 -2
View File
@@ -163,7 +163,7 @@ export async function buildContext(
const { store, schemas } = agentStore; const { store, schemas } = agentStore;
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const headHash = index[threadId]; const headHash = index[threadId]?.head;
if (headHash === undefined) { if (headHash === undefined) {
fail(`thread not found in threads.yaml: ${threadId}`); fail(`thread not found in threads.yaml: ${threadId}`);
} }
@@ -212,7 +212,7 @@ export async function buildContextWithMeta(
const { store, schemas } = agentStore; const { store, schemas } = agentStore;
const index = await loadThreadsIndex(storageRoot); const index = await loadThreadsIndex(storageRoot);
const headHash = index[threadId]; const headHash = index[threadId]?.head;
if (headHash === undefined) { if (headHash === undefined) {
fail(`thread not found in threads.yaml: ${threadId}`); fail(`thread not found in threads.yaml: ${threadId}`);
} }
+2 -11
View File
@@ -12,11 +12,11 @@ import type {
ProviderAlias, ProviderAlias,
ProviderConfig, ProviderConfig,
Scenario, Scenario,
ThreadId,
ThreadsIndex, ThreadsIndex,
WorkflowConfig, WorkflowConfig,
WorkflowName, WorkflowName,
} from "@uncaged/workflow-protocol"; } from "@uncaged/workflow-protocol";
import { parseThreadsIndex } from "@uncaged/workflow-protocol";
import { parse } from "yaml"; import { parse } from "yaml";
import { registerAgentSchemas } from "./schemas.js"; import { registerAgentSchemas } from "./schemas.js";
@@ -207,16 +207,7 @@ export async function loadThreadsIndex(storageRoot: string): Promise<ThreadsInde
try { try {
const text = await readFile(path, "utf8"); const text = await readFile(path, "utf8");
const raw = parse(text) as unknown; const raw = parse(text) as unknown;
if (!isRecord(raw)) { return parseThreadsIndex(raw);
return {};
}
const index: ThreadsIndex = {};
for (const [threadId, head] of Object.entries(raw)) {
if (typeof head === "string") {
index[threadId as ThreadId] = head;
}
}
return index;
} catch (e) { } catch (e) {
const err = e as NodeJS.ErrnoException; const err = e as NodeJS.ErrnoException;
if (err.code === "ENOENT") { if (err.code === "ENOENT") {