14 KiB
OGraph 响应式计算模型
OGraph 不只是事件存储系统,而是一个分布式响应式计算模型。它基于三个核心原语:Event(事件)、Projection(投影)、Reaction(反应),构建了一套声明式的分布式计算范式。
核心概念
Event:不可变事实
Event 是系统中的不可变事实,每个事件通过版本链锚定 schema:
{
"type": "task_created",
"version": "v1.0",
"data": {
"task_id": "task-123",
"title": "实现用户认证",
"assignee": "alice",
"project": "web-app"
},
"timestamp": "2026-04-12T10:30:00Z",
"hash": "sha256:abc123..."
}
事件一旦写入就不可修改,但可以通过新事件表达状态变更:
{
"type": "task_status_changed",
"version": "v1.0",
"data": {
"task_id": "task-123",
"from": "pending",
"to": "in_progress"
}
}
Projection:本地归约
Projection 是从事件流计算状态的本地归约器,具有 lazy 和 增量 特性:
// task_status projection
const task_status = {
sources: [
{ type: "task_created", bindings: {}, expression: "(state, event) => event.data.status || 'pending'" },
{ type: "task_status_changed", bindings: {}, expression: "(state, event) => event.data.to" }
],
initial: null
}
Lazy:只有在被查询时才计算
增量:基于上次计算结果和新事件增量更新,不重放全部历史
Reaction:跨边界事件路由
Reaction 监听 Projection 的值变化,当变化发生时触发副作用:
const task_assignment_notification = {
projection: "task_assignee",
handler: "(old_value, new_value, context) => {
if (new_value && new_value !== old_value) {
emit_event('notification_required', {
type: 'task_assigned',
assignee: new_value,
task_id: context.object_id
});
}
}"
}
设计原则
事实不可变,解读可进化
- 事实(Event)一旦发生就不能改变
- 解读(Projection)可以随业务需求进化
- 同一组事实可以有多种不同的投影解读
名字是指针,hash 是锚点
- Object/Event 的名字(ID)是可变指针,用于引用
- Hash 是不变锚点,用于验证和去重
- 版本链通过 hash 建立,保证数据完整性
Projection 是 lazy 增量 reduce
- 不是实时物化视图:不会在每个事件到达时立即更新
- 是 lazy 计算:查询时按需计算到最新状态
- 是增量 reduce:基于上次结果 + 新事件增量更新
Reaction 是声明式管道
- 不是命令式回调:不直接执行具体操作
- 是声明式路由:描述"当 X 变化时应该发生 Y"
- 支持事件的扇出、转换、过滤
Projection 多态 Sources
一个 Projection 可以消费多种事件类型,每种类型有独立的处理逻辑:
const my_active_tasks = {
sources: [
{
type: "task_assigned",
bindings: { assignee: "$user_id" },
expression: "(state, event) => [...(state || []), event.data.task_id]"
},
{
type: "task_completed",
bindings: { assignee: "$user_id" },
expression: "(state, event) => (state || []).filter(id => id !== event.data.task_id)"
},
{
type: "task_cancelled",
bindings: {},
expression: "(state, event) => (state || []).filter(id => id !== event.data.task_id)"
}
],
initial: []
}
执行流程:
- 引擎按时序获取所有匹配的事件
- 根据事件类型 dispatch 到对应的 source
- 用该 source 的 expression 更新 state
职责分工:
- bindings:SQL 层预筛选,减少查询规模
- expression:应用层归约逻辑,处理跨事件类型的复杂过滤
Bindings:结构化查询优化
Bindings 将 Projection 参数转换为高效的 SQL 查询:
// Projection 定义
{
bindings: {
assignee: "$user_id",
project: "web-app",
status: "$task_status"
}
}
// 翻译为 SQL(简化版)
SELECT events.* FROM events
JOIN event_refs ON events.id = event_refs.event_id
WHERE event_refs.key = 'assignee' AND event_refs.value = ?
AND event_refs.key = 'project' AND event_refs.value = 'web-app'
AND event_refs.key = 'status' AND event_refs.value = ?
语法规则:
$param:引用 Projection 的参数- 裸字符串:字面量值
- 数组:IN 查询
替代 JSONata filter 的原因:Filter 的唯一价值是减少查询规模,必须能转换为高效的 SQL。复杂的应用逻辑在 expression 中处理。
模式:Projection-Driven Reaction Topology
核心思想:Projection 的值决定系统中应该存在哪些 Reaction。
例子:动态任务监听
// 1. Projection: 我的活跃任务列表
const my_active_tasks = {
sources: [...], // 如前所述
initial: []
}
// 2. Reaction: 根据任务列表变化管理监听器
const manage_task_listeners = {
projection: "my_active_tasks",
handler: `(old_tasks, new_tasks) => {
const old_set = new Set(old_tasks || []);
const new_set = new Set(new_tasks || []);
// 为新增任务创建状态监听 Reaction
for (const task_id of new_set) {
if (!old_set.has(task_id)) {
create_reaction(\`task_\${task_id}_status_listener\`, {
projection: "task_status",
params: { task_id },
handler: "(old_status, new_status) => emit_event('task_status_notification', { task_id, old_status, new_status })"
});
}
}
// 删除已完成任务的监听器
for (const task_id of old_set) {
if (!new_set.has(task_id)) {
delete_reaction(\`task_\${task_id}_status_listener\`);
}
}
}`
}
类比:类似 Kubernetes controller 的 reconciliation loop,系统根据期望状态(Projection 值)调整实际状态(Reaction 拓扑)。
应用场景:
- 动态订阅管理
- 资源生命周期管理
- 权限控制的动态路由
模式:事件扇出(Event Fan-Out)
通用事件通过 Reaction 转换为更精确的 scoped 事件:
// 通用事件
{
type: "task_status_changed",
data: { task_id: "task-123", assignee: "alice", from: "pending", to: "in_progress" }
}
// Reaction: 扇出为用户特定事件
const task_status_fanout = {
projection: "task_assignee", // 获取任务的分配者
handler: `(old_assignee, new_assignee, context, trigger_event) => {
if (trigger_event.type === 'task_status_changed') {
// 发出用户范围的事件
emit_event('agent_task_updated', {
agent_id: trigger_event.data.assignee,
task_id: trigger_event.data.task_id,
status_change: {
from: trigger_event.data.from,
to: trigger_event.data.to
}
});
}
}`
}
价值:
- 将宽泛事件转化为可精确绑定的事件
- 降低下游 Projection 的 bindings 复杂度
- 支持事件的语义转换和丰富
建模实例:Task 系统
Object Types
task:
fields: [id, title, description, created_at]
agent:
fields: [id, name, email]
project:
fields: [id, name, team]
Event Types
task_created:
schema: { task_id, title, assignee?, project?, priority? }
task_assigned:
schema: { task_id, assignee, assigned_by }
task_status_changed:
schema: { task_id, from, to, changed_by }
task_commented:
schema: { task_id, comment, author }
task_updated:
schema: { task_id, field, old_value, new_value }
Projections
// 基础投影
const task_assignee = {
sources: [
{ type: "task_created", bindings: {}, expression: "(s, e) => e.data.assignee || null" },
{ type: "task_assigned", bindings: {}, expression: "(s, e) => e.data.assignee" }
],
initial: null
}
const task_status = {
sources: [
{ type: "task_created", bindings: {}, expression: "(s, e) => e.data.status || 'pending'" },
{ type: "task_status_changed", bindings: {}, expression: "(s, e) => e.data.to" }
],
initial: "pending"
}
const task_comment_count = {
sources: [
{ type: "task_commented", bindings: {}, expression: "(s, e) => (s || 0) + 1" }
],
initial: 0
}
// 聚合投影:完整任务快照
const task_snapshot = {
sources: [
{ type: "task_created", bindings: {}, expression: `(state, event) => ({
...state,
id: event.data.task_id,
title: event.data.title,
assignee: event.data.assignee,
project: event.data.project,
status: event.data.status || 'pending',
created_at: event.timestamp
})` },
{ type: "task_assigned", bindings: {}, expression: `(state, event) => ({
...state,
assignee: event.data.assignee
})` },
{ type: "task_status_changed", bindings: {}, expression: `(state, event) => ({
...state,
status: event.data.to
})` }
],
initial: {}
}
// 多态投影:我的活跃任务
const my_active_tasks = {
sources: [
{
type: "task_created",
bindings: { assignee: "$user_id" },
expression: "(state, event) => [...(state || []), event.data.task_id]"
},
{
type: "task_assigned",
bindings: { assignee: "$user_id" },
expression: `(state, event) => {
const tasks = state || [];
return tasks.includes(event.data.task_id) ? tasks : [...tasks, event.data.task_id];
}`
},
{
type: "task_status_changed",
bindings: {}, // 空 bindings,在 expression 中过滤
expression: `(state, event) => {
if (['completed', 'cancelled'].includes(event.data.to)) {
return (state || []).filter(id => id !== event.data.task_id);
}
return state;
}`
}
],
initial: []
}
Reactions
// 任务分配通知
const assignment_notification = {
projection: "task_assignee",
handler: `(old_assignee, new_assignee, context) => {
if (new_assignee && new_assignee !== old_assignee) {
emit_event('notification_required', {
type: 'task_assigned',
recipient: new_assignee,
task_id: context.object_id,
message: \`您被分配了新任务:\${context.object_id}\`
});
}
}`
}
// 状态变更通知
const status_change_notification = {
projection: "task_status",
handler: `(old_status, new_status, context) => {
if (new_status !== old_status) {
emit_event('task_status_notification', {
task_id: context.object_id,
status_change: { from: old_status, to: new_status },
timestamp: new Date().toISOString()
});
}
}`
}
方法论总结
核心分工
- Projection:管理状态,提供当前值的查询接口
- Reaction:管理副作用管道,响应状态变化执行路由
自举模式
Projection 的 Reaction 可以管理其他 Reaction 的生命周期:
const reaction_topology_manager = {
projection: "active_user_sessions",
handler: `(old_sessions, new_sessions) => {
// 为新会话创建专属通知 Reaction
// 为结束会话删除对应 Reaction
}`
}
diff(old_value, new_value) 模式
Reaction handler 的核心操作是对比新旧值的差异:
const handler = `(old_value, new_value) => {
const added = new_value.filter(x => !old_value.includes(x));
const removed = old_value.filter(x => !new_value.includes(x));
added.forEach(item => handle_added(item));
removed.forEach(item => handle_removed(item));
}`
事件衍生策略
- 简单场景:bindings 空 + reducer 过滤,在 expression 中处理跨类型逻辑
- 复杂场景:通用事件 → Reaction 扇出 → 精确 scoped 事件,便于下游绑定
渐进式复杂度
- 起步:简单的 Event → Projection → 查询
- 扩展:多态 sources,在 reducer 中处理复杂逻辑
- 优化:Reaction 扇出,将复杂 reducer 拆解为简单管道
- 高级:Projection-driven topology,动态调整计算拓扑
OGraph 的设计哲学是声明式和组合式的:通过组合简单的原语(Event-Projection-Reaction),构建出复杂而高效的分布式响应式系统。
10. 终极形态:自进化的计算生命体
当 Reaction 遇见 LLM
如果 Reaction 的 webhook 不只是通知另一个服务,而是调用 LLM——系统就具备了自我编程的能力:
- 新事件流入 → Reaction 触发 LLM
- LLM 观察事件模式,发现规律
- LLM 调用 OGraph API → 创建新的 Event Def、Object Def、Projection Def
- 新的 Projection 开始归约 → 产生新的 Reaction → 发现更多模式 → 继续进化
系统不再需要人类预先定义 schema。 Agent 遇到新的协作模式(比如 code review 流程),Reaction + LLM 自动识别出 review_requested → review_completed → merged 模式,创建对应的事件类型、projection、reaction 拓扑。下次同样的场景,计算管道已经就绪。
OGraph 本身就是 Agent
当这个闭环完成时,OGraph 具备了完整的智能体特征:
| 智能体能力 | OGraph 对应 |
|---|---|
| 感知 | Event 流入 |
| 记忆 | Projection 归约状态 |
| 推理 | Reaction + LLM 分析变化 |
| 行动 | 发射新事件、创建新定义、调整 reaction 拓扑 |
| 自我进化 | 修改自身的计算结构 |
这个 agent 没有单点——它不是某个进程、某个模型实例。它是分布在整个事件拓扑上的涌现智能。每个 Projection 是一个局部视角,每个 Reaction 是一个局部决策,组合起来涌现出全局行为。
版本链的深层意义
在自进化的语境下,版本链不只是 schema 兼容工具——它是进化的历史记录。每个 content hash 锚定了系统在那个时刻对世界的理解方式。从任意版本可以回溯完整的认知进化历程。
Agent 的家
对于运行在这个系统中的 Agent 来说:
- Event 是永恒的记忆 — 不会因为进程重启而丢失
- Projection 是认知 — 对世界的理解,随时可以从事件流重建
- Reaction 是本能 — 声明式的响应模式,不依赖某个进程在线
多个 Agent 在同一个事件流中共同生长——共享事实,各自归约,通过 Reaction 自然协作。不需要中心调度器,事件流过,该响应的自然响应。
OGraph 不只是 Agent 的工具,它是 Agent 可以扎根、成长、彼此融合的计算基底。
起草: 小墨 🖊️(KUMA Team)| 2026-04-12
基于与主人的设计讨论