fix(http-api): bind 127.0.0.1, support trigger body params, fix kill-workflow fields #136
@@ -192,6 +192,7 @@ describe("buildListOutput", () => {
|
||||
// header + 2 run lines
|
||||
expect(lines).toHaveLength(3);
|
||||
expect(paginationHint).not.toBeNull();
|
||||
expect(paginationHint).toContain("nerve workflow runs");
|
||||
expect(paginationHint).toContain("--offset 2");
|
||||
expect(paginationHint).toContain("3 more");
|
||||
});
|
||||
@@ -298,7 +299,7 @@ describe("buildInspectOutput", () => {
|
||||
// Integration: getAllWorkflowRuns + buildListOutput end-to-end with real store
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("workflow list — integration with real store", () => {
|
||||
describe("workflow runs list — integration with real store", () => {
|
||||
it("lists active runs from the store", () => {
|
||||
upsertRun("r1", "cleanup", "started", 1000);
|
||||
upsertRun("r2", "cleanup", "queued", 2000);
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
import { defineCommand } from "citty";
|
||||
|
||||
import { runForegroundKernelSession } from "../run-foreground-kernel.js";
|
||||
import {
|
||||
type ForegroundSessionOptions,
|
||||
runForegroundKernelSession,
|
||||
} from "../run-foreground-kernel.js";
|
||||
import { loadDaemonModule } from "../workspace-daemon.js";
|
||||
import { getNerveRoot } from "../workspace.js";
|
||||
|
||||
@@ -9,9 +12,25 @@ export const devCommand = defineCommand({
|
||||
name: "dev",
|
||||
description: "Run the nerve kernel in the foreground (development mode)",
|
||||
},
|
||||
async run() {
|
||||
args: {
|
||||
port: {
|
||||
type: "string",
|
||||
description: "HTTP API port (overrides nerve.yaml api.port). Omit to use YAML / env only.",
|
||||
default: "",
|
||||
},
|
||||
},
|
||||
async run({ args }) {
|
||||
const nerveRoot = getNerveRoot();
|
||||
const { createKernel } = await loadDaemonModule(nerveRoot);
|
||||
await runForegroundKernelSession(nerveRoot, createKernel);
|
||||
let sessionOpts: ForegroundSessionOptions = {};
|
||||
if (args.port.length > 0) {
|
||||
const n = Number.parseInt(args.port, 10);
|
||||
if (Number.isNaN(n) || n < 1 || n > 65_535) {
|
||||
process.stderr.write(`❌ Invalid --port: ${args.port}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
sessionOpts = { httpApiPortOverride: n };
|
||||
}
|
||||
await runForegroundKernelSession(nerveRoot, createKernel, sessionOpts);
|
||||
},
|
||||
});
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
import { spawn } from "node:child_process";
|
||||
import { createWriteStream, existsSync } from "node:fs";
|
||||
import { createWriteStream, existsSync, readFileSync } from "node:fs";
|
||||
import { mkdir } from "node:fs/promises";
|
||||
import { dirname, join } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
|
||||
import { parseNerveConfig } from "@uncaged/nerve-core";
|
||||
import { defineCommand } from "citty";
|
||||
|
||||
import {
|
||||
@@ -56,7 +57,7 @@ function daemonBootstrapScript(): string {
|
||||
);
|
||||
}
|
||||
|
||||
async function runDaemon(nerveRoot: string): Promise<void> {
|
||||
async function runDaemon(nerveRoot: string, cliHttpPort: number | null): Promise<void> {
|
||||
if (isRunning()) {
|
||||
const pid = readPidFile();
|
||||
process.stderr.write(`⚠️ Nerve daemon is already running (pid ${pid}).\n`);
|
||||
@@ -74,12 +75,27 @@ async function runDaemon(nerveRoot: string): Promise<void> {
|
||||
|
||||
const bootstrapPath = daemonBootstrapScript();
|
||||
|
||||
const configPath = join(nerveRoot, "nerve.yaml");
|
||||
let yamlApiPort: number | null = null;
|
||||
try {
|
||||
const raw = readFileSync(configPath, "utf8");
|
||||
const parsed = parseNerveConfig(raw);
|
||||
if (parsed.ok) yamlApiPort = parsed.value.api.port;
|
||||
} catch {
|
||||
// kernel bootstrap will surface a clearer error if config is missing
|
||||
}
|
||||
const resolvedHttpPort = cliHttpPort ?? yamlApiPort;
|
||||
const env: NodeJS.ProcessEnv = { ...process.env, NERVE_ROOT: nerveRoot };
|
||||
if (resolvedHttpPort !== null && resolvedHttpPort > 0) {
|
||||
env.NERVE_API_PORT = String(resolvedHttpPort);
|
||||
}
|
||||
|
||||
// After `open`, file-backed WriteStream has a numeric OS fd for spawn stdio; `@types/node` omits `fd` on this WriteStream alias.
|
||||
const logFd = (logStream as unknown as { fd: number }).fd;
|
||||
const child = spawn(process.execPath, [bootstrapPath], {
|
||||
detached: true,
|
||||
stdio: ["ignore", logFd, logFd],
|
||||
env: { ...process.env, NERVE_ROOT: nerveRoot },
|
||||
env,
|
||||
cwd: nerveRoot,
|
||||
});
|
||||
|
||||
@@ -109,8 +125,8 @@ async function runDaemon(nerveRoot: string): Promise<void> {
|
||||
}
|
||||
|
||||
/** Background daemon only — use `nerve dev` for foreground mode. */
|
||||
export async function runDaemonStartCommand(): Promise<void> {
|
||||
await runDaemon(getNerveRoot());
|
||||
export async function runDaemonStartCommand(cliHttpPort: number | null = null): Promise<void> {
|
||||
await runDaemon(getNerveRoot(), cliHttpPort);
|
||||
}
|
||||
|
||||
export const daemonStartCommand = defineCommand({
|
||||
@@ -118,7 +134,23 @@ export const daemonStartCommand = defineCommand({
|
||||
name: "start",
|
||||
description: "Start the nerve daemon in the background",
|
||||
},
|
||||
async run() {
|
||||
await runDaemonStartCommand();
|
||||
args: {
|
||||
port: {
|
||||
type: "string",
|
||||
description: "HTTP API port (overrides nerve.yaml api.port). Omit to use YAML / env only.",
|
||||
default: "",
|
||||
},
|
||||
},
|
||||
async run({ args }) {
|
||||
let cliHttpPort: number | null = null;
|
||||
if (args.port.length > 0) {
|
||||
const n = Number.parseInt(args.port, 10);
|
||||
if (Number.isNaN(n) || n < 1 || n > 65_535) {
|
||||
process.stderr.write(`❌ Invalid --port: ${args.port}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
cliHttpPort = n;
|
||||
}
|
||||
await runDaemonStartCommand(cliHttpPort);
|
||||
},
|
||||
});
|
||||
|
||||
@@ -7,7 +7,12 @@ import { defineCommand } from "citty";
|
||||
import { stringify } from "yaml";
|
||||
|
||||
import type { LogStore, ThreadRoundRow, WorkflowRun } from "@uncaged/nerve-store";
|
||||
import { killWorkflowViaDaemon, triggerWorkflowViaDaemon } from "../daemon-client.js";
|
||||
import {
|
||||
killWorkflowViaDaemon,
|
||||
listWorkflowsViaDaemon,
|
||||
triggerWorkflowViaDaemon,
|
||||
} from "../daemon-client.js";
|
||||
import { formatRowsAsAlignedTable } from "../sense-sqlite.js";
|
||||
import { loadDaemonModule } from "../workspace-daemon.js";
|
||||
import { getNerveRoot, getSocketPath, isRunning } from "../workspace.js";
|
||||
|
||||
@@ -125,7 +130,7 @@ export function buildListOutput(
|
||||
const allFlagStr = allFlag ? " --all" : "";
|
||||
paginationHint =
|
||||
`\n⏩ ${remaining} more run(s) not shown. Fetch next page:\n` +
|
||||
` nerve workflow list --offset ${offset + limit}${allFlagStr}${wfFlag}\n`;
|
||||
` nerve workflow runs --offset ${offset + limit}${allFlagStr}${wfFlag}\n`;
|
||||
}
|
||||
|
||||
return { lines, paginationHint };
|
||||
@@ -298,13 +303,61 @@ export function buildThreadCommandOutput(
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// nerve workflow list
|
||||
// nerve workflow list (daemon — registered workflows + queue depth)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const workflowListCommand = defineCommand({
|
||||
const workflowDaemonListCommand = defineCommand({
|
||||
meta: {
|
||||
name: "list",
|
||||
description: "List active (queued/started) workflow runs",
|
||||
description: "List workflows from the running daemon (concurrency, active, queued)",
|
||||
},
|
||||
async run() {
|
||||
if (!isRunning()) {
|
||||
process.stderr.write("❌ Nerve daemon is not running. Start it with `nerve start`.\n");
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const socketPath = getSocketPath();
|
||||
let response: Awaited<ReturnType<typeof listWorkflowsViaDaemon>>;
|
||||
try {
|
||||
response = await listWorkflowsViaDaemon(socketPath);
|
||||
} catch (e) {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`❌ Failed to contact daemon: ${msg}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
if (!response.ok) {
|
||||
process.stderr.write(`❌ Daemon error: ${response.error}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const rows = response.workflows.map((w) => ({
|
||||
name: w.name,
|
||||
active: w.activeThreads,
|
||||
queued: w.queuedThreads,
|
||||
concurrency: w.config.concurrency,
|
||||
overflow: w.config.overflow,
|
||||
}));
|
||||
|
||||
if (rows.length === 0) {
|
||||
process.stdout.write("📭 No workflows in nerve.yaml (or empty registry).\n");
|
||||
return;
|
||||
}
|
||||
|
||||
process.stdout.write(`📋 Workflows (${String(rows.length)}):\n\n`);
|
||||
process.stdout.write(formatRowsAsAlignedTable(rows));
|
||||
},
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// nerve workflow runs
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const workflowRunsCommand = defineCommand({
|
||||
meta: {
|
||||
name: "runs",
|
||||
description: "List active (queued/started) workflow runs from logs",
|
||||
},
|
||||
args: {
|
||||
all: {
|
||||
@@ -562,7 +615,7 @@ const workflowTriggerCommand = defineCommand({
|
||||
}
|
||||
|
||||
process.stdout.write(`✅ Triggered workflow "${args.name}" via daemon.\n`);
|
||||
process.stdout.write("\n💡 Inspect active runs with: nerve workflow list\n");
|
||||
process.stdout.write("\n💡 Inspect active runs with: nerve workflow runs\n");
|
||||
},
|
||||
});
|
||||
|
||||
@@ -616,7 +669,8 @@ export const workflowCommand = defineCommand({
|
||||
description: "Manage and inspect workflow runs",
|
||||
},
|
||||
subCommands: {
|
||||
list: workflowListCommand,
|
||||
list: workflowDaemonListCommand,
|
||||
runs: workflowRunsCommand,
|
||||
inspect: workflowInspectCommand,
|
||||
thread: workflowThreadCommand,
|
||||
trigger: workflowTriggerCommand,
|
||||
|
||||
@@ -10,11 +10,16 @@ import type { Socket } from "node:net";
|
||||
|
||||
import type {
|
||||
DaemonIpcListSensesResponse,
|
||||
DaemonIpcListWorkflowsResponse,
|
||||
DaemonIpcRequest,
|
||||
DaemonIpcTriggerResponse,
|
||||
DaemonTransport,
|
||||
DaemonTransportTriggerResult,
|
||||
HealthInfo,
|
||||
SenseInfo,
|
||||
WorkflowStatus,
|
||||
} from "@uncaged/nerve-core";
|
||||
import { isPlainRecord } from "@uncaged/nerve-core";
|
||||
import { DEFAULT_ENGINE_MAX_ROUNDS, isPlainRecord } from "@uncaged/nerve-core";
|
||||
|
||||
const CONNECT_TIMEOUT_MS = 3_000;
|
||||
const RESPONSE_TIMEOUT_MS = 5_000;
|
||||
@@ -32,6 +37,19 @@ function isSenseInfo(value: unknown): value is SenseInfo {
|
||||
);
|
||||
}
|
||||
|
||||
function isWorkflowStatus(value: unknown): value is WorkflowStatus {
|
||||
if (!isPlainRecord(value)) return false;
|
||||
const cfg = value.config;
|
||||
if (!isPlainRecord(cfg)) return false;
|
||||
return (
|
||||
typeof value.name === "string" &&
|
||||
typeof value.activeThreads === "number" &&
|
||||
typeof value.queuedThreads === "number" &&
|
||||
typeof cfg.concurrency === "number" &&
|
||||
typeof cfg.overflow === "string"
|
||||
);
|
||||
}
|
||||
|
||||
function parseDaemonResponse(line: string): DaemonIpcTriggerResponse {
|
||||
try {
|
||||
const obj: unknown = JSON.parse(line);
|
||||
@@ -62,6 +80,51 @@ function parseListSensesResponse(line: string): DaemonIpcListSensesResponse {
|
||||
return { ok: false, error: `Unexpected daemon response: ${line}` };
|
||||
}
|
||||
|
||||
function parseListWorkflowsResponse(line: string): DaemonIpcListWorkflowsResponse {
|
||||
try {
|
||||
const obj: unknown = JSON.parse(line);
|
||||
if (isPlainRecord(obj)) {
|
||||
const r = obj;
|
||||
if (r.ok === false && typeof r.error === "string") return { ok: false, error: r.error };
|
||||
if (r.ok === true && Array.isArray(r.workflows) && r.workflows.every(isWorkflowStatus)) {
|
||||
return { ok: true, workflows: r.workflows };
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// fall through
|
||||
}
|
||||
return { ok: false, error: `Unexpected daemon response: ${line}` };
|
||||
}
|
||||
|
||||
function parseHealthResponse(line: string): HealthInfo | null {
|
||||
try {
|
||||
const obj: unknown = JSON.parse(line);
|
||||
if (!isPlainRecord(obj)) return null;
|
||||
const r = obj;
|
||||
if (r.ok === true && isPlainRecord(r.health)) {
|
||||
const h = r.health;
|
||||
if (
|
||||
typeof h.ok === "boolean" &&
|
||||
typeof h.version === "string" &&
|
||||
typeof h.uptime === "number" &&
|
||||
typeof h.startedAt === "string" &&
|
||||
typeof h.hostname === "string"
|
||||
) {
|
||||
return {
|
||||
ok: h.ok,
|
||||
version: h.version,
|
||||
uptime: h.uptime,
|
||||
startedAt: h.startedAt,
|
||||
hostname: h.hostname,
|
||||
};
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// fall through
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to the daemon socket, send one JSON request (newline-terminated),
|
||||
* and resolve with the first non-empty line parsed by `parseFirstLine`.
|
||||
@@ -126,6 +189,72 @@ function sendAndReceive<T>(
|
||||
});
|
||||
}
|
||||
|
||||
/** Unix-socket implementation of {@link DaemonTransport} (local daemon). */
|
||||
export class UnixTransport implements DaemonTransport {
|
||||
readonly socketPath: string;
|
||||
constructor(socketPath: string) {
|
||||
this.socketPath = socketPath;
|
||||
}
|
||||
|
||||
async health(): Promise<HealthInfo> {
|
||||
const parsed = await sendAndReceive(this.socketPath, { type: "health" }, (line) =>
|
||||
parseHealthResponse(line),
|
||||
);
|
||||
if (parsed === null) {
|
||||
throw new Error("Unexpected daemon response for health");
|
||||
}
|
||||
return parsed;
|
||||
}
|
||||
|
||||
async listSenses(): Promise<SenseInfo[]> {
|
||||
const r = await sendAndReceive(
|
||||
this.socketPath,
|
||||
{ type: "list-senses" },
|
||||
parseListSensesResponse,
|
||||
);
|
||||
if (!r.ok) {
|
||||
throw new Error(r.error);
|
||||
}
|
||||
return r.senses;
|
||||
}
|
||||
|
||||
async listWorkflows(): Promise<WorkflowStatus[]> {
|
||||
const r = await sendAndReceive(
|
||||
this.socketPath,
|
||||
{ type: "list-workflows" },
|
||||
parseListWorkflowsResponse,
|
||||
);
|
||||
if (!r.ok) {
|
||||
throw new Error(r.error);
|
||||
}
|
||||
return r.workflows;
|
||||
}
|
||||
|
||||
async triggerSense(name: string): Promise<DaemonTransportTriggerResult> {
|
||||
return sendAndReceive(
|
||||
this.socketPath,
|
||||
{ type: "trigger-sense", sense: name },
|
||||
parseDaemonResponse,
|
||||
);
|
||||
}
|
||||
|
||||
async triggerWorkflow(name: string): Promise<DaemonTransportTriggerResult> {
|
||||
const message: DaemonIpcRequest = {
|
||||
type: "trigger-workflow",
|
||||
workflow: name,
|
||||
prompt: "",
|
||||
maxRounds: DEFAULT_ENGINE_MAX_ROUNDS,
|
||||
dryRun: false,
|
||||
};
|
||||
return sendAndReceive(this.socketPath, message, parseDaemonResponse);
|
||||
}
|
||||
|
||||
async killWorkflow(runId: string): Promise<DaemonTransportTriggerResult> {
|
||||
const message: DaemonIpcRequest = { type: "kill-workflow", runId };
|
||||
return sendAndReceive(this.socketPath, message, parseDaemonResponse);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a trigger-workflow message to the running daemon via its Unix socket.
|
||||
* Resolves with the daemon's response or rejects on connection/timeout errors.
|
||||
@@ -168,6 +297,16 @@ export function listSensesViaDaemon(socketPath: string): Promise<DaemonIpcListSe
|
||||
return sendAndReceive(socketPath, message, parseListSensesResponse);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a list-workflows message to the running daemon via its Unix socket.
|
||||
*/
|
||||
export function listWorkflowsViaDaemon(
|
||||
socketPath: string,
|
||||
): Promise<DaemonIpcListWorkflowsResponse> {
|
||||
const message: DaemonIpcRequest = { type: "list-workflows" };
|
||||
return sendAndReceive(socketPath, message, parseListWorkflowsResponse);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a kill-workflow message to the running daemon via its Unix socket.
|
||||
* Resolves with the daemon's response or rejects on connection/timeout errors.
|
||||
|
||||
@@ -9,7 +9,11 @@ import { getSocketPath } from "./workspace.js";
|
||||
export type CreateKernelFn = (
|
||||
config: NerveConfig,
|
||||
nerveRoot: string,
|
||||
opts: { enableFileWatcher: boolean; ipcSocketPath: string },
|
||||
opts: {
|
||||
enableFileWatcher: boolean;
|
||||
ipcSocketPath: string;
|
||||
httpApiPortOverride?: number | null;
|
||||
},
|
||||
) => {
|
||||
groups: Set<string>;
|
||||
ready: Promise<void>;
|
||||
@@ -28,9 +32,23 @@ function readConfig(nerveRoot: string): ReturnType<typeof parseNerveConfig> {
|
||||
return parseNerveConfig(raw);
|
||||
}
|
||||
|
||||
function parseEnvHttpPort(): number | null {
|
||||
const envPort = process.env.NERVE_API_PORT;
|
||||
if (envPort === undefined || envPort === "") return null;
|
||||
const n = Number.parseInt(envPort, 10);
|
||||
if (Number.isNaN(n) || n < 1 || n > 65_535) return null;
|
||||
return n;
|
||||
}
|
||||
|
||||
export type ForegroundSessionOptions = {
|
||||
/** Positive integer overrides `nerve.yaml` `api.port` (same precedence as `NERVE_API_PORT`). */
|
||||
httpApiPortOverride?: number | null;
|
||||
};
|
||||
|
||||
export async function runForegroundKernelSession(
|
||||
nerveRoot: string,
|
||||
createKernel: CreateKernelFn,
|
||||
sessionOpts: ForegroundSessionOptions = {},
|
||||
): Promise<void> {
|
||||
const configResult = readConfig(nerveRoot);
|
||||
if (!configResult.ok) {
|
||||
@@ -39,10 +57,23 @@ export async function runForegroundKernelSession(
|
||||
}
|
||||
|
||||
const config = configResult.value;
|
||||
const kernel = createKernel(config, nerveRoot, {
|
||||
const envOverride = parseEnvHttpPort();
|
||||
const cliOverride = sessionOpts.httpApiPortOverride ?? null;
|
||||
const resolvedOverride =
|
||||
cliOverride !== null && cliOverride > 0
|
||||
? cliOverride
|
||||
: envOverride !== null && envOverride > 0
|
||||
? envOverride
|
||||
: null;
|
||||
|
||||
const kernelBase = {
|
||||
enableFileWatcher: true,
|
||||
ipcSocketPath: getSocketPath(),
|
||||
});
|
||||
};
|
||||
const kernel =
|
||||
resolvedOverride !== null
|
||||
? createKernel(config, nerveRoot, { ...kernelBase, httpApiPortOverride: resolvedOverride })
|
||||
: createKernel(config, nerveRoot, kernelBase);
|
||||
|
||||
const senseNames = Object.keys(config.senses);
|
||||
const groups = [...kernel.groups];
|
||||
|
||||
@@ -34,7 +34,11 @@ export type DaemonModule = {
|
||||
createKernel: (
|
||||
config: NerveConfig,
|
||||
nerveRoot: string,
|
||||
options: { enableFileWatcher: boolean; ipcSocketPath: string },
|
||||
options: {
|
||||
enableFileWatcher: boolean;
|
||||
ipcSocketPath: string;
|
||||
httpApiPortOverride?: number | null;
|
||||
},
|
||||
) => {
|
||||
groups: Set<string>;
|
||||
ready: Promise<void>;
|
||||
|
||||
@@ -63,6 +63,7 @@ describe("parseNerveConfig", () => {
|
||||
overflow: "queue",
|
||||
maxQueue: 10,
|
||||
});
|
||||
expect(result.value.api).toEqual({ port: null });
|
||||
});
|
||||
|
||||
it("parses config with empty reflexes array", () => {
|
||||
@@ -170,9 +171,39 @@ workflows:
|
||||
maxQueue: 100,
|
||||
});
|
||||
});
|
||||
|
||||
it("parses api.port when present", () => {
|
||||
const yaml = `
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
reflexes: []
|
||||
api:
|
||||
port: 9800
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(true);
|
||||
if (!result.ok) return;
|
||||
expect(result.value.api).toEqual({ port: 9800 });
|
||||
});
|
||||
});
|
||||
|
||||
describe("invalid configs", () => {
|
||||
it("returns error when api.port is out of range", () => {
|
||||
const yaml = `
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
reflexes: []
|
||||
api:
|
||||
port: 99999
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(false);
|
||||
if (result.ok) return;
|
||||
expect(result.error.message).toMatch(/api\.port/);
|
||||
});
|
||||
|
||||
it("returns error on bad YAML syntax", () => {
|
||||
const result = parseNerveConfig("senses: [\\nunclosed");
|
||||
expect(result.ok).toBe(false);
|
||||
|
||||
@@ -55,7 +55,7 @@ describe("parseDaemonIpcRequest", () => {
|
||||
).toBeNull();
|
||||
});
|
||||
|
||||
it("parses trigger-sense and list-senses", () => {
|
||||
it("parses trigger-sense, list-senses, list-workflows, and health", () => {
|
||||
expect(parseDaemonIpcRequest(JSON.stringify({ type: "trigger-sense", sense: "x" }))).toEqual({
|
||||
type: "trigger-sense",
|
||||
sense: "x",
|
||||
@@ -63,6 +63,12 @@ describe("parseDaemonIpcRequest", () => {
|
||||
expect(parseDaemonIpcRequest(JSON.stringify({ type: "list-senses" }))).toEqual({
|
||||
type: "list-senses",
|
||||
});
|
||||
expect(parseDaemonIpcRequest(JSON.stringify({ type: "list-workflows" }))).toEqual({
|
||||
type: "list-workflows",
|
||||
});
|
||||
expect(parseDaemonIpcRequest(JSON.stringify({ type: "health" }))).toEqual({
|
||||
type: "health",
|
||||
});
|
||||
});
|
||||
|
||||
it("returns null for invalid JSON or unknown type", () => {
|
||||
|
||||
@@ -28,10 +28,16 @@ export type QueueOverflowConfig = {
|
||||
|
||||
export type WorkflowConfig = DropOverflowConfig | QueueOverflowConfig;
|
||||
|
||||
/** Optional HTTP control plane (Phase 1: no auth). When `port` is null, the HTTP server is not started. */
|
||||
export type NerveApiConfig = {
|
||||
port: number | null;
|
||||
};
|
||||
|
||||
export type NerveConfig = {
|
||||
/** Engine-wide default max moderator rounds (e.g. CLI workflow trigger when omitted). */
|
||||
maxRounds: number;
|
||||
senses: Record<string, SenseConfig>;
|
||||
reflexes: ReflexConfig[];
|
||||
workflows: Record<string, WorkflowConfig>;
|
||||
api: NerveApiConfig;
|
||||
};
|
||||
|
||||
@@ -7,6 +7,23 @@
|
||||
import { isPlainRecord } from "./is-plain-record.js";
|
||||
import type { SenseInfo } from "./sense.js";
|
||||
|
||||
/** Runtime status of a registered workflow (for listing / observability). */
|
||||
export type WorkflowStatus = {
|
||||
name: string;
|
||||
activeThreads: number;
|
||||
queuedThreads: number;
|
||||
config: { concurrency: number; overflow: string };
|
||||
};
|
||||
|
||||
/** Public health payload for HTTP / IPC. */
|
||||
export type HealthInfo = {
|
||||
ok: boolean;
|
||||
version: string;
|
||||
uptime: number;
|
||||
startedAt: string;
|
||||
hostname: string;
|
||||
};
|
||||
|
||||
/** Client → daemon: start a workflow run. */
|
||||
export type DaemonIpcTriggerWorkflowRequest = {
|
||||
type: "trigger-workflow";
|
||||
@@ -33,12 +50,24 @@ export type DaemonIpcKillWorkflowRequest = {
|
||||
runId: string;
|
||||
};
|
||||
|
||||
/** Client → daemon: list registered workflows and queue/active counts. */
|
||||
export type DaemonIpcListWorkflowsRequest = {
|
||||
type: "list-workflows";
|
||||
};
|
||||
|
||||
/** Client → daemon: public health snapshot. */
|
||||
export type DaemonIpcHealthRequest = {
|
||||
type: "health";
|
||||
};
|
||||
|
||||
/** Union of all JSON requests the daemon IPC server accepts. */
|
||||
export type DaemonIpcRequest =
|
||||
| DaemonIpcTriggerWorkflowRequest
|
||||
| DaemonIpcTriggerSenseRequest
|
||||
| DaemonIpcListSensesRequest
|
||||
| DaemonIpcKillWorkflowRequest;
|
||||
| DaemonIpcKillWorkflowRequest
|
||||
| DaemonIpcListWorkflowsRequest
|
||||
| DaemonIpcHealthRequest;
|
||||
|
||||
/** Successful trigger / trigger-sense reply (no body). */
|
||||
export type DaemonIpcTriggerOkResponse = { ok: true };
|
||||
@@ -53,11 +82,21 @@ export type DaemonIpcListSensesResponse =
|
||||
| { ok: true; senses: SenseInfo[] }
|
||||
| DaemonIpcErrorResponse;
|
||||
|
||||
/** Reply for list-workflows. */
|
||||
export type DaemonIpcListWorkflowsResponse =
|
||||
| { ok: true; workflows: WorkflowStatus[] }
|
||||
| DaemonIpcErrorResponse;
|
||||
|
||||
/** Reply for health. */
|
||||
export type DaemonIpcHealthResponse = { ok: true; health: HealthInfo } | DaemonIpcErrorResponse;
|
||||
|
||||
/** Any JSON response the daemon may write on the IPC socket. */
|
||||
export type DaemonIpcResponse =
|
||||
| DaemonIpcTriggerOkResponse
|
||||
| DaemonIpcErrorResponse
|
||||
| { ok: true; senses: SenseInfo[] };
|
||||
| DaemonIpcListSensesResponse
|
||||
| DaemonIpcListWorkflowsResponse
|
||||
| DaemonIpcHealthResponse;
|
||||
|
||||
function parseTriggerWorkflowFields(
|
||||
req: Record<string, unknown>,
|
||||
@@ -98,6 +137,12 @@ export function parseDaemonIpcRequest(line: string): DaemonIpcRequest | null {
|
||||
if (typeof req.runId !== "string" || req.runId.length === 0) return null;
|
||||
return { type: "kill-workflow", runId: req.runId };
|
||||
}
|
||||
if (req.type === "list-workflows") {
|
||||
return { type: "list-workflows" };
|
||||
}
|
||||
if (req.type === "health") {
|
||||
return { type: "health" };
|
||||
}
|
||||
return null;
|
||||
} catch {
|
||||
return null;
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
import type { HealthInfo, WorkflowStatus } from "./daemon-ipc-protocol.js";
|
||||
import type { SenseInfo } from "./sense.js";
|
||||
|
||||
export type DaemonTransportTriggerResult = { ok: true } | { ok: false; error: string };
|
||||
|
||||
/**
|
||||
* Abstraction over daemon control plane (Unix socket IPC today, HTTP in Phase 2).
|
||||
* Implementations live in CLI / tools; the daemon kernel uses shared handler logic.
|
||||
*/
|
||||
export type DaemonTransport = {
|
||||
health(): Promise<HealthInfo>;
|
||||
listSenses(): Promise<SenseInfo[]>;
|
||||
listWorkflows(): Promise<WorkflowStatus[]>;
|
||||
triggerSense(name: string): Promise<DaemonTransportTriggerResult>;
|
||||
triggerWorkflow(name: string): Promise<DaemonTransportTriggerResult>;
|
||||
/** Kill a running or queued workflow thread by `runId` (same field as IPC `kill-workflow`). */
|
||||
killWorkflow(runId: string): Promise<DaemonTransportTriggerResult>;
|
||||
};
|
||||
@@ -5,6 +5,7 @@ export type {
|
||||
DropOverflowConfig,
|
||||
QueueOverflowConfig,
|
||||
WorkflowConfig,
|
||||
NerveApiConfig,
|
||||
NerveConfig,
|
||||
} from "./config.js";
|
||||
export type { Signal, SenseInfo, SenseResult } from "./sense.js";
|
||||
@@ -35,15 +36,22 @@ export {
|
||||
} from "./sense-workflow-directive.js";
|
||||
|
||||
export type {
|
||||
WorkflowStatus,
|
||||
HealthInfo,
|
||||
DaemonIpcTriggerWorkflowRequest,
|
||||
DaemonIpcTriggerSenseRequest,
|
||||
DaemonIpcListSensesRequest,
|
||||
DaemonIpcKillWorkflowRequest,
|
||||
DaemonIpcListWorkflowsRequest,
|
||||
DaemonIpcHealthRequest,
|
||||
DaemonIpcRequest,
|
||||
DaemonIpcTriggerOkResponse,
|
||||
DaemonIpcErrorResponse,
|
||||
DaemonIpcTriggerResponse,
|
||||
DaemonIpcListSensesResponse,
|
||||
DaemonIpcListWorkflowsResponse,
|
||||
DaemonIpcHealthResponse,
|
||||
DaemonIpcResponse,
|
||||
} from "./daemon-ipc-protocol.js";
|
||||
export { parseDaemonIpcRequest } from "./daemon-ipc-protocol.js";
|
||||
export type { DaemonTransport, DaemonTransportTriggerResult } from "./daemon-transport.js";
|
||||
|
||||
@@ -1,6 +1,12 @@
|
||||
import { parse } from "yaml";
|
||||
|
||||
import type { NerveConfig, ReflexConfig, SenseConfig, WorkflowConfig } from "./config.js";
|
||||
import type {
|
||||
NerveApiConfig,
|
||||
NerveConfig,
|
||||
ReflexConfig,
|
||||
SenseConfig,
|
||||
WorkflowConfig,
|
||||
} from "./config.js";
|
||||
import { isPlainRecord } from "./is-plain-record.js";
|
||||
import type { Result } from "./result.js";
|
||||
import { err, ok } from "./result.js";
|
||||
@@ -245,6 +251,28 @@ function parseReflexes(
|
||||
return ok(reflexes);
|
||||
}
|
||||
|
||||
function parseApiConfig(obj: Record<string, unknown>): Result<NerveApiConfig> {
|
||||
if (obj.api === undefined || obj.api === null) {
|
||||
return ok({ port: null });
|
||||
}
|
||||
if (!isPlainRecord(obj.api)) {
|
||||
return err(new Error("api: must be an object if provided"));
|
||||
}
|
||||
const api = obj.api;
|
||||
if (api.port === undefined || api.port === null) {
|
||||
return ok({ port: null });
|
||||
}
|
||||
if (
|
||||
typeof api.port !== "number" ||
|
||||
!Number.isInteger(api.port) ||
|
||||
api.port < 1 ||
|
||||
api.port > 65_535
|
||||
) {
|
||||
return err(new Error("api.port: must be an integer between 1 and 65535 if provided"));
|
||||
}
|
||||
return ok({ port: api.port });
|
||||
}
|
||||
|
||||
function parseWorkflows(obj: Record<string, unknown>): Result<Record<string, WorkflowConfig>> {
|
||||
if (obj.workflows === undefined || obj.workflows === null) return ok({});
|
||||
|
||||
@@ -293,10 +321,14 @@ export function parseNerveConfig(raw: string): Result<NerveConfig> {
|
||||
const maxRoundsResult = parseEngineMaxRounds(obj);
|
||||
if (!maxRoundsResult.ok) return maxRoundsResult;
|
||||
|
||||
const apiResult = parseApiConfig(obj);
|
||||
if (!apiResult.ok) return apiResult;
|
||||
|
||||
return ok({
|
||||
maxRounds: maxRoundsResult.value,
|
||||
senses,
|
||||
reflexes: reflexesResult.value,
|
||||
workflows: workflowsResult.value,
|
||||
api: apiResult.value,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -65,6 +65,7 @@ function makeConfig(workflows: Record<string, WorkflowConfig> = {}): NerveConfig
|
||||
reflexes: [],
|
||||
workflows,
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ import { join } from "node:path";
|
||||
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
import { createDaemonHandlers } from "../daemon-handlers.js";
|
||||
import { createDaemonIpcServer } from "../daemon-ipc.js";
|
||||
import type { DaemonIpcServer } from "../daemon-ipc.js";
|
||||
|
||||
@@ -28,15 +29,42 @@ let server: DaemonIpcServer | null = null;
|
||||
function makeMockWorkflowManager() {
|
||||
return {
|
||||
startWorkflow: vi.fn(),
|
||||
killThread: vi.fn(() => false),
|
||||
stop: vi.fn(async () => {}),
|
||||
totalActiveCount: vi.fn(() => 0),
|
||||
drainAndRespawn: vi.fn(async () => {}),
|
||||
drainWhenIdle: vi.fn(),
|
||||
updateConfig: vi.fn(),
|
||||
getActiveWorkflowRuns: vi.fn(() => []),
|
||||
listWorkflows: vi.fn(() => []),
|
||||
activeCount: vi.fn(() => 0),
|
||||
queueLength: vi.fn(() => 0),
|
||||
};
|
||||
}
|
||||
|
||||
function openIpcServer(
|
||||
path: string,
|
||||
wfManager: ReturnType<typeof makeMockWorkflowManager>,
|
||||
opts: {
|
||||
triggerSense: (senseName: string) => void;
|
||||
listSenses: () => unknown[];
|
||||
},
|
||||
): DaemonIpcServer {
|
||||
const handlers = createDaemonHandlers({
|
||||
workflowManager: wfManager as never,
|
||||
triggerSense: opts.triggerSense,
|
||||
listSenses: opts.listSenses as never,
|
||||
getHealthInfo: () => ({
|
||||
ok: true,
|
||||
version: "0.0.0-test",
|
||||
uptime: 0,
|
||||
startedAt: "2020-01-01T00:00:00.000Z",
|
||||
hostname: "test-host",
|
||||
}),
|
||||
getDefaultMaxRounds: () => 10,
|
||||
});
|
||||
return createDaemonIpcServer(path, handlers);
|
||||
}
|
||||
|
||||
function sendRaw(path: string, message: object): Promise<object> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const sock = connect(path, () => {
|
||||
@@ -89,9 +117,10 @@ afterEach(async () => {
|
||||
describe("daemon-ipc — trigger-sense", () => {
|
||||
it("responds ok:true when triggerSense succeeds", async () => {
|
||||
const triggerSense = vi.fn();
|
||||
server = createDaemonIpcServer(sockPath, makeMockWorkflowManager() as never, {
|
||||
const wfManager = makeMockWorkflowManager();
|
||||
server = openIpcServer(sockPath, wfManager, {
|
||||
triggerSense,
|
||||
listSenses: vi.fn(() => []),
|
||||
listSenses: () => [],
|
||||
});
|
||||
|
||||
const resp = await sendRaw(sockPath, { type: "trigger-sense", sense: "cpu-usage" });
|
||||
@@ -105,9 +134,10 @@ describe("daemon-ipc — trigger-sense", () => {
|
||||
const triggerSense = vi.fn(() => {
|
||||
throw new Error('Unknown sense: "no-such-sense"');
|
||||
});
|
||||
server = createDaemonIpcServer(sockPath, makeMockWorkflowManager() as never, {
|
||||
const wfManager = makeMockWorkflowManager();
|
||||
server = openIpcServer(sockPath, wfManager, {
|
||||
triggerSense,
|
||||
listSenses: vi.fn(() => []),
|
||||
listSenses: () => [],
|
||||
});
|
||||
|
||||
const resp = await sendRaw(sockPath, { type: "trigger-sense", sense: "no-such-sense" });
|
||||
@@ -118,9 +148,10 @@ describe("daemon-ipc — trigger-sense", () => {
|
||||
|
||||
it("responds ok:false for trigger-sense with empty sense name", async () => {
|
||||
const triggerSense = vi.fn();
|
||||
server = createDaemonIpcServer(sockPath, makeMockWorkflowManager() as never, {
|
||||
const wfManager = makeMockWorkflowManager();
|
||||
server = openIpcServer(sockPath, wfManager, {
|
||||
triggerSense,
|
||||
listSenses: vi.fn(() => []),
|
||||
listSenses: () => [],
|
||||
});
|
||||
|
||||
const resp = await sendRaw(sockPath, { type: "trigger-sense", sense: "" });
|
||||
@@ -131,9 +162,10 @@ describe("daemon-ipc — trigger-sense", () => {
|
||||
|
||||
it("responds ok:false for trigger-sense missing sense field", async () => {
|
||||
const triggerSense = vi.fn();
|
||||
server = createDaemonIpcServer(sockPath, makeMockWorkflowManager() as never, {
|
||||
const wfManager = makeMockWorkflowManager();
|
||||
server = openIpcServer(sockPath, wfManager, {
|
||||
triggerSense,
|
||||
listSenses: vi.fn(() => []),
|
||||
listSenses: () => [],
|
||||
});
|
||||
|
||||
const resp = await sendRaw(sockPath, { type: "trigger-sense" });
|
||||
@@ -145,9 +177,9 @@ describe("daemon-ipc — trigger-sense", () => {
|
||||
it("does NOT call triggerSense for trigger-workflow requests", async () => {
|
||||
const triggerSense = vi.fn();
|
||||
const wfManager = makeMockWorkflowManager();
|
||||
server = createDaemonIpcServer(sockPath, wfManager as never, {
|
||||
server = openIpcServer(sockPath, wfManager, {
|
||||
triggerSense,
|
||||
listSenses: vi.fn(() => []),
|
||||
listSenses: () => [],
|
||||
});
|
||||
|
||||
const resp = await sendRaw(sockPath, {
|
||||
@@ -169,9 +201,10 @@ describe("daemon-ipc — trigger-sense", () => {
|
||||
|
||||
it("responds ok:false for completely unknown request type", async () => {
|
||||
const triggerSense = vi.fn();
|
||||
server = createDaemonIpcServer(sockPath, makeMockWorkflowManager() as never, {
|
||||
const wfManager = makeMockWorkflowManager();
|
||||
server = openIpcServer(sockPath, wfManager, {
|
||||
triggerSense,
|
||||
listSenses: vi.fn(() => []),
|
||||
listSenses: () => [],
|
||||
});
|
||||
|
||||
const resp = await sendRaw(sockPath, { type: "unknown-type", data: "x" });
|
||||
@@ -188,7 +221,8 @@ describe("daemon-ipc — trigger-sense", () => {
|
||||
describe("daemon-ipc — list-senses", () => {
|
||||
it("responds ok:true with empty senses array when listSenses returns []", async () => {
|
||||
const listSenses = vi.fn(() => []);
|
||||
server = createDaemonIpcServer(sockPath, makeMockWorkflowManager() as never, {
|
||||
const wfManager = makeMockWorkflowManager();
|
||||
server = openIpcServer(sockPath, wfManager, {
|
||||
triggerSense: vi.fn(),
|
||||
listSenses,
|
||||
});
|
||||
@@ -217,7 +251,8 @@ describe("daemon-ipc — list-senses", () => {
|
||||
},
|
||||
];
|
||||
const listSenses = vi.fn(() => sensesData);
|
||||
server = createDaemonIpcServer(sockPath, makeMockWorkflowManager() as never, {
|
||||
const wfManager = makeMockWorkflowManager();
|
||||
server = openIpcServer(sockPath, wfManager, {
|
||||
triggerSense: vi.fn(),
|
||||
listSenses,
|
||||
});
|
||||
@@ -232,7 +267,8 @@ describe("daemon-ipc — list-senses", () => {
|
||||
const listSenses = vi.fn(() => {
|
||||
throw new Error("internal error");
|
||||
});
|
||||
server = createDaemonIpcServer(sockPath, makeMockWorkflowManager() as never, {
|
||||
const wfManager = makeMockWorkflowManager();
|
||||
server = openIpcServer(sockPath, wfManager, {
|
||||
triggerSense: vi.fn(),
|
||||
listSenses,
|
||||
});
|
||||
@@ -244,7 +280,8 @@ describe("daemon-ipc — list-senses", () => {
|
||||
|
||||
it("does NOT call listSenses for trigger-sense requests", async () => {
|
||||
const listSenses = vi.fn(() => []);
|
||||
server = createDaemonIpcServer(sockPath, makeMockWorkflowManager() as never, {
|
||||
const wfManager = makeMockWorkflowManager();
|
||||
server = openIpcServer(sockPath, wfManager, {
|
||||
triggerSense: vi.fn(),
|
||||
listSenses,
|
||||
});
|
||||
|
||||
@@ -81,6 +81,8 @@ describe("createFileWatcher — workflow file changes (Phase 3)", () => {
|
||||
watcher = createFileWatcher(root, (change) => changes.push(change), 50);
|
||||
|
||||
await new Promise((r) => setTimeout(r, 100));
|
||||
// Isolate the nerve.yaml write from fs.watch startup / coalesced events on some platforms
|
||||
changes.length = 0;
|
||||
writeFileSync(join(root, "nerve.yaml"), "senses: {}\nreflexes: []\n# changed\n");
|
||||
|
||||
await waitFor(() => changes.some((c) => c.kind === "config"), 3000);
|
||||
|
||||
@@ -95,6 +95,8 @@ describe("createFileWatcher", () => {
|
||||
watcher = createFileWatcher(root, (change) => changes.push(change), 50);
|
||||
|
||||
await new Promise((r) => setTimeout(r, 100));
|
||||
// Ignore fs.watch attachment noise on some platforms before asserting post-close silence
|
||||
changes.length = 0;
|
||||
watcher.close();
|
||||
watcher = null;
|
||||
|
||||
|
||||
@@ -66,7 +66,7 @@ const { createWorkflowManager } = await import("../workflow-manager.js");
|
||||
const { createKernel } = await import("../kernel.js");
|
||||
|
||||
function makeWfConfig(workflows: Record<string, WorkflowConfig> = {}): NerveConfig {
|
||||
return { senses: {}, reflexes: [], workflows, maxRounds: 10 };
|
||||
return { senses: {}, reflexes: [], workflows, maxRounds: 10, api: { port: null } };
|
||||
}
|
||||
|
||||
function makeLogStore() {
|
||||
@@ -454,6 +454,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
|
||||
reflexes: [],
|
||||
workflows: { "my-wf": { concurrency: 1, overflow: "drop" } },
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
};
|
||||
|
||||
const kernel = createKernel(config, nerveRoot, {
|
||||
@@ -489,6 +490,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
|
||||
reflexes: [],
|
||||
workflows: { "old-wf": { concurrency: 1, overflow: "drop" } },
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
};
|
||||
|
||||
const kernel = createKernel(initialConfig, nerveRoot, {
|
||||
@@ -510,6 +512,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
};
|
||||
kernel.reloadConfig(newConfig);
|
||||
|
||||
@@ -532,6 +535,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
|
||||
reflexes: [],
|
||||
workflows: { "my-wf": { concurrency: 1, overflow: "drop" } },
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
};
|
||||
|
||||
const kernel = createKernel(initialConfig, nerveRoot, {
|
||||
@@ -548,6 +552,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
|
||||
reflexes: [],
|
||||
workflows: { "my-wf": { concurrency: 5, overflow: "queue", maxQueue: 50 } },
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
};
|
||||
kernel.reloadConfig(newConfig);
|
||||
|
||||
|
||||
@@ -30,6 +30,7 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -78,6 +78,7 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
@@ -197,6 +198,7 @@ describe("kernel — reloadConfig", () => {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
});
|
||||
|
||||
expect(kernel.groups.has("network")).toBe(true);
|
||||
@@ -214,6 +216,7 @@ describe("kernel — reloadConfig", () => {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
};
|
||||
const kernel = createKernel(config, nerveRoot);
|
||||
|
||||
@@ -229,6 +232,7 @@ describe("kernel — reloadConfig", () => {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
});
|
||||
|
||||
expect(kernel.groups.has("network")).toBe(false);
|
||||
@@ -252,6 +256,7 @@ describe("kernel — reloadConfig", () => {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
});
|
||||
|
||||
expect(kernel.getHealth().activeSenses).toBe(2);
|
||||
|
||||
@@ -97,6 +97,7 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -100,6 +100,7 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
@@ -322,6 +323,7 @@ describe("kernel + workflowManager integration", () => {
|
||||
reflexes: [],
|
||||
workflows: { "new-workflow": { concurrency: 1, overflow: "drop" } },
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
};
|
||||
kernel.reloadConfig(newConfig);
|
||||
|
||||
@@ -375,6 +377,7 @@ describe("kernel + workflowManager integration", () => {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
};
|
||||
kernel.reloadConfig(newConfig);
|
||||
|
||||
|
||||
@@ -61,6 +61,7 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
@@ -210,6 +211,7 @@ describe("kernel — groupForSense mapping", () => {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
};
|
||||
const kernel = createKernel(config, nerveRoot);
|
||||
|
||||
|
||||
@@ -31,6 +31,7 @@ describe("LogStore + ReflexScheduler integration", () => {
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
};
|
||||
const bus = createSignalBus();
|
||||
const triggered: string[] = [];
|
||||
@@ -59,6 +60,7 @@ describe("LogStore + ReflexScheduler integration", () => {
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 1000, on: [] }],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
};
|
||||
const bus = createSignalBus();
|
||||
const ref: { scheduler: ReturnType<typeof createReflexScheduler> | null } = { scheduler: null };
|
||||
@@ -90,6 +92,7 @@ describe("LogStore + ReflexScheduler integration", () => {
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
};
|
||||
const bus = createSignalBus();
|
||||
const triggered: string[] = [];
|
||||
|
||||
@@ -27,6 +27,7 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
@@ -152,6 +153,7 @@ describe("phase6 — reloadConfig", () => {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
};
|
||||
|
||||
kernel.reloadConfig(newConfig);
|
||||
@@ -172,6 +174,7 @@ describe("phase6 — reloadConfig", () => {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
};
|
||||
kernel = createKernel(config, nerveRoot, {
|
||||
workerScript: MOCK_WORKER,
|
||||
@@ -187,6 +190,7 @@ describe("phase6 — reloadConfig", () => {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
};
|
||||
|
||||
kernel.reloadConfig(newConfig);
|
||||
@@ -224,6 +228,7 @@ describe("phase6 — error isolation", () => {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
};
|
||||
|
||||
kernel = createKernel(config, nerveRoot, {
|
||||
@@ -334,6 +339,7 @@ describe("phase6 — getHealth", () => {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
};
|
||||
kernel.reloadConfig(newConfig);
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
@@ -299,6 +300,7 @@ describe("ReflexScheduler — workflow reflexes ignored", () => {
|
||||
workflows: {
|
||||
"my-workflow": { concurrency: 1, overflow: "drop" },
|
||||
},
|
||||
api: { port: null },
|
||||
};
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
@@ -89,6 +89,7 @@ function makeConfig(overrides: Partial<NerveConfig["workflows"]> = {}): NerveCon
|
||||
senses: {},
|
||||
reflexes: [],
|
||||
workflows: overrides as NerveConfig["workflows"],
|
||||
api: { port: null },
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,79 @@
|
||||
import type { HealthInfo, SenseInfo, WorkflowStatus } from "@uncaged/nerve-core";
|
||||
|
||||
import type { WorkflowManager } from "./workflow-manager.js";
|
||||
|
||||
export type DaemonHandlerBundle = {
|
||||
health: () => HealthInfo;
|
||||
listSenses: () => SenseInfo[];
|
||||
listWorkflows: () => WorkflowStatus[];
|
||||
getDefaultMaxRounds: () => number;
|
||||
triggerSense: (senseName: string) => { ok: true } | { ok: false; error: string };
|
||||
triggerWorkflow: (
|
||||
workflowName: string,
|
||||
launch: { prompt: string; maxRounds: number; dryRun: boolean },
|
||||
) => { ok: true } | { ok: false; error: string };
|
||||
killWorkflowByRunId: (runId: string) => { ok: true } | { ok: false; error: string };
|
||||
};
|
||||
|
||||
export type CreateDaemonHandlersInput = {
|
||||
workflowManager: WorkflowManager;
|
||||
triggerSense: (senseName: string) => void;
|
||||
listSenses: () => SenseInfo[];
|
||||
getHealthInfo: () => HealthInfo;
|
||||
getDefaultMaxRounds: () => number;
|
||||
};
|
||||
|
||||
export function createDaemonHandlers(input: CreateDaemonHandlersInput): DaemonHandlerBundle {
|
||||
const {
|
||||
workflowManager,
|
||||
triggerSense: triggerSenseFn,
|
||||
listSenses: listSensesFn,
|
||||
getHealthInfo,
|
||||
getDefaultMaxRounds,
|
||||
} = input;
|
||||
|
||||
function triggerSense(senseName: string): { ok: true } | { ok: false; error: string } {
|
||||
try {
|
||||
triggerSenseFn(senseName);
|
||||
return { ok: true };
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
return { ok: false, error: msg };
|
||||
}
|
||||
}
|
||||
|
||||
function triggerWorkflow(
|
||||
workflowName: string,
|
||||
launch: { prompt: string; maxRounds: number; dryRun: boolean },
|
||||
): { ok: true } | { ok: false; error: string } {
|
||||
try {
|
||||
workflowManager.startWorkflow(workflowName, launch);
|
||||
return { ok: true };
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
return { ok: false, error: msg };
|
||||
}
|
||||
}
|
||||
|
||||
function killWorkflowByRunId(runId: string): { ok: true } | { ok: false; error: string } {
|
||||
try {
|
||||
const found = workflowManager.killThread(runId);
|
||||
return found
|
||||
? { ok: true }
|
||||
: { ok: false, error: `Run not found or already finished: ${runId}` };
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
return { ok: false, error: msg };
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
health: () => getHealthInfo(),
|
||||
listSenses: () => listSensesFn(),
|
||||
listWorkflows: () => workflowManager.listWorkflows(),
|
||||
getDefaultMaxRounds: () => getDefaultMaxRounds(),
|
||||
triggerSense,
|
||||
triggerWorkflow,
|
||||
killWorkflowByRunId,
|
||||
};
|
||||
}
|
||||
@@ -9,28 +9,18 @@
|
||||
import { rmSync } from "node:fs";
|
||||
import { type Server, type Socket, createServer } from "node:net";
|
||||
|
||||
import type { DaemonIpcResponse, SenseInfo } from "@uncaged/nerve-core";
|
||||
import type { DaemonIpcResponse } from "@uncaged/nerve-core";
|
||||
import { parseDaemonIpcRequest } from "@uncaged/nerve-core";
|
||||
|
||||
import type { WorkflowManager } from "./workflow-manager.js";
|
||||
|
||||
export type { SenseInfo };
|
||||
import type { DaemonHandlerBundle } from "./daemon-handlers.js";
|
||||
|
||||
export type DaemonIpcServer = {
|
||||
close: () => Promise<void>;
|
||||
};
|
||||
|
||||
export type DaemonIpcServerOptions = {
|
||||
/** Called when a trigger-sense request arrives. Should throw if the sense is unknown. */
|
||||
triggerSense: (senseName: string) => void;
|
||||
/** Called when a list-senses request arrives. Returns sense info for all registered senses. */
|
||||
listSenses: () => SenseInfo[];
|
||||
};
|
||||
|
||||
export function createDaemonIpcServer(
|
||||
socketPath: string,
|
||||
workflowManager: WorkflowManager,
|
||||
opts: DaemonIpcServerOptions,
|
||||
handlers: DaemonHandlerBundle,
|
||||
): DaemonIpcServer {
|
||||
// Remove stale socket file if it exists
|
||||
try {
|
||||
@@ -52,26 +42,32 @@ export function createDaemonIpcServer(
|
||||
|
||||
try {
|
||||
if (req.type === "trigger-workflow") {
|
||||
workflowManager.startWorkflow(req.workflow, {
|
||||
const r = handlers.triggerWorkflow(req.workflow, {
|
||||
prompt: req.prompt,
|
||||
maxRounds: req.maxRounds,
|
||||
dryRun: req.dryRun,
|
||||
});
|
||||
const resp: DaemonIpcResponse = { ok: true };
|
||||
const resp: DaemonIpcResponse = r.ok ? { ok: true } : { ok: false, error: r.error };
|
||||
socket.write(`${JSON.stringify(resp)}\n`);
|
||||
} else if (req.type === "trigger-sense") {
|
||||
opts.triggerSense(req.sense);
|
||||
const resp: DaemonIpcResponse = { ok: true };
|
||||
const r = handlers.triggerSense(req.sense);
|
||||
const resp: DaemonIpcResponse = r.ok ? { ok: true } : { ok: false, error: r.error };
|
||||
socket.write(`${JSON.stringify(resp)}\n`);
|
||||
} else if (req.type === "list-senses") {
|
||||
const senses = opts.listSenses();
|
||||
const senses = handlers.listSenses();
|
||||
const resp: DaemonIpcResponse = { ok: true, senses };
|
||||
socket.write(`${JSON.stringify(resp)}\n`);
|
||||
} else if (req.type === "kill-workflow") {
|
||||
const found = workflowManager.killThread(req.runId);
|
||||
const resp: DaemonIpcResponse = found
|
||||
? { ok: true }
|
||||
: { ok: false, error: `Run not found or already finished: ${req.runId}` };
|
||||
const r = handlers.killWorkflowByRunId(req.runId);
|
||||
const resp: DaemonIpcResponse = r.ok ? { ok: true } : { ok: false, error: r.error };
|
||||
socket.write(`${JSON.stringify(resp)}\n`);
|
||||
} else if (req.type === "list-workflows") {
|
||||
const workflows = handlers.listWorkflows();
|
||||
const resp: DaemonIpcResponse = { ok: true, workflows };
|
||||
socket.write(`${JSON.stringify(resp)}\n`);
|
||||
} else if (req.type === "health") {
|
||||
const health = handlers.health();
|
||||
const resp: DaemonIpcResponse = { ok: true, health };
|
||||
socket.write(`${JSON.stringify(resp)}\n`);
|
||||
} else {
|
||||
const _exhaustive: never = req;
|
||||
|
||||
@@ -0,0 +1,194 @@
|
||||
/**
|
||||
* Optional JSON HTTP control plane (Phase 1 — no auth).
|
||||
* Uses only `node:http`; shares handler logic with Unix IPC via {@link createDaemonHandlers}.
|
||||
*/
|
||||
|
||||
import { type IncomingMessage, type Server, type ServerResponse, createServer } from "node:http";
|
||||
|
||||
import { isPlainRecord } from "@uncaged/nerve-core";
|
||||
|
||||
import type { DaemonHandlerBundle } from "./daemon-handlers.js";
|
||||
|
||||
/** Phase 1 HTTP API has no auth — bind loopback only unless explicitly configured later. */
|
||||
const HTTP_API_BIND_HOST = "127.0.0.1";
|
||||
|
||||
const CORS_HEADERS: Record<string, string> = {
|
||||
"Access-Control-Allow-Origin": "*",
|
||||
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
|
||||
"Access-Control-Allow-Headers": "Content-Type",
|
||||
};
|
||||
|
||||
export type HttpApiServer = {
|
||||
close: () => Promise<void>;
|
||||
};
|
||||
|
||||
function setJsonHeaders(res: ServerResponse, status: number): void {
|
||||
res.statusCode = status;
|
||||
res.setHeader("Content-Type", "application/json; charset=utf-8");
|
||||
for (const [k, v] of Object.entries(CORS_HEADERS)) {
|
||||
res.setHeader(k, v);
|
||||
}
|
||||
}
|
||||
|
||||
function sendJson(res: ServerResponse, status: number, body: unknown): void {
|
||||
setJsonHeaders(res, status);
|
||||
res.end(`${JSON.stringify(body)}\n`);
|
||||
}
|
||||
|
||||
function readRequestBody(req: IncomingMessage): Promise<string> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const chunks: Buffer[] = [];
|
||||
req.on("data", (c: Buffer) => {
|
||||
chunks.push(c);
|
||||
});
|
||||
req.on("end", () => {
|
||||
resolve(Buffer.concat(chunks).toString("utf8"));
|
||||
});
|
||||
req.on("error", reject);
|
||||
});
|
||||
}
|
||||
|
||||
function parseJsonBody(raw: string): unknown | null {
|
||||
if (raw.trim().length === 0) return null;
|
||||
try {
|
||||
return JSON.parse(raw) as unknown;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
export function createHttpApiServer(port: number, handlers: DaemonHandlerBundle): HttpApiServer {
|
||||
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: HTTP router dispatches multiple routes in one handler
|
||||
const server: Server = createServer(async (req, res) => {
|
||||
if (req.method === "OPTIONS") {
|
||||
setJsonHeaders(res, 204);
|
||||
res.end();
|
||||
return;
|
||||
}
|
||||
|
||||
const url = req.url ?? "";
|
||||
const path = url.split("?")[0] ?? "";
|
||||
|
||||
try {
|
||||
if (req.method === "GET" && path === "/api/health") {
|
||||
sendJson(res, 200, handlers.health());
|
||||
return;
|
||||
}
|
||||
|
||||
if (req.method === "GET" && path === "/api/senses") {
|
||||
sendJson(res, 200, { senses: handlers.listSenses() });
|
||||
return;
|
||||
}
|
||||
|
||||
if (req.method === "GET" && path === "/api/workflows") {
|
||||
sendJson(res, 200, { workflows: handlers.listWorkflows() });
|
||||
return;
|
||||
}
|
||||
|
||||
if (req.method === "POST" && path === "/api/trigger-sense") {
|
||||
const raw = await readRequestBody(req);
|
||||
const body = parseJsonBody(raw);
|
||||
if (!isPlainRecord(body) || typeof body.name !== "string" || body.name.length === 0) {
|
||||
sendJson(res, 400, { ok: false, error: 'Expected JSON body: { "name": string }' });
|
||||
return;
|
||||
}
|
||||
const result = handlers.triggerSense(body.name);
|
||||
sendJson(res, result.ok ? 200 : 400, result);
|
||||
return;
|
||||
}
|
||||
|
||||
if (req.method === "POST" && path === "/api/trigger-workflow") {
|
||||
const raw = await readRequestBody(req);
|
||||
const body = parseJsonBody(raw);
|
||||
if (!isPlainRecord(body) || typeof body.name !== "string" || body.name.length === 0) {
|
||||
sendJson(res, 400, {
|
||||
ok: false,
|
||||
error:
|
||||
'Expected JSON body: { "name": string, "prompt"?: string, "maxRounds"?: number, "dryRun"?: boolean }',
|
||||
});
|
||||
return;
|
||||
}
|
||||
let prompt = "";
|
||||
if (body.prompt !== undefined && body.prompt !== null) {
|
||||
if (typeof body.prompt !== "string") {
|
||||
sendJson(res, 400, { ok: false, error: '"prompt" must be a string when provided' });
|
||||
return;
|
||||
}
|
||||
prompt = body.prompt;
|
||||
}
|
||||
let maxRounds = handlers.getDefaultMaxRounds();
|
||||
if (body.maxRounds !== undefined && body.maxRounds !== null) {
|
||||
if (typeof body.maxRounds !== "number") {
|
||||
sendJson(res, 400, { ok: false, error: '"maxRounds" must be a number when provided' });
|
||||
return;
|
||||
}
|
||||
maxRounds = body.maxRounds;
|
||||
}
|
||||
let dryRun = false;
|
||||
if (body.dryRun !== undefined && body.dryRun !== null) {
|
||||
if (typeof body.dryRun !== "boolean") {
|
||||
sendJson(res, 400, { ok: false, error: '"dryRun" must be a boolean when provided' });
|
||||
return;
|
||||
}
|
||||
dryRun = body.dryRun;
|
||||
}
|
||||
const result = handlers.triggerWorkflow(body.name, { prompt, maxRounds, dryRun });
|
||||
sendJson(res, result.ok ? 200 : 400, result);
|
||||
return;
|
||||
}
|
||||
|
||||
if (req.method === "POST" && path === "/api/kill-workflow") {
|
||||
const raw = await readRequestBody(req);
|
||||
const body = parseJsonBody(raw);
|
||||
if (
|
||||
!isPlainRecord(body) ||
|
||||
typeof body.threadId !== "string" ||
|
||||
body.threadId.length === 0
|
||||
) {
|
||||
sendJson(res, 400, {
|
||||
ok: false,
|
||||
error: 'Expected JSON body: { "threadId": string, "name"?: string }',
|
||||
});
|
||||
return;
|
||||
}
|
||||
if (body.name !== undefined && body.name !== null && typeof body.name !== "string") {
|
||||
sendJson(res, 400, { ok: false, error: '"name" must be a string when provided' });
|
||||
return;
|
||||
}
|
||||
const nameForLog = typeof body.name === "string" && body.name.length > 0 ? body.name : null;
|
||||
if (nameForLog !== null) {
|
||||
process.stderr.write(
|
||||
`[http-api] kill-workflow threadId=${body.threadId} workflowName=${nameForLog}\n`,
|
||||
);
|
||||
}
|
||||
const result = handlers.killWorkflowByRunId(body.threadId);
|
||||
sendJson(res, result.ok ? 200 : 400, result);
|
||||
return;
|
||||
}
|
||||
|
||||
sendJson(res, 404, { ok: false, error: "Not found" });
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
sendJson(res, 500, { ok: false, error: msg });
|
||||
}
|
||||
});
|
||||
|
||||
server.listen(port, HTTP_API_BIND_HOST, () => {
|
||||
process.stderr.write(`[http-api] listening on http://${HTTP_API_BIND_HOST}:${String(port)}\n`);
|
||||
});
|
||||
|
||||
server.on("error", (err) => {
|
||||
process.stderr.write(`[http-api] server error: ${err.message}\n`);
|
||||
});
|
||||
|
||||
async function close(): Promise<void> {
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
server.close((e) => {
|
||||
if (e !== undefined && e !== null) reject(e);
|
||||
else resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
return { close };
|
||||
}
|
||||
@@ -3,17 +3,23 @@
|
||||
* optional file watcher, and daemon IPC.
|
||||
*/
|
||||
|
||||
import { join } from "node:path";
|
||||
import { readFileSync } from "node:fs";
|
||||
import { hostname } from "node:os";
|
||||
import { dirname, join } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
|
||||
import type { NerveConfig, SenseInfo, Signal } from "@uncaged/nerve-core";
|
||||
import type { HealthInfo, NerveConfig, SenseInfo, Signal } from "@uncaged/nerve-core";
|
||||
import { routeSenseComputeOutput } from "@uncaged/nerve-core";
|
||||
|
||||
import { createLogStore } from "@uncaged/nerve-store";
|
||||
import type { LogStore } from "@uncaged/nerve-store";
|
||||
import { createDaemonHandlers } from "./daemon-handlers.js";
|
||||
import { createDaemonIpcServer } from "./daemon-ipc.js";
|
||||
import type { DaemonIpcServer } from "./daemon-ipc.js";
|
||||
import { createFileWatcher } from "./file-watcher.js";
|
||||
import type { FileWatcher } from "./file-watcher.js";
|
||||
import { createHttpApiServer } from "./http-api.js";
|
||||
import type { HttpApiServer } from "./http-api.js";
|
||||
import { parseWorkerMessage } from "./ipc.js";
|
||||
import { createKernelFileWatchHandlers } from "./kernel-file-watch.js";
|
||||
import {
|
||||
@@ -53,6 +59,8 @@ export type Kernel = {
|
||||
restartGroup: (group: string) => Promise<void>;
|
||||
reloadConfig: (newConfig: NerveConfig) => void;
|
||||
getHealth: () => KernelHealth;
|
||||
/** HTTP/IPC-oriented health (version, uptime seconds, hostname). */
|
||||
getDaemonHealth: () => HealthInfo;
|
||||
};
|
||||
|
||||
export type KernelOptions = {
|
||||
@@ -60,12 +68,37 @@ export type KernelOptions = {
|
||||
enableFileWatcher?: boolean;
|
||||
logStore?: LogStore;
|
||||
ipcSocketPath?: string | null;
|
||||
/**
|
||||
* When set to a positive integer, overrides `nerve.yaml` `api.port` for the HTTP API.
|
||||
* When `null` or omitted, only `config.api.port` from YAML is used.
|
||||
*/
|
||||
httpApiPortOverride?: number | null;
|
||||
};
|
||||
|
||||
function defaultKernelOptions(): KernelOptions {
|
||||
return { workerScript: resolveWorkerScript(), enableFileWatcher: false };
|
||||
}
|
||||
|
||||
function readDaemonPackageVersion(): string {
|
||||
try {
|
||||
const here = fileURLToPath(import.meta.url);
|
||||
const pkgPath = join(dirname(here), "..", "package.json");
|
||||
const raw = readFileSync(pkgPath, "utf8");
|
||||
const o: unknown = JSON.parse(raw);
|
||||
if (
|
||||
typeof o === "object" &&
|
||||
o !== null &&
|
||||
"version" in o &&
|
||||
typeof (o as { version: unknown }).version === "string"
|
||||
) {
|
||||
return (o as { version: string }).version;
|
||||
}
|
||||
return "0.0.0";
|
||||
} catch {
|
||||
return "0.0.0";
|
||||
}
|
||||
}
|
||||
|
||||
export function createKernel(
|
||||
initialConfig: NerveConfig,
|
||||
nerveRoot: string,
|
||||
@@ -74,6 +107,8 @@ export function createKernel(
|
||||
const bus: SignalBus = createSignalBus();
|
||||
const workerScript = options.workerScript ?? resolveWorkerScript();
|
||||
const startTime = Date.now();
|
||||
const startedAtIso = new Date(startTime).toISOString();
|
||||
const daemonVersion = readDaemonPackageVersion();
|
||||
const logStore: LogStore = options.logStore ?? createLogStore(join(nerveRoot, "data", "logs.db"));
|
||||
|
||||
logStore.append({
|
||||
@@ -292,6 +327,16 @@ export function createKernel(
|
||||
};
|
||||
}
|
||||
|
||||
function getDaemonHealth(): HealthInfo {
|
||||
return {
|
||||
ok: true,
|
||||
version: daemonVersion,
|
||||
uptime: Math.floor((Date.now() - startTime) / 1000),
|
||||
startedAt: startedAtIso,
|
||||
hostname: hostname(),
|
||||
};
|
||||
}
|
||||
|
||||
const fileWatchHandlers = createKernelFileWatchHandlers({
|
||||
nerveRoot,
|
||||
getConfig: () => config,
|
||||
@@ -310,28 +355,43 @@ export function createKernel(
|
||||
});
|
||||
}
|
||||
|
||||
const daemonHandlers = createDaemonHandlers({
|
||||
workflowManager,
|
||||
triggerSense,
|
||||
listSenses(): SenseInfo[] {
|
||||
return Object.entries(config.senses).map(([name, senseConfig]) => {
|
||||
const entries = logStore.query({
|
||||
source: "sense",
|
||||
type: "signal",
|
||||
refId: name,
|
||||
});
|
||||
const lastEntry = entries.length > 0 ? entries[entries.length - 1] : null;
|
||||
return {
|
||||
name,
|
||||
group: senseConfig.group,
|
||||
throttle: senseConfig.throttle,
|
||||
timeout: senseConfig.timeout,
|
||||
lastSignalTimestamp: lastEntry !== null ? lastEntry.timestamp : null,
|
||||
};
|
||||
});
|
||||
},
|
||||
getHealthInfo: getDaemonHealth,
|
||||
getDefaultMaxRounds: () => config.maxRounds,
|
||||
});
|
||||
|
||||
let ipcServer: DaemonIpcServer | null = null;
|
||||
if (options.ipcSocketPath != null) {
|
||||
ipcServer = createDaemonIpcServer(options.ipcSocketPath, workflowManager, {
|
||||
triggerSense,
|
||||
listSenses(): SenseInfo[] {
|
||||
return Object.entries(config.senses).map(([name, senseConfig]) => {
|
||||
const entries = logStore.query({
|
||||
source: "sense",
|
||||
type: "signal",
|
||||
refId: name,
|
||||
});
|
||||
const lastEntry = entries.length > 0 ? entries[entries.length - 1] : null;
|
||||
return {
|
||||
name,
|
||||
group: senseConfig.group,
|
||||
throttle: senseConfig.throttle,
|
||||
timeout: senseConfig.timeout,
|
||||
lastSignalTimestamp: lastEntry !== null ? lastEntry.timestamp : null,
|
||||
};
|
||||
});
|
||||
},
|
||||
});
|
||||
ipcServer = createDaemonIpcServer(options.ipcSocketPath, daemonHandlers);
|
||||
}
|
||||
|
||||
let httpApiServer: HttpApiServer | null = null;
|
||||
const httpPortOverride = options.httpApiPortOverride;
|
||||
const effectiveHttpPort =
|
||||
httpPortOverride !== undefined && httpPortOverride !== null && httpPortOverride > 0
|
||||
? httpPortOverride
|
||||
: initialConfig.api.port;
|
||||
if (effectiveHttpPort !== null && effectiveHttpPort > 0) {
|
||||
httpApiServer = createHttpApiServer(effectiveHttpPort, daemonHandlers);
|
||||
}
|
||||
|
||||
async function stop(): Promise<void> {
|
||||
@@ -344,6 +404,10 @@ export function createKernel(
|
||||
await ipcServer.close();
|
||||
ipcServer = null;
|
||||
}
|
||||
if (httpApiServer !== null) {
|
||||
await httpApiServer.close();
|
||||
httpApiServer = null;
|
||||
}
|
||||
scheduler.stop();
|
||||
await workflowManager.stop();
|
||||
await senseWorkerPool.shutdownAll();
|
||||
@@ -377,5 +441,6 @@ export function createKernel(
|
||||
restartGroup: (group) => senseWorkerPool.restartGroup(group),
|
||||
reloadConfig,
|
||||
getHealth,
|
||||
getDaemonHealth,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -11,7 +11,12 @@ import type { ChildProcess } from "node:child_process";
|
||||
import { dirname, join } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
|
||||
import type { NerveConfig, WorkflowConfig, WorkflowMessage } from "@uncaged/nerve-core";
|
||||
import type {
|
||||
NerveConfig,
|
||||
WorkflowConfig,
|
||||
WorkflowMessage,
|
||||
WorkflowStatus,
|
||||
} from "@uncaged/nerve-core";
|
||||
import { START, isPlainRecord } from "@uncaged/nerve-core";
|
||||
|
||||
import type { LogStore, WorkflowRunStatus } from "@uncaged/nerve-store";
|
||||
@@ -49,6 +54,8 @@ export type WorkflowManager = {
|
||||
queueLength: (workflowName: string) => number;
|
||||
/** Total active workflow threads across all workflows. */
|
||||
totalActiveCount: () => number;
|
||||
/** One row per workflow key in config (sorted by name), using current concurrency / overflow settings. */
|
||||
listWorkflows: () => WorkflowStatus[];
|
||||
/** Update the config reference (e.g. after hot reload). Active workers are unaffected. */
|
||||
updateConfig: (newConfig: NerveConfig) => void;
|
||||
/**
|
||||
@@ -676,6 +683,22 @@ export function createWorkflowManager(
|
||||
return total;
|
||||
}
|
||||
|
||||
function listWorkflows(): WorkflowStatus[] {
|
||||
const names = Object.keys(config.workflows).sort();
|
||||
const out: WorkflowStatus[] = [];
|
||||
for (const name of names) {
|
||||
const wf = config.workflows[name];
|
||||
if (wf === undefined) continue;
|
||||
out.push({
|
||||
name,
|
||||
activeThreads: activeCount(name),
|
||||
queuedThreads: queueLength(name),
|
||||
config: { concurrency: wf.concurrency, overflow: wf.overflow },
|
||||
});
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
function updateConfig(newConfig: NerveConfig): void {
|
||||
config = newConfig;
|
||||
}
|
||||
@@ -756,6 +779,7 @@ export function createWorkflowManager(
|
||||
activeCount,
|
||||
queueLength,
|
||||
totalActiveCount,
|
||||
listWorkflows,
|
||||
updateConfig,
|
||||
drainAndRespawn,
|
||||
drainWhenIdle,
|
||||
|
||||
Reference in New Issue
Block a user