diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts
index 0c80841..f29ab91 100644
--- a/packages/core/src/index.ts
+++ b/packages/core/src/index.ts
@@ -14,6 +14,7 @@ export type {
RoleMeta,
StartSignal,
RoleSignal,
+ ModeratorContext,
Moderator,
WorkflowDefinition,
SenseResult,
diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts
index d3768ec..d565548 100644
--- a/packages/core/src/types.ts
+++ b/packages/core/src/types.ts
@@ -76,16 +76,20 @@ export type WorkflowMessage = {
export type RoleResult = { content: string; meta: Meta };
/**
- * A Role is a pure async function: receives the full message chain,
- * returns typed content + meta. Implementation can be an agent, LLM call,
+ * A Role is a pure async function: receives the engine start frame plus prior
+ * role messages only (the start frame is not included in `messages`).
+ * Returns typed content + meta. Implementation can be an agent, LLM call,
* script, HTTP request, etc.
*/
-export type Role = (messages: WorkflowMessage[]) => Promise>;
+export type Role = (
+ start: StartSignal,
+ messages: WorkflowMessage[],
+) => Promise>;
/** Maps role names to their meta types — the single generic that drives all inference. */
export type RoleMeta = Record>;
-/** First message in the thread chain (`messages[0]`) — passed to the moderator on start. */
+/** Engine start frame: prompt, max rounds cap, and timestamps for the thread. */
export type StartSignal = {
role: START;
content: string;
@@ -99,11 +103,19 @@ export type RoleSignal = {
}[keyof M & string];
/**
- * The moderator — a pure routing function. Receives the last signal,
+ * Moderator input: either the initial start frame or a role signal after a step.
+ * Lets implementations branch on `context.kind` with full typing for each arm.
+ */
+export type ModeratorContext =
+ | { kind: "start"; start: StartSignal }
+ | { kind: "step"; signal: RoleSignal };
+
+/**
+ * The moderator — a pure routing function. Receives start vs step context,
* current round, and maxRounds. Returns the next role name or END.
*/
export type Moderator = (
- signal: StartSignal | RoleSignal,
+ context: ModeratorContext,
round: number,
maxRounds: number,
) => (keyof M & string) | END;
diff --git a/packages/daemon/src/workflow-worker.ts b/packages/daemon/src/workflow-worker.ts
index 3264c02..b8dbcb6 100644
--- a/packages/daemon/src/workflow-worker.ts
+++ b/packages/daemon/src/workflow-worker.ts
@@ -13,7 +13,7 @@ import { existsSync } from "node:fs";
import { join, resolve } from "node:path";
import type {
- Moderator,
+ ModeratorContext,
RoleMeta,
StartSignal,
WorkflowDefinition,
@@ -29,8 +29,6 @@ import type {
import { parseParentMessage } from "./ipc.js";
import { ignoreSessionBroadcastSignals } from "./worker-fork-support.js";
-type ModeratorInput = Parameters>[0];
-
// ---------------------------------------------------------------------------
// IPC helpers
// ---------------------------------------------------------------------------
@@ -87,42 +85,83 @@ function validateRoleResult(
return true;
}
-function buildInitialLastSignal(lastMsg: WorkflowMessage): ModeratorInput {
- if (lastMsg.role === START) {
- return {
- role: START,
- content: lastMsg.content,
- meta: lastMsg.meta as StartSignal["meta"],
- timestamp: lastMsg.timestamp,
- };
- }
- return { role: lastMsg.role, meta: lastMsg.meta as Record };
+function isStartMeta(meta: unknown): meta is StartSignal["meta"] {
+ return isPlainRecord(meta) && typeof meta.maxRounds === "number";
}
-function initChain(
+function startSignalFromWorkflowMessage(
+ msg: WorkflowMessage,
+ maxRoundsFallback: number,
+): StartSignal {
+ if (msg.role !== START) {
+ return {
+ role: START,
+ content: "",
+ meta: { maxRounds: maxRoundsFallback },
+ timestamp: Date.now(),
+ };
+ }
+ const meta = isStartMeta(msg.meta) ? msg.meta : { maxRounds: maxRoundsFallback };
+ return {
+ role: START,
+ content: msg.content,
+ meta,
+ timestamp: msg.timestamp,
+ };
+}
+
+type ThreadMessagesState = {
+ start: StartSignal;
+ /** Role outputs only; never includes the `__start__` frame. */
+ messages: WorkflowMessage[];
+};
+
+function initThreadMessages(
runId: string,
resumeMessages: WorkflowMessage[],
freshPrompt: string | null,
maxRounds: number,
-): WorkflowMessage[] {
+): ThreadMessagesState {
if (resumeMessages.length > 0) {
- return [...resumeMessages];
+ const [first, ...rest] = resumeMessages;
+ if (first.role === START) {
+ return {
+ start: startSignalFromWorkflowMessage(first, maxRounds),
+ messages: [...rest],
+ };
+ }
+ const prompt = freshPrompt ?? "";
+ return {
+ start: {
+ role: START,
+ content: prompt,
+ meta: { maxRounds },
+ timestamp: Date.now(),
+ },
+ messages: [...resumeMessages],
+ };
}
const prompt = freshPrompt ?? "";
- const startMsg: WorkflowMessage = {
+ const start: StartSignal = {
role: START,
content: prompt,
meta: { maxRounds },
timestamp: Date.now(),
};
- sendWorkflowMessage(runId, startMsg);
- return [startMsg];
+ sendWorkflowMessage(runId, {
+ role: start.role,
+ content: start.content,
+ meta: start.meta,
+ timestamp: start.timestamp,
+ });
+ return { start, messages: [] };
}
async function executeRole(
def: WorkflowDefinition,
nextRole: string,
- chain: WorkflowMessage[],
+ start: StartSignal,
+ messages: WorkflowMessage[],
runId: string,
): Promise<{ content: string; meta: Record } | null> {
const role = def.roles[nextRole];
@@ -133,7 +172,7 @@ async function executeRole(
let result: { content: string; meta: Record };
try {
- result = await role(chain);
+ result = await role(start, messages);
} catch (e: unknown) {
const errMsg = e instanceof Error ? e.message : String(e);
sendThreadEvent(runId, "failed", { error: errMsg });
@@ -151,16 +190,15 @@ async function runThread(
resumeMessages: WorkflowMessage[] = [],
freshPrompt: string | null = null,
): Promise {
- const chain = initChain(runId, resumeMessages, freshPrompt, maxRounds);
+ const { start, messages: roleMessages } = initThreadMessages(
+ runId,
+ resumeMessages,
+ freshPrompt,
+ maxRounds,
+ );
- let roleRound = chain.filter((m) => m.role !== START).length;
- const lastMsg = chain[chain.length - 1];
- if (lastMsg === undefined) {
- sendWorkflowError(runId, "empty workflow message chain");
- return;
- }
-
- let nextRole = def.moderator(buildInitialLastSignal(lastMsg), roleRound, maxRounds);
+ let roleRound = roleMessages.length;
+ let nextRole = def.moderator({ kind: "start", start }, roleRound, maxRounds);
if (nextRole === END) {
sendThreadEvent(runId, "completed", null);
@@ -168,7 +206,7 @@ async function runThread(
}
while (roleRound < maxRounds) {
- const result = await executeRole(def, nextRole, chain, runId);
+ const result = await executeRole(def, nextRole, start, roleMessages, runId);
if (result === null) return;
const message: WorkflowMessage = {
@@ -177,13 +215,16 @@ async function runThread(
meta: result.meta,
timestamp: Date.now(),
};
- chain.push(message);
+ roleMessages.push(message);
sendWorkflowMessage(runId, message);
roleRound += 1;
- const signal: ModeratorInput = { role: nextRole, meta: result.meta };
- nextRole = def.moderator(signal, roleRound, maxRounds);
+ const stepContext: ModeratorContext = {
+ kind: "step",
+ signal: { role: nextRole, meta: result.meta },
+ };
+ nextRole = def.moderator(stepContext, roleRound, maxRounds);
if (nextRole === END) {
sendThreadEvent(runId, "completed", null);