refactor: unified thread storage + resume completed threads #45

Merged
xiaomo merged 3 commits from refactor/39-unified-thread-storage into main 2026-06-04 07:25:57 +00:00
4 changed files with 146 additions and 8 deletions
Showing only changes of commit 23e2ae9eb4 - Show all commits
@@ -1,6 +1,7 @@
import { describe, expect, test } from "vitest";
import {
createThreadIndexEntry,
markThreadCompleted,
markThreadSuspended,
normalizeThreadIndexEntry,
parseThreadsIndex,
@@ -16,6 +17,8 @@ describe("thread-index", () => {
head: "0123456789ABC",
suspendedRole: null,
suspendMessage: null,
status: "idle",
completedAt: null,
});
});
@@ -29,6 +32,40 @@ describe("thread-index", () => {
head: "0123456789ABC",
suspendedRole: "worker",
suspendMessage: "Please clarify: Which API?",
status: "idle",
completedAt: null,
});
});
test("normalizeThreadIndexEntry preserves status and completedAt from new data", () => {
const entry = normalizeThreadIndexEntry({
head: "0123456789ABC",
suspendedRole: null,
suspendMessage: null,
status: "completed",
completedAt: 1234567890,
});
expect(entry).toEqual({
head: "0123456789ABC",
suspendedRole: null,
suspendMessage: null,
status: "completed",
completedAt: 1234567890,
});
});
test("normalizeThreadIndexEntry defaults status=idle, completedAt=null for old data", () => {
const entry = normalizeThreadIndexEntry({
head: "0123456789ABC",
suspendedRole: null,
suspendMessage: null,
});
expect(entry).toEqual({
head: "0123456789ABC",
suspendedRole: null,
suspendMessage: null,
status: "idle",
completedAt: null,
});
});
@@ -47,10 +84,24 @@ describe("thread-index", () => {
head: "0123456789ABC",
suspendedRole: "worker",
suspendMessage: "Please clarify: Which API?",
status: "suspended",
});
});
test("updateThreadHead clears suspend metadata", () => {
test("serialize completed entry as object", () => {
const entry = markThreadCompleted(
createThreadIndexEntry("0123456789ABC"),
"completed",
1234567890,
);
expect(serializeThreadIndexEntry(entry)).toEqual({
head: "0123456789ABC",
status: "completed",
completedAt: 1234567890,
});
});
test("updateThreadHead clears suspend metadata and resets status to idle", () => {
const suspended = markThreadSuspended(
createThreadIndexEntry("OLDHEAD0123456"),
"worker",
@@ -61,6 +112,44 @@ describe("thread-index", () => {
head: "NEWHEAD01234567",
suspendedRole: null,
suspendMessage: null,
status: "idle",
completedAt: null,
});
});
test("markThreadSuspended sets status to suspended", () => {
const entry = createThreadIndexEntry("0123456789ABC");
const suspended = markThreadSuspended(entry, "worker", "Waiting for input");
expect(suspended).toEqual({
head: "0123456789ABC",
suspendedRole: "worker",
suspendMessage: "Waiting for input",
status: "suspended",
completedAt: null,
});
});
test("markThreadCompleted sets status and completedAt", () => {
const entry = createThreadIndexEntry("0123456789ABC");
const completed = markThreadCompleted(entry, "completed", 1234567890);
expect(completed).toEqual({
head: "0123456789ABC",
suspendedRole: null,
suspendMessage: null,
status: "completed",
completedAt: 1234567890,
});
});
test("markThreadCompleted with cancelled status", () => {
const entry = createThreadIndexEntry("0123456789ABC");
const cancelled = markThreadCompleted(entry, "cancelled", 9876543210);
expect(cancelled).toEqual({
head: "0123456789ABC",
suspendedRole: null,
suspendMessage: null,
status: "cancelled",
completedAt: 9876543210,
});
});
@@ -71,6 +160,7 @@ describe("thread-index", () => {
head: "HEAD00000000002",
suspendedRole: "reviewer",
suspendMessage: "Need input",
status: "suspended",
},
};
const parsed = parseThreadsIndex(raw);
+1
View File
@@ -5,6 +5,7 @@ export {
} from "./schemas.js";
export {
createThreadIndexEntry,
markThreadCompleted,
markThreadSuspended,
normalizeThreadIndexEntry,
parseThreadsIndex,
+52 -7
View File
@@ -15,10 +15,14 @@ export function normalizeThreadIndexEntry(raw: unknown): ThreadIndexEntry | null
}
const suspendedRole = rec.suspendedRole;
const suspendMessage = rec.suspendMessage;
const status = rec.status;
const completedAt = rec.completedAt;
return {
head: head as CasRef,
suspendedRole: typeof suspendedRole === "string" ? suspendedRole : null,
suspendMessage: typeof suspendMessage === "string" ? suspendMessage : null,
status: typeof status === "string" ? (status as "idle" | "running" | "suspended" | "completed" | "cancelled") : "idle",
completedAt: typeof completedAt === "number" ? completedAt : null,
};
}
@@ -27,6 +31,8 @@ export function createThreadIndexEntry(head: CasRef): ThreadIndexEntry {
head,
suspendedRole: null,
suspendMessage: null,
status: "idle",
completedAt: null,
};
}
@@ -35,6 +41,8 @@ export function updateThreadHead(_entry: ThreadIndexEntry, head: CasRef): Thread
head,
suspendedRole: null,
suspendMessage: null,
status: "idle",
completedAt: null,
};
}
@@ -47,21 +55,58 @@ export function markThreadSuspended(
head: entry.head,
suspendedRole,
suspendMessage,
status: "suspended",
completedAt: null,
};
}
export function markThreadCompleted(
entry: ThreadIndexEntry,
status: "completed" | "cancelled",
now: number,
): ThreadIndexEntry {
return {
head: entry.head,
suspendedRole: null,
suspendMessage: null,
status,
completedAt: now,
};
}
/** Serialize for variable store — compact string when not suspended. */
export function serializeThreadIndexEntry(
entry: ThreadIndexEntry,
): string | Record<string, string> {
if (entry.suspendedRole === null || entry.suspendMessage === null) {
): string | Record<string, string | number> {
// Compact string only for idle status with no suspend metadata
if (entry.status === "idle" && entry.suspendedRole === null && entry.suspendMessage === null && entry.completedAt === null) {
return entry.head;
}
return {
// Build object representation
const obj: Record<string, string | number> = {
head: entry.head,
suspendedRole: entry.suspendedRole,
suspendMessage: entry.suspendMessage,
};
// Include suspend metadata if present
if (entry.suspendedRole !== null) {
obj.suspendedRole = entry.suspendedRole;
}
if (entry.suspendMessage !== null) {
obj.suspendMessage = entry.suspendMessage;
}
// Always include status if not idle
if (entry.status !== "idle") {
obj.status = entry.status;
}
// Include completedAt if present
if (entry.completedAt !== null) {
obj.completedAt = entry.completedAt;
}
return obj;
}
export function parseThreadsIndex(raw: unknown): ThreadsIndex {
@@ -80,8 +125,8 @@ export function parseThreadsIndex(raw: unknown): ThreadsIndex {
export function serializeThreadsIndex(
index: ThreadsIndex,
): Record<string, string | Record<string, string>> {
const out: Record<string, string | Record<string, string>> = {};
): Record<string, string | Record<string, string | number>> {
const out: Record<string, string | Record<string, string | number>> = {};
for (const [threadId, entry] of Object.entries(index)) {
out[threadId] = serializeThreadIndexEntry(entry);
}
+2
View File
@@ -118,6 +118,8 @@ export type ThreadIndexEntry = {
head: CasRef;
suspendedRole: string | null;
suspendMessage: string | null;
status: ThreadStatus;
completedAt: number | null;
};
/** uwf thread steps — single step entry */