refactor(core,daemon): extract StartSignal as independent Role parameter

- Role<Meta> now takes (start: StartSignal, messages: WorkflowMessage[])
- messages no longer contains the __start__ frame
- Add ModeratorContext<M> discriminated union (kind: start | step)
- Moderator receives typed context instead of raw StartSignal | RoleSignal union
- workflow-worker separates start from role messages throughout

Refs #100
This commit is contained in:
2026-04-24 23:14:45 +00:00
parent b3b0dad2bb
commit e9e6df2f5a
3 changed files with 94 additions and 40 deletions
+1
View File
@@ -14,6 +14,7 @@ export type {
RoleMeta,
StartSignal,
RoleSignal,
ModeratorContext,
Moderator,
WorkflowDefinition,
SenseResult,
+18 -6
View File
@@ -76,16 +76,20 @@ export type WorkflowMessage = {
export type RoleResult<Meta> = { content: string; meta: Meta };
/**
* A Role is a pure async function: receives the full message chain,
* returns typed content + meta. Implementation can be an agent, LLM call,
* A Role is a pure async function: receives the engine start frame plus prior
* role messages only (the start frame is not included in `messages`).
* Returns typed content + meta. Implementation can be an agent, LLM call,
* script, HTTP request, etc.
*/
export type Role<Meta> = (messages: WorkflowMessage[]) => Promise<RoleResult<Meta>>;
export type Role<Meta> = (
start: StartSignal,
messages: WorkflowMessage[],
) => Promise<RoleResult<Meta>>;
/** Maps role names to their meta types — the single generic that drives all inference. */
export type RoleMeta = Record<string, Record<string, unknown>>;
/** First message in the thread chain (`messages[0]`) — passed to the moderator on start. */
/** Engine start frame: prompt, max rounds cap, and timestamps for the thread. */
export type StartSignal = {
role: START;
content: string;
@@ -99,11 +103,19 @@ export type RoleSignal<M extends RoleMeta> = {
}[keyof M & string];
/**
* The moderator — a pure routing function. Receives the last signal,
* Moderator input: either the initial start frame or a role signal after a step.
* Lets implementations branch on `context.kind` with full typing for each arm.
*/
export type ModeratorContext<M extends RoleMeta> =
| { kind: "start"; start: StartSignal }
| { kind: "step"; signal: RoleSignal<M> };
/**
* The moderator — a pure routing function. Receives start vs step context,
* current round, and maxRounds. Returns the next role name or END.
*/
export type Moderator<M extends RoleMeta> = (
signal: StartSignal | RoleSignal<M>,
context: ModeratorContext<M>,
round: number,
maxRounds: number,
) => (keyof M & string) | END;
+75 -34
View File
@@ -13,7 +13,7 @@ import { existsSync } from "node:fs";
import { join, resolve } from "node:path";
import type {
Moderator,
ModeratorContext,
RoleMeta,
StartSignal,
WorkflowDefinition,
@@ -29,8 +29,6 @@ import type {
import { parseParentMessage } from "./ipc.js";
import { ignoreSessionBroadcastSignals } from "./worker-fork-support.js";
type ModeratorInput = Parameters<Moderator<RoleMeta>>[0];
// ---------------------------------------------------------------------------
// IPC helpers
// ---------------------------------------------------------------------------
@@ -87,42 +85,83 @@ function validateRoleResult(
return true;
}
function buildInitialLastSignal(lastMsg: WorkflowMessage): ModeratorInput {
if (lastMsg.role === START) {
return {
role: START,
content: lastMsg.content,
meta: lastMsg.meta as StartSignal["meta"],
timestamp: lastMsg.timestamp,
};
}
return { role: lastMsg.role, meta: lastMsg.meta as Record<string, unknown> };
function isStartMeta(meta: unknown): meta is StartSignal["meta"] {
return isPlainRecord(meta) && typeof meta.maxRounds === "number";
}
function initChain(
function startSignalFromWorkflowMessage(
msg: WorkflowMessage,
maxRoundsFallback: number,
): StartSignal {
if (msg.role !== START) {
return {
role: START,
content: "",
meta: { maxRounds: maxRoundsFallback },
timestamp: Date.now(),
};
}
const meta = isStartMeta(msg.meta) ? msg.meta : { maxRounds: maxRoundsFallback };
return {
role: START,
content: msg.content,
meta,
timestamp: msg.timestamp,
};
}
type ThreadMessagesState = {
start: StartSignal;
/** Role outputs only; never includes the `__start__` frame. */
messages: WorkflowMessage[];
};
function initThreadMessages(
runId: string,
resumeMessages: WorkflowMessage[],
freshPrompt: string | null,
maxRounds: number,
): WorkflowMessage[] {
): ThreadMessagesState {
if (resumeMessages.length > 0) {
return [...resumeMessages];
const [first, ...rest] = resumeMessages;
if (first.role === START) {
return {
start: startSignalFromWorkflowMessage(first, maxRounds),
messages: [...rest],
};
}
const prompt = freshPrompt ?? "";
return {
start: {
role: START,
content: prompt,
meta: { maxRounds },
timestamp: Date.now(),
},
messages: [...resumeMessages],
};
}
const prompt = freshPrompt ?? "";
const startMsg: WorkflowMessage = {
const start: StartSignal = {
role: START,
content: prompt,
meta: { maxRounds },
timestamp: Date.now(),
};
sendWorkflowMessage(runId, startMsg);
return [startMsg];
sendWorkflowMessage(runId, {
role: start.role,
content: start.content,
meta: start.meta,
timestamp: start.timestamp,
});
return { start, messages: [] };
}
async function executeRole(
def: WorkflowDefinition<RoleMeta>,
nextRole: string,
chain: WorkflowMessage[],
start: StartSignal,
messages: WorkflowMessage[],
runId: string,
): Promise<{ content: string; meta: Record<string, unknown> } | null> {
const role = def.roles[nextRole];
@@ -133,7 +172,7 @@ async function executeRole(
let result: { content: string; meta: Record<string, unknown> };
try {
result = await role(chain);
result = await role(start, messages);
} catch (e: unknown) {
const errMsg = e instanceof Error ? e.message : String(e);
sendThreadEvent(runId, "failed", { error: errMsg });
@@ -151,16 +190,15 @@ async function runThread(
resumeMessages: WorkflowMessage[] = [],
freshPrompt: string | null = null,
): Promise<void> {
const chain = initChain(runId, resumeMessages, freshPrompt, maxRounds);
const { start, messages: roleMessages } = initThreadMessages(
runId,
resumeMessages,
freshPrompt,
maxRounds,
);
let roleRound = chain.filter((m) => m.role !== START).length;
const lastMsg = chain[chain.length - 1];
if (lastMsg === undefined) {
sendWorkflowError(runId, "empty workflow message chain");
return;
}
let nextRole = def.moderator(buildInitialLastSignal(lastMsg), roleRound, maxRounds);
let roleRound = roleMessages.length;
let nextRole = def.moderator({ kind: "start", start }, roleRound, maxRounds);
if (nextRole === END) {
sendThreadEvent(runId, "completed", null);
@@ -168,7 +206,7 @@ async function runThread(
}
while (roleRound < maxRounds) {
const result = await executeRole(def, nextRole, chain, runId);
const result = await executeRole(def, nextRole, start, roleMessages, runId);
if (result === null) return;
const message: WorkflowMessage = {
@@ -177,13 +215,16 @@ async function runThread(
meta: result.meta,
timestamp: Date.now(),
};
chain.push(message);
roleMessages.push(message);
sendWorkflowMessage(runId, message);
roleRound += 1;
const signal: ModeratorInput = { role: nextRole, meta: result.meta };
nextRole = def.moderator(signal, roleRound, maxRounds);
const stepContext: ModeratorContext<RoleMeta> = {
kind: "step",
signal: { role: nextRole, meta: result.meta },
};
nextRole = def.moderator(stepContext, roleRound, maxRounds);
if (nextRole === END) {
sendThreadEvent(runId, "completed", null);