小橘 🍊 e82fe8eaba
feat: OGraph Dispatcher — dual-loop actor for task notification (#4 P0) (#17)
* feat: add packages/dispatcher — dual-loop OGraph projection watcher + OC scheduler

Adds a new Node.js daemon that:
- Loop A (ProjectionWatcher): polls OGraph projections, diffs against
  snapshot, merges changes into a pending queue.
  - Idle: 30s poll interval; active (changes detected): 5s
- Loop B (OcScheduler): polls OC session-status, pushes pending queue
  when OC has available slots (>= minAvailable).
  - Idle (no pending): 60s; active (pending): 5s
  - Cooldown of 60s after each push to avoid spam

Tech:
- TypeScript + esbuild (zero runtime external deps)
- Graceful error handling: each poll is independent try-catch, errors
  logged but never crash the process
- Config from ~/.config/ograph/dispatcher.json + env-var overrides
- OGRAPH_CONFIG_FILE env var for config path override
- Push via /tmp/ograph-dispatch.json + openclaw message send (best-effort)

Build: npm run build → dist/index.js
Run:   node dist/index.js

* fix: address PR #17 review — package name, tests, shell safety, first-run

---------

Co-authored-by: 小墨 <xiaomooo@shazhou.work>
2026-04-13 10:01:48 +08:00

102 lines
3.3 KiB
Markdown

# OGraph Dispatcher
本地常驻进程,轮询 OGraph projection 变化,并在 OpenClaw (OC) 空闲时推送任务通知。
## 架构:双 Loop Actor
```
┌─ Loop A: Projection Watcher ──────────────┐
│ 轮询 OGraph projection → diff → pending │
│ 无变化: 30s 有变化: 5s │
└──────────────┬────────────────────────────┘
│ pending queue (合并缓冲)
┌─ Loop B: OC Scheduler ───────────────────┐
│ 轮询 OC 忙闲状态 │
│ pending 空: 60s (低频) │
│ pending 有: 5s (高频,等空闲立刻推) │
│ 空闲 + pending → 推送 → 清 pending │
└──────────────────────────────────────────┘
```
**Loop A** 持续 diff OGraph projections,合并变化到 pending queue(Map,按 projection name 去重合并)。
**Loop B** 持续查 OC session-status,发现空闲 + pending 非空时推送,推送后进入冷却期。
## 快速开始
```bash
cd packages/dispatcher
npm install
npm run build
node dist/index.js
```
## 配置
配置文件:`~/.config/ograph/dispatcher.json`
```json
{
"ograph": {
"endpoint": "https://ograph.shazhou.workers.dev",
"token": "your-ograph-token",
"projections": ["my-projection", "another-projection"]
},
"oc": {
"statusEndpoint": "http://localhost:18789/plugins/session-status/status",
"statusToken": "ograph-status-token-2026",
"minAvailable": 2
},
"intervals": {
"watcherIdle": 30000,
"watcherActive": 5000,
"schedulerIdle": 60000,
"schedulerActive": 5000,
"cooldownAfterPush": 60000
}
}
```
## 环境变量覆盖
| 变量 | 对应配置 |
|------|----------|
| `OGRAPH_ENDPOINT` | `ograph.endpoint` |
| `OGRAPH_TOKEN` | `ograph.token` |
| `OGRAPH_PROJECTIONS` | `ograph.projections`(逗号分隔) |
| `OC_STATUS_ENDPOINT` | `oc.statusEndpoint` |
| `OC_STATUS_TOKEN` | `oc.statusToken` |
| `OC_MIN_AVAILABLE` | `oc.minAvailable` |
## 推送机制
当 OC 空闲且 pending 非空时,Scheduler 通过两种方式推送(均 best-effort):
1. **Dispatch 文件**:写入 `/tmp/ograph-dispatch.json`,可被其他工具读取
2. **openclaw message send**:通过 CLI 发送通知消息(需 openclaw 在 PATH)
## 文件结构
```
packages/dispatcher/
├── src/
│ ├── index.ts # 入口,启动两个 loop
│ ├── watcher.ts # Loop A: Projection Watcher
│ ├── scheduler.ts # Loop B: OC Scheduler
│ ├── ograph-client.ts # OGraph API 客户端
│ ├── oc-client.ts # OC session-status API 客户端
│ ├── config.ts # 配置加载
│ └── types.ts # 类型定义
├── build.mjs # esbuild 打包脚本
├── package.json
├── tsconfig.json
└── README.md
```
## 依赖
- 零外部运行时依赖
- 仅使用 Node.js 内置模块(`fs`, `path`, `os`, `child_process`)和全局 `fetch`(Node ≥ 18)
- 构建时仅依赖 `esbuild` + `typescript`