feat: @uncaged/pulse core engine — runPulse + Rule type + S combinator composition

- Moore machine model: Effects determined by state diff, not events
- S combinator rule composition: pulse = S r3 . S r2 . S r1 $ dummy
- createRule helper with accessor-based state adaptation
- 7 tests passing (compose, async, adaptive tickMs, Moore property)

Design: https://github.com/oc-xiaoju/ograph/issues/44
This commit is contained in:
2026-04-14 02:44:17 +00:00
commit 590facb45c
8 changed files with 532 additions and 0 deletions
+2
View File
@@ -0,0 +1,2 @@
node_modules/
dist/
+72
View File
@@ -0,0 +1,72 @@
# Pulse
**Agent 的自主神经系统。**
有状态的响应式循环——持续感知多数据源,自主执行确定性任务,只在不确定时才上报 Agent 决策。
## 核心模型
```
f(prevSnapshot, currSnapshot) → (Effect[], tickMs)
```
Runtime 只有十行。所有智能在规则里。
### Moore 机
Pulse 不逐事件响应,只看两次采样间的状态变化。Effects 由新状态决定,不由事件驱动。副作用不满足结合律——Pulse 承认采样有损,保证每个采样点的决策在当时是对的。
### S 组合子规则叠加
每条规则是一个 S 组合子。不区分确定性/不确定性规则,统一叠加:
```
pulse = S r3 . S r2 . S r1 $ dummy
```
规则签名:
```typescript
type Rule<S, E> =
(prev: S, curr: S) =>
(effects: E[], tickMs: number) =>
Promise<[E[], number]> | [E[], number]
```
每条规则接收快照,返回修饰函数——拿到前面规则的累积结果,可以追加、删除、替换 effects,调整 tickMs,或 pass through。
### Agent 关系
Agent 是意识层,Pulse 是自主神经系统。Agent 定义规则,Pulse 执行规则,遇到例外回报 Agent,Agent 调整规则。
## 包结构
| 包 | 内容 |
|---|---|
| `@uncaged/pulse` | 核心引擎:`runPulse()` + `Rule` 类型,纯泛型零依赖 |
| `@uncaged/upulse` | CLI:daemon 管理 + test/staging/promote |
| `@uncaged/pulse-rules` | 共享规则函数库 |
## Engine 目录
每个 Agent 维护自己的 engine repo:
```
~/.upulse/
config.json
engine/ ← main branch(生产)
types.ts ← Snapshot + Effect 类型
rules/ ← 规则链
collectors/ ← State 获取
effectors/ ← Effect 执行
pulse.config.ts
staging/ ← git worktree(Agent 在这改)
```
## 设计文档
详见 [RFC #44: Pulse — Agent 的自主神经系统](https://github.com/oc-xiaoju/ograph/issues/44)
## License
MIT
+28
View File
@@ -0,0 +1,28 @@
# Design Document
See [RFC #44: Pulse — Agent 的自主神经系统](https://github.com/oc-xiaoju/ograph/issues/44)
## Summary
### Core Formula
```
f(prevSnapshot, currSnapshot) → (Effect[], tickMs)
```
### Key Properties
- **Moore Machine** — Effects determined by state, not events. Pulse polls projection values, not event streams.
- **Path Dependent** — Sampling is lossy. A→B + B→C ≠ A→C at the effect level. Poll frequency = resolution of reality.
- **S Combinator** — Single composition primitive. `pulse = S r3 . S r2 . S r1 $ dummy`
- **Not Forced Pure** — Rules can be async/impure. Framework doesn't distinguish certain/uncertain.
- **Agent Managed** — The rule repo is managed by the Agent. Types, rules, collectors, effectors — all TypeScript, all under `tsc` guard.
- **Staging via git worktree** — Safe experimentation before promote to production.
### Packages
| Package | Purpose |
|---------|---------|
| `@uncaged/pulse` | Core engine: `runPulse()` + `Rule` type. Generic, zero deps. |
| `@uncaged/upulse` | CLI: daemon lifecycle + test/staging/promote |
| `@uncaged/pulse-rules` | Shared rule library (future) |
+48
View File
@@ -0,0 +1,48 @@
{
"name": "@uncaged/pulse",
"version": "0.1.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@uncaged/pulse",
"version": "0.1.0",
"license": "MIT",
"devDependencies": {
"@types/node": "^25.6.0",
"typescript": "^6.0.2"
}
},
"node_modules/@types/node": {
"version": "25.6.0",
"resolved": "https://registry.npmjs.org/@types/node/-/node-25.6.0.tgz",
"integrity": "sha512-+qIYRKdNYJwY3vRCZMdJbPLJAtGjQBudzZzdzwQYkEPQd+PJGixUL5QfvCLDaULoLv+RhT3LDkwEfKaAkgSmNQ==",
"dev": true,
"license": "MIT",
"dependencies": {
"undici-types": "~7.19.0"
}
},
"node_modules/typescript": {
"version": "6.0.2",
"resolved": "https://registry.npmjs.org/typescript/-/typescript-6.0.2.tgz",
"integrity": "sha512-bGdAIrZ0wiGDo5l8c++HWtbaNCWTS4UTv7RaTH/ThVIgjkveJt83m74bBHMJkuCbslY8ixgLBVZJIOiQlQTjfQ==",
"dev": true,
"license": "Apache-2.0",
"bin": {
"tsc": "bin/tsc",
"tsserver": "bin/tsserver"
},
"engines": {
"node": ">=14.17"
}
},
"node_modules/undici-types": {
"version": "7.19.2",
"resolved": "https://registry.npmjs.org/undici-types/-/undici-types-7.19.2.tgz",
"integrity": "sha512-qYVnV5OEm2AW8cJMCpdV20CDyaN3g0AjDlOGf1OW4iaDEx8MwdtChUp4zu4H0VP3nDRF/8RKWH+IPp9uW0YGZg==",
"dev": true,
"license": "MIT"
}
}
}
+33
View File
@@ -0,0 +1,33 @@
{
"name": "@uncaged/pulse",
"version": "0.1.0",
"description": "Pulse core engine — stateful reactive loop with S-combinator rule composition",
"type": "module",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"files": [
"dist"
],
"scripts": {
"build": "tsc",
"test": "node --test dist/**/*.test.js"
},
"keywords": [
"pulse",
"reactive",
"agent",
"state-machine",
"moore-machine"
],
"author": "oc-xiaoju",
"license": "MIT",
"repository": {
"type": "git",
"url": "https://github.com/oc-xiaoju/pulse",
"directory": "packages/pulse"
},
"devDependencies": {
"@types/node": "^25.6.0",
"typescript": "^6.0.2"
}
}
+173
View File
@@ -0,0 +1,173 @@
/**
* @uncaged/pulse — Tests
*/
import { describe, it, mock } from "node:test";
import assert from "node:assert/strict";
import { composeRules, createRule, type Rule } from "./index.js";
// ── Test types ─────────────────────────────────────────────────
interface TestSnapshot {
timestamp: number;
memory: number;
tasks: Record<string, string>;
}
interface TestEffect {
kind: string;
target?: string;
message?: string;
}
// ── composeRules ───────────────────────────────────────────────
describe("composeRules", () => {
it("dummy: no rules → empty effects, default tickMs", async () => {
const pulse = composeRules<TestSnapshot, TestEffect>([], 15000);
const prev: TestSnapshot = { timestamp: 0, memory: 50, tasks: {} };
const curr: TestSnapshot = { timestamp: 1, memory: 50, tasks: {} };
const [effects, tickMs] = await pulse(prev, curr);
assert.deepEqual(effects, []);
assert.equal(tickMs, 15000);
});
it("single rule appends effects", async () => {
const rule: Rule<TestSnapshot, TestEffect> = (prev, curr) => (effects, tickMs) => {
if (curr.tasks["42"] === "assigned") {
return [[...effects, { kind: "dispatch", target: "oc" }], tickMs];
}
return [effects, tickMs];
};
const pulse = composeRules([rule], 15000);
const prev: TestSnapshot = { timestamp: 0, memory: 50, tasks: {} };
const curr: TestSnapshot = { timestamp: 1, memory: 50, tasks: { "42": "assigned" } };
const [effects, tickMs] = await pulse(prev, curr);
assert.equal(effects.length, 1);
assert.equal(effects[0]!.kind, "dispatch");
});
it("S combinator: later rule can modify earlier effects", async () => {
// r1: dispatch task
const r1: Rule<TestSnapshot, TestEffect> = (_prev, _curr) => (effects, tickMs) => {
return [[...effects, { kind: "dispatch", target: "oc" }], tickMs];
};
// r2: resource guard — remove dispatch if memory > 90%
const r2: Rule<TestSnapshot, TestEffect> = (_prev, curr) => (effects, tickMs) => {
if (curr.memory > 90) {
return [effects.filter((e) => e.kind !== "dispatch"), Math.min(tickMs, 5000)];
}
return [effects, tickMs];
};
const pulse = composeRules([r1, r2], 15000);
// Normal memory: dispatch passes through
const [effects1] = await pulse(
{ timestamp: 0, memory: 50, tasks: {} },
{ timestamp: 1, memory: 50, tasks: {} },
);
assert.equal(effects1.length, 1);
// High memory: dispatch blocked
const [effects2, tickMs2] = await pulse(
{ timestamp: 0, memory: 50, tasks: {} },
{ timestamp: 1, memory: 95, tasks: {} },
);
assert.equal(effects2.length, 0);
assert.equal(tickMs2, 5000);
});
it("rules can adjust tickMs", async () => {
const rule: Rule<TestSnapshot, TestEffect> = (prev, curr) => (effects, tickMs) => {
// Something changed → speed up
if (JSON.stringify(prev.tasks) !== JSON.stringify(curr.tasks)) {
return [effects, 3000];
}
// Nothing changed → slow down
return [effects, Math.min(tickMs * 2, 120000)];
};
const pulse = composeRules([rule], 15000);
const [, tickMs1] = await pulse(
{ timestamp: 0, memory: 50, tasks: {} },
{ timestamp: 1, memory: 50, tasks: { "1": "new" } },
);
assert.equal(tickMs1, 3000);
const [, tickMs2] = await pulse(
{ timestamp: 0, memory: 50, tasks: {} },
{ timestamp: 1, memory: 50, tasks: {} },
);
assert.equal(tickMs2, 30000);
});
it("async rules work", async () => {
const asyncRule: Rule<TestSnapshot, TestEffect> = (_prev, _curr) => async (effects, tickMs) => {
await new Promise((r) => setTimeout(r, 10));
return [[...effects, { kind: "notify", message: "async!" }], tickMs];
};
const pulse = composeRules([asyncRule]);
const [effects] = await pulse(
{ timestamp: 0, memory: 50, tasks: {} },
{ timestamp: 1, memory: 50, tasks: {} },
);
assert.equal(effects.length, 1);
assert.equal(effects[0]!.kind, "notify");
});
});
// ── createRule ─────────────────────────────────────────────────
describe("createRule", () => {
it("accessor-based rule reads a slice of state", async () => {
const guard = createRule<TestSnapshot, TestEffect, number>(
(s) => s.memory,
(prevMem, currMem) => (effects, tickMs) => {
if (currMem > 90) {
return [effects.filter((e) => e.kind !== "dispatch"), 5000];
}
return [effects, tickMs];
},
);
const pulse = composeRules([guard], 15000);
const [effects, tickMs] = await pulse(
{ timestamp: 0, memory: 50, tasks: {} },
{ timestamp: 1, memory: 95, tasks: {} },
);
assert.equal(effects.length, 0);
assert.equal(tickMs, 5000);
});
});
// ── Moore machine property ─────────────────────────────────────
describe("Moore machine property", () => {
it("same (prev, curr) always produces same effects — deterministic", async () => {
const rule: Rule<TestSnapshot, TestEffect> = (prev, curr) => (effects, tickMs) => {
const newTasks = Object.keys(curr.tasks).filter((k) => !(k in prev.tasks));
const newEffects = newTasks.map((t) => ({ kind: "dispatch", target: t }));
return [[...effects, ...newEffects], tickMs];
};
const pulse = composeRules([rule]);
const prev: TestSnapshot = { timestamp: 0, memory: 50, tasks: {} };
const curr: TestSnapshot = { timestamp: 1, memory: 50, tasks: { "42": "new" } };
const [effects1] = await pulse(prev, curr);
const [effects2] = await pulse(prev, curr);
assert.deepEqual(effects1, effects2);
});
});
+160
View File
@@ -0,0 +1,160 @@
/**
* @uncaged/pulse — Core Engine
*
* A stateful reactive loop. Ten lines of runtime.
* All intelligence lives in the rules.
*/
// ── Core Types ─────────────────────────────────────────────────
/**
* Rule: the universal composition primitive.
*
* A rule receives two snapshots (prev, curr) and returns a modifier function
* that takes the accumulated (effects, tickMs) from prior rules and produces
* updated (effects, tickMs).
*
* Rules compose via the S combinator:
* pulse = S r3 . S r2 . S r1 $ dummy
*
* A rule can:
* - Append effects
* - Remove/replace effects (conflict resolution)
* - Adjust tickMs (adaptive frequency)
* - Pass through (identity)
* - Be pure or async (framework doesn't distinguish)
*/
export type Rule<S, E> = (
prev: S,
curr: S,
) => (
effects: E[],
tickMs: number,
) => Promise<[E[], number]> | [E[], number];
/**
* Collector: gathers a snapshot of the world.
*/
export type Collector<S> = () => Promise<S>;
/**
* Effector: executes a batch of effects.
*/
export type Effector<E> = (effects: E[]) => Promise<void>;
// ── Composition ────────────────────────────────────────────────
/**
* Compose rules via S combinator folding.
*
* Given rules [r1, r2, r3], produces:
* S r3 . S r2 . S r1 $ dummy
*
* Where dummy = (prev, curr) => ([], defaultTickMs)
*/
export function composeRules<S, E>(
rules: Rule<S, E>[],
defaultTickMs: number = 15000,
): (prev: S, curr: S) => Promise<[E[], number]> {
return async (prev: S, curr: S): Promise<[E[], number]> => {
let effects: E[] = [];
let tickMs = defaultTickMs;
for (const rule of rules) {
const modifier = rule(prev, curr);
const result = modifier(effects, tickMs);
[effects, tickMs] = result instanceof Promise ? await result : result;
}
return [effects, tickMs];
};
}
// ── Runtime ────────────────────────────────────────────────────
export interface PulseOptions<S, E> {
collect: Collector<S>;
execute: Effector<E>;
rules: Rule<S, E>[];
defaultTickMs?: number;
/** Called on each tick with diagnostics. */
onTick?: (info: {
effects: E[];
tickMs: number;
durationMs: number;
}) => void;
/** Called on errors. Return true to continue, false to stop. */
onError?: (error: unknown) => boolean;
}
/**
* Run the Pulse loop.
*
* collect → pulse → execute → sleep → repeat
*
* Ten lines of logic. All intelligence lives in the rules.
*/
export async function runPulse<S, E>(
options: PulseOptions<S, E>,
): Promise<never> {
const {
collect,
execute,
rules,
defaultTickMs = 15000,
onTick,
onError,
} = options;
const pulse = composeRules(rules, defaultTickMs);
let prev = await collect();
let tickMs = defaultTickMs;
while (true) {
await sleep(tickMs);
const start = Date.now();
try {
const curr = await collect();
const [effects, nextTickMs] = await pulse(prev, curr);
if (effects.length > 0) {
await execute(effects);
}
onTick?.({
effects,
tickMs: nextTickMs,
durationMs: Date.now() - start,
});
tickMs = nextTickMs;
prev = curr;
} catch (err) {
if (onError && !onError(err)) {
throw err;
}
// On error, keep prev and tickMs unchanged, retry next tick.
}
}
}
// ── Helpers ────────────────────────────────────────────────────
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
/**
* Create a rule from an accessor + pure logic.
* Avoids the need for contramap — adaptation happens at construction time.
*/
export function createRule<S, E, T>(
accessor: (s: S) => T,
logic: (
prev: T,
curr: T,
) => (effects: E[], tickMs: number) => Promise<[E[], number]> | [E[], number],
): Rule<S, E> {
return (prev, curr) => logic(accessor(prev), accessor(curr));
}
+16
View File
@@ -0,0 +1,16 @@
{
"compilerOptions": {
"target": "ES2022",
"module": "ES2022",
"moduleResolution": "bundler",
"declaration": true,
"outDir": "dist",
"rootDir": "src",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"types": ["node"]
},
"include": ["src"]
}