diff --git a/examples/hello-world.ts b/examples/hello-world.ts index 25559bf..590b963 100644 --- a/examples/hello-world.ts +++ b/examples/hello-world.ts @@ -28,6 +28,7 @@ const greeter: RoleDefinition = { systemPrompt: "You greet the user briefly.", extractPrompt: "Extract the greeting string produced for the user.", schema: greeterMetaSchema, + extractRefs: null, }; const extract = createExtract({ diff --git a/packages/workflow-role-coder/src/coder.ts b/packages/workflow-role-coder/src/coder.ts index 2bd33d1..b5f6bac 100644 --- a/packages/workflow-role-coder/src/coder.ts +++ b/packages/workflow-role-coder/src/coder.ts @@ -38,4 +38,5 @@ export const coderRole: RoleDefinition = { extractPrompt: "Extract completedPhase: the planner phase hash finished this round (exact hash string from the plan). If multiple phases were finished in one round, use the last finished phase hash. Extract filesChanged and a summary of the work.", schema: coderMetaSchema, + extractRefs: (meta) => [meta.completedPhase], }; diff --git a/packages/workflow-role-committer/src/committer.ts b/packages/workflow-role-committer/src/committer.ts index b61c8dd..c1d5562 100644 --- a/packages/workflow-role-committer/src/committer.ts +++ b/packages/workflow-role-committer/src/committer.ts @@ -31,4 +31,5 @@ export const committerRole: RoleDefinition = { extractPrompt: "Extract the commit result: committed (with branch and SHA), recoverable failure, or unrecoverable failure. Include error details and log references if applicable.", schema: committerMetaSchema, + extractRefs: null, }; diff --git a/packages/workflow-role-planner/src/planner.ts b/packages/workflow-role-planner/src/planner.ts index 49242d0..eb5101b 100644 --- a/packages/workflow-role-planner/src/planner.ts +++ b/packages/workflow-role-planner/src/planner.ts @@ -49,4 +49,5 @@ export const plannerRole: RoleDefinition = { extractPrompt: "Extract the implementation phases from the agent's output. Each phase has a hash (the CAS content-hash returned by the cas put command) and a title (one-line summary).", schema: plannerMetaSchema, + extractRefs: (meta) => meta.phases.map((p) => p.hash), }; diff --git a/packages/workflow-role-preparer/src/preparer.ts b/packages/workflow-role-preparer/src/preparer.ts index 029160c..d79f45e 100644 --- a/packages/workflow-role-preparer/src/preparer.ts +++ b/packages/workflow-role-preparer/src/preparer.ts @@ -47,4 +47,5 @@ export const preparerRole: RoleDefinition = { extractPrompt: "Extract repoPath (absolute path), defaultBranch, conventions (summary string or null), and toolchain (packageManager, testCommand, lintCommand, buildCommand — each string or null).", schema: preparerMetaSchema, + extractRefs: null, }; diff --git a/packages/workflow-role-reviewer/src/reviewer.ts b/packages/workflow-role-reviewer/src/reviewer.ts index 00d8c2d..695d771 100644 --- a/packages/workflow-role-reviewer/src/reviewer.ts +++ b/packages/workflow-role-reviewer/src/reviewer.ts @@ -21,4 +21,5 @@ export const reviewerRole: RoleDefinition = { extractPrompt: "Extract the review verdict: approved or rejected. If rejected, list the blocking issues.", schema: reviewerMetaSchema, + extractRefs: null, }; diff --git a/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts b/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts index 27ddbfc..143942f 100644 --- a/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts +++ b/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts @@ -124,6 +124,7 @@ function preparerStep(): RoleStep { buildCommand: "bun run build", }, }, + refs: [], timestamp: 0, }; } @@ -133,6 +134,7 @@ function plannerStep(phases: PlannerMeta["phases"] = DEFAULT_PHASES): RoleStep p.hash), timestamp: 1, }; } @@ -142,6 +144,7 @@ function coderStep(completedPhase = "4KNMR2PX"): RoleStep { role: "coder", content: "code", meta: { completedPhase, filesChanged: ["a.ts"], summary: "fixed" }, + refs: [completedPhase], timestamp: 2, }; } @@ -153,6 +156,7 @@ function reviewerStep(approved: boolean): RoleStep { meta: approved ? { status: "approved" as const } : { status: "rejected" as const, issues: ["needs fix"] }, + refs: [], timestamp: 3, }; } @@ -162,6 +166,7 @@ function committerStep(): RoleStep { role: "committer", content: "commit", meta: { status: "committed", branch: "feat/issue-1", commitSha: "abc1234" }, + refs: [], timestamp: 4, }; } diff --git a/packages/workflow-util-agent/__tests__/build-agent-prompt.test.ts b/packages/workflow-util-agent/__tests__/build-agent-prompt.test.ts index 94c087d..af469fd 100644 --- a/packages/workflow-util-agent/__tests__/build-agent-prompt.test.ts +++ b/packages/workflow-util-agent/__tests__/build-agent-prompt.test.ts @@ -37,6 +37,7 @@ describe("buildAgentPrompt", () => { role: "coder", content: "only step full body", meta: { files: ["a.ts"] }, + refs: [], timestamp: 2, }, ], @@ -61,12 +62,14 @@ describe("buildAgentPrompt", () => { role: "planner", content: "PLANNER_SECRET_FULL_TEXT", meta: { plan: "short" }, + refs: [], timestamp: 2, }, { role: "coder", content: "last step full content", meta: { done: true }, + refs: [], timestamp: 3, }, ], @@ -94,18 +97,21 @@ describe("buildAgentPrompt", () => { role: "a", content: "HIDDEN_A", meta: { n: 1 }, + refs: [], timestamp: 2, }, { role: "b", content: "HIDDEN_B_MIDDLE", meta: { n: 2 }, + refs: [], timestamp: 3, }, { role: "c", content: "VISIBLE_LAST", meta: { n: 3 }, + refs: [], timestamp: 4, }, ], diff --git a/packages/workflow/__tests__/build-descriptor.test.ts b/packages/workflow/__tests__/build-descriptor.test.ts index 5523aa2..40b6244 100644 --- a/packages/workflow/__tests__/build-descriptor.test.ts +++ b/packages/workflow/__tests__/build-descriptor.test.ts @@ -22,6 +22,7 @@ describe("buildDescriptor", () => { systemPrompt: "You are an analyst.", extractPrompt: "Extract title and count from the analysis.", schema, + extractRefs: null, }, }, moderator: () => END, diff --git a/packages/workflow/__tests__/engine.test.ts b/packages/workflow/__tests__/engine.test.ts index cf718da..0072696 100644 --- a/packages/workflow/__tests__/engine.test.ts +++ b/packages/workflow/__tests__/engine.test.ts @@ -89,12 +89,14 @@ const demoWorkflow = createWorkflow( systemPrompt: "You are a planner.", extractPrompt: "Extract plan text and affected files list.", schema: plannerMetaSchema, + extractRefs: null, }, coder: { description: "Demo coder", systemPrompt: "You are a coder.", extractPrompt: "Extract the code diff summary.", schema: coderMetaSchema, + extractRefs: null, }, }, moderator: (ctx) => { @@ -182,10 +184,12 @@ describe("executeThread", () => { expect(role1.role).toBe("planner"); expect(role1.content).toBe("plan-body"); expect(role1.meta).toEqual({ plan: "do-it", files: ["a.ts"] }); + expect(role1.refs).toEqual([]); expect(typeof role1.timestamp).toBe("number"); const role2 = JSON.parse(lines[2] ?? "{}") as Record; expect(role2.role).toBe("coder"); + expect(role2.refs).toEqual([]); const infoText = await readFile(infoPath, "utf8"); const infoLines = infoText @@ -228,6 +232,7 @@ describe("executeThread", () => { role: "planner", content: "plan-body", meta: { plan: "do-it", files: ["a.ts"] }, + refs: ["CAS111AAAAAAA"], }, ], }, @@ -241,6 +246,7 @@ describe("executeThread", () => { role: "planner", content: "plan-body", meta: { plan: "do-it", files: ["a.ts"] }, + refs: ["CAS111AAAAAAA"], timestamp: histTs, }, ], @@ -264,6 +270,7 @@ describe("executeThread", () => { const role0 = JSON.parse(lines[1] ?? "{}") as Record; expect(role0.role).toBe("planner"); expect(role0.timestamp).toBe(histTs); + expect(role0.refs).toEqual(["CAS111AAAAAAA"]); const role1 = JSON.parse(lines[2] ?? "{}") as Record; expect(role1.role).toBe("coder"); diff --git a/packages/workflow/__tests__/refs-tracking.test.ts b/packages/workflow/__tests__/refs-tracking.test.ts new file mode 100644 index 0000000..585ae9b --- /dev/null +++ b/packages/workflow/__tests__/refs-tracking.test.ts @@ -0,0 +1,191 @@ +import { afterEach, describe, expect, test } from "bun:test"; +import { mkdir, mkdtemp, readFile, rm } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import * as z from "zod/v4"; + +import { createWorkflow } from "../src/create-workflow.js"; +import { executeThread } from "../src/engine.js"; +import { createExtract } from "../src/extract-fn.js"; +import { buildForkPlan, parseThreadDataJsonl } from "../src/fork-thread.js"; +import { createLogger } from "../src/logger.js"; +import { END } from "../src/types.js"; + +const phaseSchema = z.object({ + hash: z.string(), + title: z.string(), +}); + +const plannerMetaSchema = z.object({ + phases: z.array(phaseSchema), +}); + +type RefsDemoMeta = { + planner: z.infer; +}; + +function installMockChatCompletions(sequence: ReadonlyArray>): () => void { + const origFetch = globalThis.fetch; + let i = 0; + const mockFetch = async ( + input: Parameters[0], + init?: RequestInit, + ): Promise => { + const args = sequence[i] ?? sequence[sequence.length - 1]; + if (args === undefined) { + throw new Error("installMockChatCompletions: empty sequence"); + } + i += 1; + void input; + const body = init?.body ? (JSON.parse(String(init.body)) as Record) : {}; + const tools = body.tools; + const firstTool = + Array.isArray(tools) && tools.length > 0 && tools[0] !== null && typeof tools[0] === "object" + ? (tools[0] as Record) + : null; + const fn = + firstTool !== null ? (firstTool.function as Record | undefined) : undefined; + const toolName = typeof fn?.name === "string" ? fn.name : "extract"; + return new Response( + JSON.stringify({ + choices: [ + { + message: { + tool_calls: [ + { + type: "function", + function: { + name: toolName, + arguments: JSON.stringify(args), + }, + }, + ], + }, + }, + ], + }), + { status: 200, headers: { "Content-Type": "application/json" } }, + ); + }; + globalThis.fetch = Object.assign(mockFetch, { + preconnect: origFetch.preconnect.bind(origFetch), + }) as typeof fetch; + return () => { + globalThis.fetch = origFetch; + }; +} + +const refsDemoExtract = createExtract({ + baseUrl: "http://127.0.0.1:9", + apiKey: "test", + model: "test", +}); + +const refsDemoWorkflow = createWorkflow( + { + roles: { + planner: { + description: "Planner with phase hashes", + systemPrompt: "Plan.", + extractPrompt: "Extract phases with CAS hashes.", + schema: plannerMetaSchema, + extractRefs: (meta) => meta.phases.map((p) => p.hash), + }, + }, + moderator: (ctx) => (ctx.steps.length === 0 ? "planner" : END), + }, + { + agent: async () => "plan-output", + }, + refsDemoExtract, +); + +describe("RoleStep refs tracking", () => { + let restoreFetch: (() => void) | null = null; + + afterEach(() => { + restoreFetch?.(); + restoreFetch = null; + }); + + test("parseThreadDataJsonl reads refs and defaults missing refs to []", () => { + const text = `{"name":"demo","hash":"C9NMV6V2TQT81","threadId":"01AAA1111111111111111111","parameters":{"prompt":"hi","options":{"maxRounds":5}},"timestamp":100} +{"role":"planner","content":"p","meta":{},"refs":["H111AAAAAAAAA","H222AAAAAAAAA"],"timestamp":101} +{"role":"coder","content":"c","meta":{},"timestamp":102} +`; + const r = parseThreadDataJsonl(text); + expect(r.ok).toBe(true); + if (!r.ok) { + return; + } + expect(r.value.roleSteps[0]?.refs).toEqual(["H111AAAAAAAAA", "H222AAAAAAAAA"]); + expect(r.value.roleSteps[1]?.refs).toEqual([]); + }); + + test("executeThread persists refs from extractRefs on role yields", async () => { + restoreFetch = installMockChatCompletions([ + { + phases: [ + { hash: "C9NMV6V2TQT81", title: "phase-a" }, + { hash: "C9NMV6V2TQT82", title: "phase-b" }, + ], + }, + ]); + + const root = await mkdtemp(join(tmpdir(), "wf-refs-")); + try { + const threadId = "01KQXKW18CT8G75T53R8F4G7YG"; + const hash = "C9NMV6V2TQT81"; + const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`); + const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`); + await mkdir(join(root, "logs", hash), { recursive: true }); + + const logger = createLogger({ sink: { kind: "file", path: infoPath } }); + const ac = new AbortController(); + + const result = await executeThread( + refsDemoWorkflow, + "refs-demo", + { prompt: "task", steps: [] }, + { + maxRounds: 5, + signal: ac.signal, + awaitAfterEachYield: async () => {}, + forkSourceThreadId: null, + prefilledDiskSteps: null, + }, + { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath }, + logger, + ); + + expect(result.returnCode).toBe(0); + + const dataText = await readFile(dataPath, "utf8"); + const lines = dataText + .trim() + .split("\n") + .filter((l) => l !== ""); + expect(lines.length).toBe(2); + + const role1 = JSON.parse(lines[1] ?? "{}") as Record; + expect(role1.role).toBe("planner"); + expect(role1.refs).toEqual(["C9NMV6V2TQT81", "C9NMV6V2TQT82"]); + } finally { + await rm(root, { recursive: true, force: true }); + } + }); + + test("buildForkPlan carries refs on historical steps", () => { + const text = `{"name":"demo","hash":"C9NMV6V2TQT81","threadId":"01AAA1111111111111111111","parameters":{"prompt":"hi","options":{"maxRounds":5}},"timestamp":100} +{"role":"planner","content":"p","meta":{},"refs":["KEEPREFAAAAAA"],"timestamp":101} +{"role":"coder","content":"c","meta":{},"refs":["CODERHASHAAAA"],"timestamp":102} +`; + const plan = buildForkPlan(text, null); + expect(plan.ok).toBe(true); + if (!plan.ok) { + return; + } + expect(plan.value.historicalSteps.length).toBe(1); + expect(plan.value.historicalSteps[0]?.refs).toEqual(["KEEPREFAAAAAA"]); + }); +}); diff --git a/packages/workflow/__tests__/thread-jsonl-format.test.ts b/packages/workflow/__tests__/thread-jsonl-format.test.ts index 25f8851..a8d0a32 100644 --- a/packages/workflow/__tests__/thread-jsonl-format.test.ts +++ b/packages/workflow/__tests__/thread-jsonl-format.test.ts @@ -19,13 +19,16 @@ describe("RFC-001 thread JSONL shapes", () => { role: "planner", content: "Plan: modify auth middleware...", meta: { plan: "...", files: ["src/auth.ts"] }, + refs: [] as string[], timestamp: 1714963201000, }; expect(Object.keys(startRecord).sort()).toEqual( ["hash", "name", "parameters", "threadId", "timestamp"].sort(), ); - expect(Object.keys(roleRecord).sort()).toEqual(["content", "meta", "role", "timestamp"].sort()); + expect(Object.keys(roleRecord).sort()).toEqual( + ["content", "meta", "refs", "role", "timestamp"].sort(), + ); }); test("documents the `.info.jsonl` debug record keys", () => { diff --git a/packages/workflow/src/create-workflow.ts b/packages/workflow/src/create-workflow.ts index e9b6062..954448d 100644 --- a/packages/workflow/src/create-workflow.ts +++ b/packages/workflow/src/create-workflow.ts @@ -5,6 +5,7 @@ import { END, type ExtractContext, type ModeratorContext, + type RoleDefinition, type RoleMeta, type RoleOutput, type RoleStep, @@ -22,6 +23,17 @@ function isRoleNext( return next !== END; } +function resolveExtractedRefs( + roleDef: RoleDefinition>, + meta: unknown, +): string[] { + const extractRefsFn = roleDef.extractRefs; + if (extractRefsFn === null || typeof extractRefsFn !== "function") { + return []; + } + return extractRefsFn(meta as Record); +} + /** * Binds pure role definitions + moderator to runtime agents and structured extraction. * Assign with `export const run = createWorkflow(def, binding, extract)`. @@ -48,6 +60,7 @@ export function createWorkflow( role: out.role, content: out.content, meta: out.meta, + refs: out.refs, timestamp: baseTs + i, })) as RoleStep[]; @@ -96,15 +109,21 @@ export function createWorkflow( extractCtx as unknown as ExtractContext, ); + const refs = resolveExtractedRefs( + roleDef as unknown as RoleDefinition>, + meta, + ); + const ts = Date.now(); const step = { role: next, content: raw, meta, + refs, timestamp: ts, } as RoleStep; - yield { role: step.role, content: step.content, meta: step.meta }; + yield { role: step.role, content: step.content, meta: step.meta, refs: step.refs }; steps = [...steps, step]; } diff --git a/packages/workflow/src/engine.ts b/packages/workflow/src/engine.ts index 626e18b..5fc45c0 100644 --- a/packages/workflow/src/engine.ts +++ b/packages/workflow/src/engine.ts @@ -2,6 +2,7 @@ import { appendFile, mkdir } from "node:fs/promises"; import { dirname } from "node:path"; import type { LogFn } from "./logger.js"; +import { normalizeRefsField } from "./refs-field.js"; import type { ThreadInput, WorkflowFn, WorkflowFnOptions, WorkflowResult } from "./types.js"; export type ExecuteThreadIo = { @@ -16,6 +17,7 @@ export type PrefilledDiskStep = { role: string; content: string; meta: Record; + refs: string[]; timestamp: number; }; @@ -79,6 +81,7 @@ async function driveWorkflowGenerator(params: { role: step.role, content: step.content, meta: step.meta, + refs: normalizeRefsField(step.refs), timestamp: ts, }); @@ -151,6 +154,7 @@ export async function executeThread( role: row.role, content: row.content, meta: row.meta, + refs: normalizeRefsField(row.refs), timestamp: row.timestamp, }); } diff --git a/packages/workflow/src/fork-thread.ts b/packages/workflow/src/fork-thread.ts index d2d67f4..e3a43dd 100644 --- a/packages/workflow/src/fork-thread.ts +++ b/packages/workflow/src/fork-thread.ts @@ -1,3 +1,4 @@ +import { normalizeRefsField } from "./refs-field.js"; import { err, ok, type Result } from "./result.js"; import type { RoleOutput } from "./types.js"; @@ -36,6 +37,7 @@ function parseRoleLine( role, content, meta: meta as Record, + refs: normalizeRefsField(obj.refs), timestamp, }); } diff --git a/packages/workflow/src/refs-field.ts b/packages/workflow/src/refs-field.ts new file mode 100644 index 0000000..fc00edc --- /dev/null +++ b/packages/workflow/src/refs-field.ts @@ -0,0 +1,13 @@ +/** Normalize `refs` from persisted JSONL or IPC payloads (missing or invalid → []). */ +export function normalizeRefsField(value: unknown): string[] { + if (!Array.isArray(value)) { + return []; + } + const out: string[] = []; + for (const x of value) { + if (typeof x === "string") { + out.push(x); + } + } + return out; +} diff --git a/packages/workflow/src/types.ts b/packages/workflow/src/types.ts index 374c1cd..169c5a6 100644 --- a/packages/workflow/src/types.ts +++ b/packages/workflow/src/types.ts @@ -19,6 +19,8 @@ export type RoleOutput = { role: string; content: string; meta: Record; + /** CAS hashes produced or consumed by this step (for GC traceability). */ + refs: string[]; }; /** What the workflow AsyncGenerator returns when done. */ @@ -55,7 +57,13 @@ export type StartStep = { /** A completed role step in the thread. */ export type RoleStep = { - [K in keyof M & string]: { role: K; meta: M[K]; content: string; timestamp: number }; + [K in keyof M & string]: { + role: K; + meta: M[K]; + content: string; + refs: string[]; + timestamp: number; + }; }[keyof M & string]; /** Phase 1: Moderator decides next role. */ @@ -96,6 +104,8 @@ export type RoleDefinition> = { systemPrompt: string; extractPrompt: string; schema: z.ZodType; + /** When non-null, produces CAS hashes to persist on this role's steps (see `RoleOutput.refs`). */ + extractRefs: ((meta: Meta) => string[]) | null; }; /** diff --git a/packages/workflow/src/worker.ts b/packages/workflow/src/worker.ts index a985ebf..cf5200e 100644 --- a/packages/workflow/src/worker.ts +++ b/packages/workflow/src/worker.ts @@ -5,6 +5,7 @@ import { pathToFileURL } from "node:url"; import type { PrefilledDiskStep } from "./engine.js"; import { type ExecuteThreadIo, executeThread } from "./engine.js"; import { createLogger } from "./logger.js"; +import { normalizeRefsField } from "./refs-field.js"; import { err, ok, type Result } from "./result.js"; import { createThreadPauseGate, type ThreadPauseGate } from "./thread-pause-gate.js"; import type { RoleOutput, WorkflowFn } from "./types.js"; @@ -55,7 +56,12 @@ function parseRoleOutputRecord(obj: Record): RoleOutput | null if (meta === null || typeof meta !== "object") { return null; } - return { role, content, meta: meta as Record }; + return { + role, + content, + meta: meta as Record, + refs: normalizeRefsField(obj.refs), + }; } function parseRunStepsPayload(rec: Record): { @@ -382,6 +388,7 @@ async function main(): Promise { role: step.role, content: step.content, meta: step.meta, + refs: normalizeRefsField(step.refs), timestamp: typeof ts === "number" && ts > 0 ? ts : baseTs + i, }; });