Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 01d7435c4a | |||
| 889bbbb474 | |||
| 418ae6a073 | |||
| c6f56155c8 | |||
| 3ce9e3a846 | |||
| 0fff8ef954 |
@@ -45,7 +45,7 @@ function upsertRun(
|
||||
): void {
|
||||
store.upsertWorkflowRun(
|
||||
{ source: "workflow", type: status, refId: runId, payload: null, timestamp: timestampMs },
|
||||
{ runId, workflow, status, timestamp: timestampMs },
|
||||
{ runId, workflow, status, timestamp: timestampMs, exitCode: null },
|
||||
);
|
||||
}
|
||||
|
||||
@@ -83,6 +83,7 @@ describe("statusIcon", () => {
|
||||
["crashed", "💥"],
|
||||
["dropped", "🗑"],
|
||||
["interrupted", "⚠️"],
|
||||
["killed", "🛑"],
|
||||
] as const)("maps status=%s to icon=%s", (status, icon) => {
|
||||
expect(statusIcon(status)).toBe(icon);
|
||||
});
|
||||
|
||||
@@ -7,7 +7,7 @@ import { defineCommand } from "citty";
|
||||
import { stringify } from "yaml";
|
||||
|
||||
import type { LogStore, ThreadRoundRow, WorkflowRun } from "@uncaged/nerve-store";
|
||||
import { triggerWorkflowViaDaemon } from "../daemon-client.js";
|
||||
import { killWorkflowViaDaemon, triggerWorkflowViaDaemon } from "../daemon-client.js";
|
||||
import { loadDaemonModule } from "../workspace-daemon.js";
|
||||
import { getNerveRoot, getSocketPath, isRunning } from "../workspace.js";
|
||||
|
||||
@@ -59,6 +59,8 @@ export function statusIcon(status: WorkflowRun["status"]): string {
|
||||
return "🗑";
|
||||
case "interrupted":
|
||||
return "⚠️";
|
||||
case "killed":
|
||||
return "🛑";
|
||||
default: {
|
||||
const _exhaustive: never = status;
|
||||
return `?(${_exhaustive})`;
|
||||
@@ -79,7 +81,8 @@ export function getAllWorkflowRuns(store: LogStore, filterWorkflow: string | nul
|
||||
*/
|
||||
export function formatRunLine(run: WorkflowRun): string {
|
||||
const icon = statusIcon(run.status);
|
||||
return ` ${icon} ${run.runId} workflow=${run.workflow} status=${run.status} timestamp=${formatTs(run.timestamp)}\n`;
|
||||
const exitCodeStr = run.exitCode !== null ? ` exit_code=${run.exitCode}` : "";
|
||||
return ` ${icon} ${run.runId} workflow=${run.workflow} status=${run.status}${exitCodeStr} timestamp=${formatTs(run.timestamp)}\n`;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -563,6 +566,46 @@ const workflowTriggerCommand = defineCommand({
|
||||
},
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// nerve workflow kill <runId>
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const workflowKillCommand = defineCommand({
|
||||
meta: {
|
||||
name: "kill",
|
||||
description: "Kill a running or queued workflow thread by runId",
|
||||
},
|
||||
args: {
|
||||
runId: {
|
||||
type: "positional",
|
||||
description: "The run ID to kill",
|
||||
},
|
||||
},
|
||||
async run({ args }) {
|
||||
if (!isRunning()) {
|
||||
process.stderr.write("❌ Nerve daemon is not running. Start it with `nerve start`.\n");
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const socketPath = getSocketPath();
|
||||
let response: DaemonIpcTriggerResponse;
|
||||
try {
|
||||
response = await killWorkflowViaDaemon(socketPath, args.runId);
|
||||
} catch (e) {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`❌ Failed to contact daemon: ${msg}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
if (!response.ok) {
|
||||
process.stderr.write(`❌ Kill failed: ${response.error}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
process.stdout.write(`✅ Kill signal sent for run "${args.runId}".\n`);
|
||||
},
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// nerve workflow (parent command)
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -577,5 +620,6 @@ export const workflowCommand = defineCommand({
|
||||
inspect: workflowInspectCommand,
|
||||
thread: workflowThreadCommand,
|
||||
trigger: workflowTriggerCommand,
|
||||
kill: workflowKillCommand,
|
||||
},
|
||||
});
|
||||
|
||||
@@ -167,3 +167,15 @@ export function listSensesViaDaemon(socketPath: string): Promise<DaemonIpcListSe
|
||||
const message: DaemonIpcRequest = { type: "list-senses" };
|
||||
return sendAndReceive(socketPath, message, parseListSensesResponse);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a kill-workflow message to the running daemon via its Unix socket.
|
||||
* Resolves with the daemon's response or rejects on connection/timeout errors.
|
||||
*/
|
||||
export function killWorkflowViaDaemon(
|
||||
socketPath: string,
|
||||
runId: string,
|
||||
): Promise<DaemonIpcTriggerResponse> {
|
||||
const message: DaemonIpcRequest = { type: "kill-workflow", runId };
|
||||
return sendAndReceive(socketPath, message, parseDaemonResponse);
|
||||
}
|
||||
|
||||
@@ -8,7 +8,7 @@ Shared types and configuration parser for the [nerve](../../README.md) observati
|
||||
- **Config parser** — `parseNerveConfig(yaml)` validates and parses `nerve.yaml` into `NerveConfig` (rejects reflex entries that declare a `workflow` key; reflexes only schedule senses)
|
||||
- **Sense → workflow routing** — `parseSenseWorkflowDirective`, `routeSenseComputeOutput`, and types `ParsedSenseWorkflowDirective`, `SenseComputeRoute`
|
||||
- **Daemon IPC protocol** — request/response types (`DaemonIpcRequest`, `DaemonIpcResponse`, …) and `parseDaemonIpcRequest` for newline-delimited JSON on the CLI ↔ daemon socket
|
||||
- **Workflow automaton types** — `START` / `END` sentinel constants, `WorkflowMessage`, `StartStep`, `RoleStep`, `Moderator`, `WorkflowDefinition`, `Role`, `SenseResult`, plus `DEFAULT_ENGINE_MAX_ROUNDS`
|
||||
- **Workflow automaton types** — `START` / `END` sentinel constants, `WorkflowMessage`, `StartStep`, `RoleStep`, `ModeratorContext` (`start` + `steps`; empty `steps` on first moderator call), `Moderator` (single `context` argument), `WorkflowDefinition`, `Role`, `SenseResult`, plus `DEFAULT_ENGINE_MAX_ROUNDS`
|
||||
- **Result type** — `Result<T>` with `ok()` / `err()` helpers for explicit error handling (no thrown exceptions for parse paths)
|
||||
|
||||
## Usage
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
|
||||
import { parseNerveConfig } from "../config.js";
|
||||
import { parseNerveConfig } from "../parse-nerve-config.js";
|
||||
|
||||
const VALID_CONFIG = `
|
||||
senses:
|
||||
|
||||
+30
-295
@@ -1,302 +1,37 @@
|
||||
import { parse } from "yaml";
|
||||
|
||||
import { isPlainRecord } from "./is-plain-record.js";
|
||||
import type { Result } from "./result.js";
|
||||
import { err, ok } from "./result.js";
|
||||
import type { NerveConfig, ReflexConfig, SenseConfig, WorkflowConfig } from "./types.js";
|
||||
import { DEFAULT_ENGINE_MAX_ROUNDS } from "./types.js";
|
||||
|
||||
const DURATION_RE = /^(\d+)([smh])$/;
|
||||
|
||||
const DURATION_MULTIPLIERS: Record<string, number> = {
|
||||
s: 1_000,
|
||||
m: 60_000,
|
||||
h: 3_600_000,
|
||||
export type SenseConfig = {
|
||||
group: string;
|
||||
throttle: number | null;
|
||||
timeout: number | null;
|
||||
gracePeriod: number | null;
|
||||
};
|
||||
|
||||
function parseDurationToMs(value: string): number | null {
|
||||
const match = DURATION_RE.exec(value);
|
||||
if (!match) return null;
|
||||
return Number(match[1]) * DURATION_MULTIPLIERS[match[2]];
|
||||
}
|
||||
export type SenseReflexConfig = {
|
||||
kind: "sense";
|
||||
sense: string;
|
||||
interval: number | null;
|
||||
on: string[];
|
||||
};
|
||||
|
||||
function isValidGroupName(value: string): boolean {
|
||||
return /^[a-zA-Z0-9_-]+$/.test(value);
|
||||
}
|
||||
/** Reflexes only schedule Senses; workflow launches come from Sense return values. */
|
||||
export type ReflexConfig = SenseReflexConfig;
|
||||
|
||||
function parseDurationField(field: unknown, label: string): Result<number | null> {
|
||||
if (field === undefined || field === null) return ok(null);
|
||||
if (typeof field !== "string") {
|
||||
return err(
|
||||
new Error(`${label}: invalid duration "${field}" (expected e.g. "5s", "10m", "1h")`),
|
||||
);
|
||||
}
|
||||
const ms = parseDurationToMs(field);
|
||||
if (ms === null) {
|
||||
return err(
|
||||
new Error(`${label}: invalid duration "${field}" (expected e.g. "5s", "10m", "1h")`),
|
||||
);
|
||||
}
|
||||
return ok(ms);
|
||||
}
|
||||
export type DropOverflowConfig = {
|
||||
concurrency: number;
|
||||
overflow: "drop";
|
||||
};
|
||||
|
||||
function validateSenseConfig(name: string, raw: unknown): Result<SenseConfig> {
|
||||
if (!isPlainRecord(raw)) {
|
||||
return err(new Error(`senses.${name}: must be an object`));
|
||||
}
|
||||
export type QueueOverflowConfig = {
|
||||
concurrency: number;
|
||||
overflow: "queue";
|
||||
maxQueue: number;
|
||||
};
|
||||
|
||||
const obj = raw;
|
||||
export type WorkflowConfig = DropOverflowConfig | QueueOverflowConfig;
|
||||
|
||||
if (typeof obj.group !== "string" || obj.group.trim() === "") {
|
||||
return err(new Error(`senses.${name}.group: required string`));
|
||||
}
|
||||
|
||||
if (!isValidGroupName(obj.group)) {
|
||||
return err(
|
||||
new Error(
|
||||
`senses.${name}.group: invalid name "${obj.group}" (only alphanumeric, underscore, hyphen allowed)`,
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
const throttleResult = parseDurationField(obj.throttle, `senses.${name}.throttle`);
|
||||
if (!throttleResult.ok) return throttleResult;
|
||||
|
||||
const timeoutResult = parseDurationField(obj.timeout, `senses.${name}.timeout`);
|
||||
if (!timeoutResult.ok) return timeoutResult;
|
||||
|
||||
const graceResult = parseDurationField(obj.grace_period, `senses.${name}.grace_period`);
|
||||
if (!graceResult.ok) return graceResult;
|
||||
|
||||
return ok({
|
||||
group: obj.group,
|
||||
throttle: throttleResult.value,
|
||||
timeout: timeoutResult.value,
|
||||
gracePeriod: graceResult.value,
|
||||
});
|
||||
}
|
||||
|
||||
function parseOnField(index: number, obj: Record<string, unknown>): Result<string[]> {
|
||||
if (obj.on === undefined || obj.on === null) return ok([]);
|
||||
if (!Array.isArray(obj.on) || !obj.on.every((item): item is string => typeof item === "string")) {
|
||||
return err(new Error(`reflexes[${index}].on: must be an array of strings`));
|
||||
}
|
||||
return ok(obj.on);
|
||||
}
|
||||
|
||||
function parseSenseReflex(
|
||||
index: number,
|
||||
obj: Record<string, unknown>,
|
||||
senseNames: Set<string>,
|
||||
on: string[],
|
||||
): Result<ReflexConfig> {
|
||||
if (typeof obj.sense !== "string") {
|
||||
return err(new Error(`reflexes[${index}].sense: must be a string`));
|
||||
}
|
||||
if (!senseNames.has(obj.sense)) {
|
||||
return err(new Error(`reflexes[${index}].sense: "${obj.sense}" not found in senses`));
|
||||
}
|
||||
|
||||
const intervalResult = parseDurationField(obj.interval, `reflexes[${index}].interval`);
|
||||
if (!intervalResult.ok) return intervalResult;
|
||||
|
||||
if (intervalResult.value === null && on.length === 0) {
|
||||
return err(
|
||||
new Error(`reflexes[${index}]: sense reflex must have at least one of "interval" or "on"`),
|
||||
);
|
||||
}
|
||||
|
||||
return ok({
|
||||
kind: "sense" as const,
|
||||
sense: obj.sense,
|
||||
interval: intervalResult.value,
|
||||
on,
|
||||
});
|
||||
}
|
||||
|
||||
function validateReflexConfig(
|
||||
index: number,
|
||||
raw: unknown,
|
||||
senseNames: Set<string>,
|
||||
): Result<ReflexConfig> {
|
||||
if (!isPlainRecord(raw)) {
|
||||
return err(new Error(`reflexes[${index}]: must be an object`));
|
||||
}
|
||||
|
||||
const obj = raw;
|
||||
const hasSense = obj.sense !== undefined;
|
||||
const hasWorkflowKey = Object.hasOwn(obj, "workflow");
|
||||
|
||||
if (hasWorkflowKey) {
|
||||
return err(
|
||||
new Error(
|
||||
`reflexes[${index}]: YAML "workflow" entries are not supported — start workflows from a Sense compute return value using a "workflow" string field (format: name|maxRounds|prompt)`,
|
||||
),
|
||||
);
|
||||
}
|
||||
if (!hasSense) {
|
||||
return err(new Error(`reflexes[${index}]: must include "sense"`));
|
||||
}
|
||||
|
||||
const onResult = parseOnField(index, obj);
|
||||
if (!onResult.ok) return onResult;
|
||||
|
||||
return parseSenseReflex(index, obj, senseNames, onResult.value);
|
||||
}
|
||||
|
||||
function parseEngineMaxRounds(obj: Record<string, unknown>): Result<number> {
|
||||
if (obj.max_rounds === undefined || obj.max_rounds === null) {
|
||||
return ok(DEFAULT_ENGINE_MAX_ROUNDS);
|
||||
}
|
||||
if (
|
||||
typeof obj.max_rounds !== "number" ||
|
||||
!Number.isInteger(obj.max_rounds) ||
|
||||
obj.max_rounds < 1
|
||||
) {
|
||||
return err(new Error("max_rounds: must be a positive integer"));
|
||||
}
|
||||
return ok(obj.max_rounds);
|
||||
}
|
||||
|
||||
function validateWorkflowConfig(name: string, raw: unknown): Result<WorkflowConfig> {
|
||||
if (!isPlainRecord(raw)) {
|
||||
return err(new Error(`workflows.${name}: must be an object`));
|
||||
}
|
||||
|
||||
const obj = raw;
|
||||
|
||||
if (
|
||||
typeof obj.concurrency !== "number" ||
|
||||
!Number.isInteger(obj.concurrency) ||
|
||||
obj.concurrency < 1
|
||||
) {
|
||||
return err(new Error(`workflows.${name}.concurrency: must be a positive integer`));
|
||||
}
|
||||
|
||||
if (obj.overflow !== "drop" && obj.overflow !== "queue") {
|
||||
return err(new Error(`workflows.${name}.overflow: must be "drop" or "queue"`));
|
||||
}
|
||||
|
||||
if (obj.overflow === "drop") {
|
||||
if (obj.max_queue !== undefined && obj.max_queue !== null) {
|
||||
return err(new Error(`workflows.${name}: max_queue is not allowed with overflow "drop"`));
|
||||
}
|
||||
return ok({
|
||||
concurrency: obj.concurrency,
|
||||
overflow: "drop" as const,
|
||||
});
|
||||
}
|
||||
|
||||
// overflow: "queue"
|
||||
let maxQueue = 100; // default
|
||||
if (obj.max_queue !== undefined && obj.max_queue !== null) {
|
||||
if (
|
||||
typeof obj.max_queue !== "number" ||
|
||||
!Number.isInteger(obj.max_queue) ||
|
||||
obj.max_queue < 1
|
||||
) {
|
||||
return err(new Error(`workflows.${name}.max_queue: must be a positive integer`));
|
||||
}
|
||||
maxQueue = obj.max_queue;
|
||||
}
|
||||
|
||||
return ok({
|
||||
concurrency: obj.concurrency,
|
||||
overflow: "queue" as const,
|
||||
maxQueue,
|
||||
});
|
||||
}
|
||||
|
||||
function parseSenses(
|
||||
obj: Record<string, unknown>,
|
||||
): Result<{ senses: Record<string, SenseConfig>; senseNames: Set<string> }> {
|
||||
if (!isPlainRecord(obj.senses)) {
|
||||
return err(new Error("senses: required object"));
|
||||
}
|
||||
|
||||
const sensesRaw = obj.senses;
|
||||
const senses: Record<string, SenseConfig> = {};
|
||||
const senseNames = new Set(Object.keys(sensesRaw));
|
||||
|
||||
for (const [name, senseRaw] of Object.entries(sensesRaw)) {
|
||||
const result = validateSenseConfig(name, senseRaw);
|
||||
if (!result.ok) return result;
|
||||
senses[name] = result.value;
|
||||
}
|
||||
|
||||
return ok({ senses, senseNames });
|
||||
}
|
||||
|
||||
function parseReflexes(
|
||||
obj: Record<string, unknown>,
|
||||
senseNames: Set<string>,
|
||||
): Result<ReflexConfig[]> {
|
||||
if (!Array.isArray(obj.reflexes)) {
|
||||
return err(new Error("reflexes: required array"));
|
||||
}
|
||||
|
||||
const reflexes: ReflexConfig[] = [];
|
||||
for (let i = 0; i < obj.reflexes.length; i++) {
|
||||
const result = validateReflexConfig(i, obj.reflexes[i], senseNames);
|
||||
if (!result.ok) return result;
|
||||
reflexes.push(result.value);
|
||||
}
|
||||
|
||||
return ok(reflexes);
|
||||
}
|
||||
|
||||
function parseWorkflows(obj: Record<string, unknown>): Result<Record<string, WorkflowConfig>> {
|
||||
if (obj.workflows === undefined || obj.workflows === null) return ok({});
|
||||
|
||||
if (!isPlainRecord(obj.workflows)) {
|
||||
return err(new Error("workflows: must be an object if provided"));
|
||||
}
|
||||
|
||||
const workflowsRaw = obj.workflows;
|
||||
const workflows: Record<string, WorkflowConfig> = {};
|
||||
|
||||
for (const [name, wfRaw] of Object.entries(workflowsRaw)) {
|
||||
const result = validateWorkflowConfig(name, wfRaw);
|
||||
if (!result.ok) return result;
|
||||
workflows[name] = result.value;
|
||||
}
|
||||
|
||||
return ok(workflows);
|
||||
}
|
||||
|
||||
export function parseNerveConfig(raw: string): Result<NerveConfig> {
|
||||
let parsed: unknown;
|
||||
|
||||
try {
|
||||
parsed = parse(raw);
|
||||
} catch (e) {
|
||||
const message = e instanceof Error ? e.message : String(e);
|
||||
return err(new Error(`YAML parse error: ${message}`));
|
||||
}
|
||||
|
||||
if (!isPlainRecord(parsed)) {
|
||||
return err(new Error("Config must be a YAML object"));
|
||||
}
|
||||
|
||||
const obj = parsed;
|
||||
|
||||
const sensesResult = parseSenses(obj);
|
||||
if (!sensesResult.ok) return sensesResult;
|
||||
const { senses, senseNames } = sensesResult.value;
|
||||
|
||||
const reflexesResult = parseReflexes(obj, senseNames);
|
||||
if (!reflexesResult.ok) return reflexesResult;
|
||||
|
||||
const workflowsResult = parseWorkflows(obj);
|
||||
if (!workflowsResult.ok) return workflowsResult;
|
||||
|
||||
const maxRoundsResult = parseEngineMaxRounds(obj);
|
||||
if (!maxRoundsResult.ok) return maxRoundsResult;
|
||||
|
||||
return ok({
|
||||
maxRounds: maxRoundsResult.value,
|
||||
senses,
|
||||
reflexes: reflexesResult.value,
|
||||
workflows: workflowsResult.value,
|
||||
});
|
||||
}
|
||||
export type NerveConfig = {
|
||||
/** Engine-wide default max moderator rounds (e.g. CLI workflow trigger when omitted). */
|
||||
maxRounds: number;
|
||||
senses: Record<string, SenseConfig>;
|
||||
reflexes: ReflexConfig[];
|
||||
workflows: Record<string, WorkflowConfig>;
|
||||
};
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
*/
|
||||
|
||||
import { isPlainRecord } from "./is-plain-record.js";
|
||||
import type { SenseInfo } from "./types.js";
|
||||
import type { SenseInfo } from "./sense.js";
|
||||
|
||||
/** Client → daemon: start a workflow run. */
|
||||
export type DaemonIpcTriggerWorkflowRequest = {
|
||||
@@ -27,11 +27,18 @@ export type DaemonIpcListSensesRequest = {
|
||||
type: "list-senses";
|
||||
};
|
||||
|
||||
/** Client → daemon: kill a running or queued workflow thread by runId. */
|
||||
export type DaemonIpcKillWorkflowRequest = {
|
||||
type: "kill-workflow";
|
||||
runId: string;
|
||||
};
|
||||
|
||||
/** Union of all JSON requests the daemon IPC server accepts. */
|
||||
export type DaemonIpcRequest =
|
||||
| DaemonIpcTriggerWorkflowRequest
|
||||
| DaemonIpcTriggerSenseRequest
|
||||
| DaemonIpcListSensesRequest;
|
||||
| DaemonIpcListSensesRequest
|
||||
| DaemonIpcKillWorkflowRequest;
|
||||
|
||||
/** Successful trigger / trigger-sense reply (no body). */
|
||||
export type DaemonIpcTriggerOkResponse = { ok: true };
|
||||
@@ -87,6 +94,10 @@ export function parseDaemonIpcRequest(line: string): DaemonIpcRequest | null {
|
||||
if (req.type === "list-senses") {
|
||||
return { type: "list-senses" };
|
||||
}
|
||||
if (req.type === "kill-workflow") {
|
||||
if (typeof req.runId !== "string" || req.runId.length === 0) return null;
|
||||
return { type: "kill-workflow", runId: req.runId };
|
||||
}
|
||||
return null;
|
||||
} catch {
|
||||
return null;
|
||||
|
||||
@@ -1,13 +1,14 @@
|
||||
export type {
|
||||
Signal,
|
||||
SenseConfig,
|
||||
SenseInfo,
|
||||
SenseReflexConfig,
|
||||
ReflexConfig,
|
||||
DropOverflowConfig,
|
||||
QueueOverflowConfig,
|
||||
WorkflowConfig,
|
||||
NerveConfig,
|
||||
} from "./config.js";
|
||||
export type { Signal, SenseInfo, SenseResult } from "./sense.js";
|
||||
export type {
|
||||
WorkflowMessage,
|
||||
RoleResult,
|
||||
Role,
|
||||
@@ -17,12 +18,11 @@ export type {
|
||||
ModeratorContext,
|
||||
Moderator,
|
||||
WorkflowDefinition,
|
||||
SenseResult,
|
||||
} from "./types.js";
|
||||
export { START, END, DEFAULT_ENGINE_MAX_ROUNDS } from "./types.js";
|
||||
} from "./workflow.js";
|
||||
export { START, END, DEFAULT_ENGINE_MAX_ROUNDS } from "./workflow.js";
|
||||
export type { Result } from "./result.js";
|
||||
export { ok, err } from "./result.js";
|
||||
export { parseNerveConfig } from "./config.js";
|
||||
export { parseNerveConfig } from "./parse-nerve-config.js";
|
||||
export { isPlainRecord } from "./is-plain-record.js";
|
||||
|
||||
export type {
|
||||
@@ -38,6 +38,7 @@ export type {
|
||||
DaemonIpcTriggerWorkflowRequest,
|
||||
DaemonIpcTriggerSenseRequest,
|
||||
DaemonIpcListSensesRequest,
|
||||
DaemonIpcKillWorkflowRequest,
|
||||
DaemonIpcRequest,
|
||||
DaemonIpcTriggerOkResponse,
|
||||
DaemonIpcErrorResponse,
|
||||
|
||||
@@ -0,0 +1,302 @@
|
||||
import { parse } from "yaml";
|
||||
|
||||
import type { NerveConfig, ReflexConfig, SenseConfig, WorkflowConfig } from "./config.js";
|
||||
import { isPlainRecord } from "./is-plain-record.js";
|
||||
import type { Result } from "./result.js";
|
||||
import { err, ok } from "./result.js";
|
||||
import { DEFAULT_ENGINE_MAX_ROUNDS } from "./workflow.js";
|
||||
|
||||
const DURATION_RE = /^(\d+)([smh])$/;
|
||||
|
||||
const DURATION_MULTIPLIERS: Record<string, number> = {
|
||||
s: 1_000,
|
||||
m: 60_000,
|
||||
h: 3_600_000,
|
||||
};
|
||||
|
||||
function parseDurationToMs(value: string): number | null {
|
||||
const match = DURATION_RE.exec(value);
|
||||
if (!match) return null;
|
||||
return Number(match[1]) * DURATION_MULTIPLIERS[match[2]];
|
||||
}
|
||||
|
||||
function isValidGroupName(value: string): boolean {
|
||||
return /^[a-zA-Z0-9_-]+$/.test(value);
|
||||
}
|
||||
|
||||
function parseDurationField(field: unknown, label: string): Result<number | null> {
|
||||
if (field === undefined || field === null) return ok(null);
|
||||
if (typeof field !== "string") {
|
||||
return err(
|
||||
new Error(`${label}: invalid duration "${field}" (expected e.g. "5s", "10m", "1h")`),
|
||||
);
|
||||
}
|
||||
const ms = parseDurationToMs(field);
|
||||
if (ms === null) {
|
||||
return err(
|
||||
new Error(`${label}: invalid duration "${field}" (expected e.g. "5s", "10m", "1h")`),
|
||||
);
|
||||
}
|
||||
return ok(ms);
|
||||
}
|
||||
|
||||
function validateSenseConfig(name: string, raw: unknown): Result<SenseConfig> {
|
||||
if (!isPlainRecord(raw)) {
|
||||
return err(new Error(`senses.${name}: must be an object`));
|
||||
}
|
||||
|
||||
const obj = raw;
|
||||
|
||||
if (typeof obj.group !== "string" || obj.group.trim() === "") {
|
||||
return err(new Error(`senses.${name}.group: required string`));
|
||||
}
|
||||
|
||||
if (!isValidGroupName(obj.group)) {
|
||||
return err(
|
||||
new Error(
|
||||
`senses.${name}.group: invalid name "${obj.group}" (only alphanumeric, underscore, hyphen allowed)`,
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
const throttleResult = parseDurationField(obj.throttle, `senses.${name}.throttle`);
|
||||
if (!throttleResult.ok) return throttleResult;
|
||||
|
||||
const timeoutResult = parseDurationField(obj.timeout, `senses.${name}.timeout`);
|
||||
if (!timeoutResult.ok) return timeoutResult;
|
||||
|
||||
const graceResult = parseDurationField(obj.grace_period, `senses.${name}.grace_period`);
|
||||
if (!graceResult.ok) return graceResult;
|
||||
|
||||
return ok({
|
||||
group: obj.group,
|
||||
throttle: throttleResult.value,
|
||||
timeout: timeoutResult.value,
|
||||
gracePeriod: graceResult.value,
|
||||
});
|
||||
}
|
||||
|
||||
function parseOnField(index: number, obj: Record<string, unknown>): Result<string[]> {
|
||||
if (obj.on === undefined || obj.on === null) return ok([]);
|
||||
if (!Array.isArray(obj.on) || !obj.on.every((item): item is string => typeof item === "string")) {
|
||||
return err(new Error(`reflexes[${index}].on: must be an array of strings`));
|
||||
}
|
||||
return ok(obj.on);
|
||||
}
|
||||
|
||||
function parseSenseReflex(
|
||||
index: number,
|
||||
obj: Record<string, unknown>,
|
||||
senseNames: Set<string>,
|
||||
on: string[],
|
||||
): Result<ReflexConfig> {
|
||||
if (typeof obj.sense !== "string") {
|
||||
return err(new Error(`reflexes[${index}].sense: must be a string`));
|
||||
}
|
||||
if (!senseNames.has(obj.sense)) {
|
||||
return err(new Error(`reflexes[${index}].sense: "${obj.sense}" not found in senses`));
|
||||
}
|
||||
|
||||
const intervalResult = parseDurationField(obj.interval, `reflexes[${index}].interval`);
|
||||
if (!intervalResult.ok) return intervalResult;
|
||||
|
||||
if (intervalResult.value === null && on.length === 0) {
|
||||
return err(
|
||||
new Error(`reflexes[${index}]: sense reflex must have at least one of "interval" or "on"`),
|
||||
);
|
||||
}
|
||||
|
||||
return ok({
|
||||
kind: "sense" as const,
|
||||
sense: obj.sense,
|
||||
interval: intervalResult.value,
|
||||
on,
|
||||
});
|
||||
}
|
||||
|
||||
function validateReflexConfig(
|
||||
index: number,
|
||||
raw: unknown,
|
||||
senseNames: Set<string>,
|
||||
): Result<ReflexConfig> {
|
||||
if (!isPlainRecord(raw)) {
|
||||
return err(new Error(`reflexes[${index}]: must be an object`));
|
||||
}
|
||||
|
||||
const obj = raw;
|
||||
const hasSense = obj.sense !== undefined;
|
||||
const hasWorkflowKey = Object.hasOwn(obj, "workflow");
|
||||
|
||||
if (hasWorkflowKey) {
|
||||
return err(
|
||||
new Error(
|
||||
`reflexes[${index}]: YAML "workflow" entries are not supported — start workflows from a Sense compute return value using a "workflow" string field (format: name|maxRounds|prompt)`,
|
||||
),
|
||||
);
|
||||
}
|
||||
if (!hasSense) {
|
||||
return err(new Error(`reflexes[${index}]: must include "sense"`));
|
||||
}
|
||||
|
||||
const onResult = parseOnField(index, obj);
|
||||
if (!onResult.ok) return onResult;
|
||||
|
||||
return parseSenseReflex(index, obj, senseNames, onResult.value);
|
||||
}
|
||||
|
||||
function parseEngineMaxRounds(obj: Record<string, unknown>): Result<number> {
|
||||
if (obj.max_rounds === undefined || obj.max_rounds === null) {
|
||||
return ok(DEFAULT_ENGINE_MAX_ROUNDS);
|
||||
}
|
||||
if (
|
||||
typeof obj.max_rounds !== "number" ||
|
||||
!Number.isInteger(obj.max_rounds) ||
|
||||
obj.max_rounds < 1
|
||||
) {
|
||||
return err(new Error("max_rounds: must be a positive integer"));
|
||||
}
|
||||
return ok(obj.max_rounds);
|
||||
}
|
||||
|
||||
function validateWorkflowConfig(name: string, raw: unknown): Result<WorkflowConfig> {
|
||||
if (!isPlainRecord(raw)) {
|
||||
return err(new Error(`workflows.${name}: must be an object`));
|
||||
}
|
||||
|
||||
const obj = raw;
|
||||
|
||||
if (
|
||||
typeof obj.concurrency !== "number" ||
|
||||
!Number.isInteger(obj.concurrency) ||
|
||||
obj.concurrency < 1
|
||||
) {
|
||||
return err(new Error(`workflows.${name}.concurrency: must be a positive integer`));
|
||||
}
|
||||
|
||||
if (obj.overflow !== "drop" && obj.overflow !== "queue") {
|
||||
return err(new Error(`workflows.${name}.overflow: must be "drop" or "queue"`));
|
||||
}
|
||||
|
||||
if (obj.overflow === "drop") {
|
||||
if (obj.max_queue !== undefined && obj.max_queue !== null) {
|
||||
return err(new Error(`workflows.${name}: max_queue is not allowed with overflow "drop"`));
|
||||
}
|
||||
return ok({
|
||||
concurrency: obj.concurrency,
|
||||
overflow: "drop" as const,
|
||||
});
|
||||
}
|
||||
|
||||
// overflow: "queue"
|
||||
let maxQueue = 100; // default
|
||||
if (obj.max_queue !== undefined && obj.max_queue !== null) {
|
||||
if (
|
||||
typeof obj.max_queue !== "number" ||
|
||||
!Number.isInteger(obj.max_queue) ||
|
||||
obj.max_queue < 1
|
||||
) {
|
||||
return err(new Error(`workflows.${name}.max_queue: must be a positive integer`));
|
||||
}
|
||||
maxQueue = obj.max_queue;
|
||||
}
|
||||
|
||||
return ok({
|
||||
concurrency: obj.concurrency,
|
||||
overflow: "queue" as const,
|
||||
maxQueue,
|
||||
});
|
||||
}
|
||||
|
||||
function parseSenses(
|
||||
obj: Record<string, unknown>,
|
||||
): Result<{ senses: Record<string, SenseConfig>; senseNames: Set<string> }> {
|
||||
if (!isPlainRecord(obj.senses)) {
|
||||
return err(new Error("senses: required object"));
|
||||
}
|
||||
|
||||
const sensesRaw = obj.senses;
|
||||
const senses: Record<string, SenseConfig> = {};
|
||||
const senseNames = new Set(Object.keys(sensesRaw));
|
||||
|
||||
for (const [name, senseRaw] of Object.entries(sensesRaw)) {
|
||||
const result = validateSenseConfig(name, senseRaw);
|
||||
if (!result.ok) return result;
|
||||
senses[name] = result.value;
|
||||
}
|
||||
|
||||
return ok({ senses, senseNames });
|
||||
}
|
||||
|
||||
function parseReflexes(
|
||||
obj: Record<string, unknown>,
|
||||
senseNames: Set<string>,
|
||||
): Result<ReflexConfig[]> {
|
||||
if (!Array.isArray(obj.reflexes)) {
|
||||
return err(new Error("reflexes: required array"));
|
||||
}
|
||||
|
||||
const reflexes: ReflexConfig[] = [];
|
||||
for (let i = 0; i < obj.reflexes.length; i++) {
|
||||
const result = validateReflexConfig(i, obj.reflexes[i], senseNames);
|
||||
if (!result.ok) return result;
|
||||
reflexes.push(result.value);
|
||||
}
|
||||
|
||||
return ok(reflexes);
|
||||
}
|
||||
|
||||
function parseWorkflows(obj: Record<string, unknown>): Result<Record<string, WorkflowConfig>> {
|
||||
if (obj.workflows === undefined || obj.workflows === null) return ok({});
|
||||
|
||||
if (!isPlainRecord(obj.workflows)) {
|
||||
return err(new Error("workflows: must be an object if provided"));
|
||||
}
|
||||
|
||||
const workflowsRaw = obj.workflows;
|
||||
const workflows: Record<string, WorkflowConfig> = {};
|
||||
|
||||
for (const [name, wfRaw] of Object.entries(workflowsRaw)) {
|
||||
const result = validateWorkflowConfig(name, wfRaw);
|
||||
if (!result.ok) return result;
|
||||
workflows[name] = result.value;
|
||||
}
|
||||
|
||||
return ok(workflows);
|
||||
}
|
||||
|
||||
export function parseNerveConfig(raw: string): Result<NerveConfig> {
|
||||
let parsed: unknown;
|
||||
|
||||
try {
|
||||
parsed = parse(raw);
|
||||
} catch (e) {
|
||||
const message = e instanceof Error ? e.message : String(e);
|
||||
return err(new Error(`YAML parse error: ${message}`));
|
||||
}
|
||||
|
||||
if (!isPlainRecord(parsed)) {
|
||||
return err(new Error("Config must be a YAML object"));
|
||||
}
|
||||
|
||||
const obj = parsed;
|
||||
|
||||
const sensesResult = parseSenses(obj);
|
||||
if (!sensesResult.ok) return sensesResult;
|
||||
const { senses, senseNames } = sensesResult.value;
|
||||
|
||||
const reflexesResult = parseReflexes(obj, senseNames);
|
||||
if (!reflexesResult.ok) return reflexesResult;
|
||||
|
||||
const workflowsResult = parseWorkflows(obj);
|
||||
if (!workflowsResult.ok) return workflowsResult;
|
||||
|
||||
const maxRoundsResult = parseEngineMaxRounds(obj);
|
||||
if (!maxRoundsResult.ok) return maxRoundsResult;
|
||||
|
||||
return ok({
|
||||
maxRounds: maxRoundsResult.value,
|
||||
senses,
|
||||
reflexes: reflexesResult.value,
|
||||
workflows: workflowsResult.value,
|
||||
});
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
export type Signal = {
|
||||
id: number;
|
||||
senseId: string;
|
||||
payload: unknown;
|
||||
timestamp: number;
|
||||
};
|
||||
|
||||
/** Runtime metadata for a sense (e.g. daemon list-senses IPC). */
|
||||
export type SenseInfo = {
|
||||
name: string;
|
||||
group: string;
|
||||
throttle: number | null;
|
||||
timeout: number | null;
|
||||
lastSignalTimestamp: number | null;
|
||||
};
|
||||
|
||||
/** The result of a Sense compute — payload plus optional workflow directive. */
|
||||
export type SenseResult<T = unknown> = {
|
||||
payload: T;
|
||||
workflow: string | null;
|
||||
};
|
||||
@@ -1,57 +1,3 @@
|
||||
export type Signal = {
|
||||
id: number;
|
||||
senseId: string;
|
||||
payload: unknown;
|
||||
timestamp: number;
|
||||
};
|
||||
|
||||
export type SenseConfig = {
|
||||
group: string;
|
||||
throttle: number | null;
|
||||
timeout: number | null;
|
||||
gracePeriod: number | null;
|
||||
};
|
||||
|
||||
/** Runtime metadata for a sense (e.g. daemon list-senses IPC). */
|
||||
export type SenseInfo = {
|
||||
name: string;
|
||||
group: string;
|
||||
throttle: number | null;
|
||||
timeout: number | null;
|
||||
lastSignalTimestamp: number | null;
|
||||
};
|
||||
|
||||
export type SenseReflexConfig = {
|
||||
kind: "sense";
|
||||
sense: string;
|
||||
interval: number | null;
|
||||
on: string[];
|
||||
};
|
||||
|
||||
/** Reflexes only schedule Senses; workflow launches come from Sense return values. */
|
||||
export type ReflexConfig = SenseReflexConfig;
|
||||
|
||||
export type DropOverflowConfig = {
|
||||
concurrency: number;
|
||||
overflow: "drop";
|
||||
};
|
||||
|
||||
export type QueueOverflowConfig = {
|
||||
concurrency: number;
|
||||
overflow: "queue";
|
||||
maxQueue: number;
|
||||
};
|
||||
|
||||
export type WorkflowConfig = DropOverflowConfig | QueueOverflowConfig;
|
||||
|
||||
export type NerveConfig = {
|
||||
/** Engine-wide default max moderator rounds (e.g. CLI workflow trigger when omitted). */
|
||||
maxRounds: number;
|
||||
senses: Record<string, SenseConfig>;
|
||||
reflexes: ReflexConfig[];
|
||||
workflows: Record<string, WorkflowConfig>;
|
||||
};
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Workflow Automaton types (issue #80)
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -103,21 +49,22 @@ export type RoleStep<M extends RoleMeta> = {
|
||||
}[keyof M & string];
|
||||
|
||||
/**
|
||||
* Moderator input: either the initial start frame or a role step after a step.
|
||||
* Lets implementations branch on `context.kind` with full typing for each arm.
|
||||
* Moderator input: the complete workflow history.
|
||||
* Contains the start frame and all role steps so far.
|
||||
* On initial call, `steps` is empty — moderator can check `steps.length === 0`.
|
||||
* Round count is `steps.length`; maxRounds is in `start.meta.maxRounds`.
|
||||
*/
|
||||
export type ModeratorContext<M extends RoleMeta> =
|
||||
| { kind: "start"; start: StartStep }
|
||||
| { kind: "step"; step: RoleStep<M> };
|
||||
export type ModeratorContext<M extends RoleMeta> = {
|
||||
start: StartStep;
|
||||
steps: RoleStep<M>[];
|
||||
};
|
||||
|
||||
/**
|
||||
* The moderator — a pure routing function. Receives start vs step context,
|
||||
* current round, and maxRounds. Returns the next role name or END.
|
||||
* The moderator — a pure routing function. Receives the full workflow context
|
||||
* (start frame + all prior steps). Returns the next role name or END.
|
||||
*/
|
||||
export type Moderator<M extends RoleMeta> = (
|
||||
context: ModeratorContext<M>,
|
||||
round: number,
|
||||
maxRounds: number,
|
||||
) => (keyof M & string) | END;
|
||||
|
||||
/** The complete definition of a workflow, as authored by users. */
|
||||
@@ -126,9 +73,3 @@ export type WorkflowDefinition<M extends RoleMeta> = {
|
||||
roles: { [K in keyof M & string]: Role<M[K]> };
|
||||
moderator: Moderator<M>;
|
||||
};
|
||||
|
||||
/** The result of a Sense compute — payload plus optional workflow directive. */
|
||||
export type SenseResult = {
|
||||
payload: unknown;
|
||||
workflow: string | null;
|
||||
};
|
||||
@@ -76,6 +76,7 @@ function makeLogStore(
|
||||
timestamp: number;
|
||||
}> = [],
|
||||
) {
|
||||
const runsWithExitCode = activeRuns.map((r) => ({ ...r, exitCode: null }));
|
||||
const store = {
|
||||
append: vi.fn(),
|
||||
query: vi.fn(() => []),
|
||||
@@ -86,9 +87,9 @@ function makeLogStore(
|
||||
getWorkflowRun: vi.fn(() => null),
|
||||
getActiveWorkflowRuns: vi.fn((_workflowName?: string) => {
|
||||
if (_workflowName !== undefined) {
|
||||
return activeRuns.filter((r) => r.workflow === _workflowName);
|
||||
return runsWithExitCode.filter((r) => r.workflow === _workflowName);
|
||||
}
|
||||
return activeRuns;
|
||||
return runsWithExitCode;
|
||||
}),
|
||||
getTriggerPayload: vi.fn((): unknown => ({ value: 42 })),
|
||||
getThreadEvents: vi.fn(
|
||||
|
||||
@@ -67,6 +67,12 @@ export function createDaemonIpcServer(
|
||||
const senses = opts.listSenses();
|
||||
const resp: DaemonIpcResponse = { ok: true, senses };
|
||||
socket.write(`${JSON.stringify(resp)}\n`);
|
||||
} else if (req.type === "kill-workflow") {
|
||||
const found = workflowManager.killThread(req.runId);
|
||||
const resp: DaemonIpcResponse = found
|
||||
? { ok: true }
|
||||
: { ok: false, error: `Run not found or already finished: ${req.runId}` };
|
||||
socket.write(`${JSON.stringify(resp)}\n`);
|
||||
} else {
|
||||
const _exhaustive: never = req;
|
||||
void _exhaustive;
|
||||
|
||||
@@ -50,13 +50,20 @@ export type ResumeThreadMessage = {
|
||||
dryRun: boolean;
|
||||
};
|
||||
|
||||
/** Parent → Workflow Worker: kill a specific running thread */
|
||||
export type KillThreadMessage = {
|
||||
type: "kill-thread";
|
||||
runId: string;
|
||||
};
|
||||
|
||||
/** Union of all messages the parent sends to a worker */
|
||||
export type ParentToWorkerMessage =
|
||||
| ComputeMessage
|
||||
| ShutdownMessage
|
||||
| HealthRequestMessage
|
||||
| StartThreadMessage
|
||||
| ResumeThreadMessage;
|
||||
| ResumeThreadMessage
|
||||
| KillThreadMessage;
|
||||
|
||||
/** Worker → Parent: compute produced a signal */
|
||||
export type SignalMessage = {
|
||||
@@ -89,7 +96,13 @@ export type HealthResponseMessage = {
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/** Valid lifecycle event types for a workflow thread. */
|
||||
export type ThreadEventType = "queued" | "started" | "step_complete" | "completed" | "failed";
|
||||
export type ThreadEventType =
|
||||
| "queued"
|
||||
| "started"
|
||||
| "step_complete"
|
||||
| "completed"
|
||||
| "failed"
|
||||
| "killed";
|
||||
|
||||
/**
|
||||
* Workflow Worker → Parent: a thread lifecycle event.
|
||||
@@ -106,6 +119,8 @@ export type WorkflowErrorMessage = {
|
||||
type: "workflow-error";
|
||||
runId: string;
|
||||
error: string;
|
||||
/** Exit code conveying the failure reason (1=role error, 2=maxRounds exhausted). */
|
||||
exitCode: number;
|
||||
};
|
||||
|
||||
/** Workflow Worker → Parent: a WorkflowMessage produced by a role (for crash recovery). */
|
||||
@@ -132,6 +147,7 @@ const PARENT_MSG_TYPES = new Set([
|
||||
"health-request",
|
||||
"start-thread",
|
||||
"resume-thread",
|
||||
"kill-thread",
|
||||
]);
|
||||
|
||||
function validateStartThreadMsg(obj: Record<string, unknown>): string | null {
|
||||
@@ -201,6 +217,12 @@ export function parseParentMessage(raw: unknown): Result<ParentToWorkerMessage>
|
||||
dryRun: obj.dryRun,
|
||||
} as ResumeThreadMessage);
|
||||
}
|
||||
if (obj.type === "kill-thread") {
|
||||
if (typeof obj.runId !== "string") {
|
||||
return err(new Error("'kill-thread' message missing string 'runId'"));
|
||||
}
|
||||
return ok({ type: "kill-thread", runId: obj.runId } as KillThreadMessage);
|
||||
}
|
||||
return err(new Error(`Unhandled IPC message type: "${obj.type}"`));
|
||||
}
|
||||
|
||||
@@ -254,6 +276,7 @@ function isThreadEventType(value: string): value is ThreadEventType {
|
||||
case "step_complete":
|
||||
case "completed":
|
||||
case "failed":
|
||||
case "killed":
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
@@ -287,10 +310,12 @@ function parseWorkflowErrorMsg(obj: Record<string, unknown>): Result<WorkerToPar
|
||||
if (typeof obj.error !== "string") {
|
||||
return err(new Error("Worker 'workflow-error' message missing string 'error' field"));
|
||||
}
|
||||
const exitCode = typeof obj.exitCode === "number" ? obj.exitCode : 1;
|
||||
return ok({
|
||||
type: "workflow-error",
|
||||
runId: obj.runId,
|
||||
error: obj.error,
|
||||
exitCode,
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@ import { START, isPlainRecord } from "@uncaged/nerve-core";
|
||||
|
||||
import type { LogStore, WorkflowRunStatus } from "@uncaged/nerve-store";
|
||||
import type {
|
||||
KillThreadMessage,
|
||||
ResumeThreadMessage,
|
||||
ShutdownMessage,
|
||||
StartThreadMessage,
|
||||
@@ -37,6 +38,11 @@ export type WorkflowLaunchParams = {
|
||||
export type WorkflowManager = {
|
||||
/** Trigger a new workflow thread (Sense-driven launch or CLI / IPC). */
|
||||
startWorkflow: (workflowName: string, launch: WorkflowLaunchParams) => void;
|
||||
/**
|
||||
* Kill a running or queued workflow thread by runId.
|
||||
* Returns true if the thread was found, false if not found.
|
||||
*/
|
||||
killThread: (runId: string) => boolean;
|
||||
/** Number of currently active (running) threads for a workflow. */
|
||||
activeCount: (workflowName: string) => number;
|
||||
/** Number of pending queued threads waiting to run for a workflow. */
|
||||
@@ -181,6 +187,16 @@ function sendResumeThread(worker: ChildProcess, msg: ResumeThreadMessage): void
|
||||
}
|
||||
}
|
||||
|
||||
function sendKillThread(worker: ChildProcess, runId: string): void {
|
||||
if (worker.connected === false) return;
|
||||
const msg: KillThreadMessage = { type: "kill-thread", runId };
|
||||
try {
|
||||
worker.send(msg);
|
||||
} catch {
|
||||
// IPC channel closed between connected check and send
|
||||
}
|
||||
}
|
||||
|
||||
function waitForExit(child: ChildProcess, timeoutMs: number): Promise<void> {
|
||||
return new Promise((resolve) => {
|
||||
const timer = setTimeout(() => {
|
||||
@@ -229,15 +245,24 @@ export function createWorkflowManager(
|
||||
crashed: "crashed",
|
||||
dropped: "dropped",
|
||||
interrupted: "interrupted",
|
||||
killed: "killed",
|
||||
};
|
||||
return map[eventType] ?? null;
|
||||
}
|
||||
|
||||
function extractExitCode(payload: unknown): number | null {
|
||||
if (isPlainRecord(payload) && typeof payload.exitCode === "number") {
|
||||
return payload.exitCode;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function logWorkflowEvent(
|
||||
workflowName: string,
|
||||
runId: string,
|
||||
eventType: string,
|
||||
payload?: unknown,
|
||||
exitCode: number | null = null,
|
||||
): void {
|
||||
const timestamp = Date.now();
|
||||
const serialised = payload !== undefined ? JSON.stringify(payload) : null;
|
||||
@@ -252,7 +277,7 @@ export function createWorkflowManager(
|
||||
payload: serialised,
|
||||
timestamp,
|
||||
},
|
||||
{ runId, workflow: workflowName, status, timestamp },
|
||||
{ runId, workflow: workflowName, status, timestamp, exitCode },
|
||||
);
|
||||
} else {
|
||||
logStore.append({
|
||||
@@ -307,13 +332,11 @@ export function createWorkflowManager(
|
||||
const state = states.get(workflowName);
|
||||
if (state === undefined) return;
|
||||
|
||||
if (msg.eventType === "completed" || msg.eventType === "failed") {
|
||||
if (msg.eventType === "completed" || msg.eventType === "failed" || msg.eventType === "killed") {
|
||||
state.active.delete(msg.runId);
|
||||
dequeueNext(workflowName);
|
||||
}
|
||||
|
||||
if (msg.eventType === "completed" || msg.eventType === "failed") {
|
||||
logWorkflowEvent(workflowName, msg.runId, msg.eventType, msg.payload);
|
||||
const exitCode = extractExitCode(msg.payload);
|
||||
logWorkflowEvent(workflowName, msg.runId, msg.eventType, msg.payload, exitCode);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -399,7 +422,7 @@ export function createWorkflowManager(
|
||||
`[workflow-manager] worker for "${workflowName}" crashed with ${crashedCount} active thread(s)\n`,
|
||||
);
|
||||
for (const runId of state.active) {
|
||||
logWorkflowEvent(workflowName, runId, "crashed");
|
||||
logWorkflowEvent(workflowName, runId, "crashed", undefined, 255);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -460,7 +483,7 @@ export function createWorkflowManager(
|
||||
state.active.delete(msg.runId);
|
||||
dequeueNext(workflowName);
|
||||
}
|
||||
logWorkflowEvent(workflowName, msg.runId, "failed", { error: msg.error });
|
||||
logWorkflowEvent(workflowName, msg.runId, "failed", { error: msg.error }, msg.exitCode);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -541,6 +564,26 @@ export function createWorkflowManager(
|
||||
return entry;
|
||||
}
|
||||
|
||||
function killThread(runId: string): boolean {
|
||||
for (const [workflowName, state] of states) {
|
||||
const queueIdx = state.queue.findIndex((q) => q.runId === runId);
|
||||
if (queueIdx !== -1) {
|
||||
state.queue.splice(queueIdx, 1);
|
||||
logWorkflowEvent(workflowName, runId, "killed", { exitCode: 137 }, 137);
|
||||
return true;
|
||||
}
|
||||
|
||||
if (state.active.has(runId)) {
|
||||
const workerEntry = workers.get(workflowName);
|
||||
if (workerEntry !== undefined) {
|
||||
sendKillThread(workerEntry.process, runId);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
function startWorkflow(workflowName: string, launch: WorkflowLaunchParams): void {
|
||||
if (stopped) return;
|
||||
|
||||
@@ -652,6 +695,7 @@ export function createWorkflowManager(
|
||||
|
||||
return {
|
||||
startWorkflow,
|
||||
killThread,
|
||||
activeCount,
|
||||
queueLength,
|
||||
totalActiveCount,
|
||||
|
||||
@@ -12,13 +12,7 @@
|
||||
import { existsSync } from "node:fs";
|
||||
import { join, resolve } from "node:path";
|
||||
|
||||
import type {
|
||||
ModeratorContext,
|
||||
RoleMeta,
|
||||
StartStep,
|
||||
WorkflowDefinition,
|
||||
WorkflowMessage,
|
||||
} from "@uncaged/nerve-core";
|
||||
import type { RoleMeta, StartStep, WorkflowDefinition, WorkflowMessage } from "@uncaged/nerve-core";
|
||||
import { END, START, isPlainRecord } from "@uncaged/nerve-core";
|
||||
|
||||
import type {
|
||||
@@ -47,8 +41,8 @@ function sendThreadEvent(runId: string, eventType: ThreadEventType, payload: unk
|
||||
send({ type: "thread-event", runId, eventType, payload });
|
||||
}
|
||||
|
||||
function sendWorkflowError(runId: string, error: string): void {
|
||||
send({ type: "workflow-error", runId, error });
|
||||
function sendWorkflowError(runId: string, error: string, exitCode = 1): void {
|
||||
send({ type: "workflow-error", runId, error, exitCode });
|
||||
}
|
||||
|
||||
function sendWorkflowMessage(runId: string, message: WorkflowMessage): void {
|
||||
@@ -184,7 +178,7 @@ async function executeRole(
|
||||
result = await role(start, messages);
|
||||
} catch (e: unknown) {
|
||||
const errMsg = e instanceof Error ? e.message : String(e);
|
||||
sendThreadEvent(runId, "failed", { error: errMsg });
|
||||
sendThreadEvent(runId, "failed", { error: errMsg, exitCode: 1 });
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -192,10 +186,13 @@ async function executeRole(
|
||||
return result;
|
||||
}
|
||||
|
||||
type KillFlag = { value: boolean };
|
||||
|
||||
async function runThread(
|
||||
def: WorkflowDefinition<RoleMeta>,
|
||||
runId: string,
|
||||
maxRounds: number,
|
||||
killFlag: KillFlag,
|
||||
resumeMessages: WorkflowMessage[] = [],
|
||||
freshPrompt: string | null = null,
|
||||
dryRun = false,
|
||||
@@ -208,16 +205,43 @@ async function runThread(
|
||||
dryRun,
|
||||
);
|
||||
|
||||
let roleRound = roleMessages.length;
|
||||
let nextRole = def.moderator({ kind: "start", start }, roleRound, maxRounds);
|
||||
const steps: Array<{
|
||||
role: string;
|
||||
meta: Record<string, unknown>;
|
||||
content: string;
|
||||
timestamp: number;
|
||||
}> = [];
|
||||
|
||||
if (nextRole === END) {
|
||||
sendThreadEvent(runId, "completed", null);
|
||||
// Rebuild steps from any resumed messages
|
||||
for (const msg of roleMessages) {
|
||||
steps.push({
|
||||
role: msg.role,
|
||||
meta: msg.meta as Record<string, unknown>,
|
||||
content: msg.content,
|
||||
timestamp: msg.timestamp,
|
||||
});
|
||||
}
|
||||
|
||||
if (killFlag.value) {
|
||||
sendThreadEvent(runId, "killed", { exitCode: 137 });
|
||||
return;
|
||||
}
|
||||
|
||||
while (roleRound < maxRounds) {
|
||||
let nextRole = def.moderator({ start, steps });
|
||||
|
||||
if (nextRole === END) {
|
||||
sendThreadEvent(runId, "completed", { exitCode: 0 });
|
||||
return;
|
||||
}
|
||||
|
||||
while (steps.length < maxRounds) {
|
||||
const result = await executeRole(def, nextRole, start, roleMessages, runId);
|
||||
|
||||
if (killFlag.value) {
|
||||
sendThreadEvent(runId, "killed", { exitCode: 137 });
|
||||
return;
|
||||
}
|
||||
|
||||
if (result === null) return;
|
||||
|
||||
const message: WorkflowMessage = {
|
||||
@@ -229,26 +253,22 @@ async function runThread(
|
||||
roleMessages.push(message);
|
||||
sendWorkflowMessage(runId, message);
|
||||
|
||||
roleRound += 1;
|
||||
steps.push({
|
||||
role: nextRole,
|
||||
meta: result.meta,
|
||||
content: result.content,
|
||||
timestamp: message.timestamp,
|
||||
});
|
||||
|
||||
const stepContext: ModeratorContext<RoleMeta> = {
|
||||
kind: "step",
|
||||
step: {
|
||||
role: nextRole,
|
||||
meta: result.meta,
|
||||
content: result.content,
|
||||
timestamp: message.timestamp,
|
||||
},
|
||||
};
|
||||
nextRole = def.moderator(stepContext, roleRound, maxRounds);
|
||||
nextRole = def.moderator({ start, steps });
|
||||
|
||||
if (nextRole === END) {
|
||||
sendThreadEvent(runId, "completed", null);
|
||||
sendThreadEvent(runId, "completed", { exitCode: 0 });
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
sendWorkflowError(runId, `Thread exceeded maximum rounds (${maxRounds})`);
|
||||
sendWorkflowError(runId, `Thread exceeded maximum rounds (${maxRounds})`, 2);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -303,6 +323,7 @@ function handleMessage(
|
||||
raw: unknown,
|
||||
def: WorkflowDefinition<RoleMeta>,
|
||||
inFlight: Map<string, Promise<void>>,
|
||||
killFlags: Map<string, KillFlag>,
|
||||
shuttingDown: { value: boolean },
|
||||
): void {
|
||||
const parseResult = parseParentMessage(raw);
|
||||
@@ -326,15 +347,19 @@ function handleMessage(
|
||||
if (shuttingDown.value) return;
|
||||
const { runId, prompt, maxRounds, dryRun } = msg;
|
||||
|
||||
const killFlag: KillFlag = { value: false };
|
||||
killFlags.set(runId, killFlag);
|
||||
|
||||
const previous = inFlight.get(runId) ?? Promise.resolve();
|
||||
const next = previous
|
||||
.then(() => runThread(def, runId, maxRounds, [], prompt, dryRun))
|
||||
.then(() => runThread(def, runId, maxRounds, killFlag, [], prompt, dryRun))
|
||||
.catch((e: unknown) => {
|
||||
const errMsg = e instanceof Error ? e.message : String(e);
|
||||
sendWorkflowError(runId, errMsg);
|
||||
})
|
||||
.finally(() => {
|
||||
inFlight.delete(runId);
|
||||
killFlags.delete(runId);
|
||||
});
|
||||
|
||||
inFlight.set(runId, next);
|
||||
@@ -345,20 +370,32 @@ function handleMessage(
|
||||
if (shuttingDown.value) return;
|
||||
const { runId, messages, maxRounds, dryRun } = msg;
|
||||
|
||||
const killFlag: KillFlag = { value: false };
|
||||
killFlags.set(runId, killFlag);
|
||||
|
||||
const previous = inFlight.get(runId) ?? Promise.resolve();
|
||||
const next = previous
|
||||
.then(() => runThread(def, runId, maxRounds, messages, null, dryRun))
|
||||
.then(() => runThread(def, runId, maxRounds, killFlag, messages, null, dryRun))
|
||||
.catch((e: unknown) => {
|
||||
const errMsg = e instanceof Error ? e.message : String(e);
|
||||
sendWorkflowError(runId, errMsg);
|
||||
})
|
||||
.finally(() => {
|
||||
inFlight.delete(runId);
|
||||
killFlags.delete(runId);
|
||||
});
|
||||
|
||||
inFlight.set(runId, next);
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type === "kill-thread") {
|
||||
const flag = killFlags.get(msg.runId);
|
||||
if (flag !== undefined) {
|
||||
flag.value = true;
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -376,12 +413,13 @@ async function bootstrap(nerveRoot: string, workflowName: string): Promise<void>
|
||||
}
|
||||
|
||||
const inFlight = new Map<string, Promise<void>>();
|
||||
const killFlags = new Map<string, KillFlag>();
|
||||
const shuttingDown = { value: false };
|
||||
|
||||
sendReady();
|
||||
|
||||
process.on("message", (raw: unknown) => {
|
||||
handleMessage(raw, def, inFlight, shuttingDown);
|
||||
handleMessage(raw, def, inFlight, killFlags, shuttingDown);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -41,7 +41,7 @@ describe("LogStore — crash recovery helpers (Phase 3)", () => {
|
||||
payload: JSON.stringify({ triggerPayload: payload }),
|
||||
timestamp: 1000,
|
||||
},
|
||||
{ runId: "run-1", workflow: "my-wf", status: "started", timestamp: 1000 },
|
||||
{ runId: "run-1", workflow: "my-wf", status: "started", timestamp: 1000, exitCode: null },
|
||||
);
|
||||
|
||||
const result = store.getTriggerPayload("run-1");
|
||||
@@ -57,7 +57,7 @@ describe("LogStore — crash recovery helpers (Phase 3)", () => {
|
||||
payload: null,
|
||||
timestamp: 1000,
|
||||
},
|
||||
{ runId: "run-2", workflow: "my-wf", status: "started", timestamp: 1000 },
|
||||
{ runId: "run-2", workflow: "my-wf", status: "started", timestamp: 1000, exitCode: null },
|
||||
);
|
||||
|
||||
expect(store.getTriggerPayload("run-2")).toBeNull();
|
||||
@@ -148,7 +148,7 @@ describe("LogStore — crash recovery helpers (Phase 3)", () => {
|
||||
payload: JSON.stringify({ triggerPayload: {} }),
|
||||
timestamp: 1000,
|
||||
},
|
||||
{ runId: "run-6", workflow: "my-wf", status: "started", timestamp: 1000 },
|
||||
{ runId: "run-6", workflow: "my-wf", status: "started", timestamp: 1000, exitCode: null },
|
||||
);
|
||||
store.append({
|
||||
source: "workflow",
|
||||
|
||||
@@ -31,6 +31,7 @@ describe("LogStore — workflow_runs", () => {
|
||||
workflow: "cleanup",
|
||||
status: "started",
|
||||
timestamp: 1000,
|
||||
exitCode: null,
|
||||
};
|
||||
|
||||
const entry = store.upsertWorkflowRun(
|
||||
@@ -53,12 +54,18 @@ describe("LogStore — workflow_runs", () => {
|
||||
it("updates existing workflow_runs row on upsert (status transition)", () => {
|
||||
store.upsertWorkflowRun(
|
||||
{ source: "workflow", type: "started", refId: "run-2", payload: null, timestamp: 1000 },
|
||||
{ runId: "run-2", workflow: "cleanup", status: "started", timestamp: 1000 },
|
||||
{ runId: "run-2", workflow: "cleanup", status: "started", timestamp: 1000, exitCode: null },
|
||||
);
|
||||
|
||||
store.upsertWorkflowRun(
|
||||
{ source: "workflow", type: "completed", refId: "run-2", payload: null, timestamp: 2000 },
|
||||
{ runId: "run-2", workflow: "cleanup", status: "completed", timestamp: 2000 },
|
||||
{
|
||||
runId: "run-2",
|
||||
workflow: "cleanup",
|
||||
status: "completed",
|
||||
timestamp: 2000,
|
||||
exitCode: null,
|
||||
},
|
||||
);
|
||||
|
||||
const stored = store.getWorkflowRun("run-2");
|
||||
@@ -79,7 +86,7 @@ describe("LogStore — workflow_runs", () => {
|
||||
] as const) {
|
||||
store.upsertWorkflowRun(
|
||||
{ source: "workflow", type, refId: "run-3", payload: null, timestamp },
|
||||
{ runId: "run-3", workflow: "cleanup", status, timestamp },
|
||||
{ runId: "run-3", workflow: "cleanup", status, timestamp, exitCode: null },
|
||||
);
|
||||
}
|
||||
|
||||
@@ -98,11 +105,23 @@ describe("LogStore — workflow_runs", () => {
|
||||
it("returns the latest state after multiple upserts", () => {
|
||||
store.upsertWorkflowRun(
|
||||
{ source: "workflow", type: "queued", refId: "run-4", payload: null, timestamp: 100 },
|
||||
{ runId: "run-4", workflow: "code-review", status: "queued", timestamp: 100 },
|
||||
{
|
||||
runId: "run-4",
|
||||
workflow: "code-review",
|
||||
status: "queued",
|
||||
timestamp: 100,
|
||||
exitCode: null,
|
||||
},
|
||||
);
|
||||
store.upsertWorkflowRun(
|
||||
{ source: "workflow", type: "started", refId: "run-4", payload: null, timestamp: 200 },
|
||||
{ runId: "run-4", workflow: "code-review", status: "started", timestamp: 200 },
|
||||
{
|
||||
runId: "run-4",
|
||||
workflow: "code-review",
|
||||
status: "started",
|
||||
timestamp: 200,
|
||||
exitCode: null,
|
||||
},
|
||||
);
|
||||
|
||||
const run = store.getWorkflowRun("run-4");
|
||||
@@ -115,19 +134,19 @@ describe("LogStore — workflow_runs", () => {
|
||||
beforeEach(() => {
|
||||
store.upsertWorkflowRun(
|
||||
{ source: "workflow", type: "queued", refId: "r1", payload: null, timestamp: 100 },
|
||||
{ runId: "r1", workflow: "cleanup", status: "queued", timestamp: 100 },
|
||||
{ runId: "r1", workflow: "cleanup", status: "queued", timestamp: 100, exitCode: null },
|
||||
);
|
||||
store.upsertWorkflowRun(
|
||||
{ source: "workflow", type: "started", refId: "r2", payload: null, timestamp: 200 },
|
||||
{ runId: "r2", workflow: "cleanup", status: "started", timestamp: 200 },
|
||||
{ runId: "r2", workflow: "cleanup", status: "started", timestamp: 200, exitCode: null },
|
||||
);
|
||||
store.upsertWorkflowRun(
|
||||
{ source: "workflow", type: "completed", refId: "r3", payload: null, timestamp: 300 },
|
||||
{ runId: "r3", workflow: "cleanup", status: "completed", timestamp: 300 },
|
||||
{ runId: "r3", workflow: "cleanup", status: "completed", timestamp: 300, exitCode: null },
|
||||
);
|
||||
store.upsertWorkflowRun(
|
||||
{ source: "workflow", type: "failed", refId: "r4", payload: null, timestamp: 400 },
|
||||
{ runId: "r4", workflow: "deploy", status: "queued", timestamp: 400 },
|
||||
{ runId: "r4", workflow: "deploy", status: "queued", timestamp: 400, exitCode: null },
|
||||
);
|
||||
});
|
||||
|
||||
@@ -171,13 +190,13 @@ describe("LogStore — workflow_runs", () => {
|
||||
});
|
||||
|
||||
describe("all statuses are storable", () => {
|
||||
it.each(["queued", "started", "completed", "failed", "crashed", "dropped"] as const)(
|
||||
it.each(["queued", "started", "completed", "failed", "crashed", "dropped", "killed"] as const)(
|
||||
"stores status=%s",
|
||||
(status) => {
|
||||
const runId = `run-${status}`;
|
||||
store.upsertWorkflowRun(
|
||||
{ source: "workflow", type: status, refId: runId, payload: null, timestamp: 1 },
|
||||
{ runId, workflow: "test", status, timestamp: 1 },
|
||||
{ runId, workflow: "test", status, timestamp: 1, exitCode: null },
|
||||
);
|
||||
expect(store.getWorkflowRun(runId)?.status).toBe(status);
|
||||
},
|
||||
|
||||
@@ -58,7 +58,8 @@ export type WorkflowRunStatus =
|
||||
| "failed"
|
||||
| "crashed"
|
||||
| "dropped"
|
||||
| "interrupted";
|
||||
| "interrupted"
|
||||
| "killed";
|
||||
|
||||
const VALID_WORKFLOW_STATUSES = new Set<string>([
|
||||
"queued",
|
||||
@@ -68,6 +69,7 @@ const VALID_WORKFLOW_STATUSES = new Set<string>([
|
||||
"crashed",
|
||||
"dropped",
|
||||
"interrupted",
|
||||
"killed",
|
||||
]);
|
||||
|
||||
function isWorkflowRunStatus(value: string): value is WorkflowRunStatus {
|
||||
@@ -87,6 +89,7 @@ export type WorkflowRun = {
|
||||
workflow: string;
|
||||
status: WorkflowRunStatus;
|
||||
timestamp: number;
|
||||
exitCode: number | null;
|
||||
};
|
||||
|
||||
/** One role-produced workflow-message row with 1-based round index (ROW_NUMBER over role messages only). */
|
||||
@@ -192,10 +195,11 @@ CREATE TABLE IF NOT EXISTS meta (
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS workflow_runs (
|
||||
run_id TEXT PRIMARY KEY,
|
||||
workflow TEXT NOT NULL,
|
||||
status TEXT NOT NULL,
|
||||
timestamp INTEGER NOT NULL
|
||||
run_id TEXT PRIMARY KEY,
|
||||
workflow TEXT NOT NULL,
|
||||
status TEXT NOT NULL,
|
||||
timestamp INTEGER NOT NULL,
|
||||
exit_code INTEGER
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_workflow_runs_status ON workflow_runs(status);
|
||||
@@ -332,6 +336,13 @@ export function createLogStore(dbPath: string): LogStore {
|
||||
sqlite.exec("PRAGMA journal_mode=WAL");
|
||||
sqlite.exec(SCHEMA_SQL);
|
||||
|
||||
// Migration: add exit_code column for existing databases
|
||||
try {
|
||||
sqlite.exec("ALTER TABLE workflow_runs ADD COLUMN exit_code INTEGER");
|
||||
} catch {
|
||||
// Column already exists — safe to ignore
|
||||
}
|
||||
|
||||
const insertStmt = sqlite.prepare(
|
||||
"INSERT INTO logs (source, type, ref_id, payload, timestamp) VALUES (@source, @type, @refId, @payload, @timestamp)",
|
||||
);
|
||||
@@ -342,11 +353,11 @@ export function createLogStore(dbPath: string): LogStore {
|
||||
);
|
||||
|
||||
const upsertWorkflowRunStmt = sqlite.prepare(
|
||||
"INSERT OR REPLACE INTO workflow_runs (run_id, workflow, status, timestamp) VALUES (@runId, @workflow, @status, @timestamp)",
|
||||
"INSERT OR REPLACE INTO workflow_runs (run_id, workflow, status, timestamp, exit_code) VALUES (@runId, @workflow, @status, @timestamp, @exitCode)",
|
||||
);
|
||||
|
||||
const getWorkflowRunStmt = sqlite.prepare(
|
||||
"SELECT run_id, workflow, status, timestamp FROM workflow_runs WHERE run_id = ?",
|
||||
"SELECT run_id, workflow, status, timestamp, exit_code FROM workflow_runs WHERE run_id = ?",
|
||||
);
|
||||
|
||||
const getTriggerPayloadStmt = sqlite.prepare(
|
||||
@@ -386,19 +397,19 @@ export function createLogStore(dbPath: string): LogStore {
|
||||
);
|
||||
|
||||
const getActiveWorkflowRunsStmt = sqlite.prepare(
|
||||
"SELECT run_id, workflow, status, timestamp FROM workflow_runs WHERE status IN ('queued', 'started') ORDER BY timestamp ASC",
|
||||
"SELECT run_id, workflow, status, timestamp, exit_code FROM workflow_runs WHERE status IN ('queued', 'started') ORDER BY timestamp ASC",
|
||||
);
|
||||
|
||||
const getActiveWorkflowRunsByNameStmt = sqlite.prepare(
|
||||
"SELECT run_id, workflow, status, timestamp FROM workflow_runs WHERE status IN ('queued', 'started') AND workflow = ? ORDER BY timestamp ASC",
|
||||
"SELECT run_id, workflow, status, timestamp, exit_code FROM workflow_runs WHERE status IN ('queued', 'started') AND workflow = ? ORDER BY timestamp ASC",
|
||||
);
|
||||
|
||||
const getAllWorkflowRunsStmt = sqlite.prepare(
|
||||
"SELECT run_id, workflow, status, timestamp FROM workflow_runs ORDER BY timestamp DESC",
|
||||
"SELECT run_id, workflow, status, timestamp, exit_code FROM workflow_runs ORDER BY timestamp DESC",
|
||||
);
|
||||
|
||||
const getAllWorkflowRunsByNameStmt = sqlite.prepare(
|
||||
"SELECT run_id, workflow, status, timestamp FROM workflow_runs WHERE workflow = ? ORDER BY timestamp DESC",
|
||||
"SELECT run_id, workflow, status, timestamp, exit_code FROM workflow_runs WHERE workflow = ? ORDER BY timestamp DESC",
|
||||
);
|
||||
|
||||
const minLogTsStmt = sqlite.prepare("SELECT MIN(timestamp) AS m FROM logs");
|
||||
@@ -423,6 +434,7 @@ export function createLogStore(dbPath: string): LogStore {
|
||||
workflow: run.workflow,
|
||||
status: run.status,
|
||||
timestamp: run.timestamp,
|
||||
exitCode: run.exitCode,
|
||||
});
|
||||
return { ...entry, id: Number(info.lastInsertRowid) };
|
||||
});
|
||||
@@ -504,31 +516,37 @@ export function createLogStore(dbPath: string): LogStore {
|
||||
return upsertWorkflowRunTx(entry, run);
|
||||
}
|
||||
|
||||
function getWorkflowRun(runId: string): WorkflowRun | null {
|
||||
const row = getWorkflowRunStmt.get(runId) as
|
||||
| { run_id: string; workflow: string; status: string; timestamp: number }
|
||||
| undefined;
|
||||
if (row === undefined) return null;
|
||||
type SqlWorkflowRunRow = {
|
||||
run_id: string;
|
||||
workflow: string;
|
||||
status: string;
|
||||
timestamp: number;
|
||||
exit_code: number | null;
|
||||
};
|
||||
|
||||
function mapWorkflowRunRow(r: SqlWorkflowRunRow): WorkflowRun {
|
||||
return {
|
||||
runId: row.run_id,
|
||||
workflow: row.workflow,
|
||||
status: validateWorkflowRunStatus(row.status),
|
||||
timestamp: row.timestamp,
|
||||
runId: r.run_id,
|
||||
workflow: r.workflow,
|
||||
status: validateWorkflowRunStatus(r.status),
|
||||
timestamp: r.timestamp,
|
||||
exitCode: r.exit_code ?? null,
|
||||
};
|
||||
}
|
||||
|
||||
function getWorkflowRun(runId: string): WorkflowRun | null {
|
||||
const row = getWorkflowRunStmt.get(runId) as SqlWorkflowRunRow | undefined;
|
||||
if (row === undefined) return null;
|
||||
return mapWorkflowRunRow(row);
|
||||
}
|
||||
|
||||
function getActiveWorkflowRuns(workflowName?: string): WorkflowRun[] {
|
||||
const rows = (
|
||||
workflowName !== undefined
|
||||
? getActiveWorkflowRunsByNameStmt.all(workflowName)
|
||||
: getActiveWorkflowRunsStmt.all()
|
||||
) as Array<{ run_id: string; workflow: string; status: string; timestamp: number }>;
|
||||
return rows.map((r) => ({
|
||||
runId: r.run_id,
|
||||
workflow: r.workflow,
|
||||
status: validateWorkflowRunStatus(r.status),
|
||||
timestamp: r.timestamp,
|
||||
}));
|
||||
) as SqlWorkflowRunRow[];
|
||||
return rows.map(mapWorkflowRunRow);
|
||||
}
|
||||
|
||||
function getAllWorkflowRuns(workflowName: string | null): WorkflowRun[] {
|
||||
@@ -536,13 +554,8 @@ export function createLogStore(dbPath: string): LogStore {
|
||||
workflowName !== null
|
||||
? getAllWorkflowRunsByNameStmt.all(workflowName)
|
||||
: getAllWorkflowRunsStmt.all()
|
||||
) as Array<{ run_id: string; workflow: string; status: string; timestamp: number }>;
|
||||
return rows.map((r) => ({
|
||||
runId: r.run_id,
|
||||
workflow: r.workflow,
|
||||
status: validateWorkflowRunStatus(r.status),
|
||||
timestamp: r.timestamp,
|
||||
}));
|
||||
) as SqlWorkflowRunRow[];
|
||||
return rows.map(mapWorkflowRunRow);
|
||||
}
|
||||
|
||||
function getTriggerPayload(runId: string): unknown {
|
||||
|
||||
Reference in New Issue
Block a user