refactor(cli): split workflow/thread into two top-level command groups #146

Merged
xiaomo merged 2 commits from refactor/split-workflow-thread into main 2026-04-27 01:46:33 +00:00
5 changed files with 355 additions and 288 deletions
@@ -1,5 +1,5 @@
/**
* Smoke / integration tests for `nerve workflow` citty handlers with a real HOME
* Smoke / integration tests for `nerve workflow` and `nerve thread` citty handlers with a real HOME
* layout and logs.db. `loadDaemonModule` is mocked so tests use workspace
* `@uncaged/nerve-store` directly (no ~/.uncaged-nerve daemon install required).
*/
@@ -20,9 +20,9 @@ vi.mock("../workspace-daemon.js", async () => {
import { createLogStore } from "@uncaged/nerve-store";
import { workflowCommand } from "../commands/workflow.js";
import { threadCommand } from "../commands/thread.js";
describe("nerve workflow CLI (runCommand + temp HOME)", () => {
describe("nerve thread CLI (runCommand + temp HOME)", () => {
let prevHome: string | undefined;
let fakeHome: string;
let stdoutSpy: ReturnType<typeof vi.spyOn> | null;
@@ -70,21 +70,19 @@ describe("nerve workflow CLI (runCommand + temp HOME)", () => {
rmSync(fakeHome, { recursive: true, force: true });
});
it("workflow runs --all completes without throwing", async () => {
it("thread list --all completes without throwing", async () => {
await expect(runCommand(threadCommand, { rawArgs: ["list", "--all"] })).resolves.toBeDefined();
});
it("thread inspect <runId> completes without throwing", async () => {
await expect(
runCommand(workflowCommand, { rawArgs: ["runs", "--all"] }),
runCommand(threadCommand, { rawArgs: ["inspect", "e2e-run", "--limit", "10"] }),
).resolves.toBeDefined();
});
it("workflow inspect <runId> completes without throwing", async () => {
it("thread show <runId> completes without throwing (role rounds path)", async () => {
await expect(
runCommand(workflowCommand, { rawArgs: ["inspect", "e2e-run", "--limit", "10"] }),
).resolves.toBeDefined();
});
it("workflow thread <runId> completes without throwing (role rounds path)", async () => {
await expect(
runCommand(workflowCommand, { rawArgs: ["thread", "e2e-run", "--budget", "50000"] }),
runCommand(threadCommand, { rawArgs: ["show", "e2e-run", "--budget", "50000"] }),
).resolves.toBeDefined();
});
});
+1 -1
View File
@@ -259,7 +259,7 @@ describe("buildListOutput", () => {
// header + 2 run lines
expect(lines).toHaveLength(3);
expect(paginationHint).not.toBeNull();
expect(paginationHint).toContain("nerve workflow runs");
expect(paginationHint).toContain("nerve thread list");
expect(paginationHint).toContain("--offset 2");
expect(paginationHint).toContain("3 more");
});
+2 -8
View File
@@ -4,12 +4,9 @@ import { consumeGlobalDaemonCliFlags } from "./cli-global.js";
import { daemonCommand } from "./commands/daemon.js";
import { devCommand } from "./commands/dev.js";
import { initCommand } from "./commands/init.js";
import { logsCommand } from "./commands/logs.js";
import { senseCommand } from "./commands/sense.js";
import { daemonStartCommand } from "./commands/start.js";
import { statusCommand } from "./commands/status.js";
import { stopCommand } from "./commands/stop.js";
import { storeCommand } from "./commands/store.js";
import { threadCommand } from "./commands/thread.js";
import { validateCommand } from "./commands/validate.js";
import { workflowCommand } from "./commands/workflow.js";
@@ -43,13 +40,10 @@ const main = defineCommand({
init: initCommand,
daemon: daemonCommand,
dev: devCommand,
start: daemonStartCommand,
stop: stopCommand,
status: statusCommand,
logs: logsCommand,
validate: validateCommand,
sense: senseCommand,
store: storeCommand,
thread: threadCommand,
workflow: workflowCommand,
},
});
+277
View File
@@ -0,0 +1,277 @@
import { defineCommand } from "citty";
import { isRemoteDaemonCli } from "../cli-global.js";
import { resolveDaemonTransport } from "../daemon-client.js";
import { isRunning } from "../workspace.js";
import {
DEFAULT_PAGE_SIZE,
DEFAULT_THREAD_BUDGET_CHARS,
THREAD_ROUNDS_FETCH_LIMIT,
buildInspectOutput,
buildListOutput,
buildThreadCommandOutput,
getAllWorkflowRuns,
openStore,
parseIntArg,
} from "./workflow.js";
// ---------------------------------------------------------------------------
// nerve thread list
// ---------------------------------------------------------------------------
const threadListCommand = defineCommand({
meta: {
name: "list",
description: "List active (queued/started) workflow runs from logs",
},
args: {
all: {
type: "boolean",
description: "Include completed/failed/crashed runs",
default: false,
},
workflow: {
type: "string",
description: "Filter by workflow name",
default: "",
},
limit: {
type: "string",
description: `Max runs to show (default: ${DEFAULT_PAGE_SIZE})`,
default: String(DEFAULT_PAGE_SIZE),
},
offset: {
type: "string",
description: "Skip first N runs (for pagination)",
default: "0",
},
},
async run({ args }) {
const store = await openStore();
try {
const limit = Math.max(1, parseIntArg(args.limit, DEFAULT_PAGE_SIZE));
const offset = Math.max(0, parseIntArg(args.offset, 0));
const filterWorkflow = args.workflow.length > 0 ? args.workflow : null;
const runs = args.all
? getAllWorkflowRuns(store, filterWorkflow)
: store.getActiveWorkflowRuns(filterWorkflow ?? undefined);
const { lines, paginationHint } = buildListOutput(
runs,
offset,
limit,
args.all,
filterWorkflow,
);
for (const line of lines) {
process.stdout.write(line);
}
if (paginationHint !== null) {
process.stdout.write(paginationHint);
}
} finally {
store.close();
}
},
});
// ---------------------------------------------------------------------------
// nerve thread show <runId>
// ---------------------------------------------------------------------------
const threadShowCommand = defineCommand({
meta: {
name: "show",
description: "Print role rounds for a workflow run (agent-oriented, budget-limited)",
},
args: {
runId: {
type: "positional",
description: "The run ID to dump role rounds for",
},
before: {
type: "string",
description:
"Exclusive upper bound on 1-based round index (use with hint from prior output to load older rounds)",
default: "0",
},
budget: {
type: "string",
description: `Max output characters including header (default: ${String(DEFAULT_THREAD_BUDGET_CHARS)})`,
default: String(DEFAULT_THREAD_BUDGET_CHARS),
},
},
async run({ args }) {
const store = await openStore();
try {
const before = Math.max(0, parseIntArg(args.before, 0));
const budgetChars = Math.max(1, parseIntArg(args.budget, DEFAULT_THREAD_BUDGET_CHARS));
const run = store.getWorkflowRun(args.runId);
if (run === null) {
process.stderr.write(`❌ No workflow run found with runId: ${args.runId}\n`);
process.exit(1);
}
const totalRoleRounds = store.getThreadRoundCount(args.runId);
if (totalRoleRounds === 0) {
process.stdout.write(
`🧵 Workflow thread: ${run.runId}\n workflow: ${run.workflow}\n status: ${run.status}\n\n📭 No role rounds recorded for this run.\n`,
);
return;
}
const descRows = store.getThreadRounds(args.runId, {
before,
limit: THREAD_ROUNDS_FETCH_LIMIT,
});
const prefixLines = [
"🧵 Role rounds (workflow thread)\n",
` runId: ${run.runId}\n`,
` workflow: ${run.workflow}\n`,
` status: ${run.status}\n`,
` rounds: ${String(totalRoleRounds)} role event(s) total\n\n`,
];
const { lines, paginationHint } = buildThreadCommandOutput(
prefixLines,
descRows,
budgetChars,
args.runId,
);
for (const line of lines) {
process.stdout.write(line);
}
if (paginationHint !== null) {
process.stdout.write(paginationHint);
}
if (descRows.length === 0 && before > 0) {
process.stdout.write(`\n📭 No rounds with index < ${String(before)}.\n`);
}
} finally {
store.close();
}
},
});
// ---------------------------------------------------------------------------
// nerve thread inspect <runId>
// ---------------------------------------------------------------------------
const threadInspectCommand = defineCommand({
meta: {
name: "inspect",
description: "Show details and thread events for a workflow run",
},
args: {
runId: {
type: "positional",
description: "The run ID to inspect",
},
limit: {
type: "string",
description: `Max log entries to show (default: ${DEFAULT_PAGE_SIZE})`,
default: String(DEFAULT_PAGE_SIZE),
},
offset: {
type: "string",
description: "Skip first N log entries (for pagination)",
default: "0",
},
},
async run({ args }) {
const store = await openStore();
try {
const limit = Math.max(1, parseIntArg(args.limit, DEFAULT_PAGE_SIZE));
const offset = Math.max(0, parseIntArg(args.offset, 0));
const run = store.getWorkflowRun(args.runId);
if (run === null) {
process.stderr.write(`❌ No workflow run found with runId: ${args.runId}\n`);
process.exit(1);
}
const allLogs = store.query({ source: "workflow", refId: args.runId });
const { header, eventLines, paginationHint } = buildInspectOutput(
run,
allLogs,
offset,
limit,
);
for (const line of [...header, ...eventLines]) {
process.stdout.write(line);
}
if (paginationHint !== null) {
process.stdout.write(paginationHint);
}
} finally {
store.close();
}
},
});
// ---------------------------------------------------------------------------
// nerve thread kill <runId>
// ---------------------------------------------------------------------------
const threadKillCommand = defineCommand({
meta: {
name: "kill",
description: "Kill a running or queued workflow thread by runId",
},
args: {
runId: {
type: "positional",
description: "The run ID to kill",
},
},
async run({ args }) {
if (!isRemoteDaemonCli() && !isRunning()) {
process.stderr.write("❌ Nerve daemon is not running. Start it with `nerve daemon start`.\n");
process.exit(1);
}
const transport = resolveDaemonTransport();
let response: { ok: true } | { ok: false; error: string };
try {
response = await transport.killWorkflow(args.runId);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`❌ Failed to contact daemon: ${msg}\n`);
process.exit(1);
}
if (!response.ok) {
process.stderr.write(`❌ Kill failed: ${response.error}\n`);
process.exit(1);
}
process.stdout.write(`✅ Kill signal sent for run "${args.runId}".\n`);
},
});
// ---------------------------------------------------------------------------
// nerve thread (parent command)
// ---------------------------------------------------------------------------
export const threadCommand = defineCommand({
meta: {
name: "thread",
description: "Inspect and manage workflow threads (runs)",
},
subCommands: {
list: threadListCommand,
show: threadShowCommand,
inspect: threadInspectCommand,
kill: threadKillCommand,
},
});
+64 -266
View File
@@ -1,7 +1,7 @@
import { existsSync } from "node:fs";
import { existsSync, readFileSync } from "node:fs";
import { join } from "node:path";
import { DEFAULT_ENGINE_MAX_ROUNDS, isPlainRecord } from "@uncaged/nerve-core";
import { DEFAULT_ENGINE_MAX_ROUNDS, isPlainRecord, parseNerveConfig } from "@uncaged/nerve-core";
import { defineCommand } from "citty";
import { stringify } from "yaml";
@@ -14,7 +14,7 @@ import { getNerveRoot, isRunning } from "../workspace.js";
export const DEFAULT_PAGE_SIZE = 20;
/** Default max characters for `nerve workflow thread` output (including run header). */
/** Default max characters for `nerve thread show` output (including run header). */
export const DEFAULT_THREAD_BUDGET_CHARS = 8000;
/** Max role-round rows read from SQLite per invocation (DESC by round). */
@@ -50,7 +50,7 @@ export function formatTs(timestampMs: number | null | undefined): string {
}
}
async function openStore(): Promise<LogStore> {
export async function openStore(): Promise<LogStore> {
const nerveRoot = getNerveRoot();
const dbPath = getDbPath();
if (!existsSync(dbPath)) {
@@ -143,7 +143,7 @@ export function buildListOutput(
const allFlagStr = allFlag ? " --all" : "";
paginationHint =
`\n⏩ ${remaining} more run(s) not shown. Fetch next page:\n` +
` nerve workflow runs --offset ${offset + limit}${allFlagStr}${wfFlag}\n`;
` nerve thread list --offset ${offset + limit}${allFlagStr}${wfFlag}\n`;
}
return { lines, paginationHint };
@@ -196,14 +196,14 @@ export function buildInspectOutput(
if (remaining > 0) {
paginationHint =
`\n⏩ ${remaining} more event(s) not shown. Fetch next page:\n` +
` nerve workflow inspect ${run.runId} --offset ${offset + limit}\n`;
` nerve thread inspect ${run.runId} --offset ${offset + limit}\n`;
}
return { header, eventLines, paginationHint };
}
// ---------------------------------------------------------------------------
// nerve workflow thread <runId> — agent-oriented role rounds
// nerve thread show <runId> — agent-oriented role rounds
// ---------------------------------------------------------------------------
export type PartitionedMessage = {
@@ -270,13 +270,13 @@ function buildTruncatedSingleRound(
lines: [...prefixLines, single],
paginationHint:
hintRound > 1
? `\n⏩ Older rounds exist. Fetch with:\n nerve workflow thread ${runId} --before ${String(hintRound)}${budgetFlag}\n`
? `\n⏩ Older rounds exist. Fetch with:\n nerve thread show ${runId} --before ${String(hintRound)}${budgetFlag}\n`
: null,
};
}
/**
* Build stdout lines for `nerve workflow thread`: newest-first selection from
* Build stdout lines for `nerve thread show`: newest-first selection from
* `descRows` until `budgetChars` (including `prefixLines`), then chronological order.
*/
export function buildThreadCommandOutput(
@@ -309,24 +309,69 @@ export function buildThreadCommandOutput(
const shownMinRound = picked.length === 0 ? null : Math.min(...picked.map((r) => r.round));
let paginationHint: string | null = null;
if (shownMinRound !== null && shownMinRound > 1) {
paginationHint = `\n⏩ Older rounds not shown. Fetch with:\n nerve workflow thread ${runId} --before ${String(shownMinRound)}${budgetFlag}\n`;
paginationHint = `\n⏩ Older rounds not shown. Fetch with:\n nerve thread show ${runId} --before ${String(shownMinRound)}${budgetFlag}\n`;
}
return { lines: [...prefixLines, ...blocksAsc], paginationHint };
}
// ---------------------------------------------------------------------------
// nerve workflow list (daemon — registered workflows + queue depth)
// nerve workflow list (reads workflow definitions from workspace YAML)
// ---------------------------------------------------------------------------
const workflowDaemonListCommand = defineCommand({
const workflowListCommand = defineCommand({
meta: {
name: "list",
description: "List workflows from the running daemon (concurrency, active, queued)",
description: "List workflow definitions from nerve.yaml",
},
async run() {
const configPath = join(getNerveRoot(), "nerve.yaml");
let raw: string;
try {
raw = readFileSync(configPath, "utf8");
} catch {
process.stderr.write(`❌ Could not read ${configPath}\n`);
process.exit(1);
}
const result = parseNerveConfig(raw);
if (!result.ok) {
process.stderr.write(`❌ Config validation failed: ${result.error.message}\n`);
process.exit(1);
}
const config = result.value;
const workflowEntries = Object.entries(config.workflows);
if (workflowEntries.length === 0) {
process.stdout.write("📭 No workflows defined in nerve.yaml.\n");
return;
}
const rows = workflowEntries.map(([name, wf]) => ({
name,
concurrency: wf.concurrency,
overflow: wf.overflow,
...(wf.overflow === "queue" ? { maxQueue: wf.maxQueue } : {}),
}));
process.stdout.write(`📋 Workflow definitions (${String(rows.length)}):\n\n`);
process.stdout.write(formatRowsAsAlignedTable(rows));
},
});
// ---------------------------------------------------------------------------
// nerve workflow status (daemon — registered workflows + queue depth)
// ---------------------------------------------------------------------------
const workflowStatusCommand = defineCommand({
meta: {
name: "status",
description: "Show live workflow status from the running daemon (concurrency, active, queued)",
},
async run() {
if (!isRemoteDaemonCli() && !isRunning()) {
process.stderr.write("❌ Nerve daemon is not running. Start it with `nerve start`.\n");
process.stderr.write("❌ Nerve daemon is not running. Start it with `nerve daemon start`.\n");
process.exit(1);
}
@@ -359,210 +404,6 @@ const workflowDaemonListCommand = defineCommand({
},
});
// ---------------------------------------------------------------------------
// nerve workflow runs
// ---------------------------------------------------------------------------
const workflowRunsCommand = defineCommand({
meta: {
name: "runs",
description: "List active (queued/started) workflow runs from logs",
},
args: {
all: {
type: "boolean",
description: "Include completed/failed/crashed runs",
default: false,
},
workflow: {
type: "string",
description: "Filter by workflow name",
default: "",
},
limit: {
type: "string",
description: `Max runs to show (default: ${DEFAULT_PAGE_SIZE})`,
default: String(DEFAULT_PAGE_SIZE),
},
offset: {
type: "string",
description: "Skip first N runs (for pagination)",
default: "0",
},
},
async run({ args }) {
const store = await openStore();
try {
const limit = Math.max(1, parseIntArg(args.limit, DEFAULT_PAGE_SIZE));
const offset = Math.max(0, parseIntArg(args.offset, 0));
const filterWorkflow = args.workflow.length > 0 ? args.workflow : null;
const runs = args.all
? getAllWorkflowRuns(store, filterWorkflow)
: store.getActiveWorkflowRuns(filterWorkflow ?? undefined);
const { lines, paginationHint } = buildListOutput(
runs,
offset,
limit,
args.all,
filterWorkflow,
);
for (const line of lines) {
process.stdout.write(line);
}
if (paginationHint !== null) {
process.stdout.write(paginationHint);
}
} finally {
store.close();
}
},
});
// ---------------------------------------------------------------------------
// nerve workflow inspect <runId>
// ---------------------------------------------------------------------------
const workflowInspectCommand = defineCommand({
meta: {
name: "inspect",
description: "Show details and thread events for a workflow run",
},
args: {
runId: {
type: "positional",
description: "The run ID to inspect",
},
limit: {
type: "string",
description: `Max log entries to show (default: ${DEFAULT_PAGE_SIZE})`,
default: String(DEFAULT_PAGE_SIZE),
},
offset: {
type: "string",
description: "Skip first N log entries (for pagination)",
default: "0",
},
},
async run({ args }) {
const store = await openStore();
try {
const limit = Math.max(1, parseIntArg(args.limit, DEFAULT_PAGE_SIZE));
const offset = Math.max(0, parseIntArg(args.offset, 0));
const run = store.getWorkflowRun(args.runId);
if (run === null) {
process.stderr.write(`❌ No workflow run found with runId: ${args.runId}\n`);
process.exit(1);
}
const allLogs = store.query({ source: "workflow", refId: args.runId });
const { header, eventLines, paginationHint } = buildInspectOutput(
run,
allLogs,
offset,
limit,
);
for (const line of [...header, ...eventLines]) {
process.stdout.write(line);
}
if (paginationHint !== null) {
process.stdout.write(paginationHint);
}
} finally {
store.close();
}
},
});
// ---------------------------------------------------------------------------
// nerve workflow thread <runId>
// ---------------------------------------------------------------------------
const workflowThreadCommand = defineCommand({
meta: {
name: "thread",
description: "Print role rounds for a workflow run (agent-oriented, budget-limited)",
},
args: {
runId: {
type: "positional",
description: "The run ID to dump role rounds for",
},
before: {
type: "string",
description:
"Exclusive upper bound on 1-based round index (use with hint from prior output to load older rounds)",
default: "0",
},
budget: {
type: "string",
description: `Max output characters including header (default: ${String(DEFAULT_THREAD_BUDGET_CHARS)})`,
default: String(DEFAULT_THREAD_BUDGET_CHARS),
},
},
async run({ args }) {
const store = await openStore();
try {
const before = Math.max(0, parseIntArg(args.before, 0));
const budgetChars = Math.max(1, parseIntArg(args.budget, DEFAULT_THREAD_BUDGET_CHARS));
const run = store.getWorkflowRun(args.runId);
if (run === null) {
process.stderr.write(`❌ No workflow run found with runId: ${args.runId}\n`);
process.exit(1);
}
const totalRoleRounds = store.getThreadRoundCount(args.runId);
if (totalRoleRounds === 0) {
process.stdout.write(
`🧵 Workflow thread: ${run.runId}\n workflow: ${run.workflow}\n status: ${run.status}\n\n📭 No role rounds recorded for this run.\n`,
);
return;
}
const descRows = store.getThreadRounds(args.runId, {
before,
limit: THREAD_ROUNDS_FETCH_LIMIT,
});
const prefixLines = [
"🧵 Role rounds (workflow thread)\n",
` runId: ${run.runId}\n`,
` workflow: ${run.workflow}\n`,
` status: ${run.status}\n`,
` rounds: ${String(totalRoleRounds)} role event(s) total\n\n`,
];
const { lines, paginationHint } = buildThreadCommandOutput(
prefixLines,
descRows,
budgetChars,
args.runId,
);
for (const line of lines) {
process.stdout.write(line);
}
if (paginationHint !== null) {
process.stdout.write(paginationHint);
}
if (descRows.length === 0 && before > 0) {
process.stdout.write(`\n📭 No rounds with index < ${String(before)}.\n`);
}
} finally {
store.close();
}
},
});
// ---------------------------------------------------------------------------
// nerve workflow trigger <name>
// ---------------------------------------------------------------------------
@@ -604,7 +445,7 @@ const workflowTriggerCommand = defineCommand({
}
if (!isRemoteDaemonCli() && !isRunning()) {
process.stderr.write("❌ Nerve daemon is not running. Start it with `nerve start`.\n");
process.stderr.write("❌ Nerve daemon is not running. Start it with `nerve daemon start`.\n");
process.exit(1);
}
@@ -624,47 +465,7 @@ const workflowTriggerCommand = defineCommand({
}
process.stdout.write(`✅ Triggered workflow "${args.name}" via daemon.\n`);
process.stdout.write("\n💡 Inspect active runs with: nerve workflow runs\n");
},
});
// ---------------------------------------------------------------------------
// nerve workflow kill <runId>
// ---------------------------------------------------------------------------
const workflowKillCommand = defineCommand({
meta: {
name: "kill",
description: "Kill a running or queued workflow thread by runId",
},
args: {
runId: {
type: "positional",
description: "The run ID to kill",
},
},
async run({ args }) {
if (!isRemoteDaemonCli() && !isRunning()) {
process.stderr.write("❌ Nerve daemon is not running. Start it with `nerve start`.\n");
process.exit(1);
}
const transport = resolveDaemonTransport();
let response: { ok: true } | { ok: false; error: string };
try {
response = await transport.killWorkflow(args.runId);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`❌ Failed to contact daemon: ${msg}\n`);
process.exit(1);
}
if (!response.ok) {
process.stderr.write(`❌ Kill failed: ${response.error}\n`);
process.exit(1);
}
process.stdout.write(`✅ Kill signal sent for run "${args.runId}".\n`);
process.stdout.write("\n💡 Inspect active runs with: nerve thread list\n");
},
});
@@ -675,14 +476,11 @@ const workflowKillCommand = defineCommand({
export const workflowCommand = defineCommand({
meta: {
name: "workflow",
description: "Manage and inspect workflow runs",
description: "Manage workflow definitions and trigger workflows",
},
subCommands: {
list: workflowDaemonListCommand,
runs: workflowRunsCommand,
inspect: workflowInspectCommand,
thread: workflowThreadCommand,
list: workflowListCommand,
status: workflowStatusCommand,
trigger: workflowTriggerCommand,
kill: workflowKillCommand,
},
});