refactor(daemon): RFC-005 Phase 3 — workflow-worker uses ThreadContext (closes #270)

This commit is contained in:
2026-04-30 07:10:58 +00:00
parent 975f15c66d
commit d13b59e787
2 changed files with 45 additions and 42 deletions
+1 -3
View File
@@ -127,7 +127,6 @@ function ensureThreadMessagesWithStart(
threadId: string,
fallbackPrompt: string,
fallbackMaxRounds: number,
fallbackDryRun: boolean,
): WorkflowMessage[] {
const mapped: WorkflowMessage[] = messages.map((m) => ({
role: m.role,
@@ -141,7 +140,7 @@ function ensureThreadMessagesWithStart(
const start: WorkflowMessage = {
role: START,
content: fallbackPrompt,
meta: { maxRounds: fallbackMaxRounds, dryRun: fallbackDryRun, threadId },
meta: { maxRounds: fallbackMaxRounds, threadId },
timestamp: Date.now(),
};
return [start, ...mapped];
@@ -408,7 +407,6 @@ export function createWorkflowManager(
runId,
launch.prompt,
launch.maxRounds,
launch.dryRun,
);
state.active.add(runId);
const msg: ResumeThreadMessage = {
+44 -39
View File
@@ -14,7 +14,14 @@ import "./experimental-warning-suppression.js";
import { existsSync } from "node:fs";
import { join, resolve } from "node:path";
import type { RoleMeta, StartStep, WorkflowDefinition, WorkflowMessage } from "@uncaged/nerve-core";
import type {
RoleMeta,
RoleStep,
StartStep,
ThreadContext,
WorkflowDefinition,
WorkflowMessage,
} from "@uncaged/nerve-core";
import { END, START, isPlainRecord } from "@uncaged/nerve-core";
import type {
@@ -87,15 +94,14 @@ function normalizeStartMeta(
threadIdFallback: string,
): StartStep["meta"] {
if (!isPlainRecord(meta)) {
return { maxRounds: maxRoundsFallback, dryRun: false, threadId: threadIdFallback };
return { maxRounds: maxRoundsFallback, threadId: threadIdFallback };
}
const maxRounds = typeof meta.maxRounds === "number" ? meta.maxRounds : maxRoundsFallback;
const dryRun = typeof meta.dryRun === "boolean" ? meta.dryRun : false;
const threadId =
typeof meta.threadId === "string" && meta.threadId.length > 0
? meta.threadId
: threadIdFallback;
return { maxRounds, dryRun, threadId };
return { maxRounds, threadId };
}
function startStepFromWorkflowMessage(
@@ -107,7 +113,7 @@ function startStepFromWorkflowMessage(
return {
role: START,
content: "",
meta: { maxRounds: maxRoundsFallback, dryRun: false, threadId: threadIdFallback },
meta: { maxRounds: maxRoundsFallback, threadId: threadIdFallback },
timestamp: Date.now(),
};
}
@@ -124,8 +130,30 @@ type ThreadMessagesState = {
start: StartStep;
/** Role outputs only; never includes the `__start__` frame. */
messages: WorkflowMessage[];
/** From IPC (`start-thread` / `resume-thread`); not part of `StartStep.meta`. */
dryRun: boolean;
};
function workflowMessagesToRoleSteps(messages: WorkflowMessage[]): RoleStep<RoleMeta>[] {
return messages.map((m) => ({
role: m.role,
meta: m.meta as Record<string, unknown>,
content: m.content,
timestamp: m.timestamp,
})) as RoleStep<RoleMeta>[];
}
function buildThreadContext(
start: StartStep,
roleMessages: WorkflowMessage[],
): ThreadContext<RoleMeta> {
return {
threadId: start.meta.threadId,
start,
steps: workflowMessagesToRoleSteps(roleMessages),
};
}
function initThreadMessages(
runId: string,
resumeMessages: WorkflowMessage[],
@@ -139,6 +167,7 @@ function initThreadMessages(
return {
start: startStepFromWorkflowMessage(first, maxRounds, runId),
messages: [...rest],
dryRun,
};
}
const prompt = freshPrompt ?? "";
@@ -146,17 +175,18 @@ function initThreadMessages(
start: {
role: START,
content: prompt,
meta: { maxRounds, dryRun, threadId: runId },
meta: { maxRounds, threadId: runId },
timestamp: Date.now(),
},
messages: [...resumeMessages],
dryRun,
};
}
const prompt = freshPrompt ?? "";
const start: StartStep = {
role: START,
content: prompt,
meta: { maxRounds, dryRun, threadId: runId },
meta: { maxRounds, threadId: runId },
timestamp: Date.now(),
};
sendWorkflowMessage(runId, {
@@ -165,14 +195,13 @@ function initThreadMessages(
meta: start.meta,
timestamp: start.timestamp,
});
return { start, messages: [] };
return { start, messages: [], dryRun };
}
async function executeRole(
def: WorkflowDefinition<RoleMeta>,
nextRole: string,
start: StartStep,
messages: WorkflowMessage[],
ctx: ThreadContext<RoleMeta>,
runId: string,
): Promise<{ content: string; meta: Record<string, unknown> } | null> {
const role = def.roles[nextRole];
@@ -183,7 +212,7 @@ async function executeRole(
let result: { content: string; meta: Record<string, unknown> };
try {
result = await role(start, messages);
result = await role(ctx);
} catch (e: unknown) {
const errMsg = e instanceof Error ? e.message : String(e);
sendThreadEvent(runId, "failed", { error: errMsg, exitCode: 1 });
@@ -213,37 +242,20 @@ async function runThread(
dryRun,
);
const steps: Array<{
role: string;
meta: Record<string, unknown>;
content: string;
timestamp: number;
}> = [];
// Rebuild steps from any resumed messages
for (const msg of roleMessages) {
steps.push({
role: msg.role,
meta: msg.meta as Record<string, unknown>,
content: msg.content,
timestamp: msg.timestamp,
});
}
if (killFlag.value) {
sendThreadEvent(runId, "killed", { exitCode: 137 });
return;
}
let nextRole = def.moderator({ start, steps });
let nextRole = def.moderator(buildThreadContext(start, roleMessages));
if (nextRole === END) {
sendThreadEvent(runId, "completed", { exitCode: 0 });
return;
}
while (steps.length < maxRounds) {
const result = await executeRole(def, nextRole, start, roleMessages, runId);
while (roleMessages.length < maxRounds) {
const result = await executeRole(def, nextRole, buildThreadContext(start, roleMessages), runId);
if (killFlag.value) {
sendThreadEvent(runId, "killed", { exitCode: 137 });
@@ -261,14 +273,7 @@ async function runThread(
roleMessages.push(message);
sendWorkflowMessage(runId, message);
steps.push({
role: nextRole,
meta: result.meta,
content: result.content,
timestamp: message.timestamp,
});
nextRole = def.moderator({ start, steps });
nextRole = def.moderator(buildThreadContext(start, roleMessages));
if (nextRole === END) {
sendThreadEvent(runId, "completed", { exitCode: 0 });