From e9e6df2f5a8def8d23a622bdff807ab0860fc95b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Fri, 24 Apr 2026 23:14:45 +0000 Subject: [PATCH] refactor(core,daemon): extract StartSignal as independent Role parameter - Role now takes (start: StartSignal, messages: WorkflowMessage[]) - messages no longer contains the __start__ frame - Add ModeratorContext discriminated union (kind: start | step) - Moderator receives typed context instead of raw StartSignal | RoleSignal union - workflow-worker separates start from role messages throughout Refs #100 --- packages/core/src/index.ts | 1 + packages/core/src/types.ts | 24 ++++-- packages/daemon/src/workflow-worker.ts | 109 +++++++++++++++++-------- 3 files changed, 94 insertions(+), 40 deletions(-) diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 0c80841..f29ab91 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -14,6 +14,7 @@ export type { RoleMeta, StartSignal, RoleSignal, + ModeratorContext, Moderator, WorkflowDefinition, SenseResult, diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index d3768ec..d565548 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -76,16 +76,20 @@ export type WorkflowMessage = { export type RoleResult = { content: string; meta: Meta }; /** - * A Role is a pure async function: receives the full message chain, - * returns typed content + meta. Implementation can be an agent, LLM call, + * A Role is a pure async function: receives the engine start frame plus prior + * role messages only (the start frame is not included in `messages`). + * Returns typed content + meta. Implementation can be an agent, LLM call, * script, HTTP request, etc. */ -export type Role = (messages: WorkflowMessage[]) => Promise>; +export type Role = ( + start: StartSignal, + messages: WorkflowMessage[], +) => Promise>; /** Maps role names to their meta types — the single generic that drives all inference. */ export type RoleMeta = Record>; -/** First message in the thread chain (`messages[0]`) — passed to the moderator on start. */ +/** Engine start frame: prompt, max rounds cap, and timestamps for the thread. */ export type StartSignal = { role: START; content: string; @@ -99,11 +103,19 @@ export type RoleSignal = { }[keyof M & string]; /** - * The moderator — a pure routing function. Receives the last signal, + * Moderator input: either the initial start frame or a role signal after a step. + * Lets implementations branch on `context.kind` with full typing for each arm. + */ +export type ModeratorContext = + | { kind: "start"; start: StartSignal } + | { kind: "step"; signal: RoleSignal }; + +/** + * The moderator — a pure routing function. Receives start vs step context, * current round, and maxRounds. Returns the next role name or END. */ export type Moderator = ( - signal: StartSignal | RoleSignal, + context: ModeratorContext, round: number, maxRounds: number, ) => (keyof M & string) | END; diff --git a/packages/daemon/src/workflow-worker.ts b/packages/daemon/src/workflow-worker.ts index 3264c02..b8dbcb6 100644 --- a/packages/daemon/src/workflow-worker.ts +++ b/packages/daemon/src/workflow-worker.ts @@ -13,7 +13,7 @@ import { existsSync } from "node:fs"; import { join, resolve } from "node:path"; import type { - Moderator, + ModeratorContext, RoleMeta, StartSignal, WorkflowDefinition, @@ -29,8 +29,6 @@ import type { import { parseParentMessage } from "./ipc.js"; import { ignoreSessionBroadcastSignals } from "./worker-fork-support.js"; -type ModeratorInput = Parameters>[0]; - // --------------------------------------------------------------------------- // IPC helpers // --------------------------------------------------------------------------- @@ -87,42 +85,83 @@ function validateRoleResult( return true; } -function buildInitialLastSignal(lastMsg: WorkflowMessage): ModeratorInput { - if (lastMsg.role === START) { - return { - role: START, - content: lastMsg.content, - meta: lastMsg.meta as StartSignal["meta"], - timestamp: lastMsg.timestamp, - }; - } - return { role: lastMsg.role, meta: lastMsg.meta as Record }; +function isStartMeta(meta: unknown): meta is StartSignal["meta"] { + return isPlainRecord(meta) && typeof meta.maxRounds === "number"; } -function initChain( +function startSignalFromWorkflowMessage( + msg: WorkflowMessage, + maxRoundsFallback: number, +): StartSignal { + if (msg.role !== START) { + return { + role: START, + content: "", + meta: { maxRounds: maxRoundsFallback }, + timestamp: Date.now(), + }; + } + const meta = isStartMeta(msg.meta) ? msg.meta : { maxRounds: maxRoundsFallback }; + return { + role: START, + content: msg.content, + meta, + timestamp: msg.timestamp, + }; +} + +type ThreadMessagesState = { + start: StartSignal; + /** Role outputs only; never includes the `__start__` frame. */ + messages: WorkflowMessage[]; +}; + +function initThreadMessages( runId: string, resumeMessages: WorkflowMessage[], freshPrompt: string | null, maxRounds: number, -): WorkflowMessage[] { +): ThreadMessagesState { if (resumeMessages.length > 0) { - return [...resumeMessages]; + const [first, ...rest] = resumeMessages; + if (first.role === START) { + return { + start: startSignalFromWorkflowMessage(first, maxRounds), + messages: [...rest], + }; + } + const prompt = freshPrompt ?? ""; + return { + start: { + role: START, + content: prompt, + meta: { maxRounds }, + timestamp: Date.now(), + }, + messages: [...resumeMessages], + }; } const prompt = freshPrompt ?? ""; - const startMsg: WorkflowMessage = { + const start: StartSignal = { role: START, content: prompt, meta: { maxRounds }, timestamp: Date.now(), }; - sendWorkflowMessage(runId, startMsg); - return [startMsg]; + sendWorkflowMessage(runId, { + role: start.role, + content: start.content, + meta: start.meta, + timestamp: start.timestamp, + }); + return { start, messages: [] }; } async function executeRole( def: WorkflowDefinition, nextRole: string, - chain: WorkflowMessage[], + start: StartSignal, + messages: WorkflowMessage[], runId: string, ): Promise<{ content: string; meta: Record } | null> { const role = def.roles[nextRole]; @@ -133,7 +172,7 @@ async function executeRole( let result: { content: string; meta: Record }; try { - result = await role(chain); + result = await role(start, messages); } catch (e: unknown) { const errMsg = e instanceof Error ? e.message : String(e); sendThreadEvent(runId, "failed", { error: errMsg }); @@ -151,16 +190,15 @@ async function runThread( resumeMessages: WorkflowMessage[] = [], freshPrompt: string | null = null, ): Promise { - const chain = initChain(runId, resumeMessages, freshPrompt, maxRounds); + const { start, messages: roleMessages } = initThreadMessages( + runId, + resumeMessages, + freshPrompt, + maxRounds, + ); - let roleRound = chain.filter((m) => m.role !== START).length; - const lastMsg = chain[chain.length - 1]; - if (lastMsg === undefined) { - sendWorkflowError(runId, "empty workflow message chain"); - return; - } - - let nextRole = def.moderator(buildInitialLastSignal(lastMsg), roleRound, maxRounds); + let roleRound = roleMessages.length; + let nextRole = def.moderator({ kind: "start", start }, roleRound, maxRounds); if (nextRole === END) { sendThreadEvent(runId, "completed", null); @@ -168,7 +206,7 @@ async function runThread( } while (roleRound < maxRounds) { - const result = await executeRole(def, nextRole, chain, runId); + const result = await executeRole(def, nextRole, start, roleMessages, runId); if (result === null) return; const message: WorkflowMessage = { @@ -177,13 +215,16 @@ async function runThread( meta: result.meta, timestamp: Date.now(), }; - chain.push(message); + roleMessages.push(message); sendWorkflowMessage(runId, message); roleRound += 1; - const signal: ModeratorInput = { role: nextRole, meta: result.meta }; - nextRole = def.moderator(signal, roleRound, maxRounds); + const stepContext: ModeratorContext = { + kind: "step", + signal: { role: nextRole, meta: result.meta }, + }; + nextRole = def.moderator(stepContext, roleRound, maxRounds); if (nextRole === END) { sendThreadEvent(runId, "completed", null); -- 2.43.0