feat(builtin-agent): persist ReAct loop turns as session JSONL
Each turn (assistant response / tool result) is appended to a JSONL file at ~/.uncaged/workflow/sessions/<sessionId>.jsonl during the loop. On completion, the JSONL is read back, each turn is stored as a CAS node, and the detail payload references them as a flat turns[] array in chronological order. The session file is then deleted. Benefits: - Real-time observability: tail -f the JSONL to watch loop progress - Crash recovery: partial JSONL survives process death - Zero write contention: one file per session - Detail stays a flat array for easy consumption by CLI/dashboard Changes: - New session.ts: initSessionDir, appendSessionTurn, readSessionTurns, removeSession - loop.ts: append JSONL each turn instead of accumulating in-memory - detail.ts: reads session JSONL → persists turns to CAS → stores detail - agent.ts: passes storageRoot/sessionId to loop, cleans up session on completion - types.ts: remove index from TurnPayload (order is implicit in JSONL/array) - schemas.ts: sync with type changes Ref: #433
This commit is contained in:
@@ -7,17 +7,26 @@ import {
|
||||
resolveModel,
|
||||
resolveStorageRoot,
|
||||
} from "@uncaged/workflow-agent-kit";
|
||||
import { generateUlid } from "@uncaged/workflow-util";
|
||||
import { createLogger, generateUlid } from "@uncaged/workflow-util";
|
||||
|
||||
import { storeBuiltinDetail } from "./detail.js";
|
||||
import type { ChatMessage } from "./llm/index.js";
|
||||
import { BUILTIN_CONTINUE_MAX_TURNS, BUILTIN_MAX_TURNS, runBuiltinLoop } from "./loop.js";
|
||||
import { buildBuiltinMessages } from "./prompt.js";
|
||||
import type { BuiltinSessionState } from "./types.js";
|
||||
import { initSessionDir, removeSession } from "./session.js";
|
||||
|
||||
const sessions = new Map<string, BuiltinSessionState>();
|
||||
const log = createLogger({ sink: { kind: "stderr" } });
|
||||
|
||||
function getSession(sessionId: string): BuiltinSessionState {
|
||||
type SessionRecord = {
|
||||
sessionId: string;
|
||||
model: string;
|
||||
startedAtMs: number;
|
||||
messages: ChatMessage[];
|
||||
};
|
||||
|
||||
const sessions = new Map<string, SessionRecord>();
|
||||
|
||||
function getSession(sessionId: string): SessionRecord {
|
||||
const session = sessions.get(sessionId);
|
||||
if (session === undefined) {
|
||||
throw new Error(`builtin session not found: ${sessionId}`);
|
||||
@@ -36,7 +45,7 @@ async function runBuiltinWithMessages(
|
||||
storageRoot: string,
|
||||
provider: ReturnType<typeof resolveModel>,
|
||||
messages: ChatMessage[],
|
||||
session: BuiltinSessionState,
|
||||
session: SessionRecord,
|
||||
store: Store,
|
||||
maxTurns: number,
|
||||
): Promise<AgentRunResult> {
|
||||
@@ -45,22 +54,31 @@ async function runBuiltinWithMessages(
|
||||
messages,
|
||||
toolCtx: buildToolContext(storageRoot),
|
||||
maxTurns,
|
||||
existingTurns: session.turns,
|
||||
storageRoot,
|
||||
sessionId: session.sessionId,
|
||||
});
|
||||
|
||||
session.messages = loopResult.messages;
|
||||
session.turns = loopResult.turns;
|
||||
|
||||
const { detailHash, output } = await storeBuiltinDetail(
|
||||
if (loopResult.turnCount === 0) {
|
||||
log("5RWTK9NB", "no turns produced, returning empty output");
|
||||
await removeSession(storageRoot, session.sessionId);
|
||||
return { output: "", detailHash: "", sessionId: session.sessionId };
|
||||
}
|
||||
|
||||
// Read jsonl → persist turns to CAS → store detail
|
||||
const { detailHash } = await storeBuiltinDetail(
|
||||
store,
|
||||
storageRoot,
|
||||
session.sessionId,
|
||||
session.model,
|
||||
session.startedAtMs,
|
||||
session.turns,
|
||||
);
|
||||
|
||||
const finalOutput = output !== "" ? output : loopResult.finalText;
|
||||
return { output: finalOutput, detailHash, sessionId: session.sessionId };
|
||||
// Clean up session jsonl
|
||||
await removeSession(storageRoot, session.sessionId);
|
||||
|
||||
return { output: loopResult.finalText, detailHash, sessionId: session.sessionId };
|
||||
}
|
||||
|
||||
async function runBuiltin(ctx: AgentContext): Promise<AgentRunResult> {
|
||||
@@ -69,14 +87,14 @@ async function runBuiltin(ctx: AgentContext): Promise<AgentRunResult> {
|
||||
const provider = resolveModel(config, config.defaultModel);
|
||||
|
||||
const sessionId = generateUlid(Date.now());
|
||||
await initSessionDir(storageRoot);
|
||||
const messages = buildBuiltinMessages(ctx);
|
||||
|
||||
const session: BuiltinSessionState = {
|
||||
const session: SessionRecord = {
|
||||
sessionId,
|
||||
model: provider.model,
|
||||
startedAtMs: Date.now(),
|
||||
messages,
|
||||
turns: [],
|
||||
};
|
||||
sessions.set(sessionId, session);
|
||||
|
||||
|
||||
@@ -1,72 +1,15 @@
|
||||
import { bootstrap, putSchema, type Store } from "@uncaged/json-cas";
|
||||
|
||||
import { BUILTIN_DETAIL_SCHEMA, BUILTIN_TURN_SCHEMA } from "./schemas.js";
|
||||
import type {
|
||||
BuiltinDetailPayload,
|
||||
BuiltinLoopTurn,
|
||||
BuiltinToolCall,
|
||||
BuiltinTurnPayload,
|
||||
BuiltinTurnRole,
|
||||
} from "./types.js";
|
||||
|
||||
function mapToolCalls(calls: NonNullable<BuiltinLoopTurn["toolCalls"]>): BuiltinToolCall[] {
|
||||
return calls.map((call) => ({
|
||||
name: call.name,
|
||||
args: call.args,
|
||||
}));
|
||||
}
|
||||
|
||||
function loopTurnToAssistantPayload(turn: BuiltinLoopTurn, index: number): BuiltinTurnPayload {
|
||||
return {
|
||||
index,
|
||||
role: "assistant",
|
||||
content: turn.assistantContent ?? "",
|
||||
toolCalls:
|
||||
turn.toolCalls !== null && turn.toolCalls.length > 0 ? mapToolCalls(turn.toolCalls) : null,
|
||||
reasoning: null,
|
||||
};
|
||||
}
|
||||
|
||||
function loopTurnToToolPayloads(turn: BuiltinLoopTurn, startIndex: number): BuiltinTurnPayload[] {
|
||||
if (turn.toolResults === null || turn.toolResults.length === 0) {
|
||||
return [];
|
||||
}
|
||||
const payloads: BuiltinTurnPayload[] = [];
|
||||
let index = startIndex;
|
||||
for (const result of turn.toolResults) {
|
||||
payloads.push({
|
||||
index,
|
||||
role: "tool" as BuiltinTurnRole,
|
||||
content: result.content,
|
||||
toolCalls: null,
|
||||
reasoning: null,
|
||||
});
|
||||
index += 1;
|
||||
}
|
||||
return payloads;
|
||||
}
|
||||
|
||||
/** Last assistant message with non-empty text. */
|
||||
export function extractFinalAssistantText(turns: BuiltinLoopTurn[]): string {
|
||||
for (let i = turns.length - 1; i >= 0; i--) {
|
||||
const turn = turns[i];
|
||||
if (turn === undefined) {
|
||||
continue;
|
||||
}
|
||||
const text = turn.assistantContent;
|
||||
if (text !== null && text.trim() !== "") {
|
||||
return text;
|
||||
}
|
||||
}
|
||||
return "";
|
||||
}
|
||||
import { readSessionTurns } from "./session.js";
|
||||
import type { BuiltinDetailPayload } from "./types.js";
|
||||
|
||||
type BuiltinSchemaHashes = {
|
||||
turn: string;
|
||||
detail: string;
|
||||
};
|
||||
|
||||
async function registerBuiltinSchemas(store: Store): Promise<BuiltinSchemaHashes> {
|
||||
export async function registerBuiltinSchemas(store: Store): Promise<BuiltinSchemaHashes> {
|
||||
await bootstrap(store);
|
||||
const [turn, detail] = await Promise.all([
|
||||
putSchema(store, BUILTIN_TURN_SCHEMA),
|
||||
@@ -75,30 +18,22 @@ async function registerBuiltinSchemas(store: Store): Promise<BuiltinSchemaHashes
|
||||
return { turn, detail };
|
||||
}
|
||||
|
||||
/** Read session jsonl, persist each turn to CAS, return detail hash. */
|
||||
export async function storeBuiltinDetail(
|
||||
store: Store,
|
||||
storageRoot: string,
|
||||
sessionId: string,
|
||||
model: string,
|
||||
startedAtMs: number,
|
||||
turns: BuiltinLoopTurn[],
|
||||
nowMs: number = Date.now(),
|
||||
): Promise<{ detailHash: string; output: string }> {
|
||||
): Promise<{ detailHash: string; turnCount: number }> {
|
||||
const schemas = await registerBuiltinSchemas(store);
|
||||
const turns = await readSessionTurns(storageRoot, sessionId);
|
||||
|
||||
const turnHashes: string[] = [];
|
||||
let turnIndex = 0;
|
||||
|
||||
for (const loopTurn of turns) {
|
||||
const assistant = loopTurnToAssistantPayload(loopTurn, turnIndex);
|
||||
const assistantHash = await store.put(schemas.turn, assistant);
|
||||
turnHashes.push(assistantHash);
|
||||
turnIndex += 1;
|
||||
|
||||
const toolPayloads = loopTurnToToolPayloads(loopTurn, turnIndex);
|
||||
for (const toolPayload of toolPayloads) {
|
||||
const toolHash = await store.put(schemas.turn, toolPayload);
|
||||
turnHashes.push(toolHash);
|
||||
turnIndex += 1;
|
||||
}
|
||||
for (const turn of turns) {
|
||||
const hash = await store.put(schemas.turn, turn);
|
||||
turnHashes.push(hash);
|
||||
}
|
||||
|
||||
const duration = Math.max(0, nowMs - startedAtMs);
|
||||
@@ -110,6 +45,5 @@ export async function storeBuiltinDetail(
|
||||
turns: turnHashes,
|
||||
};
|
||||
const detailHash = await store.put(schemas.detail, detail);
|
||||
const output = extractFinalAssistantText(turns);
|
||||
return { detailHash, output };
|
||||
return { detailHash, turnCount: turnHashes.length };
|
||||
}
|
||||
|
||||
@@ -1,14 +1,16 @@
|
||||
export { createBuiltinAgent } from "./agent.js";
|
||||
export { extractFinalAssistantText, storeBuiltinDetail } from "./detail.js";
|
||||
export { registerBuiltinSchemas, storeBuiltinDetail } from "./detail.js";
|
||||
export type { ChatMessage, LlmAssistantResponse, LlmToolCall } from "./llm/index.js";
|
||||
export { chatCompletionWithTools } from "./llm/index.js";
|
||||
export { BUILTIN_CONTINUE_MAX_TURNS, BUILTIN_MAX_TURNS, runBuiltinLoop } from "./loop.js";
|
||||
export { buildBuiltinMessages } from "./prompt.js";
|
||||
export { appendSessionTurn, initSessionDir, readSessionTurns, removeSession } from "./session.js";
|
||||
export type { BuiltinTool, ToolContext } from "./tools/index.js";
|
||||
export { executeBuiltinTool, getBuiltinTools } from "./tools/index.js";
|
||||
export type {
|
||||
BuiltinDetailPayload,
|
||||
BuiltinLoopTurn,
|
||||
BuiltinSessionState,
|
||||
BuiltinToolCallRecord,
|
||||
BuiltinToolResultRecord,
|
||||
BuiltinTurnPayload,
|
||||
} from "./types.js";
|
||||
|
||||
@@ -2,13 +2,14 @@ import type { ResolvedLlmProvider } from "@uncaged/workflow-agent-kit";
|
||||
import { createLogger } from "@uncaged/workflow-util";
|
||||
|
||||
import { type ChatMessage, chatCompletionWithTools, type LlmToolCall } from "./llm/index.js";
|
||||
import { appendSessionTurn } from "./session.js";
|
||||
import {
|
||||
builtinToolsToOpenAi,
|
||||
executeBuiltinTool,
|
||||
getBuiltinTools,
|
||||
type ToolContext,
|
||||
} from "./tools/index.js";
|
||||
import type { BuiltinLoopTurn, BuiltinToolCallRecord, BuiltinToolResultRecord } from "./types.js";
|
||||
import type { BuiltinToolCall, BuiltinTurnPayload } from "./types.js";
|
||||
|
||||
const log = createLogger({ sink: { kind: "stderr" } });
|
||||
|
||||
@@ -20,31 +21,61 @@ export type RunBuiltinLoopOptions = {
|
||||
messages: ChatMessage[];
|
||||
toolCtx: ToolContext;
|
||||
maxTurns: number;
|
||||
existingTurns: BuiltinLoopTurn[];
|
||||
storageRoot: string;
|
||||
sessionId: string;
|
||||
};
|
||||
|
||||
export type RunBuiltinLoopResult = {
|
||||
finalText: string;
|
||||
messages: ChatMessage[];
|
||||
turns: BuiltinLoopTurn[];
|
||||
turnCount: number;
|
||||
};
|
||||
|
||||
function mapToolCalls(calls: LlmToolCall[]): BuiltinToolCallRecord[] {
|
||||
function mapToolCallsForPayload(calls: LlmToolCall[]): BuiltinToolCall[] {
|
||||
return calls.map((call) => ({
|
||||
id: call.id,
|
||||
name: call.name,
|
||||
args: call.arguments,
|
||||
}));
|
||||
}
|
||||
|
||||
async function appendTurn(
|
||||
storageRoot: string,
|
||||
sessionId: string,
|
||||
payload: BuiltinTurnPayload,
|
||||
): Promise<void> {
|
||||
await appendSessionTurn(storageRoot, sessionId, payload);
|
||||
}
|
||||
|
||||
async function executeTurnTools(
|
||||
calls: Array<{ id: string; name: string; arguments: string }>,
|
||||
toolCtx: ToolContext,
|
||||
messages: ChatMessage[],
|
||||
storageRoot: string,
|
||||
sessionId: string,
|
||||
): Promise<number> {
|
||||
let turnCount = 0;
|
||||
for (const call of calls) {
|
||||
const result = await executeBuiltinTool(call.name, call.arguments, toolCtx);
|
||||
messages.push({ role: "tool", tool_call_id: call.id, content: result });
|
||||
await appendTurn(storageRoot, sessionId, {
|
||||
role: "tool",
|
||||
content: result,
|
||||
toolCalls: null,
|
||||
reasoning: null,
|
||||
});
|
||||
turnCount += 1;
|
||||
}
|
||||
return turnCount;
|
||||
}
|
||||
|
||||
/** Agent run loop: LLM ↔ tools until no tool_calls or maxTurns. */
|
||||
export async function runBuiltinLoop(
|
||||
options: RunBuiltinLoopOptions,
|
||||
): Promise<RunBuiltinLoopResult> {
|
||||
const messages = [...options.messages];
|
||||
const turns = [...options.existingTurns];
|
||||
const openAiTools = builtinToolsToOpenAi(getBuiltinTools());
|
||||
let finalText = "";
|
||||
let turnCount = 0;
|
||||
|
||||
for (let turn = 0; turn < options.maxTurns; turn++) {
|
||||
log("8K2M4N7P", `builtin loop turn ${turn + 1}/${options.maxTurns}`);
|
||||
@@ -59,36 +90,33 @@ export async function runBuiltinLoop(
|
||||
|
||||
if (response.toolCalls === null || response.toolCalls.length === 0) {
|
||||
finalText = response.content ?? "";
|
||||
turns.push({
|
||||
assistantContent: response.content,
|
||||
await appendTurn(options.storageRoot, options.sessionId, {
|
||||
role: "assistant",
|
||||
content: response.content ?? "",
|
||||
toolCalls: null,
|
||||
toolResults: null,
|
||||
reasoning: null,
|
||||
});
|
||||
turnCount += 1;
|
||||
break;
|
||||
}
|
||||
|
||||
const toolCallRecords = mapToolCalls(response.toolCalls);
|
||||
const toolResults: BuiltinToolResultRecord[] = [];
|
||||
|
||||
for (const call of response.toolCalls) {
|
||||
const result = await executeBuiltinTool(call.name, call.arguments, options.toolCtx);
|
||||
toolResults.push({
|
||||
toolCallId: call.id,
|
||||
name: call.name,
|
||||
content: result,
|
||||
});
|
||||
messages.push({
|
||||
role: "tool",
|
||||
tool_call_id: call.id,
|
||||
content: result,
|
||||
});
|
||||
}
|
||||
|
||||
turns.push({
|
||||
assistantContent: response.content,
|
||||
toolCalls: toolCallRecords,
|
||||
toolResults,
|
||||
// Assistant turn with tool calls
|
||||
await appendTurn(options.storageRoot, options.sessionId, {
|
||||
role: "assistant",
|
||||
content: response.content ?? "",
|
||||
toolCalls: mapToolCallsForPayload(response.toolCalls),
|
||||
reasoning: null,
|
||||
});
|
||||
turnCount += 1;
|
||||
|
||||
// Execute tools
|
||||
turnCount += await executeTurnTools(
|
||||
response.toolCalls,
|
||||
options.toolCtx,
|
||||
messages,
|
||||
options.storageRoot,
|
||||
options.sessionId,
|
||||
);
|
||||
}
|
||||
|
||||
if (finalText === "" && messages.length > 0) {
|
||||
@@ -106,5 +134,5 @@ export async function runBuiltinLoop(
|
||||
}
|
||||
}
|
||||
|
||||
return { finalText, messages, turns };
|
||||
return { finalText, messages, turnCount };
|
||||
}
|
||||
|
||||
@@ -13,9 +13,8 @@ const BUILTIN_TOOL_CALL_SCHEMA: JSONSchema = {
|
||||
export const BUILTIN_TURN_SCHEMA: JSONSchema = {
|
||||
title: "builtin-turn",
|
||||
type: "object",
|
||||
required: ["index", "role", "content"],
|
||||
required: ["role", "content"],
|
||||
properties: {
|
||||
index: { type: "integer" },
|
||||
role: { type: "string", enum: ["assistant", "tool"] },
|
||||
content: { type: "string" },
|
||||
toolCalls: {
|
||||
|
||||
@@ -0,0 +1,59 @@
|
||||
import { appendFile, mkdir, readFile, rm } from "node:fs/promises";
|
||||
import { join } from "node:path";
|
||||
|
||||
import { createLogger } from "@uncaged/workflow-util";
|
||||
|
||||
import type { BuiltinTurnPayload } from "./types.js";
|
||||
|
||||
const log = createLogger({ sink: { kind: "stderr" } });
|
||||
|
||||
function sessionsDir(storageRoot: string): string {
|
||||
return join(storageRoot, "sessions");
|
||||
}
|
||||
|
||||
function sessionFile(storageRoot: string, sessionId: string): string {
|
||||
return join(sessionsDir(storageRoot), `${sessionId}.jsonl`);
|
||||
}
|
||||
|
||||
/** Ensure sessions directory exists. */
|
||||
export async function initSessionDir(storageRoot: string): Promise<void> {
|
||||
await mkdir(sessionsDir(storageRoot), { recursive: true });
|
||||
}
|
||||
|
||||
/** Append a turn to the session jsonl file. */
|
||||
export async function appendSessionTurn(
|
||||
storageRoot: string,
|
||||
sessionId: string,
|
||||
turn: BuiltinTurnPayload,
|
||||
): Promise<void> {
|
||||
const line = `${JSON.stringify(turn)}\n`;
|
||||
await appendFile(sessionFile(storageRoot, sessionId), line, "utf-8");
|
||||
log("3XQVN8KR", `session ${sessionId} appended ${turn.role} turn`);
|
||||
}
|
||||
|
||||
/** Read all turns from session jsonl. Returns empty array if file does not exist. */
|
||||
export async function readSessionTurns(
|
||||
storageRoot: string,
|
||||
sessionId: string,
|
||||
): Promise<BuiltinTurnPayload[]> {
|
||||
try {
|
||||
const content = await readFile(sessionFile(storageRoot, sessionId), "utf-8");
|
||||
const lines = content
|
||||
.trim()
|
||||
.split("\n")
|
||||
.filter((l) => l.length > 0);
|
||||
return lines.map((l) => JSON.parse(l) as BuiltinTurnPayload);
|
||||
} catch {
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
/** Remove session jsonl file (called after detail is persisted to step CAS). */
|
||||
export async function removeSession(storageRoot: string, sessionId: string): Promise<void> {
|
||||
try {
|
||||
await rm(sessionFile(storageRoot, sessionId));
|
||||
log("7FWDP2MJ", `session ${sessionId} removed`);
|
||||
} catch {
|
||||
// already gone — fine
|
||||
}
|
||||
}
|
||||
@@ -34,7 +34,6 @@ export type BuiltinToolCall = {
|
||||
};
|
||||
|
||||
export type BuiltinTurnPayload = {
|
||||
index: number;
|
||||
role: BuiltinTurnRole;
|
||||
content: string;
|
||||
toolCalls: BuiltinToolCall[] | null;
|
||||
|
||||
Reference in New Issue
Block a user