fix(http-api): bind 127.0.0.1, support trigger body params, fix kill-workflow fields #136

Merged
xiaomo merged 3 commits from feat/133-http-api into main 2026-04-25 06:11:57 +00:00
34 changed files with 946 additions and 90 deletions
+2 -1
View File
@@ -192,6 +192,7 @@ describe("buildListOutput", () => {
// header + 2 run lines
expect(lines).toHaveLength(3);
expect(paginationHint).not.toBeNull();
expect(paginationHint).toContain("nerve workflow runs");
expect(paginationHint).toContain("--offset 2");
expect(paginationHint).toContain("3 more");
});
@@ -298,7 +299,7 @@ describe("buildInspectOutput", () => {
// Integration: getAllWorkflowRuns + buildListOutput end-to-end with real store
// ---------------------------------------------------------------------------
describe("workflow list — integration with real store", () => {
describe("workflow runs list — integration with real store", () => {
it("lists active runs from the store", () => {
upsertRun("r1", "cleanup", "started", 1000);
upsertRun("r2", "cleanup", "queued", 2000);
+22 -3
View File
@@ -1,6 +1,9 @@
import { defineCommand } from "citty";
import { runForegroundKernelSession } from "../run-foreground-kernel.js";
import {
type ForegroundSessionOptions,
runForegroundKernelSession,
} from "../run-foreground-kernel.js";
import { loadDaemonModule } from "../workspace-daemon.js";
import { getNerveRoot } from "../workspace.js";
@@ -9,9 +12,25 @@ export const devCommand = defineCommand({
name: "dev",
description: "Run the nerve kernel in the foreground (development mode)",
},
async run() {
args: {
port: {
type: "string",
description: "HTTP API port (overrides nerve.yaml api.port). Omit to use YAML / env only.",
default: "",
},
},
async run({ args }) {
const nerveRoot = getNerveRoot();
const { createKernel } = await loadDaemonModule(nerveRoot);
await runForegroundKernelSession(nerveRoot, createKernel);
let sessionOpts: ForegroundSessionOptions = {};
if (args.port.length > 0) {
const n = Number.parseInt(args.port, 10);
if (Number.isNaN(n) || n < 1 || n > 65_535) {
process.stderr.write(`❌ Invalid --port: ${args.port}\n`);
process.exit(1);
}
sessionOpts = { httpApiPortOverride: n };
}
await runForegroundKernelSession(nerveRoot, createKernel, sessionOpts);
},
});
+39 -7
View File
@@ -1,9 +1,10 @@
import { spawn } from "node:child_process";
import { createWriteStream, existsSync } from "node:fs";
import { createWriteStream, existsSync, readFileSync } from "node:fs";
import { mkdir } from "node:fs/promises";
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import { parseNerveConfig } from "@uncaged/nerve-core";
import { defineCommand } from "citty";
import {
@@ -56,7 +57,7 @@ function daemonBootstrapScript(): string {
);
}
async function runDaemon(nerveRoot: string): Promise<void> {
async function runDaemon(nerveRoot: string, cliHttpPort: number | null): Promise<void> {
if (isRunning()) {
const pid = readPidFile();
process.stderr.write(`⚠️ Nerve daemon is already running (pid ${pid}).\n`);
@@ -74,12 +75,27 @@ async function runDaemon(nerveRoot: string): Promise<void> {
const bootstrapPath = daemonBootstrapScript();
const configPath = join(nerveRoot, "nerve.yaml");
let yamlApiPort: number | null = null;
try {
const raw = readFileSync(configPath, "utf8");
const parsed = parseNerveConfig(raw);
if (parsed.ok) yamlApiPort = parsed.value.api.port;
} catch {
// kernel bootstrap will surface a clearer error if config is missing
}
const resolvedHttpPort = cliHttpPort ?? yamlApiPort;
const env: NodeJS.ProcessEnv = { ...process.env, NERVE_ROOT: nerveRoot };
if (resolvedHttpPort !== null && resolvedHttpPort > 0) {
env.NERVE_API_PORT = String(resolvedHttpPort);
}
// After `open`, file-backed WriteStream has a numeric OS fd for spawn stdio; `@types/node` omits `fd` on this WriteStream alias.
const logFd = (logStream as unknown as { fd: number }).fd;
const child = spawn(process.execPath, [bootstrapPath], {
detached: true,
stdio: ["ignore", logFd, logFd],
env: { ...process.env, NERVE_ROOT: nerveRoot },
env,
cwd: nerveRoot,
});
@@ -109,8 +125,8 @@ async function runDaemon(nerveRoot: string): Promise<void> {
}
/** Background daemon only — use `nerve dev` for foreground mode. */
export async function runDaemonStartCommand(): Promise<void> {
await runDaemon(getNerveRoot());
export async function runDaemonStartCommand(cliHttpPort: number | null = null): Promise<void> {
await runDaemon(getNerveRoot(), cliHttpPort);
}
export const daemonStartCommand = defineCommand({
@@ -118,7 +134,23 @@ export const daemonStartCommand = defineCommand({
name: "start",
description: "Start the nerve daemon in the background",
},
async run() {
await runDaemonStartCommand();
args: {
port: {
type: "string",
description: "HTTP API port (overrides nerve.yaml api.port). Omit to use YAML / env only.",
default: "",
},
},
async run({ args }) {
let cliHttpPort: number | null = null;
if (args.port.length > 0) {
const n = Number.parseInt(args.port, 10);
if (Number.isNaN(n) || n < 1 || n > 65_535) {
process.stderr.write(`❌ Invalid --port: ${args.port}\n`);
process.exit(1);
}
cliHttpPort = n;
}
await runDaemonStartCommand(cliHttpPort);
},
});
+61 -7
View File
@@ -7,7 +7,12 @@ import { defineCommand } from "citty";
import { stringify } from "yaml";
import type { LogStore, ThreadRoundRow, WorkflowRun } from "@uncaged/nerve-store";
import { killWorkflowViaDaemon, triggerWorkflowViaDaemon } from "../daemon-client.js";
import {
killWorkflowViaDaemon,
listWorkflowsViaDaemon,
triggerWorkflowViaDaemon,
} from "../daemon-client.js";
import { formatRowsAsAlignedTable } from "../sense-sqlite.js";
import { loadDaemonModule } from "../workspace-daemon.js";
import { getNerveRoot, getSocketPath, isRunning } from "../workspace.js";
@@ -125,7 +130,7 @@ export function buildListOutput(
const allFlagStr = allFlag ? " --all" : "";
paginationHint =
`\n⏩ ${remaining} more run(s) not shown. Fetch next page:\n` +
` nerve workflow list --offset ${offset + limit}${allFlagStr}${wfFlag}\n`;
` nerve workflow runs --offset ${offset + limit}${allFlagStr}${wfFlag}\n`;
}
return { lines, paginationHint };
@@ -298,13 +303,61 @@ export function buildThreadCommandOutput(
}
// ---------------------------------------------------------------------------
// nerve workflow list
// nerve workflow list (daemon — registered workflows + queue depth)
// ---------------------------------------------------------------------------
const workflowListCommand = defineCommand({
const workflowDaemonListCommand = defineCommand({
meta: {
name: "list",
description: "List active (queued/started) workflow runs",
description: "List workflows from the running daemon (concurrency, active, queued)",
},
async run() {
if (!isRunning()) {
process.stderr.write("❌ Nerve daemon is not running. Start it with `nerve start`.\n");
process.exit(1);
}
const socketPath = getSocketPath();
let response: Awaited<ReturnType<typeof listWorkflowsViaDaemon>>;
try {
response = await listWorkflowsViaDaemon(socketPath);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`❌ Failed to contact daemon: ${msg}\n`);
process.exit(1);
}
if (!response.ok) {
process.stderr.write(`❌ Daemon error: ${response.error}\n`);
process.exit(1);
}
const rows = response.workflows.map((w) => ({
name: w.name,
active: w.activeThreads,
queued: w.queuedThreads,
concurrency: w.config.concurrency,
overflow: w.config.overflow,
}));
if (rows.length === 0) {
process.stdout.write("📭 No workflows in nerve.yaml (or empty registry).\n");
return;
}
process.stdout.write(`📋 Workflows (${String(rows.length)}):\n\n`);
process.stdout.write(formatRowsAsAlignedTable(rows));
},
});
// ---------------------------------------------------------------------------
// nerve workflow runs
// ---------------------------------------------------------------------------
const workflowRunsCommand = defineCommand({
meta: {
name: "runs",
description: "List active (queued/started) workflow runs from logs",
},
args: {
all: {
@@ -562,7 +615,7 @@ const workflowTriggerCommand = defineCommand({
}
process.stdout.write(`✅ Triggered workflow "${args.name}" via daemon.\n`);
process.stdout.write("\n💡 Inspect active runs with: nerve workflow list\n");
process.stdout.write("\n💡 Inspect active runs with: nerve workflow runs\n");
},
});
@@ -616,7 +669,8 @@ export const workflowCommand = defineCommand({
description: "Manage and inspect workflow runs",
},
subCommands: {
list: workflowListCommand,
list: workflowDaemonListCommand,
runs: workflowRunsCommand,
inspect: workflowInspectCommand,
thread: workflowThreadCommand,
trigger: workflowTriggerCommand,
+140 -1
View File
@@ -10,11 +10,16 @@ import type { Socket } from "node:net";
import type {
DaemonIpcListSensesResponse,
DaemonIpcListWorkflowsResponse,
DaemonIpcRequest,
DaemonIpcTriggerResponse,
DaemonTransport,
DaemonTransportTriggerResult,
HealthInfo,
SenseInfo,
WorkflowStatus,
} from "@uncaged/nerve-core";
import { isPlainRecord } from "@uncaged/nerve-core";
import { DEFAULT_ENGINE_MAX_ROUNDS, isPlainRecord } from "@uncaged/nerve-core";
const CONNECT_TIMEOUT_MS = 3_000;
const RESPONSE_TIMEOUT_MS = 5_000;
@@ -32,6 +37,19 @@ function isSenseInfo(value: unknown): value is SenseInfo {
);
}
function isWorkflowStatus(value: unknown): value is WorkflowStatus {
if (!isPlainRecord(value)) return false;
const cfg = value.config;
if (!isPlainRecord(cfg)) return false;
return (
typeof value.name === "string" &&
typeof value.activeThreads === "number" &&
typeof value.queuedThreads === "number" &&
typeof cfg.concurrency === "number" &&
typeof cfg.overflow === "string"
);
}
function parseDaemonResponse(line: string): DaemonIpcTriggerResponse {
try {
const obj: unknown = JSON.parse(line);
@@ -62,6 +80,51 @@ function parseListSensesResponse(line: string): DaemonIpcListSensesResponse {
return { ok: false, error: `Unexpected daemon response: ${line}` };
}
function parseListWorkflowsResponse(line: string): DaemonIpcListWorkflowsResponse {
try {
const obj: unknown = JSON.parse(line);
if (isPlainRecord(obj)) {
const r = obj;
if (r.ok === false && typeof r.error === "string") return { ok: false, error: r.error };
if (r.ok === true && Array.isArray(r.workflows) && r.workflows.every(isWorkflowStatus)) {
return { ok: true, workflows: r.workflows };
}
}
} catch {
// fall through
}
return { ok: false, error: `Unexpected daemon response: ${line}` };
}
function parseHealthResponse(line: string): HealthInfo | null {
try {
const obj: unknown = JSON.parse(line);
if (!isPlainRecord(obj)) return null;
const r = obj;
if (r.ok === true && isPlainRecord(r.health)) {
const h = r.health;
if (
typeof h.ok === "boolean" &&
typeof h.version === "string" &&
typeof h.uptime === "number" &&
typeof h.startedAt === "string" &&
typeof h.hostname === "string"
) {
return {
ok: h.ok,
version: h.version,
uptime: h.uptime,
startedAt: h.startedAt,
hostname: h.hostname,
};
}
}
} catch {
// fall through
}
return null;
}
/**
* Connect to the daemon socket, send one JSON request (newline-terminated),
* and resolve with the first non-empty line parsed by `parseFirstLine`.
@@ -126,6 +189,72 @@ function sendAndReceive<T>(
});
}
/** Unix-socket implementation of {@link DaemonTransport} (local daemon). */
export class UnixTransport implements DaemonTransport {
readonly socketPath: string;
constructor(socketPath: string) {
this.socketPath = socketPath;
}
async health(): Promise<HealthInfo> {
const parsed = await sendAndReceive(this.socketPath, { type: "health" }, (line) =>
parseHealthResponse(line),
);
if (parsed === null) {
throw new Error("Unexpected daemon response for health");
}
return parsed;
}
async listSenses(): Promise<SenseInfo[]> {
const r = await sendAndReceive(
this.socketPath,
{ type: "list-senses" },
parseListSensesResponse,
);
if (!r.ok) {
throw new Error(r.error);
}
return r.senses;
}
async listWorkflows(): Promise<WorkflowStatus[]> {
const r = await sendAndReceive(
this.socketPath,
{ type: "list-workflows" },
parseListWorkflowsResponse,
);
if (!r.ok) {
throw new Error(r.error);
}
return r.workflows;
}
async triggerSense(name: string): Promise<DaemonTransportTriggerResult> {
return sendAndReceive(
this.socketPath,
{ type: "trigger-sense", sense: name },
parseDaemonResponse,
);
}
async triggerWorkflow(name: string): Promise<DaemonTransportTriggerResult> {
const message: DaemonIpcRequest = {
type: "trigger-workflow",
workflow: name,
prompt: "",
maxRounds: DEFAULT_ENGINE_MAX_ROUNDS,
dryRun: false,
};
return sendAndReceive(this.socketPath, message, parseDaemonResponse);
}
async killWorkflow(runId: string): Promise<DaemonTransportTriggerResult> {
const message: DaemonIpcRequest = { type: "kill-workflow", runId };
return sendAndReceive(this.socketPath, message, parseDaemonResponse);
}
}
/**
* Send a trigger-workflow message to the running daemon via its Unix socket.
* Resolves with the daemon's response or rejects on connection/timeout errors.
@@ -168,6 +297,16 @@ export function listSensesViaDaemon(socketPath: string): Promise<DaemonIpcListSe
return sendAndReceive(socketPath, message, parseListSensesResponse);
}
/**
* Send a list-workflows message to the running daemon via its Unix socket.
*/
export function listWorkflowsViaDaemon(
socketPath: string,
): Promise<DaemonIpcListWorkflowsResponse> {
const message: DaemonIpcRequest = { type: "list-workflows" };
return sendAndReceive(socketPath, message, parseListWorkflowsResponse);
}
/**
* Send a kill-workflow message to the running daemon via its Unix socket.
* Resolves with the daemon's response or rejects on connection/timeout errors.
+34 -3
View File
@@ -9,7 +9,11 @@ import { getSocketPath } from "./workspace.js";
export type CreateKernelFn = (
config: NerveConfig,
nerveRoot: string,
opts: { enableFileWatcher: boolean; ipcSocketPath: string },
opts: {
enableFileWatcher: boolean;
ipcSocketPath: string;
httpApiPortOverride?: number | null;
},
) => {
groups: Set<string>;
ready: Promise<void>;
@@ -28,9 +32,23 @@ function readConfig(nerveRoot: string): ReturnType<typeof parseNerveConfig> {
return parseNerveConfig(raw);
}
function parseEnvHttpPort(): number | null {
const envPort = process.env.NERVE_API_PORT;
if (envPort === undefined || envPort === "") return null;
const n = Number.parseInt(envPort, 10);
if (Number.isNaN(n) || n < 1 || n > 65_535) return null;
return n;
}
export type ForegroundSessionOptions = {
/** Positive integer overrides `nerve.yaml` `api.port` (same precedence as `NERVE_API_PORT`). */
httpApiPortOverride?: number | null;
};
export async function runForegroundKernelSession(
nerveRoot: string,
createKernel: CreateKernelFn,
sessionOpts: ForegroundSessionOptions = {},
): Promise<void> {
const configResult = readConfig(nerveRoot);
if (!configResult.ok) {
@@ -39,10 +57,23 @@ export async function runForegroundKernelSession(
}
const config = configResult.value;
const kernel = createKernel(config, nerveRoot, {
const envOverride = parseEnvHttpPort();
const cliOverride = sessionOpts.httpApiPortOverride ?? null;
const resolvedOverride =
cliOverride !== null && cliOverride > 0
? cliOverride
: envOverride !== null && envOverride > 0
? envOverride
: null;
const kernelBase = {
enableFileWatcher: true,
ipcSocketPath: getSocketPath(),
});
};
const kernel =
resolvedOverride !== null
? createKernel(config, nerveRoot, { ...kernelBase, httpApiPortOverride: resolvedOverride })
: createKernel(config, nerveRoot, kernelBase);
const senseNames = Object.keys(config.senses);
const groups = [...kernel.groups];
+5 -1
View File
@@ -34,7 +34,11 @@ export type DaemonModule = {
createKernel: (
config: NerveConfig,
nerveRoot: string,
options: { enableFileWatcher: boolean; ipcSocketPath: string },
options: {
enableFileWatcher: boolean;
ipcSocketPath: string;
httpApiPortOverride?: number | null;
},
) => {
groups: Set<string>;
ready: Promise<void>;
@@ -63,6 +63,7 @@ describe("parseNerveConfig", () => {
overflow: "queue",
maxQueue: 10,
});
expect(result.value.api).toEqual({ port: null });
});
it("parses config with empty reflexes array", () => {
@@ -170,9 +171,39 @@ workflows:
maxQueue: 100,
});
});
it("parses api.port when present", () => {
const yaml = `
senses:
cpu:
group: system
reflexes: []
api:
port: 9800
`;
const result = parseNerveConfig(yaml);
expect(result.ok).toBe(true);
if (!result.ok) return;
expect(result.value.api).toEqual({ port: 9800 });
});
});
describe("invalid configs", () => {
it("returns error when api.port is out of range", () => {
const yaml = `
senses:
cpu:
group: system
reflexes: []
api:
port: 99999
`;
const result = parseNerveConfig(yaml);
expect(result.ok).toBe(false);
if (result.ok) return;
expect(result.error.message).toMatch(/api\.port/);
});
it("returns error on bad YAML syntax", () => {
const result = parseNerveConfig("senses: [\\nunclosed");
expect(result.ok).toBe(false);
@@ -55,7 +55,7 @@ describe("parseDaemonIpcRequest", () => {
).toBeNull();
});
it("parses trigger-sense and list-senses", () => {
it("parses trigger-sense, list-senses, list-workflows, and health", () => {
expect(parseDaemonIpcRequest(JSON.stringify({ type: "trigger-sense", sense: "x" }))).toEqual({
type: "trigger-sense",
sense: "x",
@@ -63,6 +63,12 @@ describe("parseDaemonIpcRequest", () => {
expect(parseDaemonIpcRequest(JSON.stringify({ type: "list-senses" }))).toEqual({
type: "list-senses",
});
expect(parseDaemonIpcRequest(JSON.stringify({ type: "list-workflows" }))).toEqual({
type: "list-workflows",
});
expect(parseDaemonIpcRequest(JSON.stringify({ type: "health" }))).toEqual({
type: "health",
});
});
it("returns null for invalid JSON or unknown type", () => {
+6
View File
@@ -28,10 +28,16 @@ export type QueueOverflowConfig = {
export type WorkflowConfig = DropOverflowConfig | QueueOverflowConfig;
/** Optional HTTP control plane (Phase 1: no auth). When `port` is null, the HTTP server is not started. */
export type NerveApiConfig = {
port: number | null;
};
export type NerveConfig = {
/** Engine-wide default max moderator rounds (e.g. CLI workflow trigger when omitted). */
maxRounds: number;
senses: Record<string, SenseConfig>;
reflexes: ReflexConfig[];
workflows: Record<string, WorkflowConfig>;
api: NerveApiConfig;
};
+47 -2
View File
@@ -7,6 +7,23 @@
import { isPlainRecord } from "./is-plain-record.js";
import type { SenseInfo } from "./sense.js";
/** Runtime status of a registered workflow (for listing / observability). */
export type WorkflowStatus = {
name: string;
activeThreads: number;
queuedThreads: number;
config: { concurrency: number; overflow: string };
};
/** Public health payload for HTTP / IPC. */
export type HealthInfo = {
ok: boolean;
version: string;
uptime: number;
startedAt: string;
hostname: string;
};
/** Client → daemon: start a workflow run. */
export type DaemonIpcTriggerWorkflowRequest = {
type: "trigger-workflow";
@@ -33,12 +50,24 @@ export type DaemonIpcKillWorkflowRequest = {
runId: string;
};
/** Client → daemon: list registered workflows and queue/active counts. */
export type DaemonIpcListWorkflowsRequest = {
type: "list-workflows";
};
/** Client → daemon: public health snapshot. */
export type DaemonIpcHealthRequest = {
type: "health";
};
/** Union of all JSON requests the daemon IPC server accepts. */
export type DaemonIpcRequest =
| DaemonIpcTriggerWorkflowRequest
| DaemonIpcTriggerSenseRequest
| DaemonIpcListSensesRequest
| DaemonIpcKillWorkflowRequest;
| DaemonIpcKillWorkflowRequest
| DaemonIpcListWorkflowsRequest
| DaemonIpcHealthRequest;
/** Successful trigger / trigger-sense reply (no body). */
export type DaemonIpcTriggerOkResponse = { ok: true };
@@ -53,11 +82,21 @@ export type DaemonIpcListSensesResponse =
| { ok: true; senses: SenseInfo[] }
| DaemonIpcErrorResponse;
/** Reply for list-workflows. */
export type DaemonIpcListWorkflowsResponse =
| { ok: true; workflows: WorkflowStatus[] }
| DaemonIpcErrorResponse;
/** Reply for health. */
export type DaemonIpcHealthResponse = { ok: true; health: HealthInfo } | DaemonIpcErrorResponse;
/** Any JSON response the daemon may write on the IPC socket. */
export type DaemonIpcResponse =
| DaemonIpcTriggerOkResponse
| DaemonIpcErrorResponse
| { ok: true; senses: SenseInfo[] };
| DaemonIpcListSensesResponse
| DaemonIpcListWorkflowsResponse
| DaemonIpcHealthResponse;
function parseTriggerWorkflowFields(
req: Record<string, unknown>,
@@ -98,6 +137,12 @@ export function parseDaemonIpcRequest(line: string): DaemonIpcRequest | null {
if (typeof req.runId !== "string" || req.runId.length === 0) return null;
return { type: "kill-workflow", runId: req.runId };
}
if (req.type === "list-workflows") {
return { type: "list-workflows" };
}
if (req.type === "health") {
return { type: "health" };
}
return null;
} catch {
return null;
+18
View File
@@ -0,0 +1,18 @@
import type { HealthInfo, WorkflowStatus } from "./daemon-ipc-protocol.js";
import type { SenseInfo } from "./sense.js";
export type DaemonTransportTriggerResult = { ok: true } | { ok: false; error: string };
/**
* Abstraction over daemon control plane (Unix socket IPC today, HTTP in Phase 2).
* Implementations live in CLI / tools; the daemon kernel uses shared handler logic.
*/
export type DaemonTransport = {
health(): Promise<HealthInfo>;
listSenses(): Promise<SenseInfo[]>;
listWorkflows(): Promise<WorkflowStatus[]>;
triggerSense(name: string): Promise<DaemonTransportTriggerResult>;
triggerWorkflow(name: string): Promise<DaemonTransportTriggerResult>;
/** Kill a running or queued workflow thread by `runId` (same field as IPC `kill-workflow`). */
killWorkflow(runId: string): Promise<DaemonTransportTriggerResult>;
};
+8
View File
@@ -5,6 +5,7 @@ export type {
DropOverflowConfig,
QueueOverflowConfig,
WorkflowConfig,
NerveApiConfig,
NerveConfig,
} from "./config.js";
export type { Signal, SenseInfo, SenseResult } from "./sense.js";
@@ -35,15 +36,22 @@ export {
} from "./sense-workflow-directive.js";
export type {
WorkflowStatus,
HealthInfo,
DaemonIpcTriggerWorkflowRequest,
DaemonIpcTriggerSenseRequest,
DaemonIpcListSensesRequest,
DaemonIpcKillWorkflowRequest,
DaemonIpcListWorkflowsRequest,
DaemonIpcHealthRequest,
DaemonIpcRequest,
DaemonIpcTriggerOkResponse,
DaemonIpcErrorResponse,
DaemonIpcTriggerResponse,
DaemonIpcListSensesResponse,
DaemonIpcListWorkflowsResponse,
DaemonIpcHealthResponse,
DaemonIpcResponse,
} from "./daemon-ipc-protocol.js";
export { parseDaemonIpcRequest } from "./daemon-ipc-protocol.js";
export type { DaemonTransport, DaemonTransportTriggerResult } from "./daemon-transport.js";
+33 -1
View File
@@ -1,6 +1,12 @@
import { parse } from "yaml";
import type { NerveConfig, ReflexConfig, SenseConfig, WorkflowConfig } from "./config.js";
import type {
NerveApiConfig,
NerveConfig,
ReflexConfig,
SenseConfig,
WorkflowConfig,
} from "./config.js";
import { isPlainRecord } from "./is-plain-record.js";
import type { Result } from "./result.js";
import { err, ok } from "./result.js";
@@ -245,6 +251,28 @@ function parseReflexes(
return ok(reflexes);
}
function parseApiConfig(obj: Record<string, unknown>): Result<NerveApiConfig> {
if (obj.api === undefined || obj.api === null) {
return ok({ port: null });
}
if (!isPlainRecord(obj.api)) {
return err(new Error("api: must be an object if provided"));
}
const api = obj.api;
if (api.port === undefined || api.port === null) {
return ok({ port: null });
}
if (
typeof api.port !== "number" ||
!Number.isInteger(api.port) ||
api.port < 1 ||
api.port > 65_535
) {
return err(new Error("api.port: must be an integer between 1 and 65535 if provided"));
}
return ok({ port: api.port });
}
function parseWorkflows(obj: Record<string, unknown>): Result<Record<string, WorkflowConfig>> {
if (obj.workflows === undefined || obj.workflows === null) return ok({});
@@ -293,10 +321,14 @@ export function parseNerveConfig(raw: string): Result<NerveConfig> {
const maxRoundsResult = parseEngineMaxRounds(obj);
if (!maxRoundsResult.ok) return maxRoundsResult;
const apiResult = parseApiConfig(obj);
if (!apiResult.ok) return apiResult;
return ok({
maxRounds: maxRoundsResult.value,
senses,
reflexes: reflexesResult.value,
workflows: workflowsResult.value,
api: apiResult.value,
});
}
@@ -65,6 +65,7 @@ function makeConfig(workflows: Record<string, WorkflowConfig> = {}): NerveConfig
reflexes: [],
workflows,
maxRounds: 10,
api: { port: null },
};
}
@@ -15,6 +15,7 @@ import { join } from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { createDaemonHandlers } from "../daemon-handlers.js";
import { createDaemonIpcServer } from "../daemon-ipc.js";
import type { DaemonIpcServer } from "../daemon-ipc.js";
@@ -28,15 +29,42 @@ let server: DaemonIpcServer | null = null;
function makeMockWorkflowManager() {
return {
startWorkflow: vi.fn(),
killThread: vi.fn(() => false),
stop: vi.fn(async () => {}),
totalActiveCount: vi.fn(() => 0),
drainAndRespawn: vi.fn(async () => {}),
drainWhenIdle: vi.fn(),
updateConfig: vi.fn(),
getActiveWorkflowRuns: vi.fn(() => []),
listWorkflows: vi.fn(() => []),
activeCount: vi.fn(() => 0),
queueLength: vi.fn(() => 0),
};
}
function openIpcServer(
path: string,
wfManager: ReturnType<typeof makeMockWorkflowManager>,
opts: {
triggerSense: (senseName: string) => void;
listSenses: () => unknown[];
},
): DaemonIpcServer {
const handlers = createDaemonHandlers({
workflowManager: wfManager as never,
triggerSense: opts.triggerSense,
listSenses: opts.listSenses as never,
getHealthInfo: () => ({
ok: true,
version: "0.0.0-test",
uptime: 0,
startedAt: "2020-01-01T00:00:00.000Z",
hostname: "test-host",
}),
getDefaultMaxRounds: () => 10,
});
return createDaemonIpcServer(path, handlers);
}
function sendRaw(path: string, message: object): Promise<object> {
return new Promise((resolve, reject) => {
const sock = connect(path, () => {
@@ -89,9 +117,10 @@ afterEach(async () => {
describe("daemon-ipc — trigger-sense", () => {
it("responds ok:true when triggerSense succeeds", async () => {
const triggerSense = vi.fn();
server = createDaemonIpcServer(sockPath, makeMockWorkflowManager() as never, {
const wfManager = makeMockWorkflowManager();
server = openIpcServer(sockPath, wfManager, {
triggerSense,
listSenses: vi.fn(() => []),
listSenses: () => [],
});
const resp = await sendRaw(sockPath, { type: "trigger-sense", sense: "cpu-usage" });
@@ -105,9 +134,10 @@ describe("daemon-ipc — trigger-sense", () => {
const triggerSense = vi.fn(() => {
throw new Error('Unknown sense: "no-such-sense"');
});
server = createDaemonIpcServer(sockPath, makeMockWorkflowManager() as never, {
const wfManager = makeMockWorkflowManager();
server = openIpcServer(sockPath, wfManager, {
triggerSense,
listSenses: vi.fn(() => []),
listSenses: () => [],
});
const resp = await sendRaw(sockPath, { type: "trigger-sense", sense: "no-such-sense" });
@@ -118,9 +148,10 @@ describe("daemon-ipc — trigger-sense", () => {
it("responds ok:false for trigger-sense with empty sense name", async () => {
const triggerSense = vi.fn();
server = createDaemonIpcServer(sockPath, makeMockWorkflowManager() as never, {
const wfManager = makeMockWorkflowManager();
server = openIpcServer(sockPath, wfManager, {
triggerSense,
listSenses: vi.fn(() => []),
listSenses: () => [],
});
const resp = await sendRaw(sockPath, { type: "trigger-sense", sense: "" });
@@ -131,9 +162,10 @@ describe("daemon-ipc — trigger-sense", () => {
it("responds ok:false for trigger-sense missing sense field", async () => {
const triggerSense = vi.fn();
server = createDaemonIpcServer(sockPath, makeMockWorkflowManager() as never, {
const wfManager = makeMockWorkflowManager();
server = openIpcServer(sockPath, wfManager, {
triggerSense,
listSenses: vi.fn(() => []),
listSenses: () => [],
});
const resp = await sendRaw(sockPath, { type: "trigger-sense" });
@@ -145,9 +177,9 @@ describe("daemon-ipc — trigger-sense", () => {
it("does NOT call triggerSense for trigger-workflow requests", async () => {
const triggerSense = vi.fn();
const wfManager = makeMockWorkflowManager();
server = createDaemonIpcServer(sockPath, wfManager as never, {
server = openIpcServer(sockPath, wfManager, {
triggerSense,
listSenses: vi.fn(() => []),
listSenses: () => [],
});
const resp = await sendRaw(sockPath, {
@@ -169,9 +201,10 @@ describe("daemon-ipc — trigger-sense", () => {
it("responds ok:false for completely unknown request type", async () => {
const triggerSense = vi.fn();
server = createDaemonIpcServer(sockPath, makeMockWorkflowManager() as never, {
const wfManager = makeMockWorkflowManager();
server = openIpcServer(sockPath, wfManager, {
triggerSense,
listSenses: vi.fn(() => []),
listSenses: () => [],
});
const resp = await sendRaw(sockPath, { type: "unknown-type", data: "x" });
@@ -188,7 +221,8 @@ describe("daemon-ipc — trigger-sense", () => {
describe("daemon-ipc — list-senses", () => {
it("responds ok:true with empty senses array when listSenses returns []", async () => {
const listSenses = vi.fn(() => []);
server = createDaemonIpcServer(sockPath, makeMockWorkflowManager() as never, {
const wfManager = makeMockWorkflowManager();
server = openIpcServer(sockPath, wfManager, {
triggerSense: vi.fn(),
listSenses,
});
@@ -217,7 +251,8 @@ describe("daemon-ipc — list-senses", () => {
},
];
const listSenses = vi.fn(() => sensesData);
server = createDaemonIpcServer(sockPath, makeMockWorkflowManager() as never, {
const wfManager = makeMockWorkflowManager();
server = openIpcServer(sockPath, wfManager, {
triggerSense: vi.fn(),
listSenses,
});
@@ -232,7 +267,8 @@ describe("daemon-ipc — list-senses", () => {
const listSenses = vi.fn(() => {
throw new Error("internal error");
});
server = createDaemonIpcServer(sockPath, makeMockWorkflowManager() as never, {
const wfManager = makeMockWorkflowManager();
server = openIpcServer(sockPath, wfManager, {
triggerSense: vi.fn(),
listSenses,
});
@@ -244,7 +280,8 @@ describe("daemon-ipc — list-senses", () => {
it("does NOT call listSenses for trigger-sense requests", async () => {
const listSenses = vi.fn(() => []);
server = createDaemonIpcServer(sockPath, makeMockWorkflowManager() as never, {
const wfManager = makeMockWorkflowManager();
server = openIpcServer(sockPath, wfManager, {
triggerSense: vi.fn(),
listSenses,
});
@@ -81,6 +81,8 @@ describe("createFileWatcher — workflow file changes (Phase 3)", () => {
watcher = createFileWatcher(root, (change) => changes.push(change), 50);
await new Promise((r) => setTimeout(r, 100));
// Isolate the nerve.yaml write from fs.watch startup / coalesced events on some platforms
changes.length = 0;
writeFileSync(join(root, "nerve.yaml"), "senses: {}\nreflexes: []\n# changed\n");
await waitFor(() => changes.some((c) => c.kind === "config"), 3000);
@@ -95,6 +95,8 @@ describe("createFileWatcher", () => {
watcher = createFileWatcher(root, (change) => changes.push(change), 50);
await new Promise((r) => setTimeout(r, 100));
// Ignore fs.watch attachment noise on some platforms before asserting post-close silence
changes.length = 0;
watcher.close();
watcher = null;
@@ -66,7 +66,7 @@ const { createWorkflowManager } = await import("../workflow-manager.js");
const { createKernel } = await import("../kernel.js");
function makeWfConfig(workflows: Record<string, WorkflowConfig> = {}): NerveConfig {
return { senses: {}, reflexes: [], workflows, maxRounds: 10 };
return { senses: {}, reflexes: [], workflows, maxRounds: 10, api: { port: null } };
}
function makeLogStore() {
@@ -454,6 +454,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
reflexes: [],
workflows: { "my-wf": { concurrency: 1, overflow: "drop" } },
maxRounds: 10,
api: { port: null },
};
const kernel = createKernel(config, nerveRoot, {
@@ -489,6 +490,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
reflexes: [],
workflows: { "old-wf": { concurrency: 1, overflow: "drop" } },
maxRounds: 10,
api: { port: null },
};
const kernel = createKernel(initialConfig, nerveRoot, {
@@ -510,6 +512,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
reflexes: [],
workflows: {},
maxRounds: 10,
api: { port: null },
};
kernel.reloadConfig(newConfig);
@@ -532,6 +535,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
reflexes: [],
workflows: { "my-wf": { concurrency: 1, overflow: "drop" } },
maxRounds: 10,
api: { port: null },
};
const kernel = createKernel(initialConfig, nerveRoot, {
@@ -548,6 +552,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
reflexes: [],
workflows: { "my-wf": { concurrency: 5, overflow: "queue", maxQueue: 50 } },
maxRounds: 10,
api: { port: null },
};
kernel.reloadConfig(newConfig);
@@ -30,6 +30,7 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
reflexes: [],
workflows: {},
maxRounds: 10,
api: { port: null },
...overrides,
};
}
@@ -78,6 +78,7 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
reflexes: [],
workflows: {},
maxRounds: 10,
api: { port: null },
...overrides,
};
}
@@ -197,6 +198,7 @@ describe("kernel — reloadConfig", () => {
reflexes: [],
workflows: {},
maxRounds: 10,
api: { port: null },
});
expect(kernel.groups.has("network")).toBe(true);
@@ -214,6 +216,7 @@ describe("kernel — reloadConfig", () => {
reflexes: [],
workflows: {},
maxRounds: 10,
api: { port: null },
};
const kernel = createKernel(config, nerveRoot);
@@ -229,6 +232,7 @@ describe("kernel — reloadConfig", () => {
reflexes: [],
workflows: {},
maxRounds: 10,
api: { port: null },
});
expect(kernel.groups.has("network")).toBe(false);
@@ -252,6 +256,7 @@ describe("kernel — reloadConfig", () => {
reflexes: [],
workflows: {},
maxRounds: 10,
api: { port: null },
});
expect(kernel.getHealth().activeSenses).toBe(2);
@@ -97,6 +97,7 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
reflexes: [],
workflows: {},
maxRounds: 10,
api: { port: null },
...overrides,
};
}
@@ -100,6 +100,7 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
reflexes: [],
workflows: {},
maxRounds: 10,
api: { port: null },
...overrides,
};
}
@@ -322,6 +323,7 @@ describe("kernel + workflowManager integration", () => {
reflexes: [],
workflows: { "new-workflow": { concurrency: 1, overflow: "drop" } },
maxRounds: 10,
api: { port: null },
};
kernel.reloadConfig(newConfig);
@@ -375,6 +377,7 @@ describe("kernel + workflowManager integration", () => {
reflexes: [],
workflows: {},
maxRounds: 10,
api: { port: null },
};
kernel.reloadConfig(newConfig);
@@ -61,6 +61,7 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
reflexes: [],
workflows: {},
maxRounds: 10,
api: { port: null },
...overrides,
};
}
@@ -210,6 +211,7 @@ describe("kernel — groupForSense mapping", () => {
reflexes: [],
workflows: {},
maxRounds: 10,
api: { port: null },
};
const kernel = createKernel(config, nerveRoot);
@@ -31,6 +31,7 @@ describe("LogStore + ReflexScheduler integration", () => {
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
workflows: {},
maxRounds: 10,
api: { port: null },
};
const bus = createSignalBus();
const triggered: string[] = [];
@@ -59,6 +60,7 @@ describe("LogStore + ReflexScheduler integration", () => {
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 1000, on: [] }],
workflows: {},
maxRounds: 10,
api: { port: null },
};
const bus = createSignalBus();
const ref: { scheduler: ReturnType<typeof createReflexScheduler> | null } = { scheduler: null };
@@ -90,6 +92,7 @@ describe("LogStore + ReflexScheduler integration", () => {
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
workflows: {},
maxRounds: 10,
api: { port: null },
};
const bus = createSignalBus();
const triggered: string[] = [];
@@ -27,6 +27,7 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
reflexes: [],
workflows: {},
maxRounds: 10,
api: { port: null },
...overrides,
};
}
@@ -152,6 +153,7 @@ describe("phase6 — reloadConfig", () => {
reflexes: [],
workflows: {},
maxRounds: 10,
api: { port: null },
};
kernel.reloadConfig(newConfig);
@@ -172,6 +174,7 @@ describe("phase6 — reloadConfig", () => {
reflexes: [],
workflows: {},
maxRounds: 10,
api: { port: null },
};
kernel = createKernel(config, nerveRoot, {
workerScript: MOCK_WORKER,
@@ -187,6 +190,7 @@ describe("phase6 — reloadConfig", () => {
reflexes: [],
workflows: {},
maxRounds: 10,
api: { port: null },
};
kernel.reloadConfig(newConfig);
@@ -224,6 +228,7 @@ describe("phase6 — error isolation", () => {
reflexes: [],
workflows: {},
maxRounds: 10,
api: { port: null },
};
kernel = createKernel(config, nerveRoot, {
@@ -334,6 +339,7 @@ describe("phase6 — getHealth", () => {
reflexes: [],
workflows: {},
maxRounds: 10,
api: { port: null },
};
kernel.reloadConfig(newConfig);
@@ -12,6 +12,7 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
reflexes: [],
workflows: {},
maxRounds: 10,
api: { port: null },
...overrides,
};
}
@@ -18,6 +18,7 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
reflexes: [],
workflows: {},
maxRounds: 10,
api: { port: null },
...overrides,
};
}
@@ -299,6 +300,7 @@ describe("ReflexScheduler — workflow reflexes ignored", () => {
workflows: {
"my-workflow": { concurrency: 1, overflow: "drop" },
},
api: { port: null },
};
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
@@ -89,6 +89,7 @@ function makeConfig(overrides: Partial<NerveConfig["workflows"]> = {}): NerveCon
senses: {},
reflexes: [],
workflows: overrides as NerveConfig["workflows"],
api: { port: null },
};
}
+79
View File
@@ -0,0 +1,79 @@
import type { HealthInfo, SenseInfo, WorkflowStatus } from "@uncaged/nerve-core";
import type { WorkflowManager } from "./workflow-manager.js";
export type DaemonHandlerBundle = {
health: () => HealthInfo;
listSenses: () => SenseInfo[];
listWorkflows: () => WorkflowStatus[];
getDefaultMaxRounds: () => number;
triggerSense: (senseName: string) => { ok: true } | { ok: false; error: string };
triggerWorkflow: (
workflowName: string,
launch: { prompt: string; maxRounds: number; dryRun: boolean },
) => { ok: true } | { ok: false; error: string };
killWorkflowByRunId: (runId: string) => { ok: true } | { ok: false; error: string };
};
export type CreateDaemonHandlersInput = {
workflowManager: WorkflowManager;
triggerSense: (senseName: string) => void;
listSenses: () => SenseInfo[];
getHealthInfo: () => HealthInfo;
getDefaultMaxRounds: () => number;
};
export function createDaemonHandlers(input: CreateDaemonHandlersInput): DaemonHandlerBundle {
const {
workflowManager,
triggerSense: triggerSenseFn,
listSenses: listSensesFn,
getHealthInfo,
getDefaultMaxRounds,
} = input;
function triggerSense(senseName: string): { ok: true } | { ok: false; error: string } {
try {
triggerSenseFn(senseName);
return { ok: true };
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
return { ok: false, error: msg };
}
}
function triggerWorkflow(
workflowName: string,
launch: { prompt: string; maxRounds: number; dryRun: boolean },
): { ok: true } | { ok: false; error: string } {
try {
workflowManager.startWorkflow(workflowName, launch);
return { ok: true };
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
return { ok: false, error: msg };
}
}
function killWorkflowByRunId(runId: string): { ok: true } | { ok: false; error: string } {
try {
const found = workflowManager.killThread(runId);
return found
? { ok: true }
: { ok: false, error: `Run not found or already finished: ${runId}` };
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
return { ok: false, error: msg };
}
}
return {
health: () => getHealthInfo(),
listSenses: () => listSensesFn(),
listWorkflows: () => workflowManager.listWorkflows(),
getDefaultMaxRounds: () => getDefaultMaxRounds(),
triggerSense,
triggerWorkflow,
killWorkflowByRunId,
};
}
+18 -22
View File
@@ -9,28 +9,18 @@
import { rmSync } from "node:fs";
import { type Server, type Socket, createServer } from "node:net";
import type { DaemonIpcResponse, SenseInfo } from "@uncaged/nerve-core";
import type { DaemonIpcResponse } from "@uncaged/nerve-core";
import { parseDaemonIpcRequest } from "@uncaged/nerve-core";
import type { WorkflowManager } from "./workflow-manager.js";
export type { SenseInfo };
import type { DaemonHandlerBundle } from "./daemon-handlers.js";
export type DaemonIpcServer = {
close: () => Promise<void>;
};
export type DaemonIpcServerOptions = {
/** Called when a trigger-sense request arrives. Should throw if the sense is unknown. */
triggerSense: (senseName: string) => void;
/** Called when a list-senses request arrives. Returns sense info for all registered senses. */
listSenses: () => SenseInfo[];
};
export function createDaemonIpcServer(
socketPath: string,
workflowManager: WorkflowManager,
opts: DaemonIpcServerOptions,
handlers: DaemonHandlerBundle,
): DaemonIpcServer {
// Remove stale socket file if it exists
try {
@@ -52,26 +42,32 @@ export function createDaemonIpcServer(
try {
if (req.type === "trigger-workflow") {
workflowManager.startWorkflow(req.workflow, {
const r = handlers.triggerWorkflow(req.workflow, {
prompt: req.prompt,
maxRounds: req.maxRounds,
dryRun: req.dryRun,
});
const resp: DaemonIpcResponse = { ok: true };
const resp: DaemonIpcResponse = r.ok ? { ok: true } : { ok: false, error: r.error };
socket.write(`${JSON.stringify(resp)}\n`);
} else if (req.type === "trigger-sense") {
opts.triggerSense(req.sense);
const resp: DaemonIpcResponse = { ok: true };
const r = handlers.triggerSense(req.sense);
const resp: DaemonIpcResponse = r.ok ? { ok: true } : { ok: false, error: r.error };
socket.write(`${JSON.stringify(resp)}\n`);
} else if (req.type === "list-senses") {
const senses = opts.listSenses();
const senses = handlers.listSenses();
const resp: DaemonIpcResponse = { ok: true, senses };
socket.write(`${JSON.stringify(resp)}\n`);
} else if (req.type === "kill-workflow") {
const found = workflowManager.killThread(req.runId);
const resp: DaemonIpcResponse = found
? { ok: true }
: { ok: false, error: `Run not found or already finished: ${req.runId}` };
const r = handlers.killWorkflowByRunId(req.runId);
const resp: DaemonIpcResponse = r.ok ? { ok: true } : { ok: false, error: r.error };
socket.write(`${JSON.stringify(resp)}\n`);
} else if (req.type === "list-workflows") {
const workflows = handlers.listWorkflows();
const resp: DaemonIpcResponse = { ok: true, workflows };
socket.write(`${JSON.stringify(resp)}\n`);
} else if (req.type === "health") {
const health = handlers.health();
const resp: DaemonIpcResponse = { ok: true, health };
socket.write(`${JSON.stringify(resp)}\n`);
} else {
const _exhaustive: never = req;
+194
View File
@@ -0,0 +1,194 @@
/**
* Optional JSON HTTP control plane (Phase 1 — no auth).
* Uses only `node:http`; shares handler logic with Unix IPC via {@link createDaemonHandlers}.
*/
import { type IncomingMessage, type Server, type ServerResponse, createServer } from "node:http";
import { isPlainRecord } from "@uncaged/nerve-core";
import type { DaemonHandlerBundle } from "./daemon-handlers.js";
/** Phase 1 HTTP API has no auth — bind loopback only unless explicitly configured later. */
const HTTP_API_BIND_HOST = "127.0.0.1";
const CORS_HEADERS: Record<string, string> = {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type",
};
export type HttpApiServer = {
close: () => Promise<void>;
};
function setJsonHeaders(res: ServerResponse, status: number): void {
res.statusCode = status;
res.setHeader("Content-Type", "application/json; charset=utf-8");
for (const [k, v] of Object.entries(CORS_HEADERS)) {
res.setHeader(k, v);
}
}
function sendJson(res: ServerResponse, status: number, body: unknown): void {
setJsonHeaders(res, status);
res.end(`${JSON.stringify(body)}\n`);
}
function readRequestBody(req: IncomingMessage): Promise<string> {
return new Promise((resolve, reject) => {
const chunks: Buffer[] = [];
req.on("data", (c: Buffer) => {
chunks.push(c);
});
req.on("end", () => {
resolve(Buffer.concat(chunks).toString("utf8"));
});
req.on("error", reject);
});
}
function parseJsonBody(raw: string): unknown | null {
if (raw.trim().length === 0) return null;
try {
return JSON.parse(raw) as unknown;
} catch {
return null;
}
}
export function createHttpApiServer(port: number, handlers: DaemonHandlerBundle): HttpApiServer {
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: HTTP router dispatches multiple routes in one handler
const server: Server = createServer(async (req, res) => {
if (req.method === "OPTIONS") {
setJsonHeaders(res, 204);
res.end();
return;
}
const url = req.url ?? "";
const path = url.split("?")[0] ?? "";
try {
if (req.method === "GET" && path === "/api/health") {
sendJson(res, 200, handlers.health());
return;
}
if (req.method === "GET" && path === "/api/senses") {
sendJson(res, 200, { senses: handlers.listSenses() });
return;
}
if (req.method === "GET" && path === "/api/workflows") {
sendJson(res, 200, { workflows: handlers.listWorkflows() });
return;
}
if (req.method === "POST" && path === "/api/trigger-sense") {
const raw = await readRequestBody(req);
const body = parseJsonBody(raw);
if (!isPlainRecord(body) || typeof body.name !== "string" || body.name.length === 0) {
sendJson(res, 400, { ok: false, error: 'Expected JSON body: { "name": string }' });
return;
}
const result = handlers.triggerSense(body.name);
sendJson(res, result.ok ? 200 : 400, result);
return;
}
if (req.method === "POST" && path === "/api/trigger-workflow") {
const raw = await readRequestBody(req);
const body = parseJsonBody(raw);
if (!isPlainRecord(body) || typeof body.name !== "string" || body.name.length === 0) {
sendJson(res, 400, {
ok: false,
error:
'Expected JSON body: { "name": string, "prompt"?: string, "maxRounds"?: number, "dryRun"?: boolean }',
});
return;
}
let prompt = "";
if (body.prompt !== undefined && body.prompt !== null) {
if (typeof body.prompt !== "string") {
sendJson(res, 400, { ok: false, error: '"prompt" must be a string when provided' });
return;
}
prompt = body.prompt;
}
let maxRounds = handlers.getDefaultMaxRounds();
if (body.maxRounds !== undefined && body.maxRounds !== null) {
if (typeof body.maxRounds !== "number") {
sendJson(res, 400, { ok: false, error: '"maxRounds" must be a number when provided' });
return;
}
maxRounds = body.maxRounds;
}
let dryRun = false;
if (body.dryRun !== undefined && body.dryRun !== null) {
if (typeof body.dryRun !== "boolean") {
sendJson(res, 400, { ok: false, error: '"dryRun" must be a boolean when provided' });
return;
}
dryRun = body.dryRun;
}
const result = handlers.triggerWorkflow(body.name, { prompt, maxRounds, dryRun });
sendJson(res, result.ok ? 200 : 400, result);
return;
}
if (req.method === "POST" && path === "/api/kill-workflow") {
const raw = await readRequestBody(req);
const body = parseJsonBody(raw);
if (
!isPlainRecord(body) ||
typeof body.threadId !== "string" ||
body.threadId.length === 0
) {
sendJson(res, 400, {
ok: false,
error: 'Expected JSON body: { "threadId": string, "name"?: string }',
});
return;
}
if (body.name !== undefined && body.name !== null && typeof body.name !== "string") {
sendJson(res, 400, { ok: false, error: '"name" must be a string when provided' });
return;
}
const nameForLog = typeof body.name === "string" && body.name.length > 0 ? body.name : null;
if (nameForLog !== null) {
process.stderr.write(
`[http-api] kill-workflow threadId=${body.threadId} workflowName=${nameForLog}\n`,
);
}
const result = handlers.killWorkflowByRunId(body.threadId);
sendJson(res, result.ok ? 200 : 400, result);
return;
}
sendJson(res, 404, { ok: false, error: "Not found" });
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
sendJson(res, 500, { ok: false, error: msg });
}
});
server.listen(port, HTTP_API_BIND_HOST, () => {
process.stderr.write(`[http-api] listening on http://${HTTP_API_BIND_HOST}:${String(port)}\n`);
});
server.on("error", (err) => {
process.stderr.write(`[http-api] server error: ${err.message}\n`);
});
async function close(): Promise<void> {
await new Promise<void>((resolve, reject) => {
server.close((e) => {
if (e !== undefined && e !== null) reject(e);
else resolve();
});
});
}
return { close };
}
+87 -22
View File
@@ -3,17 +3,23 @@
* optional file watcher, and daemon IPC.
*/
import { join } from "node:path";
import { readFileSync } from "node:fs";
import { hostname } from "node:os";
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import type { NerveConfig, SenseInfo, Signal } from "@uncaged/nerve-core";
import type { HealthInfo, NerveConfig, SenseInfo, Signal } from "@uncaged/nerve-core";
import { routeSenseComputeOutput } from "@uncaged/nerve-core";
import { createLogStore } from "@uncaged/nerve-store";
import type { LogStore } from "@uncaged/nerve-store";
import { createDaemonHandlers } from "./daemon-handlers.js";
import { createDaemonIpcServer } from "./daemon-ipc.js";
import type { DaemonIpcServer } from "./daemon-ipc.js";
import { createFileWatcher } from "./file-watcher.js";
import type { FileWatcher } from "./file-watcher.js";
import { createHttpApiServer } from "./http-api.js";
import type { HttpApiServer } from "./http-api.js";
import { parseWorkerMessage } from "./ipc.js";
import { createKernelFileWatchHandlers } from "./kernel-file-watch.js";
import {
@@ -53,6 +59,8 @@ export type Kernel = {
restartGroup: (group: string) => Promise<void>;
reloadConfig: (newConfig: NerveConfig) => void;
getHealth: () => KernelHealth;
/** HTTP/IPC-oriented health (version, uptime seconds, hostname). */
getDaemonHealth: () => HealthInfo;
};
export type KernelOptions = {
@@ -60,12 +68,37 @@ export type KernelOptions = {
enableFileWatcher?: boolean;
logStore?: LogStore;
ipcSocketPath?: string | null;
/**
* When set to a positive integer, overrides `nerve.yaml` `api.port` for the HTTP API.
* When `null` or omitted, only `config.api.port` from YAML is used.
*/
httpApiPortOverride?: number | null;
};
function defaultKernelOptions(): KernelOptions {
return { workerScript: resolveWorkerScript(), enableFileWatcher: false };
}
function readDaemonPackageVersion(): string {
try {
const here = fileURLToPath(import.meta.url);
const pkgPath = join(dirname(here), "..", "package.json");
const raw = readFileSync(pkgPath, "utf8");
const o: unknown = JSON.parse(raw);
if (
typeof o === "object" &&
o !== null &&
"version" in o &&
typeof (o as { version: unknown }).version === "string"
) {
return (o as { version: string }).version;
}
return "0.0.0";
} catch {
return "0.0.0";
}
}
export function createKernel(
initialConfig: NerveConfig,
nerveRoot: string,
@@ -74,6 +107,8 @@ export function createKernel(
const bus: SignalBus = createSignalBus();
const workerScript = options.workerScript ?? resolveWorkerScript();
const startTime = Date.now();
const startedAtIso = new Date(startTime).toISOString();
const daemonVersion = readDaemonPackageVersion();
const logStore: LogStore = options.logStore ?? createLogStore(join(nerveRoot, "data", "logs.db"));
logStore.append({
@@ -292,6 +327,16 @@ export function createKernel(
};
}
function getDaemonHealth(): HealthInfo {
return {
ok: true,
version: daemonVersion,
uptime: Math.floor((Date.now() - startTime) / 1000),
startedAt: startedAtIso,
hostname: hostname(),
};
}
const fileWatchHandlers = createKernelFileWatchHandlers({
nerveRoot,
getConfig: () => config,
@@ -310,28 +355,43 @@ export function createKernel(
});
}
const daemonHandlers = createDaemonHandlers({
workflowManager,
triggerSense,
listSenses(): SenseInfo[] {
return Object.entries(config.senses).map(([name, senseConfig]) => {
const entries = logStore.query({
source: "sense",
type: "signal",
refId: name,
});
const lastEntry = entries.length > 0 ? entries[entries.length - 1] : null;
return {
name,
group: senseConfig.group,
throttle: senseConfig.throttle,
timeout: senseConfig.timeout,
lastSignalTimestamp: lastEntry !== null ? lastEntry.timestamp : null,
};
});
},
getHealthInfo: getDaemonHealth,
getDefaultMaxRounds: () => config.maxRounds,
});
let ipcServer: DaemonIpcServer | null = null;
if (options.ipcSocketPath != null) {
ipcServer = createDaemonIpcServer(options.ipcSocketPath, workflowManager, {
triggerSense,
listSenses(): SenseInfo[] {
return Object.entries(config.senses).map(([name, senseConfig]) => {
const entries = logStore.query({
source: "sense",
type: "signal",
refId: name,
});
const lastEntry = entries.length > 0 ? entries[entries.length - 1] : null;
return {
name,
group: senseConfig.group,
throttle: senseConfig.throttle,
timeout: senseConfig.timeout,
lastSignalTimestamp: lastEntry !== null ? lastEntry.timestamp : null,
};
});
},
});
ipcServer = createDaemonIpcServer(options.ipcSocketPath, daemonHandlers);
}
let httpApiServer: HttpApiServer | null = null;
const httpPortOverride = options.httpApiPortOverride;
const effectiveHttpPort =
httpPortOverride !== undefined && httpPortOverride !== null && httpPortOverride > 0
? httpPortOverride
: initialConfig.api.port;
if (effectiveHttpPort !== null && effectiveHttpPort > 0) {
httpApiServer = createHttpApiServer(effectiveHttpPort, daemonHandlers);
}
async function stop(): Promise<void> {
@@ -344,6 +404,10 @@ export function createKernel(
await ipcServer.close();
ipcServer = null;
}
if (httpApiServer !== null) {
await httpApiServer.close();
httpApiServer = null;
}
scheduler.stop();
await workflowManager.stop();
await senseWorkerPool.shutdownAll();
@@ -377,5 +441,6 @@ export function createKernel(
restartGroup: (group) => senseWorkerPool.restartGroup(group),
reloadConfig,
getHealth,
getDaemonHealth,
};
}
+25 -1
View File
@@ -11,7 +11,12 @@ import type { ChildProcess } from "node:child_process";
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import type { NerveConfig, WorkflowConfig, WorkflowMessage } from "@uncaged/nerve-core";
import type {
NerveConfig,
WorkflowConfig,
WorkflowMessage,
WorkflowStatus,
} from "@uncaged/nerve-core";
import { START, isPlainRecord } from "@uncaged/nerve-core";
import type { LogStore, WorkflowRunStatus } from "@uncaged/nerve-store";
@@ -49,6 +54,8 @@ export type WorkflowManager = {
queueLength: (workflowName: string) => number;
/** Total active workflow threads across all workflows. */
totalActiveCount: () => number;
/** One row per workflow key in config (sorted by name), using current concurrency / overflow settings. */
listWorkflows: () => WorkflowStatus[];
/** Update the config reference (e.g. after hot reload). Active workers are unaffected. */
updateConfig: (newConfig: NerveConfig) => void;
/**
@@ -676,6 +683,22 @@ export function createWorkflowManager(
return total;
}
function listWorkflows(): WorkflowStatus[] {
const names = Object.keys(config.workflows).sort();
const out: WorkflowStatus[] = [];
for (const name of names) {
const wf = config.workflows[name];
if (wf === undefined) continue;
out.push({
name,
activeThreads: activeCount(name),
queuedThreads: queueLength(name),
config: { concurrency: wf.concurrency, overflow: wf.overflow },
});
}
return out;
}
function updateConfig(newConfig: NerveConfig): void {
config = newConfig;
}
@@ -756,6 +779,7 @@ export function createWorkflowManager(
activeCount,
queueLength,
totalActiveCount,
listWorkflows,
updateConfig,
drainAndRespawn,
drainWhenIdle,