feat(dispatcher): AgentClient interface + OC Plugin integration (#32)

- New AgentClient interface: push(actor, events, summary?)
- OcPluginAgentClient: POST to /plugins/ograph/dispatch
- Scheduler rewritten: uses AgentClient instead of Telegram/file/CLI
- Removed OC session-status dependency (Plugin manages backpressure)
- Removed cooldown logic (Plugin debounce replaces it)
- Config supports agents[] array with type/url/secret/actor
- End-to-end verified: Engine → Dispatcher → Plugin → Agent session

Closes #32
This commit is contained in:
小墨 2026-04-13 07:38:44 +00:00
parent a3aa77eb31
commit 9d8ebe9f74
5 changed files with 258 additions and 124 deletions

View File

@ -0,0 +1,58 @@
// AgentClient interface and implementations
// Provides a unified interface for pushing events to various agents
import type { OGraphEvent } from './types.js';
// AgentClient 接口(#28 #31 共识)
export interface AgentClient {
name: string;
push(actor: string, events: OGraphEvent[], summary?: string): Promise<PushResult>;
}
export interface PushResult {
ok: boolean;
buffered?: number;
sessionBusy?: boolean;
error?: string;
}
// OC Plugin AgentClient 实现
export class OcPluginAgentClient implements AgentClient {
name: string;
constructor(
private readonly url: string, // http://localhost:18789/plugins/ograph/dispatch
private readonly secret: string, // gateway auth token
private readonly defaultActor: string = 'task-execution',
) {
this.name = `oc-plugin(${url})`;
}
async push(actor: string, events: OGraphEvent[], summary?: string): Promise<PushResult> {
const resp = await fetch(this.url, {
method: 'POST',
headers: {
'Authorization': `Bearer ${this.secret}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
topic: actor || this.defaultActor,
events: events.map(e => ({
id: e.id,
type: e.type_name, // 字段映射:type_name → type
payload: e.payload,
created_at: e.created_at,
})),
summary,
}),
});
if (!resp.ok) {
const body = await resp.text();
return { ok: false, error: `HTTP ${resp.status}: ${body}` };
}
const result = await resp.json() as PushResult;
return result;
}
}

View File

@ -17,11 +17,13 @@ const DEFAULTS: DispatcherConfig = {
projections: [], projections: [],
}, },
discovery: undefined, discovery: undefined,
push: undefined,
oc: { oc: {
statusEndpoint: 'http://localhost:18789/plugins/session-status/status', statusEndpoint: 'http://localhost:18789/plugins/session-status/status',
statusToken: 'ograph-status-token-2026', statusToken: 'ograph-status-token-2026',
minAvailable: 2, minAvailable: 2,
}, },
agents: undefined, // 新增 agents 默认配置
intervals: { intervals: {
watcherIdle: 30_000, watcherIdle: 30_000,
watcherActive: 5_000, watcherActive: 5_000,
@ -91,5 +93,40 @@ export function loadConfig(): DispatcherConfig {
config.discovery.agentId = parseInt(process.env['OGRAPH_AGENT_ID'], 10); config.discovery.agentId = parseInt(process.env['OGRAPH_AGENT_ID'], 10);
} }
// ── Push configuration overrides ───────────────────────────────────────────
if (process.env['PUSH_TELEGRAM_BOT_TOKEN']) {
config.push = config.push ?? { method: 'telegram' };
config.push.method = 'telegram';
config.push.telegramBotToken = process.env['PUSH_TELEGRAM_BOT_TOKEN'];
}
if (process.env['PUSH_TELEGRAM_CHAT_ID']) {
config.push = config.push ?? { method: 'telegram' };
config.push.telegramChatId = process.env['PUSH_TELEGRAM_CHAT_ID'];
}
// ── Agent configuration overrides ───────────────────────────────────────────
// 如果配置文件没有 agents 但有环境变量,创建默认 agent 配置
if (process.env['AGENT_OC_URL'] || process.env['AGENT_OC_SECRET'] || process.env['AGENT_OC_ACTOR']) {
config.agents = config.agents ?? [{
type: 'oc-plugin',
url: 'http://localhost:18789/plugins/ograph/dispatch',
secret: '',
actor: 'task-execution',
}];
}
// 环境变量覆盖第一个 agent 的配置
if (config.agents && config.agents.length > 0) {
if (process.env['AGENT_OC_URL']) {
config.agents[0].url = process.env['AGENT_OC_URL'];
}
if (process.env['AGENT_OC_SECRET']) {
config.agents[0].secret = process.env['AGENT_OC_SECRET'];
}
if (process.env['AGENT_OC_ACTOR']) {
config.agents[0].actor = process.env['AGENT_OC_ACTOR'];
}
}
return config; return config;
} }

View File

@ -4,12 +4,26 @@
import { loadConfig } from './config.js'; import { loadConfig } from './config.js';
import { ProjectionWatcher } from './watcher.js'; import { ProjectionWatcher } from './watcher.js';
import { OcScheduler } from './scheduler.js'; import { OcScheduler } from './scheduler.js';
import type { PendingEntry } from './types.js'; import { OcPluginAgentClient } from './agent-client.js';
import type { PendingEntry, AgentConfig } from './types.js';
import type { AgentClient } from './agent-client.js';
function ts(): string { function ts(): string {
return new Date().toISOString(); return new Date().toISOString();
} }
function createAgentClient(agentConfig: AgentConfig): AgentClient {
switch (agentConfig.type) {
case 'oc-plugin':
return new OcPluginAgentClient(agentConfig.url, agentConfig.secret, agentConfig.actor);
case 'hermes':
// Future: implement Hermes client
throw new Error(`Agent type '${agentConfig.type}' not yet implemented`);
default:
throw new Error(`Unknown agent type: ${(agentConfig as any).type}`);
}
}
async function main(): Promise<void> { async function main(): Promise<void> {
console.log(`[${ts()}] [dispatcher] OGraph Dispatcher starting...`); console.log(`[${ts()}] [dispatcher] OGraph Dispatcher starting...`);
@ -18,19 +32,32 @@ async function main(): Promise<void> {
console.log(`[${ts()}] [dispatcher] config loaded`); console.log(`[${ts()}] [dispatcher] config loaded`);
console.log(`[${ts()}] [dispatcher] ograph.endpoint = ${config.ograph.endpoint}`); console.log(`[${ts()}] [dispatcher] ograph.endpoint = ${config.ograph.endpoint}`);
console.log(`[${ts()}] [dispatcher] ograph.projections = [${config.ograph.projections.join(', ')}]`); console.log(`[${ts()}] [dispatcher] ograph.projections = [${config.ograph.projections.join(', ')}]`);
console.log(`[${ts()}] [dispatcher] oc.statusEndpoint = ${config.oc.statusEndpoint}`);
console.log(`[${ts()}] [dispatcher] oc.minAvailable = ${config.oc.minAvailable}`);
console.log(`[${ts()}] [dispatcher] intervals.watcherIdle = ${config.intervals.watcherIdle}ms`); console.log(`[${ts()}] [dispatcher] intervals.watcherIdle = ${config.intervals.watcherIdle}ms`);
console.log(`[${ts()}] [dispatcher] intervals.watcherActive = ${config.intervals.watcherActive}ms`); console.log(`[${ts()}] [dispatcher] intervals.watcherActive = ${config.intervals.watcherActive}ms`);
console.log(`[${ts()}] [dispatcher] intervals.schedulerIdle = ${config.intervals.schedulerIdle}ms`); console.log(`[${ts()}] [dispatcher] intervals.schedulerIdle = ${config.intervals.schedulerIdle}ms`);
console.log(`[${ts()}] [dispatcher] intervals.schedulerActive= ${config.intervals.schedulerActive}ms`); console.log(`[${ts()}] [dispatcher] intervals.schedulerActive= ${config.intervals.schedulerActive}ms`);
console.log(`[${ts()}] [dispatcher] intervals.cooldownAfterPush = ${config.intervals.cooldownAfterPush}ms`);
// Create agent clients from config
const agentClients: AgentClient[] = [];
if (config.agents && config.agents.length > 0) {
for (const agentConfig of config.agents) {
try {
const client = createAgentClient(agentConfig);
agentClients.push(client);
console.log(`[${ts()}] [dispatcher] agent client: ${client.name}`);
} catch (err) {
console.error(`[${ts()}] [dispatcher] failed to create agent client: ${err instanceof Error ? err.message : String(err)}`);
}
}
} else {
console.warn(`[${ts()}] [dispatcher] no agents configured in config.agents - scheduler will not push anywhere`);
}
// Shared pending queue — Watcher writes, Scheduler reads + clears // Shared pending queue — Watcher writes, Scheduler reads + clears
const pending: Map<string, PendingEntry> = new Map(); const pending: Map<string, PendingEntry> = new Map();
const watcher = new ProjectionWatcher(config, pending); const watcher = new ProjectionWatcher(config, pending);
const scheduler = new OcScheduler(config, pending); const scheduler = new OcScheduler(config, pending, agentClients);
// Start both loops independently // Start both loops independently
watcher.start(); watcher.start();

View File

@ -1,37 +1,32 @@
// Loop B: OC Scheduler // Loop B: OC Scheduler
// Polls OC busy/idle state, pushes pending queue when OC is available. // Uses AgentClient interface to push events to various agents.
import { writeFileSync, mkdirSync } from 'node:fs'; import type { DispatcherConfig, PendingEntry, OGraphEvent } from './types.js';
import { join } from 'node:path'; import type { AgentClient } from './agent-client.js';
import { execFileSync } from 'node:child_process';
import type { DispatcherConfig, PendingEntry } from './types.js';
import { OcClient } from './oc-client.js';
function ts(): string { function ts(): string {
return new Date().toISOString(); return new Date().toISOString();
} }
/** Dispatch file path — a simple push mechanism readable by OC / other tools */
const DISPATCH_FILE = '/tmp/ograph-dispatch.json';
export class OcScheduler { export class OcScheduler {
private readonly client: OcClient;
private running = false; private running = false;
private timer: ReturnType<typeof setTimeout> | null = null; private timer: ReturnType<typeof setTimeout> | null = null;
private lastPushAt = 0;
constructor( constructor(
private readonly config: DispatcherConfig, private readonly config: DispatcherConfig,
/** Shared pending queue (read + cleared by Scheduler) */ /** Shared pending queue (read + cleared by Scheduler) */
private readonly pending: Map<string, PendingEntry>, private readonly pending: Map<string, PendingEntry>,
) { /** Agent clients for pushing events */
this.client = new OcClient(config); private readonly agentClients: AgentClient[],
} ) {}
start(): void { start(): void {
if (this.running) return; if (this.running) return;
this.running = true; this.running = true;
console.log(`[${ts()}] [scheduler] started — OC status: ${this.config.oc.statusEndpoint}`); console.log(`[${ts()}] [scheduler] started with ${this.agentClients.length} agent(s)`);
for (const client of this.agentClients) {
console.log(`[${ts()}] [scheduler] agent: ${client.name}`);
}
void this.poll(); void this.poll();
} }
@ -54,46 +49,27 @@ export class OcScheduler {
private async poll(): Promise<void> { private async poll(): Promise<void> {
if (!this.running) return; if (!this.running) return;
const { schedulerIdle, schedulerActive, cooldownAfterPush } = this.config.intervals; const { schedulerIdle, schedulerActive } = this.config.intervals;
try { try {
const hasPending = this.pending.size > 0; const hasPending = this.pending.size > 0;
// Check cooldown first
const now = Date.now();
if (this.lastPushAt > 0 && now - this.lastPushAt < cooldownAfterPush) {
const remainMs = cooldownAfterPush - (now - this.lastPushAt);
console.log(`[${ts()}] [scheduler] in cooldown — ${Math.ceil(remainMs / 1000)}s remaining`);
this.scheduleNext(hasPending ? schedulerActive : schedulerIdle);
return;
}
if (!hasPending) { if (!hasPending) {
// Nothing to push; low-frequency idle polling // Nothing to push; low-frequency idle polling
this.scheduleNext(schedulerIdle); this.scheduleNext(schedulerIdle);
return; return;
} }
// We have pending items — check OC availability if (this.agentClients.length === 0) {
let available = false; console.warn(`[${ts()}] [scheduler] no agent clients configured, clearing pending queue`);
try { this.pending.clear();
available = await this.client.isAvailable(); this.scheduleNext(schedulerIdle);
console.log(`[${ts()}] [scheduler] OC available=${available} pending=${this.pending.size}`);
} catch (err) {
console.warn(`[${ts()}] [scheduler] OC status check failed: ${err instanceof Error ? err.message : String(err)}`);
// Can't determine status — back off
this.scheduleNext(schedulerActive);
return; return;
} }
if (available) { // We have pending items — push to all agents
await this.push(); await this.push();
this.lastPushAt = Date.now();
this.scheduleNext(schedulerIdle); // after push, slow down this.scheduleNext(schedulerIdle); // after push, slow down
} else {
// OC busy but we have work — keep polling actively
this.scheduleNext(schedulerActive);
}
} catch (err) { } catch (err) {
console.error(`[${ts()}] [scheduler] poll error: ${err instanceof Error ? err.message : String(err)}`); console.error(`[${ts()}] [scheduler] poll error: ${err instanceof Error ? err.message : String(err)}`);
this.scheduleNext(schedulerActive); this.scheduleNext(schedulerActive);
@ -104,92 +80,114 @@ export class OcScheduler {
const entries = Array.from(this.pending.values()); const entries = Array.from(this.pending.values());
if (entries.length === 0) return; if (entries.length === 0) return;
const message = this.buildMessage(entries); console.log(`[${ts()}] [scheduler] pushing ${entries.length} change(s) to ${this.agentClients.length} agent(s)`);
console.log(`[${ts()}] [scheduler] pushing ${entries.length} change(s) to OC`); // Build events array from pending entries
console.log(`[${ts()}] [scheduler] message:\n${message}`); const events: OGraphEvent[] = [];
for (const entry of entries) {
// ── Strategy 1: write dispatch file ───────────────────────────────────── if (entry.events && entry.events.length > 0) {
try { // Event-stream mode: use the actual events
mkdirSync('/tmp', { recursive: true }); events.push(...entry.events);
const payload = { } else {
pushedAt: new Date().toISOString(), // Projection mode: convert to synthetic events
changes: entries, const syntheticEvent: OGraphEvent = {
message, id: Date.now() + Math.floor(Math.random() * 1000), // synthetic ID
type_hash: 'projection_changed',
type_name: 'projection_changed',
payload: {
projection: entry.name,
previousValue: entry.previousValue,
currentValue: entry.currentValue,
changeCount: entry.changeCount,
firstDetectedAt: entry.firstDetectedAt,
lastDetectedAt: entry.lastDetectedAt,
},
created_at: entry.lastDetectedAt,
}; };
writeFileSync(DISPATCH_FILE, JSON.stringify(payload, null, 2), 'utf-8'); events.push(syntheticEvent);
console.log(`[${ts()}] [scheduler] dispatch file written: ${DISPATCH_FILE}`); }
} catch (err) {
console.warn(`[${ts()}] [scheduler] failed to write dispatch file: ${err instanceof Error ? err.message : String(err)}`);
} }
// ── Strategy 2: openclaw message send (best-effort) ────────────────────── const summary = this.buildSummary(entries, events);
console.log(`[${ts()}] [scheduler] summary: ${summary}`);
// Push to all agent clients
const pushPromises = this.agentClients.map(async (client) => {
try { try {
execFileSync('openclaw', ['message', 'send', message], { timeout: 10_000, stdio: 'pipe' }); const result = await client.push('task-execution', events, summary);
console.log(`[${ts()}] [scheduler] openclaw message send OK`); if (result.ok) {
console.log(`[${ts()}] [scheduler] ${client.name} push OK ${result.buffered ? `(buffered: ${result.buffered})` : ''}${result.sessionBusy ? ' (session busy)' : ''}`);
} else {
console.warn(`[${ts()}] [scheduler] ${client.name} push failed: ${result.error}`);
}
return result;
} catch (err) { } catch (err) {
// Not fatal — openclaw may not be in PATH or the channel may not be configured console.error(`[${ts()}] [scheduler] ${client.name} push error: ${err instanceof Error ? err.message : String(err)}`);
console.warn(`[${ts()}] [scheduler] openclaw message send failed (non-fatal): ${err instanceof Error ? err.message : String(err)}`); return { ok: false, error: err instanceof Error ? err.message : String(err) };
} }
});
// Clear pending after successful push attempt const results = await Promise.allSettled(pushPromises);
const successCount = results.filter(r => r.status === 'fulfilled' && r.value.ok).length;
if (successCount > 0) {
// At least one agent got the events successfully, clear pending
this.pending.clear(); this.pending.clear();
console.log(`[${ts()}] [scheduler] pending queue cleared`); console.log(`[${ts()}] [scheduler] pending queue cleared (${successCount}/${this.agentClients.length} agents succeeded)`);
} else {
// All agents failed, keep pending for retry
console.warn(`[${ts()}] [scheduler] all agents failed, keeping pending for retry`);
}
} }
private buildMessage(entries: PendingEntry[]): string { private buildSummary(entries: PendingEntry[], events: OGraphEvent[]): string {
// 检查是否有事件流模式的条目 // Check if we have event-stream mode entries
const hasEvents = entries.some(entry => entry.name.startsWith('event:')); const hasEvents = entries.some(entry => entry.name.startsWith('event:'));
if (hasEvents) { if (hasEvents) {
return this.buildEventMessage(entries); return this.buildEventSummary(entries, events);
} else { } else {
return this.buildProjectionMessage(entries); return this.buildProjectionSummary(entries);
} }
} }
private buildEventMessage(entries: PendingEntry[]): string { private buildEventSummary(entries: PendingEntry[], events: OGraphEvent[]): string {
const lines: string[] = ['📋 OGraph Event Stream Updates\n']; const lines: string[] = ['📋 OGraph Event Stream Updates'];
for (const entry of entries) { for (const event of events.slice(0, 5)) { // limit to first 5 events
if (entry.events && entry.events.length > 0) { const age = Math.round((Date.now() - event.created_at) / 1000);
const event = entry.events[0]; // 只显示第一个事件(每个 entry 对应一个 event)
const age = Math.round((Date.now() - entry.firstDetectedAt) / 1000);
// 根据 event type 格式化消息 // Format based on event type
if (event.type_name === 'task_created') { if (event.type_name === 'task_created') {
const payload = event.payload as any; const payload = event.payload as any;
lines.push(`📋 新任务: #${payload.id} ${payload.title} (${payload.priority}) — 由${payload.creator || '系统'}创建`); lines.push(`📋 新任务: #${payload.subject} ${payload.title || '(无标题)'} (${payload.priority || 'normal'}) — 由 agent:${payload.creator || '?'} 创建`);
} else if (event.type_name === 'task_updated') { } else if (event.type_name === 'task_status_changed') {
const payload = event.payload as any; const payload = event.payload as any;
lines.push(`📋 #${payload.id} 状态更新: ${payload.previous_status}${payload.new_status}`); lines.push(`📋 #${payload.subject} 状态更新: → ${payload.status}`);
} else { } else {
// 通用格式 lines.push(`${event.type_name} #${event.id} (${age}s ago)`);
lines.push(`• **${event.type_name}** #${event.id}`);
lines.push(` Payload: ${JSON.stringify(event.payload)}`);
lines.push(` Age: ${age}s`);
}
lines.push('');
} }
} }
lines.push(`Total: ${entries.length} event(s) detected`); if (events.length > 5) {
lines.push(`... and ${events.length - 5} more events`);
}
return lines.join('\n'); return lines.join('\n');
} }
private buildProjectionMessage(entries: PendingEntry[]): string { private buildProjectionSummary(entries: PendingEntry[]): string {
const lines: string[] = ['🔔 OGraph Projection Changes Detected\n']; const lines: string[] = ['🔔 OGraph Projection Changes'];
for (const entry of entries) { for (const entry of entries.slice(0, 3)) { // limit to first 3
const age = Math.round((Date.now() - entry.firstDetectedAt) / 1000); const age = Math.round((Date.now() - entry.firstDetectedAt) / 1000);
lines.push(`**${entry.name}**`); lines.push(`${entry.name}: ${entry.changeCount} change(s), ${age}s ago`);
lines.push(` Changes: ${entry.changeCount} | Age: ${age}s`); }
lines.push(` Previous: ${JSON.stringify(entry.previousValue)}`);
lines.push(` Current: ${JSON.stringify(entry.currentValue)}`); if (entries.length > 3) {
lines.push(''); lines.push(`... and ${entries.length - 3} more projections`);
} }
lines.push(`Total: ${entries.length} projection(s) changed`);
return lines.join('\n'); return lines.join('\n');
} }
} }

