Compare commits

...

12 Commits

Author SHA1 Message Date
xiaoju 21be4997df feat(workflow-utils): add dryRunDefaults option to llmExtract
Lets workflow authors provide semantically meaningful mock data for dryRun mode. Falls back to schemaDefaults() when not provided.

Fixes #130

小橘 🍊(NEKO Team)

Made-with: Cursor
2026-04-25 04:40:32 +00:00
xiaomo ce20d73ab6 Merge pull request 'fix(workflow-utils): llmExtract dryRun returns schema-shaped defaults' (#126) from fix/123-llmextract-dryrun-defaults into main 2026-04-25 04:35:45 +00:00
xiaoju 7c999a0689 fix(workflow-utils): dryRun llmExtract returns schema-shaped defaults
Add schemaDefaults() from Zod def types; export from package; tests for nested/array/enum/optional.

Made-with: Cursor
2026-04-25 04:31:46 +00:00
xiaomo 111b7e2734 Merge pull request 'feat: workflow exit codes & kill mechanism' (#122) from feat/121-workflow-exit-codes into main 2026-04-25 04:03:29 +00:00
xiaoju 01d7435c4a feat: workflow exit codes & kill mechanism
- Add exit_code to workflow_runs (0=success, 1=role error, 2=maxRounds, 137=killed, 255=crash)
- Expand status enum: started/completed/failed/killed
- Add kill-thread IPC message for graceful workflow termination
- Add 'nerve workflow kill <runId>' CLI command
- Show exit_code in 'nerve workflow list' output

Fixes #121
2026-04-25 03:57:26 +00:00
xiaoju 889bbbb474 Merge pull request 'refactor(core): SenseResult<T> generic + split types.ts' (#118) from refactor/111-split-types-generify-sense-result into main 2026-04-25 02:59:48 +00:00
xiaoju 418ae6a073 refactor(core): SenseResult generic + split types.ts into config/sense/workflow
- SenseResult<T = unknown> with payload: T
- types.ts split into config.ts (types), sense.ts, workflow.ts
- Original config.ts (parseNerveConfig) moved to parse-nerve-config.ts
- index.ts re-exports from new modules, external API unchanged
- daemon-ipc-protocol.ts imports SenseInfo from sense.ts

Fixes #111
2026-04-25 02:56:55 +00:00
xiaoju c6f56155c8 Merge pull request 'refactor(core): restructure ModeratorContext to { start, steps }' (#117) from refactor/110-moderator-context-restructure into main 2026-04-25 02:51:50 +00:00
xiaoju 3ce9e3a846 refactor(core): restructure ModeratorContext to { start, steps }
- ModeratorContext: discriminated union → { start: StartStep; steps: RoleStep<M>[] }
- Moderator signature: (context, round, maxRounds) → (context)
- round derivable from steps.length, maxRounds from start.meta.maxRounds
- workflow-worker.ts: build steps array, pass full context to moderator
- Remove unused ModeratorContext import from workflow-worker
- Update README.md

Refs #110
2026-04-25 02:48:28 +00:00
xiaoju 0fff8ef954 Merge pull request 'refactor(core): rename RoleSignal → RoleStep, StartSignal → StartStep' (#116) from refactor/109-role-step into main 2026-04-25 02:37:03 +00:00
xiaoju beada2ae09 refactor(core): rename RoleSignal → RoleStep, StartSignal → StartStep
- RoleStep now includes content and timestamp fields (aligned with StartStep)
- ModeratorContext.signal → ModeratorContext.step
- workflow-utils: start-signal.ts → start-step.ts, isDryRun updated

Fixes #109
2026-04-25 02:34:33 +00:00
xiaoju 47d23bc1a7 Merge pull request 'refactor(store): rename LogEntry.ts → LogEntry.timestamp' (#114) from refactor/113-logentry-timestamp into main 2026-04-25 02:28:38 +00:00
27 changed files with 1044 additions and 550 deletions
+2 -1
View File
@@ -45,7 +45,7 @@ function upsertRun(
): void {
store.upsertWorkflowRun(
{ source: "workflow", type: status, refId: runId, payload: null, timestamp: timestampMs },
{ runId, workflow, status, timestamp: timestampMs },
{ runId, workflow, status, timestamp: timestampMs, exitCode: null },
);
}
@@ -83,6 +83,7 @@ describe("statusIcon", () => {
["crashed", "💥"],
["dropped", "🗑"],
["interrupted", "⚠️"],
["killed", "🛑"],
] as const)("maps status=%s to icon=%s", (status, icon) => {
expect(statusIcon(status)).toBe(icon);
});
+46 -2
View File
@@ -7,7 +7,7 @@ import { defineCommand } from "citty";
import { stringify } from "yaml";
import type { LogStore, ThreadRoundRow, WorkflowRun } from "@uncaged/nerve-store";
import { triggerWorkflowViaDaemon } from "../daemon-client.js";
import { killWorkflowViaDaemon, triggerWorkflowViaDaemon } from "../daemon-client.js";
import { loadDaemonModule } from "../workspace-daemon.js";
import { getNerveRoot, getSocketPath, isRunning } from "../workspace.js";
@@ -59,6 +59,8 @@ export function statusIcon(status: WorkflowRun["status"]): string {
return "🗑";
case "interrupted":
return "⚠️";
case "killed":
return "🛑";
default: {
const _exhaustive: never = status;
return `?(${_exhaustive})`;
@@ -79,7 +81,8 @@ export function getAllWorkflowRuns(store: LogStore, filterWorkflow: string | nul
*/
export function formatRunLine(run: WorkflowRun): string {
const icon = statusIcon(run.status);
return ` ${icon} ${run.runId} workflow=${run.workflow} status=${run.status} timestamp=${formatTs(run.timestamp)}\n`;
const exitCodeStr = run.exitCode !== null ? ` exit_code=${run.exitCode}` : "";
return ` ${icon} ${run.runId} workflow=${run.workflow} status=${run.status}${exitCodeStr} timestamp=${formatTs(run.timestamp)}\n`;
}
/**
@@ -563,6 +566,46 @@ const workflowTriggerCommand = defineCommand({
},
});
// ---------------------------------------------------------------------------
// nerve workflow kill <runId>
// ---------------------------------------------------------------------------
const workflowKillCommand = defineCommand({
meta: {
name: "kill",
description: "Kill a running or queued workflow thread by runId",
},
args: {
runId: {
type: "positional",
description: "The run ID to kill",
},
},
async run({ args }) {
if (!isRunning()) {
process.stderr.write("❌ Nerve daemon is not running. Start it with `nerve start`.\n");
process.exit(1);
}
const socketPath = getSocketPath();
let response: DaemonIpcTriggerResponse;
try {
response = await killWorkflowViaDaemon(socketPath, args.runId);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`❌ Failed to contact daemon: ${msg}\n`);
process.exit(1);
}
if (!response.ok) {
process.stderr.write(`❌ Kill failed: ${response.error}\n`);
process.exit(1);
}
process.stdout.write(`✅ Kill signal sent for run "${args.runId}".\n`);
},
});
// ---------------------------------------------------------------------------
// nerve workflow (parent command)
// ---------------------------------------------------------------------------
@@ -577,5 +620,6 @@ export const workflowCommand = defineCommand({
inspect: workflowInspectCommand,
thread: workflowThreadCommand,
trigger: workflowTriggerCommand,
kill: workflowKillCommand,
},
});
+12
View File
@@ -167,3 +167,15 @@ export function listSensesViaDaemon(socketPath: string): Promise<DaemonIpcListSe
const message: DaemonIpcRequest = { type: "list-senses" };
return sendAndReceive(socketPath, message, parseListSensesResponse);
}
/**
* Send a kill-workflow message to the running daemon via its Unix socket.
* Resolves with the daemon's response or rejects on connection/timeout errors.
*/
export function killWorkflowViaDaemon(
socketPath: string,
runId: string,
): Promise<DaemonIpcTriggerResponse> {
const message: DaemonIpcRequest = { type: "kill-workflow", runId };
return sendAndReceive(socketPath, message, parseDaemonResponse);
}
+1 -1
View File
@@ -8,7 +8,7 @@ Shared types and configuration parser for the [nerve](../../README.md) observati
- **Config parser** — `parseNerveConfig(yaml)` validates and parses `nerve.yaml` into `NerveConfig` (rejects reflex entries that declare a `workflow` key; reflexes only schedule senses)
- **Sense → workflow routing** — `parseSenseWorkflowDirective`, `routeSenseComputeOutput`, and types `ParsedSenseWorkflowDirective`, `SenseComputeRoute`
- **Daemon IPC protocol** — request/response types (`DaemonIpcRequest`, `DaemonIpcResponse`, …) and `parseDaemonIpcRequest` for newline-delimited JSON on the CLI ↔ daemon socket
- **Workflow automaton types** — `START` / `END` sentinel constants, `WorkflowMessage`, `StartSignal`, `RoleSignal`, `Moderator`, `WorkflowDefinition`, `Role`, `SenseResult`, plus `DEFAULT_ENGINE_MAX_ROUNDS`
- **Workflow automaton types** — `START` / `END` sentinel constants, `WorkflowMessage`, `StartStep`, `RoleStep`, `ModeratorContext` (`start` + `steps`; empty `steps` on first moderator call), `Moderator` (single `context` argument), `WorkflowDefinition`, `Role`, `SenseResult`, plus `DEFAULT_ENGINE_MAX_ROUNDS`
- **Result type** — `Result<T>` with `ok()` / `err()` helpers for explicit error handling (no thrown exceptions for parse paths)
## Usage
+1 -1
View File
@@ -1,6 +1,6 @@
import { describe, expect, it } from "vitest";
import { parseNerveConfig } from "../config.js";
import { parseNerveConfig } from "../parse-nerve-config.js";
const VALID_CONFIG = `
senses:
+30 -295
View File
@@ -1,302 +1,37 @@
import { parse } from "yaml";
import { isPlainRecord } from "./is-plain-record.js";
import type { Result } from "./result.js";
import { err, ok } from "./result.js";
import type { NerveConfig, ReflexConfig, SenseConfig, WorkflowConfig } from "./types.js";
import { DEFAULT_ENGINE_MAX_ROUNDS } from "./types.js";
const DURATION_RE = /^(\d+)([smh])$/;
const DURATION_MULTIPLIERS: Record<string, number> = {
s: 1_000,
m: 60_000,
h: 3_600_000,
export type SenseConfig = {
group: string;
throttle: number | null;
timeout: number | null;
gracePeriod: number | null;
};
function parseDurationToMs(value: string): number | null {
const match = DURATION_RE.exec(value);
if (!match) return null;
return Number(match[1]) * DURATION_MULTIPLIERS[match[2]];
}
export type SenseReflexConfig = {
kind: "sense";
sense: string;
interval: number | null;
on: string[];
};
function isValidGroupName(value: string): boolean {
return /^[a-zA-Z0-9_-]+$/.test(value);
}
/** Reflexes only schedule Senses; workflow launches come from Sense return values. */
export type ReflexConfig = SenseReflexConfig;
function parseDurationField(field: unknown, label: string): Result<number | null> {
if (field === undefined || field === null) return ok(null);
if (typeof field !== "string") {
return err(
new Error(`${label}: invalid duration "${field}" (expected e.g. "5s", "10m", "1h")`),
);
}
const ms = parseDurationToMs(field);
if (ms === null) {
return err(
new Error(`${label}: invalid duration "${field}" (expected e.g. "5s", "10m", "1h")`),
);
}
return ok(ms);
}
export type DropOverflowConfig = {
concurrency: number;
overflow: "drop";
};
function validateSenseConfig(name: string, raw: unknown): Result<SenseConfig> {
if (!isPlainRecord(raw)) {
return err(new Error(`senses.${name}: must be an object`));
}
export type QueueOverflowConfig = {
concurrency: number;
overflow: "queue";
maxQueue: number;
};
const obj = raw;
export type WorkflowConfig = DropOverflowConfig | QueueOverflowConfig;
if (typeof obj.group !== "string" || obj.group.trim() === "") {
return err(new Error(`senses.${name}.group: required string`));
}
if (!isValidGroupName(obj.group)) {
return err(
new Error(
`senses.${name}.group: invalid name "${obj.group}" (only alphanumeric, underscore, hyphen allowed)`,
),
);
}
const throttleResult = parseDurationField(obj.throttle, `senses.${name}.throttle`);
if (!throttleResult.ok) return throttleResult;
const timeoutResult = parseDurationField(obj.timeout, `senses.${name}.timeout`);
if (!timeoutResult.ok) return timeoutResult;
const graceResult = parseDurationField(obj.grace_period, `senses.${name}.grace_period`);
if (!graceResult.ok) return graceResult;
return ok({
group: obj.group,
throttle: throttleResult.value,
timeout: timeoutResult.value,
gracePeriod: graceResult.value,
});
}
function parseOnField(index: number, obj: Record<string, unknown>): Result<string[]> {
if (obj.on === undefined || obj.on === null) return ok([]);
if (!Array.isArray(obj.on) || !obj.on.every((item): item is string => typeof item === "string")) {
return err(new Error(`reflexes[${index}].on: must be an array of strings`));
}
return ok(obj.on);
}
function parseSenseReflex(
index: number,
obj: Record<string, unknown>,
senseNames: Set<string>,
on: string[],
): Result<ReflexConfig> {
if (typeof obj.sense !== "string") {
return err(new Error(`reflexes[${index}].sense: must be a string`));
}
if (!senseNames.has(obj.sense)) {
return err(new Error(`reflexes[${index}].sense: "${obj.sense}" not found in senses`));
}
const intervalResult = parseDurationField(obj.interval, `reflexes[${index}].interval`);
if (!intervalResult.ok) return intervalResult;
if (intervalResult.value === null && on.length === 0) {
return err(
new Error(`reflexes[${index}]: sense reflex must have at least one of "interval" or "on"`),
);
}
return ok({
kind: "sense" as const,
sense: obj.sense,
interval: intervalResult.value,
on,
});
}
function validateReflexConfig(
index: number,
raw: unknown,
senseNames: Set<string>,
): Result<ReflexConfig> {
if (!isPlainRecord(raw)) {
return err(new Error(`reflexes[${index}]: must be an object`));
}
const obj = raw;
const hasSense = obj.sense !== undefined;
const hasWorkflowKey = Object.hasOwn(obj, "workflow");
if (hasWorkflowKey) {
return err(
new Error(
`reflexes[${index}]: YAML "workflow" entries are not supported — start workflows from a Sense compute return value using a "workflow" string field (format: name|maxRounds|prompt)`,
),
);
}
if (!hasSense) {
return err(new Error(`reflexes[${index}]: must include "sense"`));
}
const onResult = parseOnField(index, obj);
if (!onResult.ok) return onResult;
return parseSenseReflex(index, obj, senseNames, onResult.value);
}
function parseEngineMaxRounds(obj: Record<string, unknown>): Result<number> {
if (obj.max_rounds === undefined || obj.max_rounds === null) {
return ok(DEFAULT_ENGINE_MAX_ROUNDS);
}
if (
typeof obj.max_rounds !== "number" ||
!Number.isInteger(obj.max_rounds) ||
obj.max_rounds < 1
) {
return err(new Error("max_rounds: must be a positive integer"));
}
return ok(obj.max_rounds);
}
function validateWorkflowConfig(name: string, raw: unknown): Result<WorkflowConfig> {
if (!isPlainRecord(raw)) {
return err(new Error(`workflows.${name}: must be an object`));
}
const obj = raw;
if (
typeof obj.concurrency !== "number" ||
!Number.isInteger(obj.concurrency) ||
obj.concurrency < 1
) {
return err(new Error(`workflows.${name}.concurrency: must be a positive integer`));
}
if (obj.overflow !== "drop" && obj.overflow !== "queue") {
return err(new Error(`workflows.${name}.overflow: must be "drop" or "queue"`));
}
if (obj.overflow === "drop") {
if (obj.max_queue !== undefined && obj.max_queue !== null) {
return err(new Error(`workflows.${name}: max_queue is not allowed with overflow "drop"`));
}
return ok({
concurrency: obj.concurrency,
overflow: "drop" as const,
});
}
// overflow: "queue"
let maxQueue = 100; // default
if (obj.max_queue !== undefined && obj.max_queue !== null) {
if (
typeof obj.max_queue !== "number" ||
!Number.isInteger(obj.max_queue) ||
obj.max_queue < 1
) {
return err(new Error(`workflows.${name}.max_queue: must be a positive integer`));
}
maxQueue = obj.max_queue;
}
return ok({
concurrency: obj.concurrency,
overflow: "queue" as const,
maxQueue,
});
}
function parseSenses(
obj: Record<string, unknown>,
): Result<{ senses: Record<string, SenseConfig>; senseNames: Set<string> }> {
if (!isPlainRecord(obj.senses)) {
return err(new Error("senses: required object"));
}
const sensesRaw = obj.senses;
const senses: Record<string, SenseConfig> = {};
const senseNames = new Set(Object.keys(sensesRaw));
for (const [name, senseRaw] of Object.entries(sensesRaw)) {
const result = validateSenseConfig(name, senseRaw);
if (!result.ok) return result;
senses[name] = result.value;
}
return ok({ senses, senseNames });
}
function parseReflexes(
obj: Record<string, unknown>,
senseNames: Set<string>,
): Result<ReflexConfig[]> {
if (!Array.isArray(obj.reflexes)) {
return err(new Error("reflexes: required array"));
}
const reflexes: ReflexConfig[] = [];
for (let i = 0; i < obj.reflexes.length; i++) {
const result = validateReflexConfig(i, obj.reflexes[i], senseNames);
if (!result.ok) return result;
reflexes.push(result.value);
}
return ok(reflexes);
}
function parseWorkflows(obj: Record<string, unknown>): Result<Record<string, WorkflowConfig>> {
if (obj.workflows === undefined || obj.workflows === null) return ok({});
if (!isPlainRecord(obj.workflows)) {
return err(new Error("workflows: must be an object if provided"));
}
const workflowsRaw = obj.workflows;
const workflows: Record<string, WorkflowConfig> = {};
for (const [name, wfRaw] of Object.entries(workflowsRaw)) {
const result = validateWorkflowConfig(name, wfRaw);
if (!result.ok) return result;
workflows[name] = result.value;
}
return ok(workflows);
}
export function parseNerveConfig(raw: string): Result<NerveConfig> {
let parsed: unknown;
try {
parsed = parse(raw);
} catch (e) {
const message = e instanceof Error ? e.message : String(e);
return err(new Error(`YAML parse error: ${message}`));
}
if (!isPlainRecord(parsed)) {
return err(new Error("Config must be a YAML object"));
}
const obj = parsed;
const sensesResult = parseSenses(obj);
if (!sensesResult.ok) return sensesResult;
const { senses, senseNames } = sensesResult.value;
const reflexesResult = parseReflexes(obj, senseNames);
if (!reflexesResult.ok) return reflexesResult;
const workflowsResult = parseWorkflows(obj);
if (!workflowsResult.ok) return workflowsResult;
const maxRoundsResult = parseEngineMaxRounds(obj);
if (!maxRoundsResult.ok) return maxRoundsResult;
return ok({
maxRounds: maxRoundsResult.value,
senses,
reflexes: reflexesResult.value,
workflows: workflowsResult.value,
});
}
export type NerveConfig = {
/** Engine-wide default max moderator rounds (e.g. CLI workflow trigger when omitted). */
maxRounds: number;
senses: Record<string, SenseConfig>;
reflexes: ReflexConfig[];
workflows: Record<string, WorkflowConfig>;
};
+13 -2
View File
@@ -5,7 +5,7 @@
*/
import { isPlainRecord } from "./is-plain-record.js";
import type { SenseInfo } from "./types.js";
import type { SenseInfo } from "./sense.js";
/** Client → daemon: start a workflow run. */
export type DaemonIpcTriggerWorkflowRequest = {
@@ -27,11 +27,18 @@ export type DaemonIpcListSensesRequest = {
type: "list-senses";
};
/** Client → daemon: kill a running or queued workflow thread by runId. */
export type DaemonIpcKillWorkflowRequest = {
type: "kill-workflow";
runId: string;
};
/** Union of all JSON requests the daemon IPC server accepts. */
export type DaemonIpcRequest =
| DaemonIpcTriggerWorkflowRequest
| DaemonIpcTriggerSenseRequest
| DaemonIpcListSensesRequest;
| DaemonIpcListSensesRequest
| DaemonIpcKillWorkflowRequest;
/** Successful trigger / trigger-sense reply (no body). */
export type DaemonIpcTriggerOkResponse = { ok: true };
@@ -87,6 +94,10 @@ export function parseDaemonIpcRequest(line: string): DaemonIpcRequest | null {
if (req.type === "list-senses") {
return { type: "list-senses" };
}
if (req.type === "kill-workflow") {
if (typeof req.runId !== "string" || req.runId.length === 0) return null;
return { type: "kill-workflow", runId: req.runId };
}
return null;
} catch {
return null;
+9 -8
View File
@@ -1,28 +1,28 @@
export type {
Signal,
SenseConfig,
SenseInfo,
SenseReflexConfig,
ReflexConfig,
DropOverflowConfig,
QueueOverflowConfig,
WorkflowConfig,
NerveConfig,
} from "./config.js";
export type { Signal, SenseInfo, SenseResult } from "./sense.js";
export type {
WorkflowMessage,
RoleResult,
Role,
RoleMeta,
StartSignal,
RoleSignal,
StartStep,
RoleStep,
ModeratorContext,
Moderator,
WorkflowDefinition,
SenseResult,
} from "./types.js";
export { START, END, DEFAULT_ENGINE_MAX_ROUNDS } from "./types.js";
} from "./workflow.js";
export { START, END, DEFAULT_ENGINE_MAX_ROUNDS } from "./workflow.js";
export type { Result } from "./result.js";
export { ok, err } from "./result.js";
export { parseNerveConfig } from "./config.js";
export { parseNerveConfig } from "./parse-nerve-config.js";
export { isPlainRecord } from "./is-plain-record.js";
export type {
@@ -38,6 +38,7 @@ export type {
DaemonIpcTriggerWorkflowRequest,
DaemonIpcTriggerSenseRequest,
DaemonIpcListSensesRequest,
DaemonIpcKillWorkflowRequest,
DaemonIpcRequest,
DaemonIpcTriggerOkResponse,
DaemonIpcErrorResponse,
+302
View File
@@ -0,0 +1,302 @@
import { parse } from "yaml";
import type { NerveConfig, ReflexConfig, SenseConfig, WorkflowConfig } from "./config.js";
import { isPlainRecord } from "./is-plain-record.js";
import type { Result } from "./result.js";
import { err, ok } from "./result.js";
import { DEFAULT_ENGINE_MAX_ROUNDS } from "./workflow.js";
const DURATION_RE = /^(\d+)([smh])$/;
const DURATION_MULTIPLIERS: Record<string, number> = {
s: 1_000,
m: 60_000,
h: 3_600_000,
};
function parseDurationToMs(value: string): number | null {
const match = DURATION_RE.exec(value);
if (!match) return null;
return Number(match[1]) * DURATION_MULTIPLIERS[match[2]];
}
function isValidGroupName(value: string): boolean {
return /^[a-zA-Z0-9_-]+$/.test(value);
}
function parseDurationField(field: unknown, label: string): Result<number | null> {
if (field === undefined || field === null) return ok(null);
if (typeof field !== "string") {
return err(
new Error(`${label}: invalid duration "${field}" (expected e.g. "5s", "10m", "1h")`),
);
}
const ms = parseDurationToMs(field);
if (ms === null) {
return err(
new Error(`${label}: invalid duration "${field}" (expected e.g. "5s", "10m", "1h")`),
);
}
return ok(ms);
}
function validateSenseConfig(name: string, raw: unknown): Result<SenseConfig> {
if (!isPlainRecord(raw)) {
return err(new Error(`senses.${name}: must be an object`));
}
const obj = raw;
if (typeof obj.group !== "string" || obj.group.trim() === "") {
return err(new Error(`senses.${name}.group: required string`));
}
if (!isValidGroupName(obj.group)) {
return err(
new Error(
`senses.${name}.group: invalid name "${obj.group}" (only alphanumeric, underscore, hyphen allowed)`,
),
);
}
const throttleResult = parseDurationField(obj.throttle, `senses.${name}.throttle`);
if (!throttleResult.ok) return throttleResult;
const timeoutResult = parseDurationField(obj.timeout, `senses.${name}.timeout`);
if (!timeoutResult.ok) return timeoutResult;
const graceResult = parseDurationField(obj.grace_period, `senses.${name}.grace_period`);
if (!graceResult.ok) return graceResult;
return ok({
group: obj.group,
throttle: throttleResult.value,
timeout: timeoutResult.value,
gracePeriod: graceResult.value,
});
}
function parseOnField(index: number, obj: Record<string, unknown>): Result<string[]> {
if (obj.on === undefined || obj.on === null) return ok([]);
if (!Array.isArray(obj.on) || !obj.on.every((item): item is string => typeof item === "string")) {
return err(new Error(`reflexes[${index}].on: must be an array of strings`));
}
return ok(obj.on);
}
function parseSenseReflex(
index: number,
obj: Record<string, unknown>,
senseNames: Set<string>,
on: string[],
): Result<ReflexConfig> {
if (typeof obj.sense !== "string") {
return err(new Error(`reflexes[${index}].sense: must be a string`));
}
if (!senseNames.has(obj.sense)) {
return err(new Error(`reflexes[${index}].sense: "${obj.sense}" not found in senses`));
}
const intervalResult = parseDurationField(obj.interval, `reflexes[${index}].interval`);
if (!intervalResult.ok) return intervalResult;
if (intervalResult.value === null && on.length === 0) {
return err(
new Error(`reflexes[${index}]: sense reflex must have at least one of "interval" or "on"`),
);
}
return ok({
kind: "sense" as const,
sense: obj.sense,
interval: intervalResult.value,
on,
});
}
function validateReflexConfig(
index: number,
raw: unknown,
senseNames: Set<string>,
): Result<ReflexConfig> {
if (!isPlainRecord(raw)) {
return err(new Error(`reflexes[${index}]: must be an object`));
}
const obj = raw;
const hasSense = obj.sense !== undefined;
const hasWorkflowKey = Object.hasOwn(obj, "workflow");
if (hasWorkflowKey) {
return err(
new Error(
`reflexes[${index}]: YAML "workflow" entries are not supported — start workflows from a Sense compute return value using a "workflow" string field (format: name|maxRounds|prompt)`,
),
);
}
if (!hasSense) {
return err(new Error(`reflexes[${index}]: must include "sense"`));
}
const onResult = parseOnField(index, obj);
if (!onResult.ok) return onResult;
return parseSenseReflex(index, obj, senseNames, onResult.value);
}
function parseEngineMaxRounds(obj: Record<string, unknown>): Result<number> {
if (obj.max_rounds === undefined || obj.max_rounds === null) {
return ok(DEFAULT_ENGINE_MAX_ROUNDS);
}
if (
typeof obj.max_rounds !== "number" ||
!Number.isInteger(obj.max_rounds) ||
obj.max_rounds < 1
) {
return err(new Error("max_rounds: must be a positive integer"));
}
return ok(obj.max_rounds);
}
function validateWorkflowConfig(name: string, raw: unknown): Result<WorkflowConfig> {
if (!isPlainRecord(raw)) {
return err(new Error(`workflows.${name}: must be an object`));
}
const obj = raw;
if (
typeof obj.concurrency !== "number" ||
!Number.isInteger(obj.concurrency) ||
obj.concurrency < 1
) {
return err(new Error(`workflows.${name}.concurrency: must be a positive integer`));
}
if (obj.overflow !== "drop" && obj.overflow !== "queue") {
return err(new Error(`workflows.${name}.overflow: must be "drop" or "queue"`));
}
if (obj.overflow === "drop") {
if (obj.max_queue !== undefined && obj.max_queue !== null) {
return err(new Error(`workflows.${name}: max_queue is not allowed with overflow "drop"`));
}
return ok({
concurrency: obj.concurrency,
overflow: "drop" as const,
});
}
// overflow: "queue"
let maxQueue = 100; // default
if (obj.max_queue !== undefined && obj.max_queue !== null) {
if (
typeof obj.max_queue !== "number" ||
!Number.isInteger(obj.max_queue) ||
obj.max_queue < 1
) {
return err(new Error(`workflows.${name}.max_queue: must be a positive integer`));
}
maxQueue = obj.max_queue;
}
return ok({
concurrency: obj.concurrency,
overflow: "queue" as const,
maxQueue,
});
}
function parseSenses(
obj: Record<string, unknown>,
): Result<{ senses: Record<string, SenseConfig>; senseNames: Set<string> }> {
if (!isPlainRecord(obj.senses)) {
return err(new Error("senses: required object"));
}
const sensesRaw = obj.senses;
const senses: Record<string, SenseConfig> = {};
const senseNames = new Set(Object.keys(sensesRaw));
for (const [name, senseRaw] of Object.entries(sensesRaw)) {
const result = validateSenseConfig(name, senseRaw);
if (!result.ok) return result;
senses[name] = result.value;
}
return ok({ senses, senseNames });
}
function parseReflexes(
obj: Record<string, unknown>,
senseNames: Set<string>,
): Result<ReflexConfig[]> {
if (!Array.isArray(obj.reflexes)) {
return err(new Error("reflexes: required array"));
}
const reflexes: ReflexConfig[] = [];
for (let i = 0; i < obj.reflexes.length; i++) {
const result = validateReflexConfig(i, obj.reflexes[i], senseNames);
if (!result.ok) return result;
reflexes.push(result.value);
}
return ok(reflexes);
}
function parseWorkflows(obj: Record<string, unknown>): Result<Record<string, WorkflowConfig>> {
if (obj.workflows === undefined || obj.workflows === null) return ok({});
if (!isPlainRecord(obj.workflows)) {
return err(new Error("workflows: must be an object if provided"));
}
const workflowsRaw = obj.workflows;
const workflows: Record<string, WorkflowConfig> = {};
for (const [name, wfRaw] of Object.entries(workflowsRaw)) {
const result = validateWorkflowConfig(name, wfRaw);
if (!result.ok) return result;
workflows[name] = result.value;
}
return ok(workflows);
}
export function parseNerveConfig(raw: string): Result<NerveConfig> {
let parsed: unknown;
try {
parsed = parse(raw);
} catch (e) {
const message = e instanceof Error ? e.message : String(e);
return err(new Error(`YAML parse error: ${message}`));
}
if (!isPlainRecord(parsed)) {
return err(new Error("Config must be a YAML object"));
}
const obj = parsed;
const sensesResult = parseSenses(obj);
if (!sensesResult.ok) return sensesResult;
const { senses, senseNames } = sensesResult.value;
const reflexesResult = parseReflexes(obj, senseNames);
if (!reflexesResult.ok) return reflexesResult;
const workflowsResult = parseWorkflows(obj);
if (!workflowsResult.ok) return workflowsResult;
const maxRoundsResult = parseEngineMaxRounds(obj);
if (!maxRoundsResult.ok) return maxRoundsResult;
return ok({
maxRounds: maxRoundsResult.value,
senses,
reflexes: reflexesResult.value,
workflows: workflowsResult.value,
});
}
+21
View File
@@ -0,0 +1,21 @@
export type Signal = {
id: number;
senseId: string;
payload: unknown;
timestamp: number;
};
/** Runtime metadata for a sense (e.g. daemon list-senses IPC). */
export type SenseInfo = {
name: string;
group: string;
throttle: number | null;
timeout: number | null;
lastSignalTimestamp: number | null;
};
/** The result of a Sense compute — payload plus optional workflow directive. */
export type SenseResult<T = unknown> = {
payload: T;
workflow: string | null;
};
-134
View File
@@ -1,134 +0,0 @@
export type Signal = {
id: number;
senseId: string;
payload: unknown;
timestamp: number;
};
export type SenseConfig = {
group: string;
throttle: number | null;
timeout: number | null;
gracePeriod: number | null;
};
/** Runtime metadata for a sense (e.g. daemon list-senses IPC). */
export type SenseInfo = {
name: string;
group: string;
throttle: number | null;
timeout: number | null;
lastSignalTimestamp: number | null;
};
export type SenseReflexConfig = {
kind: "sense";
sense: string;
interval: number | null;
on: string[];
};
/** Reflexes only schedule Senses; workflow launches come from Sense return values. */
export type ReflexConfig = SenseReflexConfig;
export type DropOverflowConfig = {
concurrency: number;
overflow: "drop";
};
export type QueueOverflowConfig = {
concurrency: number;
overflow: "queue";
maxQueue: number;
};
export type WorkflowConfig = DropOverflowConfig | QueueOverflowConfig;
export type NerveConfig = {
/** Engine-wide default max moderator rounds (e.g. CLI workflow trigger when omitted). */
maxRounds: number;
senses: Record<string, SenseConfig>;
reflexes: ReflexConfig[];
workflows: Record<string, WorkflowConfig>;
};
// ---------------------------------------------------------------------------
// Workflow Automaton types (issue #80)
// ---------------------------------------------------------------------------
export const START = "__start__" as const;
export const END = "__end__" as const;
export type START = typeof START;
export type END = typeof END;
/** Engine-wide fallback for max moderator rounds when not specified in config. */
export const DEFAULT_ENGINE_MAX_ROUNDS = 100;
/** A single message in the workflow conversation chain (runtime, type-erased). */
export type WorkflowMessage = {
role: string;
content: string;
meta: unknown;
timestamp: number;
};
/** The typed output of a Role execution. */
export type RoleResult<Meta> = { content: string; meta: Meta };
/**
* A Role is a pure async function: receives the engine start frame plus prior
* role messages only (the start frame is not included in `messages`).
* Returns typed content + meta. Implementation can be an agent, LLM call,
* script, HTTP request, etc.
*/
export type Role<Meta> = (
start: StartSignal,
messages: WorkflowMessage[],
) => Promise<RoleResult<Meta>>;
/** Maps role names to their meta types — the single generic that drives all inference. */
export type RoleMeta = Record<string, Record<string, unknown>>;
/** Engine start frame: prompt, max rounds cap, dry-run flag, and timestamps for the thread. */
export type StartSignal = {
role: START;
content: string;
meta: { maxRounds: number; dryRun: boolean };
timestamp: number;
};
/** A discriminated union of signals from each role, derived from the meta map. */
export type RoleSignal<M extends RoleMeta> = {
[K in keyof M & string]: { role: K; meta: M[K] };
}[keyof M & string];
/**
* Moderator input: either the initial start frame or a role signal after a step.
* Lets implementations branch on `context.kind` with full typing for each arm.
*/
export type ModeratorContext<M extends RoleMeta> =
| { kind: "start"; start: StartSignal }
| { kind: "step"; signal: RoleSignal<M> };
/**
* The moderator — a pure routing function. Receives start vs step context,
* current round, and maxRounds. Returns the next role name or END.
*/
export type Moderator<M extends RoleMeta> = (
context: ModeratorContext<M>,
round: number,
maxRounds: number,
) => (keyof M & string) | END;
/** The complete definition of a workflow, as authored by users. */
export type WorkflowDefinition<M extends RoleMeta> = {
name: string;
roles: { [K in keyof M & string]: Role<M[K]> };
moderator: Moderator<M>;
};
/** The result of a Sense compute — payload plus optional workflow directive. */
export type SenseResult = {
payload: unknown;
workflow: string | null;
};
+75
View File
@@ -0,0 +1,75 @@
// ---------------------------------------------------------------------------
// Workflow Automaton types (issue #80)
// ---------------------------------------------------------------------------
export const START = "__start__" as const;
export const END = "__end__" as const;
export type START = typeof START;
export type END = typeof END;
/** Engine-wide fallback for max moderator rounds when not specified in config. */
export const DEFAULT_ENGINE_MAX_ROUNDS = 100;
/** A single message in the workflow conversation chain (runtime, type-erased). */
export type WorkflowMessage = {
role: string;
content: string;
meta: unknown;
timestamp: number;
};
/** The typed output of a Role execution. */
export type RoleResult<Meta> = { content: string; meta: Meta };
/**
* A Role is a pure async function: receives the engine start frame plus prior
* role messages only (the start frame is not included in `messages`).
* Returns typed content + meta. Implementation can be an agent, LLM call,
* script, HTTP request, etc.
*/
export type Role<Meta> = (
start: StartStep,
messages: WorkflowMessage[],
) => Promise<RoleResult<Meta>>;
/** Maps role names to their meta types — the single generic that drives all inference. */
export type RoleMeta = Record<string, Record<string, unknown>>;
/** Engine start frame: prompt, max rounds cap, dry-run flag, and timestamps for the thread. */
export type StartStep = {
role: START;
content: string;
meta: { maxRounds: number; dryRun: boolean };
timestamp: number;
};
/** A discriminated union of role steps after each execution, aligned with `StartStep` shape. */
export type RoleStep<M extends RoleMeta> = {
[K in keyof M & string]: { role: K; meta: M[K]; content: string; timestamp: number };
}[keyof M & string];
/**
* Moderator input: the complete workflow history.
* Contains the start frame and all role steps so far.
* On initial call, `steps` is empty — moderator can check `steps.length === 0`.
* Round count is `steps.length`; maxRounds is in `start.meta.maxRounds`.
*/
export type ModeratorContext<M extends RoleMeta> = {
start: StartStep;
steps: RoleStep<M>[];
};
/**
* The moderator — a pure routing function. Receives the full workflow context
* (start frame + all prior steps). Returns the next role name or END.
*/
export type Moderator<M extends RoleMeta> = (
context: ModeratorContext<M>,
) => (keyof M & string) | END;
/** The complete definition of a workflow, as authored by users. */
export type WorkflowDefinition<M extends RoleMeta> = {
name: string;
roles: { [K in keyof M & string]: Role<M[K]> };
moderator: Moderator<M>;
};
@@ -76,6 +76,7 @@ function makeLogStore(
timestamp: number;
}> = [],
) {
const runsWithExitCode = activeRuns.map((r) => ({ ...r, exitCode: null }));
const store = {
append: vi.fn(),
query: vi.fn(() => []),
@@ -86,9 +87,9 @@ function makeLogStore(
getWorkflowRun: vi.fn(() => null),
getActiveWorkflowRuns: vi.fn((_workflowName?: string) => {
if (_workflowName !== undefined) {
return activeRuns.filter((r) => r.workflow === _workflowName);
return runsWithExitCode.filter((r) => r.workflow === _workflowName);
}
return activeRuns;
return runsWithExitCode;
}),
getTriggerPayload: vi.fn((): unknown => ({ value: 42 })),
getThreadEvents: vi.fn(
+6
View File
@@ -67,6 +67,12 @@ export function createDaemonIpcServer(
const senses = opts.listSenses();
const resp: DaemonIpcResponse = { ok: true, senses };
socket.write(`${JSON.stringify(resp)}\n`);
} else if (req.type === "kill-workflow") {
const found = workflowManager.killThread(req.runId);
const resp: DaemonIpcResponse = found
? { ok: true }
: { ok: false, error: `Run not found or already finished: ${req.runId}` };
socket.write(`${JSON.stringify(resp)}\n`);
} else {
const _exhaustive: never = req;
void _exhaustive;
+27 -2
View File
@@ -50,13 +50,20 @@ export type ResumeThreadMessage = {
dryRun: boolean;
};
/** Parent → Workflow Worker: kill a specific running thread */
export type KillThreadMessage = {
type: "kill-thread";
runId: string;
};
/** Union of all messages the parent sends to a worker */
export type ParentToWorkerMessage =
| ComputeMessage
| ShutdownMessage
| HealthRequestMessage
| StartThreadMessage
| ResumeThreadMessage;
| ResumeThreadMessage
| KillThreadMessage;
/** Worker → Parent: compute produced a signal */
export type SignalMessage = {
@@ -89,7 +96,13 @@ export type HealthResponseMessage = {
// ---------------------------------------------------------------------------
/** Valid lifecycle event types for a workflow thread. */
export type ThreadEventType = "queued" | "started" | "step_complete" | "completed" | "failed";
export type ThreadEventType =
| "queued"
| "started"
| "step_complete"
| "completed"
| "failed"
| "killed";
/**
* Workflow Worker → Parent: a thread lifecycle event.
@@ -106,6 +119,8 @@ export type WorkflowErrorMessage = {
type: "workflow-error";
runId: string;
error: string;
/** Exit code conveying the failure reason (1=role error, 2=maxRounds exhausted). */
exitCode: number;
};
/** Workflow Worker → Parent: a WorkflowMessage produced by a role (for crash recovery). */
@@ -132,6 +147,7 @@ const PARENT_MSG_TYPES = new Set([
"health-request",
"start-thread",
"resume-thread",
"kill-thread",
]);
function validateStartThreadMsg(obj: Record<string, unknown>): string | null {
@@ -201,6 +217,12 @@ export function parseParentMessage(raw: unknown): Result<ParentToWorkerMessage>
dryRun: obj.dryRun,
} as ResumeThreadMessage);
}
if (obj.type === "kill-thread") {
if (typeof obj.runId !== "string") {
return err(new Error("'kill-thread' message missing string 'runId'"));
}
return ok({ type: "kill-thread", runId: obj.runId } as KillThreadMessage);
}
return err(new Error(`Unhandled IPC message type: "${obj.type}"`));
}
@@ -254,6 +276,7 @@ function isThreadEventType(value: string): value is ThreadEventType {
case "step_complete":
case "completed":
case "failed":
case "killed":
return true;
default:
return false;
@@ -287,10 +310,12 @@ function parseWorkflowErrorMsg(obj: Record<string, unknown>): Result<WorkerToPar
if (typeof obj.error !== "string") {
return err(new Error("Worker 'workflow-error' message missing string 'error' field"));
}
const exitCode = typeof obj.exitCode === "number" ? obj.exitCode : 1;
return ok({
type: "workflow-error",
runId: obj.runId,
error: obj.error,
exitCode,
});
}
+52 -8
View File
@@ -16,6 +16,7 @@ import { START, isPlainRecord } from "@uncaged/nerve-core";
import type { LogStore, WorkflowRunStatus } from "@uncaged/nerve-store";
import type {
KillThreadMessage,
ResumeThreadMessage,
ShutdownMessage,
StartThreadMessage,
@@ -37,6 +38,11 @@ export type WorkflowLaunchParams = {
export type WorkflowManager = {
/** Trigger a new workflow thread (Sense-driven launch or CLI / IPC). */
startWorkflow: (workflowName: string, launch: WorkflowLaunchParams) => void;
/**
* Kill a running or queued workflow thread by runId.
* Returns true if the thread was found, false if not found.
*/
killThread: (runId: string) => boolean;
/** Number of currently active (running) threads for a workflow. */
activeCount: (workflowName: string) => number;
/** Number of pending queued threads waiting to run for a workflow. */
@@ -181,6 +187,16 @@ function sendResumeThread(worker: ChildProcess, msg: ResumeThreadMessage): void
}
}
function sendKillThread(worker: ChildProcess, runId: string): void {
if (worker.connected === false) return;
const msg: KillThreadMessage = { type: "kill-thread", runId };
try {
worker.send(msg);
} catch {
// IPC channel closed between connected check and send
}
}
function waitForExit(child: ChildProcess, timeoutMs: number): Promise<void> {
return new Promise((resolve) => {
const timer = setTimeout(() => {
@@ -229,15 +245,24 @@ export function createWorkflowManager(
crashed: "crashed",
dropped: "dropped",
interrupted: "interrupted",
killed: "killed",
};
return map[eventType] ?? null;
}
function extractExitCode(payload: unknown): number | null {
if (isPlainRecord(payload) && typeof payload.exitCode === "number") {
return payload.exitCode;
}
return null;
}
function logWorkflowEvent(
workflowName: string,
runId: string,
eventType: string,
payload?: unknown,
exitCode: number | null = null,
): void {
const timestamp = Date.now();
const serialised = payload !== undefined ? JSON.stringify(payload) : null;
@@ -252,7 +277,7 @@ export function createWorkflowManager(
payload: serialised,
timestamp,
},
{ runId, workflow: workflowName, status, timestamp },
{ runId, workflow: workflowName, status, timestamp, exitCode },
);
} else {
logStore.append({
@@ -307,13 +332,11 @@ export function createWorkflowManager(
const state = states.get(workflowName);
if (state === undefined) return;
if (msg.eventType === "completed" || msg.eventType === "failed") {
if (msg.eventType === "completed" || msg.eventType === "failed" || msg.eventType === "killed") {
state.active.delete(msg.runId);
dequeueNext(workflowName);
}
if (msg.eventType === "completed" || msg.eventType === "failed") {
logWorkflowEvent(workflowName, msg.runId, msg.eventType, msg.payload);
const exitCode = extractExitCode(msg.payload);
logWorkflowEvent(workflowName, msg.runId, msg.eventType, msg.payload, exitCode);
}
}
@@ -399,7 +422,7 @@ export function createWorkflowManager(
`[workflow-manager] worker for "${workflowName}" crashed with ${crashedCount} active thread(s)\n`,
);
for (const runId of state.active) {
logWorkflowEvent(workflowName, runId, "crashed");
logWorkflowEvent(workflowName, runId, "crashed", undefined, 255);
}
}
@@ -460,7 +483,7 @@ export function createWorkflowManager(
state.active.delete(msg.runId);
dequeueNext(workflowName);
}
logWorkflowEvent(workflowName, msg.runId, "failed", { error: msg.error });
logWorkflowEvent(workflowName, msg.runId, "failed", { error: msg.error }, msg.exitCode);
return;
}
@@ -541,6 +564,26 @@ export function createWorkflowManager(
return entry;
}
function killThread(runId: string): boolean {
for (const [workflowName, state] of states) {
const queueIdx = state.queue.findIndex((q) => q.runId === runId);
if (queueIdx !== -1) {
state.queue.splice(queueIdx, 1);
logWorkflowEvent(workflowName, runId, "killed", { exitCode: 137 }, 137);
return true;
}
if (state.active.has(runId)) {
const workerEntry = workers.get(workflowName);
if (workerEntry !== undefined) {
sendKillThread(workerEntry.process, runId);
}
return true;
}
}
return false;
}
function startWorkflow(workflowName: string, launch: WorkflowLaunchParams): void {
if (stopped) return;
@@ -652,6 +695,7 @@ export function createWorkflowManager(
return {
startWorkflow,
killThread,
activeCount,
queueLength,
totalActiveCount,
+76 -36
View File
@@ -12,13 +12,7 @@
import { existsSync } from "node:fs";
import { join, resolve } from "node:path";
import type {
ModeratorContext,
RoleMeta,
StartSignal,
WorkflowDefinition,
WorkflowMessage,
} from "@uncaged/nerve-core";
import type { RoleMeta, StartStep, WorkflowDefinition, WorkflowMessage } from "@uncaged/nerve-core";
import { END, START, isPlainRecord } from "@uncaged/nerve-core";
import type {
@@ -47,8 +41,8 @@ function sendThreadEvent(runId: string, eventType: ThreadEventType, payload: unk
send({ type: "thread-event", runId, eventType, payload });
}
function sendWorkflowError(runId: string, error: string): void {
send({ type: "workflow-error", runId, error });
function sendWorkflowError(runId: string, error: string, exitCode = 1): void {
send({ type: "workflow-error", runId, error, exitCode });
}
function sendWorkflowMessage(runId: string, message: WorkflowMessage): void {
@@ -85,13 +79,13 @@ function validateRoleResult(
return true;
}
function isStartMeta(meta: unknown): meta is StartSignal["meta"] {
function isStartMeta(meta: unknown): meta is StartStep["meta"] {
return (
isPlainRecord(meta) && typeof meta.maxRounds === "number" && typeof meta.dryRun === "boolean"
);
}
function normalizeStartMeta(meta: unknown, maxRoundsFallback: number): StartSignal["meta"] {
function normalizeStartMeta(meta: unknown, maxRoundsFallback: number): StartStep["meta"] {
if (!isPlainRecord(meta)) {
return { maxRounds: maxRoundsFallback, dryRun: false };
}
@@ -100,10 +94,7 @@ function normalizeStartMeta(meta: unknown, maxRoundsFallback: number): StartSign
return { maxRounds, dryRun };
}
function startSignalFromWorkflowMessage(
msg: WorkflowMessage,
maxRoundsFallback: number,
): StartSignal {
function startStepFromWorkflowMessage(msg: WorkflowMessage, maxRoundsFallback: number): StartStep {
if (msg.role !== START) {
return {
role: START,
@@ -122,7 +113,7 @@ function startSignalFromWorkflowMessage(
}
type ThreadMessagesState = {
start: StartSignal;
start: StartStep;
/** Role outputs only; never includes the `__start__` frame. */
messages: WorkflowMessage[];
};
@@ -138,7 +129,7 @@ function initThreadMessages(
const [first, ...rest] = resumeMessages;
if (first.role === START) {
return {
start: startSignalFromWorkflowMessage(first, maxRounds),
start: startStepFromWorkflowMessage(first, maxRounds),
messages: [...rest],
};
}
@@ -154,7 +145,7 @@ function initThreadMessages(
};
}
const prompt = freshPrompt ?? "";
const start: StartSignal = {
const start: StartStep = {
role: START,
content: prompt,
meta: { maxRounds, dryRun },
@@ -172,7 +163,7 @@ function initThreadMessages(
async function executeRole(
def: WorkflowDefinition<RoleMeta>,
nextRole: string,
start: StartSignal,
start: StartStep,
messages: WorkflowMessage[],
runId: string,
): Promise<{ content: string; meta: Record<string, unknown> } | null> {
@@ -187,7 +178,7 @@ async function executeRole(
result = await role(start, messages);
} catch (e: unknown) {
const errMsg = e instanceof Error ? e.message : String(e);
sendThreadEvent(runId, "failed", { error: errMsg });
sendThreadEvent(runId, "failed", { error: errMsg, exitCode: 1 });
return null;
}
@@ -195,10 +186,13 @@ async function executeRole(
return result;
}
type KillFlag = { value: boolean };
async function runThread(
def: WorkflowDefinition<RoleMeta>,
runId: string,
maxRounds: number,
killFlag: KillFlag,
resumeMessages: WorkflowMessage[] = [],
freshPrompt: string | null = null,
dryRun = false,
@@ -211,16 +205,43 @@ async function runThread(
dryRun,
);
let roleRound = roleMessages.length;
let nextRole = def.moderator({ kind: "start", start }, roleRound, maxRounds);
const steps: Array<{
role: string;
meta: Record<string, unknown>;
content: string;
timestamp: number;
}> = [];
if (nextRole === END) {
sendThreadEvent(runId, "completed", null);
// Rebuild steps from any resumed messages
for (const msg of roleMessages) {
steps.push({
role: msg.role,
meta: msg.meta as Record<string, unknown>,
content: msg.content,
timestamp: msg.timestamp,
});
}
if (killFlag.value) {
sendThreadEvent(runId, "killed", { exitCode: 137 });
return;
}
while (roleRound < maxRounds) {
let nextRole = def.moderator({ start, steps });
if (nextRole === END) {
sendThreadEvent(runId, "completed", { exitCode: 0 });
return;
}
while (steps.length < maxRounds) {
const result = await executeRole(def, nextRole, start, roleMessages, runId);
if (killFlag.value) {
sendThreadEvent(runId, "killed", { exitCode: 137 });
return;
}
if (result === null) return;
const message: WorkflowMessage = {
@@ -232,21 +253,22 @@ async function runThread(
roleMessages.push(message);
sendWorkflowMessage(runId, message);
roleRound += 1;
steps.push({
role: nextRole,
meta: result.meta,
content: result.content,
timestamp: message.timestamp,
});
const stepContext: ModeratorContext<RoleMeta> = {
kind: "step",
signal: { role: nextRole, meta: result.meta },
};
nextRole = def.moderator(stepContext, roleRound, maxRounds);
nextRole = def.moderator({ start, steps });
if (nextRole === END) {
sendThreadEvent(runId, "completed", null);
sendThreadEvent(runId, "completed", { exitCode: 0 });
return;
}
}
sendWorkflowError(runId, `Thread exceeded maximum rounds (${maxRounds})`);
sendWorkflowError(runId, `Thread exceeded maximum rounds (${maxRounds})`, 2);
}
// ---------------------------------------------------------------------------
@@ -301,6 +323,7 @@ function handleMessage(
raw: unknown,
def: WorkflowDefinition<RoleMeta>,
inFlight: Map<string, Promise<void>>,
killFlags: Map<string, KillFlag>,
shuttingDown: { value: boolean },
): void {
const parseResult = parseParentMessage(raw);
@@ -324,15 +347,19 @@ function handleMessage(
if (shuttingDown.value) return;
const { runId, prompt, maxRounds, dryRun } = msg;
const killFlag: KillFlag = { value: false };
killFlags.set(runId, killFlag);
const previous = inFlight.get(runId) ?? Promise.resolve();
const next = previous
.then(() => runThread(def, runId, maxRounds, [], prompt, dryRun))
.then(() => runThread(def, runId, maxRounds, killFlag, [], prompt, dryRun))
.catch((e: unknown) => {
const errMsg = e instanceof Error ? e.message : String(e);
sendWorkflowError(runId, errMsg);
})
.finally(() => {
inFlight.delete(runId);
killFlags.delete(runId);
});
inFlight.set(runId, next);
@@ -343,20 +370,32 @@ function handleMessage(
if (shuttingDown.value) return;
const { runId, messages, maxRounds, dryRun } = msg;
const killFlag: KillFlag = { value: false };
killFlags.set(runId, killFlag);
const previous = inFlight.get(runId) ?? Promise.resolve();
const next = previous
.then(() => runThread(def, runId, maxRounds, messages, null, dryRun))
.then(() => runThread(def, runId, maxRounds, killFlag, messages, null, dryRun))
.catch((e: unknown) => {
const errMsg = e instanceof Error ? e.message : String(e);
sendWorkflowError(runId, errMsg);
})
.finally(() => {
inFlight.delete(runId);
killFlags.delete(runId);
});
inFlight.set(runId, next);
return;
}
if (msg.type === "kill-thread") {
const flag = killFlags.get(msg.runId);
if (flag !== undefined) {
flag.value = true;
}
return;
}
}
// ---------------------------------------------------------------------------
@@ -374,12 +413,13 @@ async function bootstrap(nerveRoot: string, workflowName: string): Promise<void>
}
const inFlight = new Map<string, Promise<void>>();
const killFlags = new Map<string, KillFlag>();
const shuttingDown = { value: false };
sendReady();
process.on("message", (raw: unknown) => {
handleMessage(raw, def, inFlight, shuttingDown);
handleMessage(raw, def, inFlight, killFlags, shuttingDown);
});
}
@@ -41,7 +41,7 @@ describe("LogStore — crash recovery helpers (Phase 3)", () => {
payload: JSON.stringify({ triggerPayload: payload }),
timestamp: 1000,
},
{ runId: "run-1", workflow: "my-wf", status: "started", timestamp: 1000 },
{ runId: "run-1", workflow: "my-wf", status: "started", timestamp: 1000, exitCode: null },
);
const result = store.getTriggerPayload("run-1");
@@ -57,7 +57,7 @@ describe("LogStore — crash recovery helpers (Phase 3)", () => {
payload: null,
timestamp: 1000,
},
{ runId: "run-2", workflow: "my-wf", status: "started", timestamp: 1000 },
{ runId: "run-2", workflow: "my-wf", status: "started", timestamp: 1000, exitCode: null },
);
expect(store.getTriggerPayload("run-2")).toBeNull();
@@ -148,7 +148,7 @@ describe("LogStore — crash recovery helpers (Phase 3)", () => {
payload: JSON.stringify({ triggerPayload: {} }),
timestamp: 1000,
},
{ runId: "run-6", workflow: "my-wf", status: "started", timestamp: 1000 },
{ runId: "run-6", workflow: "my-wf", status: "started", timestamp: 1000, exitCode: null },
);
store.append({
source: "workflow",
@@ -31,6 +31,7 @@ describe("LogStore — workflow_runs", () => {
workflow: "cleanup",
status: "started",
timestamp: 1000,
exitCode: null,
};
const entry = store.upsertWorkflowRun(
@@ -53,12 +54,18 @@ describe("LogStore — workflow_runs", () => {
it("updates existing workflow_runs row on upsert (status transition)", () => {
store.upsertWorkflowRun(
{ source: "workflow", type: "started", refId: "run-2", payload: null, timestamp: 1000 },
{ runId: "run-2", workflow: "cleanup", status: "started", timestamp: 1000 },
{ runId: "run-2", workflow: "cleanup", status: "started", timestamp: 1000, exitCode: null },
);
store.upsertWorkflowRun(
{ source: "workflow", type: "completed", refId: "run-2", payload: null, timestamp: 2000 },
{ runId: "run-2", workflow: "cleanup", status: "completed", timestamp: 2000 },
{
runId: "run-2",
workflow: "cleanup",
status: "completed",
timestamp: 2000,
exitCode: null,
},
);
const stored = store.getWorkflowRun("run-2");
@@ -79,7 +86,7 @@ describe("LogStore — workflow_runs", () => {
] as const) {
store.upsertWorkflowRun(
{ source: "workflow", type, refId: "run-3", payload: null, timestamp },
{ runId: "run-3", workflow: "cleanup", status, timestamp },
{ runId: "run-3", workflow: "cleanup", status, timestamp, exitCode: null },
);
}
@@ -98,11 +105,23 @@ describe("LogStore — workflow_runs", () => {
it("returns the latest state after multiple upserts", () => {
store.upsertWorkflowRun(
{ source: "workflow", type: "queued", refId: "run-4", payload: null, timestamp: 100 },
{ runId: "run-4", workflow: "code-review", status: "queued", timestamp: 100 },
{
runId: "run-4",
workflow: "code-review",
status: "queued",
timestamp: 100,
exitCode: null,
},
);
store.upsertWorkflowRun(
{ source: "workflow", type: "started", refId: "run-4", payload: null, timestamp: 200 },
{ runId: "run-4", workflow: "code-review", status: "started", timestamp: 200 },
{
runId: "run-4",
workflow: "code-review",
status: "started",
timestamp: 200,
exitCode: null,
},
);
const run = store.getWorkflowRun("run-4");
@@ -115,19 +134,19 @@ describe("LogStore — workflow_runs", () => {
beforeEach(() => {
store.upsertWorkflowRun(
{ source: "workflow", type: "queued", refId: "r1", payload: null, timestamp: 100 },
{ runId: "r1", workflow: "cleanup", status: "queued", timestamp: 100 },
{ runId: "r1", workflow: "cleanup", status: "queued", timestamp: 100, exitCode: null },
);
store.upsertWorkflowRun(
{ source: "workflow", type: "started", refId: "r2", payload: null, timestamp: 200 },
{ runId: "r2", workflow: "cleanup", status: "started", timestamp: 200 },
{ runId: "r2", workflow: "cleanup", status: "started", timestamp: 200, exitCode: null },
);
store.upsertWorkflowRun(
{ source: "workflow", type: "completed", refId: "r3", payload: null, timestamp: 300 },
{ runId: "r3", workflow: "cleanup", status: "completed", timestamp: 300 },
{ runId: "r3", workflow: "cleanup", status: "completed", timestamp: 300, exitCode: null },
);
store.upsertWorkflowRun(
{ source: "workflow", type: "failed", refId: "r4", payload: null, timestamp: 400 },
{ runId: "r4", workflow: "deploy", status: "queued", timestamp: 400 },
{ runId: "r4", workflow: "deploy", status: "queued", timestamp: 400, exitCode: null },
);
});
@@ -171,13 +190,13 @@ describe("LogStore — workflow_runs", () => {
});
describe("all statuses are storable", () => {
it.each(["queued", "started", "completed", "failed", "crashed", "dropped"] as const)(
it.each(["queued", "started", "completed", "failed", "crashed", "dropped", "killed"] as const)(
"stores status=%s",
(status) => {
const runId = `run-${status}`;
store.upsertWorkflowRun(
{ source: "workflow", type: status, refId: runId, payload: null, timestamp: 1 },
{ runId, workflow: "test", status, timestamp: 1 },
{ runId, workflow: "test", status, timestamp: 1, exitCode: null },
);
expect(store.getWorkflowRun(runId)?.status).toBe(status);
},
+47 -34
View File
@@ -58,7 +58,8 @@ export type WorkflowRunStatus =
| "failed"
| "crashed"
| "dropped"
| "interrupted";
| "interrupted"
| "killed";
const VALID_WORKFLOW_STATUSES = new Set<string>([
"queued",
@@ -68,6 +69,7 @@ const VALID_WORKFLOW_STATUSES = new Set<string>([
"crashed",
"dropped",
"interrupted",
"killed",
]);
function isWorkflowRunStatus(value: string): value is WorkflowRunStatus {
@@ -87,6 +89,7 @@ export type WorkflowRun = {
workflow: string;
status: WorkflowRunStatus;
timestamp: number;
exitCode: number | null;
};
/** One role-produced workflow-message row with 1-based round index (ROW_NUMBER over role messages only). */
@@ -192,10 +195,11 @@ CREATE TABLE IF NOT EXISTS meta (
);
CREATE TABLE IF NOT EXISTS workflow_runs (
run_id TEXT PRIMARY KEY,
workflow TEXT NOT NULL,
status TEXT NOT NULL,
timestamp INTEGER NOT NULL
run_id TEXT PRIMARY KEY,
workflow TEXT NOT NULL,
status TEXT NOT NULL,
timestamp INTEGER NOT NULL,
exit_code INTEGER
);
CREATE INDEX IF NOT EXISTS idx_workflow_runs_status ON workflow_runs(status);
@@ -332,6 +336,13 @@ export function createLogStore(dbPath: string): LogStore {
sqlite.exec("PRAGMA journal_mode=WAL");
sqlite.exec(SCHEMA_SQL);
// Migration: add exit_code column for existing databases
try {
sqlite.exec("ALTER TABLE workflow_runs ADD COLUMN exit_code INTEGER");
} catch {
// Column already exists — safe to ignore
}
const insertStmt = sqlite.prepare(
"INSERT INTO logs (source, type, ref_id, payload, timestamp) VALUES (@source, @type, @refId, @payload, @timestamp)",
);
@@ -342,11 +353,11 @@ export function createLogStore(dbPath: string): LogStore {
);
const upsertWorkflowRunStmt = sqlite.prepare(
"INSERT OR REPLACE INTO workflow_runs (run_id, workflow, status, timestamp) VALUES (@runId, @workflow, @status, @timestamp)",
"INSERT OR REPLACE INTO workflow_runs (run_id, workflow, status, timestamp, exit_code) VALUES (@runId, @workflow, @status, @timestamp, @exitCode)",
);
const getWorkflowRunStmt = sqlite.prepare(
"SELECT run_id, workflow, status, timestamp FROM workflow_runs WHERE run_id = ?",
"SELECT run_id, workflow, status, timestamp, exit_code FROM workflow_runs WHERE run_id = ?",
);
const getTriggerPayloadStmt = sqlite.prepare(
@@ -386,19 +397,19 @@ export function createLogStore(dbPath: string): LogStore {
);
const getActiveWorkflowRunsStmt = sqlite.prepare(
"SELECT run_id, workflow, status, timestamp FROM workflow_runs WHERE status IN ('queued', 'started') ORDER BY timestamp ASC",
"SELECT run_id, workflow, status, timestamp, exit_code FROM workflow_runs WHERE status IN ('queued', 'started') ORDER BY timestamp ASC",
);
const getActiveWorkflowRunsByNameStmt = sqlite.prepare(
"SELECT run_id, workflow, status, timestamp FROM workflow_runs WHERE status IN ('queued', 'started') AND workflow = ? ORDER BY timestamp ASC",
"SELECT run_id, workflow, status, timestamp, exit_code FROM workflow_runs WHERE status IN ('queued', 'started') AND workflow = ? ORDER BY timestamp ASC",
);
const getAllWorkflowRunsStmt = sqlite.prepare(
"SELECT run_id, workflow, status, timestamp FROM workflow_runs ORDER BY timestamp DESC",
"SELECT run_id, workflow, status, timestamp, exit_code FROM workflow_runs ORDER BY timestamp DESC",
);
const getAllWorkflowRunsByNameStmt = sqlite.prepare(
"SELECT run_id, workflow, status, timestamp FROM workflow_runs WHERE workflow = ? ORDER BY timestamp DESC",
"SELECT run_id, workflow, status, timestamp, exit_code FROM workflow_runs WHERE workflow = ? ORDER BY timestamp DESC",
);
const minLogTsStmt = sqlite.prepare("SELECT MIN(timestamp) AS m FROM logs");
@@ -423,6 +434,7 @@ export function createLogStore(dbPath: string): LogStore {
workflow: run.workflow,
status: run.status,
timestamp: run.timestamp,
exitCode: run.exitCode,
});
return { ...entry, id: Number(info.lastInsertRowid) };
});
@@ -504,31 +516,37 @@ export function createLogStore(dbPath: string): LogStore {
return upsertWorkflowRunTx(entry, run);
}
function getWorkflowRun(runId: string): WorkflowRun | null {
const row = getWorkflowRunStmt.get(runId) as
| { run_id: string; workflow: string; status: string; timestamp: number }
| undefined;
if (row === undefined) return null;
type SqlWorkflowRunRow = {
run_id: string;
workflow: string;
status: string;
timestamp: number;
exit_code: number | null;
};
function mapWorkflowRunRow(r: SqlWorkflowRunRow): WorkflowRun {
return {
runId: row.run_id,
workflow: row.workflow,
status: validateWorkflowRunStatus(row.status),
timestamp: row.timestamp,
runId: r.run_id,
workflow: r.workflow,
status: validateWorkflowRunStatus(r.status),
timestamp: r.timestamp,
exitCode: r.exit_code ?? null,
};
}
function getWorkflowRun(runId: string): WorkflowRun | null {
const row = getWorkflowRunStmt.get(runId) as SqlWorkflowRunRow | undefined;
if (row === undefined) return null;
return mapWorkflowRunRow(row);
}
function getActiveWorkflowRuns(workflowName?: string): WorkflowRun[] {
const rows = (
workflowName !== undefined
? getActiveWorkflowRunsByNameStmt.all(workflowName)
: getActiveWorkflowRunsStmt.all()
) as Array<{ run_id: string; workflow: string; status: string; timestamp: number }>;
return rows.map((r) => ({
runId: r.run_id,
workflow: r.workflow,
status: validateWorkflowRunStatus(r.status),
timestamp: r.timestamp,
}));
) as SqlWorkflowRunRow[];
return rows.map(mapWorkflowRunRow);
}
function getAllWorkflowRuns(workflowName: string | null): WorkflowRun[] {
@@ -536,13 +554,8 @@ export function createLogStore(dbPath: string): LogStore {
workflowName !== null
? getAllWorkflowRunsByNameStmt.all(workflowName)
: getAllWorkflowRunsStmt.all()
) as Array<{ run_id: string; workflow: string; status: string; timestamp: number }>;
return rows.map((r) => ({
runId: r.run_id,
workflow: r.workflow,
status: validateWorkflowRunStatus(r.status),
timestamp: r.timestamp,
}));
) as SqlWorkflowRunRow[];
return rows.map(mapWorkflowRunRow);
}
function getTriggerPayload(runId: string): unknown {
@@ -108,7 +108,7 @@ describe("llmExtract", () => {
expect(result.error.kind).toBe("schema_validation_failed");
});
it("dryRun skips fetch and returns an empty stub value", async () => {
it("dryRun skips fetch and returns schema-shaped stub values", async () => {
const fetchMock = vi.fn();
vi.stubGlobal("fetch", fetchMock);
@@ -125,6 +125,27 @@ describe("llmExtract", () => {
if (!result.ok) {
return;
}
expect(result.value).toEqual({});
expect(result.value).toEqual({ n: 0 });
});
it("dryRun applies dryRunDefaults over schema stub values", async () => {
const fetchMock = vi.fn();
vi.stubGlobal("fetch", fetchMock);
const schema = z.object({ n: z.number() });
const result = await llmExtract({
text: "ignored",
schema,
provider: { baseUrl: "https://example.com", apiKey: "k", model: "m" },
dryRun: true,
dryRunDefaults: { n: 42 },
});
expect(fetchMock).not.toHaveBeenCalled();
expect(result.ok).toBe(true);
if (!result.ok) {
return;
}
expect(result.value).toEqual({ n: 42 });
});
});
@@ -0,0 +1,59 @@
import { describe, expect, it } from "vitest";
import { z } from "zod";
import { schemaDefaults } from "../schema-defaults.js";
describe("schemaDefaults", () => {
it("fills nested objects with primitive placeholders", () => {
const schema = z.object({
meta: z.object({
id: z.string(),
count: z.number(),
flag: z.boolean(),
}),
});
expect(schemaDefaults(schema)).toEqual({
meta: { id: "", count: 0, flag: false },
});
});
it("uses empty arrays for array fields", () => {
const schema = z.object({
roles: z.array(z.object({ name: z.string(), level: z.number() })),
});
const out = schemaDefaults(schema) as { roles: { name: string; level: number }[] };
expect(out.roles).toEqual([]);
expect(out.roles.map((r) => r.name)).toEqual([]);
});
it("uses the first enum value", () => {
const schema = z.object({
status: z.enum(["pending", "done", "failed"]),
code: z.nativeEnum({ A: 1, B: 2 }),
});
expect(schemaDefaults(schema)).toEqual({
status: "pending",
code: 1,
});
});
it("sets optional fields to undefined and omits exactOptional keys", () => {
const schema = z.object({
req: z.string(),
maybe: z.string().optional(),
exact: z.string().exactOptional(),
});
expect(schemaDefaults(schema)).toEqual({
req: "",
maybe: undefined,
});
expect(Object.keys(schemaDefaults(schema) as object).includes("exact")).toBe(false);
});
it("respects .default()", () => {
const schema = z.object({
n: z.number().default(42),
});
expect(schemaDefaults(schema)).toEqual({ n: 42 });
});
});
+2 -1
View File
@@ -11,6 +11,7 @@ export {
type LlmExtractOptions,
type LlmProvider,
} from "./llm-extract.js";
export { schemaDefaults } from "./schema-defaults.js";
export {
nerveCommandEnv,
spawnSafe,
@@ -19,4 +20,4 @@ export {
type SpawnResult,
type SpawnSafeOptions,
} from "./spawn-safe.js";
export { isDryRun } from "./start-signal.js";
export { isDryRun } from "./start-step.js";
+8 -1
View File
@@ -1,6 +1,8 @@
import { type Result, err, ok } from "@uncaged/nerve-core";
import { toJSONSchema, type z } from "zod";
import { schemaDefaults } from "./schema-defaults.js";
export type LlmProvider = {
baseUrl: string;
apiKey: string;
@@ -12,6 +14,7 @@ export type LlmExtractOptions<T> = {
schema: z.ZodType<T>;
provider: LlmProvider;
dryRun: boolean;
dryRunDefaults?: Partial<T>;
};
type LlmExtractOptionsInput<T> = LlmExtractOptions<T> | Omit<LlmExtractOptions<T>, "dryRun">;
@@ -20,6 +23,10 @@ function resolveLlmExtractDryRun<T>(options: LlmExtractOptionsInput<T>): boolean
return "dryRun" in options ? options.dryRun : false;
}
function buildLlmExtractDryRunValue<T>(options: LlmExtractOptionsInput<T>): T {
return { ...schemaDefaults(options.schema), ...(options.dryRunDefaults ?? {}) } as T;
}
export type LlmError =
| { kind: "http_error"; status: number; body: string }
| { kind: "invalid_response_json"; message: string }
@@ -102,7 +109,7 @@ export async function llmExtract<T>(
): Promise<Result<T, LlmError>> {
const dryRun = resolveLlmExtractDryRun(options);
if (dryRun) {
return ok({} as T);
return ok(buildLlmExtractDryRunValue(options));
}
const rawJsonSchema = toJSONSchema(options.schema) as Record<string, unknown>;
@@ -0,0 +1,190 @@
import type { z } from "zod";
type ZodTypeAny = z.ZodType;
type Def = Record<string, unknown> & { type: string };
type TypeHandler = (schema: ZodTypeAny, def: Def) => unknown;
function isPlainObject(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
function isZodExactOptional(s: ZodTypeAny): boolean {
return s.constructor.name === "ZodExactOptional";
}
function resolveDefaultValue(defaultValue: unknown | (() => unknown)): unknown {
if (typeof defaultValue === "function") {
return (defaultValue as () => unknown)();
}
return defaultValue;
}
function mergeIntersection(left: unknown, right: unknown): unknown {
if (isPlainObject(left) && isPlainObject(right)) {
return { ...left, ...right };
}
return right;
}
function defaultsForObject(_schema: ZodTypeAny, def: Def): unknown {
const shape = def.shape as Record<string, ZodTypeAny> | undefined;
if (shape === undefined) {
return {};
}
const out: Record<string, unknown> = {};
for (const key of Object.keys(shape)) {
const child = shape[key];
const cdef = child.def as { type: string };
if (cdef.type === "optional") {
if (isZodExactOptional(child)) {
continue;
}
out[key] = undefined;
} else {
out[key] = schemaDefaultsInner(child);
}
}
return out;
}
function firstUnionOption(_schema: ZodTypeAny, def: Def): unknown {
const options = def.options as readonly ZodTypeAny[] | undefined;
if (options === undefined || options.length === 0) {
return null;
}
return schemaDefaultsInner(options[0]);
}
function defaultsFromNullable(_schema: ZodTypeAny, _def: Def): unknown {
return null;
}
function defaultsFromInner(_schema: ZodTypeAny, def: Def): unknown {
const inner = def.innerType as ZodTypeAny | undefined;
if (inner === undefined) {
return null;
}
return schemaDefaultsInner(inner);
}
function defaultsForPipe(_schema: ZodTypeAny, def: Def): unknown {
const out = def.out as ZodTypeAny | undefined;
if (out === undefined) {
return null;
}
return schemaDefaultsInner(out);
}
function defaultsForIntersection(_schema: ZodTypeAny, def: Def): unknown {
const left = def.left as ZodTypeAny | undefined;
const right = def.right as ZodTypeAny | undefined;
if (left === undefined || right === undefined) {
return null;
}
return mergeIntersection(schemaDefaultsInner(left), schemaDefaultsInner(right));
}
function defaultsForTuple(_schema: ZodTypeAny, def: Def): unknown {
const items = def.items as readonly ZodTypeAny[] | undefined;
if (items === undefined) {
return [];
}
return items.map((item) => schemaDefaultsInner(item));
}
function defaultsForLazy(schema: ZodTypeAny, def: Def): unknown {
const inner =
(schema as { _zod?: { innerType?: ZodTypeAny } })._zod?.innerType ??
(def.getter as (() => ZodTypeAny) | undefined)?.();
if (inner === undefined) {
return null;
}
return schemaDefaultsInner(inner);
}
function defaultsForPromise(_schema: ZodTypeAny, def: Def): unknown {
const inner = def.innerType as ZodTypeAny | undefined;
if (inner === undefined) {
return Promise.resolve(null);
}
return Promise.resolve(schemaDefaultsInner(inner));
}
function firstEnumValue(_schema: ZodTypeAny, def: Def): unknown {
const entries = def.entries as Record<string, string | number> | undefined;
if (entries === undefined) {
return null;
}
const values = Object.values(entries);
return values[0] ?? null;
}
function firstLiteralValue(_schema: ZodTypeAny, def: Def): unknown {
const values = def.values as unknown[] | undefined;
if (values === undefined || values.length === 0) {
return null;
}
return values[0];
}
const TYPE_HANDLERS: Record<string, TypeHandler> = {
string: () => "",
number: () => 0,
boolean: () => false,
bigint: () => 0n,
date: () => new Date(0),
symbol: () => Symbol(),
undefined: () => undefined,
null: () => null,
void: () => undefined,
any: () => null,
unknown: () => null,
never: () => undefined,
nan: () => Number.NaN,
array: () => [],
object: defaultsForObject,
record: () => ({}),
map: () => new Map(),
set: () => new Set(),
enum: firstEnumValue,
literal: firstLiteralValue,
optional: () => undefined,
nullable: defaultsFromNullable,
default: (_s, def) => resolveDefaultValue(def.defaultValue as unknown | (() => unknown)),
prefault: (_s, def) => resolveDefaultValue(def.defaultValue as unknown | (() => unknown)),
nonoptional: defaultsFromInner,
catch: defaultsFromInner,
success: () => false,
readonly: defaultsFromInner,
union: firstUnionOption,
xor: firstUnionOption,
intersection: defaultsForIntersection,
pipe: defaultsForPipe,
transform: () => null,
tuple: defaultsForTuple,
lazy: defaultsForLazy,
promise: defaultsForPromise,
file: () => new File([], ""),
function: () => null,
custom: () => null,
template_literal: () => "",
};
/**
* Produces a structurally valid placeholder that mirrors primitive/array/object
* shape for a Zod schema. Used for `llmExtract` dry runs so downstream code
* (e.g. `.roles.map`) does not throw on `undefined` fields.
*/
export function schemaDefaults(schema: z.ZodType): unknown {
return schemaDefaultsInner(schema as ZodTypeAny);
}
function schemaDefaultsInner(schema: ZodTypeAny): unknown {
const def = schema.def as Def;
const run = TYPE_HANDLERS[def.type];
if (run === undefined) {
return null;
}
return run(schema, def);
}
@@ -1,6 +0,0 @@
import type { StartSignal } from "@uncaged/nerve-core";
/** Returns the thread-level dry-run flag from the workflow start frame. */
export function isDryRun(start: StartSignal): boolean {
return start.meta.dryRun;
}
@@ -0,0 +1,6 @@
import type { StartStep } from "@uncaged/nerve-core";
/** Returns the thread-level dry-run flag from the workflow start frame. */
export function isDryRun(start: StartStep): boolean {
return start.meta.dryRun;
}