feat: workflow exit codes & kill mechanism #122

Merged
xiaomo merged 1 commit from feat/121-workflow-exit-codes into main 2026-04-25 04:03:30 +00:00
13 changed files with 282 additions and 73 deletions
+2 -1
View File
@@ -45,7 +45,7 @@ function upsertRun(
): void {
store.upsertWorkflowRun(
{ source: "workflow", type: status, refId: runId, payload: null, timestamp: timestampMs },
{ runId, workflow, status, timestamp: timestampMs },
{ runId, workflow, status, timestamp: timestampMs, exitCode: null },
);
}
@@ -83,6 +83,7 @@ describe("statusIcon", () => {
["crashed", "💥"],
["dropped", "🗑"],
["interrupted", "⚠️"],
["killed", "🛑"],
] as const)("maps status=%s to icon=%s", (status, icon) => {
expect(statusIcon(status)).toBe(icon);
});
+46 -2
View File
@@ -7,7 +7,7 @@ import { defineCommand } from "citty";
import { stringify } from "yaml";
import type { LogStore, ThreadRoundRow, WorkflowRun } from "@uncaged/nerve-store";
import { triggerWorkflowViaDaemon } from "../daemon-client.js";
import { killWorkflowViaDaemon, triggerWorkflowViaDaemon } from "../daemon-client.js";
import { loadDaemonModule } from "../workspace-daemon.js";
import { getNerveRoot, getSocketPath, isRunning } from "../workspace.js";
@@ -59,6 +59,8 @@ export function statusIcon(status: WorkflowRun["status"]): string {
return "🗑";
case "interrupted":
return "⚠️";
case "killed":
return "🛑";
default: {
const _exhaustive: never = status;
return `?(${_exhaustive})`;
@@ -79,7 +81,8 @@ export function getAllWorkflowRuns(store: LogStore, filterWorkflow: string | nul
*/
export function formatRunLine(run: WorkflowRun): string {
  const icon = statusIcon(run.status);
  // `exitCode` is null when no exit code has been recorded for the run; in that
  // case the `exit_code=` field is omitted entirely rather than printed empty.
  const exitCodeStr = run.exitCode !== null ? ` exit_code=${run.exitCode}` : "";
  return ` ${icon} ${run.runId} workflow=${run.workflow} status=${run.status}${exitCodeStr} timestamp=${formatTs(run.timestamp)}\n`;
}
/**
@@ -563,6 +566,46 @@ const workflowTriggerCommand = defineCommand({
},
});
// ---------------------------------------------------------------------------
// nerve workflow kill <runId>
// ---------------------------------------------------------------------------
const workflowKillCommand = defineCommand({
  meta: {
    name: "kill",
    description: "Kill a running or queued workflow thread by runId",
  },
  args: {
    runId: {
      type: "positional",
      description: "The run ID to kill",
    },
  },
  /**
   * Forward a kill request for `args.runId` to the daemon and report the outcome.
   * Every failure path writes a message to stderr and exits with status 1.
   */
  async run({ args }) {
    // The kill is delegated to the daemon, so bail out early when none is running.
    if (!isRunning()) {
      process.stderr.write("❌ Nerve daemon is not running. Start it with `nerve start`.\n");
      process.exit(1);
    }

    const { runId } = args;
    const socketPath = getSocketPath();

    let reply: DaemonIpcTriggerResponse;
    try {
      reply = await killWorkflowViaDaemon(socketPath, runId);
    } catch (cause) {
      // Transport-level failure: the request never produced a daemon reply.
      const reason = cause instanceof Error ? cause.message : String(cause);
      process.stderr.write(`❌ Failed to contact daemon: ${reason}\n`);
      process.exit(1);
    }

    // The daemon answered but refused the request.
    if (!reply.ok) {
      process.stderr.write(`❌ Kill failed: ${reply.error}\n`);
      process.exit(1);
    }

    process.stdout.write(`✅ Kill signal sent for run "${runId}".\n`);
  },
});
// ---------------------------------------------------------------------------
// nerve workflow (parent command)
// ---------------------------------------------------------------------------
@@ -577,5 +620,6 @@ export const workflowCommand = defineCommand({
inspect: workflowInspectCommand,
thread: workflowThreadCommand,
trigger: workflowTriggerCommand,
kill: workflowKillCommand,
},
});
+12
View File
@@ -167,3 +167,15 @@ export function listSensesViaDaemon(socketPath: string): Promise<DaemonIpcListSe
const message: DaemonIpcRequest = { type: "list-senses" };
return sendAndReceive(socketPath, message, parseListSensesResponse);
}
/**
 * Ask the running daemon, over its Unix socket, to kill a workflow thread.
 *
 * @param socketPath - Path of the daemon's Unix socket.
 * @param runId - Identifier of the run to kill.
 * @returns A promise for the daemon's response; it rejects on connection/timeout errors.
 */
