diff --git a/packages/cli/src/commands/sense.ts b/packages/cli/src/commands/sense.ts index 96b7877..41eed3b 100644 --- a/packages/cli/src/commands/sense.ts +++ b/packages/cli/src/commands/sense.ts @@ -2,7 +2,7 @@ import { readFileSync } from "node:fs"; import { join } from "node:path"; import { DatabaseSync } from "node:sqlite"; -import { type SenseInfo, parseNerveConfig } from "@uncaged/nerve-core"; +import { type SenseInfo, isPlainRecord, parseNerveConfig } from "@uncaged/nerve-core"; import { defineCommand } from "citty"; import { listSensesViaDaemon, triggerSenseViaDaemon } from "../daemon-client.js"; @@ -240,7 +240,8 @@ const senseQueryCommand = defineCommand({ } } - const rows = db.prepare(sql).all() as Record[]; + const rawRows: unknown[] = db.prepare(sql).all(); + const rows: Record[] = rawRows.filter(isPlainRecord); if (args.json) { process.stdout.write(`${JSON.stringify(rows, null, 2)}\n`); diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 10fe1e3..baaee9c 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -74,9 +74,11 @@ async function runDaemon(nerveRoot: string): Promise { const bootstrapPath = daemonBootstrapScript(); + // After `open`, file-backed WriteStream has a numeric OS fd for spawn stdio; `@types/node` omits `fd` on this WriteStream alias. + const logFd = (logStream as unknown as { fd: number }).fd; const child = spawn(process.execPath, [bootstrapPath], { detached: true, - stdio: ["ignore", (logStream as any).fd, (logStream as any).fd], + stdio: ["ignore", logFd, logFd], env: { ...process.env, NERVE_ROOT: nerveRoot }, cwd: nerveRoot, }); diff --git a/packages/cli/src/commands/status.ts b/packages/cli/src/commands/status.ts index e7d2954..06754b9 100644 --- a/packages/cli/src/commands/status.ts +++ b/packages/cli/src/commands/status.ts @@ -47,7 +47,11 @@ export const statusCommand = defineCommand({ return; } - const pid = readPidFile() as number; + const pid = readPidFile(); + if (pid === null) { + process.stdout.write("😴 Nerve daemon is not running.\n"); + return; + } const configPath = join(getNerveRoot(), "nerve.yaml"); let senseList: string[] = []; diff --git a/packages/cli/src/commands/workflow.ts b/packages/cli/src/commands/workflow.ts index 3cf2744..fcd8402 100644 --- a/packages/cli/src/commands/workflow.ts +++ b/packages/cli/src/commands/workflow.ts @@ -1,6 +1,7 @@ import { existsSync } from "node:fs"; import { join } from "node:path"; +import { isPlainRecord } from "@uncaged/nerve-core"; import { defineCommand } from "citty"; import { stringify } from "yaml"; @@ -203,7 +204,9 @@ export function partitionWorkflowMessage(msg: { const contentBody = msg.content; const meta: Record = msg.meta !== null && msg.meta !== undefined && typeof msg.meta === "object" - ? (msg.meta as Record) + ? isPlainRecord(msg.meta) + ? msg.meta + : (msg.meta as Record) : {}; return { roleStr, contentBody, meta }; } diff --git a/packages/cli/src/daemon-client.ts b/packages/cli/src/daemon-client.ts index a5a4bda..21015fb 100644 --- a/packages/cli/src/daemon-client.ts +++ b/packages/cli/src/daemon-client.ts @@ -9,6 +9,7 @@ import { connect } from "node:net"; import type { Socket } from "node:net"; import type { SenseInfo } from "@uncaged/nerve-core"; +import { isPlainRecord } from "@uncaged/nerve-core"; const CONNECT_TIMEOUT_MS = 3_000; const RESPONSE_TIMEOUT_MS = 5_000; @@ -19,11 +20,22 @@ type TriggerResponse = { ok: true } | { ok: false; error: string }; type ListSensesResponse = { ok: true; senses: SenseInfo[] } | { ok: false; error: string }; +function isSenseInfo(value: unknown): value is SenseInfo { + if (!isPlainRecord(value)) return false; + return ( + typeof value.name === "string" && + typeof value.group === "string" && + (value.throttle === null || typeof value.throttle === "number") && + (value.timeout === null || typeof value.timeout === "number") && + (value.lastSignalTs === null || typeof value.lastSignalTs === "number") + ); +} + function parseDaemonResponse(line: string): TriggerResponse { try { - const obj = JSON.parse(line) as unknown; - if (obj !== null && typeof obj === "object") { - const r = obj as Record; + const obj: unknown = JSON.parse(line); + if (isPlainRecord(obj)) { + const r = obj; if (r.ok === true) return { ok: true }; if (r.ok === false && typeof r.error === "string") return { ok: false, error: r.error }; } @@ -35,12 +47,13 @@ function parseDaemonResponse(line: string): TriggerResponse { function parseListSensesResponse(line: string): ListSensesResponse { try { - const obj = JSON.parse(line) as unknown; - if (obj !== null && typeof obj === "object") { - const r = obj as Record; + const obj: unknown = JSON.parse(line); + if (isPlainRecord(obj)) { + const r = obj; if (r.ok === false && typeof r.error === "string") return { ok: false, error: r.error }; - if (r.ok === true && Array.isArray(r.senses)) - return { ok: true, senses: r.senses as SenseInfo[] }; + if (r.ok === true && Array.isArray(r.senses) && r.senses.every(isSenseInfo)) { + return { ok: true, senses: r.senses }; + } } } catch { // fall through diff --git a/packages/cli/src/workspace-daemon.ts b/packages/cli/src/workspace-daemon.ts index c61128f..cabd341 100644 --- a/packages/cli/src/workspace-daemon.ts +++ b/packages/cli/src/workspace-daemon.ts @@ -46,5 +46,6 @@ export type DaemonModule = { export async function loadDaemonModule(nerveRoot: string): Promise { const entry = assertWorkspaceDaemonInstalled(nerveRoot); const url = pathToFileURL(entry).href; + // Dynamic import return type is module-specific; narrow at this workspace boundary. return import(url) as Promise; } diff --git a/packages/core/src/config.ts b/packages/core/src/config.ts index e8321a6..65eb2d8 100644 --- a/packages/core/src/config.ts +++ b/packages/core/src/config.ts @@ -1,5 +1,6 @@ import { parse } from "yaml"; +import { isPlainRecord } from "./is-plain-record.js"; import type { Result } from "./result.js"; import { err, ok } from "./result.js"; import type { NerveConfig, ReflexConfig, SenseConfig, WorkflowConfig } from "./types.js"; @@ -40,11 +41,11 @@ function parseDurationField(field: unknown, label: string): Result { - if (raw === null || typeof raw !== "object" || Array.isArray(raw)) { + if (!isPlainRecord(raw)) { return err(new Error(`senses.${name}: must be an object`)); } - const obj = raw as Record; + const obj = raw; if (typeof obj.group !== "string" || obj.group.trim() === "") { return err(new Error(`senses.${name}.group: required string`)); @@ -77,10 +78,10 @@ function validateSenseConfig(name: string, raw: unknown): Result { function parseOnField(index: number, obj: Record): Result { if (obj.on === undefined || obj.on === null) return ok(null); - if (!Array.isArray(obj.on) || !obj.on.every((item) => typeof item === "string")) { + if (!Array.isArray(obj.on) || !obj.on.every((item): item is string => typeof item === "string")) { return err(new Error(`reflexes[${index}].on: must be an array of strings`)); } - return ok(obj.on as string[]); + return ok(obj.on); } function parseSenseReflex( @@ -118,11 +119,11 @@ function validateReflexConfig( raw: unknown, senseNames: Set, ): Result { - if (raw === null || typeof raw !== "object" || Array.isArray(raw)) { + if (!isPlainRecord(raw)) { return err(new Error(`reflexes[${index}]: must be an object`)); } - const obj = raw as Record; + const obj = raw; const hasSense = obj.sense !== undefined; const hasWorkflowKey = Object.hasOwn(obj, "workflow"); @@ -158,11 +159,11 @@ function parseEngineMaxRounds(obj: Record): Result { } function validateWorkflowConfig(name: string, raw: unknown): Result { - if (raw === null || typeof raw !== "object" || Array.isArray(raw)) { + if (!isPlainRecord(raw)) { return err(new Error(`workflows.${name}: must be an object`)); } - const obj = raw as Record; + const obj = raw; if ( typeof obj.concurrency !== "number" || @@ -209,11 +210,11 @@ function validateWorkflowConfig(name: string, raw: unknown): Result, ): Result<{ senses: Record; senseNames: Set }> { - if (obj.senses === null || typeof obj.senses !== "object" || Array.isArray(obj.senses)) { + if (!isPlainRecord(obj.senses)) { return err(new Error("senses: required object")); } - const sensesRaw = obj.senses as Record; + const sensesRaw = obj.senses; const senses: Record = {}; const senseNames = new Set(Object.keys(sensesRaw)); @@ -249,11 +250,11 @@ function parseWorkflows( ): Result | null> { if (obj.workflows === undefined || obj.workflows === null) return ok(null); - if (typeof obj.workflows !== "object" || Array.isArray(obj.workflows)) { + if (!isPlainRecord(obj.workflows)) { return err(new Error("workflows: must be an object if provided")); } - const workflowsRaw = obj.workflows as Record; + const workflowsRaw = obj.workflows; const workflows: Record = {}; for (const [name, wfRaw] of Object.entries(workflowsRaw)) { @@ -275,11 +276,11 @@ export function parseNerveConfig(raw: string): Result { return err(new Error(`YAML parse error: ${message}`)); } - if (parsed === null || typeof parsed !== "object" || Array.isArray(parsed)) { + if (!isPlainRecord(parsed)) { return err(new Error("Config must be a YAML object")); } - const obj = parsed as Record; + const obj = parsed; const sensesResult = parseSenses(obj); if (!sensesResult.ok) return sensesResult; diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index f212605..8978cad 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -22,6 +22,7 @@ export { START, END, DEFAULT_ENGINE_MAX_ROUNDS } from "./types.js"; export type { Result } from "./result.js"; export { ok, err } from "./result.js"; export { parseNerveConfig } from "./config.js"; +export { isPlainRecord } from "./is-plain-record.js"; export type { ParsedSenseWorkflowDirective, SenseComputeRoute } from "./sense-workflow-directive.js"; export { parseSenseWorkflowDirective, routeSenseComputeOutput } from "./sense-workflow-directive.js"; diff --git a/packages/core/src/is-plain-record.ts b/packages/core/src/is-plain-record.ts new file mode 100644 index 0000000..e0d6176 --- /dev/null +++ b/packages/core/src/is-plain-record.ts @@ -0,0 +1,7 @@ +/** + * Narrows `unknown` to a plain JSON-style object (not null, not array). + * Use after `JSON.parse` / YAML / IPC when validating structure field-by-field. + */ +export function isPlainRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} diff --git a/packages/core/src/sense-workflow-directive.ts b/packages/core/src/sense-workflow-directive.ts index e4ec75c..cb01bc0 100644 --- a/packages/core/src/sense-workflow-directive.ts +++ b/packages/core/src/sense-workflow-directive.ts @@ -1,3 +1,4 @@ +import { isPlainRecord } from "./is-plain-record.js"; import type { Result } from "./result.js"; import { err, ok } from "./result.js"; @@ -54,10 +55,10 @@ function stripWorkflowKey(payload: Record): Record; + const obj = payload; if (!Object.hasOwn(obj, "workflow")) { return { kind: "signal", payload }; } diff --git a/packages/daemon/src/daemon-ipc.ts b/packages/daemon/src/daemon-ipc.ts index bb2e58f..b8ad68b 100644 --- a/packages/daemon/src/daemon-ipc.ts +++ b/packages/daemon/src/daemon-ipc.ts @@ -14,6 +14,7 @@ import { rmSync } from "node:fs"; import { type Server, type Socket, createServer } from "node:net"; import type { SenseInfo } from "@uncaged/nerve-core"; +import { isPlainRecord } from "@uncaged/nerve-core"; import type { WorkflowManager } from "./workflow-manager.js"; @@ -51,14 +52,14 @@ export type DaemonIpcServer = { function parseRequest(line: string): DaemonRequest | null { try { - const obj = JSON.parse(line) as unknown; - if (obj === null || typeof obj !== "object") return null; - const req = obj as Record; + const obj: unknown = JSON.parse(line); + if (!isPlainRecord(obj)) return null; + const req = obj; if (req.type === "trigger-workflow") { if (typeof req.workflow !== "string" || req.workflow.length === 0) return null; if (typeof req.prompt !== "string") return null; if (typeof req.maxRounds !== "number") return null; - return { type: "trigger-workflow", workflow: req.workflow, prompt: req.prompt, maxRounds: req.maxRounds as number }; + return { type: "trigger-workflow", workflow: req.workflow, prompt: req.prompt, maxRounds: req.maxRounds }; } if (req.type === "trigger-sense") { if (typeof req.sense !== "string" || req.sense.length === 0) return null; diff --git a/packages/daemon/src/ipc.ts b/packages/daemon/src/ipc.ts index b03626f..d6121f0 100644 --- a/packages/daemon/src/ipc.ts +++ b/packages/daemon/src/ipc.ts @@ -4,7 +4,7 @@ */ import type { Result } from "@uncaged/nerve-core"; -import { err, ok } from "@uncaged/nerve-core"; +import { err, isPlainRecord, ok } from "@uncaged/nerve-core"; /** Parent → Worker: trigger one compute cycle for a sense */ export type ComputeMessage = { @@ -148,76 +148,115 @@ function validateResumeThreadMsg(obj: Record): string | null { /** Validate and parse an unknown IPC message received from the parent process. */ export function parseParentMessage(raw: unknown): Result { - if (raw === null || typeof raw !== "object") { + if (!isPlainRecord(raw)) { return err(new Error("IPC message is not an object")); } - const obj = raw as Record; + const obj = raw; if (typeof obj.type !== "string") { return err(new Error("IPC message missing string 'type' field")); } if (!PARENT_MSG_TYPES.has(obj.type)) { return err(new Error(`Unknown IPC message type: "${obj.type}"`)); } + if (obj.type === "compute") { + if (typeof obj.sense !== "string") { + return err(new Error("IPC 'compute' message missing string 'sense' field")); + } + return ok({ type: "compute", sense: obj.sense }); + } + if (obj.type === "shutdown") { + return ok({ type: "shutdown" }); + } + if (obj.type === "health-request") { + return ok({ type: "health-request" }); + } if (obj.type === "start-thread") { const errMsg = validateStartThreadMsg(obj); if (errMsg !== null) return err(new Error(errMsg)); + // Field types are validated above; `Record` values stay `unknown` to TypeScript. + return ok({ + type: "start-thread", + runId: obj.runId, + workflow: obj.workflow, + prompt: obj.prompt, + maxRounds: obj.maxRounds, + } as StartThreadMessage); } if (obj.type === "resume-thread") { const errMsg = validateResumeThreadMsg(obj); if (errMsg !== null) return err(new Error(errMsg)); + // Elements are validated as plain objects by the kernel; trust the wire shape here. + return ok({ + type: "resume-thread", + runId: obj.runId, + messages: obj.messages as ResumeThreadMessage["messages"], + maxRounds: obj.maxRounds, + } as ResumeThreadMessage); } - return ok(raw as ParentToWorkerMessage); + return err(new Error(`Unhandled IPC message type: "${obj.type}"`)); } -function parseSignalMsg(obj: Record, raw: unknown): Result { +function parseSignalMsg(obj: Record): Result { if (typeof obj.sense !== "string") { return err(new Error("Worker 'signal' message missing string 'sense' field")); } if (!("payload" in obj)) { return err(new Error("Worker 'signal' message missing 'payload' field")); } - return ok(raw as SignalMessage); + return ok({ + type: "signal", + sense: obj.sense, + payload: obj.payload, + }); } -function parseErrorMsg(obj: Record, raw: unknown): Result { +function parseErrorMsg(obj: Record): Result { if (typeof obj.sense !== "string") { return err(new Error("Worker 'error' message missing string 'sense' field")); } if (typeof obj.error !== "string") { return err(new Error("Worker 'error' message missing string 'error' field")); } - return ok(raw as ErrorMessage); + return ok({ + type: "error", + sense: obj.sense, + error: obj.error, + }); } -function parseHealthResponseMsg( - obj: Record, - raw: unknown, -): Result { +function parseHealthResponseMsg(obj: Record): Result { if (!Array.isArray(obj.senses)) { return err(new Error("Worker 'health-response' message missing 'senses' array")); } if (typeof obj.inFlightCount !== "number") { return err(new Error("Worker 'health-response' message missing 'inFlightCount' number")); } - return ok(raw as HealthResponseMessage); + return ok({ + type: "health-response", + // Kernel only sends string[] today; keep accepting any array elements without filtering. + senses: obj.senses as string[], + inFlightCount: obj.inFlightCount, + }); } -const THREAD_EVENT_TYPES = new Set([ - "queued", - "started", - "step_complete", - "completed", - "failed", -]); +function isThreadEventType(value: string): value is ThreadEventType { + switch (value) { + case "queued": + case "started": + case "step_complete": + case "completed": + case "failed": + return true; + default: + return false; + } +} -function parseThreadEventMsg( - obj: Record, - raw: unknown, -): Result { +function parseThreadEventMsg(obj: Record): Result { if (typeof obj.runId !== "string") { return err(new Error("Worker 'thread-event' message missing string 'runId' field")); } - if (typeof obj.eventType !== "string" || !THREAD_EVENT_TYPES.has(obj.eventType)) { + if (typeof obj.eventType !== "string" || !isThreadEventType(obj.eventType)) { return err( new Error(`Worker 'thread-event' message has invalid 'eventType': "${obj.eventType}"`), ); @@ -225,20 +264,26 @@ function parseThreadEventMsg( if (!("payload" in obj)) { return err(new Error("Worker 'thread-event' message missing 'payload' field")); } - return ok(raw as ThreadEventMessage); + return ok({ + type: "thread-event", + runId: obj.runId, + eventType: obj.eventType, + payload: obj.payload, + }); } -function parseWorkflowErrorMsg( - obj: Record, - raw: unknown, -): Result { +function parseWorkflowErrorMsg(obj: Record): Result { if (typeof obj.runId !== "string") { return err(new Error("Worker 'workflow-error' message missing string 'runId' field")); } if (typeof obj.error !== "string") { return err(new Error("Worker 'workflow-error' message missing string 'error' field")); } - return ok(raw as WorkflowErrorMessage); + return ok({ + type: "workflow-error", + runId: obj.runId, + error: obj.error, + }); } const WORKER_MSG_TYPES = new Set([ @@ -251,17 +296,14 @@ const WORKER_MSG_TYPES = new Set([ "thread-workflow-message", ]); -function parseThreadWorkflowMessageMsg( - obj: Record, - raw: unknown, -): Result { +function parseThreadWorkflowMessageMsg(obj: Record): Result { if (typeof obj.runId !== "string") { return err(new Error("Worker 'thread-workflow-message' missing string 'runId' field")); } - if (obj.message === null || typeof obj.message !== "object") { + if (!isPlainRecord(obj.message)) { return err(new Error("Worker 'thread-workflow-message' missing object 'message' field")); } - const msg = obj.message as Record; + const msg = obj.message; if (typeof msg.role !== "string") { return err(new Error("Worker 'thread-workflow-message' message missing string 'role' field")); } @@ -275,26 +317,35 @@ function parseThreadWorkflowMessageMsg( new Error("Worker 'thread-workflow-message' message missing number 'timestamp' field"), ); } - return ok(raw as ThreadWorkflowMessageMessage); + return ok({ + type: "thread-workflow-message", + runId: obj.runId, + message: { + role: msg.role, + content: msg.content, + meta: "meta" in msg ? msg.meta : undefined, + timestamp: msg.timestamp, + }, + }); } /** Validate and parse an unknown IPC message received from a worker process. */ export function parseWorkerMessage(raw: unknown): Result { - if (raw === null || typeof raw !== "object") { + if (!isPlainRecord(raw)) { return err(new Error("Worker IPC message is not an object")); } - const obj = raw as Record; + const obj = raw; if (typeof obj.type !== "string") { return err(new Error("Worker IPC message missing string 'type' field")); } if (!WORKER_MSG_TYPES.has(obj.type)) { return err(new Error(`Unknown worker IPC message type: "${obj.type}"`)); } - if (obj.type === "signal") return parseSignalMsg(obj, raw); - if (obj.type === "error") return parseErrorMsg(obj, raw); - if (obj.type === "health-response") return parseHealthResponseMsg(obj, raw); - if (obj.type === "thread-event") return parseThreadEventMsg(obj, raw); - if (obj.type === "workflow-error") return parseWorkflowErrorMsg(obj, raw); - if (obj.type === "thread-workflow-message") return parseThreadWorkflowMessageMsg(obj, raw); + if (obj.type === "signal") return parseSignalMsg(obj); + if (obj.type === "error") return parseErrorMsg(obj); + if (obj.type === "health-response") return parseHealthResponseMsg(obj); + if (obj.type === "thread-event") return parseThreadEventMsg(obj); + if (obj.type === "workflow-error") return parseWorkflowErrorMsg(obj); + if (obj.type === "thread-workflow-message") return parseThreadWorkflowMessageMsg(obj); return ok({ type: "ready" }); } diff --git a/packages/daemon/src/kernel.ts b/packages/daemon/src/kernel.ts index e508033..207fc9c 100644 --- a/packages/daemon/src/kernel.ts +++ b/packages/daemon/src/kernel.ts @@ -100,7 +100,8 @@ export function createKernel( } let stopped = false; - let scheduler: ReflexScheduler = null as unknown as ReflexScheduler; + /** Assigned before workers start; `handleWorkerMessage` only runs after this is set. */ + let scheduler!: ReflexScheduler; let readyResolve: (() => void) | undefined; const ready = new Promise((resolve) => { diff --git a/packages/daemon/src/sense-runtime.ts b/packages/daemon/src/sense-runtime.ts index 9d4d9cb..495576f 100644 --- a/packages/daemon/src/sense-runtime.ts +++ b/packages/daemon/src/sense-runtime.ts @@ -6,7 +6,7 @@ import { drizzle } from "drizzle-orm/node-sqlite"; import type { NodeSQLiteDatabase } from "drizzle-orm/node-sqlite"; import type { Result } from "@uncaged/nerve-core"; -import { err, ok } from "@uncaged/nerve-core"; +import { err, isPlainRecord, ok } from "@uncaged/nerve-core"; import type { BlobStore } from "@uncaged/nerve-store"; @@ -108,8 +108,9 @@ export function runMigrations(sqlite: DatabaseSync, migrationsDir: string): Resu const filesResult = listMigrationFiles(migrationsDir); if (!filesResult.ok) return filesResult; + const migrationRows = sqlite.prepare("SELECT name FROM _migrations").all(); const applied = new Set( - (sqlite.prepare("SELECT name FROM _migrations").all() as Array<{ name: string }>).map( + migrationRows.filter((r): r is { name: string } => isPlainRecord(r) && typeof r.name === "string").map( (r) => r.name, ), ); @@ -145,6 +146,7 @@ export function openSenseDb( const migResult = runMigrations(sqlite, migrationsDir); if (!migResult.ok) return migResult; + // Drizzle infers a schema-specific DB type; senses are schema-agnostic at this layer. const db = drizzle({ client: sqlite }) as DrizzleDB; return ok({ sqlite, db }); } @@ -162,6 +164,7 @@ export function openPeerDb(dbPath: string): Result { return err(new Error(`Failed to open peer database "${dbPath}" (readonly): ${msg}`)); } + // Same schema-agnostic Drizzle wrapper as openSenseDb. return ok(drizzle({ client: sqlite }) as DrizzleDB); } @@ -180,18 +183,13 @@ export async function loadComputeFn(senseIndexPath: string): Promise).compute !== "function" - ) { + if (!isPlainRecord(mod) || !("compute" in mod) || typeof mod.compute !== "function") { return err( new Error(`Sense module "${senseIndexPath}" must export a named "compute" function`), ); } - return ok((mod as { compute: ComputeFn }).compute); + return ok(mod.compute as ComputeFn); } /** @@ -232,7 +230,9 @@ export async function executeCompute( } catch (e) { const msg = e instanceof Error ? e.message : String(e); if (controller.signal.aborted) { - return err(new Error(`compute("${runtime.name}") timed out after ${timeoutMs as number}ms`)); + return err( + new Error(`compute("${runtime.name}") timed out after ${String(timeoutMs ?? "?")}ms`), + ); } return err(new Error(`compute("${runtime.name}") threw: ${msg}`)); } finally { diff --git a/packages/daemon/src/workflow-manager.ts b/packages/daemon/src/workflow-manager.ts index f8f19e1..fdae7de 100644 --- a/packages/daemon/src/workflow-manager.ts +++ b/packages/daemon/src/workflow-manager.ts @@ -12,7 +12,7 @@ import { dirname, join } from "node:path"; import { fileURLToPath } from "node:url"; import type { NerveConfig, WorkflowConfig, WorkflowMessage } from "@uncaged/nerve-core"; -import { START } from "@uncaged/nerve-core"; +import { START, isPlainRecord } from "@uncaged/nerve-core"; import type { ResumeThreadMessage, @@ -91,8 +91,8 @@ function readLaunchFromTriggerPayload( raw: unknown, engineDefaultMaxRounds: number, ): { prompt: string; maxRounds: number } { - if (raw !== null && typeof raw === "object" && !Array.isArray(raw)) { - const o = raw as Record; + if (isPlainRecord(raw)) { + const o = raw; if (typeof o.prompt === "string" && typeof o.maxRounds === "number") { return { prompt: o.prompt, maxRounds: o.maxRounds }; } diff --git a/packages/daemon/src/workflow-worker.ts b/packages/daemon/src/workflow-worker.ts index cbcfd47..c920f28 100644 --- a/packages/daemon/src/workflow-worker.ts +++ b/packages/daemon/src/workflow-worker.ts @@ -12,8 +12,14 @@ import { existsSync } from "node:fs"; import { join, resolve } from "node:path"; -import type { RoleMeta, WorkflowDefinition, WorkflowMessage } from "@uncaged/nerve-core"; -import { END, START } from "@uncaged/nerve-core"; +import type { + Moderator, + RoleMeta, + StartSignal, + WorkflowDefinition, + WorkflowMessage, +} from "@uncaged/nerve-core"; +import { END, START, isPlainRecord } from "@uncaged/nerve-core"; import type { ThreadEventType, @@ -23,6 +29,8 @@ import type { import { parseParentMessage } from "./ipc.js"; import { ignoreSessionBroadcastSignals } from "./worker-fork-support.js"; +type ModeratorInput = Parameters>[0]; + // --------------------------------------------------------------------------- // IPC helpers // --------------------------------------------------------------------------- @@ -93,21 +101,17 @@ async function runThread( return; } - const lastSignal = + const lastSignal: ModeratorInput = lastMsg.role === START ? { role: START, content: lastMsg.content, - meta: lastMsg.meta as { maxRounds: number }, + meta: lastMsg.meta as StartSignal["meta"], timestamp: lastMsg.timestamp, } : { role: lastMsg.role, meta: lastMsg.meta as Record }; - let nextRole = def.moderator( - lastSignal as Parameters[0], - roleRound, - maxRounds, - ); + let nextRole = def.moderator(lastSignal, roleRound, maxRounds); if (nextRole === END) { sendThreadEvent(runId, "completed", null); @@ -150,8 +154,8 @@ async function runThread( roleRound += 1; - const signal = { role: nextRole, meta: result.meta }; - nextRole = def.moderator(signal as Parameters[0], roleRound, maxRounds); + const signal: ModeratorInput = { role: nextRole, meta: result.meta }; + nextRole = def.moderator(signal, roleRound, maxRounds); if (nextRole === END) { sendThreadEvent(runId, "completed", null); @@ -166,6 +170,17 @@ async function runThread( // Workflow definition loader // --------------------------------------------------------------------------- +function isWorkflowDefinitionShape(def: unknown): def is WorkflowDefinition { + if (!isPlainRecord(def)) return false; + return ( + typeof def.moderator === "function" && + typeof def.roles === "object" && + def.roles !== null && + !Array.isArray(def.roles) && + typeof def.name === "string" + ); +} + async function loadWorkflowDefinition( nerveRoot: string, workflowName: string, @@ -186,19 +201,13 @@ async function loadWorkflowDefinition( const mod = await import(indexPath); const def: unknown = mod.default ?? mod; - if ( - def === null || - typeof def !== "object" || - typeof (def as WorkflowDefinition).moderator !== "function" || - typeof (def as WorkflowDefinition).roles !== "object" || - typeof (def as WorkflowDefinition).name !== "string" - ) { + if (!isWorkflowDefinitionShape(def)) { throw new Error( `Workflow "${workflowName}" must export a WorkflowDefinition with "name", "roles", and "moderator".`, ); } - return def as WorkflowDefinition; + return def; } // --------------------------------------------------------------------------- @@ -253,7 +262,7 @@ function handleMessage( const previous = inFlight.get(runId) ?? Promise.resolve(); const next = previous - .then(() => runThread(def, runId, maxRounds, messages as WorkflowMessage[], null)) + .then(() => runThread(def, runId, maxRounds, messages, null)) .catch((e: unknown) => { const errMsg = e instanceof Error ? e.message : String(e); sendWorkflowError(runId, errMsg); diff --git a/packages/store/src/log-store.ts b/packages/store/src/log-store.ts index 35c7370..f25afb0 100644 --- a/packages/store/src/log-store.ts +++ b/packages/store/src/log-store.ts @@ -11,6 +11,8 @@ import { mkdirSync, writeFileSync } from "node:fs"; import { dirname, join } from "node:path"; import { DatabaseSync, type StatementSync } from "node:sqlite"; +import { isPlainRecord } from "@uncaged/nerve-core"; + import { DEFAULT_LOG_RETENTION_MS, LOG_ARCHIVE_META_KEY, @@ -68,11 +70,15 @@ const VALID_WORKFLOW_STATUSES = new Set([ "interrupted", ]); +function isWorkflowRunStatus(value: string): value is WorkflowRunStatus { + return VALID_WORKFLOW_STATUSES.has(value); +} + function validateWorkflowRunStatus(status: string): WorkflowRunStatus { - if (!VALID_WORKFLOW_STATUSES.has(status)) { + if (!isWorkflowRunStatus(status)) { throw new Error(`Invalid workflow run status from DB: "${status}"`); } - return status as WorkflowRunStatus; + return status; } /** One row in the workflow_runs materialized table. */ @@ -508,10 +514,9 @@ export function createLogStore(dbPath: string): LogStore { const row = getTriggerPayloadStmt.get(runId) as { payload: string | null } | undefined; if (row === undefined || row.payload === null) return null; try { - const parsed = JSON.parse(row.payload) as unknown; - if (parsed !== null && typeof parsed === "object") { - const obj = parsed as Record; - return obj.triggerPayload ?? null; + const parsed: unknown = JSON.parse(row.payload); + if (isPlainRecord(parsed)) { + return parsed.triggerPayload ?? null; } } catch { // malformed @@ -525,12 +530,8 @@ export function createLogStore(dbPath: string): LogStore { for (const row of rows) { if (row.payload === null) continue; try { - const parsed = JSON.parse(row.payload) as unknown; - if ( - parsed !== null && - typeof parsed === "object" && - typeof (parsed as Record).type === "string" - ) { + const parsed: unknown = JSON.parse(row.payload); + if (isPlainRecord(parsed) && typeof parsed.type === "string") { result.push(parsed as { type: string; [key: string]: unknown }); } } catch { @@ -544,9 +545,9 @@ export function createLogStore(dbPath: string): LogStore { payload: string, ): { role: string; content: string; meta: unknown; timestamp: number } | null { try { - const parsed = JSON.parse(payload) as unknown; - if (parsed === null || typeof parsed !== "object") return null; - const obj = parsed as Record; + const parsed: unknown = JSON.parse(payload); + if (!isPlainRecord(parsed)) return null; + const obj = parsed; if (typeof obj.role !== "string" || typeof obj.content !== "string") return null; return { role: obj.role, @@ -584,9 +585,9 @@ export function createLogStore(dbPath: string): LogStore { fallbackTs: number, ): { role: string; content: string; meta: unknown; timestamp: number } | null { try { - const parsed = JSON.parse(payload) as unknown; - if (parsed === null || typeof parsed !== "object") return null; - const obj = parsed as Record; + const parsed: unknown = JSON.parse(payload); + if (!isPlainRecord(parsed)) return null; + const obj = parsed; if (typeof obj.role === "string" && typeof obj.content === "string") { return { role: obj.role,