diff --git a/packages/cli-workflow/src/__tests__/thread-list-filters.test.ts b/packages/cli-workflow/src/__tests__/thread-list-filters.test.ts new file mode 100644 index 0000000..aea5aec --- /dev/null +++ b/packages/cli-workflow/src/__tests__/thread-list-filters.test.ts @@ -0,0 +1,550 @@ +import { mkdir, mkdtemp, rm } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import type { CasRef, ThreadId } from "@uncaged/workflow-protocol"; +import { extractUlidTimestamp, generateUlid } from "@uncaged/workflow-util"; +import { afterEach, beforeEach, describe, expect, test } from "vitest"; +import { createMarker, deleteMarker } from "../background/index.js"; +import { cmdThreadList } from "../commands/thread.js"; +import { parseTimeInput } from "../commands/thread-time-parser.js"; +import type { UwfStore } from "../store.js"; +import { appendThreadHistory, createUwfStore, saveThreadsIndex } from "../store.js"; + +// ── helpers ─────────────────────────────────────────────────────────────────── + +async function makeUwfStore(storageRoot: string): Promise { + const casDir = join(storageRoot, "cas"); + await mkdir(casDir, { recursive: true }); + return createUwfStore(storageRoot); +} + +async function createTestWorkflow(uwf: UwfStore): Promise { + const workflowPayload = { + name: "test-workflow", + roles: { + role1: { + goal: "test goal", + outputSchema: { type: "object" as const, properties: {} }, + }, + }, + graph: { start: "role1" }, + conditions: {}, + }; + return await uwf.store.put(uwf.schemas.workflow, workflowPayload); +} + +async function createTestThread( + uwf: UwfStore, + storageRoot: string, + workflowHash: CasRef, + timestamp: number, +): Promise { + const threadId = generateUlid(timestamp) as ThreadId; + const startPayload = { + workflow: workflowHash, + prompt: "test prompt", + }; + const headHash = await uwf.store.put(uwf.schemas.startNode, startPayload); + const index = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot)); + index[threadId] = headHash; + await saveThreadsIndex(storageRoot, index); + return threadId; +} + +async function markThreadRunning(storageRoot: string, threadId: ThreadId, workflow: CasRef) { + await createMarker(storageRoot, { + thread: threadId, + workflow, + pid: process.pid, // Use current process PID so isPidAlive returns true + startedAt: Date.now(), + }); +} + +async function completeThread( + storageRoot: string, + threadId: ThreadId, + workflowHash: CasRef, + headHash: CasRef, +) { + const index = await import("../store.js").then((m) => m.loadThreadsIndex(storageRoot)); + delete index[threadId]; + await saveThreadsIndex(storageRoot, index); + await appendThreadHistory(storageRoot, { + thread: threadId, + workflow: workflowHash, + head: headHash, + completedAt: Date.now(), + }); +} + +// ── test setup ──────────────────────────────────────────────────────────────── + +let tmpDir: string; + +beforeEach(async () => { + tmpDir = await mkdtemp(join(tmpdir(), "thread-list-filters-test-")); +}); + +afterEach(async () => { + await rm(tmpDir, { recursive: true, force: true }); +}); + +// ── status filter tests ─────────────────────────────────────────────────────── + +describe("cmdThreadList status filter", () => { + test("should return idle and running threads when status=active", async () => { + const uwf = await makeUwfStore(tmpDir); + const workflowHash = await createTestWorkflow(uwf); + + const thread1 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 3000); + const thread2 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 2000); + const thread3 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000); + + await markThreadRunning(tmpDir, thread2, workflowHash); + + const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); + const thread3Head = index[thread3]; + if (thread3Head === undefined) throw new Error("thread3 head not found"); + await completeThread(tmpDir, thread3, workflowHash, thread3Head); + + const result = await cmdThreadList(tmpDir, ["idle", "running"], null, null, null, null); + + expect(result).toHaveLength(2); + expect(result.map((r) => r.thread).sort()).toEqual([thread1, thread2].sort()); + + // Clean up marker after test + await deleteMarker(tmpDir, thread2); + }); + + test("should support comma-separated status values", async () => { + const uwf = await makeUwfStore(tmpDir); + const workflowHash = await createTestWorkflow(uwf); + + const thread1 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 3000); + const thread2 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 2000); + const thread3 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000); + + await markThreadRunning(tmpDir, thread2, workflowHash); + + const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); + const thread3Head = index[thread3]; + if (thread3Head === undefined) throw new Error("thread3 head not found"); + await completeThread(tmpDir, thread3, workflowHash, thread3Head); + + const result = await cmdThreadList(tmpDir, ["idle", "completed"], null, null, null, null); + + // Clean up marker + await deleteMarker(tmpDir, thread2); + + // thread2 is running (not idle), so should not be included + // Expected: thread1 (idle) and thread3 (completed) + expect(result).toHaveLength(2); + expect(result.map((r) => r.thread).sort()).toEqual([thread1, thread3].sort()); + }); + + test("should support single status filter (backward compat)", async () => { + const uwf = await makeUwfStore(tmpDir); + const workflowHash = await createTestWorkflow(uwf); + + const _thread1 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 3000); + const _thread2 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 2000); + const thread3 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000); + + const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); + const thread3Head = index[thread3]; + if (thread3Head === undefined) throw new Error("thread3 head not found"); + await completeThread(tmpDir, thread3, workflowHash, thread3Head); + + const result = await cmdThreadList(tmpDir, ["completed"], null, null, null, null); + + expect(result).toHaveLength(1); + expect(result[0]?.thread).toBe(thread3); + expect(result[0]?.status).toBe("completed"); + }); + + test("should return all threads when no status filter provided", async () => { + const uwf = await makeUwfStore(tmpDir); + const workflowHash = await createTestWorkflow(uwf); + + const thread1 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 3000); + const thread2 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 2000); + const thread3 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000); + + await markThreadRunning(tmpDir, thread2, workflowHash); + + const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); + const thread3Head = index[thread3]; + if (thread3Head === undefined) throw new Error("thread3 head not found"); + await completeThread(tmpDir, thread3, workflowHash, thread3Head); + + const result = await cmdThreadList(tmpDir, null, null, null, null, null); + + expect(result).toHaveLength(3); + expect(result.map((r) => r.thread).sort()).toEqual([thread1, thread2, thread3].sort()); + }); +}); + +// ── time range filtering tests ──────────────────────────────────────────────── + +describe("cmdThreadList time filters", () => { + test("should filter threads created after given timestamp", async () => { + const uwf = await makeUwfStore(tmpDir); + const workflowHash = await createTestWorkflow(uwf); + + const ts1 = Date.UTC(2026, 4, 20, 0, 0, 0); + const ts2 = Date.UTC(2026, 4, 21, 0, 0, 0); + const ts3 = Date.UTC(2026, 4, 22, 0, 0, 0); + + const _threadA = await createTestThread(uwf, tmpDir, workflowHash, ts1); + const threadB = await createTestThread(uwf, tmpDir, workflowHash, ts2); + const threadC = await createTestThread(uwf, tmpDir, workflowHash, ts3); + + // Use a timestamp slightly before ts2 to include threadB + const afterMs = Date.UTC(2026, 4, 20, 12, 0, 0); + const result = await cmdThreadList(tmpDir, null, afterMs, null, null, null); + + expect(result).toHaveLength(2); + expect(result.map((r) => r.thread).sort()).toEqual([threadB, threadC].sort()); + }); + + test("should filter threads created before given timestamp", async () => { + const uwf = await makeUwfStore(tmpDir); + const workflowHash = await createTestWorkflow(uwf); + + const ts1 = Date.UTC(2026, 4, 20, 0, 0, 0); + const ts2 = Date.UTC(2026, 4, 21, 0, 0, 0); + const ts3 = Date.UTC(2026, 4, 22, 0, 0, 0); + + const threadA = await createTestThread(uwf, tmpDir, workflowHash, ts1); + const threadB = await createTestThread(uwf, tmpDir, workflowHash, ts2); + const _threadC = await createTestThread(uwf, tmpDir, workflowHash, ts3); + + const beforeMs = Date.UTC(2026, 4, 22, 0, 0, 0); + const result = await cmdThreadList(tmpDir, null, null, beforeMs, null, null); + + expect(result).toHaveLength(2); + expect(result.map((r) => r.thread).sort()).toEqual([threadA, threadB].sort()); + }); + + test("should support both after and before filters (time range)", async () => { + const uwf = await makeUwfStore(tmpDir); + const workflowHash = await createTestWorkflow(uwf); + + const ts1 = Date.UTC(2026, 4, 20, 0, 0, 0); + const ts2 = Date.UTC(2026, 4, 21, 0, 0, 0); + const ts3 = Date.UTC(2026, 4, 22, 0, 0, 0); + + const _threadA = await createTestThread(uwf, tmpDir, workflowHash, ts1); + const threadB = await createTestThread(uwf, tmpDir, workflowHash, ts2); + const _threadC = await createTestThread(uwf, tmpDir, workflowHash, ts3); + + const afterMs = Date.UTC(2026, 4, 20, 12, 0, 0); + const beforeMs = Date.UTC(2026, 4, 22, 0, 0, 0); + const result = await cmdThreadList(tmpDir, null, afterMs, beforeMs, null, null); + + expect(result).toHaveLength(1); + expect(result[0]?.thread).toBe(threadB); + }); +}); + +// ── pagination tests ────────────────────────────────────────────────────────── + +describe("cmdThreadList pagination", () => { + test("should limit results with --take", async () => { + const uwf = await makeUwfStore(tmpDir); + const workflowHash = await createTestWorkflow(uwf); + + const threads: ThreadId[] = []; + for (let i = 0; i < 10; i++) { + threads.push(await createTestThread(uwf, tmpDir, workflowHash, Date.now() - i * 1000)); + } + + const result = await cmdThreadList(tmpDir, null, null, null, null, 5); + + expect(result).toHaveLength(5); + }); + + test("should skip first N threads with --skip", async () => { + const uwf = await makeUwfStore(tmpDir); + const workflowHash = await createTestWorkflow(uwf); + + const threads: ThreadId[] = []; + // Create threads in chronological order, but they'll be sorted newest first + for (let i = 0; i < 10; i++) { + threads.push(await createTestThread(uwf, tmpDir, workflowHash, Date.now() + i * 100)); + // Small delay to ensure distinct timestamps + await new Promise((resolve) => setTimeout(resolve, 10)); + } + + const result = await cmdThreadList(tmpDir, null, null, null, 3, null); + + expect(result).toHaveLength(7); + // The 3 newest threads should be skipped, so we should get the 7 oldest + }); + + test("should support skip + take for pagination", async () => { + const uwf = await makeUwfStore(tmpDir); + const workflowHash = await createTestWorkflow(uwf); + + const threads: ThreadId[] = []; + for (let i = 0; i < 10; i++) { + threads.push(await createTestThread(uwf, tmpDir, workflowHash, Date.now() + i * 100)); + await new Promise((resolve) => setTimeout(resolve, 10)); + } + + const result = await cmdThreadList(tmpDir, null, null, null, 5, 3); + + expect(result).toHaveLength(3); + // Should skip first 5 (newest), then take 3 + }); + + test("should handle take > available threads", async () => { + const uwf = await makeUwfStore(tmpDir); + const workflowHash = await createTestWorkflow(uwf); + + const _thread1 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 3000); + const _thread2 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 2000); + const _thread3 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000); + + const result = await cmdThreadList(tmpDir, null, null, null, null, 10); + + expect(result).toHaveLength(3); + }); + + test("should return empty array when skip >= thread count", async () => { + const uwf = await makeUwfStore(tmpDir); + const workflowHash = await createTestWorkflow(uwf); + + await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 3000); + await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 2000); + await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000); + + const result = await cmdThreadList(tmpDir, null, null, null, 5, null); + + expect(result).toHaveLength(0); + }); +}); + +// ── combined filters tests ──────────────────────────────────────────────────── + +describe("combined filters", () => { + test("should combine status and time range filters", async () => { + const uwf = await makeUwfStore(tmpDir); + const workflowHash = await createTestWorkflow(uwf); + + const ts1 = Date.UTC(2026, 4, 20, 0, 0, 0); + const ts2 = Date.UTC(2026, 4, 21, 0, 0, 0); + const ts3 = Date.UTC(2026, 4, 22, 0, 0, 0); + const ts4 = Date.UTC(2026, 4, 23, 0, 0, 0); + + const _thread1 = await createTestThread(uwf, tmpDir, workflowHash, ts1); + const thread2 = await createTestThread(uwf, tmpDir, workflowHash, ts2); + const thread3 = await createTestThread(uwf, tmpDir, workflowHash, ts3); + const thread4 = await createTestThread(uwf, tmpDir, workflowHash, ts4); + + await markThreadRunning(tmpDir, thread2, workflowHash); + + const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); + const thread3Head = index[thread3]; + if (thread3Head === undefined) throw new Error("thread3 head not found"); + await completeThread(tmpDir, thread3, workflowHash, thread3Head); + + const afterMs = Date.UTC(2026, 4, 20, 12, 0, 0); + const result = await cmdThreadList(tmpDir, ["idle"], afterMs, null, null, null); + + expect(result).toHaveLength(1); + expect(result[0]?.thread).toBe(thread4); + expect(result[0]?.status).toBe("idle"); + + // Clean up marker + await deleteMarker(tmpDir, thread2); + }); + + test("should combine status filter and pagination", async () => { + const uwf = await makeUwfStore(tmpDir); + const workflowHash = await createTestWorkflow(uwf); + + const threads: ThreadId[] = []; + for (let i = 9; i >= 0; i--) { + const thread = await createTestThread(uwf, tmpDir, workflowHash, Date.now() + i * 1000); + threads.push(thread); + const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); + const headHash = index[thread]; + if (headHash === undefined) throw new Error("head not found"); + await completeThread(tmpDir, thread, workflowHash, headHash); + } + + const result = await cmdThreadList(tmpDir, ["completed"], null, null, 3, 5); + + expect(result).toHaveLength(5); + for (const r of result) { + expect(r.status).toBe("completed"); + } + }); + + test("should combine time range and pagination", async () => { + const uwf = await makeUwfStore(tmpDir); + const workflowHash = await createTestWorkflow(uwf); + + const threads: ThreadId[] = []; + for (let i = 0; i < 20; i++) { + const ts = Date.UTC(2026, 4, 1 + i, 0, 0, 0); + threads.push(await createTestThread(uwf, tmpDir, workflowHash, ts)); + } + + const afterMs = Date.UTC(2026, 4, 10, 0, 0, 0); + const result = await cmdThreadList(tmpDir, null, afterMs, null, 2, 5); + + expect(result).toHaveLength(5); + for (const r of result) { + const ts = extractUlidTimestamp(r.thread); + expect(ts).not.toBeNull(); + if (ts !== null) { + expect(ts).toBeGreaterThan(afterMs); + } + } + }); + + async function setupMixedStatusThreads( + uwf: UwfStore, + workflowHash: string, + count: number, + ): Promise { + const threads: ThreadId[] = []; + for (let i = 0; i < count; i++) { + const ts = Date.UTC(2026, 4, 10 + i, 0, 0, 0); + const thread = await createTestThread(uwf, tmpDir, workflowHash, ts); + threads.push(thread); + + if (i % 2 === 0) { + const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); + const headHash = index[thread]; + if (headHash === undefined) throw new Error("head not found"); + await completeThread(tmpDir, thread, workflowHash, headHash); + } else { + await markThreadRunning(tmpDir, thread, workflowHash); + } + } + return threads; + } + + async function cleanupRunningMarkers(threads: ThreadId[]): Promise { + for (let i = 0; i < threads.length; i++) { + if (i % 2 !== 0) { + await deleteMarker(tmpDir, threads[i] as ThreadId); + } + } + } + + test("should combine all filters (status + time + pagination)", async () => { + const uwf = await makeUwfStore(tmpDir); + const workflowHash = await createTestWorkflow(uwf); + const threads = await setupMixedStatusThreads(uwf, workflowHash, 15); + + const afterMs = Date.UTC(2026, 4, 14, 12, 0, 0); + const beforeMs = Date.UTC(2026, 4, 20, 0, 0, 0); + const result = await cmdThreadList(tmpDir, ["idle", "running"], afterMs, beforeMs, 1, 3); + + expect(result.length).toBeLessThanOrEqual(3); + for (const r of result) { + expect(["idle", "running"]).toContain(r.status); + const ts = extractUlidTimestamp(r.thread); + if (ts !== null) { + expect(ts).toBeGreaterThan(afterMs); + expect(ts).toBeLessThan(beforeMs); + } + } + + await cleanupRunningMarkers(threads); + }); +}); + +// ── edge cases tests ────────────────────────────────────────────────────────── + +describe("edge cases", () => { + test("should handle empty thread list", async () => { + await makeUwfStore(tmpDir); + const result = await cmdThreadList(tmpDir, null, null, null, null, null); + expect(result).toHaveLength(0); + }); + + test("should skip threads with invalid ULID when time filtering", async () => { + const uwf = await makeUwfStore(tmpDir); + const workflowHash = await createTestWorkflow(uwf); + + const thread1 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 2000); + const thread2 = await createTestThread(uwf, tmpDir, workflowHash, Date.now() - 1000); + + const index = await import("../store.js").then((m) => m.loadThreadsIndex(tmpDir)); + index["INVALID_ULID_FORMAT_HERE" as ThreadId] = "01J6HMVRNQKJV2"; + await saveThreadsIndex(tmpDir, index); + + const afterMs = Date.now() - 3000; + const result = await cmdThreadList(tmpDir, null, afterMs, null, null, null); + + expect(result).toHaveLength(2); + expect(result.map((r) => r.thread).sort()).toEqual([thread1, thread2].sort()); + }); +}); + +// ── time parsing tests ──────────────────────────────────────────────────────── + +describe("relative time parsing", () => { + test("should parse '7d' as 7 days ago", () => { + const nowMs = Date.UTC(2026, 4, 24, 12, 0, 0); + const result = parseTimeInput("7d", nowMs); + const expected = Date.UTC(2026, 4, 17, 12, 0, 0); + expect(result).toBe(expected); + }); + + test("should parse '24h' as 24 hours ago", () => { + const nowMs = Date.UTC(2026, 4, 24, 12, 0, 0); + const result = parseTimeInput("24h", nowMs); + const expected = Date.UTC(2026, 4, 23, 12, 0, 0); + expect(result).toBe(expected); + }); + + test("should parse '30m' as 30 minutes ago", () => { + const nowMs = Date.UTC(2026, 4, 24, 12, 30, 0); + const result = parseTimeInput("30m", nowMs); + const expected = Date.UTC(2026, 4, 24, 12, 0, 0); + expect(result).toBe(expected); + }); + + test("should parse '1d' as 1 day ago", () => { + const nowMs = Date.UTC(2026, 4, 24, 0, 0, 0); + const result = parseTimeInput("1d", nowMs); + const expected = Date.UTC(2026, 4, 23, 0, 0, 0); + expect(result).toBe(expected); + }); +}); + +describe("ISO date parsing", () => { + test("should parse ISO date (YYYY-MM-DD)", () => { + const nowMs = Date.now(); + const result = parseTimeInput("2026-05-20", nowMs); + const expected = Date.UTC(2026, 4, 20, 0, 0, 0); + expect(result).toBe(expected); + }); + + test("should parse ISO datetime (YYYY-MM-DDTHH:MM:SS)", () => { + const nowMs = Date.now(); + const result = parseTimeInput("2026-05-20T14:30:00", nowMs); + const expected = Date.parse("2026-05-20T14:30:00"); + expect(result).toBe(expected); + }); + + test("should parse ISO datetime with Z suffix", () => { + const nowMs = Date.now(); + const result = parseTimeInput("2026-05-20T14:30:00Z", nowMs); + const expected = Date.UTC(2026, 4, 20, 14, 30, 0); + expect(result).toBe(expected); + }); + + test("should reject invalid date formats", () => { + const nowMs = Date.now(); + expect(() => parseTimeInput("not-a-date", nowMs)).toThrow(); + expect(() => parseTimeInput("2026-13-01", nowMs)).toThrow(); + expect(() => parseTimeInput("invalid", nowMs)).toThrow(); + }); +}); diff --git a/packages/cli-workflow/src/cli.ts b/packages/cli-workflow/src/cli.ts index c7d7fa4..1ed04d6 100755 --- a/packages/cli-workflow/src/cli.ts +++ b/packages/cli-workflow/src/cli.ts @@ -28,6 +28,7 @@ import { THREAD_READ_DEFAULT_QUOTA, type ThreadStatus, } from "./commands/thread.js"; +import { parseTimeInput } from "./commands/thread-time-parser.js"; import { cmdWorkflowAdd, cmdWorkflowList, cmdWorkflowShow } from "./commands/workflow.js"; import { formatOutput, type OutputFormat } from "./format.js"; import { resolveStorageRoot } from "./store.js"; @@ -168,30 +169,103 @@ thread }); }); +// Helper functions for thread list command parsing +function parseStatusFilter(status: string | undefined): ThreadStatus[] | null { + if (status === undefined) return null; + const raw = status.trim(); + if (raw === "active") return ["idle", "running"]; + + const parts = raw.split(",").map((s) => s.trim()); + const validStatuses: ThreadStatus[] = ["idle", "running", "completed"]; + for (const part of parts) { + if (!validStatuses.includes(part as ThreadStatus)) { + process.stderr.write( + `Invalid status: ${part}. Must be one of: idle, running, completed, active\n`, + ); + process.exit(1); + } + } + return parts as ThreadStatus[]; +} + +function parseTimeFilters( + after: string | undefined, + before: string | undefined, + nowMs: number, +): { afterMs: number | null; beforeMs: number | null } { + try { + const afterMs = after !== undefined ? parseTimeInput(after, nowMs) : null; + const beforeMs = before !== undefined ? parseTimeInput(before, nowMs) : null; + return { afterMs, beforeMs }; + } catch (e) { + const message = e instanceof Error ? e.message : String(e); + process.stderr.write(`${message}\n`); + process.exit(1); + } +} + +function parsePaginationOptions( + skip: string | undefined, + take: string | undefined, +): { skip: number | null; take: number | null } { + let skipVal: number | null = null; + let takeVal: number | null = null; + + if (skip !== undefined) { + skipVal = Number.parseInt(skip, 10); + if (!Number.isInteger(skipVal) || skipVal < 0) { + process.stderr.write("--skip must be a non-negative integer\n"); + process.exit(1); + } + } + if (take !== undefined) { + takeVal = Number.parseInt(take, 10); + if (!Number.isInteger(takeVal) || takeVal < 1) { + process.stderr.write("--take must be a positive integer\n"); + process.exit(1); + } + } + return { skip: skipVal, take: takeVal }; +} + thread .command("list") .description("List threads") - .option("--status ", "Filter by status: idle, running, or completed") - .action((opts: { status: string | undefined }) => { - const storageRoot = resolveStorageRoot(); - runAction(async () => { - const validStatuses: ThreadStatus[] = ["idle", "running", "completed"]; - let statusFilter: ThreadStatus | null = null; + .option( + "--status ", + "Filter by status: idle, running, completed, active (idle+running), or comma-separated values", + ) + .option("--after ", "Filter threads created after this date (ISO or relative like '7d')") + .option("--before ", "Filter threads created before this date (ISO or relative like '7d')") + .option("--skip ", "Skip first n threads") + .option("--take ", "Return at most n threads") + .action( + (opts: { + status: string | undefined; + after: string | undefined; + before: string | undefined; + skip: string | undefined; + take: string | undefined; + }) => { + const storageRoot = resolveStorageRoot(); + runAction(async () => { + const statusFilter = parseStatusFilter(opts.status); + const nowMs = Date.now(); + const { afterMs, beforeMs } = parseTimeFilters(opts.after, opts.before, nowMs); + const { skip, take } = parsePaginationOptions(opts.skip, opts.take); - if (opts.status !== undefined) { - if (!validStatuses.includes(opts.status as ThreadStatus)) { - process.stderr.write( - `Invalid status: ${opts.status}. Must be one of: idle, running, completed\n`, - ); - process.exit(1); - } - statusFilter = opts.status as ThreadStatus; - } - - const result = await cmdThreadList(storageRoot, statusFilter); - writeOutput(result); - }); - }); + const result = await cmdThreadList( + storageRoot, + statusFilter, + afterMs, + beforeMs, + skip, + take, + ); + writeOutput(result); + }); + }, + ); thread .command("stop") diff --git a/packages/cli-workflow/src/commands/thread-time-parser.ts b/packages/cli-workflow/src/commands/thread-time-parser.ts new file mode 100644 index 0000000..822257d --- /dev/null +++ b/packages/cli-workflow/src/commands/thread-time-parser.ts @@ -0,0 +1,23 @@ +/** + * Parse time input: ISO date (YYYY-MM-DD, YYYY-MM-DDTHH:MM:SS) or relative (7d, 24h, 30m) + * Returns Unix timestamp in milliseconds. + */ +export function parseTimeInput(input: string, nowMs: number): number { + const trimmed = input.trim(); + + // Relative time: 7d, 24h, 30m + const relativeMatch = /^(\d+)(d|h|m)$/.exec(trimmed); + if (relativeMatch !== null) { + const value = Number.parseInt(relativeMatch[1], 10); + const unit = relativeMatch[2]; + const multiplier = unit === "d" ? 86400000 : unit === "h" ? 3600000 : 60000; + return nowMs - value * multiplier; + } + + // ISO date: try parsing + const parsed = Date.parse(trimmed); + if (Number.isNaN(parsed)) { + throw new Error(`invalid time format: ${trimmed} (expected ISO date or relative like '7d')`); + } + return parsed; +} diff --git a/packages/cli-workflow/src/commands/thread.ts b/packages/cli-workflow/src/commands/thread.ts index b3fe01a..5ba4d8b 100644 --- a/packages/cli-workflow/src/commands/thread.ts +++ b/packages/cli-workflow/src/commands/thread.ts @@ -16,10 +16,16 @@ import type { StepOutput, ThreadId, ThreadListItem, + ThreadsIndex, WorkflowConfig, WorkflowPayload, } from "@uncaged/workflow-protocol"; -import { createProcessLogger, generateUlid, type ProcessLogger } from "@uncaged/workflow-util"; +import { + createProcessLogger, + extractUlidTimestamp, + generateUlid, + type ProcessLogger, +} from "@uncaged/workflow-util"; import { config as loadDotenv } from "dotenv"; import { parse, stringify } from "yaml"; import { createMarker, deleteMarker, isThreadRunning } from "../background/index.js"; @@ -344,44 +350,115 @@ async function threadListItemFromActive( return { thread: threadId, workflow, head, status }; } -export async function cmdThreadList( +async function collectActiveThreads( storageRoot: string, - statusFilter: ThreadStatus | null, + uwf: UwfStore, + index: ThreadsIndex, ): Promise { - const uwf = await createUwfStore(storageRoot); - const index = await loadThreadsIndex(storageRoot); const items: ThreadListItemWithStatus[] = []; - - // Add active threads for (const [threadId, head] of Object.entries(index)) { - const item = await threadListItemFromActive(storageRoot, uwf, threadId as ThreadId, head); + const item = await threadListItemFromActive( + storageRoot, + uwf, + threadId as ThreadId, + head as CasRef, + ); if (item !== null) { items.push(item); } } + return items; +} - // Add completed threads if requested - if (statusFilter === "completed" || statusFilter === null) { - const activeIds = new Set(items.map((i) => i.thread)); - const history = await loadThreadHistory(storageRoot); - for (const entry of history) { - if (!activeIds.has(entry.thread)) { - items.push({ - thread: entry.thread, - workflow: entry.workflow, - head: entry.head, - status: "completed", - }); - } +async function collectCompletedThreads( + storageRoot: string, + activeIds: Set, +): Promise { + const items: ThreadListItemWithStatus[] = []; + const history = await loadThreadHistory(storageRoot); + const seen = new Set(); // Deduplication (issue #470) + for (const entry of history) { + if (!activeIds.has(entry.thread) && !seen.has(entry.thread)) { + seen.add(entry.thread); + items.push({ + thread: entry.thread, + workflow: entry.workflow, + head: entry.head, + status: "completed", + }); } } + return items; +} - // Apply status filter if provided - if (statusFilter !== null) { - return items.filter((item) => item.status === statusFilter); +function applyTimeFilters( + items: ThreadListItemWithStatus[], + afterMs: number | null, + beforeMs: number | null, +): ThreadListItemWithStatus[] { + if (afterMs === null && beforeMs === null) return items; + return items.filter((item) => { + const ts = extractUlidTimestamp(item.thread); + if (ts === null) return false; + if (afterMs !== null && ts <= afterMs) return false; + if (beforeMs !== null && ts >= beforeMs) return false; + return true; + }); +} + +function sortByNewestFirst(items: ThreadListItemWithStatus[]): ThreadListItemWithStatus[] { + return items.sort((a, b) => { + const tsA = extractUlidTimestamp(a.thread) ?? 0; + const tsB = extractUlidTimestamp(b.thread) ?? 0; + return tsB - tsA; + }); +} + +function applyPagination( + items: ThreadListItemWithStatus[], + skip: number | null, + take: number | null, +): ThreadListItemWithStatus[] { + const skipCount = skip ?? 0; + const takeCount = take ?? items.length; + return items.slice(skipCount, skipCount + takeCount); +} + +export async function cmdThreadList( + storageRoot: string, + statusFilter: ThreadStatus[] | null, + afterMs: number | null, + beforeMs: number | null, + skip: number | null, + take: number | null, +): Promise { + const uwf = await createUwfStore(storageRoot); + const index = await loadThreadsIndex(storageRoot); + + // Collect active threads + let items = await collectActiveThreads(storageRoot, uwf, index); + + // Collect completed threads (if relevant for status filter) + const includeCompleted = statusFilter === null || statusFilter.includes("completed"); + if (includeCompleted) { + const activeIds = new Set(items.map((i) => i.thread)); + const completedItems = await collectCompletedThreads(storageRoot, activeIds); + items = items.concat(completedItems); } - return items; + // Apply status filter + if (statusFilter !== null) { + items = items.filter((item) => statusFilter.includes(item.status)); + } + + // Apply time range filters + items = applyTimeFilters(items, afterMs, beforeMs); + + // Sort by timestamp descending (newest first) + items = sortByNewestFirst(items); + + // Apply pagination + return applyPagination(items, skip, take); } function formatYaml(value: unknown): string { diff --git a/packages/workflow-agent-claude-code/package.json b/packages/workflow-agent-claude-code/package.json index 5d8d0c2..4a54820 100644 --- a/packages/workflow-agent-claude-code/package.json +++ b/packages/workflow-agent-claude-code/package.json @@ -22,7 +22,8 @@ }, "dependencies": { "@uncaged/json-cas": "^0.4.0", - "@uncaged/workflow-agent-kit": "workspace:^" + "@uncaged/workflow-agent-kit": "workspace:^", + "@uncaged/workflow-util": "workspace:^" }, "devDependencies": { "typescript": "^5.8.3" diff --git a/packages/workflow-util/src/__tests__/ulid-timestamp.test.ts b/packages/workflow-util/src/__tests__/ulid-timestamp.test.ts new file mode 100644 index 0000000..7e25cf3 --- /dev/null +++ b/packages/workflow-util/src/__tests__/ulid-timestamp.test.ts @@ -0,0 +1,55 @@ +import { describe, expect, it } from "bun:test"; +import { extractUlidTimestamp, generateUlid } from "../ulid.js"; + +describe("extractUlidTimestamp", () => { + it("should extract correct timestamp from ULID", () => { + const knownTimestamp = Date.UTC(2026, 4, 20, 0, 0, 0); + const ulid = generateUlid(knownTimestamp); + const extracted = extractUlidTimestamp(ulid); + expect(extracted).toBe(knownTimestamp); + }); + + it("should handle epoch timestamp (timestamp 0)", () => { + const ulid = generateUlid(0); + const extracted = extractUlidTimestamp(ulid); + expect(extracted).toBe(0); + }); + + it("should handle recent timestamps", () => { + const recentTimestamp = Date.now(); + const ulid = generateUlid(recentTimestamp); + const extracted = extractUlidTimestamp(ulid); + expect(extracted).toBe(recentTimestamp); + }); + + it("should handle max 48-bit timestamp", () => { + const maxTimestamp = 2 ** 48 - 1; + const ulid = generateUlid(maxTimestamp); + const extracted = extractUlidTimestamp(ulid); + expect(extracted).toBe(maxTimestamp); + }); + + it("should return null for invalid ULID length", () => { + expect(extractUlidTimestamp("")).toBe(null); + expect(extractUlidTimestamp("TOOSHORT")).toBe(null); + expect(extractUlidTimestamp("TOOLONGAAAAAAAAAAAAAAAAAA")).toBe(null); + }); + + it("should return null for invalid Crockford Base32 characters", () => { + expect(extractUlidTimestamp("INVALID!@#$%^&CHARACTERS")).toBe(null); + }); + + it("should extract timestamps from multiple ULIDs correctly", () => { + const timestamps = [ + Date.UTC(2020, 0, 1, 0, 0, 0), + Date.UTC(2023, 5, 15, 12, 30, 45), + Date.UTC(2026, 11, 31, 23, 59, 59), + ]; + + for (const ts of timestamps) { + const ulid = generateUlid(ts); + const extracted = extractUlidTimestamp(ulid); + expect(extracted).toBe(ts); + } + }); +}); diff --git a/packages/workflow-util/src/index.ts b/packages/workflow-util/src/index.ts index dad2e81..a08f3ad 100644 --- a/packages/workflow-util/src/index.ts +++ b/packages/workflow-util/src/index.ts @@ -24,4 +24,4 @@ export { normalizeRefsField } from "./refs-field.js"; export { err, ok } from "./result.js"; export { getDefaultWorkflowStorageRoot, getGlobalCasDir } from "./storage-root.js"; export type { LogFn, Result } from "./types.js"; -export { generateUlid } from "./ulid.js"; +export { extractUlidTimestamp, generateUlid } from "./ulid.js"; diff --git a/packages/workflow-util/src/ulid.ts b/packages/workflow-util/src/ulid.ts index 60938cf..e204f7e 100644 --- a/packages/workflow-util/src/ulid.ts +++ b/packages/workflow-util/src/ulid.ts @@ -1,4 +1,4 @@ -import { encodeCrockfordBase32Bits } from "./base32.js"; +import { decodeCrockfordBase32Bits, encodeCrockfordBase32Bits } from "./base32.js"; const ULID_TIME_BITS = 48; const ULID_RANDOM_BITS = 80; @@ -26,3 +26,19 @@ export function generateUlid(nowMs: number): string { const payload = (time << BigInt(ULID_RANDOM_BITS)) | rand; return encodeCrockfordBase32Bits(payload, ULID_TIME_BITS + ULID_RANDOM_BITS); } + +/** + * Extract the timestamp (in milliseconds) from a ULID string. + * Returns null if the ULID is invalid. + */ +export function extractUlidTimestamp(ulid: string): number | null { + if (ulid.length !== 26) { + return null; + } + const timestampPart = ulid.slice(0, 10); + const decoded = decodeCrockfordBase32Bits(timestampPart, ULID_TIME_BITS); + if (!decoded.ok) { + return null; + } + return Number(decoded.value); +}