export function killWorkflowViaDaemon(
  socketPath: string,
  runId: string,
): Promise<DaemonIpcTriggerResponse> {
  const killRequest: DaemonIpcRequest = { type: "kill-workflow", runId: runId };
  return sendAndReceive(socketPath, killRequest, parseDaemonResponse);
}
+12 -1
View File
@@ -27,11 +27,18 @@ export type DaemonIpcListSensesRequest = {
type: "list-senses";
};
/**
 * Client → daemon: kill a running or queued workflow thread by runId.
 *
 * `type` is the literal tag that selects this request kind. The request parser
 * in this module only accepts the message when `runId` is a non-empty string.
 */
export type DaemonIpcKillWorkflowRequest = {
type: "kill-workflow";
/** Identifier of the run to kill. */
runId: string;
};
/**
 * Union of all JSON requests the daemon IPC server accepts.
 * Each member carries a distinct literal `type` tag, so handlers can narrow on it.
 */
export type DaemonIpcRequest =
  | DaemonIpcTriggerWorkflowRequest
  | DaemonIpcTriggerSenseRequest
  | DaemonIpcListSensesRequest
  | DaemonIpcKillWorkflowRequest;
/** Successful trigger / trigger-sense reply (no body). */
export type DaemonIpcTriggerOkResponse = { ok: true };
@@ -87,6 +94,10 @@ export function parseDaemonIpcRequest(line: string): DaemonIpcRequest | null {
if (req.type === "list-senses") {
return { type: "list-senses" };
}
if (req.type === "kill-workflow") {
if (typeof req.runId !== "string" || req.runId.length === 0) return null;
return { type: "kill-workflow", runId: req.runId };
}
return null;
} catch {
return null;
+1
View File
@@ -38,6 +38,7 @@ export type {
DaemonIpcTriggerWorkflowRequest,
DaemonIpcTriggerSenseRequest,
DaemonIpcListSensesRequest,
DaemonIpcKillWorkflowRequest,
DaemonIpcRequest,
DaemonIpcTriggerOkResponse,
DaemonIpcErrorResponse,
@@ -76,6 +76,7 @@ function makeLogStore(
timestamp: number;
}> = [],
) {
const runsWithExitCode = activeRuns.map((r) => ({ ...r, exitCode: null }));
const store = {
append: vi.fn(),
query: vi.fn(() => []),
@@ -86,9 +87,9 @@ function makeLogStore(
getWorkflowRun: vi.fn(() => null),
getActiveWorkflowRuns: vi.fn((_workflowName?: string) => {
if (_workflowName !== undefined) {
return activeRuns.filter((r) => r.workflow === _workflowName);
return runsWithExitCode.filter((r) => r.workflow === _workflowName);
}
return activeRuns;
return runsWithExitCode;
}),
getTriggerPayload: vi.fn((): unknown => ({ value: 42 })),
getThreadEvents: vi.fn(
+6
View File
@@ -67,6 +67,12 @@ export function createDaemonIpcServer(
const senses = opts.listSenses();
const resp: DaemonIpcResponse = { ok: true, senses };
socket.write(`${JSON.stringify(resp)}\n`);
} else if (req.type === "kill-workflow") {
const found = workflowManager.killThread(req.runId);
const resp: DaemonIpcResponse = found
? { ok: true }
: { ok: false, error: `Run not found or already finished: ${req.runId}` };
socket.write(`${JSON.stringify(resp)}\n`);
} else {
const _exhaustive: never = req;
void _exhaustive;
+27 -2
View File
@@ -50,13 +50,20 @@ export type ResumeThreadMessage = {
dryRun: boolean;
};
/**
 * Parent → Workflow Worker: kill a specific running thread.
 *
 * `parseParentMessage` rejects the message unless `runId` is a string.
 */
export type KillThreadMessage = {
type: "kill-thread";
/** Identifier of the run whose thread should be stopped. */
runId: string;
};
/**
 * Union of all messages the parent sends to a worker.
 * Includes `KillThreadMessage` so a kill request is a valid parent → worker message.
 */
export type ParentToWorkerMessage =
  | ComputeMessage
  | ShutdownMessage
  | HealthRequestMessage
  | StartThreadMessage
  | ResumeThreadMessage
  | KillThreadMessage;
/** Worker → Parent: compute produced a signal */
export type SignalMessage = {
@@ -89,7 +96,13 @@ export type HealthResponseMessage = {
// ---------------------------------------------------------------------------
/**
 * Valid lifecycle event types for a workflow thread.
 * `"killed"` is the event reported for a thread that was stopped by a kill request.
 */
export type ThreadEventType =
  | "queued"
  | "started"
  | "step_complete"
  | "completed"
  | "failed"
  | "killed";
/**
* Workflow Worker → Parent: a thread lifecycle event.
@@ -106,6 +119,8 @@ export type WorkflowErrorMessage = {
type: "workflow-error";
runId: string;
error: string;
/** Exit code conveying the failure reason (1=role error, 2=maxRounds exhausted). */
exitCode: number;
};
/** Workflow Worker → Parent: a WorkflowMessage produced by a role (for crash recovery). */
@@ -132,6 +147,7 @@ const PARENT_MSG_TYPES = new Set([
"health-request",
"start-thread",
"resume-thread",
"kill-thread",
]);
function validateStartThreadMsg(obj: Record<string, unknown>): string | null {
@@ -201,6 +217,12 @@ export function parseParentMessage(raw: unknown): Result<ParentToWorkerMessage>
dryRun: obj.dryRun,
} as ResumeThreadMessage);
}
if (obj.type === "kill-thread") {
if (typeof obj.runId !== "string") {
return err(new Error("'kill-thread' message missing string 'runId'"));
}
return ok({ type: "kill-thread", runId: obj.runId } as KillThreadMessage);
}
return err(new Error(`Unhandled IPC message type: "${obj.type}"`));
}
@@ -254,6 +276,7 @@ function isThreadEventType(value: string): value is ThreadEventType {
case "step_complete":
case "completed":
case "failed":
case "killed":
return true;
default:
return false;
@@ -287,10 +310,12 @@ function parseWorkflowErrorMsg(obj: Record<string, unknown>): Result<WorkerToPar
if (typeof obj.error !== "string") {
return err(new Error("Worker 'workflow-error' message missing string 'error' field"));
}
const exitCode = typeof obj.exitCode === "number" ? obj.exitCode : 1;
return ok({
type: "workflow-error",
runId: obj.runId,
error: obj.error,
exitCode,
});
}
+52 -8
View File
@@ -16,6 +16,7 @@ import { START, isPlainRecord } from "@uncaged/nerve-core";
import type { LogStore, WorkflowRunStatus } from "@uncaged/nerve-store";
import type {
KillThreadMessage,
ResumeThreadMessage,
ShutdownMessage,
StartThreadMessage,
@@ -37,6 +38,11 @@ export type WorkflowLaunchParams = {
export type WorkflowManager = {
/** Trigger a new workflow thread (Sense-driven launch or CLI / IPC). */
startWorkflow: (workflowName: string, launch: WorkflowLaunchParams) => void;
/**
* Kill a running or queued workflow thread by runId.
* Returns true if the thread was found, false if not found.
*/
killThread: (runId: string) => boolean;
/** Number of currently active (running) threads for a workflow. */
activeCount: (workflowName: string) => number;
/** Number of pending queued threads waiting to run for a workflow. */
@@ -181,6 +187,16 @@ function sendResumeThread(worker: ChildProcess, msg: ResumeThreadMessage): void
}
}
/**
 * Best-effort delivery of a "kill-thread" request for `runId` to a workflow worker.
 *
 * Nothing is sent when the worker reports its IPC channel as disconnected, and a
 * failing send is swallowed, so the caller cannot tell whether the message was delivered.
 */
function sendKillThread(worker: ChildProcess, runId: string): void {
  const killRequest: KillThreadMessage = { type: "kill-thread", runId };
  if (worker.connected !== false) {
    try {
      worker.send(killRequest);
    } catch {
      // The channel can close between the connected check and the send; ignore it.
    }
  }
}
function waitForExit(child: ChildProcess, timeoutMs: number): Promise<void> {
return new Promise((resolve) => {
const timer = setTimeout(() => {
@@ -229,15 +245,24 @@ export function createWorkflowManager(
crashed: "crashed",
dropped: "dropped",
interrupted: "interrupted",
killed: "killed",
};
return map[eventType] ?? null;
}
/**
 * Pull a numeric `exitCode` out of an event payload.
 *
 * @param payload - Arbitrary payload attached to a thread event.
 * @returns The payload's `exitCode` when the payload is a plain record and the
 *   field is a number; otherwise null.
 */
function extractExitCode(payload: unknown): number | null {
  if (!isPlainRecord(payload)) return null;
  const candidate = payload.exitCode;
  return typeof candidate === "number" ? candidate : null;
}
function logWorkflowEvent(
workflowName: string,
runId: string,
eventType: string,
payload?: unknown,
exitCode: number | null = null,
): void {
const timestamp = Date.now();
const serialised = payload !== undefined ? JSON.stringify(payload) : null;
@@ -252,7 +277,7 @@ export function createWorkflowManager(
payload: serialised,
timestamp,
},
{ runId, workflow: workflowName, status, timestamp },
{ runId, workflow: workflowName, status, timestamp, exitCode },
);
} else {
logStore.append({
@@ -307,13 +332,11 @@ export function createWorkflowManager(
const state = states.get(workflowName);
if (state === undefined) return;
if (msg.eventType === "completed" || msg.eventType === "failed") {
if (msg.eventType === "completed" || msg.eventType === "failed" || msg.eventType === "killed") {
state.active.delete(msg.runId);
dequeueNext(workflowName);
}
if (msg.eventType === "completed" || msg.eventType === "failed") {
logWorkflowEvent(workflowName, msg.runId, msg.eventType, msg.payload);
const exitCode = extractExitCode(msg.payload);
logWorkflowEvent(workflowName, msg.runId, msg.eventType, msg.payload, exitCode);
}
}
@@ -399,7 +422,7 @@ export function createWorkflowManager(
`[workflow-manager] worker for "${workflowName}" crashed with ${crashedCount} active thread(s)\n`,
);
for (const runId of state.active) {
logWorkflowEvent(workflowName, runId, "crashed");
logWorkflowEvent(workflowName, runId, "crashed", undefined, 255);
}
}
@@ -460,7 +483,7 @@ export function createWorkflowManager(
state.active.delete(msg.runId);
dequeueNext(workflowName);
}
logWorkflowEvent(workflowName, msg.runId, "failed", { error: msg.error });
logWorkflowEvent(workflowName, msg.runId, "failed", { error: msg.error }, msg.exitCode);
return;
}
@@ -541,6 +564,26 @@ export function createWorkflowManager(
return entry;
}
function killThread(runId: string): boolean {
for (const [workflowName, state] of states) {
const queueIdx = state.queue.findIndex((q) => q.runId === runId);
if (queueIdx !== -1) {
state.queue.splice(queueIdx, 1);
logWorkflowEvent(workflowName, runId, "killed", { exitCode: 137 }, 137);
return true;
}
if (state.active.has(runId)) {
const workerEntry = workers.get(workflowName);
if (workerEntry !== undefined) {
sendKillThread(workerEntry.process, runId);
}
Review

⚠️ 如果 worker 已经 crash 或 IPC 断开,`sendKillThread` 静默失败,但这里已经返回 `true`。调用方会认为 kill 成功,但线程实际上会一直留在 `state.active`。

建议:后续可以加一个 timeout 机制——如果 N 秒后没收到 killed 事件,强制从 active 中移除并记录 killed+255。

⚠️ 如果 worker 已经 crash 或 IPC 断开,`sendKillThread` 静默失败,但这里已经返回 `true`。调用方会认为 kill 成功,但线程实际上会一直留在 `state.active`。 建议:后续可以加一个 timeout 机制——如果 N 秒后没收到 killed 事件,强制从 active 中移除并记录 killed+255。
return true;
}
}
return false;
}
function startWorkflow(workflowName: string, launch: WorkflowLaunchParams): void {
if (stopped) return;
@@ -652,6 +695,7 @@ export function createWorkflowManager(
return {
startWorkflow,
killThread,
activeCount,
queueLength,
totalActiveCount,
+41 -9
View File
@@ -41,8 +41,8 @@ function sendThreadEvent(runId: string, eventType: ThreadEventType, payload: unk
send({ type: "thread-event", runId, eventType, payload });
}
/**
 * Emit a "workflow-error" message for a run through the module's `send` helper.
 *
 * @param runId - Run the error belongs to.
 * @param error - Human-readable failure description.
 * @param exitCode - Failure reason code. Defaults to 1, so existing two-argument
 *   callers keep working; this file passes 2 when the round limit is exceeded.
 */
function sendWorkflowError(runId: string, error: string, exitCode = 1): void {
  send({ type: "workflow-error", runId, error, exitCode });
}
function sendWorkflowMessage(runId: string, message: WorkflowMessage): void {
@@ -178,7 +178,7 @@ async function executeRole(
result = await role(start, messages);
} catch (e: unknown) {
const errMsg = e instanceof Error ? e.message : String(e);
sendThreadEvent(runId, "failed", { error: errMsg });
sendThreadEvent(runId, "failed", { error: errMsg, exitCode: 1 });
return null;
}
@@ -186,10 +186,13 @@ async function executeRole(
return result;
}
/**
 * Mutable per-run cancellation flag shared between the message handler and the thread loop.
 * `handleMessage` sets `value` to true when a "kill-thread" message arrives for the run;
 * `runThread` reads it before the first moderator call and after each role execution.
 */
type KillFlag = { value: boolean };
async function runThread(
def: WorkflowDefinition<RoleMeta>,
runId: string,
maxRounds: number,
killFlag: KillFlag,
resumeMessages: WorkflowMessage[] = [],
freshPrompt: string | null = null,
dryRun = false,
@@ -219,15 +222,26 @@ async function runThread(
});
}
if (killFlag.value) {
sendThreadEvent(runId, "killed", { exitCode: 137 });
return;
}
let nextRole = def.moderator({ start, steps });
if (nextRole === END) {
sendThreadEvent(runId, "completed", null);
sendThreadEvent(runId, "completed", { exitCode: 0 });
return;
}
while (steps.length < maxRounds) {
const result = await executeRole(def, nextRole, start, roleMessages, runId);
if (killFlag.value) {
sendThreadEvent(runId, "killed", { exitCode: 137 });
Review

💡 killFlag 仅在 role 执行完成后检查。如果 role 内部有长阻塞操作(如网络请求),kill 响应会有延迟。

这是 cooperative cancellation 的固有特性,当前实现没问题。如果未来需要更快响应,可以考虑将 killFlag 传入 role 内部或用 AbortController。

💡 killFlag 仅在 role 执行完成后检查。如果 role 内部有长阻塞操作(如网络请求),kill 响应会有延迟。 这是 cooperative cancellation 的固有特性,当前实现没问题。如果未来需要更快响应,可以考虑将 killFlag 传入 role 内部或用 AbortController。
return;
}
if (result === null) return;
const message: WorkflowMessage = {
@@ -249,12 +263,12 @@ async function runThread(
nextRole = def.moderator({ start, steps });
if (nextRole === END) {
sendThreadEvent(runId, "completed", null);
sendThreadEvent(runId, "completed", { exitCode: 0 });
return;
}
}
sendWorkflowError(runId, `Thread exceeded maximum rounds (${maxRounds})`);
sendWorkflowError(runId, `Thread exceeded maximum rounds (${maxRounds})`, 2);
}
// ---------------------------------------------------------------------------
@@ -309,6 +323,7 @@ function handleMessage(
raw: unknown,
def: WorkflowDefinition<RoleMeta>,
inFlight: Map<string, Promise<void>>,
killFlags: Map<string, KillFlag>,
shuttingDown: { value: boolean },
): void {
const parseResult = parseParentMessage(raw);
@@ -332,15 +347,19 @@ function handleMessage(
if (shuttingDown.value) return;
const { runId, prompt, maxRounds, dryRun } = msg;
const killFlag: KillFlag = { value: false };
killFlags.set(runId, killFlag);
const previous = inFlight.get(runId) ?? Promise.resolve();
const next = previous
.then(() => runThread(def, runId, maxRounds, [], prompt, dryRun))
.then(() => runThread(def, runId, maxRounds, killFlag, [], prompt, dryRun))
.catch((e: unknown) => {
const errMsg = e instanceof Error ? e.message : String(e);
sendWorkflowError(runId, errMsg);
})
.finally(() => {
inFlight.delete(runId);
killFlags.delete(runId);
});
inFlight.set(runId, next);
@@ -351,20 +370,32 @@ function handleMessage(
if (shuttingDown.value) return;
const { runId, messages, maxRounds, dryRun } = msg;
const killFlag: KillFlag = { value: false };
killFlags.set(runId, killFlag);
const previous = inFlight.get(runId) ?? Promise.resolve();
const next = previous
.then(() => runThread(def, runId, maxRounds, messages, null, dryRun))
.then(() => runThread(def, runId, maxRounds, killFlag, messages, null, dryRun))
.catch((e: unknown) => {
const errMsg = e instanceof Error ? e.message : String(e);
sendWorkflowError(runId, errMsg);
})
.finally(() => {
inFlight.delete(runId);
killFlags.delete(runId);
});
inFlight.set(runId, next);
return;
}
if (msg.type === "kill-thread") {
const flag = killFlags.get(msg.runId);
if (flag !== undefined) {
flag.value = true;
}
return;
}
}
// ---------------------------------------------------------------------------
@@ -382,12 +413,13 @@ async function bootstrap(nerveRoot: string, workflowName: string): Promise<void>
}
const inFlight = new Map<string, Promise<void>>();
const killFlags = new Map<string, KillFlag>();
const shuttingDown = { value: false };
sendReady();
process.on("message", (raw: unknown) => {
handleMessage(raw, def, inFlight, shuttingDown);
handleMessage(raw, def, inFlight, killFlags, shuttingDown);
});
}
@@ -41,7 +41,7 @@ describe("LogStore — crash recovery helpers (Phase 3)", () => {
payload: JSON.stringify({ triggerPayload: payload }),
timestamp: 1000,
},
{ runId: "run-1", workflow: "my-wf", status: "started", timestamp: 1000 },
{ runId: "run-1", workflow: "my-wf", status: "started", timestamp: 1000, exitCode: null },
);
const result = store.getTriggerPayload("run-1");
@@ -57,7 +57,7 @@ describe("LogStore — crash recovery helpers (Phase 3)", () => {
payload: null,
timestamp: 1000,
},
{ runId: "run-2", workflow: "my-wf", status: "started", timestamp: 1000 },
{ runId: "run-2", workflow: "my-wf", status: "started", timestamp: 1000, exitCode: null },
);
expect(store.getTriggerPayload("run-2")).toBeNull();
@@ -148,7 +148,7 @@ describe("LogStore — crash recovery helpers (Phase 3)", () => {
payload: JSON.stringify({ triggerPayload: {} }),
timestamp: 1000,
},
{ runId: "run-6", workflow: "my-wf", status: "started", timestamp: 1000 },
{ runId: "run-6", workflow: "my-wf", status: "started", timestamp: 1000, exitCode: null },
);
store.append({
source: "workflow",
@@ -31,6 +31,7 @@ describe("LogStore — workflow_runs", () => {
workflow: "cleanup",
status: "started",
timestamp: 1000,
exitCode: null,
};
const entry = store.upsertWorkflowRun(
@@ -53,12 +54,18 @@ describe("LogStore — workflow_runs", () => {
it("updates existing workflow_runs row on upsert (status transition)", () => {
store.upsertWorkflowRun(
{ source: "workflow", type: "started", refId: "run-2", payload: null, timestamp: 1000 },
{ runId: "run-2", workflow: "cleanup", status: "started", timestamp: 1000 },
{ runId: "run-2", workflow: "cleanup", status: "started", timestamp: 1000, exitCode: null },
);
store.upsertWorkflowRun(
{ source: "workflow", type: "completed", refId: "run-2", payload: null, timestamp: 2000 },
{ runId: "run-2", workflow: "cleanup", status: "completed", timestamp: 2000 },
{
runId: "run-2",
workflow: "cleanup",
status: "completed",
timestamp: 2000,
exitCode: null,
},
);
const stored = store.getWorkflowRun("run-2");
@@ -79,7 +86,7 @@ describe("LogStore — workflow_runs", () => {
] as const) {
store.upsertWorkflowRun(
{ source: "workflow", type, refId: "run-3", payload: null, timestamp },
{ runId: "run-3", workflow: "cleanup", status, timestamp },
{ runId: "run-3", workflow: "cleanup", status, timestamp, exitCode: null },
);
}
@@ -98,11 +105,23 @@ describe("LogStore — workflow_runs", () => {
it("returns the latest state after multiple upserts", () => {
store.upsertWorkflowRun(
{ source: "workflow", type: "queued", refId: "run-4", payload: null, timestamp: 100 },
{ runId: "run-4", workflow: "code-review", status: "queued", timestamp: 100 },
{
runId: "run-4",
workflow: "code-review",
status: "queued",
timestamp: 100,
exitCode: null,
},
);
store.upsertWorkflowRun(
{ source: "workflow", type: "started", refId: "run-4", payload: null, timestamp: 200 },
{ runId: "run-4", workflow: "code-review", status: "started", timestamp: 200 },
{
runId: "run-4",
workflow: "code-review",
status: "started",
timestamp: 200,
exitCode: null,
},
);
const run = store.getWorkflowRun("run-4");
@@ -115,19 +134,19 @@ describe("LogStore — workflow_runs", () => {
beforeEach(() => {
store.upsertWorkflowRun(
{ source: "workflow", type: "queued", refId: "r1", payload: null, timestamp: 100 },
{ runId: "r1", workflow: "cleanup", status: "queued", timestamp: 100 },
{ runId: "r1", workflow: "cleanup", status: "queued", timestamp: 100, exitCode: null },
);
store.upsertWorkflowRun(
{ source: "workflow", type: "started", refId: "r2", payload: null, timestamp: 200 },
{ runId: "r2", workflow: "cleanup", status: "started", timestamp: 200 },
{ runId: "r2", workflow: "cleanup", status: "started", timestamp: 200, exitCode: null },
);
store.upsertWorkflowRun(
{ source: "workflow", type: "completed", refId: "r3", payload: null, timestamp: 300 },
{ runId: "r3", workflow: "cleanup", status: "completed", timestamp: 300 },
{ runId: "r3", workflow: "cleanup", status: "completed", timestamp: 300, exitCode: null },
);
store.upsertWorkflowRun(
{ source: "workflow", type: "failed", refId: "r4", payload: null, timestamp: 400 },
{ runId: "r4", workflow: "deploy", status: "queued", timestamp: 400 },
{ runId: "r4", workflow: "deploy", status: "queued", timestamp: 400, exitCode: null },
);
});
@@ -171,13 +190,13 @@ describe("LogStore — workflow_runs", () => {
});
describe("all statuses are storable", () => {
it.each(["queued", "started", "completed", "failed", "crashed", "dropped"] as const)(
it.each(["queued", "started", "completed", "failed", "crashed", "dropped", "killed"] as const)(
"stores status=%s",
(status) => {
const runId = `run-${status}`;
store.upsertWorkflowRun(
{ source: "workflow", type: status, refId: runId, payload: null, timestamp: 1 },
{ runId, workflow: "test", status, timestamp: 1 },
{ runId, workflow: "test", status, timestamp: 1, exitCode: null },
);
expect(store.getWorkflowRun(runId)?.status).toBe(status);
},
+47 -34
View File
@@ -58,7 +58,8 @@ export type WorkflowRunStatus =
| "failed"
| "crashed"
| "dropped"
| "interrupted";
| "interrupted"
| "killed";
const VALID_WORKFLOW_STATUSES = new Set<string>([
"queued",
@@ -68,6 +69,7 @@ const VALID_WORKFLOW_STATUSES = new Set<string>([
"crashed",
"dropped",
"interrupted",
"killed",
]);
function isWorkflowRunStatus(value: string): value is WorkflowRunStatus {
@@ -87,6 +89,7 @@ export type WorkflowRun = {
workflow: string;
status: WorkflowRunStatus;
timestamp: number;
exitCode: number | null;
};
/** One role-produced workflow-message row with 1-based round index (ROW_NUMBER over role messages only). */
@@ -192,10 +195,11 @@ CREATE TABLE IF NOT EXISTS meta (
);
CREATE TABLE IF NOT EXISTS workflow_runs (
run_id TEXT PRIMARY KEY,
workflow TEXT NOT NULL,
status TEXT NOT NULL,
timestamp INTEGER NOT NULL
run_id TEXT PRIMARY KEY,
workflow TEXT NOT NULL,
status TEXT NOT NULL,
timestamp INTEGER NOT NULL,
exit_code INTEGER
);
CREATE INDEX IF NOT EXISTS idx_workflow_runs_status ON workflow_runs(status);
@@ -332,6 +336,13 @@ export function createLogStore(dbPath: string): LogStore {
sqlite.exec("PRAGMA journal_mode=WAL");
sqlite.exec(SCHEMA_SQL);
// Migration: add exit_code column for existing databases.
// The CREATE TABLE statement above already declares exit_code, so on a fresh
// database this ALTER fails with SQLite's "duplicate column name" error, which
// is the only failure that is safe to ignore. Anything else (locked or
// read-only database, missing table, ...) is rethrown so it surfaces here
// instead of later as a confusing prepare() failure on the exit_code column.
try {
  sqlite.exec("ALTER TABLE workflow_runs ADD COLUMN exit_code INTEGER");
} catch (e: unknown) {
  const message = e instanceof Error ? e.message : String(e);
  if (!message.includes("duplicate column name")) {
    throw e;
  }
}
const insertStmt = sqlite.prepare(
"INSERT INTO logs (source, type, ref_id, payload, timestamp) VALUES (@source, @type, @refId, @payload, @timestamp)",
);
@@ -342,11 +353,11 @@ export function createLogStore(dbPath: string): LogStore {
);
const upsertWorkflowRunStmt = sqlite.prepare(
"INSERT OR REPLACE INTO workflow_runs (run_id, workflow, status, timestamp) VALUES (@runId, @workflow, @status, @timestamp)",
"INSERT OR REPLACE INTO workflow_runs (run_id, workflow, status, timestamp, exit_code) VALUES (@runId, @workflow, @status, @timestamp, @exitCode)",
);
const getWorkflowRunStmt = sqlite.prepare(
"SELECT run_id, workflow, status, timestamp FROM workflow_runs WHERE run_id = ?",
"SELECT run_id, workflow, status, timestamp, exit_code FROM workflow_runs WHERE run_id = ?",
);
const getTriggerPayloadStmt = sqlite.prepare(
@@ -386,19 +397,19 @@ export function createLogStore(dbPath: string): LogStore {
);
const getActiveWorkflowRunsStmt = sqlite.prepare(
"SELECT run_id, workflow, status, timestamp FROM workflow_runs WHERE status IN ('queued', 'started') ORDER BY timestamp ASC",
"SELECT run_id, workflow, status, timestamp, exit_code FROM workflow_runs WHERE status IN ('queued', 'started') ORDER BY timestamp ASC",
);
const getActiveWorkflowRunsByNameStmt = sqlite.prepare(
"SELECT run_id, workflow, status, timestamp FROM workflow_runs WHERE status IN ('queued', 'started') AND workflow = ? ORDER BY timestamp ASC",
"SELECT run_id, workflow, status, timestamp, exit_code FROM workflow_runs WHERE status IN ('queued', 'started') AND workflow = ? ORDER BY timestamp ASC",
);
const getAllWorkflowRunsStmt = sqlite.prepare(
"SELECT run_id, workflow, status, timestamp FROM workflow_runs ORDER BY timestamp DESC",
"SELECT run_id, workflow, status, timestamp, exit_code FROM workflow_runs ORDER BY timestamp DESC",
);
const getAllWorkflowRunsByNameStmt = sqlite.prepare(
"SELECT run_id, workflow, status, timestamp FROM workflow_runs WHERE workflow = ? ORDER BY timestamp DESC",
"SELECT run_id, workflow, status, timestamp, exit_code FROM workflow_runs WHERE workflow = ? ORDER BY timestamp DESC",
);
const minLogTsStmt = sqlite.prepare("SELECT MIN(timestamp) AS m FROM logs");
@@ -423,6 +434,7 @@ export function createLogStore(dbPath: string): LogStore {
workflow: run.workflow,
status: run.status,
timestamp: run.timestamp,
exitCode: run.exitCode,
});
return { ...entry, id: Number(info.lastInsertRowid) };
});
@@ -504,31 +516,37 @@ export function createLogStore(dbPath: string): LogStore {
return upsertWorkflowRunTx(entry, run);
}
/** Raw `workflow_runs` row as selected by the workflow-run queries (snake_case column names). */
type SqlWorkflowRunRow = {
  run_id: string;
  workflow: string;
  status: string;
  timestamp: number;
  exit_code: number | null;
};

/**
 * Convert a raw `workflow_runs` row into the public `WorkflowRun` shape.
 * The status string is passed through `validateWorkflowRunStatus`, and a missing
 * exit code is normalised to null.
 */
function mapWorkflowRunRow(r: SqlWorkflowRunRow): WorkflowRun {
  return {
    runId: r.run_id,
    workflow: r.workflow,
    status: validateWorkflowRunStatus(r.status),
    timestamp: r.timestamp,
    exitCode: r.exit_code ?? null,
  };
}

/**
 * Look up a single workflow run by its id.
 *
 * @returns The mapped run, or null when no row exists for `runId`.
 */
function getWorkflowRun(runId: string): WorkflowRun | null {
  const row = getWorkflowRunStmt.get(runId) as SqlWorkflowRunRow | undefined;
  if (row === undefined) return null;
  return mapWorkflowRunRow(row);
}
/**
 * List workflow runs returned by the "active runs" queries, optionally restricted
 * to one workflow.
 *
 * @param workflowName - When provided, only runs of this workflow are returned.
 * @returns The matching rows mapped through `mapWorkflowRunRow` (so `exitCode` is included).
 */
function getActiveWorkflowRuns(workflowName?: string): WorkflowRun[] {
  const rows = (
    workflowName !== undefined
      ? getActiveWorkflowRunsByNameStmt.all(workflowName)
      : getActiveWorkflowRunsStmt.all()
  ) as SqlWorkflowRunRow[];
  return rows.map(mapWorkflowRunRow);
}
function getAllWorkflowRuns(workflowName: string | null): WorkflowRun[] {
@@ -536,13 +554,8 @@ export function createLogStore(dbPath: string): LogStore {
workflowName !== null
? getAllWorkflowRunsByNameStmt.all(workflowName)
: getAllWorkflowRunsStmt.all()
) as Array<{ run_id: string; workflow: string; status: string; timestamp: number }>;
return rows.map((r) => ({
runId: r.run_id,
workflow: r.workflow,
status: validateWorkflowRunStatus(r.status),
timestamp: r.timestamp,
}));
) as SqlWorkflowRunRow[];
return rows.map(mapWorkflowRunRow);
}
function getTriggerPayload(runId: string): unknown {