refactor(rfc-001): Reflex 作为 Event Mesh,Sense 不知道 Workflow
- 删除 Sense → Workflow 桥接(ThreadStart payload),Sense 只产出数据 - Reflex 新增两种 action:TriggerCompute / StartWorkflow - 引入 Event Mesh 概念:所有事件路由声明在 Reflex 中 - 更新依赖图、Haskell 类型签名、设计原则 - 删除 should-cleanup sense(改由 Reflex when 守卫实现) 小橘 <xiaoju@shazhou.work>
This commit is contained in:
@@ -84,34 +84,38 @@ export async function compute(): Promise<Task[] | null> {
|
||||
|
||||
这是合理的,因为 Signal 本质是 append-only 的时序数据,不会跨 Sense join。每个 Sense 最了解自己的查询模式。
|
||||
|
||||
#### Sense → Workflow 的桥接
|
||||
#### Sense 不知道 Workflow
|
||||
|
||||
当 Payload 类型是 `ThreadStart` 时,引擎看到非 null 的返回值就自动发起 Workflow Thread。不需要额外的"桥"概念——这只是 `Maybe (Payload a)` 的一个特例。
|
||||
Sense 只感知世界并产出 Signal,永远不关心"谁在听"或"听了之后要做什么"。Sense 不引用 Workflow、不返回 `ThreadStart`、不知道自己的 Signal 会触发什么动作。
|
||||
|
||||
```typescript
|
||||
// senses/should-cleanup.ts
|
||||
// Payload = ThreadStart,null 则不触发
|
||||
export async function compute(): Promise<ThreadStart | null> {
|
||||
const disk = diskDb.prepare(
|
||||
'SELECT value FROM samples ORDER BY ts DESC LIMIT 1'
|
||||
).get()
|
||||
return disk.value > 90
|
||||
? { workflowId: 'cleanup', context: { usage: disk.value } }
|
||||
: null // 磁盘正常,静默
|
||||
// senses/disk-usage.ts — 只关心磁盘状态
|
||||
export async function compute(): Promise<DiskUsage | null> {
|
||||
const usage = await getDiskUsage()
|
||||
return usage // 纯事实,不关心下游
|
||||
}
|
||||
```
|
||||
|
||||
启动 Workflow 是 Reflex 的职责,见 §4.2。
|
||||
|
||||
### 4.2 Reflex
|
||||
|
||||
Reflex 是纯声明式的 YAML 配置,定义什么时候执行某个 Sense 的 compute。
|
||||
Reflex 是纯声明式的 YAML 配置,定义**引擎对 Signal 的反应**。Reflex 是连接 Sense 与 Sense、Sense 与 Workflow 的唯一纽带。
|
||||
|
||||
Reflex 有两种 action:
|
||||
|
||||
| action | 含义 |
|
||||
|--------|------|
|
||||
| 触发 compute | 让某个 Sense 重新计算(默认行为) |
|
||||
| 启动 workflow | 创建一个 Thread(Post-MVP) |
|
||||
|
||||
```yaml
|
||||
# nerve.yaml
|
||||
senses:
|
||||
cpu-usage:
|
||||
group: system
|
||||
throttle: 5s # 无论怎么触发,最快 5 秒一次
|
||||
timeout: 3s # compute 超时则 abort,记录错误 signal
|
||||
throttle: 5s
|
||||
timeout: 3s
|
||||
disk-usage:
|
||||
group: system
|
||||
throttle: 30s
|
||||
@@ -119,9 +123,9 @@ senses:
|
||||
group: tasks
|
||||
throttle: 10s
|
||||
timeout: 30s
|
||||
should-cleanup: {}
|
||||
|
||||
reflexes:
|
||||
# Sense → Sense(触发 compute,默认 action)
|
||||
- sense: cpu-usage
|
||||
interval: 10s
|
||||
|
||||
@@ -132,8 +136,10 @@ reflexes:
|
||||
on: ["task.created", "task.completed"]
|
||||
interval: 10m
|
||||
|
||||
- sense: should-cleanup
|
||||
# Sense → Workflow(启动 thread)
|
||||
- workflow: cleanup
|
||||
on: ["disk-usage"]
|
||||
when: "signal.value > 90" # 条件守卫,可选
|
||||
```
|
||||
|
||||
两种触发条件:
|
||||
@@ -143,10 +149,23 @@ reflexes:
|
||||
| `interval` | 定时触发 |
|
||||
| `on: [signals]` | 当指定的 Sense 发出新 Signal 时触发 |
|
||||
|
||||
一个 Sense 可以有多个触发条件(如 `active-tasks` 同时有事件触发和定时兜底)。
|
||||
一个 Reflex 可以有多个触发条件(如 `active-tasks` 同时有事件触发和定时兜底)。
|
||||
|
||||
OnDemand(按需触发)不需要声明——引擎内置提供,任何 Sense 都可以被外部 API 调用触发。
|
||||
|
||||
#### Reflex 是 Event Mesh
|
||||
|
||||
所有的 Reflex 声明合在一起,构成了一个声明式的事件路由网格(Event Mesh)。引擎启动时解析所有 Reflex,构建出完整的事件流拓扑:
|
||||
|
||||
```
|
||||
Signal Bus ──→ Reflex Mesh ──→ Sense compute
|
||||
──→ Workflow thread
|
||||
```
|
||||
|
||||
每个 Signal 进入 bus 后,Reflex Mesh 决定它的去向——触发哪些 Sense 重算、启动哪些 Workflow。这不是 Sense 主动"发消息"给下游,而是引擎根据声明式规则自动路由。
|
||||
|
||||
**进程视角:** Sense Worker 只负责执行 compute 并把 Signal 交给 Kernel。Kernel 里的 Reflex Mesh 决定下一步——该触发谁的 compute、该起哪个 Workflow Worker。Worker 之间永远不直连,所有路由决策都在 Kernel 完成。
|
||||
|
||||
#### Reflex 语义规则
|
||||
|
||||
**Interval 起点**:以库中记录的上次 compute 完成时间为准,不是 daemon 启动时间。daemon 重启时,若已过期则立即执行一次,然后恢复正常节奏。不会对停机期间的缺失进行逐次补偿。
|
||||
@@ -431,10 +450,10 @@ class Sense a where
|
||||
type Payload a
|
||||
compute :: IO (Maybe (Payload a)) -- Nothing → 静默,Just → 发 signal
|
||||
|
||||
-- Reflex 是纯数据
|
||||
-- Reflex 是纯数据,连接 Signal 与 Action
|
||||
data Reflex = Reflex
|
||||
{ target :: SenseId
|
||||
, condition :: ReflexCondition
|
||||
{ condition :: ReflexCondition
|
||||
, action :: ReflexAction
|
||||
, enabled :: Bool
|
||||
}
|
||||
|
||||
@@ -443,6 +462,11 @@ data ReflexCondition
|
||||
| OnSignal [SenseId]
|
||||
-- OnDemand 是引擎内置能力,不需要声明
|
||||
|
||||
data ReflexAction
|
||||
= TriggerCompute SenseId -- 触发 Sense 重算
|
||||
| StartWorkflow WorkflowId Guard -- 启动 Workflow Thread
|
||||
-- Reflex 构成 Event Mesh:Signal → ReflexCondition → ReflexAction
|
||||
|
||||
-- Workflow 内部
|
||||
moderate :: Thread -> CommandEvent -> (RoleId, Prompt) -- 纯函数 ✅
|
||||
execute :: Role -> Prompt -> IO CommandEvent -- 有副作用 ❌
|
||||
@@ -463,9 +487,8 @@ execute :: Role -> Prompt -> IO CommandEvent -- 有副作用 ❌
|
||||
|
||||
```
|
||||
Reflex ──→ Sense ──→ Sense (复合依赖)
|
||||
│
|
||||
▼ (Payload = ThreadStart)
|
||||
Workflow
|
||||
│
|
||||
└──→ Workflow (Reflex 声明触发)
|
||||
```
|
||||
|
||||
### 软删除与级联
|
||||
@@ -475,7 +498,7 @@ Reflex ──→ Sense ──→ Sense (复合依赖)
|
||||
| 操作 | 级联 |
|
||||
|------|------|
|
||||
| disable Sense | disable 依赖它的 Reflex + 依赖它的复合 Sense |
|
||||
| disable Workflow | disable 产出其 ThreadStart 的 Sense |
|
||||
| disable Workflow | disable 触发它的 Reflex |
|
||||
| disable Reflex | 无级联 |
|
||||
|
||||
清理是可选的离线 GC 过程。
|
||||
@@ -492,7 +515,6 @@ Reflex ──→ Sense ──→ Sense (复合依赖)
|
||||
cpu-usage.ts
|
||||
disk-usage.ts
|
||||
active-tasks.ts
|
||||
should-cleanup.ts
|
||||
workflows/ # post-MVP
|
||||
cleanup.ts
|
||||
data/ # ⛔ gitignored
|
||||
@@ -528,9 +550,10 @@ Reflex ──→ Sense ──→ Sense (复合依赖)
|
||||
|
||||
## 10. 设计原则
|
||||
|
||||
1. **Sense 是唯一的一等公民** — 原始采样和派生计算统一为 Sense
|
||||
1. **Sense 是唯一的一等公民** — 原始采样和派生计算统一为 Sense,Sense 不知道下游
|
||||
2. **计算与触发解耦** — compute 不知道自己什么时候被调用
|
||||
3. **存储去中心化** — 每个 Sense 自管存储,没有统一 Event Store
|
||||
4. **不删除只关停** — 历史不可变,生命周期通过 enabled 控制
|
||||
5. **Workflow 是可选扩展** — MVP 不需要,后续按需加入
|
||||
6. **引擎极简** — 只做调度,不做业务逻辑
|
||||
3. **Reflex 是 Event Mesh** — 所有事件路由都声明在 Reflex 中,Sense 和 Workflow 之间无直连
|
||||
4. **存储去中心化** — 每个 Sense 自管存储,没有统一 Event Store
|
||||
5. **不删除只关停** — 历史不可变,生命周期通过 enabled 控制
|
||||
6. **Workflow 是可选扩展** — MVP 不需要,后续按需加入
|
||||
7. **引擎极简** — 只做调度,不做业务逻辑
|
||||
|
||||
Reference in New Issue
Block a user