diff --git a/packages/cli/src/__tests__/workflow.test.ts b/packages/cli/src/__tests__/workflow.test.ts index 1fbd3cd..0c0d582 100644 --- a/packages/cli/src/__tests__/workflow.test.ts +++ b/packages/cli/src/__tests__/workflow.test.ts @@ -192,6 +192,7 @@ describe("buildListOutput", () => { // header + 2 run lines expect(lines).toHaveLength(3); expect(paginationHint).not.toBeNull(); + expect(paginationHint).toContain("nerve workflow runs"); expect(paginationHint).toContain("--offset 2"); expect(paginationHint).toContain("3 more"); }); @@ -298,7 +299,7 @@ describe("buildInspectOutput", () => { // Integration: getAllWorkflowRuns + buildListOutput end-to-end with real store // --------------------------------------------------------------------------- -describe("workflow list — integration with real store", () => { +describe("workflow runs list — integration with real store", () => { it("lists active runs from the store", () => { upsertRun("r1", "cleanup", "started", 1000); upsertRun("r2", "cleanup", "queued", 2000); diff --git a/packages/cli/src/commands/dev.ts b/packages/cli/src/commands/dev.ts index abf1bca..cafcbdd 100644 --- a/packages/cli/src/commands/dev.ts +++ b/packages/cli/src/commands/dev.ts @@ -1,6 +1,9 @@ import { defineCommand } from "citty"; -import { runForegroundKernelSession } from "../run-foreground-kernel.js"; +import { + type ForegroundSessionOptions, + runForegroundKernelSession, +} from "../run-foreground-kernel.js"; import { loadDaemonModule } from "../workspace-daemon.js"; import { getNerveRoot } from "../workspace.js"; @@ -9,9 +12,25 @@ export const devCommand = defineCommand({ name: "dev", description: "Run the nerve kernel in the foreground (development mode)", }, - async run() { + args: { + port: { + type: "string", + description: "HTTP API port (overrides nerve.yaml api.port). Omit to use YAML / env only.", + default: "", + }, + }, + async run({ args }) { const nerveRoot = getNerveRoot(); const { createKernel } = await loadDaemonModule(nerveRoot); - await runForegroundKernelSession(nerveRoot, createKernel); + let sessionOpts: ForegroundSessionOptions = {}; + if (args.port.length > 0) { + const n = Number.parseInt(args.port, 10); + if (Number.isNaN(n) || n < 1 || n > 65_535) { + process.stderr.write(`❌ Invalid --port: ${args.port}\n`); + process.exit(1); + } + sessionOpts = { httpApiPortOverride: n }; + } + await runForegroundKernelSession(nerveRoot, createKernel, sessionOpts); }, }); diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index baaee9c..7c369f9 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -1,9 +1,10 @@ import { spawn } from "node:child_process"; -import { createWriteStream, existsSync } from "node:fs"; +import { createWriteStream, existsSync, readFileSync } from "node:fs"; import { mkdir } from "node:fs/promises"; import { dirname, join } from "node:path"; import { fileURLToPath } from "node:url"; +import { parseNerveConfig } from "@uncaged/nerve-core"; import { defineCommand } from "citty"; import { @@ -56,7 +57,7 @@ function daemonBootstrapScript(): string { ); } -async function runDaemon(nerveRoot: string): Promise { +async function runDaemon(nerveRoot: string, cliHttpPort: number | null): Promise { if (isRunning()) { const pid = readPidFile(); process.stderr.write(`⚠️ Nerve daemon is already running (pid ${pid}).\n`); @@ -74,12 +75,27 @@ async function runDaemon(nerveRoot: string): Promise { const bootstrapPath = daemonBootstrapScript(); + const configPath = join(nerveRoot, "nerve.yaml"); + let yamlApiPort: number | null = null; + try { + const raw = readFileSync(configPath, "utf8"); + const parsed = parseNerveConfig(raw); + if (parsed.ok) yamlApiPort = parsed.value.api.port; + } catch { + // kernel bootstrap will surface a clearer error if config is missing + } + const resolvedHttpPort = cliHttpPort ?? yamlApiPort; + const env: NodeJS.ProcessEnv = { ...process.env, NERVE_ROOT: nerveRoot }; + if (resolvedHttpPort !== null && resolvedHttpPort > 0) { + env.NERVE_API_PORT = String(resolvedHttpPort); + } + // After `open`, file-backed WriteStream has a numeric OS fd for spawn stdio; `@types/node` omits `fd` on this WriteStream alias. const logFd = (logStream as unknown as { fd: number }).fd; const child = spawn(process.execPath, [bootstrapPath], { detached: true, stdio: ["ignore", logFd, logFd], - env: { ...process.env, NERVE_ROOT: nerveRoot }, + env, cwd: nerveRoot, }); @@ -109,8 +125,8 @@ async function runDaemon(nerveRoot: string): Promise { } /** Background daemon only — use `nerve dev` for foreground mode. */ -export async function runDaemonStartCommand(): Promise { - await runDaemon(getNerveRoot()); +export async function runDaemonStartCommand(cliHttpPort: number | null = null): Promise { + await runDaemon(getNerveRoot(), cliHttpPort); } export const daemonStartCommand = defineCommand({ @@ -118,7 +134,23 @@ export const daemonStartCommand = defineCommand({ name: "start", description: "Start the nerve daemon in the background", }, - async run() { - await runDaemonStartCommand(); + args: { + port: { + type: "string", + description: "HTTP API port (overrides nerve.yaml api.port). Omit to use YAML / env only.", + default: "", + }, + }, + async run({ args }) { + let cliHttpPort: number | null = null; + if (args.port.length > 0) { + const n = Number.parseInt(args.port, 10); + if (Number.isNaN(n) || n < 1 || n > 65_535) { + process.stderr.write(`❌ Invalid --port: ${args.port}\n`); + process.exit(1); + } + cliHttpPort = n; + } + await runDaemonStartCommand(cliHttpPort); }, }); diff --git a/packages/cli/src/commands/workflow.ts b/packages/cli/src/commands/workflow.ts index 2662f60..5813d15 100644 --- a/packages/cli/src/commands/workflow.ts +++ b/packages/cli/src/commands/workflow.ts @@ -7,7 +7,12 @@ import { defineCommand } from "citty"; import { stringify } from "yaml"; import type { LogStore, ThreadRoundRow, WorkflowRun } from "@uncaged/nerve-store"; -import { killWorkflowViaDaemon, triggerWorkflowViaDaemon } from "../daemon-client.js"; +import { + killWorkflowViaDaemon, + listWorkflowsViaDaemon, + triggerWorkflowViaDaemon, +} from "../daemon-client.js"; +import { formatRowsAsAlignedTable } from "../sense-sqlite.js"; import { loadDaemonModule } from "../workspace-daemon.js"; import { getNerveRoot, getSocketPath, isRunning } from "../workspace.js"; @@ -125,7 +130,7 @@ export function buildListOutput( const allFlagStr = allFlag ? " --all" : ""; paginationHint = `\n⏩ ${remaining} more run(s) not shown. Fetch next page:\n` + - ` nerve workflow list --offset ${offset + limit}${allFlagStr}${wfFlag}\n`; + ` nerve workflow runs --offset ${offset + limit}${allFlagStr}${wfFlag}\n`; } return { lines, paginationHint }; @@ -298,13 +303,61 @@ export function buildThreadCommandOutput( } // --------------------------------------------------------------------------- -// nerve workflow list +// nerve workflow list (daemon — registered workflows + queue depth) // --------------------------------------------------------------------------- -const workflowListCommand = defineCommand({ +const workflowDaemonListCommand = defineCommand({ meta: { name: "list", - description: "List active (queued/started) workflow runs", + description: "List workflows from the running daemon (concurrency, active, queued)", + }, + async run() { + if (!isRunning()) { + process.stderr.write("❌ Nerve daemon is not running. Start it with `nerve start`.\n"); + process.exit(1); + } + + const socketPath = getSocketPath(); + let response: Awaited>; + try { + response = await listWorkflowsViaDaemon(socketPath); + } catch (e) { + const msg = e instanceof Error ? e.message : String(e); + process.stderr.write(`❌ Failed to contact daemon: ${msg}\n`); + process.exit(1); + } + + if (!response.ok) { + process.stderr.write(`❌ Daemon error: ${response.error}\n`); + process.exit(1); + } + + const rows = response.workflows.map((w) => ({ + name: w.name, + active: w.activeThreads, + queued: w.queuedThreads, + concurrency: w.config.concurrency, + overflow: w.config.overflow, + })); + + if (rows.length === 0) { + process.stdout.write("📭 No workflows in nerve.yaml (or empty registry).\n"); + return; + } + + process.stdout.write(`📋 Workflows (${String(rows.length)}):\n\n`); + process.stdout.write(formatRowsAsAlignedTable(rows)); + }, +}); + +// --------------------------------------------------------------------------- +// nerve workflow runs +// --------------------------------------------------------------------------- + +const workflowRunsCommand = defineCommand({ + meta: { + name: "runs", + description: "List active (queued/started) workflow runs from logs", }, args: { all: { @@ -562,7 +615,7 @@ const workflowTriggerCommand = defineCommand({ } process.stdout.write(`✅ Triggered workflow "${args.name}" via daemon.\n`); - process.stdout.write("\n💡 Inspect active runs with: nerve workflow list\n"); + process.stdout.write("\n💡 Inspect active runs with: nerve workflow runs\n"); }, }); @@ -616,7 +669,8 @@ export const workflowCommand = defineCommand({ description: "Manage and inspect workflow runs", }, subCommands: { - list: workflowListCommand, + list: workflowDaemonListCommand, + runs: workflowRunsCommand, inspect: workflowInspectCommand, thread: workflowThreadCommand, trigger: workflowTriggerCommand, diff --git a/packages/cli/src/daemon-client.ts b/packages/cli/src/daemon-client.ts index d4fc8ee..224b90e 100644 --- a/packages/cli/src/daemon-client.ts +++ b/packages/cli/src/daemon-client.ts @@ -10,11 +10,16 @@ import type { Socket } from "node:net"; import type { DaemonIpcListSensesResponse, + DaemonIpcListWorkflowsResponse, DaemonIpcRequest, DaemonIpcTriggerResponse, + DaemonTransport, + DaemonTransportTriggerResult, + HealthInfo, SenseInfo, + WorkflowStatus, } from "@uncaged/nerve-core"; -import { isPlainRecord } from "@uncaged/nerve-core"; +import { DEFAULT_ENGINE_MAX_ROUNDS, isPlainRecord } from "@uncaged/nerve-core"; const CONNECT_TIMEOUT_MS = 3_000; const RESPONSE_TIMEOUT_MS = 5_000; @@ -32,6 +37,19 @@ function isSenseInfo(value: unknown): value is SenseInfo { ); } +function isWorkflowStatus(value: unknown): value is WorkflowStatus { + if (!isPlainRecord(value)) return false; + const cfg = value.config; + if (!isPlainRecord(cfg)) return false; + return ( + typeof value.name === "string" && + typeof value.activeThreads === "number" && + typeof value.queuedThreads === "number" && + typeof cfg.concurrency === "number" && + typeof cfg.overflow === "string" + ); +} + function parseDaemonResponse(line: string): DaemonIpcTriggerResponse { try { const obj: unknown = JSON.parse(line); @@ -62,6 +80,51 @@ function parseListSensesResponse(line: string): DaemonIpcListSensesResponse { return { ok: false, error: `Unexpected daemon response: ${line}` }; } +function parseListWorkflowsResponse(line: string): DaemonIpcListWorkflowsResponse { + try { + const obj: unknown = JSON.parse(line); + if (isPlainRecord(obj)) { + const r = obj; + if (r.ok === false && typeof r.error === "string") return { ok: false, error: r.error }; + if (r.ok === true && Array.isArray(r.workflows) && r.workflows.every(isWorkflowStatus)) { + return { ok: true, workflows: r.workflows }; + } + } + } catch { + // fall through + } + return { ok: false, error: `Unexpected daemon response: ${line}` }; +} + +function parseHealthResponse(line: string): HealthInfo | null { + try { + const obj: unknown = JSON.parse(line); + if (!isPlainRecord(obj)) return null; + const r = obj; + if (r.ok === true && isPlainRecord(r.health)) { + const h = r.health; + if ( + typeof h.ok === "boolean" && + typeof h.version === "string" && + typeof h.uptime === "number" && + typeof h.startedAt === "string" && + typeof h.hostname === "string" + ) { + return { + ok: h.ok, + version: h.version, + uptime: h.uptime, + startedAt: h.startedAt, + hostname: h.hostname, + }; + } + } + } catch { + // fall through + } + return null; +} + /** * Connect to the daemon socket, send one JSON request (newline-terminated), * and resolve with the first non-empty line parsed by `parseFirstLine`. @@ -126,6 +189,72 @@ function sendAndReceive( }); } +/** Unix-socket implementation of {@link DaemonTransport} (local daemon). */ +export class UnixTransport implements DaemonTransport { + readonly socketPath: string; + constructor(socketPath: string) { + this.socketPath = socketPath; + } + + async health(): Promise { + const parsed = await sendAndReceive(this.socketPath, { type: "health" }, (line) => + parseHealthResponse(line), + ); + if (parsed === null) { + throw new Error("Unexpected daemon response for health"); + } + return parsed; + } + + async listSenses(): Promise { + const r = await sendAndReceive( + this.socketPath, + { type: "list-senses" }, + parseListSensesResponse, + ); + if (!r.ok) { + throw new Error(r.error); + } + return r.senses; + } + + async listWorkflows(): Promise { + const r = await sendAndReceive( + this.socketPath, + { type: "list-workflows" }, + parseListWorkflowsResponse, + ); + if (!r.ok) { + throw new Error(r.error); + } + return r.workflows; + } + + async triggerSense(name: string): Promise { + return sendAndReceive( + this.socketPath, + { type: "trigger-sense", sense: name }, + parseDaemonResponse, + ); + } + + async triggerWorkflow(name: string): Promise { + const message: DaemonIpcRequest = { + type: "trigger-workflow", + workflow: name, + prompt: "", + maxRounds: DEFAULT_ENGINE_MAX_ROUNDS, + dryRun: false, + }; + return sendAndReceive(this.socketPath, message, parseDaemonResponse); + } + + async killWorkflow(runId: string): Promise { + const message: DaemonIpcRequest = { type: "kill-workflow", runId }; + return sendAndReceive(this.socketPath, message, parseDaemonResponse); + } +} + /** * Send a trigger-workflow message to the running daemon via its Unix socket. * Resolves with the daemon's response or rejects on connection/timeout errors. @@ -168,6 +297,16 @@ export function listSensesViaDaemon(socketPath: string): Promise { + const message: DaemonIpcRequest = { type: "list-workflows" }; + return sendAndReceive(socketPath, message, parseListWorkflowsResponse); +} + /** * Send a kill-workflow message to the running daemon via its Unix socket. * Resolves with the daemon's response or rejects on connection/timeout errors. diff --git a/packages/cli/src/run-foreground-kernel.ts b/packages/cli/src/run-foreground-kernel.ts index c05a1d7..009f917 100644 --- a/packages/cli/src/run-foreground-kernel.ts +++ b/packages/cli/src/run-foreground-kernel.ts @@ -9,7 +9,11 @@ import { getSocketPath } from "./workspace.js"; export type CreateKernelFn = ( config: NerveConfig, nerveRoot: string, - opts: { enableFileWatcher: boolean; ipcSocketPath: string }, + opts: { + enableFileWatcher: boolean; + ipcSocketPath: string; + httpApiPortOverride?: number | null; + }, ) => { groups: Set; ready: Promise; @@ -28,9 +32,23 @@ function readConfig(nerveRoot: string): ReturnType { return parseNerveConfig(raw); } +function parseEnvHttpPort(): number | null { + const envPort = process.env.NERVE_API_PORT; + if (envPort === undefined || envPort === "") return null; + const n = Number.parseInt(envPort, 10); + if (Number.isNaN(n) || n < 1 || n > 65_535) return null; + return n; +} + +export type ForegroundSessionOptions = { + /** Positive integer overrides `nerve.yaml` `api.port` (same precedence as `NERVE_API_PORT`). */ + httpApiPortOverride?: number | null; +}; + export async function runForegroundKernelSession( nerveRoot: string, createKernel: CreateKernelFn, + sessionOpts: ForegroundSessionOptions = {}, ): Promise { const configResult = readConfig(nerveRoot); if (!configResult.ok) { @@ -39,10 +57,23 @@ export async function runForegroundKernelSession( } const config = configResult.value; - const kernel = createKernel(config, nerveRoot, { + const envOverride = parseEnvHttpPort(); + const cliOverride = sessionOpts.httpApiPortOverride ?? null; + const resolvedOverride = + cliOverride !== null && cliOverride > 0 + ? cliOverride + : envOverride !== null && envOverride > 0 + ? envOverride + : null; + + const kernelBase = { enableFileWatcher: true, ipcSocketPath: getSocketPath(), - }); + }; + const kernel = + resolvedOverride !== null + ? createKernel(config, nerveRoot, { ...kernelBase, httpApiPortOverride: resolvedOverride }) + : createKernel(config, nerveRoot, kernelBase); const senseNames = Object.keys(config.senses); const groups = [...kernel.groups]; diff --git a/packages/cli/src/workspace-daemon.ts b/packages/cli/src/workspace-daemon.ts index cabd341..551fc64 100644 --- a/packages/cli/src/workspace-daemon.ts +++ b/packages/cli/src/workspace-daemon.ts @@ -34,7 +34,11 @@ export type DaemonModule = { createKernel: ( config: NerveConfig, nerveRoot: string, - options: { enableFileWatcher: boolean; ipcSocketPath: string }, + options: { + enableFileWatcher: boolean; + ipcSocketPath: string; + httpApiPortOverride?: number | null; + }, ) => { groups: Set; ready: Promise; diff --git a/packages/core/src/__tests__/config.test.ts b/packages/core/src/__tests__/config.test.ts index fc82c2c..540110c 100644 --- a/packages/core/src/__tests__/config.test.ts +++ b/packages/core/src/__tests__/config.test.ts @@ -63,6 +63,7 @@ describe("parseNerveConfig", () => { overflow: "queue", maxQueue: 10, }); + expect(result.value.api).toEqual({ port: null }); }); it("parses config with empty reflexes array", () => { @@ -170,9 +171,39 @@ workflows: maxQueue: 100, }); }); + + it("parses api.port when present", () => { + const yaml = ` +senses: + cpu: + group: system +reflexes: [] +api: + port: 9800 +`; + const result = parseNerveConfig(yaml); + expect(result.ok).toBe(true); + if (!result.ok) return; + expect(result.value.api).toEqual({ port: 9800 }); + }); }); describe("invalid configs", () => { + it("returns error when api.port is out of range", () => { + const yaml = ` +senses: + cpu: + group: system +reflexes: [] +api: + port: 99999 +`; + const result = parseNerveConfig(yaml); + expect(result.ok).toBe(false); + if (result.ok) return; + expect(result.error.message).toMatch(/api\.port/); + }); + it("returns error on bad YAML syntax", () => { const result = parseNerveConfig("senses: [\\nunclosed"); expect(result.ok).toBe(false); diff --git a/packages/core/src/__tests__/daemon-ipc-protocol.test.ts b/packages/core/src/__tests__/daemon-ipc-protocol.test.ts index 5102465..401b9f1 100644 --- a/packages/core/src/__tests__/daemon-ipc-protocol.test.ts +++ b/packages/core/src/__tests__/daemon-ipc-protocol.test.ts @@ -55,7 +55,7 @@ describe("parseDaemonIpcRequest", () => { ).toBeNull(); }); - it("parses trigger-sense and list-senses", () => { + it("parses trigger-sense, list-senses, list-workflows, and health", () => { expect(parseDaemonIpcRequest(JSON.stringify({ type: "trigger-sense", sense: "x" }))).toEqual({ type: "trigger-sense", sense: "x", @@ -63,6 +63,12 @@ describe("parseDaemonIpcRequest", () => { expect(parseDaemonIpcRequest(JSON.stringify({ type: "list-senses" }))).toEqual({ type: "list-senses", }); + expect(parseDaemonIpcRequest(JSON.stringify({ type: "list-workflows" }))).toEqual({ + type: "list-workflows", + }); + expect(parseDaemonIpcRequest(JSON.stringify({ type: "health" }))).toEqual({ + type: "health", + }); }); it("returns null for invalid JSON or unknown type", () => { diff --git a/packages/core/src/config.ts b/packages/core/src/config.ts index c6c18b0..d1fb0b4 100644 --- a/packages/core/src/config.ts +++ b/packages/core/src/config.ts @@ -28,10 +28,16 @@ export type QueueOverflowConfig = { export type WorkflowConfig = DropOverflowConfig | QueueOverflowConfig; +/** Optional HTTP control plane (Phase 1: no auth). When `port` is null, the HTTP server is not started. */ +export type NerveApiConfig = { + port: number | null; +}; + export type NerveConfig = { /** Engine-wide default max moderator rounds (e.g. CLI workflow trigger when omitted). */ maxRounds: number; senses: Record; reflexes: ReflexConfig[]; workflows: Record; + api: NerveApiConfig; }; diff --git a/packages/core/src/daemon-ipc-protocol.ts b/packages/core/src/daemon-ipc-protocol.ts index 2606456..b95f523 100644 --- a/packages/core/src/daemon-ipc-protocol.ts +++ b/packages/core/src/daemon-ipc-protocol.ts @@ -7,6 +7,23 @@ import { isPlainRecord } from "./is-plain-record.js"; import type { SenseInfo } from "./sense.js"; +/** Runtime status of a registered workflow (for listing / observability). */ +export type WorkflowStatus = { + name: string; + activeThreads: number; + queuedThreads: number; + config: { concurrency: number; overflow: string }; +}; + +/** Public health payload for HTTP / IPC. */ +export type HealthInfo = { + ok: boolean; + version: string; + uptime: number; + startedAt: string; + hostname: string; +}; + /** Client → daemon: start a workflow run. */ export type DaemonIpcTriggerWorkflowRequest = { type: "trigger-workflow"; @@ -33,12 +50,24 @@ export type DaemonIpcKillWorkflowRequest = { runId: string; }; +/** Client → daemon: list registered workflows and queue/active counts. */ +export type DaemonIpcListWorkflowsRequest = { + type: "list-workflows"; +}; + +/** Client → daemon: public health snapshot. */ +export type DaemonIpcHealthRequest = { + type: "health"; +}; + /** Union of all JSON requests the daemon IPC server accepts. */ export type DaemonIpcRequest = | DaemonIpcTriggerWorkflowRequest | DaemonIpcTriggerSenseRequest | DaemonIpcListSensesRequest - | DaemonIpcKillWorkflowRequest; + | DaemonIpcKillWorkflowRequest + | DaemonIpcListWorkflowsRequest + | DaemonIpcHealthRequest; /** Successful trigger / trigger-sense reply (no body). */ export type DaemonIpcTriggerOkResponse = { ok: true }; @@ -53,11 +82,21 @@ export type DaemonIpcListSensesResponse = | { ok: true; senses: SenseInfo[] } | DaemonIpcErrorResponse; +/** Reply for list-workflows. */ +export type DaemonIpcListWorkflowsResponse = + | { ok: true; workflows: WorkflowStatus[] } + | DaemonIpcErrorResponse; + +/** Reply for health. */ +export type DaemonIpcHealthResponse = { ok: true; health: HealthInfo } | DaemonIpcErrorResponse; + /** Any JSON response the daemon may write on the IPC socket. */ export type DaemonIpcResponse = | DaemonIpcTriggerOkResponse | DaemonIpcErrorResponse - | { ok: true; senses: SenseInfo[] }; + | DaemonIpcListSensesResponse + | DaemonIpcListWorkflowsResponse + | DaemonIpcHealthResponse; function parseTriggerWorkflowFields( req: Record, @@ -98,6 +137,12 @@ export function parseDaemonIpcRequest(line: string): DaemonIpcRequest | null { if (typeof req.runId !== "string" || req.runId.length === 0) return null; return { type: "kill-workflow", runId: req.runId }; } + if (req.type === "list-workflows") { + return { type: "list-workflows" }; + } + if (req.type === "health") { + return { type: "health" }; + } return null; } catch { return null; diff --git a/packages/core/src/daemon-transport.ts b/packages/core/src/daemon-transport.ts new file mode 100644 index 0000000..f6360fa --- /dev/null +++ b/packages/core/src/daemon-transport.ts @@ -0,0 +1,18 @@ +import type { HealthInfo, WorkflowStatus } from "./daemon-ipc-protocol.js"; +import type { SenseInfo } from "./sense.js"; + +export type DaemonTransportTriggerResult = { ok: true } | { ok: false; error: string }; + +/** + * Abstraction over daemon control plane (Unix socket IPC today, HTTP in Phase 2). + * Implementations live in CLI / tools; the daemon kernel uses shared handler logic. + */ +export type DaemonTransport = { + health(): Promise; + listSenses(): Promise; + listWorkflows(): Promise; + triggerSense(name: string): Promise; + triggerWorkflow(name: string): Promise; + /** Kill a running or queued workflow thread by `runId` (same field as IPC `kill-workflow`). */ + killWorkflow(runId: string): Promise; +}; diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 818ed9c..efbc6e5 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -5,6 +5,7 @@ export type { DropOverflowConfig, QueueOverflowConfig, WorkflowConfig, + NerveApiConfig, NerveConfig, } from "./config.js"; export type { Signal, SenseInfo, SenseResult } from "./sense.js"; @@ -35,15 +36,22 @@ export { } from "./sense-workflow-directive.js"; export type { + WorkflowStatus, + HealthInfo, DaemonIpcTriggerWorkflowRequest, DaemonIpcTriggerSenseRequest, DaemonIpcListSensesRequest, DaemonIpcKillWorkflowRequest, + DaemonIpcListWorkflowsRequest, + DaemonIpcHealthRequest, DaemonIpcRequest, DaemonIpcTriggerOkResponse, DaemonIpcErrorResponse, DaemonIpcTriggerResponse, DaemonIpcListSensesResponse, + DaemonIpcListWorkflowsResponse, + DaemonIpcHealthResponse, DaemonIpcResponse, } from "./daemon-ipc-protocol.js"; export { parseDaemonIpcRequest } from "./daemon-ipc-protocol.js"; +export type { DaemonTransport, DaemonTransportTriggerResult } from "./daemon-transport.js"; diff --git a/packages/core/src/parse-nerve-config.ts b/packages/core/src/parse-nerve-config.ts index 91b0639..5ac52eb 100644 --- a/packages/core/src/parse-nerve-config.ts +++ b/packages/core/src/parse-nerve-config.ts @@ -1,6 +1,12 @@ import { parse } from "yaml"; -import type { NerveConfig, ReflexConfig, SenseConfig, WorkflowConfig } from "./config.js"; +import type { + NerveApiConfig, + NerveConfig, + ReflexConfig, + SenseConfig, + WorkflowConfig, +} from "./config.js"; import { isPlainRecord } from "./is-plain-record.js"; import type { Result } from "./result.js"; import { err, ok } from "./result.js"; @@ -245,6 +251,28 @@ function parseReflexes( return ok(reflexes); } +function parseApiConfig(obj: Record): Result { + if (obj.api === undefined || obj.api === null) { + return ok({ port: null }); + } + if (!isPlainRecord(obj.api)) { + return err(new Error("api: must be an object if provided")); + } + const api = obj.api; + if (api.port === undefined || api.port === null) { + return ok({ port: null }); + } + if ( + typeof api.port !== "number" || + !Number.isInteger(api.port) || + api.port < 1 || + api.port > 65_535 + ) { + return err(new Error("api.port: must be an integer between 1 and 65535 if provided")); + } + return ok({ port: api.port }); +} + function parseWorkflows(obj: Record): Result> { if (obj.workflows === undefined || obj.workflows === null) return ok({}); @@ -293,10 +321,14 @@ export function parseNerveConfig(raw: string): Result { const maxRoundsResult = parseEngineMaxRounds(obj); if (!maxRoundsResult.ok) return maxRoundsResult; + const apiResult = parseApiConfig(obj); + if (!apiResult.ok) return apiResult; + return ok({ maxRounds: maxRoundsResult.value, senses, reflexes: reflexesResult.value, workflows: workflowsResult.value, + api: apiResult.value, }); } diff --git a/packages/daemon/src/__tests__/crash-recovery.test.ts b/packages/daemon/src/__tests__/crash-recovery.test.ts index ff2de1e..6e08300 100644 --- a/packages/daemon/src/__tests__/crash-recovery.test.ts +++ b/packages/daemon/src/__tests__/crash-recovery.test.ts @@ -65,6 +65,7 @@ function makeConfig(workflows: Record = {}): NerveConfig reflexes: [], workflows, maxRounds: 10, + api: { port: null }, }; } diff --git a/packages/daemon/src/__tests__/daemon-ipc.test.ts b/packages/daemon/src/__tests__/daemon-ipc.test.ts index 8deb9cc..a12dbea 100644 --- a/packages/daemon/src/__tests__/daemon-ipc.test.ts +++ b/packages/daemon/src/__tests__/daemon-ipc.test.ts @@ -15,6 +15,7 @@ import { join } from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { createDaemonHandlers } from "../daemon-handlers.js"; import { createDaemonIpcServer } from "../daemon-ipc.js"; import type { DaemonIpcServer } from "../daemon-ipc.js"; @@ -28,15 +29,42 @@ let server: DaemonIpcServer | null = null; function makeMockWorkflowManager() { return { startWorkflow: vi.fn(), + killThread: vi.fn(() => false), stop: vi.fn(async () => {}), totalActiveCount: vi.fn(() => 0), drainAndRespawn: vi.fn(async () => {}), drainWhenIdle: vi.fn(), updateConfig: vi.fn(), - getActiveWorkflowRuns: vi.fn(() => []), + listWorkflows: vi.fn(() => []), + activeCount: vi.fn(() => 0), + queueLength: vi.fn(() => 0), }; } +function openIpcServer( + path: string, + wfManager: ReturnType, + opts: { + triggerSense: (senseName: string) => void; + listSenses: () => unknown[]; + }, +): DaemonIpcServer { + const handlers = createDaemonHandlers({ + workflowManager: wfManager as never, + triggerSense: opts.triggerSense, + listSenses: opts.listSenses as never, + getHealthInfo: () => ({ + ok: true, + version: "0.0.0-test", + uptime: 0, + startedAt: "2020-01-01T00:00:00.000Z", + hostname: "test-host", + }), + getDefaultMaxRounds: () => 10, + }); + return createDaemonIpcServer(path, handlers); +} + function sendRaw(path: string, message: object): Promise { return new Promise((resolve, reject) => { const sock = connect(path, () => { @@ -89,9 +117,10 @@ afterEach(async () => { describe("daemon-ipc — trigger-sense", () => { it("responds ok:true when triggerSense succeeds", async () => { const triggerSense = vi.fn(); - server = createDaemonIpcServer(sockPath, makeMockWorkflowManager() as never, { + const wfManager = makeMockWorkflowManager(); + server = openIpcServer(sockPath, wfManager, { triggerSense, - listSenses: vi.fn(() => []), + listSenses: () => [], }); const resp = await sendRaw(sockPath, { type: "trigger-sense", sense: "cpu-usage" }); @@ -105,9 +134,10 @@ describe("daemon-ipc — trigger-sense", () => { const triggerSense = vi.fn(() => { throw new Error('Unknown sense: "no-such-sense"'); }); - server = createDaemonIpcServer(sockPath, makeMockWorkflowManager() as never, { + const wfManager = makeMockWorkflowManager(); + server = openIpcServer(sockPath, wfManager, { triggerSense, - listSenses: vi.fn(() => []), + listSenses: () => [], }); const resp = await sendRaw(sockPath, { type: "trigger-sense", sense: "no-such-sense" }); @@ -118,9 +148,10 @@ describe("daemon-ipc — trigger-sense", () => { it("responds ok:false for trigger-sense with empty sense name", async () => { const triggerSense = vi.fn(); - server = createDaemonIpcServer(sockPath, makeMockWorkflowManager() as never, { + const wfManager = makeMockWorkflowManager(); + server = openIpcServer(sockPath, wfManager, { triggerSense, - listSenses: vi.fn(() => []), + listSenses: () => [], }); const resp = await sendRaw(sockPath, { type: "trigger-sense", sense: "" }); @@ -131,9 +162,10 @@ describe("daemon-ipc — trigger-sense", () => { it("responds ok:false for trigger-sense missing sense field", async () => { const triggerSense = vi.fn(); - server = createDaemonIpcServer(sockPath, makeMockWorkflowManager() as never, { + const wfManager = makeMockWorkflowManager(); + server = openIpcServer(sockPath, wfManager, { triggerSense, - listSenses: vi.fn(() => []), + listSenses: () => [], }); const resp = await sendRaw(sockPath, { type: "trigger-sense" }); @@ -145,9 +177,9 @@ describe("daemon-ipc — trigger-sense", () => { it("does NOT call triggerSense for trigger-workflow requests", async () => { const triggerSense = vi.fn(); const wfManager = makeMockWorkflowManager(); - server = createDaemonIpcServer(sockPath, wfManager as never, { + server = openIpcServer(sockPath, wfManager, { triggerSense, - listSenses: vi.fn(() => []), + listSenses: () => [], }); const resp = await sendRaw(sockPath, { @@ -169,9 +201,10 @@ describe("daemon-ipc — trigger-sense", () => { it("responds ok:false for completely unknown request type", async () => { const triggerSense = vi.fn(); - server = createDaemonIpcServer(sockPath, makeMockWorkflowManager() as never, { + const wfManager = makeMockWorkflowManager(); + server = openIpcServer(sockPath, wfManager, { triggerSense, - listSenses: vi.fn(() => []), + listSenses: () => [], }); const resp = await sendRaw(sockPath, { type: "unknown-type", data: "x" }); @@ -188,7 +221,8 @@ describe("daemon-ipc — trigger-sense", () => { describe("daemon-ipc — list-senses", () => { it("responds ok:true with empty senses array when listSenses returns []", async () => { const listSenses = vi.fn(() => []); - server = createDaemonIpcServer(sockPath, makeMockWorkflowManager() as never, { + const wfManager = makeMockWorkflowManager(); + server = openIpcServer(sockPath, wfManager, { triggerSense: vi.fn(), listSenses, }); @@ -217,7 +251,8 @@ describe("daemon-ipc — list-senses", () => { }, ]; const listSenses = vi.fn(() => sensesData); - server = createDaemonIpcServer(sockPath, makeMockWorkflowManager() as never, { + const wfManager = makeMockWorkflowManager(); + server = openIpcServer(sockPath, wfManager, { triggerSense: vi.fn(), listSenses, }); @@ -232,7 +267,8 @@ describe("daemon-ipc — list-senses", () => { const listSenses = vi.fn(() => { throw new Error("internal error"); }); - server = createDaemonIpcServer(sockPath, makeMockWorkflowManager() as never, { + const wfManager = makeMockWorkflowManager(); + server = openIpcServer(sockPath, wfManager, { triggerSense: vi.fn(), listSenses, }); @@ -244,7 +280,8 @@ describe("daemon-ipc — list-senses", () => { it("does NOT call listSenses for trigger-sense requests", async () => { const listSenses = vi.fn(() => []); - server = createDaemonIpcServer(sockPath, makeMockWorkflowManager() as never, { + const wfManager = makeMockWorkflowManager(); + server = openIpcServer(sockPath, wfManager, { triggerSense: vi.fn(), listSenses, }); diff --git a/packages/daemon/src/__tests__/file-watcher-workflow.test.ts b/packages/daemon/src/__tests__/file-watcher-workflow.test.ts index dcf049e..a80196e 100644 --- a/packages/daemon/src/__tests__/file-watcher-workflow.test.ts +++ b/packages/daemon/src/__tests__/file-watcher-workflow.test.ts @@ -81,6 +81,8 @@ describe("createFileWatcher — workflow file changes (Phase 3)", () => { watcher = createFileWatcher(root, (change) => changes.push(change), 50); await new Promise((r) => setTimeout(r, 100)); + // Isolate the nerve.yaml write from fs.watch startup / coalesced events on some platforms + changes.length = 0; writeFileSync(join(root, "nerve.yaml"), "senses: {}\nreflexes: []\n# changed\n"); await waitFor(() => changes.some((c) => c.kind === "config"), 3000); diff --git a/packages/daemon/src/__tests__/file-watcher.test.ts b/packages/daemon/src/__tests__/file-watcher.test.ts index ae7e836..b36c254 100644 --- a/packages/daemon/src/__tests__/file-watcher.test.ts +++ b/packages/daemon/src/__tests__/file-watcher.test.ts @@ -95,6 +95,8 @@ describe("createFileWatcher", () => { watcher = createFileWatcher(root, (change) => changes.push(change), 50); await new Promise((r) => setTimeout(r, 100)); + // Ignore fs.watch attachment noise on some platforms before asserting post-close silence + changes.length = 0; watcher.close(); watcher = null; diff --git a/packages/daemon/src/__tests__/hot-reload.test.ts b/packages/daemon/src/__tests__/hot-reload.test.ts index f5dd4a7..2809a0a 100644 --- a/packages/daemon/src/__tests__/hot-reload.test.ts +++ b/packages/daemon/src/__tests__/hot-reload.test.ts @@ -66,7 +66,7 @@ const { createWorkflowManager } = await import("../workflow-manager.js"); const { createKernel } = await import("../kernel.js"); function makeWfConfig(workflows: Record = {}): NerveConfig { - return { senses: {}, reflexes: [], workflows, maxRounds: 10 }; + return { senses: {}, reflexes: [], workflows, maxRounds: 10, api: { port: null } }; } function makeLogStore() { @@ -454,6 +454,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => { reflexes: [], workflows: { "my-wf": { concurrency: 1, overflow: "drop" } }, maxRounds: 10, + api: { port: null }, }; const kernel = createKernel(config, nerveRoot, { @@ -489,6 +490,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => { reflexes: [], workflows: { "old-wf": { concurrency: 1, overflow: "drop" } }, maxRounds: 10, + api: { port: null }, }; const kernel = createKernel(initialConfig, nerveRoot, { @@ -510,6 +512,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => { reflexes: [], workflows: {}, maxRounds: 10, + api: { port: null }, }; kernel.reloadConfig(newConfig); @@ -532,6 +535,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => { reflexes: [], workflows: { "my-wf": { concurrency: 1, overflow: "drop" } }, maxRounds: 10, + api: { port: null }, }; const kernel = createKernel(initialConfig, nerveRoot, { @@ -548,6 +552,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => { reflexes: [], workflows: { "my-wf": { concurrency: 5, overflow: "queue", maxQueue: 50 } }, maxRounds: 10, + api: { port: null }, }; kernel.reloadConfig(newConfig); diff --git a/packages/daemon/src/__tests__/kernel-integration.test.ts b/packages/daemon/src/__tests__/kernel-integration.test.ts index c916e12..1320f96 100644 --- a/packages/daemon/src/__tests__/kernel-integration.test.ts +++ b/packages/daemon/src/__tests__/kernel-integration.test.ts @@ -30,6 +30,7 @@ function makeConfig(overrides: Partial = {}): NerveConfig { reflexes: [], workflows: {}, maxRounds: 10, + api: { port: null }, ...overrides, }; } diff --git a/packages/daemon/src/__tests__/kernel-phase6.test.ts b/packages/daemon/src/__tests__/kernel-phase6.test.ts index 2dd285a..1e8a9fe 100644 --- a/packages/daemon/src/__tests__/kernel-phase6.test.ts +++ b/packages/daemon/src/__tests__/kernel-phase6.test.ts @@ -78,6 +78,7 @@ function makeConfig(overrides: Partial = {}): NerveConfig { reflexes: [], workflows: {}, maxRounds: 10, + api: { port: null }, ...overrides, }; } @@ -197,6 +198,7 @@ describe("kernel — reloadConfig", () => { reflexes: [], workflows: {}, maxRounds: 10, + api: { port: null }, }); expect(kernel.groups.has("network")).toBe(true); @@ -214,6 +216,7 @@ describe("kernel — reloadConfig", () => { reflexes: [], workflows: {}, maxRounds: 10, + api: { port: null }, }; const kernel = createKernel(config, nerveRoot); @@ -229,6 +232,7 @@ describe("kernel — reloadConfig", () => { reflexes: [], workflows: {}, maxRounds: 10, + api: { port: null }, }); expect(kernel.groups.has("network")).toBe(false); @@ -252,6 +256,7 @@ describe("kernel — reloadConfig", () => { reflexes: [], workflows: {}, maxRounds: 10, + api: { port: null }, }); expect(kernel.getHealth().activeSenses).toBe(2); diff --git a/packages/daemon/src/__tests__/kernel-trigger-sense.test.ts b/packages/daemon/src/__tests__/kernel-trigger-sense.test.ts index 9fad4be..43601f7 100644 --- a/packages/daemon/src/__tests__/kernel-trigger-sense.test.ts +++ b/packages/daemon/src/__tests__/kernel-trigger-sense.test.ts @@ -97,6 +97,7 @@ function makeConfig(overrides: Partial = {}): NerveConfig { reflexes: [], workflows: {}, maxRounds: 10, + api: { port: null }, ...overrides, }; } diff --git a/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts b/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts index 9e1b646..e2e0289 100644 --- a/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts +++ b/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts @@ -100,6 +100,7 @@ function makeConfig(overrides: Partial = {}): NerveConfig { reflexes: [], workflows: {}, maxRounds: 10, + api: { port: null }, ...overrides, }; } @@ -322,6 +323,7 @@ describe("kernel + workflowManager integration", () => { reflexes: [], workflows: { "new-workflow": { concurrency: 1, overflow: "drop" } }, maxRounds: 10, + api: { port: null }, }; kernel.reloadConfig(newConfig); @@ -375,6 +377,7 @@ describe("kernel + workflowManager integration", () => { reflexes: [], workflows: {}, maxRounds: 10, + api: { port: null }, }; kernel.reloadConfig(newConfig); diff --git a/packages/daemon/src/__tests__/kernel.test.ts b/packages/daemon/src/__tests__/kernel.test.ts index 5b2b277..ec199e8 100644 --- a/packages/daemon/src/__tests__/kernel.test.ts +++ b/packages/daemon/src/__tests__/kernel.test.ts @@ -61,6 +61,7 @@ function makeConfig(overrides: Partial = {}): NerveConfig { reflexes: [], workflows: {}, maxRounds: 10, + api: { port: null }, ...overrides, }; } @@ -210,6 +211,7 @@ describe("kernel — groupForSense mapping", () => { reflexes: [], workflows: {}, maxRounds: 10, + api: { port: null }, }; const kernel = createKernel(config, nerveRoot); diff --git a/packages/daemon/src/__tests__/log-store-integration.test.ts b/packages/daemon/src/__tests__/log-store-integration.test.ts index 8323abb..ab35b37 100644 --- a/packages/daemon/src/__tests__/log-store-integration.test.ts +++ b/packages/daemon/src/__tests__/log-store-integration.test.ts @@ -31,6 +31,7 @@ describe("LogStore + ReflexScheduler integration", () => { reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }], workflows: {}, maxRounds: 10, + api: { port: null }, }; const bus = createSignalBus(); const triggered: string[] = []; @@ -59,6 +60,7 @@ describe("LogStore + ReflexScheduler integration", () => { reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 1000, on: [] }], workflows: {}, maxRounds: 10, + api: { port: null }, }; const bus = createSignalBus(); const ref: { scheduler: ReturnType | null } = { scheduler: null }; @@ -90,6 +92,7 @@ describe("LogStore + ReflexScheduler integration", () => { reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }], workflows: {}, maxRounds: 10, + api: { port: null }, }; const bus = createSignalBus(); const triggered: string[] = []; diff --git a/packages/daemon/src/__tests__/phase6-integration.test.ts b/packages/daemon/src/__tests__/phase6-integration.test.ts index 6c9fd10..6b2be3d 100644 --- a/packages/daemon/src/__tests__/phase6-integration.test.ts +++ b/packages/daemon/src/__tests__/phase6-integration.test.ts @@ -27,6 +27,7 @@ function makeConfig(overrides: Partial = {}): NerveConfig { reflexes: [], workflows: {}, maxRounds: 10, + api: { port: null }, ...overrides, }; } @@ -152,6 +153,7 @@ describe("phase6 — reloadConfig", () => { reflexes: [], workflows: {}, maxRounds: 10, + api: { port: null }, }; kernel.reloadConfig(newConfig); @@ -172,6 +174,7 @@ describe("phase6 — reloadConfig", () => { reflexes: [], workflows: {}, maxRounds: 10, + api: { port: null }, }; kernel = createKernel(config, nerveRoot, { workerScript: MOCK_WORKER, @@ -187,6 +190,7 @@ describe("phase6 — reloadConfig", () => { reflexes: [], workflows: {}, maxRounds: 10, + api: { port: null }, }; kernel.reloadConfig(newConfig); @@ -224,6 +228,7 @@ describe("phase6 — error isolation", () => { reflexes: [], workflows: {}, maxRounds: 10, + api: { port: null }, }; kernel = createKernel(config, nerveRoot, { @@ -334,6 +339,7 @@ describe("phase6 — getHealth", () => { reflexes: [], workflows: {}, maxRounds: 10, + api: { port: null }, }; kernel.reloadConfig(newConfig); diff --git a/packages/daemon/src/__tests__/reflex-scheduler-throttle-pending.test.ts b/packages/daemon/src/__tests__/reflex-scheduler-throttle-pending.test.ts index cf18866..362f823 100644 --- a/packages/daemon/src/__tests__/reflex-scheduler-throttle-pending.test.ts +++ b/packages/daemon/src/__tests__/reflex-scheduler-throttle-pending.test.ts @@ -12,6 +12,7 @@ function makeConfig(overrides: Partial = {}): NerveConfig { reflexes: [], workflows: {}, maxRounds: 10, + api: { port: null }, ...overrides, }; } diff --git a/packages/daemon/src/__tests__/reflex-scheduler.test.ts b/packages/daemon/src/__tests__/reflex-scheduler.test.ts index cac2a6e..cb67b6e 100644 --- a/packages/daemon/src/__tests__/reflex-scheduler.test.ts +++ b/packages/daemon/src/__tests__/reflex-scheduler.test.ts @@ -18,6 +18,7 @@ function makeConfig(overrides: Partial = {}): NerveConfig { reflexes: [], workflows: {}, maxRounds: 10, + api: { port: null }, ...overrides, }; } @@ -299,6 +300,7 @@ describe("ReflexScheduler — workflow reflexes ignored", () => { workflows: { "my-workflow": { concurrency: 1, overflow: "drop" }, }, + api: { port: null }, }; const bus = createSignalBus(); const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name)); diff --git a/packages/daemon/src/__tests__/workflow-manager.test.ts b/packages/daemon/src/__tests__/workflow-manager.test.ts index 3830c1e..48a85c2 100644 --- a/packages/daemon/src/__tests__/workflow-manager.test.ts +++ b/packages/daemon/src/__tests__/workflow-manager.test.ts @@ -89,6 +89,7 @@ function makeConfig(overrides: Partial = {}): NerveCon senses: {}, reflexes: [], workflows: overrides as NerveConfig["workflows"], + api: { port: null }, }; } diff --git a/packages/daemon/src/daemon-handlers.ts b/packages/daemon/src/daemon-handlers.ts new file mode 100644 index 0000000..bd97d5e --- /dev/null +++ b/packages/daemon/src/daemon-handlers.ts @@ -0,0 +1,79 @@ +import type { HealthInfo, SenseInfo, WorkflowStatus } from "@uncaged/nerve-core"; + +import type { WorkflowManager } from "./workflow-manager.js"; + +export type DaemonHandlerBundle = { + health: () => HealthInfo; + listSenses: () => SenseInfo[]; + listWorkflows: () => WorkflowStatus[]; + getDefaultMaxRounds: () => number; + triggerSense: (senseName: string) => { ok: true } | { ok: false; error: string }; + triggerWorkflow: ( + workflowName: string, + launch: { prompt: string; maxRounds: number; dryRun: boolean }, + ) => { ok: true } | { ok: false; error: string }; + killWorkflowByRunId: (runId: string) => { ok: true } | { ok: false; error: string }; +}; + +export type CreateDaemonHandlersInput = { + workflowManager: WorkflowManager; + triggerSense: (senseName: string) => void; + listSenses: () => SenseInfo[]; + getHealthInfo: () => HealthInfo; + getDefaultMaxRounds: () => number; +}; + +export function createDaemonHandlers(input: CreateDaemonHandlersInput): DaemonHandlerBundle { + const { + workflowManager, + triggerSense: triggerSenseFn, + listSenses: listSensesFn, + getHealthInfo, + getDefaultMaxRounds, + } = input; + + function triggerSense(senseName: string): { ok: true } | { ok: false; error: string } { + try { + triggerSenseFn(senseName); + return { ok: true }; + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + return { ok: false, error: msg }; + } + } + + function triggerWorkflow( + workflowName: string, + launch: { prompt: string; maxRounds: number; dryRun: boolean }, + ): { ok: true } | { ok: false; error: string } { + try { + workflowManager.startWorkflow(workflowName, launch); + return { ok: true }; + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + return { ok: false, error: msg }; + } + } + + function killWorkflowByRunId(runId: string): { ok: true } | { ok: false; error: string } { + try { + const found = workflowManager.killThread(runId); + return found + ? { ok: true } + : { ok: false, error: `Run not found or already finished: ${runId}` }; + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + return { ok: false, error: msg }; + } + } + + return { + health: () => getHealthInfo(), + listSenses: () => listSensesFn(), + listWorkflows: () => workflowManager.listWorkflows(), + getDefaultMaxRounds: () => getDefaultMaxRounds(), + triggerSense, + triggerWorkflow, + killWorkflowByRunId, + }; +} diff --git a/packages/daemon/src/daemon-ipc.ts b/packages/daemon/src/daemon-ipc.ts index 5361bc4..6802945 100644 --- a/packages/daemon/src/daemon-ipc.ts +++ b/packages/daemon/src/daemon-ipc.ts @@ -9,28 +9,18 @@ import { rmSync } from "node:fs"; import { type Server, type Socket, createServer } from "node:net"; -import type { DaemonIpcResponse, SenseInfo } from "@uncaged/nerve-core"; +import type { DaemonIpcResponse } from "@uncaged/nerve-core"; import { parseDaemonIpcRequest } from "@uncaged/nerve-core"; -import type { WorkflowManager } from "./workflow-manager.js"; - -export type { SenseInfo }; +import type { DaemonHandlerBundle } from "./daemon-handlers.js"; export type DaemonIpcServer = { close: () => Promise; }; -export type DaemonIpcServerOptions = { - /** Called when a trigger-sense request arrives. Should throw if the sense is unknown. */ - triggerSense: (senseName: string) => void; - /** Called when a list-senses request arrives. Returns sense info for all registered senses. */ - listSenses: () => SenseInfo[]; -}; - export function createDaemonIpcServer( socketPath: string, - workflowManager: WorkflowManager, - opts: DaemonIpcServerOptions, + handlers: DaemonHandlerBundle, ): DaemonIpcServer { // Remove stale socket file if it exists try { @@ -52,26 +42,32 @@ export function createDaemonIpcServer( try { if (req.type === "trigger-workflow") { - workflowManager.startWorkflow(req.workflow, { + const r = handlers.triggerWorkflow(req.workflow, { prompt: req.prompt, maxRounds: req.maxRounds, dryRun: req.dryRun, }); - const resp: DaemonIpcResponse = { ok: true }; + const resp: DaemonIpcResponse = r.ok ? { ok: true } : { ok: false, error: r.error }; socket.write(`${JSON.stringify(resp)}\n`); } else if (req.type === "trigger-sense") { - opts.triggerSense(req.sense); - const resp: DaemonIpcResponse = { ok: true }; + const r = handlers.triggerSense(req.sense); + const resp: DaemonIpcResponse = r.ok ? { ok: true } : { ok: false, error: r.error }; socket.write(`${JSON.stringify(resp)}\n`); } else if (req.type === "list-senses") { - const senses = opts.listSenses(); + const senses = handlers.listSenses(); const resp: DaemonIpcResponse = { ok: true, senses }; socket.write(`${JSON.stringify(resp)}\n`); } else if (req.type === "kill-workflow") { - const found = workflowManager.killThread(req.runId); - const resp: DaemonIpcResponse = found - ? { ok: true } - : { ok: false, error: `Run not found or already finished: ${req.runId}` }; + const r = handlers.killWorkflowByRunId(req.runId); + const resp: DaemonIpcResponse = r.ok ? { ok: true } : { ok: false, error: r.error }; + socket.write(`${JSON.stringify(resp)}\n`); + } else if (req.type === "list-workflows") { + const workflows = handlers.listWorkflows(); + const resp: DaemonIpcResponse = { ok: true, workflows }; + socket.write(`${JSON.stringify(resp)}\n`); + } else if (req.type === "health") { + const health = handlers.health(); + const resp: DaemonIpcResponse = { ok: true, health }; socket.write(`${JSON.stringify(resp)}\n`); } else { const _exhaustive: never = req; diff --git a/packages/daemon/src/http-api.ts b/packages/daemon/src/http-api.ts new file mode 100644 index 0000000..63871fa --- /dev/null +++ b/packages/daemon/src/http-api.ts @@ -0,0 +1,194 @@ +/** + * Optional JSON HTTP control plane (Phase 1 — no auth). + * Uses only `node:http`; shares handler logic with Unix IPC via {@link createDaemonHandlers}. + */ + +import { type IncomingMessage, type Server, type ServerResponse, createServer } from "node:http"; + +import { isPlainRecord } from "@uncaged/nerve-core"; + +import type { DaemonHandlerBundle } from "./daemon-handlers.js"; + +/** Phase 1 HTTP API has no auth — bind loopback only unless explicitly configured later. */ +const HTTP_API_BIND_HOST = "127.0.0.1"; + +const CORS_HEADERS: Record = { + "Access-Control-Allow-Origin": "*", + "Access-Control-Allow-Methods": "GET, POST, OPTIONS", + "Access-Control-Allow-Headers": "Content-Type", +}; + +export type HttpApiServer = { + close: () => Promise; +}; + +function setJsonHeaders(res: ServerResponse, status: number): void { + res.statusCode = status; + res.setHeader("Content-Type", "application/json; charset=utf-8"); + for (const [k, v] of Object.entries(CORS_HEADERS)) { + res.setHeader(k, v); + } +} + +function sendJson(res: ServerResponse, status: number, body: unknown): void { + setJsonHeaders(res, status); + res.end(`${JSON.stringify(body)}\n`); +} + +function readRequestBody(req: IncomingMessage): Promise { + return new Promise((resolve, reject) => { + const chunks: Buffer[] = []; + req.on("data", (c: Buffer) => { + chunks.push(c); + }); + req.on("end", () => { + resolve(Buffer.concat(chunks).toString("utf8")); + }); + req.on("error", reject); + }); +} + +function parseJsonBody(raw: string): unknown | null { + if (raw.trim().length === 0) return null; + try { + return JSON.parse(raw) as unknown; + } catch { + return null; + } +} + +export function createHttpApiServer(port: number, handlers: DaemonHandlerBundle): HttpApiServer { + // biome-ignore lint/complexity/noExcessiveCognitiveComplexity: HTTP router dispatches multiple routes in one handler + const server: Server = createServer(async (req, res) => { + if (req.method === "OPTIONS") { + setJsonHeaders(res, 204); + res.end(); + return; + } + + const url = req.url ?? ""; + const path = url.split("?")[0] ?? ""; + + try { + if (req.method === "GET" && path === "/api/health") { + sendJson(res, 200, handlers.health()); + return; + } + + if (req.method === "GET" && path === "/api/senses") { + sendJson(res, 200, { senses: handlers.listSenses() }); + return; + } + + if (req.method === "GET" && path === "/api/workflows") { + sendJson(res, 200, { workflows: handlers.listWorkflows() }); + return; + } + + if (req.method === "POST" && path === "/api/trigger-sense") { + const raw = await readRequestBody(req); + const body = parseJsonBody(raw); + if (!isPlainRecord(body) || typeof body.name !== "string" || body.name.length === 0) { + sendJson(res, 400, { ok: false, error: 'Expected JSON body: { "name": string }' }); + return; + } + const result = handlers.triggerSense(body.name); + sendJson(res, result.ok ? 200 : 400, result); + return; + } + + if (req.method === "POST" && path === "/api/trigger-workflow") { + const raw = await readRequestBody(req); + const body = parseJsonBody(raw); + if (!isPlainRecord(body) || typeof body.name !== "string" || body.name.length === 0) { + sendJson(res, 400, { + ok: false, + error: + 'Expected JSON body: { "name": string, "prompt"?: string, "maxRounds"?: number, "dryRun"?: boolean }', + }); + return; + } + let prompt = ""; + if (body.prompt !== undefined && body.prompt !== null) { + if (typeof body.prompt !== "string") { + sendJson(res, 400, { ok: false, error: '"prompt" must be a string when provided' }); + return; + } + prompt = body.prompt; + } + let maxRounds = handlers.getDefaultMaxRounds(); + if (body.maxRounds !== undefined && body.maxRounds !== null) { + if (typeof body.maxRounds !== "number") { + sendJson(res, 400, { ok: false, error: '"maxRounds" must be a number when provided' }); + return; + } + maxRounds = body.maxRounds; + } + let dryRun = false; + if (body.dryRun !== undefined && body.dryRun !== null) { + if (typeof body.dryRun !== "boolean") { + sendJson(res, 400, { ok: false, error: '"dryRun" must be a boolean when provided' }); + return; + } + dryRun = body.dryRun; + } + const result = handlers.triggerWorkflow(body.name, { prompt, maxRounds, dryRun }); + sendJson(res, result.ok ? 200 : 400, result); + return; + } + + if (req.method === "POST" && path === "/api/kill-workflow") { + const raw = await readRequestBody(req); + const body = parseJsonBody(raw); + if ( + !isPlainRecord(body) || + typeof body.threadId !== "string" || + body.threadId.length === 0 + ) { + sendJson(res, 400, { + ok: false, + error: 'Expected JSON body: { "threadId": string, "name"?: string }', + }); + return; + } + if (body.name !== undefined && body.name !== null && typeof body.name !== "string") { + sendJson(res, 400, { ok: false, error: '"name" must be a string when provided' }); + return; + } + const nameForLog = typeof body.name === "string" && body.name.length > 0 ? body.name : null; + if (nameForLog !== null) { + process.stderr.write( + `[http-api] kill-workflow threadId=${body.threadId} workflowName=${nameForLog}\n`, + ); + } + const result = handlers.killWorkflowByRunId(body.threadId); + sendJson(res, result.ok ? 200 : 400, result); + return; + } + + sendJson(res, 404, { ok: false, error: "Not found" }); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + sendJson(res, 500, { ok: false, error: msg }); + } + }); + + server.listen(port, HTTP_API_BIND_HOST, () => { + process.stderr.write(`[http-api] listening on http://${HTTP_API_BIND_HOST}:${String(port)}\n`); + }); + + server.on("error", (err) => { + process.stderr.write(`[http-api] server error: ${err.message}\n`); + }); + + async function close(): Promise { + await new Promise((resolve, reject) => { + server.close((e) => { + if (e !== undefined && e !== null) reject(e); + else resolve(); + }); + }); + } + + return { close }; +} diff --git a/packages/daemon/src/kernel.ts b/packages/daemon/src/kernel.ts index d089e53..aade9f1 100644 --- a/packages/daemon/src/kernel.ts +++ b/packages/daemon/src/kernel.ts @@ -3,17 +3,23 @@ * optional file watcher, and daemon IPC. */ -import { join } from "node:path"; +import { readFileSync } from "node:fs"; +import { hostname } from "node:os"; +import { dirname, join } from "node:path"; +import { fileURLToPath } from "node:url"; -import type { NerveConfig, SenseInfo, Signal } from "@uncaged/nerve-core"; +import type { HealthInfo, NerveConfig, SenseInfo, Signal } from "@uncaged/nerve-core"; import { routeSenseComputeOutput } from "@uncaged/nerve-core"; import { createLogStore } from "@uncaged/nerve-store"; import type { LogStore } from "@uncaged/nerve-store"; +import { createDaemonHandlers } from "./daemon-handlers.js"; import { createDaemonIpcServer } from "./daemon-ipc.js"; import type { DaemonIpcServer } from "./daemon-ipc.js"; import { createFileWatcher } from "./file-watcher.js"; import type { FileWatcher } from "./file-watcher.js"; +import { createHttpApiServer } from "./http-api.js"; +import type { HttpApiServer } from "./http-api.js"; import { parseWorkerMessage } from "./ipc.js"; import { createKernelFileWatchHandlers } from "./kernel-file-watch.js"; import { @@ -53,6 +59,8 @@ export type Kernel = { restartGroup: (group: string) => Promise; reloadConfig: (newConfig: NerveConfig) => void; getHealth: () => KernelHealth; + /** HTTP/IPC-oriented health (version, uptime seconds, hostname). */ + getDaemonHealth: () => HealthInfo; }; export type KernelOptions = { @@ -60,12 +68,37 @@ export type KernelOptions = { enableFileWatcher?: boolean; logStore?: LogStore; ipcSocketPath?: string | null; + /** + * When set to a positive integer, overrides `nerve.yaml` `api.port` for the HTTP API. + * When `null` or omitted, only `config.api.port` from YAML is used. + */ + httpApiPortOverride?: number | null; }; function defaultKernelOptions(): KernelOptions { return { workerScript: resolveWorkerScript(), enableFileWatcher: false }; } +function readDaemonPackageVersion(): string { + try { + const here = fileURLToPath(import.meta.url); + const pkgPath = join(dirname(here), "..", "package.json"); + const raw = readFileSync(pkgPath, "utf8"); + const o: unknown = JSON.parse(raw); + if ( + typeof o === "object" && + o !== null && + "version" in o && + typeof (o as { version: unknown }).version === "string" + ) { + return (o as { version: string }).version; + } + return "0.0.0"; + } catch { + return "0.0.0"; + } +} + export function createKernel( initialConfig: NerveConfig, nerveRoot: string, @@ -74,6 +107,8 @@ export function createKernel( const bus: SignalBus = createSignalBus(); const workerScript = options.workerScript ?? resolveWorkerScript(); const startTime = Date.now(); + const startedAtIso = new Date(startTime).toISOString(); + const daemonVersion = readDaemonPackageVersion(); const logStore: LogStore = options.logStore ?? createLogStore(join(nerveRoot, "data", "logs.db")); logStore.append({ @@ -292,6 +327,16 @@ export function createKernel( }; } + function getDaemonHealth(): HealthInfo { + return { + ok: true, + version: daemonVersion, + uptime: Math.floor((Date.now() - startTime) / 1000), + startedAt: startedAtIso, + hostname: hostname(), + }; + } + const fileWatchHandlers = createKernelFileWatchHandlers({ nerveRoot, getConfig: () => config, @@ -310,28 +355,43 @@ export function createKernel( }); } + const daemonHandlers = createDaemonHandlers({ + workflowManager, + triggerSense, + listSenses(): SenseInfo[] { + return Object.entries(config.senses).map(([name, senseConfig]) => { + const entries = logStore.query({ + source: "sense", + type: "signal", + refId: name, + }); + const lastEntry = entries.length > 0 ? entries[entries.length - 1] : null; + return { + name, + group: senseConfig.group, + throttle: senseConfig.throttle, + timeout: senseConfig.timeout, + lastSignalTimestamp: lastEntry !== null ? lastEntry.timestamp : null, + }; + }); + }, + getHealthInfo: getDaemonHealth, + getDefaultMaxRounds: () => config.maxRounds, + }); + let ipcServer: DaemonIpcServer | null = null; if (options.ipcSocketPath != null) { - ipcServer = createDaemonIpcServer(options.ipcSocketPath, workflowManager, { - triggerSense, - listSenses(): SenseInfo[] { - return Object.entries(config.senses).map(([name, senseConfig]) => { - const entries = logStore.query({ - source: "sense", - type: "signal", - refId: name, - }); - const lastEntry = entries.length > 0 ? entries[entries.length - 1] : null; - return { - name, - group: senseConfig.group, - throttle: senseConfig.throttle, - timeout: senseConfig.timeout, - lastSignalTimestamp: lastEntry !== null ? lastEntry.timestamp : null, - }; - }); - }, - }); + ipcServer = createDaemonIpcServer(options.ipcSocketPath, daemonHandlers); + } + + let httpApiServer: HttpApiServer | null = null; + const httpPortOverride = options.httpApiPortOverride; + const effectiveHttpPort = + httpPortOverride !== undefined && httpPortOverride !== null && httpPortOverride > 0 + ? httpPortOverride + : initialConfig.api.port; + if (effectiveHttpPort !== null && effectiveHttpPort > 0) { + httpApiServer = createHttpApiServer(effectiveHttpPort, daemonHandlers); } async function stop(): Promise { @@ -344,6 +404,10 @@ export function createKernel( await ipcServer.close(); ipcServer = null; } + if (httpApiServer !== null) { + await httpApiServer.close(); + httpApiServer = null; + } scheduler.stop(); await workflowManager.stop(); await senseWorkerPool.shutdownAll(); @@ -377,5 +441,6 @@ export function createKernel( restartGroup: (group) => senseWorkerPool.restartGroup(group), reloadConfig, getHealth, + getDaemonHealth, }; } diff --git a/packages/daemon/src/workflow-manager.ts b/packages/daemon/src/workflow-manager.ts index dce111e..b894346 100644 --- a/packages/daemon/src/workflow-manager.ts +++ b/packages/daemon/src/workflow-manager.ts @@ -11,7 +11,12 @@ import type { ChildProcess } from "node:child_process"; import { dirname, join } from "node:path"; import { fileURLToPath } from "node:url"; -import type { NerveConfig, WorkflowConfig, WorkflowMessage } from "@uncaged/nerve-core"; +import type { + NerveConfig, + WorkflowConfig, + WorkflowMessage, + WorkflowStatus, +} from "@uncaged/nerve-core"; import { START, isPlainRecord } from "@uncaged/nerve-core"; import type { LogStore, WorkflowRunStatus } from "@uncaged/nerve-store"; @@ -49,6 +54,8 @@ export type WorkflowManager = { queueLength: (workflowName: string) => number; /** Total active workflow threads across all workflows. */ totalActiveCount: () => number; + /** One row per workflow key in config (sorted by name), using current concurrency / overflow settings. */ + listWorkflows: () => WorkflowStatus[]; /** Update the config reference (e.g. after hot reload). Active workers are unaffected. */ updateConfig: (newConfig: NerveConfig) => void; /** @@ -676,6 +683,22 @@ export function createWorkflowManager( return total; } + function listWorkflows(): WorkflowStatus[] { + const names = Object.keys(config.workflows).sort(); + const out: WorkflowStatus[] = []; + for (const name of names) { + const wf = config.workflows[name]; + if (wf === undefined) continue; + out.push({ + name, + activeThreads: activeCount(name), + queuedThreads: queueLength(name), + config: { concurrency: wf.concurrency, overflow: wf.overflow }, + }); + } + return out; + } + function updateConfig(newConfig: NerveConfig): void { config = newConfig; } @@ -756,6 +779,7 @@ export function createWorkflowManager( activeCount, queueLength, totalActiveCount, + listWorkflows, updateConfig, drainAndRespawn, drainWhenIdle,