View File

@ -1,5 +1,13 @@
// OGraph Dispatcher — shared types // OGraph Dispatcher — shared types
export interface AgentConfig {
type: 'oc-plugin' | 'hermes'; // 未来可扩展
url: string;
secret: string;
actor: string; // 固定 actor 名
actorPattern?: string; // 动态 actor(P2b 用,现在可选)
}
export interface DispatcherConfig { export interface DispatcherConfig {
ograph: { ograph: {
endpoint: string; // OGraph API endpoint endpoint: string; // OGraph API endpoint
@ -10,11 +18,17 @@ export interface DispatcherConfig {
agentId: number; // 当前 agent 的 object id agentId: number; // 当前 agent 的 object id
eventTypes?: string[]; // 只关注哪些 event type(可选过滤) eventTypes?: string[]; // 只关注哪些 event type(可选过滤)
}; };
push?: {
method: 'telegram' | 'file'; // 推送方式
telegramBotToken?: string;
telegramChatId?: string;
};
oc: { oc: {
statusEndpoint: string; // session-status plugin URL statusEndpoint: string; // session-status plugin URL
statusToken: string; // Bearer token statusToken: string; // Bearer token
minAvailable: number; // 最少空闲槽位(默认 2) minAvailable: number; // 最少空闲槽位(默认 2)
}; };
agents?: AgentConfig[]; // 新的 AgentClient 配置
intervals: { intervals: {
watcherIdle: number; // 无变化时 poll 间隔 ms(默认 30000) watcherIdle: number; // 无变化时 poll 间隔 ms(默认 30000)
watcherActive: number; // 有变化时 poll 间隔 ms(默认 5000) watcherActive: number; // 有变化时 poll 间隔 ms(默认 5000)