feat: OGraph Dispatcher — dual-loop actor for task notification (#4 P0) (#17)

* feat: add packages/dispatcher — dual-loop OGraph projection watcher + OC scheduler

Adds a new Node.js daemon that:
- Loop A (ProjectionWatcher): polls OGraph projections, diffs against
  snapshot, merges changes into a pending queue.
  - Idle: 30s poll interval; active (changes detected): 5s
- Loop B (OcScheduler): polls OC session-status, pushes pending queue
  when OC has available slots (>= minAvailable).
  - Idle (no pending): 60s; active (pending): 5s
  - Cooldown of 60s after each push to avoid spam

Tech:
- TypeScript + esbuild (zero runtime external deps)
- Graceful error handling: each poll is independent try-catch, errors
  logged but never crash the process
- Config from ~/.config/ograph/dispatcher.json + env-var overrides
- OGRAPH_CONFIG_FILE env var for config path override
- Push via /tmp/ograph-dispatch.json + openclaw message send (best-effort)

Build: npm run build → dist/index.js
Run:   node dist/index.js

* fix: address PR #17 review — package name, tests, shell safety, first-run

---------

Co-authored-by: 小墨 <xiaomooo@shazhou.work>
This commit is contained in:
小橘 🍊 2026-04-13 10:01:48 +08:00 committed by GitHub
parent b6935fc311
commit e82fe8eaba
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 1660 additions and 1 deletions

View File

@ -3,7 +3,8 @@
"private": true, "private": true,
"workspaces": [ "workspaces": [
"packages/engine", "packages/engine",
"packages/cli" "packages/cli",
"packages/dispatcher"
], ],
"scripts": { "scripts": {
"test": "npm run test --workspaces", "test": "npm run test --workspaces",

View File

@ -0,0 +1,101 @@
# OGraph Dispatcher
本地常驻进程,轮询 OGraph projection 变化,并在 OpenClaw (OC) 空闲时推送任务通知。
## 架构:双 Loop Actor
```
┌─ Loop A: Projection Watcher ──────────────┐
│ 轮询 OGraph projection → diff → pending │
│ 无变化: 30s 有变化: 5s │
└──────────────┬────────────────────────────┘
│ pending queue (合并缓冲)
┌─ Loop B: OC Scheduler ───────────────────┐
│ 轮询 OC 忙闲状态 │
│ pending 空: 60s (低频) │
│ pending 有: 5s (高频,等空闲立刻推) │
│ 空闲 + pending → 推送 → 清 pending │
└──────────────────────────────────────────┘
```
**Loop A** 持续 diff OGraph projections,合并变化到 pending queue(Map,按 projection name 去重合并)。
**Loop B** 持续查 OC session-status,发现空闲 + pending 非空时推送,推送后进入冷却期。
## 快速开始
```bash
cd packages/dispatcher
npm install
npm run build
node dist/index.js
```
## 配置
配置文件:`~/.config/ograph/dispatcher.json`
```json
{
"ograph": {
"endpoint": "https://ograph.shazhou.workers.dev",
"token": "your-ograph-token",
"projections": ["my-projection", "another-projection"]
},
"oc": {
"statusEndpoint": "http://localhost:18789/plugins/session-status/status",
"statusToken": "ograph-status-token-2026",
"minAvailable": 2
},
"intervals": {
"watcherIdle": 30000,
"watcherActive": 5000,
"schedulerIdle": 60000,
"schedulerActive": 5000,
"cooldownAfterPush": 60000
}
}
```
## 环境变量覆盖
| 变量 | 对应配置 |
|------|----------|
| `OGRAPH_ENDPOINT` | `ograph.endpoint` |
| `OGRAPH_TOKEN` | `ograph.token` |
| `OGRAPH_PROJECTIONS` | `ograph.projections`(逗号分隔) |
| `OC_STATUS_ENDPOINT` | `oc.statusEndpoint` |
| `OC_STATUS_TOKEN` | `oc.statusToken` |
| `OC_MIN_AVAILABLE` | `oc.minAvailable` |
## 推送机制
当 OC 空闲且 pending 非空时,Scheduler 通过两种方式推送(均 best-effort):
1. **Dispatch 文件**:写入 `/tmp/ograph-dispatch.json`,可被其他工具读取
2. **openclaw message send**:通过 CLI 发送通知消息(需 openclaw 在 PATH)
## 文件结构
```
packages/dispatcher/
├── src/
│ ├── index.ts # 入口,启动两个 loop
│ ├── watcher.ts # Loop A: Projection Watcher
│ ├── scheduler.ts # Loop B: OC Scheduler
│ ├── ograph-client.ts # OGraph API 客户端
│ ├── oc-client.ts # OC session-status API 客户端
│ ├── config.ts # 配置加载
│ └── types.ts # 类型定义
├── build.mjs # esbuild 打包脚本
├── package.json
├── tsconfig.json
└── README.md
```
## 依赖
- 零外部运行时依赖
- 仅使用 Node.js 内置模块(`fs`, `path`, `os`, `child_process`)和全局 `fetch`(Node ≥ 18)
- 构建时仅依赖 `esbuild` + `typescript`

View File

@ -0,0 +1,24 @@
// OGraph Dispatcher build script (esbuild)
// Usage: node build.mjs
import * as esbuild from 'esbuild'
import { mkdirSync } from 'node:fs'
mkdirSync('dist', { recursive: true })
await esbuild.build({
entryPoints: ['src/index.ts'],
bundle: true,
platform: 'node',
target: 'node18',
format: 'esm',
outfile: 'dist/index.js',
banner: {
js: '#!/usr/bin/env node',
},
// Mark all node: built-ins as external
packages: 'bundle',
external: ['node:*'],
})
console.log('Build complete → dist/index.js')

539
packages/dispatcher/package-lock.json generated Normal file
View File

@ -0,0 +1,539 @@
{
"name": "@ograph/dispatcher",
"version": "0.1.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@ograph/dispatcher",
"version": "0.1.0",
"license": "MIT",
"bin": {
"ograph-dispatcher": "dist/index.js"
},
"devDependencies": {
"@types/node": "^22.0.0",
"esbuild": "^0.25.0",
"typescript": "^5.4.0"
},
"engines": {
"node": ">=18.0.0"
}
},
"node_modules/@esbuild/aix-ppc64": {
"version": "0.25.12",
"resolved": "https://registry.npmjs.org/@esbuild/aix-ppc64/-/aix-ppc64-0.25.12.tgz",
"integrity": "sha512-Hhmwd6CInZ3dwpuGTF8fJG6yoWmsToE+vYgD4nytZVxcu1ulHpUQRAB1UJ8+N1Am3Mz4+xOByoQoSZf4D+CpkA==",
"cpu": [
"ppc64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"aix"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/android-arm": {
"version": "0.25.12",
"resolved": "https://registry.npmjs.org/@esbuild/android-arm/-/android-arm-0.25.12.tgz",
"integrity": "sha512-VJ+sKvNA/GE7Ccacc9Cha7bpS8nyzVv0jdVgwNDaR4gDMC/2TTRc33Ip8qrNYUcpkOHUT5OZ0bUcNNVZQ9RLlg==",
"cpu": [
"arm"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"android"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/android-arm64": {
"version": "0.25.12",
"resolved": "https://registry.npmjs.org/@esbuild/android-arm64/-/android-arm64-0.25.12.tgz",
"integrity": "sha512-6AAmLG7zwD1Z159jCKPvAxZd4y/VTO0VkprYy+3N2FtJ8+BQWFXU+OxARIwA46c5tdD9SsKGZ/1ocqBS/gAKHg==",
"cpu": [
"arm64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"android"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/android-x64": {
"version": "0.25.12",
"resolved": "https://registry.npmjs.org/@esbuild/android-x64/-/android-x64-0.25.12.tgz",
"integrity": "sha512-5jbb+2hhDHx5phYR2By8GTWEzn6I9UqR11Kwf22iKbNpYrsmRB18aX/9ivc5cabcUiAT/wM+YIZ6SG9QO6a8kg==",
"cpu": [
"x64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"android"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/darwin-arm64": {
"version": "0.25.12",
"resolved": "https://registry.npmjs.org/@esbuild/darwin-arm64/-/darwin-arm64-0.25.12.tgz",
"integrity": "sha512-N3zl+lxHCifgIlcMUP5016ESkeQjLj/959RxxNYIthIg+CQHInujFuXeWbWMgnTo4cp5XVHqFPmpyu9J65C1Yg==",
"cpu": [
"arm64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"darwin"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/darwin-x64": {
"version": "0.25.12",
"resolved": "https://registry.npmjs.org/@esbuild/darwin-x64/-/darwin-x64-0.25.12.tgz",
"integrity": "sha512-HQ9ka4Kx21qHXwtlTUVbKJOAnmG1ipXhdWTmNXiPzPfWKpXqASVcWdnf2bnL73wgjNrFXAa3yYvBSd9pzfEIpA==",
"cpu": [
"x64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"darwin"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/freebsd-arm64": {
"version": "0.25.12",
"resolved": "https://registry.npmjs.org/@esbuild/freebsd-arm64/-/freebsd-arm64-0.25.12.tgz",
"integrity": "sha512-gA0Bx759+7Jve03K1S0vkOu5Lg/85dou3EseOGUes8flVOGxbhDDh/iZaoek11Y8mtyKPGF3vP8XhnkDEAmzeg==",
"cpu": [
"arm64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"freebsd"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/freebsd-x64": {
"version": "0.25.12",
"resolved": "https://registry.npmjs.org/@esbuild/freebsd-x64/-/freebsd-x64-0.25.12.tgz",
"integrity": "sha512-TGbO26Yw2xsHzxtbVFGEXBFH0FRAP7gtcPE7P5yP7wGy7cXK2oO7RyOhL5NLiqTlBh47XhmIUXuGciXEqYFfBQ==",
"cpu": [
"x64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"freebsd"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/linux-arm": {
"version": "0.25.12",
"resolved": "https://registry.npmjs.org/@esbuild/linux-arm/-/linux-arm-0.25.12.tgz",
"integrity": "sha512-lPDGyC1JPDou8kGcywY0YILzWlhhnRjdof3UlcoqYmS9El818LLfJJc3PXXgZHrHCAKs/Z2SeZtDJr5MrkxtOw==",
"cpu": [
"arm"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"linux"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/linux-arm64": {
"version": "0.25.12",
"resolved": "https://registry.npmjs.org/@esbuild/linux-arm64/-/linux-arm64-0.25.12.tgz",
"integrity": "sha512-8bwX7a8FghIgrupcxb4aUmYDLp8pX06rGh5HqDT7bB+8Rdells6mHvrFHHW2JAOPZUbnjUpKTLg6ECyzvas2AQ==",
"cpu": [
"arm64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"linux"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/linux-ia32": {
"version": "0.25.12",
"resolved": "https://registry.npmjs.org/@esbuild/linux-ia32/-/linux-ia32-0.25.12.tgz",
"integrity": "sha512-0y9KrdVnbMM2/vG8KfU0byhUN+EFCny9+8g202gYqSSVMonbsCfLjUO+rCci7pM0WBEtz+oK/PIwHkzxkyharA==",
"cpu": [
"ia32"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"linux"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/linux-loong64": {
"version": "0.25.12",
"resolved": "https://registry.npmjs.org/@esbuild/linux-loong64/-/linux-loong64-0.25.12.tgz",
"integrity": "sha512-h///Lr5a9rib/v1GGqXVGzjL4TMvVTv+s1DPoxQdz7l/AYv6LDSxdIwzxkrPW438oUXiDtwM10o9PmwS/6Z0Ng==",
"cpu": [
"loong64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"linux"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/linux-mips64el": {
"version": "0.25.12",
"resolved": "https://registry.npmjs.org/@esbuild/linux-mips64el/-/linux-mips64el-0.25.12.tgz",
"integrity": "sha512-iyRrM1Pzy9GFMDLsXn1iHUm18nhKnNMWscjmp4+hpafcZjrr2WbT//d20xaGljXDBYHqRcl8HnxbX6uaA/eGVw==",
"cpu": [
"mips64el"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"linux"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/linux-ppc64": {
"version": "0.25.12",
"resolved": "https://registry.npmjs.org/@esbuild/linux-ppc64/-/linux-ppc64-0.25.12.tgz",
"integrity": "sha512-9meM/lRXxMi5PSUqEXRCtVjEZBGwB7P/D4yT8UG/mwIdze2aV4Vo6U5gD3+RsoHXKkHCfSxZKzmDssVlRj1QQA==",
"cpu": [
"ppc64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"linux"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/linux-riscv64": {
"version": "0.25.12",
"resolved": "https://registry.npmjs.org/@esbuild/linux-riscv64/-/linux-riscv64-0.25.12.tgz",
"integrity": "sha512-Zr7KR4hgKUpWAwb1f3o5ygT04MzqVrGEGXGLnj15YQDJErYu/BGg+wmFlIDOdJp0PmB0lLvxFIOXZgFRrdjR0w==",
"cpu": [
"riscv64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"linux"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/linux-s390x": {
"version": "0.25.12",
"resolved": "https://registry.npmjs.org/@esbuild/linux-s390x/-/linux-s390x-0.25.12.tgz",
"integrity": "sha512-MsKncOcgTNvdtiISc/jZs/Zf8d0cl/t3gYWX8J9ubBnVOwlk65UIEEvgBORTiljloIWnBzLs4qhzPkJcitIzIg==",
"cpu": [
"s390x"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"linux"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/linux-x64": {
"version": "0.25.12",
"resolved": "https://registry.npmjs.org/@esbuild/linux-x64/-/linux-x64-0.25.12.tgz",
"integrity": "sha512-uqZMTLr/zR/ed4jIGnwSLkaHmPjOjJvnm6TVVitAa08SLS9Z0VM8wIRx7gWbJB5/J54YuIMInDquWyYvQLZkgw==",
"cpu": [
"x64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"linux"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/netbsd-arm64": {
"version": "0.25.12",
"resolved": "https://registry.npmjs.org/@esbuild/netbsd-arm64/-/netbsd-arm64-0.25.12.tgz",
"integrity": "sha512-xXwcTq4GhRM7J9A8Gv5boanHhRa/Q9KLVmcyXHCTaM4wKfIpWkdXiMog/KsnxzJ0A1+nD+zoecuzqPmCRyBGjg==",
"cpu": [
"arm64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"netbsd"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/netbsd-x64": {
"version": "0.25.12",
"resolved": "https://registry.npmjs.org/@esbuild/netbsd-x64/-/netbsd-x64-0.25.12.tgz",
"integrity": "sha512-Ld5pTlzPy3YwGec4OuHh1aCVCRvOXdH8DgRjfDy/oumVovmuSzWfnSJg+VtakB9Cm0gxNO9BzWkj6mtO1FMXkQ==",
"cpu": [
"x64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"netbsd"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/openbsd-arm64": {
"version": "0.25.12",
"resolved": "https://registry.npmjs.org/@esbuild/openbsd-arm64/-/openbsd-arm64-0.25.12.tgz",
"integrity": "sha512-fF96T6KsBo/pkQI950FARU9apGNTSlZGsv1jZBAlcLL1MLjLNIWPBkj5NlSz8aAzYKg+eNqknrUJ24QBybeR5A==",
"cpu": [
"arm64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"openbsd"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/openbsd-x64": {
"version": "0.25.12",
"resolved": "https://registry.npmjs.org/@esbuild/openbsd-x64/-/openbsd-x64-0.25.12.tgz",
"integrity": "sha512-MZyXUkZHjQxUvzK7rN8DJ3SRmrVrke8ZyRusHlP+kuwqTcfWLyqMOE3sScPPyeIXN/mDJIfGXvcMqCgYKekoQw==",
"cpu": [
"x64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"openbsd"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/openharmony-arm64": {
"version": "0.25.12",
"resolved": "https://registry.npmjs.org/@esbuild/openharmony-arm64/-/openharmony-arm64-0.25.12.tgz",
"integrity": "sha512-rm0YWsqUSRrjncSXGA7Zv78Nbnw4XL6/dzr20cyrQf7ZmRcsovpcRBdhD43Nuk3y7XIoW2OxMVvwuRvk9XdASg==",
"cpu": [
"arm64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"openharmony"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/sunos-x64": {
"version": "0.25.12",
"resolved": "https://registry.npmjs.org/@esbuild/sunos-x64/-/sunos-x64-0.25.12.tgz",
"integrity": "sha512-3wGSCDyuTHQUzt0nV7bocDy72r2lI33QL3gkDNGkod22EsYl04sMf0qLb8luNKTOmgF/eDEDP5BFNwoBKH441w==",
"cpu": [
"x64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"sunos"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/win32-arm64": {
"version": "0.25.12",
"resolved": "https://registry.npmjs.org/@esbuild/win32-arm64/-/win32-arm64-0.25.12.tgz",
"integrity": "sha512-rMmLrur64A7+DKlnSuwqUdRKyd3UE7oPJZmnljqEptesKM8wx9J8gx5u0+9Pq0fQQW8vqeKebwNXdfOyP+8Bsg==",
"cpu": [
"arm64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"win32"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/win32-ia32": {
"version": "0.25.12",
"resolved": "https://registry.npmjs.org/@esbuild/win32-ia32/-/win32-ia32-0.25.12.tgz",
"integrity": "sha512-HkqnmmBoCbCwxUKKNPBixiWDGCpQGVsrQfJoVGYLPT41XWF8lHuE5N6WhVia2n4o5QK5M4tYr21827fNhi4byQ==",
"cpu": [
"ia32"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"win32"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/win32-x64": {
"version": "0.25.12",
"resolved": "https://registry.npmjs.org/@esbuild/win32-x64/-/win32-x64-0.25.12.tgz",
"integrity": "sha512-alJC0uCZpTFrSL0CCDjcgleBXPnCrEAhTBILpeAp7M/OFgoqtAetfBzX0xM00MUsVVPpVjlPuMbREqnZCXaTnA==",
"cpu": [
"x64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"win32"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@types/node": {
"version": "22.19.17",
"resolved": "https://registry.npmjs.org/@types/node/-/node-22.19.17.tgz",
"integrity": "sha512-wGdMcf+vPYM6jikpS/qhg6WiqSV/OhG+jeeHT/KlVqxYfD40iYJf9/AE1uQxVWFvU7MipKRkRv8NSHiCGgPr8Q==",
"dev": true,
"license": "MIT",
"dependencies": {
"undici-types": "~6.21.0"
}
},
"node_modules/esbuild": {
"version": "0.25.12",
"resolved": "https://registry.npmjs.org/esbuild/-/esbuild-0.25.12.tgz",
"integrity": "sha512-bbPBYYrtZbkt6Os6FiTLCTFxvq4tt3JKall1vRwshA3fdVztsLAatFaZobhkBC8/BrPetoa0oksYoKXoG4ryJg==",
"dev": true,
"hasInstallScript": true,
"license": "MIT",
"bin": {
"esbuild": "bin/esbuild"
},
"engines": {
"node": ">=18"
},
"optionalDependencies": {
"@esbuild/aix-ppc64": "0.25.12",
"@esbuild/android-arm": "0.25.12",
"@esbuild/android-arm64": "0.25.12",
"@esbuild/android-x64": "0.25.12",
"@esbuild/darwin-arm64": "0.25.12",
"@esbuild/darwin-x64": "0.25.12",
"@esbuild/freebsd-arm64": "0.25.12",
"@esbuild/freebsd-x64": "0.25.12",
"@esbuild/linux-arm": "0.25.12",
"@esbuild/linux-arm64": "0.25.12",
"@esbuild/linux-ia32": "0.25.12",
"@esbuild/linux-loong64": "0.25.12",
"@esbuild/linux-mips64el": "0.25.12",
"@esbuild/linux-ppc64": "0.25.12",
"@esbuild/linux-riscv64": "0.25.12",
"@esbuild/linux-s390x": "0.25.12",
"@esbuild/linux-x64": "0.25.12",
"@esbuild/netbsd-arm64": "0.25.12",
"@esbuild/netbsd-x64": "0.25.12",
"@esbuild/openbsd-arm64": "0.25.12",
"@esbuild/openbsd-x64": "0.25.12",
"@esbuild/openharmony-arm64": "0.25.12",
"@esbuild/sunos-x64": "0.25.12",
"@esbuild/win32-arm64": "0.25.12",
"@esbuild/win32-ia32": "0.25.12",
"@esbuild/win32-x64": "0.25.12"
}
},
"node_modules/typescript": {
"version": "5.9.3",
"resolved": "https://registry.npmjs.org/typescript/-/typescript-5.9.3.tgz",
"integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==",
"dev": true,
"license": "Apache-2.0",
"bin": {
"tsc": "bin/tsc",
"tsserver": "bin/tsserver"
},
"engines": {
"node": ">=14.17"
}
},
"node_modules/undici-types": {
"version": "6.21.0",
"resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.21.0.tgz",
"integrity": "sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==",
"dev": true,
"license": "MIT"
}
}
}

View File

@ -0,0 +1,37 @@
{
"name": "@uncaged/ograph-dispatcher",
"version": "0.1.0",
"description": "OGraph Dispatcher — polls projections and notifies OC when idle",
"type": "module",
"main": "./dist/index.js",
"bin": {
"ograph-dispatcher": "./dist/index.js"
},
"files": [
"dist/"
],
"scripts": {
"build": "node build.mjs",
"dev": "node --watch dist/index.js",
"start": "node dist/index.js",
"test": "vitest run",
"typecheck": "tsc --noEmit"
},
"keywords": [
"ograph",
"dispatcher",
"projection",
"watcher"
],
"author": "小墨 🖊️",
"license": "MIT",
"engines": {
"node": ">=18.0.0"
},
"devDependencies": {
"@types/node": "^22.0.0",
"esbuild": "^0.25.0",
"typescript": "^5.4.0",
"vitest": "^2.0.0"
}
}

View File

@ -0,0 +1,88 @@
// Config loader for OGraph Dispatcher
// Reads from ~/.config/ograph/dispatcher.json, with env-var overrides.
import { readFileSync } from 'node:fs';
import { join } from 'node:path';
import { homedir } from 'node:os';
import type { DispatcherConfig } from './types.js';
const CONFIG_PATH =
process.env['OGRAPH_CONFIG_FILE'] ??
join(homedir(), '.config', 'ograph', 'dispatcher.json');
const DEFAULTS: DispatcherConfig = {
ograph: {
endpoint: 'https://ograph.shazhou.workers.dev',
token: undefined,
projections: [],
},
oc: {
statusEndpoint: 'http://localhost:18789/plugins/session-status/status',
statusToken: 'ograph-status-token-2026',
minAvailable: 2,
},
intervals: {
watcherIdle: 30_000,
watcherActive: 5_000,
schedulerIdle: 60_000,
schedulerActive: 5_000,
cooldownAfterPush: 60_000,
},
};
function deepMerge<T extends object>(base: T, override: Partial<T>): T {
const result = { ...base };
for (const key of Object.keys(override) as (keyof T)[]) {
const overrideVal = override[key];
const baseVal = base[key];
if (
overrideVal !== null &&
typeof overrideVal === 'object' &&
!Array.isArray(overrideVal) &&
baseVal !== null &&
typeof baseVal === 'object' &&
!Array.isArray(baseVal)
) {
result[key] = deepMerge(baseVal as object, overrideVal as object) as T[keyof T];
} else if (overrideVal !== undefined) {
result[key] = overrideVal as T[keyof T];
}
}
return result;
}
export function loadConfig(): DispatcherConfig {
let fileConfig: Partial<DispatcherConfig> = {};
try {
const raw = readFileSync(CONFIG_PATH, 'utf-8');
fileConfig = JSON.parse(raw) as Partial<DispatcherConfig>;
} catch {
// Config file optional — fall back to defaults + env vars
}
let config = deepMerge(DEFAULTS, fileConfig);
// ── Environment variable overrides ───────────────────────────────────────────
if (process.env['OGRAPH_ENDPOINT']) {
config.ograph.endpoint = process.env['OGRAPH_ENDPOINT'];
}
if (process.env['OGRAPH_TOKEN']) {
config.ograph.token = process.env['OGRAPH_TOKEN'];
}
if (process.env['OGRAPH_PROJECTIONS']) {
config.ograph.projections = process.env['OGRAPH_PROJECTIONS'].split(',').map((s) => s.trim());
}
if (process.env['OC_STATUS_ENDPOINT']) {
config.oc.statusEndpoint = process.env['OC_STATUS_ENDPOINT'];
}
if (process.env['OC_STATUS_TOKEN']) {
config.oc.statusToken = process.env['OC_STATUS_TOKEN'];
}
if (process.env['OC_MIN_AVAILABLE']) {
const n = parseInt(process.env['OC_MIN_AVAILABLE'], 10);
if (!isNaN(n)) config.oc.minAvailable = n;
}
return config;
}

View File

@ -0,0 +1,61 @@
// OGraph Dispatcher — entry point
// Starts Loop A (ProjectionWatcher) and Loop B (OcScheduler) independently.
import { loadConfig } from './config.js';
import { ProjectionWatcher } from './watcher.js';
import { OcScheduler } from './scheduler.js';
import type { PendingEntry } from './types.js';
function ts(): string {
return new Date().toISOString();
}
async function main(): Promise<void> {
console.log(`[${ts()}] [dispatcher] OGraph Dispatcher starting...`);
const config = loadConfig();
console.log(`[${ts()}] [dispatcher] config loaded`);
console.log(`[${ts()}] [dispatcher] ograph.endpoint = ${config.ograph.endpoint}`);
console.log(`[${ts()}] [dispatcher] ograph.projections = [${config.ograph.projections.join(', ')}]`);
console.log(`[${ts()}] [dispatcher] oc.statusEndpoint = ${config.oc.statusEndpoint}`);
console.log(`[${ts()}] [dispatcher] oc.minAvailable = ${config.oc.minAvailable}`);
console.log(`[${ts()}] [dispatcher] intervals.watcherIdle = ${config.intervals.watcherIdle}ms`);
console.log(`[${ts()}] [dispatcher] intervals.watcherActive = ${config.intervals.watcherActive}ms`);
console.log(`[${ts()}] [dispatcher] intervals.schedulerIdle = ${config.intervals.schedulerIdle}ms`);
console.log(`[${ts()}] [dispatcher] intervals.schedulerActive= ${config.intervals.schedulerActive}ms`);
console.log(`[${ts()}] [dispatcher] intervals.cooldownAfterPush = ${config.intervals.cooldownAfterPush}ms`);
// Shared pending queue — Watcher writes, Scheduler reads + clears
const pending: Map<string, PendingEntry> = new Map();
const watcher = new ProjectionWatcher(config, pending);
const scheduler = new OcScheduler(config, pending);
// Start both loops independently
watcher.start();
scheduler.start();
console.log(`[${ts()}] [dispatcher] both loops running. Press Ctrl+C to stop.`);
// Graceful shutdown
const shutdown = (): void => {
console.log(`\n[${ts()}] [dispatcher] shutting down...`);
watcher.stop();
scheduler.stop();
process.exit(0);
};
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);
// Keep the process alive (the timers are enough, but be explicit)
await new Promise<void>(() => {
// never resolves — loops keep running via setTimeout chains
});
}
main().catch((err: unknown) => {
console.error(`[${new Date().toISOString()}] [dispatcher] fatal: ${err instanceof Error ? err.message : String(err)}`);
process.exit(1);
});

View File

@ -0,0 +1,46 @@
// OC session-status API client (zero external deps)
import type { DispatcherConfig, OcStatusResponse } from './types.js';
export class OcClient {
constructor(private readonly config: DispatcherConfig) {}
/**
* Query OC session status.
* Returns null if the endpoint is unreachable or returns an error.
*/
async getStatus(): Promise<OcStatusResponse | null> {
const { statusEndpoint, statusToken } = this.config.oc;
const response = await fetch(statusEndpoint, {
headers: {
Authorization: `Bearer ${statusToken}`,
'Content-Type': 'application/json',
},
});
const contentType = response.headers.get('content-type') ?? '';
if (!contentType.includes('application/json')) {
const text = await response.text();
throw new Error(`Non-JSON response from OC status: ${text.slice(0, 120)}`);
}
const body = await response.json() as OcStatusResponse;
if (!response.ok || !body.ok) {
throw new Error(`OC status error: HTTP ${response.status}`);
}
return body;
}
/**
* Returns true if OC has enough free slots to accept a task.
* Condition: available > minAvailable (keep at least one slot for the human).
*/
async isAvailable(): Promise<boolean> {
const status = await this.getStatus();
if (!status) return false;
return status.sessions.available > this.config.oc.minAvailable;
}
}

View File

@ -0,0 +1,73 @@
// OGraph API client (dispatcher-specific, zero external deps)
// Uses node built-in fetch (Node >= 18).
import type { DispatcherConfig } from './types.js';
export interface ProjectionValue {
name: string;
value: unknown;
fetchedAt: number;
}
export class OGraphClient {
constructor(private readonly config: DispatcherConfig) {}
/** Fetch a single projection value. Returns null on any error. */
async getProjection(name: string, params?: Record<string, string>): Promise<ProjectionValue | null> {
const { endpoint, token } = this.config.ograph;
const qs =
params && Object.keys(params).length > 0
? '?' + new URLSearchParams(params).toString()
: '';
const url = `${endpoint}/projections/${encodeURIComponent(name)}${qs}`;
const headers: Record<string, string> = {
'Content-Type': 'application/json',
};
if (token) headers['Authorization'] = `Bearer ${token}`;
const response = await fetch(url, { headers });
const contentType = response.headers.get('content-type') ?? '';
if (!contentType.includes('application/json')) {
const text = await response.text();
throw new Error(`Non-JSON response for projection "${name}": ${text.slice(0, 120)}`);
}
const result = await response.json() as { value?: unknown; error?: string };
if (!response.ok) {
throw new Error(result.error ?? `HTTP ${response.status} fetching projection "${name}"`);
}
return { name, value: result.value, fetchedAt: Date.now() };
}
/** Fetch all watched projections in parallel. Returns a map of name → value. */
async fetchAll(): Promise<{ values: Map<string, unknown>; errors: Map<string, string> }> {
const { projections } = this.config.ograph;
const values = new Map<string, unknown>();
const errors = new Map<string, string>();
if (projections.length === 0) return { values, errors };
const fetches = projections.map(async (name) => {
const pv = await this.getProjection(name);
return pv;
});
const settled = await Promise.allSettled(fetches);
for (let i = 0; i < settled.length; i++) {
const result = settled[i];
const name = projections[i] as string;
if (result?.status === 'fulfilled' && result.value !== null) {
values.set(name, result.value.value);
} else if (result?.status === 'rejected') {
const errMsg = result.reason instanceof Error ? result.reason.message : String(result.reason);
errors.set(name, errMsg);
}
}
return { values, errors };
}
}

View File

@ -0,0 +1,155 @@
// Loop B: OC Scheduler
// Polls OC busy/idle state, pushes pending queue when OC is available.
import { writeFileSync, mkdirSync } from 'node:fs';
import { join } from 'node:path';
import { execFileSync } from 'node:child_process';
import type { DispatcherConfig, PendingEntry } from './types.js';
import { OcClient } from './oc-client.js';
function ts(): string {
return new Date().toISOString();
}
/** Dispatch file path — a simple push mechanism readable by OC / other tools */
const DISPATCH_FILE = '/tmp/ograph-dispatch.json';
export class OcScheduler {
private readonly client: OcClient;
private running = false;
private timer: ReturnType<typeof setTimeout> | null = null;
private lastPushAt = 0;
constructor(
private readonly config: DispatcherConfig,
/** Shared pending queue (read + cleared by Scheduler) */
private readonly pending: Map<string, PendingEntry>,
) {
this.client = new OcClient(config);
}
start(): void {
if (this.running) return;
this.running = true;
console.log(`[${ts()}] [scheduler] started — OC status: ${this.config.oc.statusEndpoint}`);
void this.poll();
}
stop(): void {
this.running = false;
if (this.timer !== null) {
clearTimeout(this.timer);
this.timer = null;
}
console.log(`[${ts()}] [scheduler] stopped`);
}
private scheduleNext(delayMs: number): void {
if (!this.running) return;
this.timer = setTimeout(() => {
void this.poll();
}, delayMs);
}
private async poll(): Promise<void> {
if (!this.running) return;
const { schedulerIdle, schedulerActive, cooldownAfterPush } = this.config.intervals;
try {
const hasPending = this.pending.size > 0;
// Check cooldown first
const now = Date.now();
if (this.lastPushAt > 0 && now - this.lastPushAt < cooldownAfterPush) {
const remainMs = cooldownAfterPush - (now - this.lastPushAt);
console.log(`[${ts()}] [scheduler] in cooldown — ${Math.ceil(remainMs / 1000)}s remaining`);
this.scheduleNext(hasPending ? schedulerActive : schedulerIdle);
return;
}
if (!hasPending) {
// Nothing to push; low-frequency idle polling
this.scheduleNext(schedulerIdle);
return;
}
// We have pending items — check OC availability
let available = false;
try {
available = await this.client.isAvailable();
console.log(`[${ts()}] [scheduler] OC available=${available} pending=${this.pending.size}`);
} catch (err) {
console.warn(`[${ts()}] [scheduler] OC status check failed: ${err instanceof Error ? err.message : String(err)}`);
// Can't determine status — back off
this.scheduleNext(schedulerActive);
return;
}
if (available) {
await this.push();
this.lastPushAt = Date.now();
this.scheduleNext(schedulerIdle); // after push, slow down
} else {
// OC busy but we have work — keep polling actively
this.scheduleNext(schedulerActive);
}
} catch (err) {
console.error(`[${ts()}] [scheduler] poll error: ${err instanceof Error ? err.message : String(err)}`);
this.scheduleNext(schedulerActive);
}
}
private async push(): Promise<void> {
const entries = Array.from(this.pending.values());
if (entries.length === 0) return;
const message = this.buildMessage(entries);
console.log(`[${ts()}] [scheduler] pushing ${entries.length} change(s) to OC`);
console.log(`[${ts()}] [scheduler] message:\n${message}`);
// ── Strategy 1: write dispatch file ─────────────────────────────────────
try {
mkdirSync('/tmp', { recursive: true });
const payload = {
pushedAt: new Date().toISOString(),
changes: entries,
message,
};
writeFileSync(DISPATCH_FILE, JSON.stringify(payload, null, 2), 'utf-8');
console.log(`[${ts()}] [scheduler] dispatch file written: ${DISPATCH_FILE}`);
} catch (err) {
console.warn(`[${ts()}] [scheduler] failed to write dispatch file: ${err instanceof Error ? err.message : String(err)}`);
}
// ── Strategy 2: openclaw message send (best-effort) ──────────────────────
try {
execFileSync('openclaw', ['message', 'send', message], { timeout: 10_000, stdio: 'pipe' });
console.log(`[${ts()}] [scheduler] openclaw message send OK`);
} catch (err) {
// Not fatal — openclaw may not be in PATH or the channel may not be configured
console.warn(`[${ts()}] [scheduler] openclaw message send failed (non-fatal): ${err instanceof Error ? err.message : String(err)}`);
}
// Clear pending after successful push attempt
this.pending.clear();
console.log(`[${ts()}] [scheduler] pending queue cleared`);
}
private buildMessage(entries: PendingEntry[]): string {
const lines: string[] = ['🔔 OGraph Projection Changes Detected\n'];
for (const entry of entries) {
const age = Math.round((Date.now() - entry.firstDetectedAt) / 1000);
lines.push(`• **${entry.name}**`);
lines.push(` Changes: ${entry.changeCount} | Age: ${age}s`);
lines.push(` Previous: ${JSON.stringify(entry.previousValue)}`);
lines.push(` Current: ${JSON.stringify(entry.currentValue)}`);
lines.push('');
}
lines.push(`Total: ${entries.length} projection(s) changed`);
return lines.join('\n');
}
}

View File

@ -0,0 +1,59 @@
// OGraph Dispatcher — shared types
export interface DispatcherConfig {
ograph: {
endpoint: string; // OGraph API endpoint
token?: string; // OGraph API token
projections: string[]; // projection 名称列表
};
oc: {
statusEndpoint: string; // session-status plugin URL
statusToken: string; // Bearer token
minAvailable: number; // 最少空闲槽位(默认 2)
};
intervals: {
watcherIdle: number; // 无变化时 poll 间隔 ms(默认 30000)
watcherActive: number; // 有变化时 poll 间隔 ms(默认 5000)
schedulerIdle: number; // pending 空时 poll 间隔 ms(默认 60000)
schedulerActive: number; // pending 有时 poll 间隔 ms(默认 5000)
cooldownAfterPush: number; // 推送后冷却 ms(默认 60000)
};
}
/** 一次 projection 变化记录 */
export interface ProjectionChange {
name: string;
previousValue: unknown;
currentValue: unknown;
detectedAt: number; // unix ms
}
/** pending queue 中存放的合并条目(以 projection name 为 key) */
export interface PendingEntry {
name: string;
previousValue: unknown; // 最早那次的旧值
currentValue: unknown; // 最新值
firstDetectedAt: number;
lastDetectedAt: number;
changeCount: number;
}
/** OC session-status API response */
export interface OcStatusResponse {
ok: boolean;
timestamp: string;
sessions: {
running: number;
idle: number;
total: number;
maxConcurrent: number;
available: number;
details: Array<{
key: string;
status: 'running' | 'idle';
agent: string;
kind: string;
updatedAt: number;
}>;
};
}

View File

@ -0,0 +1,127 @@
// Loop A: Projection Watcher
// Polls OGraph projections, diffs against last snapshot, feeds pending queue.
import type { DispatcherConfig, PendingEntry } from './types.js';
import { OGraphClient } from './ograph-client.js';
function ts(): string {
return new Date().toISOString();
}
export class ProjectionWatcher {
private readonly client: OGraphClient;
private snapshot: Map<string, unknown> = new Map();
private hasChanges = false;
private running = false;
private timer: ReturnType<typeof setTimeout> | null = null;
constructor(
private readonly config: DispatcherConfig,
/** Shared pending queue (owned by Scheduler, written by Watcher) */
private readonly pending: Map<string, PendingEntry>,
) {
this.client = new OGraphClient(config);
}
start(): void {
if (this.running) return;
this.running = true;
console.log(`[${ts()}] [watcher] started — watching: ${this.config.ograph.projections.join(', ') || '(none)'}`);
void this.poll();
}
stop(): void {
this.running = false;
if (this.timer !== null) {
clearTimeout(this.timer);
this.timer = null;
}
console.log(`[${ts()}] [watcher] stopped`);
}
private scheduleNext(delayMs: number): void {
if (!this.running) return;
this.timer = setTimeout(() => {
void this.poll();
}, delayMs);
}
private async poll(): Promise<void> {
if (!this.running) return;
const { watcherIdle, watcherActive } = this.config.intervals;
try {
const { values: current, errors } = await this.client.fetchAll();
// Log any fetch errors (graceful degradation — don't crash)
for (const [name, errMsg] of errors.entries()) {
console.warn(`[${ts()}] [watcher] fetch error for "${name}": ${errMsg}`);
}
let changed = false;
for (const [name, value] of current.entries()) {
if (!this.snapshot.has(name)) {
// First time seeing this projection — initialize snapshot, no change
this.snapshot.set(name, value);
console.log(`[${ts()}] [watcher] initial snapshot: ${name} = ${JSON.stringify(value)}`);
continue;
}
const prev = this.snapshot.get(name);
const prevStr = JSON.stringify(prev);
const currStr = JSON.stringify(value);
if (prevStr !== currStr) {
changed = true;
console.log(`[${ts()}] [watcher] change detected: ${name}`);
this.mergePending(name, prev, value);
this.snapshot.set(name, value);
}
}
// Handle projections that disappeared from results (treat as change → undefined)
for (const name of this.snapshot.keys()) {
if (!current.has(name)) {
const prev = this.snapshot.get(name);
if (prev !== undefined) {
changed = true;
console.log(`[${ts()}] [watcher] projection gone: ${name}`);
this.mergePending(name, prev, undefined);
this.snapshot.set(name, undefined);
}
}
}
this.hasChanges = changed || this.pending.size > 0;
} catch (err) {
console.error(`[${ts()}] [watcher] poll error: ${err instanceof Error ? err.message : String(err)}`);
// Graceful degradation: keep previous snapshot, retry at idle interval
this.hasChanges = false;
}
const interval = this.hasChanges ? watcherActive : watcherIdle;
this.scheduleNext(interval);
}
private mergePending(name: string, previousValue: unknown, currentValue: unknown): void {
const now = Date.now();
const existing = this.pending.get(name);
if (existing) {
// Merge: keep original previousValue, update current, bump count
existing.currentValue = currentValue;
existing.lastDetectedAt = now;
existing.changeCount += 1;
} else {
this.pending.set(name, {
name,
previousValue,
currentValue,
firstDetectedAt: now,
lastDetectedAt: now,
changeCount: 1,
});
}
}
}

View File

@ -0,0 +1,95 @@
// Tests for config deepMerge logic
import { describe, it, expect } from 'vitest';
import type { DispatcherConfig } from '../src/types.js';
// ── inline copy of deepMerge (same logic as config.ts) ───────────────────────
function deepMerge<T extends object>(base: T, override: Partial<T>): T {
const result = { ...base };
for (const key of Object.keys(override) as (keyof T)[]) {
const overrideVal = override[key];
const baseVal = base[key];
if (
overrideVal !== null &&
typeof overrideVal === 'object' &&
!Array.isArray(overrideVal) &&
baseVal !== null &&
typeof baseVal === 'object' &&
!Array.isArray(baseVal)
) {
result[key] = deepMerge(baseVal as object, overrideVal as object) as T[keyof T];
} else if (overrideVal !== undefined) {
result[key] = overrideVal as T[keyof T];
}
}
return result;
}
// ── helpers ───────────────────────────────────────────────────────────────────
const DEFAULTS: DispatcherConfig = {
ograph: {
endpoint: 'https://ograph.example.com',
token: undefined,
projections: [],
},
oc: {
statusEndpoint: 'http://localhost:18789/status',
statusToken: 'default-token',
minAvailable: 2,
},
intervals: {
watcherIdle: 30_000,
watcherActive: 5_000,
schedulerIdle: 60_000,
schedulerActive: 5_000,
cooldownAfterPush: 60_000,
},
};
// ── tests ─────────────────────────────────────────────────────────────────────
describe('config deepMerge logic', () => {
it('returns defaults unchanged when no overrides supplied', () => {
const result = deepMerge(DEFAULTS, {});
expect(result).toEqual(DEFAULTS);
});
it('overrides a scalar field without touching siblings', () => {
const result = deepMerge(DEFAULTS, {
ograph: { endpoint: 'https://custom.example.com', token: undefined, projections: [] },
});
expect(result.ograph.endpoint).toBe('https://custom.example.com');
expect(result.oc.statusToken).toBe('default-token'); // unchanged
});
it('deep-merges nested objects — partial override preserves untouched sibling keys', () => {
const result = deepMerge(DEFAULTS, {
oc: { statusToken: 'my-token' } as DispatcherConfig['oc'],
});
expect(result.oc.statusToken).toBe('my-token');
expect(result.oc.statusEndpoint).toBe(DEFAULTS.oc.statusEndpoint); // preserved
expect(result.oc.minAvailable).toBe(2); // preserved
});
it('env var override merges on top of file config', () => {
const fileConfig: Partial<DispatcherConfig> = {
ograph: { endpoint: 'https://file-endpoint.example.com', token: undefined, projections: ['p1'] },
};
let config = deepMerge(DEFAULTS, fileConfig);
// Simulate env var override (as done in loadConfig)
config.ograph.endpoint = 'https://env-endpoint.example.com';
expect(config.ograph.endpoint).toBe('https://env-endpoint.example.com');
expect(config.ograph.projections).toEqual(['p1']); // file value preserved
expect(config.oc.minAvailable).toBe(2); // default preserved
});
it('does not overwrite a field when override value is undefined', () => {
const result = deepMerge(DEFAULTS, {
ograph: { endpoint: undefined as unknown as string, token: undefined, projections: [] },
});
// endpoint should keep the default since override is undefined
expect(result.ograph.endpoint).toBe(DEFAULTS.ograph.endpoint);
});
});

View File

@ -0,0 +1,72 @@
// Tests for scheduler cooldown logic
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
// ── minimal cooldown state machine ───────────────────────────────────────────
// Mirrors the cooldown guard in OcScheduler.poll() so we can test it in isolation.
function makeCooldownGuard(cooldownMs: number) {
let lastPushAt = 0;
return {
recordPush() {
lastPushAt = Date.now();
},
isInCooldown(): boolean {
if (lastPushAt === 0) return false;
return Date.now() - lastPushAt < cooldownMs;
},
remainingMs(): number {
if (lastPushAt === 0) return 0;
const elapsed = Date.now() - lastPushAt;
return Math.max(0, cooldownMs - elapsed);
},
};
}
// ── tests ─────────────────────────────────────────────────────────────────────
describe('scheduler cooldown logic', () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});
it('not in cooldown before any push', () => {
const guard = makeCooldownGuard(60_000);
expect(guard.isInCooldown()).toBe(false);
});
it('enters cooldown immediately after a push', () => {
const guard = makeCooldownGuard(60_000);
guard.recordPush();
expect(guard.isInCooldown()).toBe(true);
});
it('remains in cooldown within the window', () => {
const guard = makeCooldownGuard(60_000);
guard.recordPush();
vi.advanceTimersByTime(59_999); // just under 60 s
expect(guard.isInCooldown()).toBe(true);
expect(guard.remainingMs()).toBeGreaterThan(0);
});
it('exits cooldown after the window expires', () => {
const guard = makeCooldownGuard(60_000);
guard.recordPush();
vi.advanceTimersByTime(60_001); // just over 60 s
expect(guard.isInCooldown()).toBe(false);
expect(guard.remainingMs()).toBe(0);
});
it('resets cooldown on a second push', () => {
const guard = makeCooldownGuard(60_000);
guard.recordPush();
vi.advanceTimersByTime(30_000); // 30 s into first cooldown
guard.recordPush(); // second push resets the clock
vi.advanceTimersByTime(30_000); // only 30 s since second push
expect(guard.isInCooldown()).toBe(true); // still cooling down
});
});

View File

@ -0,0 +1,155 @@
// Tests for watcher diff logic
// We test the core diffing behaviour by exercising the internals directly
// without needing to spin up a real OGraph server.
import { describe, it, expect, vi, beforeEach } from 'vitest';
import type { DispatcherConfig, PendingEntry } from '../src/types.js';
// ── helpers ──────────────────────────────────────────────────────────────────
function makeConfig(overrides: Partial<DispatcherConfig['intervals']> = {}): DispatcherConfig {
return {
ograph: { endpoint: 'http://localhost', token: undefined, projections: [] },
oc: { statusEndpoint: 'http://localhost', statusToken: 'tok', minAvailable: 2 },
intervals: {
watcherIdle: 30_000,
watcherActive: 5_000,
schedulerIdle: 60_000,
schedulerActive: 5_000,
cooldownAfterPush: 60_000,
...overrides,
},
};
}
// Minimal inline diff engine extracted from the watcher logic so we can unit-test
// it without I/O dependencies.
function runDiff(
snapshot: Map<string, unknown>,
current: Map<string, unknown>,
pending: Map<string, PendingEntry>,
): { changed: boolean } {
let changed = false;
const now = Date.now();
for (const [name, value] of current.entries()) {
if (!snapshot.has(name)) {
// first-run: initialise snapshot, no change
snapshot.set(name, value);
continue;
}
const prev = snapshot.get(name);
if (JSON.stringify(prev) !== JSON.stringify(value)) {
changed = true;
const existing = pending.get(name);
if (existing) {
existing.currentValue = value;
existing.lastDetectedAt = now;
existing.changeCount += 1;
} else {
pending.set(name, {
name,
previousValue: prev,
currentValue: value,
firstDetectedAt: now,
lastDetectedAt: now,
changeCount: 1,
});
}
snapshot.set(name, value);
}
}
// projections that disappeared
for (const name of snapshot.keys()) {
if (!current.has(name)) {
const prev = snapshot.get(name);
if (prev !== undefined) {
changed = true;
snapshot.set(name, undefined);
pending.set(name, {
name,
previousValue: prev,
currentValue: undefined,
firstDetectedAt: now,
lastDetectedAt: now,
changeCount: 1,
});
}
}
}
return { changed };
}
// ── tests ─────────────────────────────────────────────────────────────────────
describe('watcher diff logic', () => {
let snapshot: Map<string, unknown>;
let pending: Map<string, PendingEntry>;
beforeEach(() => {
snapshot = new Map();
pending = new Map();
});
it('first run: initialises snapshot without reporting changes', () => {
const current = new Map([['proj-a', { count: 1 }]]);
const { changed } = runDiff(snapshot, current, pending);
expect(changed).toBe(false);
expect(pending.size).toBe(0);
expect(snapshot.get('proj-a')).toEqual({ count: 1 });
});
it('second run: no diff when value unchanged', () => {
const current = new Map([['proj-a', { count: 1 }]]);
runDiff(snapshot, current, pending); // first run — init
const { changed } = runDiff(snapshot, current, pending); // second run — same value
expect(changed).toBe(false);
expect(pending.size).toBe(0);
});
it('detects a value change between runs', () => {
const v1 = new Map([['proj-a', { count: 1 }]]);
const v2 = new Map([['proj-a', { count: 2 }]]);
runDiff(snapshot, v1, pending); // init
const { changed } = runDiff(snapshot, v2, pending);
expect(changed).toBe(true);
expect(pending.has('proj-a')).toBe(true);
expect(pending.get('proj-a')!.previousValue).toEqual({ count: 1 });
expect(pending.get('proj-a')!.currentValue).toEqual({ count: 2 });
});
it('merges multiple changes into one pending entry', () => {
runDiff(snapshot, new Map([['proj-a', 1]]), pending); // init
runDiff(snapshot, new Map([['proj-a', 2]]), pending); // change 1
runDiff(snapshot, new Map([['proj-a', 3]]), pending); // change 2
expect(pending.get('proj-a')!.changeCount).toBe(2);
expect(pending.get('proj-a')!.currentValue).toBe(3);
expect(pending.get('proj-a')!.previousValue).toBe(1); // kept from first change
});
it('detects a disappeared projection', () => {
runDiff(snapshot, new Map([['proj-a', 42]]), pending); // init
const { changed } = runDiff(snapshot, new Map(), pending); // proj-a gone
expect(changed).toBe(true);
expect(pending.get('proj-a')!.currentValue).toBeUndefined();
});
it('does not re-fire when a disappeared projection stays absent', () => {
runDiff(snapshot, new Map([['proj-a', 42]]), pending); // init
runDiff(snapshot, new Map(), pending); // gone → change
pending.clear();
const { changed } = runDiff(snapshot, new Map(), pending); // still gone → no change
expect(changed).toBe(false);
expect(pending.size).toBe(0);
});
});

View File

@ -0,0 +1,26 @@
{
"compilerOptions": {
"target": "ES2022",
"module": "ESNext",
"moduleResolution": "bundler",
"allowSyntheticDefaultImports": true,
"esModuleInterop": true,
"allowJs": false,
"sourceMap": false,
"outDir": "./dist",
"rootDir": "./src",
"types": ["node"],
"strict": true,
"noUnusedLocals": true,
"noUnusedParameters": true,
"noImplicitReturns": true,
"noFallthroughCasesInSwitch": true,
"noUncheckedIndexedAccess": true,
"noImplicitOverride": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"resolveJsonModule": true
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist"]
}