Compare commits

...

2 Commits

Author SHA1 Message Date
xingyue f5cb72db50 refactor: improve type safety across codebase
- Add isPlainRecord() type guard to eliminate 'as Record<string, unknown>' casts
- Replace 'as any' with properly typed assertions in start.ts
- Remove 'null as unknown as' pattern in kernel.ts
- Add type predicates for array narrowing (item is string)
- Improve IPC message type narrowing in daemon-client.ts and ipc.ts
- Type better-sqlite3 and drizzle return values properly

No runtime behavior changes.
2026-04-24 20:07:58 +08:00
xiaomo e433e7c2a9 Merge pull request 'refactor(daemon): split kernel.ts into focused modules' (#89) from refactor/split-kernel into main 2026-04-24 11:41:44 +00:00
17 changed files with 228 additions and 131 deletions
+3 -2
View File
@@ -2,7 +2,7 @@ import { readFileSync } from "node:fs";
import { join } from "node:path";
import { DatabaseSync } from "node:sqlite";
import { type SenseInfo, parseNerveConfig } from "@uncaged/nerve-core";
import { type SenseInfo, isPlainRecord, parseNerveConfig } from "@uncaged/nerve-core";
import { defineCommand } from "citty";
import { listSensesViaDaemon, triggerSenseViaDaemon } from "../daemon-client.js";
@@ -240,7 +240,8 @@ const senseQueryCommand = defineCommand({
}
}
const rows = db.prepare(sql).all() as Record<string, unknown>[];
const rawRows: unknown[] = db.prepare(sql).all();
const rows: Record<string, unknown>[] = rawRows.filter(isPlainRecord);
if (args.json) {
process.stdout.write(`${JSON.stringify(rows, null, 2)}\n`);
+3 -1
View File
@@ -74,9 +74,11 @@ async function runDaemon(nerveRoot: string): Promise<void> {
const bootstrapPath = daemonBootstrapScript();
// After `open`, file-backed WriteStream has a numeric OS fd for spawn stdio; `@types/node` omits `fd` on this WriteStream alias.
const logFd = (logStream as unknown as { fd: number }).fd;
const child = spawn(process.execPath, [bootstrapPath], {
detached: true,
stdio: ["ignore", (logStream as any).fd, (logStream as any).fd],
stdio: ["ignore", logFd, logFd],
env: { ...process.env, NERVE_ROOT: nerveRoot },
cwd: nerveRoot,
});
+5 -1
View File
@@ -47,7 +47,11 @@ export const statusCommand = defineCommand({
return;
}
const pid = readPidFile() as number;
const pid = readPidFile();
if (pid === null) {
process.stdout.write("😴 Nerve daemon is not running.\n");
return;
}
const configPath = join(getNerveRoot(), "nerve.yaml");
let senseList: string[] = [];
+4 -1
View File
@@ -1,6 +1,7 @@
import { existsSync } from "node:fs";
import { join } from "node:path";
import { isPlainRecord } from "@uncaged/nerve-core";
import { defineCommand } from "citty";
import { stringify } from "yaml";
@@ -203,7 +204,9 @@ export function partitionWorkflowMessage(msg: {
const contentBody = msg.content;
const meta: Record<string, unknown> =
msg.meta !== null && msg.meta !== undefined && typeof msg.meta === "object"
? (msg.meta as Record<string, unknown>)
? isPlainRecord(msg.meta)
? msg.meta
: (msg.meta as Record<string, unknown>)
: {};
return { roleStr, contentBody, meta };
}
+21 -8
View File
@@ -9,6 +9,7 @@ import { connect } from "node:net";
import type { Socket } from "node:net";
import type { SenseInfo } from "@uncaged/nerve-core";
import { isPlainRecord } from "@uncaged/nerve-core";
const CONNECT_TIMEOUT_MS = 3_000;
const RESPONSE_TIMEOUT_MS = 5_000;
@@ -19,11 +20,22 @@ type TriggerResponse = { ok: true } | { ok: false; error: string };
type ListSensesResponse = { ok: true; senses: SenseInfo[] } | { ok: false; error: string };
function isSenseInfo(value: unknown): value is SenseInfo {
if (!isPlainRecord(value)) return false;
return (
typeof value.name === "string" &&
typeof value.group === "string" &&
(value.throttle === null || typeof value.throttle === "number") &&
(value.timeout === null || typeof value.timeout === "number") &&
(value.lastSignalTs === null || typeof value.lastSignalTs === "number")
);
}
function parseDaemonResponse(line: string): TriggerResponse {
try {
const obj = JSON.parse(line) as unknown;
if (obj !== null && typeof obj === "object") {
const r = obj as Record<string, unknown>;
const obj: unknown = JSON.parse(line);
if (isPlainRecord(obj)) {
const r = obj;
if (r.ok === true) return { ok: true };
if (r.ok === false && typeof r.error === "string") return { ok: false, error: r.error };
}
@@ -35,12 +47,13 @@ function parseDaemonResponse(line: string): TriggerResponse {
function parseListSensesResponse(line: string): ListSensesResponse {
try {
const obj = JSON.parse(line) as unknown;
if (obj !== null && typeof obj === "object") {
const r = obj as Record<string, unknown>;
const obj: unknown = JSON.parse(line);
if (isPlainRecord(obj)) {
const r = obj;
if (r.ok === false && typeof r.error === "string") return { ok: false, error: r.error };
if (r.ok === true && Array.isArray(r.senses))
return { ok: true, senses: r.senses as SenseInfo[] };
if (r.ok === true && Array.isArray(r.senses) && r.senses.every(isSenseInfo)) {
return { ok: true, senses: r.senses };
}
}
} catch {
// fall through
+1
View File
@@ -46,5 +46,6 @@ export type DaemonModule = {
export async function loadDaemonModule(nerveRoot: string): Promise<DaemonModule> {
const entry = assertWorkspaceDaemonInstalled(nerveRoot);
const url = pathToFileURL(entry).href;
// Dynamic import return type is module-specific; narrow at this workspace boundary.
return import(url) as Promise<DaemonModule>;
}
+15 -14
View File
@@ -1,5 +1,6 @@
import { parse } from "yaml";
import { isPlainRecord } from "./is-plain-record.js";
import type { Result } from "./result.js";
import { err, ok } from "./result.js";
import type { NerveConfig, ReflexConfig, SenseConfig, WorkflowConfig } from "./types.js";
@@ -40,11 +41,11 @@ function parseDurationField(field: unknown, label: string): Result<number | null
}
function validateSenseConfig(name: string, raw: unknown): Result<SenseConfig> {
if (raw === null || typeof raw !== "object" || Array.isArray(raw)) {
if (!isPlainRecord(raw)) {
return err(new Error(`senses.${name}: must be an object`));
}
const obj = raw as Record<string, unknown>;
const obj = raw;
if (typeof obj.group !== "string" || obj.group.trim() === "") {
return err(new Error(`senses.${name}.group: required string`));
@@ -77,10 +78,10 @@ function validateSenseConfig(name: string, raw: unknown): Result<SenseConfig> {
function parseOnField(index: number, obj: Record<string, unknown>): Result<string[] | null> {
if (obj.on === undefined || obj.on === null) return ok(null);
if (!Array.isArray(obj.on) || !obj.on.every((item) => typeof item === "string")) {
if (!Array.isArray(obj.on) || !obj.on.every((item): item is string => typeof item === "string")) {
return err(new Error(`reflexes[${index}].on: must be an array of strings`));
}
return ok(obj.on as string[]);
return ok(obj.on);
}
function parseSenseReflex(
@@ -118,11 +119,11 @@ function validateReflexConfig(
raw: unknown,
senseNames: Set<string>,
): Result<ReflexConfig> {
if (raw === null || typeof raw !== "object" || Array.isArray(raw)) {
if (!isPlainRecord(raw)) {
return err(new Error(`reflexes[${index}]: must be an object`));
}
const obj = raw as Record<string, unknown>;
const obj = raw;
const hasSense = obj.sense !== undefined;
const hasWorkflowKey = Object.hasOwn(obj, "workflow");
@@ -158,11 +159,11 @@ function parseEngineMaxRounds(obj: Record<string, unknown>): Result<number> {
}
function validateWorkflowConfig(name: string, raw: unknown): Result<WorkflowConfig> {
if (raw === null || typeof raw !== "object" || Array.isArray(raw)) {
if (!isPlainRecord(raw)) {
return err(new Error(`workflows.${name}: must be an object`));
}
const obj = raw as Record<string, unknown>;
const obj = raw;
if (
typeof obj.concurrency !== "number" ||
@@ -209,11 +210,11 @@ function validateWorkflowConfig(name: string, raw: unknown): Result<WorkflowConf
function parseSenses(
obj: Record<string, unknown>,
): Result<{ senses: Record<string, SenseConfig>; senseNames: Set<string> }> {
if (obj.senses === null || typeof obj.senses !== "object" || Array.isArray(obj.senses)) {
if (!isPlainRecord(obj.senses)) {
return err(new Error("senses: required object"));
}
const sensesRaw = obj.senses as Record<string, unknown>;
const sensesRaw = obj.senses;
const senses: Record<string, SenseConfig> = {};
const senseNames = new Set(Object.keys(sensesRaw));
@@ -249,11 +250,11 @@ function parseWorkflows(
): Result<Record<string, WorkflowConfig> | null> {
if (obj.workflows === undefined || obj.workflows === null) return ok(null);
if (typeof obj.workflows !== "object" || Array.isArray(obj.workflows)) {
if (!isPlainRecord(obj.workflows)) {
return err(new Error("workflows: must be an object if provided"));
}
const workflowsRaw = obj.workflows as Record<string, unknown>;
const workflowsRaw = obj.workflows;
const workflows: Record<string, WorkflowConfig> = {};
for (const [name, wfRaw] of Object.entries(workflowsRaw)) {
@@ -275,11 +276,11 @@ export function parseNerveConfig(raw: string): Result<NerveConfig> {
return err(new Error(`YAML parse error: ${message}`));
}
if (parsed === null || typeof parsed !== "object" || Array.isArray(parsed)) {
if (!isPlainRecord(parsed)) {
return err(new Error("Config must be a YAML object"));
}
const obj = parsed as Record<string, unknown>;
const obj = parsed;
const sensesResult = parseSenses(obj);
if (!sensesResult.ok) return sensesResult;
+1
View File
@@ -22,6 +22,7 @@ export { START, END, DEFAULT_ENGINE_MAX_ROUNDS } from "./types.js";
export type { Result } from "./result.js";
export { ok, err } from "./result.js";
export { parseNerveConfig } from "./config.js";
export { isPlainRecord } from "./is-plain-record.js";
export type { ParsedSenseWorkflowDirective, SenseComputeRoute } from "./sense-workflow-directive.js";
export { parseSenseWorkflowDirective, routeSenseComputeOutput } from "./sense-workflow-directive.js";
+7
View File
@@ -0,0 +1,7 @@
/**
* Narrows `unknown` to a plain JSON-style object (not null, not array).
* Use after `JSON.parse` / YAML / IPC when validating structure field-by-field.
*/
export function isPlainRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
@@ -1,3 +1,4 @@
import { isPlainRecord } from "./is-plain-record.js";
import type { Result } from "./result.js";
import { err, ok } from "./result.js";
@@ -54,10 +55,10 @@ function stripWorkflowKey(payload: Record<string, unknown>): Record<string, unkn
* - `workflow: "name|n|prompt"` → launch workflow; no Signal is emitted to the bus
*/
export function routeSenseComputeOutput(payload: unknown): SenseComputeRoute {
if (payload === null || typeof payload !== "object" || Array.isArray(payload)) {
if (!isPlainRecord(payload)) {
return { kind: "signal", payload };
}
const obj = payload as Record<string, unknown>;
const obj = payload;
if (!Object.hasOwn(obj, "workflow")) {
return { kind: "signal", payload };
}
+5 -4
View File
@@ -14,6 +14,7 @@ import { rmSync } from "node:fs";
import { type Server, type Socket, createServer } from "node:net";
import type { SenseInfo } from "@uncaged/nerve-core";
import { isPlainRecord } from "@uncaged/nerve-core";
import type { WorkflowManager } from "./workflow-manager.js";
@@ -51,14 +52,14 @@ export type DaemonIpcServer = {
function parseRequest(line: string): DaemonRequest | null {
try {
const obj = JSON.parse(line) as unknown;
if (obj === null || typeof obj !== "object") return null;
const req = obj as Record<string, unknown>;
const obj: unknown = JSON.parse(line);
if (!isPlainRecord(obj)) return null;
const req = obj;
if (req.type === "trigger-workflow") {
if (typeof req.workflow !== "string" || req.workflow.length === 0) return null;
if (typeof req.prompt !== "string") return null;
if (typeof req.maxRounds !== "number") return null;
return { type: "trigger-workflow", workflow: req.workflow, prompt: req.prompt, maxRounds: req.maxRounds as number };
return { type: "trigger-workflow", workflow: req.workflow, prompt: req.prompt, maxRounds: req.maxRounds };
}
if (req.type === "trigger-sense") {
if (typeof req.sense !== "string" || req.sense.length === 0) return null;
+97 -46
View File
@@ -4,7 +4,7 @@
*/
import type { Result } from "@uncaged/nerve-core";
import { err, ok } from "@uncaged/nerve-core";
import { err, isPlainRecord, ok } from "@uncaged/nerve-core";
/** Parent → Worker: trigger one compute cycle for a sense */
export type ComputeMessage = {
@@ -148,76 +148,115 @@ function validateResumeThreadMsg(obj: Record<string, unknown>): string | null {
/** Validate and parse an unknown IPC message received from the parent process. */
export function parseParentMessage(raw: unknown): Result<ParentToWorkerMessage> {
if (raw === null || typeof raw !== "object") {
if (!isPlainRecord(raw)) {
return err(new Error("IPC message is not an object"));
}
const obj = raw as Record<string, unknown>;
const obj = raw;
if (typeof obj.type !== "string") {
return err(new Error("IPC message missing string 'type' field"));
}
if (!PARENT_MSG_TYPES.has(obj.type)) {
return err(new Error(`Unknown IPC message type: "${obj.type}"`));
}
if (obj.type === "compute") {
if (typeof obj.sense !== "string") {
return err(new Error("IPC 'compute' message missing string 'sense' field"));
}
return ok({ type: "compute", sense: obj.sense });
}
if (obj.type === "shutdown") {
return ok({ type: "shutdown" });
}
if (obj.type === "health-request") {
return ok({ type: "health-request" });
}
if (obj.type === "start-thread") {
const errMsg = validateStartThreadMsg(obj);
if (errMsg !== null) return err(new Error(errMsg));
// Field types are validated above; `Record<string, unknown>` values stay `unknown` to TypeScript.
return ok({
type: "start-thread",
runId: obj.runId,
workflow: obj.workflow,
prompt: obj.prompt,
maxRounds: obj.maxRounds,
} as StartThreadMessage);
}
if (obj.type === "resume-thread") {
const errMsg = validateResumeThreadMsg(obj);
if (errMsg !== null) return err(new Error(errMsg));
// Elements are validated as plain objects by the kernel; trust the wire shape here.
return ok({
type: "resume-thread",
runId: obj.runId,
messages: obj.messages as ResumeThreadMessage["messages"],
maxRounds: obj.maxRounds,
} as ResumeThreadMessage);
}
return ok(raw as ParentToWorkerMessage);
return err(new Error(`Unhandled IPC message type: "${obj.type}"`));
}
function parseSignalMsg(obj: Record<string, unknown>, raw: unknown): Result<WorkerToParentMessage> {
function parseSignalMsg(obj: Record<string, unknown>): Result<WorkerToParentMessage> {
if (typeof obj.sense !== "string") {
return err(new Error("Worker 'signal' message missing string 'sense' field"));
}
if (!("payload" in obj)) {
return err(new Error("Worker 'signal' message missing 'payload' field"));
}
return ok(raw as SignalMessage);
return ok({
type: "signal",
sense: obj.sense,
payload: obj.payload,
});
}
function parseErrorMsg(obj: Record<string, unknown>, raw: unknown): Result<WorkerToParentMessage> {
function parseErrorMsg(obj: Record<string, unknown>): Result<WorkerToParentMessage> {
if (typeof obj.sense !== "string") {
return err(new Error("Worker 'error' message missing string 'sense' field"));
}
if (typeof obj.error !== "string") {
return err(new Error("Worker 'error' message missing string 'error' field"));
}
return ok(raw as ErrorMessage);
return ok({
type: "error",
sense: obj.sense,
error: obj.error,
});
}
function parseHealthResponseMsg(
obj: Record<string, unknown>,
raw: unknown,
): Result<WorkerToParentMessage> {
function parseHealthResponseMsg(obj: Record<string, unknown>): Result<WorkerToParentMessage> {
if (!Array.isArray(obj.senses)) {
return err(new Error("Worker 'health-response' message missing 'senses' array"));
}
if (typeof obj.inFlightCount !== "number") {
return err(new Error("Worker 'health-response' message missing 'inFlightCount' number"));
}
return ok(raw as HealthResponseMessage);
return ok({
type: "health-response",
// Kernel only sends string[] today; keep accepting any array elements without filtering.
senses: obj.senses as string[],
inFlightCount: obj.inFlightCount,
});
}
const THREAD_EVENT_TYPES = new Set<string>([
"queued",
"started",
"step_complete",
"completed",
"failed",
]);
function isThreadEventType(value: string): value is ThreadEventType {
switch (value) {
case "queued":
case "started":
case "step_complete":
case "completed":
case "failed":
return true;
default:
return false;
}
}
function parseThreadEventMsg(
obj: Record<string, unknown>,
raw: unknown,
): Result<WorkerToParentMessage> {
function parseThreadEventMsg(obj: Record<string, unknown>): Result<WorkerToParentMessage> {
if (typeof obj.runId !== "string") {
return err(new Error("Worker 'thread-event' message missing string 'runId' field"));
}
if (typeof obj.eventType !== "string" || !THREAD_EVENT_TYPES.has(obj.eventType)) {
if (typeof obj.eventType !== "string" || !isThreadEventType(obj.eventType)) {
return err(
new Error(`Worker 'thread-event' message has invalid 'eventType': "${obj.eventType}"`),
);
@@ -225,20 +264,26 @@ function parseThreadEventMsg(
if (!("payload" in obj)) {
return err(new Error("Worker 'thread-event' message missing 'payload' field"));
}
return ok(raw as ThreadEventMessage);
return ok({
type: "thread-event",
runId: obj.runId,
eventType: obj.eventType,
payload: obj.payload,
});
}
function parseWorkflowErrorMsg(
obj: Record<string, unknown>,
raw: unknown,
): Result<WorkerToParentMessage> {
function parseWorkflowErrorMsg(obj: Record<string, unknown>): Result<WorkerToParentMessage> {
if (typeof obj.runId !== "string") {
return err(new Error("Worker 'workflow-error' message missing string 'runId' field"));
}
if (typeof obj.error !== "string") {
return err(new Error("Worker 'workflow-error' message missing string 'error' field"));
}
return ok(raw as WorkflowErrorMessage);
return ok({
type: "workflow-error",
runId: obj.runId,
error: obj.error,
});
}
const WORKER_MSG_TYPES = new Set([
@@ -251,17 +296,14 @@ const WORKER_MSG_TYPES = new Set([
"thread-workflow-message",
]);
function parseThreadWorkflowMessageMsg(
obj: Record<string, unknown>,
raw: unknown,
): Result<WorkerToParentMessage> {
function parseThreadWorkflowMessageMsg(obj: Record<string, unknown>): Result<WorkerToParentMessage> {
if (typeof obj.runId !== "string") {
return err(new Error("Worker 'thread-workflow-message' missing string 'runId' field"));
}
if (obj.message === null || typeof obj.message !== "object") {
if (!isPlainRecord(obj.message)) {
return err(new Error("Worker 'thread-workflow-message' missing object 'message' field"));
}
const msg = obj.message as Record<string, unknown>;
const msg = obj.message;
if (typeof msg.role !== "string") {
return err(new Error("Worker 'thread-workflow-message' message missing string 'role' field"));
}
@@ -275,26 +317,35 @@ function parseThreadWorkflowMessageMsg(
new Error("Worker 'thread-workflow-message' message missing number 'timestamp' field"),
);
}
return ok(raw as ThreadWorkflowMessageMessage);
return ok({
type: "thread-workflow-message",
runId: obj.runId,
message: {
role: msg.role,
content: msg.content,
meta: "meta" in msg ? msg.meta : undefined,
timestamp: msg.timestamp,
},
});
}
/** Validate and parse an unknown IPC message received from a worker process. */
export function parseWorkerMessage(raw: unknown): Result<WorkerToParentMessage> {
if (raw === null || typeof raw !== "object") {
if (!isPlainRecord(raw)) {
return err(new Error("Worker IPC message is not an object"));
}
const obj = raw as Record<string, unknown>;
const obj = raw;
if (typeof obj.type !== "string") {
return err(new Error("Worker IPC message missing string 'type' field"));
}
if (!WORKER_MSG_TYPES.has(obj.type)) {
return err(new Error(`Unknown worker IPC message type: "${obj.type}"`));
}
if (obj.type === "signal") return parseSignalMsg(obj, raw);
if (obj.type === "error") return parseErrorMsg(obj, raw);
if (obj.type === "health-response") return parseHealthResponseMsg(obj, raw);
if (obj.type === "thread-event") return parseThreadEventMsg(obj, raw);
if (obj.type === "workflow-error") return parseWorkflowErrorMsg(obj, raw);
if (obj.type === "thread-workflow-message") return parseThreadWorkflowMessageMsg(obj, raw);
if (obj.type === "signal") return parseSignalMsg(obj);
if (obj.type === "error") return parseErrorMsg(obj);
if (obj.type === "health-response") return parseHealthResponseMsg(obj);
if (obj.type === "thread-event") return parseThreadEventMsg(obj);
if (obj.type === "workflow-error") return parseWorkflowErrorMsg(obj);
if (obj.type === "thread-workflow-message") return parseThreadWorkflowMessageMsg(obj);
return ok({ type: "ready" });
}
+2 -1
View File
@@ -100,7 +100,8 @@ export function createKernel(
}
let stopped = false;
let scheduler: ReflexScheduler = null as unknown as ReflexScheduler;
/** Assigned before workers start; `handleWorkerMessage` only runs after this is set. */
let scheduler!: ReflexScheduler;
let readyResolve: (() => void) | undefined;
const ready = new Promise<void>((resolve) => {
+10 -10
View File
@@ -6,7 +6,7 @@ import { drizzle } from "drizzle-orm/node-sqlite";
import type { NodeSQLiteDatabase } from "drizzle-orm/node-sqlite";
import type { Result } from "@uncaged/nerve-core";
import { err, ok } from "@uncaged/nerve-core";
import { err, isPlainRecord, ok } from "@uncaged/nerve-core";
import type { BlobStore } from "@uncaged/nerve-store";
@@ -108,8 +108,9 @@ export function runMigrations(sqlite: DatabaseSync, migrationsDir: string): Resu
const filesResult = listMigrationFiles(migrationsDir);
if (!filesResult.ok) return filesResult;
const migrationRows = sqlite.prepare("SELECT name FROM _migrations").all();
const applied = new Set<string>(
(sqlite.prepare("SELECT name FROM _migrations").all() as Array<{ name: string }>).map(
migrationRows.filter((r): r is { name: string } => isPlainRecord(r) && typeof r.name === "string").map(
(r) => r.name,
),
);
@@ -145,6 +146,7 @@ export function openSenseDb(
const migResult = runMigrations(sqlite, migrationsDir);
if (!migResult.ok) return migResult;
// Drizzle infers a schema-specific DB type; senses are schema-agnostic at this layer.
const db = drizzle({ client: sqlite }) as DrizzleDB;
return ok({ sqlite, db });
}
@@ -162,6 +164,7 @@ export function openPeerDb(dbPath: string): Result<DrizzleDB> {
return err(new Error(`Failed to open peer database "${dbPath}" (readonly): ${msg}`));
}
// Same schema-agnostic Drizzle wrapper as openSenseDb.
return ok(drizzle({ client: sqlite }) as DrizzleDB);
}
@@ -180,18 +183,13 @@ export async function loadComputeFn(senseIndexPath: string): Promise<Result<Comp
return err(new Error(`Failed to import sense module "${senseIndexPath}": ${msg}`));
}
if (
mod === null ||
typeof mod !== "object" ||
!("compute" in mod) ||
typeof (mod as Record<string, unknown>).compute !== "function"
) {
if (!isPlainRecord(mod) || !("compute" in mod) || typeof mod.compute !== "function") {
return err(
new Error(`Sense module "${senseIndexPath}" must export a named "compute" function`),
);
}
return ok((mod as { compute: ComputeFn }).compute);
return ok(mod.compute as ComputeFn);
}
/**
@@ -232,7 +230,9 @@ export async function executeCompute(
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
if (controller.signal.aborted) {
return err(new Error(`compute("${runtime.name}") timed out after ${timeoutMs as number}ms`));
return err(
new Error(`compute("${runtime.name}") timed out after ${String(timeoutMs ?? "?")}ms`),
);
}
return err(new Error(`compute("${runtime.name}") threw: ${msg}`));
} finally {
+3 -3
View File
@@ -12,7 +12,7 @@ import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import type { NerveConfig, WorkflowConfig, WorkflowMessage } from "@uncaged/nerve-core";
import { START } from "@uncaged/nerve-core";
import { START, isPlainRecord } from "@uncaged/nerve-core";
import type {
ResumeThreadMessage,
@@ -91,8 +91,8 @@ function readLaunchFromTriggerPayload(
raw: unknown,
engineDefaultMaxRounds: number,
): { prompt: string; maxRounds: number } {
if (raw !== null && typeof raw === "object" && !Array.isArray(raw)) {
const o = raw as Record<string, unknown>;
if (isPlainRecord(raw)) {
const o = raw;
if (typeof o.prompt === "string" && typeof o.maxRounds === "number") {
return { prompt: o.prompt, maxRounds: o.maxRounds };
}
+29 -20
View File
@@ -12,8 +12,14 @@
import { existsSync } from "node:fs";
import { join, resolve } from "node:path";
import type { RoleMeta, WorkflowDefinition, WorkflowMessage } from "@uncaged/nerve-core";
import { END, START } from "@uncaged/nerve-core";
import type {
Moderator,
RoleMeta,
StartSignal,
WorkflowDefinition,
WorkflowMessage,
} from "@uncaged/nerve-core";
import { END, START, isPlainRecord } from "@uncaged/nerve-core";
import type {
ThreadEventType,
@@ -23,6 +29,8 @@ import type {
import { parseParentMessage } from "./ipc.js";
import { ignoreSessionBroadcastSignals } from "./worker-fork-support.js";
type ModeratorInput = Parameters<Moderator<RoleMeta>>[0];
// ---------------------------------------------------------------------------
// IPC helpers
// ---------------------------------------------------------------------------
@@ -93,21 +101,17 @@ async function runThread(
return;
}
const lastSignal =
const lastSignal: ModeratorInput =
lastMsg.role === START
? {
role: START,
content: lastMsg.content,
meta: lastMsg.meta as { maxRounds: number },
meta: lastMsg.meta as StartSignal["meta"],
timestamp: lastMsg.timestamp,
}
: { role: lastMsg.role, meta: lastMsg.meta as Record<string, unknown> };
let nextRole = def.moderator(
lastSignal as Parameters<typeof def.moderator>[0],
roleRound,
maxRounds,
);
let nextRole = def.moderator(lastSignal, roleRound, maxRounds);
if (nextRole === END) {
sendThreadEvent(runId, "completed", null);
@@ -150,8 +154,8 @@ async function runThread(
roleRound += 1;
const signal = { role: nextRole, meta: result.meta };
nextRole = def.moderator(signal as Parameters<typeof def.moderator>[0], roleRound, maxRounds);
const signal: ModeratorInput = { role: nextRole, meta: result.meta };
nextRole = def.moderator(signal, roleRound, maxRounds);
if (nextRole === END) {
sendThreadEvent(runId, "completed", null);
@@ -166,6 +170,17 @@ async function runThread(
// Workflow definition loader
// ---------------------------------------------------------------------------
function isWorkflowDefinitionShape(def: unknown): def is WorkflowDefinition<RoleMeta> {
if (!isPlainRecord(def)) return false;
return (
typeof def.moderator === "function" &&
typeof def.roles === "object" &&
def.roles !== null &&
!Array.isArray(def.roles) &&
typeof def.name === "string"
);
}
async function loadWorkflowDefinition(
nerveRoot: string,
workflowName: string,
@@ -186,19 +201,13 @@ async function loadWorkflowDefinition(
const mod = await import(indexPath);
const def: unknown = mod.default ?? mod;
if (
def === null ||
typeof def !== "object" ||
typeof (def as WorkflowDefinition<RoleMeta>).moderator !== "function" ||
typeof (def as WorkflowDefinition<RoleMeta>).roles !== "object" ||
typeof (def as WorkflowDefinition<RoleMeta>).name !== "string"
) {
if (!isWorkflowDefinitionShape(def)) {
throw new Error(
`Workflow "${workflowName}" must export a WorkflowDefinition with "name", "roles", and "moderator".`,
);
}
return def as WorkflowDefinition<RoleMeta>;
return def;
}
// ---------------------------------------------------------------------------
@@ -253,7 +262,7 @@ function handleMessage(
const previous = inFlight.get(runId) ?? Promise.resolve();
const next = previous
.then(() => runThread(def, runId, maxRounds, messages as WorkflowMessage[], null))
.then(() => runThread(def, runId, maxRounds, messages, null))
.catch((e: unknown) => {
const errMsg = e instanceof Error ? e.message : String(e);
sendWorkflowError(runId, errMsg);
+19 -18
View File
@@ -11,6 +11,8 @@ import { mkdirSync, writeFileSync } from "node:fs";
import { dirname, join } from "node:path";
import { DatabaseSync, type StatementSync } from "node:sqlite";
import { isPlainRecord } from "@uncaged/nerve-core";
import {
DEFAULT_LOG_RETENTION_MS,
LOG_ARCHIVE_META_KEY,
@@ -68,11 +70,15 @@ const VALID_WORKFLOW_STATUSES = new Set<string>([
"interrupted",
]);
function isWorkflowRunStatus(value: string): value is WorkflowRunStatus {
return VALID_WORKFLOW_STATUSES.has(value);
}
function validateWorkflowRunStatus(status: string): WorkflowRunStatus {
if (!VALID_WORKFLOW_STATUSES.has(status)) {
if (!isWorkflowRunStatus(status)) {
throw new Error(`Invalid workflow run status from DB: "${status}"`);
}
return status as WorkflowRunStatus;
return status;
}
/** One row in the workflow_runs materialized table. */
@@ -508,10 +514,9 @@ export function createLogStore(dbPath: string): LogStore {
const row = getTriggerPayloadStmt.get(runId) as { payload: string | null } | undefined;
if (row === undefined || row.payload === null) return null;
try {
const parsed = JSON.parse(row.payload) as unknown;
if (parsed !== null && typeof parsed === "object") {
const obj = parsed as Record<string, unknown>;
return obj.triggerPayload ?? null;
const parsed: unknown = JSON.parse(row.payload);
if (isPlainRecord(parsed)) {
return parsed.triggerPayload ?? null;
}
} catch {
// malformed
@@ -525,12 +530,8 @@ export function createLogStore(dbPath: string): LogStore {
for (const row of rows) {
if (row.payload === null) continue;
try {
const parsed = JSON.parse(row.payload) as unknown;
if (
parsed !== null &&
typeof parsed === "object" &&
typeof (parsed as Record<string, unknown>).type === "string"
) {
const parsed: unknown = JSON.parse(row.payload);
if (isPlainRecord(parsed) && typeof parsed.type === "string") {
result.push(parsed as { type: string; [key: string]: unknown });
}
} catch {
@@ -544,9 +545,9 @@ export function createLogStore(dbPath: string): LogStore {
payload: string,
): { role: string; content: string; meta: unknown; timestamp: number } | null {
try {
const parsed = JSON.parse(payload) as unknown;
if (parsed === null || typeof parsed !== "object") return null;
const obj = parsed as Record<string, unknown>;
const parsed: unknown = JSON.parse(payload);
if (!isPlainRecord(parsed)) return null;
const obj = parsed;
if (typeof obj.role !== "string" || typeof obj.content !== "string") return null;
return {
role: obj.role,
@@ -584,9 +585,9 @@ export function createLogStore(dbPath: string): LogStore {
fallbackTs: number,
): { role: string; content: string; meta: unknown; timestamp: number } | null {
try {
const parsed = JSON.parse(payload) as unknown;
if (parsed === null || typeof parsed !== "object") return null;
const obj = parsed as Record<string, unknown>;
const parsed: unknown = JSON.parse(payload);
if (!isPlainRecord(parsed)) return null;
const obj = parsed;
if (typeof obj.role === "string" && typeof obj.content === "string") {
return {
role: obj.role,