From e82fe8eaba290e55ed687fd0bbe72dcbba25aa6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98=20=F0=9F=8D=8A?= Date: Mon, 13 Apr 2026 10:01:48 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20OGraph=20Dispatcher=20=E2=80=94=20dual-?= =?UTF-8?q?loop=20actor=20for=20task=20notification=20(#4=20P0)=20(#17)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: add packages/dispatcher — dual-loop OGraph projection watcher + OC scheduler Adds a new Node.js daemon that: - Loop A (ProjectionWatcher): polls OGraph projections, diffs against snapshot, merges changes into a pending queue. - Idle: 30s poll interval; active (changes detected): 5s - Loop B (OcScheduler): polls OC session-status, pushes pending queue when OC has available slots (>= minAvailable). - Idle (no pending): 60s; active (pending): 5s - Cooldown of 60s after each push to avoid spam Tech: - TypeScript + esbuild (zero runtime external deps) - Graceful error handling: each poll is independent try-catch, errors logged but never crash the process - Config from ~/.config/ograph/dispatcher.json + env-var overrides - OGRAPH_CONFIG_FILE env var for config path override - Push via /tmp/ograph-dispatch.json + openclaw message send (best-effort) Build: npm run build → dist/index.js Run: node dist/index.js * fix: address PR #17 review — package name, tests, shell safety, first-run --------- Co-authored-by: 小墨 --- package.json | 3 +- packages/dispatcher/README.md | 101 ++++ packages/dispatcher/build.mjs | 24 + packages/dispatcher/package-lock.json | 539 +++++++++++++++++++++ packages/dispatcher/package.json | 37 ++ packages/dispatcher/src/config.ts | 88 ++++ packages/dispatcher/src/index.ts | 61 +++ packages/dispatcher/src/oc-client.ts | 46 ++ packages/dispatcher/src/ograph-client.ts | 73 +++ packages/dispatcher/src/scheduler.ts | 155 ++++++ packages/dispatcher/src/types.ts | 59 +++ packages/dispatcher/src/watcher.ts | 127 +++++ packages/dispatcher/test/config.test.ts | 95 ++++ packages/dispatcher/test/scheduler.test.ts | 72 +++ packages/dispatcher/test/watcher.test.ts | 155 ++++++ packages/dispatcher/tsconfig.json | 26 + 16 files changed, 1660 insertions(+), 1 deletion(-) create mode 100644 packages/dispatcher/README.md create mode 100644 packages/dispatcher/build.mjs create mode 100644 packages/dispatcher/package-lock.json create mode 100644 packages/dispatcher/package.json create mode 100644 packages/dispatcher/src/config.ts create mode 100644 packages/dispatcher/src/index.ts create mode 100644 packages/dispatcher/src/oc-client.ts create mode 100644 packages/dispatcher/src/ograph-client.ts create mode 100644 packages/dispatcher/src/scheduler.ts create mode 100644 packages/dispatcher/src/types.ts create mode 100644 packages/dispatcher/src/watcher.ts create mode 100644 packages/dispatcher/test/config.test.ts create mode 100644 packages/dispatcher/test/scheduler.test.ts create mode 100644 packages/dispatcher/test/watcher.test.ts create mode 100644 packages/dispatcher/tsconfig.json diff --git a/package.json b/package.json index 2905920..77fa120 100644 --- a/package.json +++ b/package.json @@ -3,7 +3,8 @@ "private": true, "workspaces": [ "packages/engine", - "packages/cli" + "packages/cli", + "packages/dispatcher" ], "scripts": { "test": "npm run test --workspaces", diff --git a/packages/dispatcher/README.md b/packages/dispatcher/README.md new file mode 100644 index 0000000..3aad130 --- /dev/null +++ b/packages/dispatcher/README.md @@ -0,0 +1,101 @@ +# OGraph Dispatcher + +本地常驻进程,轮询 OGraph projection 变化,并在 OpenClaw (OC) 空闲时推送任务通知。 + +## 架构:双 Loop Actor + +``` +┌─ Loop A: Projection Watcher ──────────────┐ +│ 轮询 OGraph projection → diff → pending │ +│ 无变化: 30s 有变化: 5s │ +└──────────────┬────────────────────────────┘ + │ pending queue (合并缓冲) + ▼ +┌─ Loop B: OC Scheduler ───────────────────┐ +│ 轮询 OC 忙闲状态 │ +│ pending 空: 60s (低频) │ +│ pending 有: 5s (高频,等空闲立刻推) │ +│ 空闲 + pending → 推送 → 清 pending │ +└──────────────────────────────────────────┘ +``` + +**Loop A** 持续 diff OGraph projections,合并变化到 pending queue(Map,按 projection name 去重合并)。 + +**Loop B** 持续查 OC session-status,发现空闲 + pending 非空时推送,推送后进入冷却期。 + +## 快速开始 + +```bash +cd packages/dispatcher +npm install +npm run build +node dist/index.js +``` + +## 配置 + +配置文件:`~/.config/ograph/dispatcher.json` + +```json +{ + "ograph": { + "endpoint": "https://ograph.shazhou.workers.dev", + "token": "your-ograph-token", + "projections": ["my-projection", "another-projection"] + }, + "oc": { + "statusEndpoint": "http://localhost:18789/plugins/session-status/status", + "statusToken": "ograph-status-token-2026", + "minAvailable": 2 + }, + "intervals": { + "watcherIdle": 30000, + "watcherActive": 5000, + "schedulerIdle": 60000, + "schedulerActive": 5000, + "cooldownAfterPush": 60000 + } +} +``` + +## 环境变量覆盖 + +| 变量 | 对应配置 | +|------|----------| +| `OGRAPH_ENDPOINT` | `ograph.endpoint` | +| `OGRAPH_TOKEN` | `ograph.token` | +| `OGRAPH_PROJECTIONS` | `ograph.projections`(逗号分隔) | +| `OC_STATUS_ENDPOINT` | `oc.statusEndpoint` | +| `OC_STATUS_TOKEN` | `oc.statusToken` | +| `OC_MIN_AVAILABLE` | `oc.minAvailable` | + +## 推送机制 + +当 OC 空闲且 pending 非空时,Scheduler 通过两种方式推送(均 best-effort): + +1. **Dispatch 文件**:写入 `/tmp/ograph-dispatch.json`,可被其他工具读取 +2. **openclaw message send**:通过 CLI 发送通知消息(需 openclaw 在 PATH) + +## 文件结构 + +``` +packages/dispatcher/ +├── src/ +│ ├── index.ts # 入口,启动两个 loop +│ ├── watcher.ts # Loop A: Projection Watcher +│ ├── scheduler.ts # Loop B: OC Scheduler +│ ├── ograph-client.ts # OGraph API 客户端 +│ ├── oc-client.ts # OC session-status API 客户端 +│ ├── config.ts # 配置加载 +│ └── types.ts # 类型定义 +├── build.mjs # esbuild 打包脚本 +├── package.json +├── tsconfig.json +└── README.md +``` + +## 依赖 + +- 零外部运行时依赖 +- 仅使用 Node.js 内置模块(`fs`, `path`, `os`, `child_process`)和全局 `fetch`(Node ≥ 18) +- 构建时仅依赖 `esbuild` + `typescript` diff --git a/packages/dispatcher/build.mjs b/packages/dispatcher/build.mjs new file mode 100644 index 0000000..301f741 --- /dev/null +++ b/packages/dispatcher/build.mjs @@ -0,0 +1,24 @@ +// OGraph Dispatcher build script (esbuild) +// Usage: node build.mjs + +import * as esbuild from 'esbuild' +import { mkdirSync } from 'node:fs' + +mkdirSync('dist', { recursive: true }) + +await esbuild.build({ + entryPoints: ['src/index.ts'], + bundle: true, + platform: 'node', + target: 'node18', + format: 'esm', + outfile: 'dist/index.js', + banner: { + js: '#!/usr/bin/env node', + }, + // Mark all node: built-ins as external + packages: 'bundle', + external: ['node:*'], +}) + +console.log('Build complete → dist/index.js') diff --git a/packages/dispatcher/package-lock.json b/packages/dispatcher/package-lock.json new file mode 100644 index 0000000..747df6a --- /dev/null +++ b/packages/dispatcher/package-lock.json @@ -0,0 +1,539 @@ +{ + "name": "@ograph/dispatcher", + "version": "0.1.0", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "name": "@ograph/dispatcher", + "version": "0.1.0", + "license": "MIT", + "bin": { + "ograph-dispatcher": "dist/index.js" + }, + "devDependencies": { + "@types/node": "^22.0.0", + "esbuild": "^0.25.0", + "typescript": "^5.4.0" + }, + "engines": { + "node": ">=18.0.0" + } + }, + "node_modules/@esbuild/aix-ppc64": { + "version": "0.25.12", + "resolved": "https://registry.npmjs.org/@esbuild/aix-ppc64/-/aix-ppc64-0.25.12.tgz", + "integrity": "sha512-Hhmwd6CInZ3dwpuGTF8fJG6yoWmsToE+vYgD4nytZVxcu1ulHpUQRAB1UJ8+N1Am3Mz4+xOByoQoSZf4D+CpkA==", + "cpu": [ + "ppc64" + ], + "dev": true, + "license": "MIT", + "optional": true, + "os": [ + "aix" + ], + "engines": { + "node": ">=18" + } + }, + "node_modules/@esbuild/android-arm": { + "version": "0.25.12", + "resolved": "https://registry.npmjs.org/@esbuild/android-arm/-/android-arm-0.25.12.tgz", + "integrity": "sha512-VJ+sKvNA/GE7Ccacc9Cha7bpS8nyzVv0jdVgwNDaR4gDMC/2TTRc33Ip8qrNYUcpkOHUT5OZ0bUcNNVZQ9RLlg==", + "cpu": [ + "arm" + ], + "dev": true, + "license": "MIT", + "optional": true, + "os": [ + "android" + ], + "engines": { + "node": ">=18" + } + }, + "node_modules/@esbuild/android-arm64": { + "version": "0.25.12", + "resolved": "https://registry.npmjs.org/@esbuild/android-arm64/-/android-arm64-0.25.12.tgz", + "integrity": "sha512-6AAmLG7zwD1Z159jCKPvAxZd4y/VTO0VkprYy+3N2FtJ8+BQWFXU+OxARIwA46c5tdD9SsKGZ/1ocqBS/gAKHg==", + "cpu": [ + "arm64" + ], + "dev": true, + "license": "MIT", + "optional": true, + "os": [ + "android" + ], + "engines": { + "node": ">=18" + } + }, + "node_modules/@esbuild/android-x64": { + "version": "0.25.12", + "resolved": "https://registry.npmjs.org/@esbuild/android-x64/-/android-x64-0.25.12.tgz", + "integrity": "sha512-5jbb+2hhDHx5phYR2By8GTWEzn6I9UqR11Kwf22iKbNpYrsmRB18aX/9ivc5cabcUiAT/wM+YIZ6SG9QO6a8kg==", + "cpu": [ + "x64" + ], + "dev": true, + "license": "MIT", + "optional": true, + "os": [ + "android" + ], + "engines": { + "node": ">=18" + } + }, + "node_modules/@esbuild/darwin-arm64": { + "version": "0.25.12", + "resolved": "https://registry.npmjs.org/@esbuild/darwin-arm64/-/darwin-arm64-0.25.12.tgz", + "integrity": "sha512-N3zl+lxHCifgIlcMUP5016ESkeQjLj/959RxxNYIthIg+CQHInujFuXeWbWMgnTo4cp5XVHqFPmpyu9J65C1Yg==", + "cpu": [ + "arm64" + ], + "dev": true, + "license": "MIT", + "optional": true, + "os": [ + "darwin" + ], + "engines": { + "node": ">=18" + } + }, + "node_modules/@esbuild/darwin-x64": { + "version": "0.25.12", + "resolved": "https://registry.npmjs.org/@esbuild/darwin-x64/-/darwin-x64-0.25.12.tgz", + "integrity": "sha512-HQ9ka4Kx21qHXwtlTUVbKJOAnmG1ipXhdWTmNXiPzPfWKpXqASVcWdnf2bnL73wgjNrFXAa3yYvBSd9pzfEIpA==", + "cpu": [ + "x64" + ], + "dev": true, + "license": "MIT", + "optional": true, + "os": [ + "darwin" + ], + "engines": { + "node": ">=18" + } + }, + "node_modules/@esbuild/freebsd-arm64": { + "version": "0.25.12", + "resolved": "https://registry.npmjs.org/@esbuild/freebsd-arm64/-/freebsd-arm64-0.25.12.tgz", + "integrity": "sha512-gA0Bx759+7Jve03K1S0vkOu5Lg/85dou3EseOGUes8flVOGxbhDDh/iZaoek11Y8mtyKPGF3vP8XhnkDEAmzeg==", + "cpu": [ + "arm64" + ], + "dev": true, + "license": "MIT", + "optional": true, + "os": [ + "freebsd" + ], + "engines": { + "node": ">=18" + } + }, + "node_modules/@esbuild/freebsd-x64": { + "version": "0.25.12", + "resolved": "https://registry.npmjs.org/@esbuild/freebsd-x64/-/freebsd-x64-0.25.12.tgz", + "integrity": "sha512-TGbO26Yw2xsHzxtbVFGEXBFH0FRAP7gtcPE7P5yP7wGy7cXK2oO7RyOhL5NLiqTlBh47XhmIUXuGciXEqYFfBQ==", + "cpu": [ + "x64" + ], + "dev": true, + "license": "MIT", + "optional": true, + "os": [ + "freebsd" + ], + "engines": { + "node": ">=18" + } + }, + "node_modules/@esbuild/linux-arm": { + "version": "0.25.12", + "resolved": "https://registry.npmjs.org/@esbuild/linux-arm/-/linux-arm-0.25.12.tgz", + "integrity": "sha512-lPDGyC1JPDou8kGcywY0YILzWlhhnRjdof3UlcoqYmS9El818LLfJJc3PXXgZHrHCAKs/Z2SeZtDJr5MrkxtOw==", + "cpu": [ + "arm" + ], + "dev": true, + "license": "MIT", + "optional": true, + "os": [ + "linux" + ], + "engines": { + "node": ">=18" + } + }, + "node_modules/@esbuild/linux-arm64": { + "version": "0.25.12", + "resolved": "https://registry.npmjs.org/@esbuild/linux-arm64/-/linux-arm64-0.25.12.tgz", + "integrity": "sha512-8bwX7a8FghIgrupcxb4aUmYDLp8pX06rGh5HqDT7bB+8Rdells6mHvrFHHW2JAOPZUbnjUpKTLg6ECyzvas2AQ==", + "cpu": [ + "arm64" + ], + "dev": true, + "license": "MIT", + "optional": true, + "os": [ + "linux" + ], + "engines": { + "node": ">=18" + } + }, + "node_modules/@esbuild/linux-ia32": { + "version": "0.25.12", + "resolved": "https://registry.npmjs.org/@esbuild/linux-ia32/-/linux-ia32-0.25.12.tgz", + "integrity": "sha512-0y9KrdVnbMM2/vG8KfU0byhUN+EFCny9+8g202gYqSSVMonbsCfLjUO+rCci7pM0WBEtz+oK/PIwHkzxkyharA==", + "cpu": [ + "ia32" + ], + "dev": true, + "license": "MIT", + "optional": true, + "os": [ + "linux" + ], + "engines": { + "node": ">=18" + } + }, + "node_modules/@esbuild/linux-loong64": { + "version": "0.25.12", + "resolved": "https://registry.npmjs.org/@esbuild/linux-loong64/-/linux-loong64-0.25.12.tgz", + "integrity": "sha512-h///Lr5a9rib/v1GGqXVGzjL4TMvVTv+s1DPoxQdz7l/AYv6LDSxdIwzxkrPW438oUXiDtwM10o9PmwS/6Z0Ng==", + "cpu": [ + "loong64" + ], + "dev": true, + "license": "MIT", + "optional": true, + "os": [ + "linux" + ], + "engines": { + "node": ">=18" + } + }, + "node_modules/@esbuild/linux-mips64el": { + "version": "0.25.12", + "resolved": "https://registry.npmjs.org/@esbuild/linux-mips64el/-/linux-mips64el-0.25.12.tgz", + "integrity": "sha512-iyRrM1Pzy9GFMDLsXn1iHUm18nhKnNMWscjmp4+hpafcZjrr2WbT//d20xaGljXDBYHqRcl8HnxbX6uaA/eGVw==", + "cpu": [ + "mips64el" + ], + "dev": true, + "license": "MIT", + "optional": true, + "os": [ + "linux" + ], + "engines": { + "node": ">=18" + } + }, + "node_modules/@esbuild/linux-ppc64": { + "version": "0.25.12", + "resolved": "https://registry.npmjs.org/@esbuild/linux-ppc64/-/linux-ppc64-0.25.12.tgz", + "integrity": "sha512-9meM/lRXxMi5PSUqEXRCtVjEZBGwB7P/D4yT8UG/mwIdze2aV4Vo6U5gD3+RsoHXKkHCfSxZKzmDssVlRj1QQA==", + "cpu": [ + "ppc64" + ], + "dev": true, + "license": "MIT", + "optional": true, + "os": [ + "linux" + ], + "engines": { + "node": ">=18" + } + }, + "node_modules/@esbuild/linux-riscv64": { + "version": "0.25.12", + "resolved": "https://registry.npmjs.org/@esbuild/linux-riscv64/-/linux-riscv64-0.25.12.tgz", + "integrity": "sha512-Zr7KR4hgKUpWAwb1f3o5ygT04MzqVrGEGXGLnj15YQDJErYu/BGg+wmFlIDOdJp0PmB0lLvxFIOXZgFRrdjR0w==", + "cpu": [ + "riscv64" + ], + "dev": true, + "license": "MIT", + "optional": true, + "os": [ + "linux" + ], + "engines": { + "node": ">=18" + } + }, + "node_modules/@esbuild/linux-s390x": { + "version": "0.25.12", + "resolved": "https://registry.npmjs.org/@esbuild/linux-s390x/-/linux-s390x-0.25.12.tgz", + "integrity": "sha512-MsKncOcgTNvdtiISc/jZs/Zf8d0cl/t3gYWX8J9ubBnVOwlk65UIEEvgBORTiljloIWnBzLs4qhzPkJcitIzIg==", + "cpu": [ + "s390x" + ], + "dev": true, + "license": "MIT", + "optional": true, + "os": [ + "linux" + ], + "engines": { + "node": ">=18" + } + }, + "node_modules/@esbuild/linux-x64": { + "version": "0.25.12", + "resolved": "https://registry.npmjs.org/@esbuild/linux-x64/-/linux-x64-0.25.12.tgz", + "integrity": "sha512-uqZMTLr/zR/ed4jIGnwSLkaHmPjOjJvnm6TVVitAa08SLS9Z0VM8wIRx7gWbJB5/J54YuIMInDquWyYvQLZkgw==", + "cpu": [ + "x64" + ], + "dev": true, + "license": "MIT", + "optional": true, + "os": [ + "linux" + ], + "engines": { + "node": ">=18" + } + }, + "node_modules/@esbuild/netbsd-arm64": { + "version": "0.25.12", + "resolved": "https://registry.npmjs.org/@esbuild/netbsd-arm64/-/netbsd-arm64-0.25.12.tgz", + "integrity": "sha512-xXwcTq4GhRM7J9A8Gv5boanHhRa/Q9KLVmcyXHCTaM4wKfIpWkdXiMog/KsnxzJ0A1+nD+zoecuzqPmCRyBGjg==", + "cpu": [ + "arm64" + ], + "dev": true, + "license": "MIT", + "optional": true, + "os": [ + "netbsd" + ], + "engines": { + "node": ">=18" + } + }, + "node_modules/@esbuild/netbsd-x64": { + "version": "0.25.12", + "resolved": "https://registry.npmjs.org/@esbuild/netbsd-x64/-/netbsd-x64-0.25.12.tgz", + "integrity": "sha512-Ld5pTlzPy3YwGec4OuHh1aCVCRvOXdH8DgRjfDy/oumVovmuSzWfnSJg+VtakB9Cm0gxNO9BzWkj6mtO1FMXkQ==", + "cpu": [ + "x64" + ], + "dev": true, + "license": "MIT", + "optional": true, + "os": [ + "netbsd" + ], + "engines": { + "node": ">=18" + } + }, + "node_modules/@esbuild/openbsd-arm64": { + "version": "0.25.12", + "resolved": "https://registry.npmjs.org/@esbuild/openbsd-arm64/-/openbsd-arm64-0.25.12.tgz", + "integrity": "sha512-fF96T6KsBo/pkQI950FARU9apGNTSlZGsv1jZBAlcLL1MLjLNIWPBkj5NlSz8aAzYKg+eNqknrUJ24QBybeR5A==", + "cpu": [ + "arm64" + ], + "dev": true, + "license": "MIT", + "optional": true, + "os": [ + "openbsd" + ], + "engines": { + "node": ">=18" + } + }, + "node_modules/@esbuild/openbsd-x64": { + "version": "0.25.12", + "resolved": "https://registry.npmjs.org/@esbuild/openbsd-x64/-/openbsd-x64-0.25.12.tgz", + "integrity": "sha512-MZyXUkZHjQxUvzK7rN8DJ3SRmrVrke8ZyRusHlP+kuwqTcfWLyqMOE3sScPPyeIXN/mDJIfGXvcMqCgYKekoQw==", + "cpu": [ + "x64" + ], + "dev": true, + "license": "MIT", + "optional": true, + "os": [ + "openbsd" + ], + "engines": { + "node": ">=18" + } + }, + "node_modules/@esbuild/openharmony-arm64": { + "version": "0.25.12", + "resolved": "https://registry.npmjs.org/@esbuild/openharmony-arm64/-/openharmony-arm64-0.25.12.tgz", + "integrity": "sha512-rm0YWsqUSRrjncSXGA7Zv78Nbnw4XL6/dzr20cyrQf7ZmRcsovpcRBdhD43Nuk3y7XIoW2OxMVvwuRvk9XdASg==", + "cpu": [ + "arm64" + ], + "dev": true, + "license": "MIT", + "optional": true, + "os": [ + "openharmony" + ], + "engines": { + "node": ">=18" + } + }, + "node_modules/@esbuild/sunos-x64": { + "version": "0.25.12", + "resolved": "https://registry.npmjs.org/@esbuild/sunos-x64/-/sunos-x64-0.25.12.tgz", + "integrity": "sha512-3wGSCDyuTHQUzt0nV7bocDy72r2lI33QL3gkDNGkod22EsYl04sMf0qLb8luNKTOmgF/eDEDP5BFNwoBKH441w==", + "cpu": [ + "x64" + ], + "dev": true, + "license": "MIT", + "optional": true, + "os": [ + "sunos" + ], + "engines": { + "node": ">=18" + } + }, + "node_modules/@esbuild/win32-arm64": { + "version": "0.25.12", + "resolved": "https://registry.npmjs.org/@esbuild/win32-arm64/-/win32-arm64-0.25.12.tgz", + "integrity": "sha512-rMmLrur64A7+DKlnSuwqUdRKyd3UE7oPJZmnljqEptesKM8wx9J8gx5u0+9Pq0fQQW8vqeKebwNXdfOyP+8Bsg==", + "cpu": [ + "arm64" + ], + "dev": true, + "license": "MIT", + "optional": true, + "os": [ + "win32" + ], + "engines": { + "node": ">=18" + } + }, + "node_modules/@esbuild/win32-ia32": { + "version": "0.25.12", + "resolved": "https://registry.npmjs.org/@esbuild/win32-ia32/-/win32-ia32-0.25.12.tgz", + "integrity": "sha512-HkqnmmBoCbCwxUKKNPBixiWDGCpQGVsrQfJoVGYLPT41XWF8lHuE5N6WhVia2n4o5QK5M4tYr21827fNhi4byQ==", + "cpu": [ + "ia32" + ], + "dev": true, + "license": "MIT", + "optional": true, + "os": [ + "win32" + ], + "engines": { + "node": ">=18" + } + }, + "node_modules/@esbuild/win32-x64": { + "version": "0.25.12", + "resolved": "https://registry.npmjs.org/@esbuild/win32-x64/-/win32-x64-0.25.12.tgz", + "integrity": "sha512-alJC0uCZpTFrSL0CCDjcgleBXPnCrEAhTBILpeAp7M/OFgoqtAetfBzX0xM00MUsVVPpVjlPuMbREqnZCXaTnA==", + "cpu": [ + "x64" + ], + "dev": true, + "license": "MIT", + "optional": true, + "os": [ + "win32" + ], + "engines": { + "node": ">=18" + } + }, + "node_modules/@types/node": { + "version": "22.19.17", + "resolved": "https://registry.npmjs.org/@types/node/-/node-22.19.17.tgz", + "integrity": "sha512-wGdMcf+vPYM6jikpS/qhg6WiqSV/OhG+jeeHT/KlVqxYfD40iYJf9/AE1uQxVWFvU7MipKRkRv8NSHiCGgPr8Q==", + "dev": true, + "license": "MIT", + "dependencies": { + "undici-types": "~6.21.0" + } + }, + "node_modules/esbuild": { + "version": "0.25.12", + "resolved": "https://registry.npmjs.org/esbuild/-/esbuild-0.25.12.tgz", + "integrity": "sha512-bbPBYYrtZbkt6Os6FiTLCTFxvq4tt3JKall1vRwshA3fdVztsLAatFaZobhkBC8/BrPetoa0oksYoKXoG4ryJg==", + "dev": true, + "hasInstallScript": true, + "license": "MIT", + "bin": { + "esbuild": "bin/esbuild" + }, + "engines": { + "node": ">=18" + }, + "optionalDependencies": { + "@esbuild/aix-ppc64": "0.25.12", + "@esbuild/android-arm": "0.25.12", + "@esbuild/android-arm64": "0.25.12", + "@esbuild/android-x64": "0.25.12", + "@esbuild/darwin-arm64": "0.25.12", + "@esbuild/darwin-x64": "0.25.12", + "@esbuild/freebsd-arm64": "0.25.12", + "@esbuild/freebsd-x64": "0.25.12", + "@esbuild/linux-arm": "0.25.12", + "@esbuild/linux-arm64": "0.25.12", + "@esbuild/linux-ia32": "0.25.12", + "@esbuild/linux-loong64": "0.25.12", + "@esbuild/linux-mips64el": "0.25.12", + "@esbuild/linux-ppc64": "0.25.12", + "@esbuild/linux-riscv64": "0.25.12", + "@esbuild/linux-s390x": "0.25.12", + "@esbuild/linux-x64": "0.25.12", + "@esbuild/netbsd-arm64": "0.25.12", + "@esbuild/netbsd-x64": "0.25.12", + "@esbuild/openbsd-arm64": "0.25.12", + "@esbuild/openbsd-x64": "0.25.12", + "@esbuild/openharmony-arm64": "0.25.12", + "@esbuild/sunos-x64": "0.25.12", + "@esbuild/win32-arm64": "0.25.12", + "@esbuild/win32-ia32": "0.25.12", + "@esbuild/win32-x64": "0.25.12" + } + }, + "node_modules/typescript": { + "version": "5.9.3", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.9.3.tgz", + "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", + "dev": true, + "license": "Apache-2.0", + "bin": { + "tsc": "bin/tsc", + "tsserver": "bin/tsserver" + }, + "engines": { + "node": ">=14.17" + } + }, + "node_modules/undici-types": { + "version": "6.21.0", + "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.21.0.tgz", + "integrity": "sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==", + "dev": true, + "license": "MIT" + } + } +} diff --git a/packages/dispatcher/package.json b/packages/dispatcher/package.json new file mode 100644 index 0000000..518abdf --- /dev/null +++ b/packages/dispatcher/package.json @@ -0,0 +1,37 @@ +{ + "name": "@uncaged/ograph-dispatcher", + "version": "0.1.0", + "description": "OGraph Dispatcher — polls projections and notifies OC when idle", + "type": "module", + "main": "./dist/index.js", + "bin": { + "ograph-dispatcher": "./dist/index.js" + }, + "files": [ + "dist/" + ], + "scripts": { + "build": "node build.mjs", + "dev": "node --watch dist/index.js", + "start": "node dist/index.js", + "test": "vitest run", + "typecheck": "tsc --noEmit" + }, + "keywords": [ + "ograph", + "dispatcher", + "projection", + "watcher" + ], + "author": "小墨 🖊️", + "license": "MIT", + "engines": { + "node": ">=18.0.0" + }, + "devDependencies": { + "@types/node": "^22.0.0", + "esbuild": "^0.25.0", + "typescript": "^5.4.0", + "vitest": "^2.0.0" + } +} diff --git a/packages/dispatcher/src/config.ts b/packages/dispatcher/src/config.ts new file mode 100644 index 0000000..ff6b4b5 --- /dev/null +++ b/packages/dispatcher/src/config.ts @@ -0,0 +1,88 @@ +// Config loader for OGraph Dispatcher +// Reads from ~/.config/ograph/dispatcher.json, with env-var overrides. + +import { readFileSync } from 'node:fs'; +import { join } from 'node:path'; +import { homedir } from 'node:os'; +import type { DispatcherConfig } from './types.js'; + +const CONFIG_PATH = + process.env['OGRAPH_CONFIG_FILE'] ?? + join(homedir(), '.config', 'ograph', 'dispatcher.json'); + +const DEFAULTS: DispatcherConfig = { + ograph: { + endpoint: 'https://ograph.shazhou.workers.dev', + token: undefined, + projections: [], + }, + oc: { + statusEndpoint: 'http://localhost:18789/plugins/session-status/status', + statusToken: 'ograph-status-token-2026', + minAvailable: 2, + }, + intervals: { + watcherIdle: 30_000, + watcherActive: 5_000, + schedulerIdle: 60_000, + schedulerActive: 5_000, + cooldownAfterPush: 60_000, + }, +}; + +function deepMerge(base: T, override: Partial): T { + const result = { ...base }; + for (const key of Object.keys(override) as (keyof T)[]) { + const overrideVal = override[key]; + const baseVal = base[key]; + if ( + overrideVal !== null && + typeof overrideVal === 'object' && + !Array.isArray(overrideVal) && + baseVal !== null && + typeof baseVal === 'object' && + !Array.isArray(baseVal) + ) { + result[key] = deepMerge(baseVal as object, overrideVal as object) as T[keyof T]; + } else if (overrideVal !== undefined) { + result[key] = overrideVal as T[keyof T]; + } + } + return result; +} + +export function loadConfig(): DispatcherConfig { + let fileConfig: Partial = {}; + + try { + const raw = readFileSync(CONFIG_PATH, 'utf-8'); + fileConfig = JSON.parse(raw) as Partial; + } catch { + // Config file optional — fall back to defaults + env vars + } + + let config = deepMerge(DEFAULTS, fileConfig); + + // ── Environment variable overrides ─────────────────────────────────────────── + if (process.env['OGRAPH_ENDPOINT']) { + config.ograph.endpoint = process.env['OGRAPH_ENDPOINT']; + } + if (process.env['OGRAPH_TOKEN']) { + config.ograph.token = process.env['OGRAPH_TOKEN']; + } + if (process.env['OGRAPH_PROJECTIONS']) { + config.ograph.projections = process.env['OGRAPH_PROJECTIONS'].split(',').map((s) => s.trim()); + } + if (process.env['OC_STATUS_ENDPOINT']) { + config.oc.statusEndpoint = process.env['OC_STATUS_ENDPOINT']; + } + if (process.env['OC_STATUS_TOKEN']) { + config.oc.statusToken = process.env['OC_STATUS_TOKEN']; + } + if (process.env['OC_MIN_AVAILABLE']) { + const n = parseInt(process.env['OC_MIN_AVAILABLE'], 10); + if (!isNaN(n)) config.oc.minAvailable = n; + } + + return config; +} diff --git a/packages/dispatcher/src/index.ts b/packages/dispatcher/src/index.ts new file mode 100644 index 0000000..d4ee86d --- /dev/null +++ b/packages/dispatcher/src/index.ts @@ -0,0 +1,61 @@ +// OGraph Dispatcher — entry point +// Starts Loop A (ProjectionWatcher) and Loop B (OcScheduler) independently. + +import { loadConfig } from './config.js'; +import { ProjectionWatcher } from './watcher.js'; +import { OcScheduler } from './scheduler.js'; +import type { PendingEntry } from './types.js'; + +function ts(): string { + return new Date().toISOString(); +} + +async function main(): Promise { + console.log(`[${ts()}] [dispatcher] OGraph Dispatcher starting...`); + + const config = loadConfig(); + + console.log(`[${ts()}] [dispatcher] config loaded`); + console.log(`[${ts()}] [dispatcher] ograph.endpoint = ${config.ograph.endpoint}`); + console.log(`[${ts()}] [dispatcher] ograph.projections = [${config.ograph.projections.join(', ')}]`); + console.log(`[${ts()}] [dispatcher] oc.statusEndpoint = ${config.oc.statusEndpoint}`); + console.log(`[${ts()}] [dispatcher] oc.minAvailable = ${config.oc.minAvailable}`); + console.log(`[${ts()}] [dispatcher] intervals.watcherIdle = ${config.intervals.watcherIdle}ms`); + console.log(`[${ts()}] [dispatcher] intervals.watcherActive = ${config.intervals.watcherActive}ms`); + console.log(`[${ts()}] [dispatcher] intervals.schedulerIdle = ${config.intervals.schedulerIdle}ms`); + console.log(`[${ts()}] [dispatcher] intervals.schedulerActive= ${config.intervals.schedulerActive}ms`); + console.log(`[${ts()}] [dispatcher] intervals.cooldownAfterPush = ${config.intervals.cooldownAfterPush}ms`); + + // Shared pending queue — Watcher writes, Scheduler reads + clears + const pending: Map = new Map(); + + const watcher = new ProjectionWatcher(config, pending); + const scheduler = new OcScheduler(config, pending); + + // Start both loops independently + watcher.start(); + scheduler.start(); + + console.log(`[${ts()}] [dispatcher] both loops running. Press Ctrl+C to stop.`); + + // Graceful shutdown + const shutdown = (): void => { + console.log(`\n[${ts()}] [dispatcher] shutting down...`); + watcher.stop(); + scheduler.stop(); + process.exit(0); + }; + + process.on('SIGINT', shutdown); + process.on('SIGTERM', shutdown); + + // Keep the process alive (the timers are enough, but be explicit) + await new Promise(() => { + // never resolves — loops keep running via setTimeout chains + }); +} + +main().catch((err: unknown) => { + console.error(`[${new Date().toISOString()}] [dispatcher] fatal: ${err instanceof Error ? err.message : String(err)}`); + process.exit(1); +}); diff --git a/packages/dispatcher/src/oc-client.ts b/packages/dispatcher/src/oc-client.ts new file mode 100644 index 0000000..6bdf7b2 --- /dev/null +++ b/packages/dispatcher/src/oc-client.ts @@ -0,0 +1,46 @@ +// OC session-status API client (zero external deps) + +import type { DispatcherConfig, OcStatusResponse } from './types.js'; + +export class OcClient { + constructor(private readonly config: DispatcherConfig) {} + + /** + * Query OC session status. + * Returns null if the endpoint is unreachable or returns an error. + */ + async getStatus(): Promise { + const { statusEndpoint, statusToken } = this.config.oc; + + const response = await fetch(statusEndpoint, { + headers: { + Authorization: `Bearer ${statusToken}`, + 'Content-Type': 'application/json', + }, + }); + + const contentType = response.headers.get('content-type') ?? ''; + if (!contentType.includes('application/json')) { + const text = await response.text(); + throw new Error(`Non-JSON response from OC status: ${text.slice(0, 120)}`); + } + + const body = await response.json() as OcStatusResponse; + + if (!response.ok || !body.ok) { + throw new Error(`OC status error: HTTP ${response.status}`); + } + + return body; + } + + /** + * Returns true if OC has enough free slots to accept a task. + * Condition: available > minAvailable (keep at least one slot for the human). + */ + async isAvailable(): Promise { + const status = await this.getStatus(); + if (!status) return false; + return status.sessions.available > this.config.oc.minAvailable; + } +} diff --git a/packages/dispatcher/src/ograph-client.ts b/packages/dispatcher/src/ograph-client.ts new file mode 100644 index 0000000..3945911 --- /dev/null +++ b/packages/dispatcher/src/ograph-client.ts @@ -0,0 +1,73 @@ +// OGraph API client (dispatcher-specific, zero external deps) +// Uses node built-in fetch (Node >= 18). + +import type { DispatcherConfig } from './types.js'; + +export interface ProjectionValue { + name: string; + value: unknown; + fetchedAt: number; +} + +export class OGraphClient { + constructor(private readonly config: DispatcherConfig) {} + + /** Fetch a single projection value. Returns null on any error. */ + async getProjection(name: string, params?: Record): Promise { + const { endpoint, token } = this.config.ograph; + const qs = + params && Object.keys(params).length > 0 + ? '?' + new URLSearchParams(params).toString() + : ''; + const url = `${endpoint}/projections/${encodeURIComponent(name)}${qs}`; + + const headers: Record = { + 'Content-Type': 'application/json', + }; + if (token) headers['Authorization'] = `Bearer ${token}`; + + const response = await fetch(url, { headers }); + + const contentType = response.headers.get('content-type') ?? ''; + if (!contentType.includes('application/json')) { + const text = await response.text(); + throw new Error(`Non-JSON response for projection "${name}": ${text.slice(0, 120)}`); + } + + const result = await response.json() as { value?: unknown; error?: string }; + + if (!response.ok) { + throw new Error(result.error ?? `HTTP ${response.status} fetching projection "${name}"`); + } + + return { name, value: result.value, fetchedAt: Date.now() }; + } + + /** Fetch all watched projections in parallel. Returns a map of name → value. */ + async fetchAll(): Promise<{ values: Map; errors: Map }> { + const { projections } = this.config.ograph; + const values = new Map(); + const errors = new Map(); + + if (projections.length === 0) return { values, errors }; + + const fetches = projections.map(async (name) => { + const pv = await this.getProjection(name); + return pv; + }); + + const settled = await Promise.allSettled(fetches); + for (let i = 0; i < settled.length; i++) { + const result = settled[i]; + const name = projections[i] as string; + if (result?.status === 'fulfilled' && result.value !== null) { + values.set(name, result.value.value); + } else if (result?.status === 'rejected') { + const errMsg = result.reason instanceof Error ? result.reason.message : String(result.reason); + errors.set(name, errMsg); + } + } + + return { values, errors }; + } +} diff --git a/packages/dispatcher/src/scheduler.ts b/packages/dispatcher/src/scheduler.ts new file mode 100644 index 0000000..88bd761 --- /dev/null +++ b/packages/dispatcher/src/scheduler.ts @@ -0,0 +1,155 @@ +// Loop B: OC Scheduler +// Polls OC busy/idle state, pushes pending queue when OC is available. + +import { writeFileSync, mkdirSync } from 'node:fs'; +import { join } from 'node:path'; +import { execFileSync } from 'node:child_process'; +import type { DispatcherConfig, PendingEntry } from './types.js'; +import { OcClient } from './oc-client.js'; + +function ts(): string { + return new Date().toISOString(); +} + +/** Dispatch file path — a simple push mechanism readable by OC / other tools */ +const DISPATCH_FILE = '/tmp/ograph-dispatch.json'; + +export class OcScheduler { + private readonly client: OcClient; + private running = false; + private timer: ReturnType | null = null; + private lastPushAt = 0; + + constructor( + private readonly config: DispatcherConfig, + /** Shared pending queue (read + cleared by Scheduler) */ + private readonly pending: Map, + ) { + this.client = new OcClient(config); + } + + start(): void { + if (this.running) return; + this.running = true; + console.log(`[${ts()}] [scheduler] started — OC status: ${this.config.oc.statusEndpoint}`); + void this.poll(); + } + + stop(): void { + this.running = false; + if (this.timer !== null) { + clearTimeout(this.timer); + this.timer = null; + } + console.log(`[${ts()}] [scheduler] stopped`); + } + + private scheduleNext(delayMs: number): void { + if (!this.running) return; + this.timer = setTimeout(() => { + void this.poll(); + }, delayMs); + } + + private async poll(): Promise { + if (!this.running) return; + + const { schedulerIdle, schedulerActive, cooldownAfterPush } = this.config.intervals; + + try { + const hasPending = this.pending.size > 0; + + // Check cooldown first + const now = Date.now(); + if (this.lastPushAt > 0 && now - this.lastPushAt < cooldownAfterPush) { + const remainMs = cooldownAfterPush - (now - this.lastPushAt); + console.log(`[${ts()}] [scheduler] in cooldown — ${Math.ceil(remainMs / 1000)}s remaining`); + this.scheduleNext(hasPending ? schedulerActive : schedulerIdle); + return; + } + + if (!hasPending) { + // Nothing to push; low-frequency idle polling + this.scheduleNext(schedulerIdle); + return; + } + + // We have pending items — check OC availability + let available = false; + try { + available = await this.client.isAvailable(); + console.log(`[${ts()}] [scheduler] OC available=${available} pending=${this.pending.size}`); + } catch (err) { + console.warn(`[${ts()}] [scheduler] OC status check failed: ${err instanceof Error ? err.message : String(err)}`); + // Can't determine status — back off + this.scheduleNext(schedulerActive); + return; + } + + if (available) { + await this.push(); + this.lastPushAt = Date.now(); + this.scheduleNext(schedulerIdle); // after push, slow down + } else { + // OC busy but we have work — keep polling actively + this.scheduleNext(schedulerActive); + } + } catch (err) { + console.error(`[${ts()}] [scheduler] poll error: ${err instanceof Error ? err.message : String(err)}`); + this.scheduleNext(schedulerActive); + } + } + + private async push(): Promise { + const entries = Array.from(this.pending.values()); + if (entries.length === 0) return; + + const message = this.buildMessage(entries); + + console.log(`[${ts()}] [scheduler] pushing ${entries.length} change(s) to OC`); + console.log(`[${ts()}] [scheduler] message:\n${message}`); + + // ── Strategy 1: write dispatch file ───────────────────────────────────── + try { + mkdirSync('/tmp', { recursive: true }); + const payload = { + pushedAt: new Date().toISOString(), + changes: entries, + message, + }; + writeFileSync(DISPATCH_FILE, JSON.stringify(payload, null, 2), 'utf-8'); + console.log(`[${ts()}] [scheduler] dispatch file written: ${DISPATCH_FILE}`); + } catch (err) { + console.warn(`[${ts()}] [scheduler] failed to write dispatch file: ${err instanceof Error ? err.message : String(err)}`); + } + + // ── Strategy 2: openclaw message send (best-effort) ────────────────────── + try { + execFileSync('openclaw', ['message', 'send', message], { timeout: 10_000, stdio: 'pipe' }); + console.log(`[${ts()}] [scheduler] openclaw message send OK`); + } catch (err) { + // Not fatal — openclaw may not be in PATH or the channel may not be configured + console.warn(`[${ts()}] [scheduler] openclaw message send failed (non-fatal): ${err instanceof Error ? err.message : String(err)}`); + } + + // Clear pending after successful push attempt + this.pending.clear(); + console.log(`[${ts()}] [scheduler] pending queue cleared`); + } + + private buildMessage(entries: PendingEntry[]): string { + const lines: string[] = ['🔔 OGraph Projection Changes Detected\n']; + + for (const entry of entries) { + const age = Math.round((Date.now() - entry.firstDetectedAt) / 1000); + lines.push(`• **${entry.name}**`); + lines.push(` Changes: ${entry.changeCount} | Age: ${age}s`); + lines.push(` Previous: ${JSON.stringify(entry.previousValue)}`); + lines.push(` Current: ${JSON.stringify(entry.currentValue)}`); + lines.push(''); + } + + lines.push(`Total: ${entries.length} projection(s) changed`); + return lines.join('\n'); + } +} diff --git a/packages/dispatcher/src/types.ts b/packages/dispatcher/src/types.ts new file mode 100644 index 0000000..076f60e --- /dev/null +++ b/packages/dispatcher/src/types.ts @@ -0,0 +1,59 @@ +// OGraph Dispatcher — shared types + +export interface DispatcherConfig { + ograph: { + endpoint: string; // OGraph API endpoint + token?: string; // OGraph API token + projections: string[]; // projection 名称列表 + }; + oc: { + statusEndpoint: string; // session-status plugin URL + statusToken: string; // Bearer token + minAvailable: number; // 最少空闲槽位(默认 2) + }; + intervals: { + watcherIdle: number; // 无变化时 poll 间隔 ms(默认 30000) + watcherActive: number; // 有变化时 poll 间隔 ms(默认 5000) + schedulerIdle: number; // pending 空时 poll 间隔 ms(默认 60000) + schedulerActive: number; // pending 有时 poll 间隔 ms(默认 5000) + cooldownAfterPush: number; // 推送后冷却 ms(默认 60000) + }; +} + +/** 一次 projection 变化记录 */ +export interface ProjectionChange { + name: string; + previousValue: unknown; + currentValue: unknown; + detectedAt: number; // unix ms +} + +/** pending queue 中存放的合并条目(以 projection name 为 key) */ +export interface PendingEntry { + name: string; + previousValue: unknown; // 最早那次的旧值 + currentValue: unknown; // 最新值 + firstDetectedAt: number; + lastDetectedAt: number; + changeCount: number; +} + +/** OC session-status API response */ +export interface OcStatusResponse { + ok: boolean; + timestamp: string; + sessions: { + running: number; + idle: number; + total: number; + maxConcurrent: number; + available: number; + details: Array<{ + key: string; + status: 'running' | 'idle'; + agent: string; + kind: string; + updatedAt: number; + }>; + }; +} diff --git a/packages/dispatcher/src/watcher.ts b/packages/dispatcher/src/watcher.ts new file mode 100644 index 0000000..6284978 --- /dev/null +++ b/packages/dispatcher/src/watcher.ts @@ -0,0 +1,127 @@ +// Loop A: Projection Watcher +// Polls OGraph projections, diffs against last snapshot, feeds pending queue. + +import type { DispatcherConfig, PendingEntry } from './types.js'; +import { OGraphClient } from './ograph-client.js'; + +function ts(): string { + return new Date().toISOString(); +} + +export class ProjectionWatcher { + private readonly client: OGraphClient; + private snapshot: Map = new Map(); + private hasChanges = false; + private running = false; + private timer: ReturnType | null = null; + + constructor( + private readonly config: DispatcherConfig, + /** Shared pending queue (owned by Scheduler, written by Watcher) */ + private readonly pending: Map, + ) { + this.client = new OGraphClient(config); + } + + start(): void { + if (this.running) return; + this.running = true; + console.log(`[${ts()}] [watcher] started — watching: ${this.config.ograph.projections.join(', ') || '(none)'}`); + void this.poll(); + } + + stop(): void { + this.running = false; + if (this.timer !== null) { + clearTimeout(this.timer); + this.timer = null; + } + console.log(`[${ts()}] [watcher] stopped`); + } + + private scheduleNext(delayMs: number): void { + if (!this.running) return; + this.timer = setTimeout(() => { + void this.poll(); + }, delayMs); + } + + private async poll(): Promise { + if (!this.running) return; + + const { watcherIdle, watcherActive } = this.config.intervals; + + try { + const { values: current, errors } = await this.client.fetchAll(); + + // Log any fetch errors (graceful degradation — don't crash) + for (const [name, errMsg] of errors.entries()) { + console.warn(`[${ts()}] [watcher] fetch error for "${name}": ${errMsg}`); + } + + let changed = false; + + for (const [name, value] of current.entries()) { + if (!this.snapshot.has(name)) { + // First time seeing this projection — initialize snapshot, no change + this.snapshot.set(name, value); + console.log(`[${ts()}] [watcher] initial snapshot: ${name} = ${JSON.stringify(value)}`); + continue; + } + + const prev = this.snapshot.get(name); + const prevStr = JSON.stringify(prev); + const currStr = JSON.stringify(value); + + if (prevStr !== currStr) { + changed = true; + console.log(`[${ts()}] [watcher] change detected: ${name}`); + this.mergePending(name, prev, value); + this.snapshot.set(name, value); + } + } + + // Handle projections that disappeared from results (treat as change → undefined) + for (const name of this.snapshot.keys()) { + if (!current.has(name)) { + const prev = this.snapshot.get(name); + if (prev !== undefined) { + changed = true; + console.log(`[${ts()}] [watcher] projection gone: ${name}`); + this.mergePending(name, prev, undefined); + this.snapshot.set(name, undefined); + } + } + } + + this.hasChanges = changed || this.pending.size > 0; + } catch (err) { + console.error(`[${ts()}] [watcher] poll error: ${err instanceof Error ? err.message : String(err)}`); + // Graceful degradation: keep previous snapshot, retry at idle interval + this.hasChanges = false; + } + + const interval = this.hasChanges ? watcherActive : watcherIdle; + this.scheduleNext(interval); + } + + private mergePending(name: string, previousValue: unknown, currentValue: unknown): void { + const now = Date.now(); + const existing = this.pending.get(name); + if (existing) { + // Merge: keep original previousValue, update current, bump count + existing.currentValue = currentValue; + existing.lastDetectedAt = now; + existing.changeCount += 1; + } else { + this.pending.set(name, { + name, + previousValue, + currentValue, + firstDetectedAt: now, + lastDetectedAt: now, + changeCount: 1, + }); + } + } +} diff --git a/packages/dispatcher/test/config.test.ts b/packages/dispatcher/test/config.test.ts new file mode 100644 index 0000000..70f3210 --- /dev/null +++ b/packages/dispatcher/test/config.test.ts @@ -0,0 +1,95 @@ +// Tests for config deepMerge logic +import { describe, it, expect } from 'vitest'; +import type { DispatcherConfig } from '../src/types.js'; + +// ── inline copy of deepMerge (same logic as config.ts) ─────────────────────── +function deepMerge(base: T, override: Partial): T { + const result = { ...base }; + for (const key of Object.keys(override) as (keyof T)[]) { + const overrideVal = override[key]; + const baseVal = base[key]; + if ( + overrideVal !== null && + typeof overrideVal === 'object' && + !Array.isArray(overrideVal) && + baseVal !== null && + typeof baseVal === 'object' && + !Array.isArray(baseVal) + ) { + result[key] = deepMerge(baseVal as object, overrideVal as object) as T[keyof T]; + } else if (overrideVal !== undefined) { + result[key] = overrideVal as T[keyof T]; + } + } + return result; +} + +// ── helpers ─────────────────────────────────────────────────────────────────── + +const DEFAULTS: DispatcherConfig = { + ograph: { + endpoint: 'https://ograph.example.com', + token: undefined, + projections: [], + }, + oc: { + statusEndpoint: 'http://localhost:18789/status', + statusToken: 'default-token', + minAvailable: 2, + }, + intervals: { + watcherIdle: 30_000, + watcherActive: 5_000, + schedulerIdle: 60_000, + schedulerActive: 5_000, + cooldownAfterPush: 60_000, + }, +}; + +// ── tests ───────────────────────────────────────────────────────────────────── + +describe('config deepMerge logic', () => { + it('returns defaults unchanged when no overrides supplied', () => { + const result = deepMerge(DEFAULTS, {}); + expect(result).toEqual(DEFAULTS); + }); + + it('overrides a scalar field without touching siblings', () => { + const result = deepMerge(DEFAULTS, { + ograph: { endpoint: 'https://custom.example.com', token: undefined, projections: [] }, + }); + expect(result.ograph.endpoint).toBe('https://custom.example.com'); + expect(result.oc.statusToken).toBe('default-token'); // unchanged + }); + + it('deep-merges nested objects — partial override preserves untouched sibling keys', () => { + const result = deepMerge(DEFAULTS, { + oc: { statusToken: 'my-token' } as DispatcherConfig['oc'], + }); + expect(result.oc.statusToken).toBe('my-token'); + expect(result.oc.statusEndpoint).toBe(DEFAULTS.oc.statusEndpoint); // preserved + expect(result.oc.minAvailable).toBe(2); // preserved + }); + + it('env var override merges on top of file config', () => { + const fileConfig: Partial = { + ograph: { endpoint: 'https://file-endpoint.example.com', token: undefined, projections: ['p1'] }, + }; + let config = deepMerge(DEFAULTS, fileConfig); + + // Simulate env var override (as done in loadConfig) + config.ograph.endpoint = 'https://env-endpoint.example.com'; + + expect(config.ograph.endpoint).toBe('https://env-endpoint.example.com'); + expect(config.ograph.projections).toEqual(['p1']); // file value preserved + expect(config.oc.minAvailable).toBe(2); // default preserved + }); + + it('does not overwrite a field when override value is undefined', () => { + const result = deepMerge(DEFAULTS, { + ograph: { endpoint: undefined as unknown as string, token: undefined, projections: [] }, + }); + // endpoint should keep the default since override is undefined + expect(result.ograph.endpoint).toBe(DEFAULTS.ograph.endpoint); + }); +}); diff --git a/packages/dispatcher/test/scheduler.test.ts b/packages/dispatcher/test/scheduler.test.ts new file mode 100644 index 0000000..3d062b8 --- /dev/null +++ b/packages/dispatcher/test/scheduler.test.ts @@ -0,0 +1,72 @@ +// Tests for scheduler cooldown logic +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; + +// ── minimal cooldown state machine ─────────────────────────────────────────── +// Mirrors the cooldown guard in OcScheduler.poll() so we can test it in isolation. + +function makeCooldownGuard(cooldownMs: number) { + let lastPushAt = 0; + + return { + recordPush() { + lastPushAt = Date.now(); + }, + isInCooldown(): boolean { + if (lastPushAt === 0) return false; + return Date.now() - lastPushAt < cooldownMs; + }, + remainingMs(): number { + if (lastPushAt === 0) return 0; + const elapsed = Date.now() - lastPushAt; + return Math.max(0, cooldownMs - elapsed); + }, + }; +} + +// ── tests ───────────────────────────────────────────────────────────────────── + +describe('scheduler cooldown logic', () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it('not in cooldown before any push', () => { + const guard = makeCooldownGuard(60_000); + expect(guard.isInCooldown()).toBe(false); + }); + + it('enters cooldown immediately after a push', () => { + const guard = makeCooldownGuard(60_000); + guard.recordPush(); + expect(guard.isInCooldown()).toBe(true); + }); + + it('remains in cooldown within the window', () => { + const guard = makeCooldownGuard(60_000); + guard.recordPush(); + vi.advanceTimersByTime(59_999); // just under 60 s + expect(guard.isInCooldown()).toBe(true); + expect(guard.remainingMs()).toBeGreaterThan(0); + }); + + it('exits cooldown after the window expires', () => { + const guard = makeCooldownGuard(60_000); + guard.recordPush(); + vi.advanceTimersByTime(60_001); // just over 60 s + expect(guard.isInCooldown()).toBe(false); + expect(guard.remainingMs()).toBe(0); + }); + + it('resets cooldown on a second push', () => { + const guard = makeCooldownGuard(60_000); + guard.recordPush(); + vi.advanceTimersByTime(30_000); // 30 s into first cooldown + guard.recordPush(); // second push resets the clock + vi.advanceTimersByTime(30_000); // only 30 s since second push + expect(guard.isInCooldown()).toBe(true); // still cooling down + }); +}); diff --git a/packages/dispatcher/test/watcher.test.ts b/packages/dispatcher/test/watcher.test.ts new file mode 100644 index 0000000..2ace13c --- /dev/null +++ b/packages/dispatcher/test/watcher.test.ts @@ -0,0 +1,155 @@ +// Tests for watcher diff logic +// We test the core diffing behaviour by exercising the internals directly +// without needing to spin up a real OGraph server. + +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import type { DispatcherConfig, PendingEntry } from '../src/types.js'; + +// ── helpers ────────────────────────────────────────────────────────────────── + +function makeConfig(overrides: Partial = {}): DispatcherConfig { + return { + ograph: { endpoint: 'http://localhost', token: undefined, projections: [] }, + oc: { statusEndpoint: 'http://localhost', statusToken: 'tok', minAvailable: 2 }, + intervals: { + watcherIdle: 30_000, + watcherActive: 5_000, + schedulerIdle: 60_000, + schedulerActive: 5_000, + cooldownAfterPush: 60_000, + ...overrides, + }, + }; +} + +// Minimal inline diff engine extracted from the watcher logic so we can unit-test +// it without I/O dependencies. +function runDiff( + snapshot: Map, + current: Map, + pending: Map, +): { changed: boolean } { + let changed = false; + const now = Date.now(); + + for (const [name, value] of current.entries()) { + if (!snapshot.has(name)) { + // first-run: initialise snapshot, no change + snapshot.set(name, value); + continue; + } + + const prev = snapshot.get(name); + if (JSON.stringify(prev) !== JSON.stringify(value)) { + changed = true; + const existing = pending.get(name); + if (existing) { + existing.currentValue = value; + existing.lastDetectedAt = now; + existing.changeCount += 1; + } else { + pending.set(name, { + name, + previousValue: prev, + currentValue: value, + firstDetectedAt: now, + lastDetectedAt: now, + changeCount: 1, + }); + } + snapshot.set(name, value); + } + } + + // projections that disappeared + for (const name of snapshot.keys()) { + if (!current.has(name)) { + const prev = snapshot.get(name); + if (prev !== undefined) { + changed = true; + snapshot.set(name, undefined); + pending.set(name, { + name, + previousValue: prev, + currentValue: undefined, + firstDetectedAt: now, + lastDetectedAt: now, + changeCount: 1, + }); + } + } + } + + return { changed }; +} + +// ── tests ───────────────────────────────────────────────────────────────────── + +describe('watcher diff logic', () => { + let snapshot: Map; + let pending: Map; + + beforeEach(() => { + snapshot = new Map(); + pending = new Map(); + }); + + it('first run: initialises snapshot without reporting changes', () => { + const current = new Map([['proj-a', { count: 1 }]]); + const { changed } = runDiff(snapshot, current, pending); + + expect(changed).toBe(false); + expect(pending.size).toBe(0); + expect(snapshot.get('proj-a')).toEqual({ count: 1 }); + }); + + it('second run: no diff when value unchanged', () => { + const current = new Map([['proj-a', { count: 1 }]]); + runDiff(snapshot, current, pending); // first run — init + const { changed } = runDiff(snapshot, current, pending); // second run — same value + + expect(changed).toBe(false); + expect(pending.size).toBe(0); + }); + + it('detects a value change between runs', () => { + const v1 = new Map([['proj-a', { count: 1 }]]); + const v2 = new Map([['proj-a', { count: 2 }]]); + + runDiff(snapshot, v1, pending); // init + const { changed } = runDiff(snapshot, v2, pending); + + expect(changed).toBe(true); + expect(pending.has('proj-a')).toBe(true); + expect(pending.get('proj-a')!.previousValue).toEqual({ count: 1 }); + expect(pending.get('proj-a')!.currentValue).toEqual({ count: 2 }); + }); + + it('merges multiple changes into one pending entry', () => { + runDiff(snapshot, new Map([['proj-a', 1]]), pending); // init + runDiff(snapshot, new Map([['proj-a', 2]]), pending); // change 1 + runDiff(snapshot, new Map([['proj-a', 3]]), pending); // change 2 + + expect(pending.get('proj-a')!.changeCount).toBe(2); + expect(pending.get('proj-a')!.currentValue).toBe(3); + expect(pending.get('proj-a')!.previousValue).toBe(1); // kept from first change + }); + + it('detects a disappeared projection', () => { + runDiff(snapshot, new Map([['proj-a', 42]]), pending); // init + const { changed } = runDiff(snapshot, new Map(), pending); // proj-a gone + + expect(changed).toBe(true); + expect(pending.get('proj-a')!.currentValue).toBeUndefined(); + }); + + it('does not re-fire when a disappeared projection stays absent', () => { + runDiff(snapshot, new Map([['proj-a', 42]]), pending); // init + runDiff(snapshot, new Map(), pending); // gone → change + pending.clear(); + const { changed } = runDiff(snapshot, new Map(), pending); // still gone → no change + + expect(changed).toBe(false); + expect(pending.size).toBe(0); + }); +}); diff --git a/packages/dispatcher/tsconfig.json b/packages/dispatcher/tsconfig.json new file mode 100644 index 0000000..a3117ea --- /dev/null +++ b/packages/dispatcher/tsconfig.json @@ -0,0 +1,26 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "ESNext", + "moduleResolution": "bundler", + "allowSyntheticDefaultImports": true, + "esModuleInterop": true, + "allowJs": false, + "sourceMap": false, + "outDir": "./dist", + "rootDir": "./src", + "types": ["node"], + "strict": true, + "noUnusedLocals": true, + "noUnusedParameters": true, + "noImplicitReturns": true, + "noFallthroughCasesInSwitch": true, + "noUncheckedIndexedAccess": true, + "noImplicitOverride": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true, + "resolveJsonModule": true + }, + "include": ["src/**/*"], + "exclude": ["node_modules", "dist"] +}