Merge pull request 'refactor: unified thread storage + resume completed threads' (#45) from refactor/39-unified-thread-storage into main
CI / check (push) Failing after 1m26s

refactor: unified thread storage + resume completed threads (#45)
This commit was merged in pull request #45.
This commit is contained in:
2026-06-04 07:25:56 +00:00
19 changed files with 1031 additions and 375 deletions
@@ -118,8 +118,10 @@ async function setupSuspendedThread(mode: MockAgentMode): Promise<{
await seedThreads(tmpDir, {
[THREAD_ID]: {
head: stepHash,
status: "suspended",
suspendedRole: "worker",
suspendMessage: SUSPEND_MESSAGE,
completedAt: null,
},
});
@@ -247,7 +249,7 @@ describe("uwf thread resume", () => {
const result = runUwf(["thread", "resume", THREAD_ID], casDir);
expect(result.status).not.toBe(0);
expect(result.stderr).toContain("thread is not suspended");
expect(result.stderr).toContain("thread cannot be resumed");
});
test("resume suspended thread executes step and becomes idle", async () => {
@@ -347,8 +349,10 @@ describe("uwf thread resume", () => {
const uwfAfterFirst = await createUwfStore(tmpDir);
expect(getThread(uwfAfterFirst.varStore, THREAD_ID)).toEqual({
head: firstResume.head,
status: "suspended",
suspendedRole: "worker",
suspendMessage: SUSPEND_MESSAGE,
completedAt: null,
});
const { mockAgentPath: okMockAgentPath } = await setupOkMockAgent(
@@ -444,3 +448,266 @@ echo '${adapterJson}'
return { mockAgentPath };
}
describe("uwf thread resume - completed threads", () => {
test("resume completed thread starts from $START role", async () => {
const casDir = join(tmpDir, "cas");
await mkdir(casDir, { recursive: true });
const store = await openStore(casDir);
const schemas = await registerUwfSchemas(store);
const outputSchemaHash = await putSchema(store, OUTPUT_SCHEMA);
const workflowHash = await store.cas.put(schemas.workflow, {
name: "test-completed-resume",
description: "completed thread resume test",
roles: {
worker: {
description: "Worker role",
goal: "Work",
capabilities: [],
procedure: "work",
output: "result",
frontmatter: outputSchemaHash,
},
reviewer: {
description: "Reviewer role",
goal: "Review",
capabilities: [],
procedure: "review",
output: "result",
frontmatter: outputSchemaHash,
},
},
graph: {
$START: { _: { role: "worker", prompt: "Start work", location: null } },
worker: { _: { role: "reviewer", prompt: "Review the work", location: null } },
reviewer: { _: { role: "$END", prompt: "Done", location: null } },
},
});
const startHash = await store.cas.put(schemas.startNode, {
workflow: workflowHash,
prompt: "Initial task",
cwd: tmpDir,
});
process.env.OCAS_DIR = casDir;
const workerOutputHash = await store.cas.put(outputSchemaHash, { $status: "_" });
const reviewerOutputHash = await store.cas.put(outputSchemaHash, { $status: "_" });
const detailHash = await store.cas.put(schemas.text, "mock detail");
const workerStepHash = await store.cas.put(schemas.stepNode, {
start: startHash,
prev: null,
role: "worker",
output: workerOutputHash,
detail: detailHash,
agent: "uwf-mock",
edgePrompt: "Start work",
startedAtMs: 1716600000000,
completedAtMs: 1716600001000,
cwd: tmpDir,
assembledPrompt: null,
});
const reviewerStepHash = await store.cas.put(schemas.stepNode, {
start: startHash,
prev: workerStepHash,
role: "reviewer",
output: reviewerOutputHash,
detail: detailHash,
agent: "uwf-mock",
edgePrompt: "Review the work",
startedAtMs: 1716600001000,
completedAtMs: 1716600002000,
cwd: tmpDir,
assembledPrompt: null,
});
await seedThreads(tmpDir, {
[THREAD_ID]: {
head: reviewerStepHash,
status: "completed",
suspendedRole: null,
suspendMessage: null,
completedAt: 1716600002000,
},
});
// Verify the status was actually set
const { createUwfStore, getThread } = await import("../store.js");
const verifyUwf = await createUwfStore(tmpDir);
const verifyEntry = getThread(verifyUwf.varStore, THREAD_ID);
// biome-ignore lint/nursery/noConsole: test debugging
console.log("Seeded entry status:", verifyEntry?.status);
// biome-ignore lint/nursery/noConsole: test debugging
console.log("Seeded entry:", JSON.stringify(verifyEntry, null, 2));
const promptCapturePath = join(tmpDir, "captured-prompt-completed.txt");
const mockAgentPath = join(tmpDir, "mock-agent-completed.sh");
const newWorkerStepHash = await store.cas.put(schemas.stepNode, {
start: startHash,
prev: reviewerStepHash,
role: "worker",
output: workerOutputHash,
detail: detailHash,
agent: "uwf-mock",
edgePrompt: "Start work",
startedAtMs: 1716600003000,
completedAtMs: 1716600004000,
cwd: tmpDir,
assembledPrompt: null,
});
const adapterJson = JSON.stringify({
stepHash: newWorkerStepHash,
detailHash,
role: "worker",
frontmatter: { $status: "_" },
body: "",
startedAtMs: 1716600003000,
completedAtMs: 1716600004000,
});
await writeFile(
mockAgentPath,
`#!/bin/sh
prompt=""
while [ $# -gt 0 ]; do
if [ "$1" = "--prompt" ]; then
prompt="$2"
shift 2
else
shift
fi
done
printf '%s' "$prompt" > '${promptCapturePath}'
echo '${adapterJson}'
`,
{ mode: 0o755 },
);
const configPath = join(tmpDir, "config.yaml");
await writeFile(
configPath,
`defaultAgent: uwf-hermes\ndefaultModel: test-model\nagentOverrides: null\nagents: {}\nproviders: {}\nmodels: {}\n`,
);
const result = runUwf(
["thread", "resume", THREAD_ID, "-p", "Additional context", "--agent", mockAgentPath],
casDir,
);
if (result.status !== 0) {
// biome-ignore lint/nursery/noConsole: test debugging
console.error("Command failed:", result.stderr);
}
expect(result.status).toBe(0);
const cliOutput = JSON.parse(result.stdout.trim());
expect(cliOutput.status).toBe("idle");
expect(cliOutput.currentRole).toBe("reviewer");
expect(cliOutput.done).toBe(false);
const capturedPrompt = await readFile(promptCapturePath, "utf8");
expect(capturedPrompt).toContain("Previous run completed");
expect(capturedPrompt).toContain("Additional context");
const storeModule = await import("../store.js");
const uwf2 = await storeModule.createUwfStore(tmpDir);
const entry2 = storeModule.getThread(uwf2.varStore, THREAD_ID);
expect(entry2?.status).toBe("idle");
expect(entry2?.completedAt).toBeNull();
});
test("resume cancelled thread returns error", async () => {
const casDir = join(tmpDir, "cas");
await mkdir(casDir, { recursive: true });
const store = await openStore(casDir);
const schemas = await registerUwfSchemas(store);
const workflowHash = await store.cas.put(schemas.workflow, {
name: "cancelled-workflow",
description: "cancelled thread",
roles: {
worker: {
description: "Worker",
goal: "Work",
capabilities: [],
procedure: "work",
output: "result",
frontmatter: await putSchema(store, OUTPUT_SCHEMA),
},
},
graph: {
$START: { _: { role: "worker", prompt: "Start", location: null } },
worker: { _: { role: "$END", prompt: "Done", location: null } },
},
});
const startHash = await store.cas.put(schemas.startNode, {
workflow: workflowHash,
prompt: "task",
cwd: tmpDir,
});
process.env.OCAS_DIR = casDir;
await seedThreads(tmpDir, {
[THREAD_ID]: {
head: startHash,
status: "cancelled",
suspendedRole: null,
suspendMessage: null,
completedAt: null,
},
});
const result = runUwf(["thread", "resume", THREAD_ID], casDir);
expect(result.status).not.toBe(0);
expect(result.stderr).toContain("thread cannot be resumed");
expect(result.stderr).toContain("cancelled");
});
test("resume idle thread returns error", async () => {
const casDir = join(tmpDir, "cas");
await mkdir(casDir, { recursive: true });
const store = await openStore(casDir);
const schemas = await registerUwfSchemas(store);
const workflowHash = await store.cas.put(schemas.workflow, {
name: "idle-workflow",
description: "idle thread",
roles: {
worker: {
description: "Worker",
goal: "Work",
capabilities: [],
procedure: "work",
output: "result",
frontmatter: await putSchema(store, OUTPUT_SCHEMA),
},
},
graph: {
$START: { _: { role: "worker", prompt: "Start", location: null } },
worker: { _: { role: "$END", prompt: "Done", location: null } },
},
});
const startHash = await store.cas.put(schemas.startNode, {
workflow: workflowHash,
prompt: "task",
cwd: tmpDir,
});
process.env.OCAS_DIR = casDir;
await seedThreads(tmpDir, { [THREAD_ID]: startHash });
const result = runUwf(["thread", "resume", THREAD_ID], casDir);
expect(result.status).not.toBe(0);
expect(result.stderr).toContain("thread cannot be resumed");
expect(result.stderr).toContain("idle");
});
});