feat: workflow exit codes & kill mechanism

- Add exit_code to workflow_runs (0=success, 1=role error, 2=maxRounds, 137=killed, 255=crash)
- Expand status enum: started/completed/failed/killed
- Add kill-thread IPC message for graceful workflow termination
- Add 'nerve workflow kill <runId>' CLI command
- Show exit_code in 'nerve workflow list' output

Fixes #121
This commit is contained in:
2026-04-25 03:56:57 +00:00
parent 889bbbb474
commit 01d7435c4a
13 changed files with 282 additions and 73 deletions
@@ -41,7 +41,7 @@ describe("LogStore — crash recovery helpers (Phase 3)", () => {
payload: JSON.stringify({ triggerPayload: payload }),
timestamp: 1000,
},
{ runId: "run-1", workflow: "my-wf", status: "started", timestamp: 1000 },
{ runId: "run-1", workflow: "my-wf", status: "started", timestamp: 1000, exitCode: null },
);
const result = store.getTriggerPayload("run-1");
@@ -57,7 +57,7 @@ describe("LogStore — crash recovery helpers (Phase 3)", () => {
payload: null,
timestamp: 1000,
},
{ runId: "run-2", workflow: "my-wf", status: "started", timestamp: 1000 },
{ runId: "run-2", workflow: "my-wf", status: "started", timestamp: 1000, exitCode: null },
);
expect(store.getTriggerPayload("run-2")).toBeNull();
@@ -148,7 +148,7 @@ describe("LogStore — crash recovery helpers (Phase 3)", () => {
payload: JSON.stringify({ triggerPayload: {} }),
timestamp: 1000,
},
{ runId: "run-6", workflow: "my-wf", status: "started", timestamp: 1000 },
{ runId: "run-6", workflow: "my-wf", status: "started", timestamp: 1000, exitCode: null },
);
store.append({
source: "workflow",
@@ -31,6 +31,7 @@ describe("LogStore — workflow_runs", () => {
workflow: "cleanup",
status: "started",
timestamp: 1000,
exitCode: null,
};
const entry = store.upsertWorkflowRun(
@@ -53,12 +54,18 @@ describe("LogStore — workflow_runs", () => {
it("updates existing workflow_runs row on upsert (status transition)", () => {
store.upsertWorkflowRun(
{ source: "workflow", type: "started", refId: "run-2", payload: null, timestamp: 1000 },
{ runId: "run-2", workflow: "cleanup", status: "started", timestamp: 1000 },
{ runId: "run-2", workflow: "cleanup", status: "started", timestamp: 1000, exitCode: null },
);
store.upsertWorkflowRun(
{ source: "workflow", type: "completed", refId: "run-2", payload: null, timestamp: 2000 },
{ runId: "run-2", workflow: "cleanup", status: "completed", timestamp: 2000 },
{
runId: "run-2",
workflow: "cleanup",
status: "completed",
timestamp: 2000,
exitCode: null,
},
);
const stored = store.getWorkflowRun("run-2");
@@ -79,7 +86,7 @@ describe("LogStore — workflow_runs", () => {
] as const) {
store.upsertWorkflowRun(
{ source: "workflow", type, refId: "run-3", payload: null, timestamp },
{ runId: "run-3", workflow: "cleanup", status, timestamp },
{ runId: "run-3", workflow: "cleanup", status, timestamp, exitCode: null },
);
}
@@ -98,11 +105,23 @@ describe("LogStore — workflow_runs", () => {
it("returns the latest state after multiple upserts", () => {
store.upsertWorkflowRun(
{ source: "workflow", type: "queued", refId: "run-4", payload: null, timestamp: 100 },
{ runId: "run-4", workflow: "code-review", status: "queued", timestamp: 100 },
{
runId: "run-4",
workflow: "code-review",
status: "queued",
timestamp: 100,
exitCode: null,
},
);
store.upsertWorkflowRun(
{ source: "workflow", type: "started", refId: "run-4", payload: null, timestamp: 200 },
{ runId: "run-4", workflow: "code-review", status: "started", timestamp: 200 },
{
runId: "run-4",
workflow: "code-review",
status: "started",
timestamp: 200,
exitCode: null,
},
);
const run = store.getWorkflowRun("run-4");
@@ -115,19 +134,19 @@ describe("LogStore — workflow_runs", () => {
beforeEach(() => {
store.upsertWorkflowRun(
{ source: "workflow", type: "queued", refId: "r1", payload: null, timestamp: 100 },
{ runId: "r1", workflow: "cleanup", status: "queued", timestamp: 100 },
{ runId: "r1", workflow: "cleanup", status: "queued", timestamp: 100, exitCode: null },
);
store.upsertWorkflowRun(
{ source: "workflow", type: "started", refId: "r2", payload: null, timestamp: 200 },
{ runId: "r2", workflow: "cleanup", status: "started", timestamp: 200 },
{ runId: "r2", workflow: "cleanup", status: "started", timestamp: 200, exitCode: null },
);
store.upsertWorkflowRun(
{ source: "workflow", type: "completed", refId: "r3", payload: null, timestamp: 300 },
{ runId: "r3", workflow: "cleanup", status: "completed", timestamp: 300 },
{ runId: "r3", workflow: "cleanup", status: "completed", timestamp: 300, exitCode: null },
);
store.upsertWorkflowRun(
{ source: "workflow", type: "failed", refId: "r4", payload: null, timestamp: 400 },
{ runId: "r4", workflow: "deploy", status: "queued", timestamp: 400 },
{ runId: "r4", workflow: "deploy", status: "queued", timestamp: 400, exitCode: null },
);
});
@@ -171,13 +190,13 @@ describe("LogStore — workflow_runs", () => {
});
describe("all statuses are storable", () => {
it.each(["queued", "started", "completed", "failed", "crashed", "dropped"] as const)(
it.each(["queued", "started", "completed", "failed", "crashed", "dropped", "killed"] as const)(
"stores status=%s",
(status) => {
const runId = `run-${status}`;
store.upsertWorkflowRun(
{ source: "workflow", type: status, refId: runId, payload: null, timestamp: 1 },
{ runId, workflow: "test", status, timestamp: 1 },
{ runId, workflow: "test", status, timestamp: 1, exitCode: null },
);
expect(store.getWorkflowRun(runId)?.status).toBe(status);
},
+47 -34
View File
@@ -58,7 +58,8 @@ export type WorkflowRunStatus =
| "failed"
| "crashed"
| "dropped"
| "interrupted";
| "interrupted"
| "killed";
const VALID_WORKFLOW_STATUSES = new Set<string>([
"queued",
@@ -68,6 +69,7 @@ const VALID_WORKFLOW_STATUSES = new Set<string>([
"crashed",
"dropped",
"interrupted",
"killed",
]);
function isWorkflowRunStatus(value: string): value is WorkflowRunStatus {
@@ -87,6 +89,7 @@ export type WorkflowRun = {
workflow: string;
status: WorkflowRunStatus;
timestamp: number;
exitCode: number | null;
};
/** One role-produced workflow-message row with 1-based round index (ROW_NUMBER over role messages only). */
@@ -192,10 +195,11 @@ CREATE TABLE IF NOT EXISTS meta (
);
CREATE TABLE IF NOT EXISTS workflow_runs (
run_id TEXT PRIMARY KEY,
workflow TEXT NOT NULL,
status TEXT NOT NULL,
timestamp INTEGER NOT NULL
run_id TEXT PRIMARY KEY,
workflow TEXT NOT NULL,
status TEXT NOT NULL,
timestamp INTEGER NOT NULL,
exit_code INTEGER
);
CREATE INDEX IF NOT EXISTS idx_workflow_runs_status ON workflow_runs(status);
@@ -332,6 +336,13 @@ export function createLogStore(dbPath: string): LogStore {
sqlite.exec("PRAGMA journal_mode=WAL");
sqlite.exec(SCHEMA_SQL);
// Migration: add exit_code column for existing databases
try {
sqlite.exec("ALTER TABLE workflow_runs ADD COLUMN exit_code INTEGER");
} catch {
// Column already exists — safe to ignore
}
const insertStmt = sqlite.prepare(
"INSERT INTO logs (source, type, ref_id, payload, timestamp) VALUES (@source, @type, @refId, @payload, @timestamp)",
);
@@ -342,11 +353,11 @@ export function createLogStore(dbPath: string): LogStore {
);
const upsertWorkflowRunStmt = sqlite.prepare(
"INSERT OR REPLACE INTO workflow_runs (run_id, workflow, status, timestamp) VALUES (@runId, @workflow, @status, @timestamp)",
"INSERT OR REPLACE INTO workflow_runs (run_id, workflow, status, timestamp, exit_code) VALUES (@runId, @workflow, @status, @timestamp, @exitCode)",
);
const getWorkflowRunStmt = sqlite.prepare(
"SELECT run_id, workflow, status, timestamp FROM workflow_runs WHERE run_id = ?",
"SELECT run_id, workflow, status, timestamp, exit_code FROM workflow_runs WHERE run_id = ?",
);
const getTriggerPayloadStmt = sqlite.prepare(
@@ -386,19 +397,19 @@ export function createLogStore(dbPath: string): LogStore {
);
const getActiveWorkflowRunsStmt = sqlite.prepare(
"SELECT run_id, workflow, status, timestamp FROM workflow_runs WHERE status IN ('queued', 'started') ORDER BY timestamp ASC",
"SELECT run_id, workflow, status, timestamp, exit_code FROM workflow_runs WHERE status IN ('queued', 'started') ORDER BY timestamp ASC",
);
const getActiveWorkflowRunsByNameStmt = sqlite.prepare(
"SELECT run_id, workflow, status, timestamp FROM workflow_runs WHERE status IN ('queued', 'started') AND workflow = ? ORDER BY timestamp ASC",
"SELECT run_id, workflow, status, timestamp, exit_code FROM workflow_runs WHERE status IN ('queued', 'started') AND workflow = ? ORDER BY timestamp ASC",
);
const getAllWorkflowRunsStmt = sqlite.prepare(
"SELECT run_id, workflow, status, timestamp FROM workflow_runs ORDER BY timestamp DESC",
"SELECT run_id, workflow, status, timestamp, exit_code FROM workflow_runs ORDER BY timestamp DESC",
);
const getAllWorkflowRunsByNameStmt = sqlite.prepare(
"SELECT run_id, workflow, status, timestamp FROM workflow_runs WHERE workflow = ? ORDER BY timestamp DESC",
"SELECT run_id, workflow, status, timestamp, exit_code FROM workflow_runs WHERE workflow = ? ORDER BY timestamp DESC",
);
const minLogTsStmt = sqlite.prepare("SELECT MIN(timestamp) AS m FROM logs");
@@ -423,6 +434,7 @@ export function createLogStore(dbPath: string): LogStore {
workflow: run.workflow,
status: run.status,
timestamp: run.timestamp,
exitCode: run.exitCode,
});
return { ...entry, id: Number(info.lastInsertRowid) };
});
@@ -504,31 +516,37 @@ export function createLogStore(dbPath: string): LogStore {
return upsertWorkflowRunTx(entry, run);
}
function getWorkflowRun(runId: string): WorkflowRun | null {
const row = getWorkflowRunStmt.get(runId) as
| { run_id: string; workflow: string; status: string; timestamp: number }
| undefined;
if (row === undefined) return null;
type SqlWorkflowRunRow = {
run_id: string;
workflow: string;
status: string;
timestamp: number;
exit_code: number | null;
};
function mapWorkflowRunRow(r: SqlWorkflowRunRow): WorkflowRun {
return {
runId: row.run_id,
workflow: row.workflow,
status: validateWorkflowRunStatus(row.status),
timestamp: row.timestamp,
runId: r.run_id,
workflow: r.workflow,
status: validateWorkflowRunStatus(r.status),
timestamp: r.timestamp,
exitCode: r.exit_code ?? null,
};
}
function getWorkflowRun(runId: string): WorkflowRun | null {
const row = getWorkflowRunStmt.get(runId) as SqlWorkflowRunRow | undefined;
if (row === undefined) return null;
return mapWorkflowRunRow(row);
}
function getActiveWorkflowRuns(workflowName?: string): WorkflowRun[] {
const rows = (
workflowName !== undefined
? getActiveWorkflowRunsByNameStmt.all(workflowName)
: getActiveWorkflowRunsStmt.all()
) as Array<{ run_id: string; workflow: string; status: string; timestamp: number }>;
return rows.map((r) => ({
runId: r.run_id,
workflow: r.workflow,
status: validateWorkflowRunStatus(r.status),
timestamp: r.timestamp,
}));
) as SqlWorkflowRunRow[];
return rows.map(mapWorkflowRunRow);
}
function getAllWorkflowRuns(workflowName: string | null): WorkflowRun[] {
@@ -536,13 +554,8 @@ export function createLogStore(dbPath: string): LogStore {
workflowName !== null
? getAllWorkflowRunsByNameStmt.all(workflowName)
: getAllWorkflowRunsStmt.all()
) as Array<{ run_id: string; workflow: string; status: string; timestamp: number }>;
return rows.map((r) => ({
runId: r.run_id,
workflow: r.workflow,
status: validateWorkflowRunStatus(r.status),
timestamp: r.timestamp,
}));
) as SqlWorkflowRunRow[];
return rows.map(mapWorkflowRunRow);
}
function getTriggerPayload(runId: string): unknown {