feat(http-api): Phase 2 — CLI remote access + bearer token auth #137
@@ -0,0 +1,31 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
|
||||
import {
|
||||
consumeGlobalDaemonCliFlags,
|
||||
getCliDaemonApiToken,
|
||||
getCliDaemonHost,
|
||||
} from "../cli-global.js";
|
||||
|
||||
describe("consumeGlobalDaemonCliFlags", () => {
|
||||
it("strips --host and --api-token and populates getters", () => {
|
||||
const out = consumeGlobalDaemonCliFlags([
|
||||
"--host",
|
||||
"192.168.1.5:9800",
|
||||
"--api-token=abc",
|
||||
"sense",
|
||||
"list",
|
||||
]);
|
||||
expect(out).toEqual(["sense", "list"]);
|
||||
expect(getCliDaemonHost()).toBe("192.168.1.5:9800");
|
||||
expect(getCliDaemonApiToken()).toBe("abc");
|
||||
});
|
||||
|
||||
it("supports --host=value form", () => {
|
||||
consumeGlobalDaemonCliFlags(["--host=luming:9800", "status"]);
|
||||
expect(getCliDaemonHost()).toBe("luming:9800");
|
||||
});
|
||||
|
||||
it("throws when --host has no value", () => {
|
||||
expect(() => consumeGlobalDaemonCliFlags(["--host", "--api-token", "x"])).toThrow(/--host/);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,85 @@
|
||||
let cliDaemonHost: string | null = null;
|
||||
let cliDaemonApiToken: string | null = null;
|
||||
|
||||
function readEqOrNextFlag(
|
||||
argv: string[],
|
||||
i: number,
|
||||
flagEq: string,
|
||||
flagWord: string,
|
||||
missingMsg: string,
|
||||
): { value: string; lastConsumedIndex: number } | null {
|
||||
const a = argv[i];
|
||||
if (a === undefined) return null;
|
||||
if (a.startsWith(flagEq)) {
|
||||
const value = a.slice(flagEq.length);
|
||||
if (value.length === 0 || value.startsWith("-")) {
|
||||
throw new Error(missingMsg);
|
||||
}
|
||||
return { value, lastConsumedIndex: i };
|
||||
}
|
||||
if (a === flagWord) {
|
||||
const v = argv[i + 1];
|
||||
if (v === undefined || v.length === 0 || v.startsWith("-")) {
|
||||
throw new Error(missingMsg);
|
||||
}
|
||||
return { value: v, lastConsumedIndex: i + 1 };
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes `--host` / `--api-token` from argv before citty parses subcommands.
|
||||
* Must run once at process startup (see `cli.ts`).
|
||||
*/
|
||||
export function consumeGlobalDaemonCliFlags(argv: string[]): string[] {
|
||||
cliDaemonHost = null;
|
||||
cliDaemonApiToken = null;
|
||||
const out: string[] = [];
|
||||
|
||||
for (let i = 0; i < argv.length; i++) {
|
||||
const a = argv[i];
|
||||
if (a === undefined) continue;
|
||||
|
||||
const hostRead = readEqOrNextFlag(
|
||||
argv,
|
||||
i,
|
||||
"--host=",
|
||||
"--host",
|
||||
"--host requires a non-empty value (e.g. 192.168.1.1:9800)",
|
||||
);
|
||||
if (hostRead !== null) {
|
||||
cliDaemonHost = hostRead.value;
|
||||
i = hostRead.lastConsumedIndex;
|
||||
continue;
|
||||
}
|
||||
|
||||
const tokenRead = readEqOrNextFlag(
|
||||
argv,
|
||||
i,
|
||||
"--api-token=",
|
||||
"--api-token",
|
||||
"--api-token requires a value",
|
||||
);
|
||||
if (tokenRead !== null) {
|
||||
cliDaemonApiToken = tokenRead.value.length > 0 ? tokenRead.value : null;
|
||||
i = tokenRead.lastConsumedIndex;
|
||||
continue;
|
||||
}
|
||||
|
||||
out.push(a);
|
||||
}
|
||||
|
||||
return out;
|
||||
}
|
||||
|
||||
export function isRemoteDaemonCli(): boolean {
|
||||
return cliDaemonHost !== null && cliDaemonHost.length > 0;
|
||||
}
|
||||
|
||||
export function getCliDaemonHost(): string | null {
|
||||
return cliDaemonHost;
|
||||
}
|
||||
|
||||
export function getCliDaemonApiToken(): string | null {
|
||||
return cliDaemonApiToken;
|
||||
}
|
||||
+12
-2
@@ -1,5 +1,6 @@
|
||||
import { defineCommand, runMain } from "citty";
|
||||
|
||||
import { consumeGlobalDaemonCliFlags } from "./cli-global.js";
|
||||
import { daemonCommand } from "./commands/daemon.js";
|
||||
import { devCommand } from "./commands/dev.js";
|
||||
import { initCommand } from "./commands/init.js";
|
||||
@@ -35,7 +36,8 @@ function normalizeNerveArgv(argv: string[]): string[] {
|
||||
const main = defineCommand({
|
||||
meta: {
|
||||
name: "nerve",
|
||||
description: "Nerve — an AI agent kernel",
|
||||
description:
|
||||
"Nerve — an AI agent kernel. Global options: --host <host:port> (remote HTTP), --api-token <secret> (Bearer auth).",
|
||||
},
|
||||
subCommands: {
|
||||
init: initCommand,
|
||||
@@ -52,4 +54,12 @@ const main = defineCommand({
|
||||
},
|
||||
});
|
||||
|
||||
runMain(main, { rawArgs: normalizeNerveArgv(process.argv.slice(2)) });
|
||||
let cliArgv = process.argv.slice(2);
|
||||
try {
|
||||
cliArgv = consumeGlobalDaemonCliFlags(cliArgv);
|
||||
} catch (e) {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`${msg}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
runMain(main, { rawArgs: normalizeNerveArgv(cliArgv) });
|
||||
|
||||
@@ -5,7 +5,8 @@ import type { DatabaseSync } from "node:sqlite";
|
||||
import { type SenseInfo, isPlainRecord, parseNerveConfig } from "@uncaged/nerve-core";
|
||||
import { defineCommand } from "citty";
|
||||
|
||||
import { listSensesViaDaemon, triggerSenseViaDaemon } from "../daemon-client.js";
|
||||
import { isRemoteDaemonCli } from "../cli-global.js";
|
||||
import { resolveDaemonTransport } from "../daemon-client.js";
|
||||
import {
|
||||
defaultPreviewSql,
|
||||
formatRowsAsAlignedTable,
|
||||
@@ -14,7 +15,7 @@ import {
|
||||
parseSenseQueryArgs,
|
||||
pickDefaultPreviewTable,
|
||||
} from "../sense-sqlite.js";
|
||||
import { getNerveRoot, getSocketPath, isRunning } from "../workspace.js";
|
||||
import { getNerveRoot, isRunning } from "../workspace.js";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Formatting helpers (exported for tests)
|
||||
@@ -79,7 +80,7 @@ const senseListCommand = defineCommand({
|
||||
description: "List all registered senses and their status",
|
||||
},
|
||||
async run() {
|
||||
if (!isRunning()) {
|
||||
if (!isRemoteDaemonCli() && !isRunning()) {
|
||||
process.stderr.write(
|
||||
"⚠️ Daemon is not running — showing static config only (no last signal time).\n\n",
|
||||
);
|
||||
@@ -89,22 +90,17 @@ const senseListCommand = defineCommand({
|
||||
return;
|
||||
}
|
||||
|
||||
const socketPath = getSocketPath();
|
||||
let response: { ok: true; senses: SenseInfo[] } | { ok: false; error: string };
|
||||
const transport = resolveDaemonTransport();
|
||||
let senses: SenseInfo[];
|
||||
try {
|
||||
response = await listSensesViaDaemon(socketPath);
|
||||
senses = await transport.listSenses();
|
||||
} catch (e) {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`❌ Failed to contact daemon: ${msg}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
if (!response.ok) {
|
||||
process.stderr.write(`❌ Daemon error: ${response.error}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
process.stdout.write(formatSenseList(response.senses));
|
||||
process.stdout.write(formatSenseList(senses));
|
||||
},
|
||||
});
|
||||
|
||||
@@ -124,15 +120,15 @@ const senseTriggerCommand = defineCommand({
|
||||
},
|
||||
},
|
||||
async run({ args }) {
|
||||
if (!isRunning()) {
|
||||
if (!isRemoteDaemonCli() && !isRunning()) {
|
||||
process.stderr.write("❌ Nerve daemon is not running. Start it with `nerve start`.\n");
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const socketPath = getSocketPath();
|
||||
const transport = resolveDaemonTransport();
|
||||
let response: { ok: true } | { ok: false; error: string };
|
||||
try {
|
||||
response = await triggerSenseViaDaemon(socketPath, args.name);
|
||||
response = await transport.triggerSense(args.name);
|
||||
} catch (e) {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`❌ Failed to contact daemon: ${msg}\n`);
|
||||
|
||||
@@ -4,6 +4,8 @@ import { join } from "node:path";
|
||||
import { parseNerveConfig } from "@uncaged/nerve-core";
|
||||
import { defineCommand } from "citty";
|
||||
|
||||
import { isRemoteDaemonCli } from "../cli-global.js";
|
||||
import { resolveDaemonTransport } from "../daemon-client.js";
|
||||
import { getNerveRoot, getPidPath, isRunning, readPidFile } from "../workspace.js";
|
||||
|
||||
function formatUptime(ms: number): string {
|
||||
@@ -42,6 +44,23 @@ export const statusCommand = defineCommand({
|
||||
description: "Show nerve daemon status",
|
||||
},
|
||||
async run() {
|
||||
if (isRemoteDaemonCli()) {
|
||||
const transport = resolveDaemonTransport();
|
||||
try {
|
||||
const health = await transport.health();
|
||||
process.stdout.write("✅ Nerve daemon is reachable (remote HTTP).\n");
|
||||
process.stdout.write(` hostname: ${health.hostname}\n`);
|
||||
process.stdout.write(` version: ${health.version}\n`);
|
||||
process.stdout.write(` uptime: ${formatUptime(health.uptime * 1000)}\n`);
|
||||
process.stdout.write(` started: ${health.startedAt}\n`);
|
||||
} catch (e) {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`❌ Cannot reach remote daemon: ${msg}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (!isRunning()) {
|
||||
process.stdout.write("😴 Nerve daemon is not running.\n");
|
||||
return;
|
||||
|
||||
@@ -1,20 +1,16 @@
|
||||
import { existsSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
|
||||
import type { DaemonIpcTriggerResponse } from "@uncaged/nerve-core";
|
||||
import { DEFAULT_ENGINE_MAX_ROUNDS, isPlainRecord } from "@uncaged/nerve-core";
|
||||
import { defineCommand } from "citty";
|
||||
import { stringify } from "yaml";
|
||||
|
||||
import type { LogStore, ThreadRoundRow, WorkflowRun } from "@uncaged/nerve-store";
|
||||
import {
|
||||
killWorkflowViaDaemon,
|
||||
listWorkflowsViaDaemon,
|
||||
triggerWorkflowViaDaemon,
|
||||
} from "../daemon-client.js";
|
||||
import { isRemoteDaemonCli } from "../cli-global.js";
|
||||
import { resolveDaemonTransport } from "../daemon-client.js";
|
||||
import { formatRowsAsAlignedTable } from "../sense-sqlite.js";
|
||||
import { loadDaemonModule } from "../workspace-daemon.js";
|
||||
import { getNerveRoot, getSocketPath, isRunning } from "../workspace.js";
|
||||
import { getNerveRoot, isRunning } from "../workspace.js";
|
||||
|
||||
export const DEFAULT_PAGE_SIZE = 20;
|
||||
|
||||
@@ -312,27 +308,22 @@ const workflowDaemonListCommand = defineCommand({
|
||||
description: "List workflows from the running daemon (concurrency, active, queued)",
|
||||
},
|
||||
async run() {
|
||||
if (!isRunning()) {
|
||||
if (!isRemoteDaemonCli() && !isRunning()) {
|
||||
process.stderr.write("❌ Nerve daemon is not running. Start it with `nerve start`.\n");
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const socketPath = getSocketPath();
|
||||
let response: Awaited<ReturnType<typeof listWorkflowsViaDaemon>>;
|
||||
const transport = resolveDaemonTransport();
|
||||
let workflows: Awaited<ReturnType<typeof transport.listWorkflows>>;
|
||||
try {
|
||||
response = await listWorkflowsViaDaemon(socketPath);
|
||||
workflows = await transport.listWorkflows();
|
||||
} catch (e) {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`❌ Failed to contact daemon: ${msg}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
if (!response.ok) {
|
||||
process.stderr.write(`❌ Daemon error: ${response.error}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const rows = response.workflows.map((w) => ({
|
||||
const rows = workflows.map((w) => ({
|
||||
name: w.name,
|
||||
active: w.activeThreads,
|
||||
queued: w.queuedThreads,
|
||||
@@ -594,15 +585,15 @@ const workflowTriggerCommand = defineCommand({
|
||||
if (typeof p.dryRun === "boolean") dryRun = p.dryRun;
|
||||
}
|
||||
|
||||
if (!isRunning()) {
|
||||
if (!isRemoteDaemonCli() && !isRunning()) {
|
||||
process.stderr.write("❌ Nerve daemon is not running. Start it with `nerve start`.\n");
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const socketPath = getSocketPath();
|
||||
let response: DaemonIpcTriggerResponse;
|
||||
const transport = resolveDaemonTransport();
|
||||
let response: { ok: true } | { ok: false; error: string };
|
||||
try {
|
||||
response = await triggerWorkflowViaDaemon(socketPath, args.name, prompt, maxRounds, dryRun);
|
||||
response = await transport.triggerWorkflow(args.name, { prompt, maxRounds, dryRun });
|
||||
} catch (e) {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`❌ Failed to contact daemon: ${msg}\n`);
|
||||
@@ -635,15 +626,15 @@ const workflowKillCommand = defineCommand({
|
||||
},
|
||||
},
|
||||
async run({ args }) {
|
||||
if (!isRunning()) {
|
||||
if (!isRemoteDaemonCli() && !isRunning()) {
|
||||
process.stderr.write("❌ Nerve daemon is not running. Start it with `nerve start`.\n");
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const socketPath = getSocketPath();
|
||||
let response: DaemonIpcTriggerResponse;
|
||||
const transport = resolveDaemonTransport();
|
||||
let response: { ok: true } | { ok: false; error: string };
|
||||
try {
|
||||
response = await killWorkflowViaDaemon(socketPath, args.runId);
|
||||
response = await transport.killWorkflow(args.runId);
|
||||
} catch (e) {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`❌ Failed to contact daemon: ${msg}\n`);
|
||||
|
||||
@@ -15,41 +15,27 @@ import type {
|
||||
DaemonIpcTriggerResponse,
|
||||
DaemonTransport,
|
||||
DaemonTransportTriggerResult,
|
||||
DaemonTransportWorkflowLaunch,
|
||||
HealthInfo,
|
||||
SenseInfo,
|
||||
WorkflowStatus,
|
||||
} from "@uncaged/nerve-core";
|
||||
import { DEFAULT_ENGINE_MAX_ROUNDS, isPlainRecord } from "@uncaged/nerve-core";
|
||||
import {
|
||||
DEFAULT_ENGINE_MAX_ROUNDS,
|
||||
isPlainRecord,
|
||||
isSenseInfo,
|
||||
isWorkflowStatus,
|
||||
} from "@uncaged/nerve-core";
|
||||
|
||||
import { getCliDaemonApiToken, getCliDaemonHost } from "./cli-global.js";
|
||||
import { HttpTransport } from "./http-transport.js";
|
||||
import { getSocketPath } from "./workspace.js";
|
||||
|
||||
const CONNECT_TIMEOUT_MS = 3_000;
|
||||
const RESPONSE_TIMEOUT_MS = 5_000;
|
||||
|
||||
export type { SenseInfo };
|
||||
|
||||
function isSenseInfo(value: unknown): value is SenseInfo {
|
||||
if (!isPlainRecord(value)) return false;
|
||||
return (
|
||||
typeof value.name === "string" &&
|
||||
typeof value.group === "string" &&
|
||||
(value.throttle === null || typeof value.throttle === "number") &&
|
||||
(value.timeout === null || typeof value.timeout === "number") &&
|
||||
(value.lastSignalTimestamp === null || typeof value.lastSignalTimestamp === "number")
|
||||
);
|
||||
}
|
||||
|
||||
function isWorkflowStatus(value: unknown): value is WorkflowStatus {
|
||||
if (!isPlainRecord(value)) return false;
|
||||
const cfg = value.config;
|
||||
if (!isPlainRecord(cfg)) return false;
|
||||
return (
|
||||
typeof value.name === "string" &&
|
||||
typeof value.activeThreads === "number" &&
|
||||
typeof value.queuedThreads === "number" &&
|
||||
typeof cfg.concurrency === "number" &&
|
||||
typeof cfg.overflow === "string"
|
||||
);
|
||||
}
|
||||
|
||||
function parseDaemonResponse(line: string): DaemonIpcTriggerResponse {
|
||||
try {
|
||||
const obj: unknown = JSON.parse(line);
|
||||
@@ -238,13 +224,19 @@ export class UnixTransport implements DaemonTransport {
|
||||
);
|
||||
}
|
||||
|
||||
async triggerWorkflow(name: string): Promise<DaemonTransportTriggerResult> {
|
||||
async triggerWorkflow(
|
||||
name: string,
|
||||
launch: DaemonTransportWorkflowLaunch | null,
|
||||
): Promise<DaemonTransportTriggerResult> {
|
||||
const prompt = launch !== null ? launch.prompt : "";
|
||||
const maxRounds = launch !== null ? launch.maxRounds : DEFAULT_ENGINE_MAX_ROUNDS;
|
||||
const dryRun = launch !== null ? launch.dryRun : false;
|
||||
const message: DaemonIpcRequest = {
|
||||
type: "trigger-workflow",
|
||||
workflow: name,
|
||||
prompt: "",
|
||||
maxRounds: DEFAULT_ENGINE_MAX_ROUNDS,
|
||||
dryRun: false,
|
||||
prompt,
|
||||
maxRounds,
|
||||
dryRun,
|
||||
};
|
||||
return sendAndReceive(this.socketPath, message, parseDaemonResponse);
|
||||
}
|
||||
@@ -318,3 +310,17 @@ export function killWorkflowViaDaemon(
|
||||
const message: DaemonIpcRequest = { type: "kill-workflow", runId };
|
||||
return sendAndReceive(socketPath, message, parseDaemonResponse);
|
||||
}
|
||||
|
||||
/** Unix socket when no `--host`; otherwise {@link HttpTransport} for remote HTTP API. */
|
||||
export function resolveDaemonTransport(): DaemonTransport {
|
||||
const host = getCliDaemonHost();
|
||||
if (host !== null && host.length > 0) {
|
||||
const tok = getCliDaemonApiToken();
|
||||
return tok !== null && tok.length > 0
|
||||
? new HttpTransport({ host, token: tok })
|
||||
: new HttpTransport({ host });
|
||||
}
|
||||
return new UnixTransport(getSocketPath());
|
||||
}
|
||||
|
||||
export { HttpTransport } from "./http-transport.js";
|
||||
|
||||
@@ -0,0 +1,185 @@
|
||||
import type {
|
||||
DaemonTransport,
|
||||
DaemonTransportTriggerResult,
|
||||
DaemonTransportWorkflowLaunch,
|
||||
HealthInfo,
|
||||
SenseInfo,
|
||||
WorkflowStatus,
|
||||
} from "@uncaged/nerve-core";
|
||||
import {
|
||||
DEFAULT_ENGINE_MAX_ROUNDS,
|
||||
isPlainRecord,
|
||||
isSenseInfo,
|
||||
isWorkflowStatus,
|
||||
} from "@uncaged/nerve-core";
|
||||
|
||||
function normalizeBaseUrl(host: string): string {
|
||||
const t = host.trim();
|
||||
const withScheme = t.startsWith("http://") || t.startsWith("https://") ? t : `http://${t}`;
|
||||
return withScheme.endsWith("/") ? withScheme.slice(0, -1) : withScheme;
|
||||
}
|
||||
|
||||
function isHealthInfo(value: unknown): value is HealthInfo {
|
||||
if (!isPlainRecord(value)) return false;
|
||||
return (
|
||||
typeof value.ok === "boolean" &&
|
||||
typeof value.version === "string" &&
|
||||
typeof value.uptime === "number" &&
|
||||
typeof value.startedAt === "string" &&
|
||||
typeof value.hostname === "string"
|
||||
);
|
||||
}
|
||||
|
||||
async function readJsonBody(res: Response): Promise<unknown> {
|
||||
const text = await res.text();
|
||||
if (text.trim().length === 0) return null;
|
||||
try {
|
||||
return JSON.parse(text) as unknown;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function httpErrorMessage(status: number, body: unknown): string {
|
||||
if (isPlainRecord(body) && body.ok === false && typeof body.error === "string") {
|
||||
return body.error;
|
||||
}
|
||||
return `HTTP ${String(status)}`;
|
||||
}
|
||||
|
||||
/** Remote daemon control plane via JSON HTTP API (Phase 2). */
|
||||
export class HttpTransport implements DaemonTransport {
|
||||
private readonly baseUrl: string;
|
||||
private readonly token: string | null;
|
||||
|
||||
constructor(opts: { host: string; token?: string | null }) {
|
||||
this.baseUrl = normalizeBaseUrl(opts.host);
|
||||
this.token =
|
||||
opts.token !== undefined && opts.token !== null && opts.token.length > 0 ? opts.token : null;
|
||||
}
|
||||
|
||||
private baseHeaders(): Record<string, string> {
|
||||
const h: Record<string, string> = { Accept: "application/json" };
|
||||
if (this.token !== null) {
|
||||
h.Authorization = `Bearer ${this.token}`;
|
||||
}
|
||||
return h;
|
||||
}
|
||||
|
||||
async health(): Promise<HealthInfo> {
|
||||
const res = await fetch(`${this.baseUrl}/api/health`, {
|
||||
headers: this.baseHeaders(),
|
||||
});
|
||||
const body = await readJsonBody(res);
|
||||
if (res.status === 401) {
|
||||
throw new Error(httpErrorMessage(res.status, body));
|
||||
}
|
||||
if (!res.ok || !isHealthInfo(body)) {
|
||||
throw new Error(httpErrorMessage(res.status, body));
|
||||
}
|
||||
return body;
|
||||
}
|
||||
|
||||
async listSenses(): Promise<SenseInfo[]> {
|
||||
const res = await fetch(`${this.baseUrl}/api/senses`, {
|
||||
headers: this.baseHeaders(),
|
||||
});
|
||||
const body = await readJsonBody(res);
|
||||
if (res.status === 401) {
|
||||
throw new Error(httpErrorMessage(res.status, body));
|
||||
}
|
||||
if (!res.ok || !isPlainRecord(body) || !Array.isArray(body.senses)) {
|
||||
throw new Error(httpErrorMessage(res.status, body));
|
||||
}
|
||||
if (!body.senses.every(isSenseInfo)) {
|
||||
throw new Error("Unexpected senses payload from daemon HTTP API");
|
||||
}
|
||||
return body.senses;
|
||||
}
|
||||
|
||||
async listWorkflows(): Promise<WorkflowStatus[]> {
|
||||
const res = await fetch(`${this.baseUrl}/api/workflows`, {
|
||||
headers: this.baseHeaders(),
|
||||
});
|
||||
const body = await readJsonBody(res);
|
||||
if (res.status === 401) {
|
||||
throw new Error(httpErrorMessage(res.status, body));
|
||||
}
|
||||
if (!res.ok || !isPlainRecord(body) || !Array.isArray(body.workflows)) {
|
||||
throw new Error(httpErrorMessage(res.status, body));
|
||||
}
|
||||
if (!body.workflows.every(isWorkflowStatus)) {
|
||||
throw new Error("Unexpected workflows payload from daemon HTTP API");
|
||||
}
|
||||
return body.workflows;
|
||||
}
|
||||
|
||||
async triggerSense(name: string): Promise<DaemonTransportTriggerResult> {
|
||||
const res = await fetch(`${this.baseUrl}/api/trigger-sense`, {
|
||||
method: "POST",
|
||||
headers: { ...this.baseHeaders(), "Content-Type": "application/json" },
|
||||
body: JSON.stringify({ name }),
|
||||
});
|
||||
const body = await readJsonBody(res);
|
||||
if (res.status === 401) {
|
||||
return { ok: false, error: httpErrorMessage(res.status, body) };
|
||||
}
|
||||
if (!isPlainRecord(body)) {
|
||||
return { ok: false, error: httpErrorMessage(res.status, body) };
|
||||
}
|
||||
if (body.ok === true) return { ok: true };
|
||||
if (body.ok === false && typeof body.error === "string")
|
||||
return { ok: false, error: body.error };
|
||||
return { ok: false, error: httpErrorMessage(res.status, body) };
|
||||
}
|
||||
|
||||
async triggerWorkflow(
|
||||
name: string,
|
||||
launch: DaemonTransportWorkflowLaunch | null,
|
||||
): Promise<DaemonTransportTriggerResult> {
|
||||
const L =
|
||||
launch !== null
|
||||
? launch
|
||||
: { prompt: "", maxRounds: DEFAULT_ENGINE_MAX_ROUNDS, dryRun: false };
|
||||
const res = await fetch(`${this.baseUrl}/api/trigger-workflow`, {
|
||||
method: "POST",
|
||||
headers: { ...this.baseHeaders(), "Content-Type": "application/json" },
|
||||
body: JSON.stringify({
|
||||
name,
|
||||
prompt: L.prompt,
|
||||
maxRounds: L.maxRounds,
|
||||
dryRun: L.dryRun,
|
||||
}),
|
||||
});
|
||||
const body = await readJsonBody(res);
|
||||
if (res.status === 401) {
|
||||
return { ok: false, error: httpErrorMessage(res.status, body) };
|
||||
}
|
||||
if (!isPlainRecord(body)) {
|
||||
return { ok: false, error: httpErrorMessage(res.status, body) };
|
||||
}
|
||||
if (body.ok === true) return { ok: true };
|
||||
if (body.ok === false && typeof body.error === "string")
|
||||
return { ok: false, error: body.error };
|
||||
return { ok: false, error: httpErrorMessage(res.status, body) };
|
||||
}
|
||||
|
||||
async killWorkflow(runId: string): Promise<DaemonTransportTriggerResult> {
|
||||
const res = await fetch(`${this.baseUrl}/api/kill-workflow`, {
|
||||
method: "POST",
|
||||
headers: { ...this.baseHeaders(), "Content-Type": "application/json" },
|
||||
body: JSON.stringify({ threadId: runId }),
|
||||
});
|
||||
const body = await readJsonBody(res);
|
||||
if (res.status === 401) {
|
||||
return { ok: false, error: httpErrorMessage(res.status, body) };
|
||||
}
|
||||
if (!isPlainRecord(body)) {
|
||||
return { ok: false, error: httpErrorMessage(res.status, body) };
|
||||
}
|
||||
if (body.ok === true) return { ok: true };
|
||||
if (body.ok === false && typeof body.error === "string")
|
||||
return { ok: false, error: body.error };
|
||||
return { ok: false, error: httpErrorMessage(res.status, body) };
|
||||
}
|
||||
}
|
||||
@@ -63,7 +63,7 @@ describe("parseNerveConfig", () => {
|
||||
overflow: "queue",
|
||||
maxQueue: 10,
|
||||
});
|
||||
expect(result.value.api).toEqual({ port: null });
|
||||
expect(result.value.api).toEqual({ port: null, token: null, host: "127.0.0.1" });
|
||||
});
|
||||
|
||||
it("parses config with empty reflexes array", () => {
|
||||
@@ -184,11 +184,94 @@ api:
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(true);
|
||||
if (!result.ok) return;
|
||||
expect(result.value.api).toEqual({ port: 9800 });
|
||||
expect(result.value.api).toEqual({ port: 9800, token: null, host: "127.0.0.1" });
|
||||
});
|
||||
|
||||
it("allows api.host localhost without token (loopback)", () => {
|
||||
const yaml = `
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
reflexes: []
|
||||
api:
|
||||
port: 9800
|
||||
host: localhost
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(true);
|
||||
if (!result.ok) return;
|
||||
expect(result.value.api).toEqual({ port: 9800, token: null, host: "localhost" });
|
||||
});
|
||||
|
||||
it("parses api.token and api.host when port is set", () => {
|
||||
const yaml = `
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
reflexes: []
|
||||
api:
|
||||
port: 9800
|
||||
token: "secret"
|
||||
host: "0.0.0.0"
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(true);
|
||||
if (!result.ok) return;
|
||||
expect(result.value.api).toEqual({ port: 9800, token: "secret", host: "0.0.0.0" });
|
||||
});
|
||||
});
|
||||
|
||||
describe("invalid configs", () => {
|
||||
it("returns error when api.token is empty string", () => {
|
||||
const yaml = `
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
reflexes: []
|
||||
api:
|
||||
port: 9800
|
||||
token: ""
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(false);
|
||||
if (result.ok) return;
|
||||
expect(result.error.message).toMatch(/api\.token/);
|
||||
});
|
||||
|
||||
it("returns error when api.host is empty string", () => {
|
||||
const yaml = `
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
reflexes: []
|
||||
api:
|
||||
port: 9800
|
||||
host: ""
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(false);
|
||||
if (result.ok) return;
|
||||
expect(result.error.message).toMatch(/api\.host/);
|
||||
});
|
||||
|
||||
it("returns error when api.host is non-loopback and api.token is not set", () => {
|
||||
const yaml = `
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
reflexes: []
|
||||
api:
|
||||
port: 9800
|
||||
host: "0.0.0.0"
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(false);
|
||||
if (result.ok) return;
|
||||
expect(result.error.message).toBe(
|
||||
"api.host binds to non-loopback address, api.token is required for security",
|
||||
);
|
||||
});
|
||||
|
||||
it("returns error when api.port is out of range", () => {
|
||||
const yaml = `
|
||||
senses:
|
||||
|
||||
@@ -28,9 +28,13 @@ export type QueueOverflowConfig = {
|
||||
|
||||
export type WorkflowConfig = DropOverflowConfig | QueueOverflowConfig;
|
||||
|
||||
/** Optional HTTP control plane (Phase 1: no auth). When `port` is null, the HTTP server is not started. */
|
||||
/** Optional HTTP control plane. When `port` is null, the HTTP server is not started. */
|
||||
export type NerveApiConfig = {
|
||||
port: number | null;
|
||||
/** When set, HTTP API requires `Authorization: Bearer <token>`. */
|
||||
token: string | null;
|
||||
/** Bind address (e.g. `127.0.0.1`, `0.0.0.0`). Meaningful when `port` is set. */
|
||||
host: string;
|
||||
};
|
||||
|
||||
export type NerveConfig = {
|
||||
|
||||
@@ -0,0 +1,29 @@
|
||||
import type { WorkflowStatus } from "./daemon-ipc-protocol.js";
|
||||
import { isPlainRecord } from "./is-plain-record.js";
|
||||
import type { SenseInfo } from "./sense.js";
|
||||
|
||||
/** Type guard for JSON {@link SenseInfo} payloads from daemon HTTP/IPC. */
|
||||
export function isSenseInfo(value: unknown): value is SenseInfo {
|
||||
if (!isPlainRecord(value)) return false;
|
||||
return (
|
||||
typeof value.name === "string" &&
|
||||
typeof value.group === "string" &&
|
||||
(value.throttle === null || typeof value.throttle === "number") &&
|
||||
(value.timeout === null || typeof value.timeout === "number") &&
|
||||
(value.lastSignalTimestamp === null || typeof value.lastSignalTimestamp === "number")
|
||||
);
|
||||
}
|
||||
|
||||
/** Type guard for JSON {@link WorkflowStatus} payloads from daemon HTTP/IPC. */
|
||||
export function isWorkflowStatus(value: unknown): value is WorkflowStatus {
|
||||
if (!isPlainRecord(value)) return false;
|
||||
const cfg = value.config;
|
||||
if (!isPlainRecord(cfg)) return false;
|
||||
return (
|
||||
typeof value.name === "string" &&
|
||||
typeof value.activeThreads === "number" &&
|
||||
typeof value.queuedThreads === "number" &&
|
||||
typeof cfg.concurrency === "number" &&
|
||||
typeof cfg.overflow === "string"
|
||||
);
|
||||
}
|
||||
@@ -3,6 +3,12 @@ import type { SenseInfo } from "./sense.js";
|
||||
|
||||
export type DaemonTransportTriggerResult = { ok: true } | { ok: false; error: string };
|
||||
|
||||
export type DaemonTransportWorkflowLaunch = {
|
||||
prompt: string;
|
||||
maxRounds: number;
|
||||
dryRun: boolean;
|
||||
};
|
||||
|
||||
/**
|
||||
* Abstraction over daemon control plane (Unix socket IPC today, HTTP in Phase 2).
|
||||
* Implementations live in CLI / tools; the daemon kernel uses shared handler logic.
|
||||
@@ -12,7 +18,11 @@ export type DaemonTransport = {
|
||||
listSenses(): Promise<SenseInfo[]>;
|
||||
listWorkflows(): Promise<WorkflowStatus[]>;
|
||||
triggerSense(name: string): Promise<DaemonTransportTriggerResult>;
|
||||
triggerWorkflow(name: string): Promise<DaemonTransportTriggerResult>;
|
||||
/** When `launch` is null, implementations use engine defaults (empty prompt, default max rounds, dryRun false). */
|
||||
triggerWorkflow(
|
||||
name: string,
|
||||
launch: DaemonTransportWorkflowLaunch | null,
|
||||
): Promise<DaemonTransportTriggerResult>;
|
||||
/** Kill a running or queued workflow thread by `runId` (same field as IPC `kill-workflow`). */
|
||||
killWorkflow(runId: string): Promise<DaemonTransportTriggerResult>;
|
||||
};
|
||||
|
||||
@@ -35,6 +35,7 @@ export {
|
||||
routeSenseComputeOutput,
|
||||
} from "./sense-workflow-directive.js";
|
||||
|
||||
export { isSenseInfo, isWorkflowStatus } from "./daemon-payload-guards.js";
|
||||
export type {
|
||||
WorkflowStatus,
|
||||
HealthInfo,
|
||||
@@ -54,4 +55,8 @@ export type {
|
||||
DaemonIpcResponse,
|
||||
} from "./daemon-ipc-protocol.js";
|
||||
export { parseDaemonIpcRequest } from "./daemon-ipc-protocol.js";
|
||||
export type { DaemonTransport, DaemonTransportTriggerResult } from "./daemon-transport.js";
|
||||
export type {
|
||||
DaemonTransport,
|
||||
DaemonTransportTriggerResult,
|
||||
DaemonTransportWorkflowLaunch,
|
||||
} from "./daemon-transport.js";
|
||||
|
||||
@@ -251,16 +251,50 @@ function parseReflexes(
|
||||
return ok(reflexes);
|
||||
}
|
||||
|
||||
const DEFAULT_API_BIND_HOST = "127.0.0.1";
|
||||
|
||||
/** Hosts that may bind the HTTP API without `api.token` (loopback-only). */
|
||||
function isLoopbackOnlyApiHost(host: string): boolean {
|
||||
const h = host.trim();
|
||||
return h === "127.0.0.1" || h.toLowerCase() === "localhost";
|
||||
}
|
||||
|
||||
function parseApiTokenField(api: Record<string, unknown>): Result<string | null> {
|
||||
if (api.token === undefined || api.token === null) {
|
||||
return ok(null);
|
||||
}
|
||||
if (typeof api.token !== "string") {
|
||||
return err(new Error("api.token: must be a string when provided"));
|
||||
}
|
||||
if (api.token.length === 0) {
|
||||
return err(new Error("api.token: must not be empty when provided"));
|
||||
}
|
||||
return ok(api.token);
|
||||
}
|
||||
|
||||
function parseApiHostField(api: Record<string, unknown>): Result<string> {
|
||||
if (api.host === undefined || api.host === null) {
|
||||
return ok(DEFAULT_API_BIND_HOST);
|
||||
}
|
||||
if (typeof api.host !== "string") {
|
||||
return err(new Error("api.host: must be a string when provided"));
|
||||
}
|
||||
if (api.host.length === 0) {
|
||||
return err(new Error("api.host: must not be empty when provided"));
|
||||
}
|
||||
return ok(api.host);
|
||||
}
|
||||
|
||||
function parseApiConfig(obj: Record<string, unknown>): Result<NerveApiConfig> {
|
||||
if (obj.api === undefined || obj.api === null) {
|
||||
return ok({ port: null });
|
||||
return ok({ port: null, token: null, host: DEFAULT_API_BIND_HOST });
|
||||
}
|
||||
if (!isPlainRecord(obj.api)) {
|
||||
return err(new Error("api: must be an object if provided"));
|
||||
}
|
||||
const api = obj.api;
|
||||
if (api.port === undefined || api.port === null) {
|
||||
return ok({ port: null });
|
||||
return ok({ port: null, token: null, host: DEFAULT_API_BIND_HOST });
|
||||
}
|
||||
if (
|
||||
typeof api.port !== "number" ||
|
||||
@@ -270,7 +304,21 @@ function parseApiConfig(obj: Record<string, unknown>): Result<NerveApiConfig> {
|
||||
) {
|
||||
return err(new Error("api.port: must be an integer between 1 and 65535 if provided"));
|
||||
}
|
||||
return ok({ port: api.port });
|
||||
|
||||
const tokenResult = parseApiTokenField(api);
|
||||
if (!tokenResult.ok) return tokenResult;
|
||||
const hostResult = parseApiHostField(api);
|
||||
if (!hostResult.ok) return hostResult;
|
||||
|
||||
if (!isLoopbackOnlyApiHost(hostResult.value) && tokenResult.value === null) {
|
||||
return err(
|
||||
new Error(
|
||||
"api.host binds to non-loopback address, api.token is required for security",
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
return ok({ port: api.port, token: tokenResult.value, host: hostResult.value });
|
||||
}
|
||||
|
||||
function parseWorkflows(obj: Record<string, unknown>): Result<Record<string, WorkflowConfig>> {
|
||||
|
||||
@@ -65,7 +65,7 @@ function makeConfig(workflows: Record<string, WorkflowConfig> = {}): NerveConfig
|
||||
reflexes: [],
|
||||
workflows,
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -66,7 +66,13 @@ const { createWorkflowManager } = await import("../workflow-manager.js");
|
||||
const { createKernel } = await import("../kernel.js");
|
||||
|
||||
function makeWfConfig(workflows: Record<string, WorkflowConfig> = {}): NerveConfig {
|
||||
return { senses: {}, reflexes: [], workflows, maxRounds: 10, api: { port: null } };
|
||||
return {
|
||||
senses: {},
|
||||
reflexes: [],
|
||||
workflows,
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
}
|
||||
|
||||
function makeLogStore() {
|
||||
@@ -454,7 +460,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
|
||||
reflexes: [],
|
||||
workflows: { "my-wf": { concurrency: 1, overflow: "drop" } },
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
|
||||
const kernel = createKernel(config, nerveRoot, {
|
||||
@@ -490,7 +496,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
|
||||
reflexes: [],
|
||||
workflows: { "old-wf": { concurrency: 1, overflow: "drop" } },
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
|
||||
const kernel = createKernel(initialConfig, nerveRoot, {
|
||||
@@ -512,7 +518,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
kernel.reloadConfig(newConfig);
|
||||
|
||||
@@ -535,7 +541,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
|
||||
reflexes: [],
|
||||
workflows: { "my-wf": { concurrency: 1, overflow: "drop" } },
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
|
||||
const kernel = createKernel(initialConfig, nerveRoot, {
|
||||
@@ -552,7 +558,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
|
||||
reflexes: [],
|
||||
workflows: { "my-wf": { concurrency: 5, overflow: "queue", maxQueue: 50 } },
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
kernel.reloadConfig(newConfig);
|
||||
|
||||
|
||||
@@ -0,0 +1,86 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
import type { DaemonHandlerBundle } from "../daemon-handlers.js";
|
||||
import { type HttpApiServer, createHttpApiServer } from "../http-api.js";
|
||||
|
||||
const handlers: DaemonHandlerBundle = {
|
||||
health: () => ({
|
||||
ok: true,
|
||||
version: "0-test",
|
||||
uptime: 2,
|
||||
startedAt: "2020-01-01T00:00:00.000Z",
|
||||
hostname: "test-host",
|
||||
}),
|
||||
listSenses: () => [],
|
||||
listWorkflows: () => [],
|
||||
getDefaultMaxRounds: () => 10,
|
||||
triggerSense: vi.fn((): { ok: true } => ({ ok: true })),
|
||||
triggerWorkflow: vi.fn((): { ok: true } => ({ ok: true })),
|
||||
killWorkflowByRunId: vi.fn((): { ok: true } => ({ ok: true })),
|
||||
};
|
||||
|
||||
describe("createHttpApiServer — bearer auth", () => {
|
||||
let srv: HttpApiServer | null = null;
|
||||
|
||||
afterEach(async () => {
|
||||
if (srv !== null) {
|
||||
await srv.close();
|
||||
srv = null;
|
||||
}
|
||||
});
|
||||
|
||||
it("serves GET /api/health without Authorization when token is not configured", async () => {
|
||||
srv = createHttpApiServer({ port: 0, host: "127.0.0.1", token: null }, handlers);
|
||||
const { port } = await srv.ready();
|
||||
const res = await fetch(`http://127.0.0.1:${String(port)}/api/health`);
|
||||
expect(res.status).toBe(200);
|
||||
const body = (await res.json()) as { version: string };
|
||||
expect(body.version).toBe("0-test");
|
||||
});
|
||||
|
||||
it("returns 401 when token is configured and Authorization is missing", async () => {
|
||||
srv = createHttpApiServer({ port: 0, host: "127.0.0.1", token: "secret" }, handlers);
|
||||
const { port } = await srv.ready();
|
||||
const res = await fetch(`http://127.0.0.1:${String(port)}/api/health`);
|
||||
expect(res.status).toBe(401);
|
||||
expect(await res.json()).toEqual({ ok: false, error: "Unauthorized" });
|
||||
});
|
||||
|
||||
it("returns 401 when bearer token does not match", async () => {
|
||||
srv = createHttpApiServer({ port: 0, host: "127.0.0.1", token: "secret" }, handlers);
|
||||
const { port } = await srv.ready();
|
||||
const res = await fetch(`http://127.0.0.1:${String(port)}/api/health`, {
|
||||
headers: { Authorization: "Bearer wrong" },
|
||||
});
|
||||
expect(res.status).toBe(401);
|
||||
});
|
||||
|
||||
it("allows OPTIONS without Authorization when token is configured", async () => {
|
||||
srv = createHttpApiServer({ port: 0, host: "127.0.0.1", token: "secret" }, handlers);
|
||||
const { port } = await srv.ready();
|
||||
const res = await fetch(`http://127.0.0.1:${String(port)}/api/health`, { method: "OPTIONS" });
|
||||
expect(res.status).toBe(204);
|
||||
});
|
||||
|
||||
it("allows GET with matching Bearer token", async () => {
|
||||
srv = createHttpApiServer({ port: 0, host: "127.0.0.1", token: "secret" }, handlers);
|
||||
const { port } = await srv.ready();
|
||||
const res = await fetch(`http://127.0.0.1:${String(port)}/api/health`, {
|
||||
headers: { Authorization: "Bearer secret" },
|
||||
});
|
||||
expect(res.status).toBe(200);
|
||||
});
|
||||
|
||||
it("returns 413 when POST body exceeds 1MB", async () => {
|
||||
srv = createHttpApiServer({ port: 0, host: "127.0.0.1", token: null }, handlers);
|
||||
const { port } = await srv.ready();
|
||||
const oversized = "x".repeat(1024 * 1024 + 1);
|
||||
const res = await fetch(`http://127.0.0.1:${String(port)}/api/trigger-sense`, {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: oversized,
|
||||
});
|
||||
expect(res.status).toBe(413);
|
||||
expect(await res.json()).toEqual({ ok: false, error: "Payload too large" });
|
||||
});
|
||||
});
|
||||
@@ -30,7 +30,7 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -78,7 +78,7 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
@@ -198,7 +198,7 @@ describe("kernel — reloadConfig", () => {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
});
|
||||
|
||||
expect(kernel.groups.has("network")).toBe(true);
|
||||
@@ -216,7 +216,7 @@ describe("kernel — reloadConfig", () => {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
const kernel = createKernel(config, nerveRoot);
|
||||
|
||||
@@ -232,7 +232,7 @@ describe("kernel — reloadConfig", () => {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
});
|
||||
|
||||
expect(kernel.groups.has("network")).toBe(false);
|
||||
@@ -256,7 +256,7 @@ describe("kernel — reloadConfig", () => {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
});
|
||||
|
||||
expect(kernel.getHealth().activeSenses).toBe(2);
|
||||
|
||||
@@ -97,7 +97,7 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -100,7 +100,7 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
@@ -323,7 +323,7 @@ describe("kernel + workflowManager integration", () => {
|
||||
reflexes: [],
|
||||
workflows: { "new-workflow": { concurrency: 1, overflow: "drop" } },
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
kernel.reloadConfig(newConfig);
|
||||
|
||||
@@ -377,7 +377,7 @@ describe("kernel + workflowManager integration", () => {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
kernel.reloadConfig(newConfig);
|
||||
|
||||
|
||||
@@ -61,7 +61,7 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
@@ -211,7 +211,7 @@ describe("kernel — groupForSense mapping", () => {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
const kernel = createKernel(config, nerveRoot);
|
||||
|
||||
|
||||
@@ -31,7 +31,7 @@ describe("LogStore + ReflexScheduler integration", () => {
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
const bus = createSignalBus();
|
||||
const triggered: string[] = [];
|
||||
@@ -60,7 +60,7 @@ describe("LogStore + ReflexScheduler integration", () => {
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 1000, on: [] }],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
const bus = createSignalBus();
|
||||
const ref: { scheduler: ReturnType<typeof createReflexScheduler> | null } = { scheduler: null };
|
||||
@@ -92,7 +92,7 @@ describe("LogStore + ReflexScheduler integration", () => {
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
const bus = createSignalBus();
|
||||
const triggered: string[] = [];
|
||||
|
||||
@@ -27,7 +27,7 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
@@ -153,7 +153,7 @@ describe("phase6 — reloadConfig", () => {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
|
||||
kernel.reloadConfig(newConfig);
|
||||
@@ -174,7 +174,7 @@ describe("phase6 — reloadConfig", () => {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
kernel = createKernel(config, nerveRoot, {
|
||||
workerScript: MOCK_WORKER,
|
||||
@@ -190,7 +190,7 @@ describe("phase6 — reloadConfig", () => {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
|
||||
kernel.reloadConfig(newConfig);
|
||||
@@ -228,7 +228,7 @@ describe("phase6 — error isolation", () => {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
|
||||
kernel = createKernel(config, nerveRoot, {
|
||||
@@ -339,7 +339,7 @@ describe("phase6 — getHealth", () => {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
kernel.reloadConfig(newConfig);
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -18,7 +18,7 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
@@ -300,7 +300,7 @@ describe("ReflexScheduler — workflow reflexes ignored", () => {
|
||||
workflows: {
|
||||
"my-workflow": { concurrency: 1, overflow: "drop" },
|
||||
},
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
@@ -89,7 +89,7 @@ function makeConfig(overrides: Partial<NerveConfig["workflows"]> = {}): NerveCon
|
||||
senses: {},
|
||||
reflexes: [],
|
||||
workflows: overrides as NerveConfig["workflows"],
|
||||
api: { port: null },
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
+109
-17
@@ -1,25 +1,43 @@
|
||||
/**
|
||||
* Optional JSON HTTP control plane (Phase 1 — no auth).
|
||||
* Optional JSON HTTP control plane.
|
||||
* Uses only `node:http`; shares handler logic with Unix IPC via {@link createDaemonHandlers}.
|
||||
*/
|
||||
|
||||
import { timingSafeEqual } from "node:crypto";
|
||||
import { once } from "node:events";
|
||||
import { type IncomingMessage, type Server, type ServerResponse, createServer } from "node:http";
|
||||
|
||||
import { isPlainRecord } from "@uncaged/nerve-core";
|
||||
|
||||
import type { DaemonHandlerBundle } from "./daemon-handlers.js";
|
||||
|
||||
/** Phase 1 HTTP API has no auth — bind loopback only unless explicitly configured later. */
|
||||
const HTTP_API_BIND_HOST = "127.0.0.1";
|
||||
const MAX_REQUEST_BODY_BYTES = 1024 * 1024;
|
||||
|
||||
class PayloadTooLargeError extends Error {
|
||||
constructor() {
|
||||
super("Payload too large");
|
||||
this.name = "PayloadTooLargeError";
|
||||
}
|
||||
}
|
||||
|
||||
const CORS_HEADERS: Record<string, string> = {
|
||||
"Access-Control-Allow-Origin": "*",
|
||||
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
|
||||
"Access-Control-Allow-Headers": "Content-Type",
|
||||
"Access-Control-Allow-Headers": "Content-Type, Authorization",
|
||||
};
|
||||
|
||||
export type HttpApiListenOptions = {
|
||||
port: number;
|
||||
/** Bind host (e.g. `127.0.0.1` or `0.0.0.0` when auth is enabled). */
|
||||
host: string;
|
||||
/** When non-null, all non-OPTIONS requests must send `Authorization: Bearer <token>`. */
|
||||
token: string | null;
|
||||
};
|
||||
|
||||
export type HttpApiServer = {
|
||||
close: () => Promise<void>;
|
||||
/** Resolves when the server is listening (needed when `port` is `0` in tests). */
|
||||
ready: () => Promise<{ host: string; port: number }>;
|
||||
};
|
||||
|
||||
function setJsonHeaders(res: ServerResponse, status: number): void {
|
||||
@@ -35,16 +53,48 @@ function sendJson(res: ServerResponse, status: number, body: unknown): void {
|
||||
res.end(`${JSON.stringify(body)}\n`);
|
||||
}
|
||||
|
||||
function readRequestBody(req: IncomingMessage): Promise<string> {
|
||||
function readRequestBody(req: IncomingMessage, res: ServerResponse): Promise<string> {
|
||||
return new Promise((resolve, reject) => {
|
||||
let total = 0;
|
||||
const chunks: Buffer[] = [];
|
||||
req.on("data", (c: Buffer) => {
|
||||
|
||||
function cleanup(): void {
|
||||
req.removeListener("data", onData);
|
||||
req.removeListener("end", onEnd);
|
||||
req.removeListener("error", onError);
|
||||
}
|
||||
|
||||
function onData(c: Buffer): void {
|
||||
if (total + c.length > MAX_REQUEST_BODY_BYTES) {
|
||||
cleanup();
|
||||
if (!res.writableEnded) {
|
||||
res.once("finish", () => {
|
||||
req.destroy();
|
||||
});
|
||||
sendJson(res, 413, { ok: false, error: "Payload too large" });
|
||||
} else {
|
||||
req.destroy();
|
||||
}
|
||||
reject(new PayloadTooLargeError());
|
||||
return;
|
||||
}
|
||||
total += c.length;
|
||||
chunks.push(c);
|
||||
});
|
||||
req.on("end", () => {
|
||||
}
|
||||
|
||||
function onEnd(): void {
|
||||
cleanup();
|
||||
resolve(Buffer.concat(chunks).toString("utf8"));
|
||||
});
|
||||
req.on("error", reject);
|
||||
}
|
||||
|
||||
function onError(err: Error): void {
|
||||
cleanup();
|
||||
reject(err);
|
||||
}
|
||||
|
||||
req.on("data", onData);
|
||||
req.on("end", onEnd);
|
||||
req.on("error", onError);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -57,7 +107,26 @@ function parseJsonBody(raw: string): unknown | null {
|
||||
}
|
||||
}
|
||||
|
||||
export function createHttpApiServer(port: number, handlers: DaemonHandlerBundle): HttpApiServer {
|
||||
function extractBearerToken(authHeader: string | undefined): string | null {
|
||||
if (authHeader === undefined || authHeader.length === 0) return null;
|
||||
const m = /^Bearer\s+(\S+)$/i.exec(authHeader.trim());
|
||||
return m?.[1] ?? null;
|
||||
}
|
||||
|
||||
function bearerTokenMatches(expected: string, received: string | null): boolean {
|
||||
if (received === null) return false;
|
||||
const a = Buffer.from(expected, "utf8");
|
||||
const b = Buffer.from(received, "utf8");
|
||||
if (a.length !== b.length) return false;
|
||||
return timingSafeEqual(a, b);
|
||||
}
|
||||
|
||||
export function createHttpApiServer(
|
||||
listen: HttpApiListenOptions,
|
||||
handlers: DaemonHandlerBundle,
|
||||
): HttpApiServer {
|
||||
const { port, host, token: expectedToken } = listen;
|
||||
|
||||
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: HTTP router dispatches multiple routes in one handler
|
||||
const server: Server = createServer(async (req, res) => {
|
||||
if (req.method === "OPTIONS") {
|
||||
@@ -66,6 +135,14 @@ export function createHttpApiServer(port: number, handlers: DaemonHandlerBundle)
|
||||
return;
|
||||
}
|
||||
|
||||
if (expectedToken !== null) {
|
||||
const presented = extractBearerToken(req.headers.authorization);
|
||||
if (!bearerTokenMatches(expectedToken, presented)) {
|
||||
sendJson(res, 401, { ok: false, error: "Unauthorized" });
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
const url = req.url ?? "";
|
||||
const path = url.split("?")[0] ?? "";
|
||||
|
||||
@@ -86,7 +163,7 @@ export function createHttpApiServer(port: number, handlers: DaemonHandlerBundle)
|
||||
}
|
||||
|
||||
if (req.method === "POST" && path === "/api/trigger-sense") {
|
||||
const raw = await readRequestBody(req);
|
||||
const raw = await readRequestBody(req, res);
|
||||
const body = parseJsonBody(raw);
|
||||
if (!isPlainRecord(body) || typeof body.name !== "string" || body.name.length === 0) {
|
||||
sendJson(res, 400, { ok: false, error: 'Expected JSON body: { "name": string }' });
|
||||
@@ -98,7 +175,7 @@ export function createHttpApiServer(port: number, handlers: DaemonHandlerBundle)
|
||||
}
|
||||
|
||||
if (req.method === "POST" && path === "/api/trigger-workflow") {
|
||||
const raw = await readRequestBody(req);
|
||||
const raw = await readRequestBody(req, res);
|
||||
const body = parseJsonBody(raw);
|
||||
if (!isPlainRecord(body) || typeof body.name !== "string" || body.name.length === 0) {
|
||||
sendJson(res, 400, {
|
||||
@@ -138,7 +215,7 @@ export function createHttpApiServer(port: number, handlers: DaemonHandlerBundle)
|
||||
}
|
||||
|
||||
if (req.method === "POST" && path === "/api/kill-workflow") {
|
||||
const raw = await readRequestBody(req);
|
||||
const raw = await readRequestBody(req, res);
|
||||
const body = parseJsonBody(raw);
|
||||
if (
|
||||
!isPlainRecord(body) ||
|
||||
@@ -168,19 +245,34 @@ export function createHttpApiServer(port: number, handlers: DaemonHandlerBundle)
|
||||
|
||||
sendJson(res, 404, { ok: false, error: "Not found" });
|
||||
} catch (err) {
|
||||
if (err instanceof PayloadTooLargeError) {
|
||||
return;
|
||||
}
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
sendJson(res, 500, { ok: false, error: msg });
|
||||
}
|
||||
});
|
||||
|
||||
server.listen(port, HTTP_API_BIND_HOST, () => {
|
||||
process.stderr.write(`[http-api] listening on http://${HTTP_API_BIND_HOST}:${String(port)}\n`);
|
||||
server.listen(port, host, () => {
|
||||
const addr = server.address();
|
||||
const displayPort =
|
||||
typeof addr === "object" && addr !== null ? String(addr.port) : String(port);
|
||||
process.stderr.write(`[http-api] listening on http://${host}:${displayPort}\n`);
|
||||
});
|
||||
|
||||
server.on("error", (err) => {
|
||||
process.stderr.write(`[http-api] server error: ${err.message}\n`);
|
||||
});
|
||||
|
||||
async function ready(): Promise<{ host: string; port: number }> {
|
||||
await once(server, "listening");
|
||||
const addr = server.address();
|
||||
if (typeof addr === "object" && addr !== null) {
|
||||
return { host: addr.address, port: addr.port };
|
||||
}
|
||||
return { host, port };
|
||||
}
|
||||
|
||||
async function close(): Promise<void> {
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
server.close((e) => {
|
||||
@@ -190,5 +282,5 @@ export function createHttpApiServer(port: number, handlers: DaemonHandlerBundle)
|
||||
});
|
||||
}
|
||||
|
||||
return { close };
|
||||
return { close, ready };
|
||||
}
|
||||
|
||||
@@ -391,7 +391,14 @@ export function createKernel(
|
||||
? httpPortOverride
|
||||
: initialConfig.api.port;
|
||||
if (effectiveHttpPort !== null && effectiveHttpPort > 0) {
|
||||
httpApiServer = createHttpApiServer(effectiveHttpPort, daemonHandlers);
|
||||
httpApiServer = createHttpApiServer(
|
||||
{
|
||||
port: effectiveHttpPort,
|
||||
host: config.api.host,
|
||||
token: config.api.token,
|
||||
},
|
||||
daemonHandlers,
|
||||
);
|
||||
}
|
||||
|
||||
async function stop(): Promise<void> {
|
||||
|
||||
Reference in New Issue
Block a user