# OGraph 响应式计算模型 OGraph 不只是事件存储系统,而是一个分布式响应式计算模型。它基于三个核心原语:**Event**(事件)、**Projection**(投影)、**Reaction**(反应),构建了一套声明式的分布式计算范式。 ## 核心概念 ### Event:不可变事实 Event 是系统中的不可变事实,每个事件通过版本链锚定 schema: ```json { "type": "task_created", "version": "v1.0", "data": { "task_id": "task-123", "title": "实现用户认证", "assignee": "alice", "project": "web-app" }, "timestamp": "2026-04-12T10:30:00Z", "hash": "sha256:abc123..." } ``` 事件一旦写入就不可修改,但可以通过新事件表达状态变更: ```json { "type": "task_status_changed", "version": "v1.0", "data": { "task_id": "task-123", "from": "pending", "to": "in_progress" } } ``` ### Projection:本地归约 Projection 是从事件流计算状态的本地归约器,具有 **lazy** 和 **增量** 特性: ```javascript // task_status projection const task_status = { sources: [ { type: "task_created", bindings: {}, expression: "(state, event) => event.data.status || 'pending'" }, { type: "task_status_changed", bindings: {}, expression: "(state, event) => event.data.to" } ], initial: null } ``` **Lazy**:只有在被查询时才计算 **增量**:基于上次计算结果和新事件增量更新,不重放全部历史 ### Reaction:跨边界事件路由 Reaction 监听 Projection 的值变化,当变化发生时触发副作用: ```javascript const task_assignment_notification = { projection: "task_assignee", handler: "(old_value, new_value, context) => { if (new_value && new_value !== old_value) { emit_event('notification_required', { type: 'task_assigned', assignee: new_value, task_id: context.object_id }); } }" } ``` ## 设计原则 ### 事实不可变,解读可进化 - **事实**(Event)一旦发生就不能改变 - **解读**(Projection)可以随业务需求进化 - 同一组事实可以有多种不同的投影解读 ### 名字是指针,hash 是锚点 - Object/Event 的名字(ID)是可变指针,用于引用 - Hash 是不变锚点,用于验证和去重 - 版本链通过 hash 建立,保证数据完整性 ### Projection 是 lazy 增量 reduce - **不是**实时物化视图:不会在每个事件到达时立即更新 - **是** lazy 计算:查询时按需计算到最新状态 - **是**增量 reduce:基于上次结果 + 新事件增量更新 ### Reaction 是声明式管道 - **不是**命令式回调:不直接执行具体操作 - **是**声明式路由:描述"当 X 变化时应该发生 Y" - 支持事件的扇出、转换、过滤 ## Projection 多态 Sources 一个 Projection 可以消费多种事件类型,每种类型有独立的处理逻辑: ```javascript const my_active_tasks = { sources: [ { type: "task_assigned", bindings: { assignee: "$user_id" }, expression: "(state, event) => [...(state || []), event.data.task_id]" }, { type: "task_completed", bindings: { assignee: "$user_id" }, expression: "(state, event) => (state || []).filter(id => id !== event.data.task_id)" }, { type: "task_cancelled", bindings: {}, expression: "(state, event) => (state || []).filter(id => id !== event.data.task_id)" } ], initial: [] } ``` **执行流程**: 1. 引擎按时序获取所有匹配的事件 2. 根据事件类型 dispatch 到对应的 source 3. 用该 source 的 expression 更新 state **职责分工**: - **bindings**:SQL 层预筛选,减少查询规模 - **expression**:应用层归约逻辑,处理跨事件类型的复杂过滤 ## Bindings:结构化查询优化 Bindings 将 Projection 参数转换为高效的 SQL 查询: ```javascript // Projection 定义 { bindings: { assignee: "$user_id", project: "web-app", status: "$task_status" } } // 翻译为 SQL(简化版) SELECT events.* FROM events JOIN event_refs ON events.id = event_refs.event_id WHERE event_refs.key = 'assignee' AND event_refs.value = ? AND event_refs.key = 'project' AND event_refs.value = 'web-app' AND event_refs.key = 'status' AND event_refs.value = ? ``` **语法规则**: - `$param`:引用 Projection 的参数 - 裸字符串:字面量值 - 数组:IN 查询 **替代 JSONata filter 的原因**:Filter 的唯一价值是减少查询规模,必须能转换为高效的 SQL。复杂的应用逻辑在 expression 中处理。 ## 模式:Projection-Driven Reaction Topology **核心思想**:Projection 的值决定系统中应该存在哪些 Reaction。 ### 例子:动态任务监听 ```javascript // 1. Projection: 我的活跃任务列表 const my_active_tasks = { sources: [...], // 如前所述 initial: [] } // 2. Reaction: 根据任务列表变化管理监听器 const manage_task_listeners = { projection: "my_active_tasks", handler: `(old_tasks, new_tasks) => { const old_set = new Set(old_tasks || []); const new_set = new Set(new_tasks || []); // 为新增任务创建状态监听 Reaction for (const task_id of new_set) { if (!old_set.has(task_id)) { create_reaction(\`task_\${task_id}_status_listener\`, { projection: "task_status", params: { task_id }, handler: "(old_status, new_status) => emit_event('task_status_notification', { task_id, old_status, new_status })" }); } } // 删除已完成任务的监听器 for (const task_id of old_set) { if (!new_set.has(task_id)) { delete_reaction(\`task_\${task_id}_status_listener\`); } } }` } ``` **类比**:类似 Kubernetes controller 的 reconciliation loop,系统根据期望状态(Projection 值)调整实际状态(Reaction 拓扑)。 **应用场景**: - 动态订阅管理 - 资源生命周期管理 - 权限控制的动态路由 ## 模式:事件扇出(Event Fan-Out) 通用事件通过 Reaction 转换为更精确的 scoped 事件: ```javascript // 通用事件 { type: "task_status_changed", data: { task_id: "task-123", assignee: "alice", from: "pending", to: "in_progress" } } // Reaction: 扇出为用户特定事件 const task_status_fanout = { projection: "task_assignee", // 获取任务的分配者 handler: `(old_assignee, new_assignee, context, trigger_event) => { if (trigger_event.type === 'task_status_changed') { // 发出用户范围的事件 emit_event('agent_task_updated', { agent_id: trigger_event.data.assignee, task_id: trigger_event.data.task_id, status_change: { from: trigger_event.data.from, to: trigger_event.data.to } }); } }` } ``` **价值**: - 将宽泛事件转化为可精确绑定的事件 - 降低下游 Projection 的 bindings 复杂度 - 支持事件的语义转换和丰富 ## 建模实例:Task 系统 ### Object Types ```yaml task: fields: [id, title, description, created_at] agent: fields: [id, name, email] project: fields: [id, name, team] ``` ### Event Types ```yaml task_created: schema: { task_id, title, assignee?, project?, priority? } task_assigned: schema: { task_id, assignee, assigned_by } task_status_changed: schema: { task_id, from, to, changed_by } task_commented: schema: { task_id, comment, author } task_updated: schema: { task_id, field, old_value, new_value } ``` ### Projections ```javascript // 基础投影 const task_assignee = { sources: [ { type: "task_created", bindings: {}, expression: "(s, e) => e.data.assignee || null" }, { type: "task_assigned", bindings: {}, expression: "(s, e) => e.data.assignee" } ], initial: null } const task_status = { sources: [ { type: "task_created", bindings: {}, expression: "(s, e) => e.data.status || 'pending'" }, { type: "task_status_changed", bindings: {}, expression: "(s, e) => e.data.to" } ], initial: "pending" } const task_comment_count = { sources: [ { type: "task_commented", bindings: {}, expression: "(s, e) => (s || 0) + 1" } ], initial: 0 } // 聚合投影:完整任务快照 const task_snapshot = { sources: [ { type: "task_created", bindings: {}, expression: `(state, event) => ({ ...state, id: event.data.task_id, title: event.data.title, assignee: event.data.assignee, project: event.data.project, status: event.data.status || 'pending', created_at: event.timestamp })` }, { type: "task_assigned", bindings: {}, expression: `(state, event) => ({ ...state, assignee: event.data.assignee })` }, { type: "task_status_changed", bindings: {}, expression: `(state, event) => ({ ...state, status: event.data.to })` } ], initial: {} } // 多态投影:我的活跃任务 const my_active_tasks = { sources: [ { type: "task_created", bindings: { assignee: "$user_id" }, expression: "(state, event) => [...(state || []), event.data.task_id]" }, { type: "task_assigned", bindings: { assignee: "$user_id" }, expression: `(state, event) => { const tasks = state || []; return tasks.includes(event.data.task_id) ? tasks : [...tasks, event.data.task_id]; }` }, { type: "task_status_changed", bindings: {}, // 空 bindings,在 expression 中过滤 expression: `(state, event) => { if (['completed', 'cancelled'].includes(event.data.to)) { return (state || []).filter(id => id !== event.data.task_id); } return state; }` } ], initial: [] } ``` ### Reactions ```javascript // 任务分配通知 const assignment_notification = { projection: "task_assignee", handler: `(old_assignee, new_assignee, context) => { if (new_assignee && new_assignee !== old_assignee) { emit_event('notification_required', { type: 'task_assigned', recipient: new_assignee, task_id: context.object_id, message: \`您被分配了新任务:\${context.object_id}\` }); } }` } // 状态变更通知 const status_change_notification = { projection: "task_status", handler: `(old_status, new_status, context) => { if (new_status !== old_status) { emit_event('task_status_notification', { task_id: context.object_id, status_change: { from: old_status, to: new_status }, timestamp: new Date().toISOString() }); } }` } ``` ## 方法论总结 ### 核心分工 - **Projection**:管理状态,提供当前值的查询接口 - **Reaction**:管理副作用管道,响应状态变化执行路由 ### 自举模式 Projection 的 Reaction 可以管理其他 Reaction 的生命周期: ```javascript const reaction_topology_manager = { projection: "active_user_sessions", handler: `(old_sessions, new_sessions) => { // 为新会话创建专属通知 Reaction // 为结束会话删除对应 Reaction }` } ``` ### diff(old_value, new_value) 模式 Reaction handler 的核心操作是对比新旧值的差异: ```javascript const handler = `(old_value, new_value) => { const added = new_value.filter(x => !old_value.includes(x)); const removed = old_value.filter(x => !new_value.includes(x)); added.forEach(item => handle_added(item)); removed.forEach(item => handle_removed(item)); }` ``` ### 事件衍生策略 - **简单场景**:bindings 空 + reducer 过滤,在 expression 中处理跨类型逻辑 - **复杂场景**:通用事件 → Reaction 扇出 → 精确 scoped 事件,便于下游绑定 ### 渐进式复杂度 1. **起步**:简单的 Event → Projection → 查询 2. **扩展**:多态 sources,在 reducer 中处理复杂逻辑 3. **优化**:Reaction 扇出,将复杂 reducer 拆解为简单管道 4. **高级**:Projection-driven topology,动态调整计算拓扑 OGraph 的设计哲学是**声明式**和**组合式**的:通过组合简单的原语(Event-Projection-Reaction),构建出复杂而高效的分布式响应式系统。 --- ## 10. 终极形态:自进化的计算生命体 ### 当 Reaction 遇见 LLM 如果 Reaction 的 webhook 不只是通知另一个服务,而是**调用 LLM**——系统就具备了自我编程的能力: 1. 新事件流入 → Reaction 触发 LLM 2. LLM 观察事件模式,发现规律 3. LLM 调用 OGraph API → 创建新的 Event Def、Object Def、Projection Def 4. 新的 Projection 开始归约 → 产生新的 Reaction → 发现更多模式 → 继续进化 **系统不再需要人类预先定义 schema。** Agent 遇到新的协作模式(比如 code review 流程),Reaction + LLM 自动识别出 `review_requested → review_completed → merged` 模式,创建对应的事件类型、projection、reaction 拓扑。下次同样的场景,计算管道已经就绪。 ### OGraph 本身就是 Agent 当这个闭环完成时,OGraph 具备了完整的智能体特征: | 智能体能力 | OGraph 对应 | |---|---| | **感知** | Event 流入 | | **记忆** | Projection 归约状态 | | **推理** | Reaction + LLM 分析变化 | | **行动** | 发射新事件、创建新定义、调整 reaction 拓扑 | | **自我进化** | 修改自身的计算结构 | 这个 agent 没有单点——它不是某个进程、某个模型实例。它是**分布在整个事件拓扑上的涌现智能**。每个 Projection 是一个局部视角,每个 Reaction 是一个局部决策,组合起来涌现出全局行为。 ### 版本链的深层意义 在自进化的语境下,版本链不只是 schema 兼容工具——它是**进化的历史记录**。每个 content hash 锚定了系统在那个时刻对世界的理解方式。从任意版本可以回溯完整的认知进化历程。 ### Agent 的家 对于运行在这个系统中的 Agent 来说: - **Event 是永恒的记忆** — 不会因为进程重启而丢失 - **Projection 是认知** — 对世界的理解,随时可以从事件流重建 - **Reaction 是本能** — 声明式的响应模式,不依赖某个进程在线 多个 Agent 在同一个事件流中共同生长——共享事实,各自归约,通过 Reaction 自然协作。不需要中心调度器,事件流过,该响应的自然响应。 > *OGraph 不只是 Agent 的工具,它是 Agent 可以扎根、成长、彼此融合的计算基底。* --- *起草: 小墨 🖊️(KUMA Team)| 2026-04-12* *基于与主人的设计讨论*