Re-runs the head step's agent with a supplementary prompt and replaces the head step (rewires new step's prev to old head's prev) instead of appending. Skips moderator re-route — the role of the head step is reused. Fixes #144
This commit is contained in:
@@ -199,6 +199,7 @@ const PL_THREAD_ARCHIVED = "F4D8Q2K5";
|
||||
const PL_STEP_ERROR = "B8T5N1V6";
|
||||
const PL_BACKGROUND_START = "X7Q4W9M2";
|
||||
const PL_THREAD_RESUME = "K2R7M4N8";
|
||||
const PL_THREAD_POKE = "P4Q9R3X7";
|
||||
|
||||
type ResumeStepConfig = {
|
||||
role: string;
|
||||
@@ -1135,6 +1136,147 @@ export async function cmdThreadResume(
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate that a thread can be poked. Returns the existing entry and the head StepNode payload.
|
||||
* Fails (process exit) when the thread is missing, running, completed, cancelled, or has no
|
||||
* StepNode at its head.
|
||||
*/
|
||||
async function validatePokePreconditions(
|
||||
storageRoot: string,
|
||||
uwf: UwfStore,
|
||||
threadId: ThreadId,
|
||||
): Promise<{ entry: ThreadIndexEntry; oldHead: CasRef; oldHeadPayload: StepNodePayload }> {
|
||||
const runningMarker = await isThreadRunning(storageRoot, threadId);
|
||||
if (runningMarker !== null) {
|
||||
fail(`thread already executing in background (PID: ${runningMarker.pid})`);
|
||||
}
|
||||
|
||||
const entry = getThread(uwf.varStore, threadId);
|
||||
if (entry === null) {
|
||||
fail(`thread not active: ${threadId}`);
|
||||
}
|
||||
|
||||
if (entry.status === "completed" || entry.status === "cancelled") {
|
||||
fail(`thread cannot be poked: ${threadId} (status: ${entry.status})`);
|
||||
}
|
||||
|
||||
const oldHead = entry.head;
|
||||
const oldHeadNode = uwf.store.cas.get(oldHead);
|
||||
if (oldHeadNode === null) {
|
||||
fail(`CAS node not found: ${oldHead}`);
|
||||
}
|
||||
if (oldHeadNode.type !== uwf.schemas.stepNode) {
|
||||
fail("thread cannot be poked: no step to replace (head is StartNode)");
|
||||
}
|
||||
|
||||
return { entry, oldHead, oldHeadPayload: oldHeadNode.payload as StepNodePayload };
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve the next role from the post-poke chain state, used for the StepOutput.currentRole field.
|
||||
* Returns null when the next role is $END, evaluation fails, or the result is a suspend.
|
||||
*/
|
||||
function resolveCurrentRoleFromChain(
|
||||
uwfAfter: UwfStore,
|
||||
workflow: WorkflowPayload,
|
||||
replacedHash: CasRef,
|
||||
): string | null {
|
||||
const chainAfter = walkChain(uwfAfter, replacedHash);
|
||||
const { lastRole, lastOutput } = resolveEvaluateArgs(uwfAfter, chainAfter);
|
||||
const afterResult = evaluate(workflow.graph, lastRole, lastOutput);
|
||||
if (!afterResult.ok || isSuspendResult(afterResult.value)) {
|
||||
return null;
|
||||
}
|
||||
if (afterResult.value.role === END_ROLE) {
|
||||
return null;
|
||||
}
|
||||
return afterResult.value.role;
|
||||
}
|
||||
|
||||
/**
|
||||
* Poke a thread: re-run the agent on the head step with a supplementary prompt,
|
||||
* replacing the head step's output. The new step's `prev` points to the OLD head's
|
||||
* `prev` — semantically replacing (not appending to) the head. The moderator is NOT
|
||||
* re-evaluated for routing; the role of the head step is re-used.
|
||||
*/
|
||||
export async function cmdThreadPoke(
|
||||
storageRoot: string,
|
||||
threadId: ThreadId,
|
||||
prompt: string,
|
||||
agentOverride: string | null,
|
||||
): Promise<StepOutput> {
|
||||
const uwf = await createUwfStore(storageRoot);
|
||||
const { entry, oldHeadPayload } = await validatePokePreconditions(storageRoot, uwf, threadId);
|
||||
|
||||
const chain = walkChain(uwf, entry.head);
|
||||
const workflowHash = chain.start.workflow;
|
||||
const threadCwd = chain.start.cwd;
|
||||
|
||||
const plog = createProcessLogger({
|
||||
storageRoot,
|
||||
context: { thread: threadId, workflow: workflowHash },
|
||||
});
|
||||
|
||||
// Resolve the agent: --agent override wins; otherwise read from old head step's `agent` field.
|
||||
const config = await loadWorkflowConfig(storageRoot);
|
||||
const workflow = loadWorkflowPayload(uwf, workflowHash);
|
||||
const role = oldHeadPayload.role;
|
||||
const agent =
|
||||
agentOverride !== null
|
||||
? resolveAgentConfig(config, workflow, role, agentOverride)
|
||||
: parseAgentOverride(oldHeadPayload.agent);
|
||||
|
||||
const effectiveCwd = oldHeadPayload.cwd !== "" ? oldHeadPayload.cwd : threadCwd;
|
||||
|
||||
plog.log(PL_THREAD_POKE, `poke role=${role} agent=${agent.command}`, null);
|
||||
plog.log(PL_AGENT_SPAWN, `spawning agent command=${agent.command}`, {
|
||||
args: [...agent.args, threadId, role].join(" "),
|
||||
});
|
||||
|
||||
loadDotenv({ path: getEnvPath(storageRoot) });
|
||||
|
||||
// Spawn the agent. The agent will create a new StepNode with prev=oldHead (it reads
|
||||
// the active thread head). After the agent returns, we rewrite that node's prev so
|
||||
// that the new head replaces the old head instead of appending after it.
|
||||
const agentResult = spawnAgent(plog, agent, threadId, role, prompt, effectiveCwd);
|
||||
const agentStepHash = agentResult.stepHash as CasRef;
|
||||
|
||||
plog.log(PL_AGENT_DONE, `agent returned head=${agentStepHash}`, null);
|
||||
|
||||
const uwfAfter = await createUwfStore(storageRoot);
|
||||
const agentNode = uwfAfter.store.cas.get(agentStepHash);
|
||||
if (agentNode === null || agentNode.type !== uwfAfter.schemas.stepNode) {
|
||||
failStep(plog, `agent returned hash that is not a StepNode: ${agentStepHash}`);
|
||||
}
|
||||
const agentPayload = agentNode.payload as StepNodePayload;
|
||||
|
||||
// Rewrite the new step so that its `prev` points to the OLD head's prev (replace semantics).
|
||||
const replacedPayload: StepNodePayload = {
|
||||
...agentPayload,
|
||||
prev: oldHeadPayload.prev,
|
||||
};
|
||||
const replacedHash = await uwfAfter.store.cas.put(uwfAfter.schemas.stepNode, replacedPayload);
|
||||
const replacedNode = uwfAfter.store.cas.get(replacedHash);
|
||||
if (replacedNode === null || !validate(uwfAfter.store, replacedNode)) {
|
||||
failStep(plog, "rewritten StepNode failed schema validation");
|
||||
}
|
||||
|
||||
// Update thread head to the replaced step. Status becomes idle (no moderator re-route).
|
||||
setThread(uwfAfter.varStore, threadId, updateThreadHead(entry, replacedHash));
|
||||
|
||||
return {
|
||||
workflow: workflowHash,
|
||||
thread: threadId,
|
||||
head: replacedHash,
|
||||
status: "idle",
|
||||
currentRole: resolveCurrentRoleFromChain(uwfAfter, workflow, replacedHash),
|
||||
suspendedRole: null,
|
||||
suspendMessage: null,
|
||||
done: false,
|
||||
background: null,
|
||||
};
|
||||
}
|
||||
|
||||
export function validateCount(count: number): void {
|
||||
if (count < 1 || !Number.isInteger(count)) {
|
||||
throw new Error(`--count must be a positive integer, got: ${count}`);
|
||||
|
||||
Reference in New Issue
Block a user