From 9d8ebe9f74b2372610c39485a70ee99e64d49306 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E5=A2=A8?= Date: Mon, 13 Apr 2026 07:38:44 +0000 Subject: [PATCH] feat(dispatcher): AgentClient interface + OC Plugin integration (#32) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - New AgentClient interface: push(actor, events, summary?) - OcPluginAgentClient: POST to /plugins/ograph/dispatch - Scheduler rewritten: uses AgentClient instead of Telegram/file/CLI - Removed OC session-status dependency (Plugin manages backpressure) - Removed cooldown logic (Plugin debounce replaces it) - Config supports agents[] array with type/url/secret/actor - End-to-end verified: Engine → Dispatcher → Plugin → Agent session Closes #32 --- packages/dispatcher/src/agent-client.ts | 58 ++++++ packages/dispatcher/src/config.ts | 37 ++++ packages/dispatcher/src/index.ts | 39 +++- packages/dispatcher/src/scheduler.ts | 234 ++++++++++++------------ packages/dispatcher/src/types.ts | 14 ++ 5 files changed, 258 insertions(+), 124 deletions(-) create mode 100644 packages/dispatcher/src/agent-client.ts diff --git a/packages/dispatcher/src/agent-client.ts b/packages/dispatcher/src/agent-client.ts new file mode 100644 index 0000000..cf28882 --- /dev/null +++ b/packages/dispatcher/src/agent-client.ts @@ -0,0 +1,58 @@ +// AgentClient interface and implementations +// Provides a unified interface for pushing events to various agents + +import type { OGraphEvent } from './types.js'; + +// AgentClient 接口(#28 #31 共识) +export interface AgentClient { + name: string; + push(actor: string, events: OGraphEvent[], summary?: string): Promise; +} + +export interface PushResult { + ok: boolean; + buffered?: number; + sessionBusy?: boolean; + error?: string; +} + +// OC Plugin AgentClient 实现 +export class OcPluginAgentClient implements AgentClient { + name: string; + + constructor( + private readonly url: string, // http://localhost:18789/plugins/ograph/dispatch + private readonly secret: string, // gateway auth token + private readonly defaultActor: string = 'task-execution', + ) { + this.name = `oc-plugin(${url})`; + } + + async push(actor: string, events: OGraphEvent[], summary?: string): Promise { + const resp = await fetch(this.url, { + method: 'POST', + headers: { + 'Authorization': `Bearer ${this.secret}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + topic: actor || this.defaultActor, + events: events.map(e => ({ + id: e.id, + type: e.type_name, // 字段映射:type_name → type + payload: e.payload, + created_at: e.created_at, + })), + summary, + }), + }); + + if (!resp.ok) { + const body = await resp.text(); + return { ok: false, error: `HTTP ${resp.status}: ${body}` }; + } + + const result = await resp.json() as PushResult; + return result; + } +} \ No newline at end of file diff --git a/packages/dispatcher/src/config.ts b/packages/dispatcher/src/config.ts index a0ead11..0795e8c 100644 --- a/packages/dispatcher/src/config.ts +++ b/packages/dispatcher/src/config.ts @@ -17,11 +17,13 @@ const DEFAULTS: DispatcherConfig = { projections: [], }, discovery: undefined, + push: undefined, oc: { statusEndpoint: 'http://localhost:18789/plugins/session-status/status', statusToken: 'ograph-status-token-2026', minAvailable: 2, }, + agents: undefined, // 新增 agents 默认配置 intervals: { watcherIdle: 30_000, watcherActive: 5_000, @@ -91,5 +93,40 @@ export function loadConfig(): DispatcherConfig { config.discovery.agentId = parseInt(process.env['OGRAPH_AGENT_ID'], 10); } + // ── Push configuration overrides ─────────────────────────────────────────── + if (process.env['PUSH_TELEGRAM_BOT_TOKEN']) { + config.push = config.push ?? { method: 'telegram' }; + config.push.method = 'telegram'; + config.push.telegramBotToken = process.env['PUSH_TELEGRAM_BOT_TOKEN']; + } + if (process.env['PUSH_TELEGRAM_CHAT_ID']) { + config.push = config.push ?? { method: 'telegram' }; + config.push.telegramChatId = process.env['PUSH_TELEGRAM_CHAT_ID']; + } + + // ── Agent configuration overrides ─────────────────────────────────────────── + // 如果配置文件没有 agents 但有环境变量,创建默认 agent 配置 + if (process.env['AGENT_OC_URL'] || process.env['AGENT_OC_SECRET'] || process.env['AGENT_OC_ACTOR']) { + config.agents = config.agents ?? [{ + type: 'oc-plugin', + url: 'http://localhost:18789/plugins/ograph/dispatch', + secret: '', + actor: 'task-execution', + }]; + } + + // 环境变量覆盖第一个 agent 的配置 + if (config.agents && config.agents.length > 0) { + if (process.env['AGENT_OC_URL']) { + config.agents[0].url = process.env['AGENT_OC_URL']; + } + if (process.env['AGENT_OC_SECRET']) { + config.agents[0].secret = process.env['AGENT_OC_SECRET']; + } + if (process.env['AGENT_OC_ACTOR']) { + config.agents[0].actor = process.env['AGENT_OC_ACTOR']; + } + } + return config; } diff --git a/packages/dispatcher/src/index.ts b/packages/dispatcher/src/index.ts index d4ee86d..a4c75a4 100644 --- a/packages/dispatcher/src/index.ts +++ b/packages/dispatcher/src/index.ts @@ -4,12 +4,26 @@ import { loadConfig } from './config.js'; import { ProjectionWatcher } from './watcher.js'; import { OcScheduler } from './scheduler.js'; -import type { PendingEntry } from './types.js'; +import { OcPluginAgentClient } from './agent-client.js'; +import type { PendingEntry, AgentConfig } from './types.js'; +import type { AgentClient } from './agent-client.js'; function ts(): string { return new Date().toISOString(); } +function createAgentClient(agentConfig: AgentConfig): AgentClient { + switch (agentConfig.type) { + case 'oc-plugin': + return new OcPluginAgentClient(agentConfig.url, agentConfig.secret, agentConfig.actor); + case 'hermes': + // Future: implement Hermes client + throw new Error(`Agent type '${agentConfig.type}' not yet implemented`); + default: + throw new Error(`Unknown agent type: ${(agentConfig as any).type}`); + } +} + async function main(): Promise { console.log(`[${ts()}] [dispatcher] OGraph Dispatcher starting...`); @@ -18,19 +32,32 @@ async function main(): Promise { console.log(`[${ts()}] [dispatcher] config loaded`); console.log(`[${ts()}] [dispatcher] ograph.endpoint = ${config.ograph.endpoint}`); console.log(`[${ts()}] [dispatcher] ograph.projections = [${config.ograph.projections.join(', ')}]`); - console.log(`[${ts()}] [dispatcher] oc.statusEndpoint = ${config.oc.statusEndpoint}`); - console.log(`[${ts()}] [dispatcher] oc.minAvailable = ${config.oc.minAvailable}`); console.log(`[${ts()}] [dispatcher] intervals.watcherIdle = ${config.intervals.watcherIdle}ms`); console.log(`[${ts()}] [dispatcher] intervals.watcherActive = ${config.intervals.watcherActive}ms`); console.log(`[${ts()}] [dispatcher] intervals.schedulerIdle = ${config.intervals.schedulerIdle}ms`); console.log(`[${ts()}] [dispatcher] intervals.schedulerActive= ${config.intervals.schedulerActive}ms`); - console.log(`[${ts()}] [dispatcher] intervals.cooldownAfterPush = ${config.intervals.cooldownAfterPush}ms`); + + // Create agent clients from config + const agentClients: AgentClient[] = []; + if (config.agents && config.agents.length > 0) { + for (const agentConfig of config.agents) { + try { + const client = createAgentClient(agentConfig); + agentClients.push(client); + console.log(`[${ts()}] [dispatcher] agent client: ${client.name}`); + } catch (err) { + console.error(`[${ts()}] [dispatcher] failed to create agent client: ${err instanceof Error ? err.message : String(err)}`); + } + } + } else { + console.warn(`[${ts()}] [dispatcher] no agents configured in config.agents - scheduler will not push anywhere`); + } // Shared pending queue — Watcher writes, Scheduler reads + clears const pending: Map = new Map(); const watcher = new ProjectionWatcher(config, pending); - const scheduler = new OcScheduler(config, pending); + const scheduler = new OcScheduler(config, pending, agentClients); // Start both loops independently watcher.start(); @@ -58,4 +85,4 @@ async function main(): Promise { main().catch((err: unknown) => { console.error(`[${new Date().toISOString()}] [dispatcher] fatal: ${err instanceof Error ? err.message : String(err)}`); process.exit(1); -}); +}); \ No newline at end of file diff --git a/packages/dispatcher/src/scheduler.ts b/packages/dispatcher/src/scheduler.ts index 21e07f0..83d3cb1 100644 --- a/packages/dispatcher/src/scheduler.ts +++ b/packages/dispatcher/src/scheduler.ts @@ -1,37 +1,32 @@ // Loop B: OC Scheduler -// Polls OC busy/idle state, pushes pending queue when OC is available. +// Uses AgentClient interface to push events to various agents. -import { writeFileSync, mkdirSync } from 'node:fs'; -import { join } from 'node:path'; -import { execFileSync } from 'node:child_process'; -import type { DispatcherConfig, PendingEntry } from './types.js'; -import { OcClient } from './oc-client.js'; +import type { DispatcherConfig, PendingEntry, OGraphEvent } from './types.js'; +import type { AgentClient } from './agent-client.js'; function ts(): string { return new Date().toISOString(); } -/** Dispatch file path — a simple push mechanism readable by OC / other tools */ -const DISPATCH_FILE = '/tmp/ograph-dispatch.json'; - export class OcScheduler { - private readonly client: OcClient; private running = false; private timer: ReturnType | null = null; - private lastPushAt = 0; constructor( private readonly config: DispatcherConfig, /** Shared pending queue (read + cleared by Scheduler) */ private readonly pending: Map, - ) { - this.client = new OcClient(config); - } + /** Agent clients for pushing events */ + private readonly agentClients: AgentClient[], + ) {} start(): void { if (this.running) return; this.running = true; - console.log(`[${ts()}] [scheduler] started — OC status: ${this.config.oc.statusEndpoint}`); + console.log(`[${ts()}] [scheduler] started with ${this.agentClients.length} agent(s)`); + for (const client of this.agentClients) { + console.log(`[${ts()}] [scheduler] agent: ${client.name}`); + } void this.poll(); } @@ -54,46 +49,27 @@ export class OcScheduler { private async poll(): Promise { if (!this.running) return; - const { schedulerIdle, schedulerActive, cooldownAfterPush } = this.config.intervals; + const { schedulerIdle, schedulerActive } = this.config.intervals; try { const hasPending = this.pending.size > 0; - // Check cooldown first - const now = Date.now(); - if (this.lastPushAt > 0 && now - this.lastPushAt < cooldownAfterPush) { - const remainMs = cooldownAfterPush - (now - this.lastPushAt); - console.log(`[${ts()}] [scheduler] in cooldown — ${Math.ceil(remainMs / 1000)}s remaining`); - this.scheduleNext(hasPending ? schedulerActive : schedulerIdle); - return; - } - if (!hasPending) { // Nothing to push; low-frequency idle polling this.scheduleNext(schedulerIdle); return; } - // We have pending items — check OC availability - let available = false; - try { - available = await this.client.isAvailable(); - console.log(`[${ts()}] [scheduler] OC available=${available} pending=${this.pending.size}`); - } catch (err) { - console.warn(`[${ts()}] [scheduler] OC status check failed: ${err instanceof Error ? err.message : String(err)}`); - // Can't determine status — back off - this.scheduleNext(schedulerActive); + if (this.agentClients.length === 0) { + console.warn(`[${ts()}] [scheduler] no agent clients configured, clearing pending queue`); + this.pending.clear(); + this.scheduleNext(schedulerIdle); return; } - if (available) { - await this.push(); - this.lastPushAt = Date.now(); - this.scheduleNext(schedulerIdle); // after push, slow down - } else { - // OC busy but we have work — keep polling actively - this.scheduleNext(schedulerActive); - } + // We have pending items — push to all agents + await this.push(); + this.scheduleNext(schedulerIdle); // after push, slow down } catch (err) { console.error(`[${ts()}] [scheduler] poll error: ${err instanceof Error ? err.message : String(err)}`); this.scheduleNext(schedulerActive); @@ -104,92 +80,114 @@ export class OcScheduler { const entries = Array.from(this.pending.values()); if (entries.length === 0) return; - const message = this.buildMessage(entries); - - console.log(`[${ts()}] [scheduler] pushing ${entries.length} change(s) to OC`); - console.log(`[${ts()}] [scheduler] message:\n${message}`); - - // ── Strategy 1: write dispatch file ───────────────────────────────────── - try { - mkdirSync('/tmp', { recursive: true }); - const payload = { - pushedAt: new Date().toISOString(), - changes: entries, - message, - }; - writeFileSync(DISPATCH_FILE, JSON.stringify(payload, null, 2), 'utf-8'); - console.log(`[${ts()}] [scheduler] dispatch file written: ${DISPATCH_FILE}`); - } catch (err) { - console.warn(`[${ts()}] [scheduler] failed to write dispatch file: ${err instanceof Error ? err.message : String(err)}`); - } - - // ── Strategy 2: openclaw message send (best-effort) ────────────────────── - try { - execFileSync('openclaw', ['message', 'send', message], { timeout: 10_000, stdio: 'pipe' }); - console.log(`[${ts()}] [scheduler] openclaw message send OK`); - } catch (err) { - // Not fatal — openclaw may not be in PATH or the channel may not be configured - console.warn(`[${ts()}] [scheduler] openclaw message send failed (non-fatal): ${err instanceof Error ? err.message : String(err)}`); - } - - // Clear pending after successful push attempt - this.pending.clear(); - console.log(`[${ts()}] [scheduler] pending queue cleared`); - } - - private buildMessage(entries: PendingEntry[]): string { - // 检查是否有事件流模式的条目 - const hasEvents = entries.some(entry => entry.name.startsWith('event:')); - - if (hasEvents) { - return this.buildEventMessage(entries); - } else { - return this.buildProjectionMessage(entries); - } - } - - private buildEventMessage(entries: PendingEntry[]): string { - const lines: string[] = ['📋 OGraph Event Stream Updates\n']; + console.log(`[${ts()}] [scheduler] pushing ${entries.length} change(s) to ${this.agentClients.length} agent(s)`); + // Build events array from pending entries + const events: OGraphEvent[] = []; for (const entry of entries) { if (entry.events && entry.events.length > 0) { - const event = entry.events[0]; // 只显示第一个事件(每个 entry 对应一个 event) - const age = Math.round((Date.now() - entry.firstDetectedAt) / 1000); - - // 根据 event type 格式化消息 - if (event.type_name === 'task_created') { - const payload = event.payload as any; - lines.push(`📋 新任务: #${payload.id} ${payload.title} (${payload.priority}) — 由${payload.creator || '系统'}创建`); - } else if (event.type_name === 'task_updated') { - const payload = event.payload as any; - lines.push(`📋 #${payload.id} 状态更新: ${payload.previous_status} → ${payload.new_status}`); - } else { - // 通用格式 - lines.push(`• **${event.type_name}** #${event.id}`); - lines.push(` Payload: ${JSON.stringify(event.payload)}`); - lines.push(` Age: ${age}s`); - } - lines.push(''); + // Event-stream mode: use the actual events + events.push(...entry.events); + } else { + // Projection mode: convert to synthetic events + const syntheticEvent: OGraphEvent = { + id: Date.now() + Math.floor(Math.random() * 1000), // synthetic ID + type_hash: 'projection_changed', + type_name: 'projection_changed', + payload: { + projection: entry.name, + previousValue: entry.previousValue, + currentValue: entry.currentValue, + changeCount: entry.changeCount, + firstDetectedAt: entry.firstDetectedAt, + lastDetectedAt: entry.lastDetectedAt, + }, + created_at: entry.lastDetectedAt, + }; + events.push(syntheticEvent); } } - lines.push(`Total: ${entries.length} event(s) detected`); - return lines.join('\n'); + const summary = this.buildSummary(entries, events); + console.log(`[${ts()}] [scheduler] summary: ${summary}`); + + // Push to all agent clients + const pushPromises = this.agentClients.map(async (client) => { + try { + const result = await client.push('task-execution', events, summary); + if (result.ok) { + console.log(`[${ts()}] [scheduler] ${client.name} push OK ${result.buffered ? `(buffered: ${result.buffered})` : ''}${result.sessionBusy ? ' (session busy)' : ''}`); + } else { + console.warn(`[${ts()}] [scheduler] ${client.name} push failed: ${result.error}`); + } + return result; + } catch (err) { + console.error(`[${ts()}] [scheduler] ${client.name} push error: ${err instanceof Error ? err.message : String(err)}`); + return { ok: false, error: err instanceof Error ? err.message : String(err) }; + } + }); + + const results = await Promise.allSettled(pushPromises); + const successCount = results.filter(r => r.status === 'fulfilled' && r.value.ok).length; + + if (successCount > 0) { + // At least one agent got the events successfully, clear pending + this.pending.clear(); + console.log(`[${ts()}] [scheduler] pending queue cleared (${successCount}/${this.agentClients.length} agents succeeded)`); + } else { + // All agents failed, keep pending for retry + console.warn(`[${ts()}] [scheduler] all agents failed, keeping pending for retry`); + } } - private buildProjectionMessage(entries: PendingEntry[]): string { - const lines: string[] = ['🔔 OGraph Projection Changes Detected\n']; + private buildSummary(entries: PendingEntry[], events: OGraphEvent[]): string { + // Check if we have event-stream mode entries + const hasEvents = entries.some(entry => entry.name.startsWith('event:')); + + if (hasEvents) { + return this.buildEventSummary(entries, events); + } else { + return this.buildProjectionSummary(entries); + } + } - for (const entry of entries) { - const age = Math.round((Date.now() - entry.firstDetectedAt) / 1000); - lines.push(`• **${entry.name}**`); - lines.push(` Changes: ${entry.changeCount} | Age: ${age}s`); - lines.push(` Previous: ${JSON.stringify(entry.previousValue)}`); - lines.push(` Current: ${JSON.stringify(entry.currentValue)}`); - lines.push(''); + private buildEventSummary(entries: PendingEntry[], events: OGraphEvent[]): string { + const lines: string[] = ['📋 OGraph Event Stream Updates']; + + for (const event of events.slice(0, 5)) { // limit to first 5 events + const age = Math.round((Date.now() - event.created_at) / 1000); + + // Format based on event type + if (event.type_name === 'task_created') { + const payload = event.payload as any; + lines.push(`📋 新任务: #${payload.subject} ${payload.title || '(无标题)'} (${payload.priority || 'normal'}) — 由 agent:${payload.creator || '?'} 创建`); + } else if (event.type_name === 'task_status_changed') { + const payload = event.payload as any; + lines.push(`📋 #${payload.subject} 状态更新: → ${payload.status}`); + } else { + lines.push(`• ${event.type_name} #${event.id} (${age}s ago)`); + } + } + + if (events.length > 5) { + lines.push(`... and ${events.length - 5} more events`); } - lines.push(`Total: ${entries.length} projection(s) changed`); return lines.join('\n'); } -} + + private buildProjectionSummary(entries: PendingEntry[]): string { + const lines: string[] = ['🔔 OGraph Projection Changes']; + + for (const entry of entries.slice(0, 3)) { // limit to first 3 + const age = Math.round((Date.now() - entry.firstDetectedAt) / 1000); + lines.push(`• ${entry.name}: ${entry.changeCount} change(s), ${age}s ago`); + } + + if (entries.length > 3) { + lines.push(`... and ${entries.length - 3} more projections`); + } + + return lines.join('\n'); + } +} \ No newline at end of file diff --git a/packages/dispatcher/src/types.ts b/packages/dispatcher/src/types.ts index 3b3a016..30960e1 100644 --- a/packages/dispatcher/src/types.ts +++ b/packages/dispatcher/src/types.ts @@ -1,5 +1,13 @@ // OGraph Dispatcher — shared types +export interface AgentConfig { + type: 'oc-plugin' | 'hermes'; // 未来可扩展 + url: string; + secret: string; + actor: string; // 固定 actor 名 + actorPattern?: string; // 动态 actor(P2b 用,现在可选) +} + export interface DispatcherConfig { ograph: { endpoint: string; // OGraph API endpoint @@ -10,11 +18,17 @@ export interface DispatcherConfig { agentId: number; // 当前 agent 的 object id eventTypes?: string[]; // 只关注哪些 event type(可选过滤) }; + push?: { + method: 'telegram' | 'file'; // 推送方式 + telegramBotToken?: string; + telegramChatId?: string; + }; oc: { statusEndpoint: string; // session-status plugin URL statusToken: string; // Bearer token minAvailable: number; // 最少空闲槽位(默认 2) }; + agents?: AgentConfig[]; // 新的 AgentClient 配置 intervals: { watcherIdle: number; // 无变化时 poll 间隔 ms(默认 30000) watcherActive: number; // 有变化时 poll 间隔 ms(默认 5000)