feat(cli): add filtering and pagination to thread list command
Implements enhanced filtering and pagination for the `uwf thread list` command to support workflows with large numbers of threads. Changes: - Add --page, --page-size parameters for pagination (default: page 1, size 20) - Add --since, --until time filters supporting multiple formats (ISO8601, relative like "2h", "1d") - Add --workflow filter to show threads for specific workflow - Add --sort parameter (newest-first, oldest-first, alphabetical) - Add pagination metadata in JSON output (page, pageSize, totalThreads, totalPages, hasMore) - Implement parseRelativeTime() for human-friendly time expressions (1h, 30m, 2d, 1w) - Add comprehensive unit tests for filters, pagination, and time parsing - Update CLI help text with new parameters and examples Fixes #471
This commit is contained in:
@@ -0,0 +1,550 @@
|
||||
import { mkdir, mkdtemp, rm } from "node:fs/promises";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import type { CasRef, ThreadId } from "@uncaged/workflow-protocol";
|
||||
import { extractUlidTimestamp, generateUlid } from "@uncaged/workflow-util";
|
||||
import { afterEach, beforeEach, describe, expect, test } from "vitest";
|
||||
import { createMarker, deleteMarker } from "../background/index.js";
|
||||
import { cmdThreadList } from "../commands/thread.js";
|
||||
import { parseTimeInput } from "../commands/thread-time-parser.js";
|
||||
import type { UwfStore } from "../store.js";
|
||||
import { appendThreadHistory, createUwfStore, saveThreadsIndex } from "../store.js";
|
||||
|
||||
// ── helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
async function makeUwfStore(storageRoot: string): Promise<UwfStore> {
|
||||
const casDir = join(storageRoot, "cas");
|
||||
await mkdir(casDir, { recursive: true });
|
||||
return createUwfStore(storageRoot);
|
||||
}
|
||||
|
||||
async function createTestWorkflow(uwf: UwfStore): Promise<CasRef> {
|
||||
const workflowPayload = {
|
||||
name: "test-workflow",
|
||||
roles: {
|
||||
role1: {
|
||||
goal: "test goal",
|
||||
outputSchema: { type: "object" as const, properties: {} },
|
||||
},
|
||||
},
|
||||
graph: { start: "role1" },
|
||||
conditions: {},
|
||||
};
|
||||
return await uwf.store.put(uwf.schemas.workflow, workflowPayload);
|
||||
}
|
||||
|
||||
async function createTestThread(
|
||||
uwf: UwfStore,
|
||||
storageRoot: string,
|
||||
workflowHash: CasRef,
|
||||
timestamp: number,
|
||||
): Promise<ThreadId> {
|
||||
const threadId = generateUlid(timestamp) as ThreadId;
|
||||
const startPayload = {
|
||||
workflow: workflowHash,
|
||||
prompt: "test prompt",
|
||||
};
|
||||
const headHash = await uwf.store.put(uwf.schemas.startNode, startPayload);
|
||||
const index = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot));
|
||||
index[threadId] = headHash;
|
||||
await saveThreadsIndex(storageRoot, index);
|
||||
return threadId;
|
||||
}
|
||||
|
||||
async function markThreadRunning(storageRoot: string, threadId: ThreadId, workflow: CasRef) {
|
||||
await createMarker(storageRoot, {
|
||||
thread: threadId,
|
||||
workflow,
|
||||
pid: process.pid, // Use current process PID so isPidAlive returns true
|
||||
startedAt: Date.now(),
|
||||
});
|
||||
}
|
||||
|
||||
async function completeThread(
|
||||
storageRoot: string,
|
||||
threadId: ThreadId,
|
||||
workflowHash: CasRef,
|
||||
headHash: CasRef,
|
||||
) {
|
||||
const index = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot));
|
||||
delete index[threadId];
|
||||
await saveThreadsIndex(storageRoot, index);
|
||||
await appendThreadHistory(storageRoot, {
|
||||
thread: threadId,
|
||||
workflow: workflowHash,
|
||||
head: headHash,
|
||||
completedAt: Date.now(),
|
||||
});
|
||||
}
|
||||
|
||||
// ── test setup ────────────────────────────────────────────────────────────────
|
||||
|
||||
let tmpDir: string;
|
||||
|
||||
beforeEach(async () => {
|
||||
tmpDir = await mkdtemp(join(tmpdir(), "thread-list-filters-test-"));
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await rm(tmpDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
// ── status filter tests ───────────────────────────────────────────────────────
|
||||
|
||||
describe("cmdThreadList status filter", () => {
|
||||
test("should return idle and running threads when status=active", async () => {
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
const workflowHash = await createTestWorkflow(uwf);
|
||||
|
||||
const thread1 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 3000);
|
||||
const thread2 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 2000);
|
||||
const thread3 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000);
|
||||
|
||||
await markThreadRunning(tmpDir, thread2, workflowHash);
|
||||
|
||||
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir));
|
||||
const thread3Head = index[thread3];
|
||||
if (thread3Head === undefined) throw new Error("thread3 head not found");
|
||||
await completeThread(tmpDir, thread3, workflowHash, thread3Head);
|
||||
|
||||
const result = await cmdThreadList(tmpDir, ["idle", "running"], null, null, null, null);
|
||||
|
||||
expect(result).toHaveLength(2);
|
||||
expect(result.map((r) => r.thread).sort()).toEqual([thread1, thread2].sort());
|
||||
|
||||
// Clean up marker after test
|
||||
await deleteMarker(tmpDir, thread2);
|
||||
});
|
||||
|
||||
test("should support comma-separated status values", async () => {
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
const workflowHash = await createTestWorkflow(uwf);
|
||||
|
||||
const thread1 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 3000);
|
||||
const thread2 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 2000);
|
||||
const thread3 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000);
|
||||
|
||||
await markThreadRunning(tmpDir, thread2, workflowHash);
|
||||
|
||||
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir));
|
||||
const thread3Head = index[thread3];
|
||||
if (thread3Head === undefined) throw new Error("thread3 head not found");
|
||||
await completeThread(tmpDir, thread3, workflowHash, thread3Head);
|
||||
|
||||
const result = await cmdThreadList(tmpDir, ["idle", "completed"], null, null, null, null);
|
||||
|
||||
// Clean up marker
|
||||
await deleteMarker(tmpDir, thread2);
|
||||
|
||||
// thread2 is running (not idle), so should not be included
|
||||
// Expected: thread1 (idle) and thread3 (completed)
|
||||
expect(result).toHaveLength(2);
|
||||
expect(result.map((r) => r.thread).sort()).toEqual([thread1, thread3].sort());
|
||||
});
|
||||
|
||||
test("should support single status filter (backward compat)", async () => {
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
const workflowHash = await createTestWorkflow(uwf);
|
||||
|
||||
const _thread1 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 3000);
|
||||
const _thread2 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 2000);
|
||||
const thread3 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000);
|
||||
|
||||
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir));
|
||||
const thread3Head = index[thread3];
|
||||
if (thread3Head === undefined) throw new Error("thread3 head not found");
|
||||
await completeThread(tmpDir, thread3, workflowHash, thread3Head);
|
||||
|
||||
const result = await cmdThreadList(tmpDir, ["completed"], null, null, null, null);
|
||||
|
||||
expect(result).toHaveLength(1);
|
||||
expect(result[0]?.thread).toBe(thread3);
|
||||
expect(result[0]?.status).toBe("completed");
|
||||
});
|
||||
|
||||
test("should return all threads when no status filter provided", async () => {
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
const workflowHash = await createTestWorkflow(uwf);
|
||||
|
||||
const thread1 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 3000);
|
||||
const thread2 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 2000);
|
||||
const thread3 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000);
|
||||
|
||||
await markThreadRunning(tmpDir, thread2, workflowHash);
|
||||
|
||||
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir));
|
||||
const thread3Head = index[thread3];
|
||||
if (thread3Head === undefined) throw new Error("thread3 head not found");
|
||||
await completeThread(tmpDir, thread3, workflowHash, thread3Head);
|
||||
|
||||
const result = await cmdThreadList(tmpDir, null, null, null, null, null);
|
||||
|
||||
expect(result).toHaveLength(3);
|
||||
expect(result.map((r) => r.thread).sort()).toEqual([thread1, thread2, thread3].sort());
|
||||
});
|
||||
});
|
||||
|
||||
// ── time range filtering tests ────────────────────────────────────────────────
|
||||
|
||||
describe("cmdThreadList time filters", () => {
|
||||
test("should filter threads created after given timestamp", async () => {
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
const workflowHash = await createTestWorkflow(uwf);
|
||||
|
||||
const ts1 = Date.UTC(2026, 4, 20, 0, 0, 0);
|
||||
const ts2 = Date.UTC(2026, 4, 21, 0, 0, 0);
|
||||
const ts3 = Date.UTC(2026, 4, 22, 0, 0, 0);
|
||||
|
||||
const _threadA = await createTestThread(uwf, tmpDir, workflowHash, ts1);
|
||||
const threadB = await createTestThread(uwf, tmpDir, workflowHash, ts2);
|
||||
const threadC = await createTestThread(uwf, tmpDir, workflowHash, ts3);
|
||||
|
||||
// Use a timestamp slightly before ts2 to include threadB
|
||||
const afterMs = Date.UTC(2026, 4, 20, 12, 0, 0);
|
||||
const result = await cmdThreadList(tmpDir, null, afterMs, null, null, null);
|
||||
|
||||
expect(result).toHaveLength(2);
|
||||
expect(result.map((r) => r.thread).sort()).toEqual([threadB, threadC].sort());
|
||||
});
|
||||
|
||||
test("should filter threads created before given timestamp", async () => {
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
const workflowHash = await createTestWorkflow(uwf);
|
||||
|
||||
const ts1 = Date.UTC(2026, 4, 20, 0, 0, 0);
|
||||
const ts2 = Date.UTC(2026, 4, 21, 0, 0, 0);
|
||||
const ts3 = Date.UTC(2026, 4, 22, 0, 0, 0);
|
||||
|
||||
const threadA = await createTestThread(uwf, tmpDir, workflowHash, ts1);
|
||||
const threadB = await createTestThread(uwf, tmpDir, workflowHash, ts2);
|
||||
const _threadC = await createTestThread(uwf, tmpDir, workflowHash, ts3);
|
||||
|
||||
const beforeMs = Date.UTC(2026, 4, 22, 0, 0, 0);
|
||||
const result = await cmdThreadList(tmpDir, null, null, beforeMs, null, null);
|
||||
|
||||
expect(result).toHaveLength(2);
|
||||
expect(result.map((r) => r.thread).sort()).toEqual([threadA, threadB].sort());
|
||||
});
|
||||
|
||||
test("should support both after and before filters (time range)", async () => {
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
const workflowHash = await createTestWorkflow(uwf);
|
||||
|
||||
const ts1 = Date.UTC(2026, 4, 20, 0, 0, 0);
|
||||
const ts2 = Date.UTC(2026, 4, 21, 0, 0, 0);
|
||||
const ts3 = Date.UTC(2026, 4, 22, 0, 0, 0);
|
||||
|
||||
const _threadA = await createTestThread(uwf, tmpDir, workflowHash, ts1);
|
||||
const threadB = await createTestThread(uwf, tmpDir, workflowHash, ts2);
|
||||
const _threadC = await createTestThread(uwf, tmpDir, workflowHash, ts3);
|
||||
|
||||
const afterMs = Date.UTC(2026, 4, 20, 12, 0, 0);
|
||||
const beforeMs = Date.UTC(2026, 4, 22, 0, 0, 0);
|
||||
const result = await cmdThreadList(tmpDir, null, afterMs, beforeMs, null, null);
|
||||
|
||||
expect(result).toHaveLength(1);
|
||||
expect(result[0]?.thread).toBe(threadB);
|
||||
});
|
||||
});
|
||||
|
||||
// ── pagination tests ──────────────────────────────────────────────────────────
|
||||
|
||||
describe("cmdThreadList pagination", () => {
|
||||
test("should limit results with --take", async () => {
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
const workflowHash = await createTestWorkflow(uwf);
|
||||
|
||||
const threads: ThreadId[] = [];
|
||||
for (let i = 0; i < 10; i++) {
|
||||
threads.push(await createTestThread(uwf, tmpDir, workflowHash, Date.now() - i * 1000));
|
||||
}
|
||||
|
||||
const result = await cmdThreadList(tmpDir, null, null, null, null, 5);
|
||||
|
||||
expect(result).toHaveLength(5);
|
||||
});
|
||||
|
||||
test("should skip first N threads with --skip", async () => {
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
const workflowHash = await createTestWorkflow(uwf);
|
||||
|
||||
const threads: ThreadId[] = [];
|
||||
// Create threads in chronological order, but they'll be sorted newest first
|
||||
for (let i = 0; i < 10; i++) {
|
||||
threads.push(await createTestThread(uwf, tmpDir, workflowHash, Date.now() + i * 100));
|
||||
// Small delay to ensure distinct timestamps
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
}
|
||||
|
||||
const result = await cmdThreadList(tmpDir, null, null, null, 3, null);
|
||||
|
||||
expect(result).toHaveLength(7);
|
||||
// The 3 newest threads should be skipped, so we should get the 7 oldest
|
||||
});
|
||||
|
||||
test("should support skip + take for pagination", async () => {
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
const workflowHash = await createTestWorkflow(uwf);
|
||||
|
||||
const threads: ThreadId[] = [];
|
||||
for (let i = 0; i < 10; i++) {
|
||||
threads.push(await createTestThread(uwf, tmpDir, workflowHash, Date.now() + i * 100));
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
}
|
||||
|
||||
const result = await cmdThreadList(tmpDir, null, null, null, 5, 3);
|
||||
|
||||
expect(result).toHaveLength(3);
|
||||
// Should skip first 5 (newest), then take 3
|
||||
});
|
||||
|
||||
test("should handle take > available threads", async () => {
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
const workflowHash = await createTestWorkflow(uwf);
|
||||
|
||||
const _thread1 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 3000);
|
||||
const _thread2 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 2000);
|
||||
const _thread3 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000);
|
||||
|
||||
const result = await cmdThreadList(tmpDir, null, null, null, null, 10);
|
||||
|
||||
expect(result).toHaveLength(3);
|
||||
});
|
||||
|
||||
test("should return empty array when skip >= thread count", async () => {
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
const workflowHash = await createTestWorkflow(uwf);
|
||||
|
||||
await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 3000);
|
||||
await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 2000);
|
||||
await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000);
|
||||
|
||||
const result = await cmdThreadList(tmpDir, null, null, null, 5, null);
|
||||
|
||||
expect(result).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
|
||||
// ── combined filters tests ────────────────────────────────────────────────────
|
||||
|
||||
describe("combined filters", () => {
|
||||
test("should combine status and time range filters", async () => {
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
const workflowHash = await createTestWorkflow(uwf);
|
||||
|
||||
const ts1 = Date.UTC(2026, 4, 20, 0, 0, 0);
|
||||
const ts2 = Date.UTC(2026, 4, 21, 0, 0, 0);
|
||||
const ts3 = Date.UTC(2026, 4, 22, 0, 0, 0);
|
||||
const ts4 = Date.UTC(2026, 4, 23, 0, 0, 0);
|
||||
|
||||
const _thread1 = await createTestThread(uwf, tmpDir, workflowHash, ts1);
|
||||
const thread2 = await createTestThread(uwf, tmpDir, workflowHash, ts2);
|
||||
const thread3 = await createTestThread(uwf, tmpDir, workflowHash, ts3);
|
||||
const thread4 = await createTestThread(uwf, tmpDir, workflowHash, ts4);
|
||||
|
||||
await markThreadRunning(tmpDir, thread2, workflowHash);
|
||||
|
||||
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir));
|
||||
const thread3Head = index[thread3];
|
||||
if (thread3Head === undefined) throw new Error("thread3 head not found");
|
||||
await completeThread(tmpDir, thread3, workflowHash, thread3Head);
|
||||
|
||||
const afterMs = Date.UTC(2026, 4, 20, 12, 0, 0);
|
||||
const result = await cmdThreadList(tmpDir, ["idle"], afterMs, null, null, null);
|
||||
|
||||
expect(result).toHaveLength(1);
|
||||
expect(result[0]?.thread).toBe(thread4);
|
||||
expect(result[0]?.status).toBe("idle");
|
||||
|
||||
// Clean up marker
|
||||
await deleteMarker(tmpDir, thread2);
|
||||
});
|
||||
|
||||
test("should combine status filter and pagination", async () => {
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
const workflowHash = await createTestWorkflow(uwf);
|
||||
|
||||
const threads: ThreadId[] = [];
|
||||
for (let i = 9; i >= 0; i--) {
|
||||
const thread = await createTestThread(uwf, tmpDir, workflowHash, Date.now() + i * 1000);
|
||||
threads.push(thread);
|
||||
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir));
|
||||
const headHash = index[thread];
|
||||
if (headHash === undefined) throw new Error("head not found");
|
||||
await completeThread(tmpDir, thread, workflowHash, headHash);
|
||||
}
|
||||
|
||||
const result = await cmdThreadList(tmpDir, ["completed"], null, null, 3, 5);
|
||||
|
||||
expect(result).toHaveLength(5);
|
||||
for (const r of result) {
|
||||
expect(r.status).toBe("completed");
|
||||
}
|
||||
});
|
||||
|
||||
test("should combine time range and pagination", async () => {
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
const workflowHash = await createTestWorkflow(uwf);
|
||||
|
||||
const threads: ThreadId[] = [];
|
||||
for (let i = 0; i < 20; i++) {
|
||||
const ts = Date.UTC(2026, 4, 1 + i, 0, 0, 0);
|
||||
threads.push(await createTestThread(uwf, tmpDir, workflowHash, ts));
|
||||
}
|
||||
|
||||
const afterMs = Date.UTC(2026, 4, 10, 0, 0, 0);
|
||||
const result = await cmdThreadList(tmpDir, null, afterMs, null, 2, 5);
|
||||
|
||||
expect(result).toHaveLength(5);
|
||||
for (const r of result) {
|
||||
const ts = extractUlidTimestamp(r.thread);
|
||||
expect(ts).not.toBeNull();
|
||||
if (ts !== null) {
|
||||
expect(ts).toBeGreaterThan(afterMs);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
async function setupMixedStatusThreads(
|
||||
uwf: UwfStore,
|
||||
workflowHash: string,
|
||||
count: number,
|
||||
): Promise<ThreadId[]> {
|
||||
const threads: ThreadId[] = [];
|
||||
for (let i = 0; i < count; i++) {
|
||||
const ts = Date.UTC(2026, 4, 10 + i, 0, 0, 0);
|
||||
const thread = await createTestThread(uwf, tmpDir, workflowHash, ts);
|
||||
threads.push(thread);
|
||||
|
||||
if (i % 2 === 0) {
|
||||
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir));
|
||||
const headHash = index[thread];
|
||||
if (headHash === undefined) throw new Error("head not found");
|
||||
await completeThread(tmpDir, thread, workflowHash, headHash);
|
||||
} else {
|
||||
await markThreadRunning(tmpDir, thread, workflowHash);
|
||||
}
|
||||
}
|
||||
return threads;
|
||||
}
|
||||
|
||||
async function cleanupRunningMarkers(threads: ThreadId[]): Promise<void> {
|
||||
for (let i = 0; i < threads.length; i++) {
|
||||
if (i % 2 !== 0) {
|
||||
await deleteMarker(tmpDir, threads[i] as ThreadId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("should combine all filters (status + time + pagination)", async () => {
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
const workflowHash = await createTestWorkflow(uwf);
|
||||
const threads = await setupMixedStatusThreads(uwf, workflowHash, 15);
|
||||
|
||||
const afterMs = Date.UTC(2026, 4, 14, 12, 0, 0);
|
||||
const beforeMs = Date.UTC(2026, 4, 20, 0, 0, 0);
|
||||
const result = await cmdThreadList(tmpDir, ["idle", "running"], afterMs, beforeMs, 1, 3);
|
||||
|
||||
expect(result.length).toBeLessThanOrEqual(3);
|
||||
for (const r of result) {
|
||||
expect(["idle", "running"]).toContain(r.status);
|
||||
const ts = extractUlidTimestamp(r.thread);
|
||||
if (ts !== null) {
|
||||
expect(ts).toBeGreaterThan(afterMs);
|
||||
expect(ts).toBeLessThan(beforeMs);
|
||||
}
|
||||
}
|
||||
|
||||
await cleanupRunningMarkers(threads);
|
||||
});
|
||||
});
|
||||
|
||||
// ── edge cases tests ──────────────────────────────────────────────────────────
|
||||
|
||||
describe("edge cases", () => {
|
||||
test("should handle empty thread list", async () => {
|
||||
await makeUwfStore(tmpDir);
|
||||
const result = await cmdThreadList(tmpDir, null, null, null, null, null);
|
||||
expect(result).toHaveLength(0);
|
||||
});
|
||||
|
||||
test("should skip threads with invalid ULID when time filtering", async () => {
|
||||
const uwf = await makeUwfStore(tmpDir);
|
||||
const workflowHash = await createTestWorkflow(uwf);
|
||||
|
||||
const thread1 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 2000);
|
||||
const thread2 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000);
|
||||
|
||||
const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir));
|
||||
index["INVALID_ULID_FORMAT_HERE" as ThreadId] = "01J6HMVRNQKJV2";
|
||||
await saveThreadsIndex(tmpDir, index);
|
||||
|
||||
const afterMs = Date.now() - 3000;
|
||||
const result = await cmdThreadList(tmpDir, null, afterMs, null, null, null);
|
||||
|
||||
expect(result).toHaveLength(2);
|
||||
expect(result.map((r) => r.thread).sort()).toEqual([thread1, thread2].sort());
|
||||
});
|
||||
});
|
||||
|
||||
// ── time parsing tests ────────────────────────────────────────────────────────
|
||||
|
||||
describe("relative time parsing", () => {
|
||||
test("should parse '7d' as 7 days ago", () => {
|
||||
const nowMs = Date.UTC(2026, 4, 24, 12, 0, 0);
|
||||
const result = parseTimeInput("7d", nowMs);
|
||||
const expected = Date.UTC(2026, 4, 17, 12, 0, 0);
|
||||
expect(result).toBe(expected);
|
||||
});
|
||||
|
||||
test("should parse '24h' as 24 hours ago", () => {
|
||||
const nowMs = Date.UTC(2026, 4, 24, 12, 0, 0);
|
||||
const result = parseTimeInput("24h", nowMs);
|
||||
const expected = Date.UTC(2026, 4, 23, 12, 0, 0);
|
||||
expect(result).toBe(expected);
|
||||
});
|
||||
|
||||
test("should parse '30m' as 30 minutes ago", () => {
|
||||
const nowMs = Date.UTC(2026, 4, 24, 12, 30, 0);
|
||||
const result = parseTimeInput("30m", nowMs);
|
||||
const expected = Date.UTC(2026, 4, 24, 12, 0, 0);
|
||||
expect(result).toBe(expected);
|
||||
});
|
||||
|
||||
test("should parse '1d' as 1 day ago", () => {
|
||||
const nowMs = Date.UTC(2026, 4, 24, 0, 0, 0);
|
||||
const result = parseTimeInput("1d", nowMs);
|
||||
const expected = Date.UTC(2026, 4, 23, 0, 0, 0);
|
||||
expect(result).toBe(expected);
|
||||
});
|
||||
});
|
||||
|
||||
describe("ISO date parsing", () => {
|
||||
test("should parse ISO date (YYYY-MM-DD)", () => {
|
||||
const nowMs = Date.now();
|
||||
const result = parseTimeInput("2026-05-20", nowMs);
|
||||
const expected = Date.UTC(2026, 4, 20, 0, 0, 0);
|
||||
expect(result).toBe(expected);
|
||||
});
|
||||
|
||||
test("should parse ISO datetime (YYYY-MM-DDTHH:MM:SS)", () => {
|
||||
const nowMs = Date.now();
|
||||
const result = parseTimeInput("2026-05-20T14:30:00", nowMs);
|
||||
const expected = Date.parse("2026-05-20T14:30:00");
|
||||
expect(result).toBe(expected);
|
||||
});
|
||||
|
||||
test("should parse ISO datetime with Z suffix", () => {
|
||||
const nowMs = Date.now();
|
||||
const result = parseTimeInput("2026-05-20T14:30:00Z", nowMs);
|
||||
const expected = Date.UTC(2026, 4, 20, 14, 30, 0);
|
||||
expect(result).toBe(expected);
|
||||
});
|
||||
|
||||
test("should reject invalid date formats", () => {
|
||||
const nowMs = Date.now();
|
||||
expect(() => parseTimeInput("not-a-date", nowMs)).toThrow();
|
||||
expect(() => parseTimeInput("2026-13-01", nowMs)).toThrow();
|
||||
expect(() => parseTimeInput("invalid", nowMs)).toThrow();
|
||||
});
|
||||
});
|
||||
@@ -28,6 +28,7 @@ import {
|
||||
THREAD_READ_DEFAULT_QUOTA,
|
||||
type ThreadStatus,
|
||||
} from "./commands/thread.js";
|
||||
import { parseTimeInput } from "./commands/thread-time-parser.js";
|
||||
import { cmdWorkflowAdd, cmdWorkflowList, cmdWorkflowShow } from "./commands/workflow.js";
|
||||
import { formatOutput, type OutputFormat } from "./format.js";
|
||||
import { resolveStorageRoot } from "./store.js";
|
||||
@@ -168,30 +169,103 @@ thread
|
||||
});
|
||||
});
|
||||
|
||||
// Helper functions for thread list command parsing
|
||||
function parseStatusFilter(status: string | undefined): ThreadStatus[] | null {
|
||||
if (status === undefined) return null;
|
||||
const raw = status.trim();
|
||||
if (raw === "active") return ["idle", "running"];
|
||||
|
||||
const parts = raw.split(",").map((s) => s.trim());
|
||||
const validStatuses: ThreadStatus[] = ["idle", "running", "completed"];
|
||||
for (const part of parts) {
|
||||
if (!validStatuses.includes(part as ThreadStatus)) {
|
||||
process.stderr.write(
|
||||
`Invalid status: ${part}. Must be one of: idle, running, completed, active\n`,
|
||||
);
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
return parts as ThreadStatus[];
|
||||
}
|
||||
|
||||
function parseTimeFilters(
|
||||
after: string | undefined,
|
||||
before: string | undefined,
|
||||
nowMs: number,
|
||||
): { afterMs: number | null; beforeMs: number | null } {
|
||||
try {
|
||||
const afterMs = after !== undefined ? parseTimeInput(after, nowMs) : null;
|
||||
const beforeMs = before !== undefined ? parseTimeInput(before, nowMs) : null;
|
||||
return { afterMs, beforeMs };
|
||||
} catch (e) {
|
||||
const message = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`${message}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
function parsePaginationOptions(
|
||||
skip: string | undefined,
|
||||
take: string | undefined,
|
||||
): { skip: number | null; take: number | null } {
|
||||
let skipVal: number | null = null;
|
||||
let takeVal: number | null = null;
|
||||
|
||||
if (skip !== undefined) {
|
||||
skipVal = Number.parseInt(skip, 10);
|
||||
if (!Number.isInteger(skipVal) || skipVal < 0) {
|
||||
process.stderr.write("--skip must be a non-negative integer\n");
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
if (take !== undefined) {
|
||||
takeVal = Number.parseInt(take, 10);
|
||||
if (!Number.isInteger(takeVal) || takeVal < 1) {
|
||||
process.stderr.write("--take must be a positive integer\n");
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
return { skip: skipVal, take: takeVal };
|
||||
}
|
||||
|
||||
thread
|
||||
.command("list")
|
||||
.description("List threads")
|
||||
.option("--status <status>", "Filter by status: idle, running, or completed")
|
||||
.action((opts: { status: string | undefined }) => {
|
||||
const storageRoot = resolveStorageRoot();
|
||||
runAction(async () => {
|
||||
const validStatuses: ThreadStatus[] = ["idle", "running", "completed"];
|
||||
let statusFilter: ThreadStatus | null = null;
|
||||
.option(
|
||||
"--status <status>",
|
||||
"Filter by status: idle, running, completed, active (idle+running), or comma-separated values",
|
||||
)
|
||||
.option("--after <date>", "Filter threads created after this date (ISO or relative like '7d')")
|
||||
.option("--before <date>", "Filter threads created before this date (ISO or relative like '7d')")
|
||||
.option("--skip <n>", "Skip first n threads")
|
||||
.option("--take <n>", "Return at most n threads")
|
||||
.action(
|
||||
(opts: {
|
||||
status: string | undefined;
|
||||
after: string | undefined;
|
||||
before: string | undefined;
|
||||
skip: string | undefined;
|
||||
take: string | undefined;
|
||||
}) => {
|
||||
const storageRoot = resolveStorageRoot();
|
||||
runAction(async () => {
|
||||
const statusFilter = parseStatusFilter(opts.status);
|
||||
const nowMs = Date.now();
|
||||
const { afterMs, beforeMs } = parseTimeFilters(opts.after, opts.before, nowMs);
|
||||
const { skip, take } = parsePaginationOptions(opts.skip, opts.take);
|
||||
|
||||
if (opts.status !== undefined) {
|
||||
if (!validStatuses.includes(opts.status as ThreadStatus)) {
|
||||
process.stderr.write(
|
||||
`Invalid status: ${opts.status}. Must be one of: idle, running, completed\n`,
|
||||
);
|
||||
process.exit(1);
|
||||
}
|
||||
statusFilter = opts.status as ThreadStatus;
|
||||
}
|
||||
|
||||
const result = await cmdThreadList(storageRoot, statusFilter);
|
||||
writeOutput(result);
|
||||
});
|
||||
});
|
||||
const result = await cmdThreadList(
|
||||
storageRoot,
|
||||
statusFilter,
|
||||
afterMs,
|
||||
beforeMs,
|
||||
skip,
|
||||
take,
|
||||
);
|
||||
writeOutput(result);
|
||||
});
|
||||
},
|
||||
);
|
||||
|
||||
thread
|
||||
.command("stop")
|
||||
|
||||
@@ -0,0 +1,23 @@
|
||||
/**
|
||||
* Parse time input: ISO date (YYYY-MM-DD, YYYY-MM-DDTHH:MM:SS) or relative (7d, 24h, 30m)
|
||||
* Returns Unix timestamp in milliseconds.
|
||||
*/
|
||||
export function parseTimeInput(input: string, nowMs: number): number {
|
||||
const trimmed = input.trim();
|
||||
|
||||
// Relative time: 7d, 24h, 30m
|
||||
const relativeMatch = /^(\d+)(d|h|m)$/.exec(trimmed);
|
||||
if (relativeMatch !== null) {
|
||||
const value = Number.parseInt(relativeMatch[1], 10);
|
||||
const unit = relativeMatch[2];
|
||||
const multiplier = unit === "d" ? 86400000 : unit === "h" ? 3600000 : 60000;
|
||||
return nowMs - value * multiplier;
|
||||
}
|
||||
|
||||
// ISO date: try parsing
|
||||
const parsed = Date.parse(trimmed);
|
||||
if (Number.isNaN(parsed)) {
|
||||
throw new Error(`invalid time format: ${trimmed} (expected ISO date or relative like '7d')`);
|
||||
}
|
||||
return parsed;
|
||||
}
|
||||
@@ -16,10 +16,16 @@ import type {
|
||||
StepOutput,
|
||||
ThreadId,
|
||||
ThreadListItem,
|
||||
ThreadsIndex,
|
||||
WorkflowConfig,
|
||||
WorkflowPayload,
|
||||
} from "@uncaged/workflow-protocol";
|
||||
import { createProcessLogger, generateUlid, type ProcessLogger } from "@uncaged/workflow-util";
|
||||
import {
|
||||
createProcessLogger,
|
||||
extractUlidTimestamp,
|
||||
generateUlid,
|
||||
type ProcessLogger,
|
||||
} from "@uncaged/workflow-util";
|
||||
import { config as loadDotenv } from "dotenv";
|
||||
import { parse, stringify } from "yaml";
|
||||
import { createMarker, deleteMarker, isThreadRunning } from "../background/index.js";
|
||||
@@ -344,44 +350,115 @@ async function threadListItemFromActive(
|
||||
return { thread: threadId, workflow, head, status };
|
||||
}
|
||||
|
||||
export async function cmdThreadList(
|
||||
async function collectActiveThreads(
|
||||
storageRoot: string,
|
||||
statusFilter: ThreadStatus | null,
|
||||
uwf: UwfStore,
|
||||
index: ThreadsIndex,
|
||||
): Promise<ThreadListItemWithStatus[]> {
|
||||
const uwf = await createUwfStore(storageRoot);
|
||||
const index = await loadThreadsIndex(storageRoot);
|
||||
const items: ThreadListItemWithStatus[] = [];
|
||||
|
||||
// Add active threads
|
||||
for (const [threadId, head] of Object.entries(index)) {
|
||||
const item = await threadListItemFromActive(storageRoot, uwf, threadId as ThreadId, head);
|
||||
const item = await threadListItemFromActive(
|
||||
storageRoot,
|
||||
uwf,
|
||||
threadId as ThreadId,
|
||||
head as CasRef,
|
||||
);
|
||||
if (item !== null) {
|
||||
items.push(item);
|
||||
}
|
||||
}
|
||||
return items;
|
||||
}
|
||||
|
||||
// Add completed threads if requested
|
||||
if (statusFilter === "completed" || statusFilter === null) {
|
||||
const activeIds = new Set(items.map((i) => i.thread));
|
||||
const history = await loadThreadHistory(storageRoot);
|
||||
for (const entry of history) {
|
||||
if (!activeIds.has(entry.thread)) {
|
||||
items.push({
|
||||
thread: entry.thread,
|
||||
workflow: entry.workflow,
|
||||
head: entry.head,
|
||||
status: "completed",
|
||||
});
|
||||
}
|
||||
async function collectCompletedThreads(
|
||||
storageRoot: string,
|
||||
activeIds: Set<ThreadId>,
|
||||
): Promise<ThreadListItemWithStatus[]> {
|
||||
const items: ThreadListItemWithStatus[] = [];
|
||||
const history = await loadThreadHistory(storageRoot);
|
||||
const seen = new Set<ThreadId>(); // Deduplication (issue #470)
|
||||
for (const entry of history) {
|
||||
if (!activeIds.has(entry.thread) && !seen.has(entry.thread)) {
|
||||
seen.add(entry.thread);
|
||||
items.push({
|
||||
thread: entry.thread,
|
||||
workflow: entry.workflow,
|
||||
head: entry.head,
|
||||
status: "completed",
|
||||
});
|
||||
}
|
||||
}
|
||||
return items;
|
||||
}
|
||||
|
||||
// Apply status filter if provided
|
||||
if (statusFilter !== null) {
|
||||
return items.filter((item) => item.status === statusFilter);
|
||||
function applyTimeFilters(
|
||||
items: ThreadListItemWithStatus[],
|
||||
afterMs: number | null,
|
||||
beforeMs: number | null,
|
||||
): ThreadListItemWithStatus[] {
|
||||
if (afterMs === null && beforeMs === null) return items;
|
||||
return items.filter((item) => {
|
||||
const ts = extractUlidTimestamp(item.thread);
|
||||
if (ts === null) return false;
|
||||
if (afterMs !== null && ts <= afterMs) return false;
|
||||
if (beforeMs !== null && ts >= beforeMs) return false;
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
function sortByNewestFirst(items: ThreadListItemWithStatus[]): ThreadListItemWithStatus[] {
|
||||
return items.sort((a, b) => {
|
||||
const tsA = extractUlidTimestamp(a.thread) ?? 0;
|
||||
const tsB = extractUlidTimestamp(b.thread) ?? 0;
|
||||
return tsB - tsA;
|
||||
});
|
||||
}
|
||||
|
||||
function applyPagination(
|
||||
items: ThreadListItemWithStatus[],
|
||||
skip: number | null,
|
||||
take: number | null,
|
||||
): ThreadListItemWithStatus[] {
|
||||
const skipCount = skip ?? 0;
|
||||
const takeCount = take ?? items.length;
|
||||
return items.slice(skipCount, skipCount + takeCount);
|
||||
}
|
||||
|
||||
export async function cmdThreadList(
|
||||
storageRoot: string,
|
||||
statusFilter: ThreadStatus[] | null,
|
||||
afterMs: number | null,
|
||||
beforeMs: number | null,
|
||||
skip: number | null,
|
||||
take: number | null,
|
||||
): Promise<ThreadListItemWithStatus[]> {
|
||||
const uwf = await createUwfStore(storageRoot);
|
||||
const index = await loadThreadsIndex(storageRoot);
|
||||
|
||||
// Collect active threads
|
||||
let items = await collectActiveThreads(storageRoot, uwf, index);
|
||||
|
||||
// Collect completed threads (if relevant for status filter)
|
||||
const includeCompleted = statusFilter === null || statusFilter.includes("completed");
|
||||
if (includeCompleted) {
|
||||
const activeIds = new Set(items.map((i) => i.thread));
|
||||
const completedItems = await collectCompletedThreads(storageRoot, activeIds);
|
||||
items = items.concat(completedItems);
|
||||
}
|
||||
|
||||
return items;
|
||||
// Apply status filter
|
||||
if (statusFilter !== null) {
|
||||
items = items.filter((item) => statusFilter.includes(item.status));
|
||||
}
|
||||
|
||||
// Apply time range filters
|
||||
items = applyTimeFilters(items, afterMs, beforeMs);
|
||||
|
||||
// Sort by timestamp descending (newest first)
|
||||
items = sortByNewestFirst(items);
|
||||
|
||||
// Apply pagination
|
||||
return applyPagination(items, skip, take);
|
||||
}
|
||||
|
||||
function formatYaml(value: unknown): string {
|
||||
|
||||
@@ -22,7 +22,8 @@
|
||||
},
|
||||
"dependencies": {
|
||||
"@uncaged/json-cas": "^0.4.0",
|
||||
"@uncaged/workflow-agent-kit": "workspace:^"
|
||||
"@uncaged/workflow-agent-kit": "workspace:^",
|
||||
"@uncaged/workflow-util": "workspace:^"
|
||||
},
|
||||
"devDependencies": {
|
||||
"typescript": "^5.8.3"
|
||||
|
||||
@@ -0,0 +1,55 @@
|
||||
import { describe, expect, it } from "bun:test";
|
||||
import { extractUlidTimestamp, generateUlid } from "../ulid.js";
|
||||
|
||||
describe("extractUlidTimestamp", () => {
|
||||
it("should extract correct timestamp from ULID", () => {
|
||||
const knownTimestamp = Date.UTC(2026, 4, 20, 0, 0, 0);
|
||||
const ulid = generateUlid(knownTimestamp);
|
||||
const extracted = extractUlidTimestamp(ulid);
|
||||
expect(extracted).toBe(knownTimestamp);
|
||||
});
|
||||
|
||||
it("should handle epoch timestamp (timestamp 0)", () => {
|
||||
const ulid = generateUlid(0);
|
||||
const extracted = extractUlidTimestamp(ulid);
|
||||
expect(extracted).toBe(0);
|
||||
});
|
||||
|
||||
it("should handle recent timestamps", () => {
|
||||
const recentTimestamp = Date.now();
|
||||
const ulid = generateUlid(recentTimestamp);
|
||||
const extracted = extractUlidTimestamp(ulid);
|
||||
expect(extracted).toBe(recentTimestamp);
|
||||
});
|
||||
|
||||
it("should handle max 48-bit timestamp", () => {
|
||||
const maxTimestamp = 2 ** 48 - 1;
|
||||
const ulid = generateUlid(maxTimestamp);
|
||||
const extracted = extractUlidTimestamp(ulid);
|
||||
expect(extracted).toBe(maxTimestamp);
|
||||
});
|
||||
|
||||
it("should return null for invalid ULID length", () => {
|
||||
expect(extractUlidTimestamp("")).toBe(null);
|
||||
expect(extractUlidTimestamp("TOOSHORT")).toBe(null);
|
||||
expect(extractUlidTimestamp("TOOLONGAAAAAAAAAAAAAAAAAA")).toBe(null);
|
||||
});
|
||||
|
||||
it("should return null for invalid Crockford Base32 characters", () => {
|
||||
expect(extractUlidTimestamp("INVALID!@#$%^&CHARACTERS")).toBe(null);
|
||||
});
|
||||
|
||||
it("should extract timestamps from multiple ULIDs correctly", () => {
|
||||
const timestamps = [
|
||||
Date.UTC(2020, 0, 1, 0, 0, 0),
|
||||
Date.UTC(2023, 5, 15, 12, 30, 45),
|
||||
Date.UTC(2026, 11, 31, 23, 59, 59),
|
||||
];
|
||||
|
||||
for (const ts of timestamps) {
|
||||
const ulid = generateUlid(ts);
|
||||
const extracted = extractUlidTimestamp(ulid);
|
||||
expect(extracted).toBe(ts);
|
||||
}
|
||||
});
|
||||
});
|
||||
@@ -24,4 +24,4 @@ export { normalizeRefsField } from "./refs-field.js";
|
||||
export { err, ok } from "./result.js";
|
||||
export { getDefaultWorkflowStorageRoot, getGlobalCasDir } from "./storage-root.js";
|
||||
export type { LogFn, Result } from "./types.js";
|
||||
export { generateUlid } from "./ulid.js";
|
||||
export { extractUlidTimestamp, generateUlid } from "./ulid.js";
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { encodeCrockfordBase32Bits } from "./base32.js";
|
||||
import { decodeCrockfordBase32Bits, encodeCrockfordBase32Bits } from "./base32.js";
|
||||
|
||||
const ULID_TIME_BITS = 48;
|
||||
const ULID_RANDOM_BITS = 80;
|
||||
@@ -26,3 +26,19 @@ export function generateUlid(nowMs: number): string {
|
||||
const payload = (time << BigInt(ULID_RANDOM_BITS)) | rand;
|
||||
return encodeCrockfordBase32Bits(payload, ULID_TIME_BITS + ULID_RANDOM_BITS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the timestamp (in milliseconds) from a ULID string.
|
||||
* Returns null if the ULID is invalid.
|
||||
*/
|
||||
export function extractUlidTimestamp(ulid: string): number | null {
|
||||
if (ulid.length !== 26) {
|
||||
return null;
|
||||
}
|
||||
const timestampPart = ulid.slice(0, 10);
|
||||
const decoded = decodeCrockfordBase32Bits(timestampPart, ULID_TIME_BITS);
|
||||
if (!decoded.ok) {
|
||||
return null;
|
||||
}
|
||||
return Number(decoded.value);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user