feat(util-agent): extend AgentOptions with fork / cleanup (Phase 2a) #155

Merged
xingyue merged 1 commits from fix/145-agent-fork-cleanup into main 2026-06-07 09:16:27 +00:00
11 changed files with 363 additions and 1 deletions
+18
View File
@@ -0,0 +1,18 @@
---
"@united-workforce/util-agent": minor
"@united-workforce/agent-mock": patch
"@united-workforce/agent-builtin": patch
"@united-workforce/agent-hermes": patch
"@united-workforce/agent-claude-code": patch
---
feat(util-agent): extend AgentOptions with `fork` / `cleanup` and add ask-session cache
Phase 2a infrastructure for `step ask`. Extends `AgentOptions` with
`fork: AgentForkFn | null` and `cleanup: AgentCleanupFn | null` fields, exporting
the new `AgentForkFn` and `AgentCleanupFn` type aliases. Adds `getAskSessionId` /
`setAskSessionId` to the per-agent session cache, using `<stepHash>:ask` keys
that share the cache file with exec sessions (`<threadId>:<role>` keys) without
collision. All four adapters (mock, builtin, hermes, claude-code) now pass
`fork: null, cleanup: null` — real implementations land in Phase 2b. Resolves
issue #145.
+2
View File
@@ -167,5 +167,7 @@ export function createBuiltinAgent(): () => Promise<void> {
name: "builtin", name: "builtin",
run: runBuiltin, run: runBuiltin,
continue: continueBuiltin, continue: continueBuiltin,
fork: null,
cleanup: null,
}); });
} }
@@ -253,5 +253,7 @@ export function createClaudeCodeAgent(model: string | null): () => Promise<void>
name: "claude-code", name: "claude-code",
run: (ctx) => runClaudeCode(ctx, model), run: (ctx) => runClaudeCode(ctx, model),
continue: (sessionId, message, store) => continueClaudeCode(sessionId, message, store, model), continue: (sessionId, message, store) => continueClaudeCode(sessionId, message, store, model),
fork: null,
cleanup: null,
}); });
} }
+2
View File
@@ -246,6 +246,8 @@ export function createHermesAgent(resumeDisabled: boolean): () => Promise<void>
name: "hermes", name: "hermes",
run: runHermes, run: runHermes,
continue: continueHermes, continue: continueHermes,
fork: null,
cleanup: null,
}); });
// Wrap to ensure ACP client is closed after agent completes, // Wrap to ensure ACP client is closed after agent completes,
+2
View File
@@ -125,5 +125,7 @@ export function createMockAgent(mockDataPath: string): () => Promise<void> {
name: "mock", name: "mock",
run, run,
continue: continueRun, continue: continueRun,
fork: null,
cleanup: null,
}); });
} }
@@ -0,0 +1,60 @@
import { readFile } from "node:fs/promises";
import { join } from "node:path";
import { describe, expect, test } from "vitest";
/**
* Source-level verification that each adapter's `createAgent({...})` call
* includes the new `fork: null` and `cleanup: null` fields.
*
* Adapters are CLI binaries that spawn external processes — runtime testing
* requires real LLM environments — so we use static source inspection here.
* Type-level correctness is enforced separately by `tsc --build`.
*/
const REPO_ROOT = join(__dirname, "..", "..", "..");
const ADAPTERS: Array<{ name: string; path: string }> = [
{ name: "agent-mock", path: "packages/agent-mock/src/mock-agent.ts" },
{ name: "agent-builtin", path: "packages/agent-builtin/src/agent.ts" },
{ name: "agent-hermes", path: "packages/agent-hermes/src/hermes.ts" },
{ name: "agent-claude-code", path: "packages/agent-claude-code/src/claude-code.ts" },
];
/** Find the matching `}` for the `{` at `openIdx` in `source`. */
function findMatchingBrace(source: string, openIdx: number): number {
let depth = 0;
for (let i = openIdx; i < source.length; i++) {
const ch = source[i];
if (ch === "{") {
depth++;
} else if (ch === "}") {
depth--;
if (depth === 0) {
return i;
}
}
}
return -1;
}
/** Extract the `createAgent({...})` block from adapter source. */
function extractCreateAgentBlock(source: string): string {
const startIdx = source.indexOf("createAgent({");
expect(startIdx).toBeGreaterThanOrEqual(0);
const openIdx = source.indexOf("{", startIdx);
const endIdx = findMatchingBrace(source, openIdx);
expect(endIdx).toBeGreaterThan(openIdx);
return source.slice(openIdx, endIdx + 1);
}
describe("adapter createAgent calls include fork: null and cleanup: null", () => {
for (const adapter of ADAPTERS) {
test(`${adapter.name} createAgent call includes fork: null and cleanup: null`, async () => {
const source = await readFile(join(REPO_ROOT, adapter.path), "utf8");
expect(source).toMatch(/createAgent\s*\(\s*\{/);
const block = extractCreateAgentBlock(source);
expect(block).toMatch(/fork:\s*null/);
expect(block).toMatch(/cleanup:\s*null/);
});
}
});
@@ -0,0 +1,78 @@
import type { Store } from "@ocas/core";
import { describe, expect, test } from "vitest";
import type {
AgentCleanupFn,
AgentContext,
AgentContinueFn,
AgentForkFn,
AgentOptions,
AgentRunFn,
} from "../src/types.js";
const makeRun: AgentRunFn = async (_ctx: AgentContext) => ({
output: "",
detailHash: "",
sessionId: "",
assembledPrompt: "",
usage: null,
});
const makeContinue: AgentContinueFn = async (_sessionId, _message, _store) => ({
output: "",
detailHash: "",
sessionId: "",
assembledPrompt: "",
usage: null,
});
describe("AgentOptions fork/cleanup", () => {
test("AgentOptions accepts fork and cleanup as null", () => {
const opts: AgentOptions = {
name: "test",
run: makeRun,
continue: makeContinue,
fork: null,
cleanup: null,
};
expect(opts.name).toBe("test");
expect(opts.run).toBe(makeRun);
expect(opts.continue).toBe(makeContinue);
expect(opts.fork).toBeNull();
expect(opts.cleanup).toBeNull();
});
test("AgentOptions accepts real fork and cleanup functions", () => {
const fork: AgentForkFn = async (sessionId, _store) => `${sessionId}-forked`;
const cleanup: AgentCleanupFn = async () => {
/* no-op */
};
const opts: AgentOptions = {
name: "test",
run: makeRun,
continue: makeContinue,
fork,
cleanup,
};
expect(typeof opts.fork).toBe("function");
expect(typeof opts.cleanup).toBe("function");
});
test("AgentForkFn signature accepts (sessionId: string, store: Store) and returns Promise<string>", async () => {
const fork: AgentForkFn = async (sessionId, _store) => `${sessionId}-child`;
// Cast a placeholder Store — only the signature shape matters for this test.
const fakeStore = {} as Store;
const result = await fork("session-abc", fakeStore);
expect(result).toBe("session-abc-child");
});
test("AgentCleanupFn signature accepts no args and returns Promise<void>", async () => {
let called = false;
const cleanup: AgentCleanupFn = async () => {
called = true;
};
const result = await cleanup();
expect(result).toBeUndefined();
expect(called).toBe(true);
});
});
@@ -0,0 +1,131 @@
import { mkdir, readFile, rm, writeFile } from "node:fs/promises";
import { dirname, join } from "node:path";
import type { ThreadId } from "@united-workforce/protocol";
import { afterEach, beforeEach, describe, expect, test } from "vitest";
import {
getAskSessionId,
getCachedSessionId,
getCachePath,
setAskSessionId,
setCachedSessionId,
} from "../src/session-cache.js";
import { getDefaultStorageRoot } from "../src/storage.js";
describe("session-cache ask sessions", () => {
let testStorageRoot: string;
beforeEach(async () => {
testStorageRoot = join(
getDefaultStorageRoot(),
"test-cache",
`ask-${Date.now()}-${Math.random()}`,
);
await mkdir(testStorageRoot, { recursive: true });
});
afterEach(async () => {
await rm(testStorageRoot, { recursive: true, force: true });
});
const stepHash = "ABCDEFG1234567";
test("getAskSessionId returns null when no ask session cached", async () => {
const session = await getAskSessionId("claude-code", stepHash, testStorageRoot);
expect(session).toBeNull();
});
test("setAskSessionId + getAskSessionId round-trip", async () => {
await setAskSessionId("claude-code", stepHash, "ask-session-123", testStorageRoot);
const session = await getAskSessionId("claude-code", stepHash, testStorageRoot);
expect(session).toBe("ask-session-123");
});
test("ask cache keys use stepHash:ask format", async () => {
await setAskSessionId("claude-code", stepHash, "ask-session-456", testStorageRoot);
const cachePath = getCachePath("claude-code", testStorageRoot);
const content = JSON.parse(await readFile(cachePath, "utf8")) as Record<string, string>;
expect(content).toHaveProperty(`${stepHash}:ask`, "ask-session-456");
});
test("exec cache and ask cache coexist in same file", async () => {
const threadId = "01234567890123456789012345" as ThreadId;
const role = "developer";
await setCachedSessionId("claude-code", threadId, role, "exec-session", testStorageRoot);
await setAskSessionId("claude-code", stepHash, "ask-session", testStorageRoot);
const cachePath = getCachePath("claude-code", testStorageRoot);
const content = JSON.parse(await readFile(cachePath, "utf8")) as Record<string, string>;
expect(content).toHaveProperty(`${threadId}:${role}`, "exec-session");
expect(content).toHaveProperty(`${stepHash}:ask`, "ask-session");
expect(await getCachedSessionId("claude-code", threadId, role, testStorageRoot)).toBe(
"exec-session",
);
expect(await getAskSessionId("claude-code", stepHash, testStorageRoot)).toBe("ask-session");
});
test("updating ask session does not affect exec session", async () => {
const threadId = "01234567890123456789012345" as ThreadId;
const role = "developer";
await setCachedSessionId("claude-code", threadId, role, "exec-original", testStorageRoot);
await setAskSessionId("claude-code", stepHash, "ask-original", testStorageRoot);
await setAskSessionId("claude-code", stepHash, "ask-updated", testStorageRoot);
expect(await getCachedSessionId("claude-code", threadId, role, testStorageRoot)).toBe(
"exec-original",
);
expect(await getAskSessionId("claude-code", stepHash, testStorageRoot)).toBe("ask-updated");
});
test("updating exec session does not affect ask session", async () => {
const threadId = "01234567890123456789012345" as ThreadId;
const role = "developer";
await setAskSessionId("claude-code", stepHash, "ask-original", testStorageRoot);
await setCachedSessionId("claude-code", threadId, role, "exec-original", testStorageRoot);
await setCachedSessionId("claude-code", threadId, role, "exec-updated", testStorageRoot);
expect(await getAskSessionId("claude-code", stepHash, testStorageRoot)).toBe("ask-original");
expect(await getCachedSessionId("claude-code", threadId, role, testStorageRoot)).toBe(
"exec-updated",
);
});
test("different stepHashes have independent ask sessions", async () => {
const stepHashA = "AAAAAAA1234567";
const stepHashB = "BBBBBBB1234567";
await setAskSessionId("claude-code", stepHashA, "session-A", testStorageRoot);
await setAskSessionId("claude-code", stepHashB, "session-B", testStorageRoot);
expect(await getAskSessionId("claude-code", stepHashA, testStorageRoot)).toBe("session-A");
expect(await getAskSessionId("claude-code", stepHashB, testStorageRoot)).toBe("session-B");
});
test("ask session for one agent does not leak to another", async () => {
await setAskSessionId("claude-code", stepHash, "cc-ask-session", testStorageRoot);
const ccSession = await getAskSessionId("claude-code", stepHash, testStorageRoot);
const hermesSession = await getAskSessionId("hermes", stepHash, testStorageRoot);
expect(ccSession).toBe("cc-ask-session");
expect(hermesSession).toBeNull();
});
test("empty string ask session treated as missing", async () => {
const cachePath = getCachePath("claude-code", testStorageRoot);
await mkdir(dirname(cachePath), { recursive: true });
await writeFile(cachePath, JSON.stringify({ [`${stepHash}:ask`]: "" }), "utf8");
const session = await getAskSessionId("claude-code", stepHash, testStorageRoot);
expect(session).toBeNull();
});
});
+9 -1
View File
@@ -14,12 +14,20 @@ export type { FrontmatterFastPathResult } from "./frontmatter.js";
export { tryFrontmatterFastPath } from "./frontmatter.js"; export { tryFrontmatterFastPath } from "./frontmatter.js";
export { buildFrontmatterRetryPrompt } from "./frontmatter-retry-prompt.js"; export { buildFrontmatterRetryPrompt } from "./frontmatter-retry-prompt.js";
export { createAgent, parseArgv } from "./run.js"; export { createAgent, parseArgv } from "./run.js";
export { getCachedSessionId, getCachePath, setCachedSessionId } from "./session-cache.js"; export {
getAskSessionId,
getCachedSessionId,
getCachePath,
setAskSessionId,
setCachedSessionId,
} from "./session-cache.js";
export { getConfigPath, getEnvPath, loadWorkflowConfig, resolveStorageRoot } from "./storage.js"; export { getConfigPath, getEnvPath, loadWorkflowConfig, resolveStorageRoot } from "./storage.js";
export type { export type {
AdapterOutput, AdapterOutput,
AgentCleanupFn,
AgentContext, AgentContext,
AgentContinueFn, AgentContinueFn,
AgentForkFn,
AgentOptions, AgentOptions,
AgentRunFn, AgentRunFn,
AgentRunResult, AgentRunResult,
+34
View File
@@ -14,6 +14,10 @@ function cacheKey(threadId: ThreadId, role: string): string {
return `${threadId}:${role}`; return `${threadId}:${role}`;
} }
function askCacheKey(stepHash: string): string {
return `${stepHash}:ask`;
}
function isRecord(value: unknown): value is Record<string, unknown> { function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value); return typeof value === "object" && value !== null && !Array.isArray(value);
} }
@@ -86,3 +90,33 @@ export async function setCachedSessionId(
cache[cacheKey(threadId, role)] = sessionId; cache[cacheKey(threadId, role)] = sessionId;
await writeCache(agentName, storageRoot, cache); await writeCache(agentName, storageRoot, cache);
} }
/**
* Read the cached ask-session ID for a stepHash.
*
* Ask sessions are forked side conversations spawned by `step ask` from a
* specific completed step. They share the per-agent cache file with exec
* sessions but use the `<stepHash>:ask` key shape so the two namespaces
* never collide.
*/
export async function getAskSessionId(
agentName: string,
stepHash: string,
storageRoot: string,
): Promise<string | null> {
const cache = await readCache(agentName, storageRoot);
const sessionId = cache[askCacheKey(stepHash)];
return sessionId ?? null;
}
/** Write the ask-session ID for a stepHash into the cache. */
export async function setAskSessionId(
agentName: string,
stepHash: string,
sessionId: string,
storageRoot: string,
): Promise<void> {
const cache = await readCache(agentName, storageRoot);
cache[askCacheKey(stepHash)] = sessionId;
await writeCache(agentName, storageRoot, cache);
}
+25
View File
@@ -50,6 +50,21 @@ export type AgentContinueFn = (
export type AgentRunFn = (ctx: AgentContext) => Promise<AgentRunResult>; export type AgentRunFn = (ctx: AgentContext) => Promise<AgentRunResult>;
/**
* Fork an existing agent session, returning a new session ID that branches
* from the source session's state. Used by `step ask` (Phase 2a infrastructure)
* to spawn a side conversation from a completed step's session without
* polluting the original session's history.
*/
export type AgentForkFn = (sessionId: string, store: AgentContext["store"]) => Promise<string>;
/**
* Clean up adapter-level resources (e.g. close ACP client, kill subprocesses).
* Invoked by the agent CLI factory after the run completes — regardless of
* success or failure — so adapters can release I/O handles deterministically.
*/
export type AgentCleanupFn = () => Promise<void>;
export type AdapterOutput = { export type AdapterOutput = {
stepHash: string; stepHash: string;
detailHash: string; detailHash: string;
@@ -65,4 +80,14 @@ export type AgentOptions = {
name: string; name: string;
run: AgentRunFn; run: AgentRunFn;
continue: AgentContinueFn; continue: AgentContinueFn;
/**
* Optional session-fork hook. null means the adapter does not yet support
* `step ask` (Phase 2a placeholder — wired up in Phase 2b).
*/
fork: AgentForkFn | null;
/**
* Optional cleanup hook invoked after the agent CLI completes. null means
* the adapter has no resources to release.
*/
cleanup: AgentCleanupFn | null;
}; };