diff --git a/packages/cli/src/__tests__/sense-list.test.ts b/packages/cli/src/__tests__/sense-list.test.ts index 93d2fd6..52ac449 100644 --- a/packages/cli/src/__tests__/sense-list.test.ts +++ b/packages/cli/src/__tests__/sense-list.test.ts @@ -16,30 +16,47 @@ import { join } from "node:path"; import type { SenseInfo } from "@uncaged/nerve-core"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +function intervalLabelFromReflexRecord(rec: Record): string | null { + if (typeof rec.interval !== "number") return null; + const totalSeconds = Math.floor(rec.interval / 1000); + if (totalSeconds < 60) return `every ${totalSeconds}s`; + return `every ${Math.floor(totalSeconds / 60)}m`; +} + +function onTriggerPartFromReflexRecord(rec: Record): string | null { + if (!Array.isArray(rec.on) || rec.on.length === 0) return null; + const onLabels = rec.on.filter((v): v is string => typeof v === "string"); + if (onLabels.length === 0) return null; + return `on: ${onLabels.join(", ")}`; +} + +/** Returns a display line when this reflex targets `senseName`; otherwise null (skip). */ +function reflexTriggerLineForSense(rec: Record, senseName: string): string | null { + if (rec.kind !== "sense" || rec.sense !== senseName) return null; + const parts: string[] = []; + const intervalPart = intervalLabelFromReflexRecord(rec); + if (intervalPart !== null) parts.push(intervalPart); + const onPart = onTriggerPartFromReflexRecord(rec); + if (onPart !== null) parts.push(onPart); + return parts.length > 0 ? parts.join(" · ") : "reflex (no interval or on)"; +} + +function mockSenseTriggerLabels(senseName: string, reflexes: readonly unknown[]): string[] { + const out: string[] = []; + for (const reflex of reflexes) { + if (typeof reflex !== "object" || reflex === null) continue; + const rec = reflex as Record; + const line = reflexTriggerLineForSense(rec, senseName); + if (line !== null) out.push(line); + } + return out; +} + vi.mock("@uncaged/nerve-core", async () => { const actual = await vi.importActual("@uncaged/nerve-core"); return { ...actual, - senseTriggerLabels: (senseName: string, reflexes: readonly unknown[]): string[] => { - const out: string[] = []; - for (const reflex of reflexes) { - if (typeof reflex !== "object" || reflex === null) continue; - const rec = reflex as Record; - if (rec.kind !== "sense" || rec.sense !== senseName) continue; - const parts: string[] = []; - if (typeof rec.interval === "number") { - const totalSeconds = Math.floor(rec.interval / 1000); - if (totalSeconds < 60) parts.push(`every ${totalSeconds}s`); - else parts.push(`every ${Math.floor(totalSeconds / 60)}m`); - } - if (Array.isArray(rec.on) && rec.on.length > 0) { - const onLabels = rec.on.filter((v): v is string => typeof v === "string"); - if (onLabels.length > 0) parts.push(`on: ${onLabels.join(", ")}`); - } - out.push(parts.length > 0 ? parts.join(" · ") : "reflex (no interval or on)"); - } - return out; - }, + senseTriggerLabels: mockSenseTriggerLabels, isSenseInfo: (value: unknown): value is SenseInfo => { if (typeof value !== "object" || value === null) return false; const rec = value as Record; diff --git a/packages/daemon/src/daemon-ipc.ts b/packages/daemon/src/daemon-ipc.ts index 6802945..54ed0bb 100644 --- a/packages/daemon/src/daemon-ipc.ts +++ b/packages/daemon/src/daemon-ipc.ts @@ -9,7 +9,7 @@ import { rmSync } from "node:fs"; import { type Server, type Socket, createServer } from "node:net"; -import type { DaemonIpcResponse } from "@uncaged/nerve-core"; +import type { DaemonIpcRequest, DaemonIpcResponse } from "@uncaged/nerve-core"; import { parseDaemonIpcRequest } from "@uncaged/nerve-core"; import type { DaemonHandlerBundle } from "./daemon-handlers.js"; @@ -29,54 +29,65 @@ export function createDaemonIpcServer( // file did not exist — that is fine } + function writeIpcLine(socket: Socket, resp: DaemonIpcResponse): void { + socket.write(`${JSON.stringify(resp)}\n`); + } + + function dispatchDaemonIpcRequest(socket: Socket, req: DaemonIpcRequest): void { + switch (req.type) { + case "trigger-workflow": { + const r = handlers.triggerWorkflow(req.workflow, { + prompt: req.prompt, + maxRounds: req.maxRounds, + dryRun: req.dryRun, + }); + writeIpcLine(socket, r.ok ? { ok: true } : { ok: false, error: r.error }); + return; + } + case "trigger-sense": { + const r = handlers.triggerSense(req.sense); + writeIpcLine(socket, r.ok ? { ok: true } : { ok: false, error: r.error }); + return; + } + case "list-senses": { + writeIpcLine(socket, { ok: true, senses: handlers.listSenses() }); + return; + } + case "kill-workflow": { + const r = handlers.killWorkflowByRunId(req.runId); + writeIpcLine(socket, r.ok ? { ok: true } : { ok: false, error: r.error }); + return; + } + case "list-workflows": { + writeIpcLine(socket, { ok: true, workflows: handlers.listWorkflows() }); + return; + } + case "health": { + writeIpcLine(socket, { ok: true, health: handlers.health() }); + return; + } + default: { + const _exhaustive: never = req; + void _exhaustive; + } + } + } + function handleLine(socket: Socket, line: string): void { const trimmed = line.trim(); if (trimmed.length === 0) return; const req = parseDaemonIpcRequest(trimmed); if (req === null) { - const resp: DaemonIpcResponse = { ok: false, error: "Invalid request" }; - socket.write(`${JSON.stringify(resp)}\n`); + writeIpcLine(socket, { ok: false, error: "Invalid request" }); return; } try { - if (req.type === "trigger-workflow") { - const r = handlers.triggerWorkflow(req.workflow, { - prompt: req.prompt, - maxRounds: req.maxRounds, - dryRun: req.dryRun, - }); - const resp: DaemonIpcResponse = r.ok ? { ok: true } : { ok: false, error: r.error }; - socket.write(`${JSON.stringify(resp)}\n`); - } else if (req.type === "trigger-sense") { - const r = handlers.triggerSense(req.sense); - const resp: DaemonIpcResponse = r.ok ? { ok: true } : { ok: false, error: r.error }; - socket.write(`${JSON.stringify(resp)}\n`); - } else if (req.type === "list-senses") { - const senses = handlers.listSenses(); - const resp: DaemonIpcResponse = { ok: true, senses }; - socket.write(`${JSON.stringify(resp)}\n`); - } else if (req.type === "kill-workflow") { - const r = handlers.killWorkflowByRunId(req.runId); - const resp: DaemonIpcResponse = r.ok ? { ok: true } : { ok: false, error: r.error }; - socket.write(`${JSON.stringify(resp)}\n`); - } else if (req.type === "list-workflows") { - const workflows = handlers.listWorkflows(); - const resp: DaemonIpcResponse = { ok: true, workflows }; - socket.write(`${JSON.stringify(resp)}\n`); - } else if (req.type === "health") { - const health = handlers.health(); - const resp: DaemonIpcResponse = { ok: true, health }; - socket.write(`${JSON.stringify(resp)}\n`); - } else { - const _exhaustive: never = req; - void _exhaustive; - } + dispatchDaemonIpcRequest(socket, req); } catch (err) { const msg = err instanceof Error ? err.message : String(err); - const resp: DaemonIpcResponse = { ok: false, error: msg }; - socket.write(`${JSON.stringify(resp)}\n`); + writeIpcLine(socket, { ok: false, error: msg }); } } diff --git a/packages/daemon/src/ipc.ts b/packages/daemon/src/ipc.ts index a5d5b4a..940f213 100644 --- a/packages/daemon/src/ipc.ts +++ b/packages/daemon/src/ipc.ts @@ -168,6 +168,47 @@ function validateResumeThreadMsg(obj: Record): string | null { return null; } +function parseParentCompute(obj: Record): Result { + if (typeof obj.sense !== "string") { + return err(new Error("IPC 'compute' message missing string 'sense' field")); + } + return ok({ type: "compute", sense: obj.sense }); +} + +function parseParentStartThread(obj: Record): Result { + const errMsg = validateStartThreadMsg(obj); + if (errMsg !== null) return err(new Error(errMsg)); + // Field types are validated above; `Record` values stay `unknown` to TypeScript. + return ok({ + type: "start-thread", + runId: obj.runId, + workflow: obj.workflow, + prompt: obj.prompt, + maxRounds: obj.maxRounds, + dryRun: obj.dryRun, + } as StartThreadMessage); +} + +function parseParentResumeThread(obj: Record): Result { + const errMsg = validateResumeThreadMsg(obj); + if (errMsg !== null) return err(new Error(errMsg)); + // Elements are validated as plain objects by the kernel; trust the wire shape here. + return ok({ + type: "resume-thread", + runId: obj.runId, + messages: obj.messages as ResumeThreadMessage["messages"], + maxRounds: obj.maxRounds, + dryRun: obj.dryRun, + } as ResumeThreadMessage); +} + +function parseParentKillThread(obj: Record): Result { + if (typeof obj.runId !== "string") { + return err(new Error("'kill-thread' message missing string 'runId'")); + } + return ok({ type: "kill-thread", runId: obj.runId } as KillThreadMessage); +} + /** Validate and parse an unknown IPC message received from the parent process. */ export function parseParentMessage(raw: unknown): Result { if (!isPlainRecord(raw)) { @@ -180,50 +221,22 @@ export function parseParentMessage(raw: unknown): Result if (!PARENT_MSG_TYPES.has(obj.type)) { return err(new Error(`Unknown IPC message type: "${obj.type}"`)); } - if (obj.type === "compute") { - if (typeof obj.sense !== "string") { - return err(new Error("IPC 'compute' message missing string 'sense' field")); - } - return ok({ type: "compute", sense: obj.sense }); + switch (obj.type) { + case "compute": + return parseParentCompute(obj); + case "shutdown": + return ok({ type: "shutdown" }); + case "health-request": + return ok({ type: "health-request" }); + case "start-thread": + return parseParentStartThread(obj); + case "resume-thread": + return parseParentResumeThread(obj); + case "kill-thread": + return parseParentKillThread(obj); + default: + return err(new Error(`Unhandled IPC message type: "${obj.type}"`)); } - if (obj.type === "shutdown") { - return ok({ type: "shutdown" }); - } - if (obj.type === "health-request") { - return ok({ type: "health-request" }); - } - if (obj.type === "start-thread") { - const errMsg = validateStartThreadMsg(obj); - if (errMsg !== null) return err(new Error(errMsg)); - // Field types are validated above; `Record` values stay `unknown` to TypeScript. - return ok({ - type: "start-thread", - runId: obj.runId, - workflow: obj.workflow, - prompt: obj.prompt, - maxRounds: obj.maxRounds, - dryRun: obj.dryRun, - } as StartThreadMessage); - } - if (obj.type === "resume-thread") { - const errMsg = validateResumeThreadMsg(obj); - if (errMsg !== null) return err(new Error(errMsg)); - // Elements are validated as plain objects by the kernel; trust the wire shape here. - return ok({ - type: "resume-thread", - runId: obj.runId, - messages: obj.messages as ResumeThreadMessage["messages"], - maxRounds: obj.maxRounds, - dryRun: obj.dryRun, - } as ResumeThreadMessage); - } - if (obj.type === "kill-thread") { - if (typeof obj.runId !== "string") { - return err(new Error("'kill-thread' message missing string 'runId'")); - } - return ok({ type: "kill-thread", runId: obj.runId } as KillThreadMessage); - } - return err(new Error(`Unhandled IPC message type: "${obj.type}"`)); } function parseSignalMsg(obj: Record): Result {