布隆过滤器
{Back to Index}
Table of Contents
1 为什么要使用布隆过滤器
如果要经常判断元素是否存在,通常会使用哈希表(HashSet、HashMap),虽然时间复杂度可以达到 \(O(1)\) ,但是空间利用率不高,需要占用较多的内存资源。
当数据规模增大,甚至是达到海量级别,显然,这时 HashSet、HashMap 不再是好的选择,可以考虑使用布隆过滤器(Bloom Filter)。
2 布隆过滤器特点
布隆过滤器是一个空间效率高的 概率型 数据结构,可以用来判断一个元素 一定不存在 或者 可能存在 。
虽然布隆过滤器空间效率和查询时间都远远超过一般的算法,但它存在一定的 误判率 ,而且不支持或较难实现删除的功能。 尽管存在误判率,但误判率是 可以控制的 。
3 原理
布隆过滤器的基础操作有两个:添加,查询。
- 添加元素
将每一个哈希函数生成的索引位置都设为 1 - 查询元素是否存在
- 如果有一个哈希函数生成的索引位置不为 1,就代表不存在 (100%准确)
- 如果每一个哈希函数生成的索引位置都为 1,就代表存在 (存在一定的误判率)
4 误判率计算
误判率 p 受 3 个因素影响:二进制位的个数 m ,哈希函数的个数 k ,数据规模 n ,误判率 p 的公式如下:
\[\left( 1 - e^{-\frac{kn}{m}} \right)\]
4.1 bit 位数计算
已知误判率 p ,数据规模 n ,则 bit 位数 m 可以以下公式计算得到:
\[ m = -\frac{n\ln p}{(\ln2)^2} \]
4.2 哈希函数个数计算
\[ k = \frac{m}{n}\ln2 \]
5 代码实现
public class BloomFilter<T> { /** * 二进制向量的长度(一共有多少个二进制位) */ private int bitSize; /** * 二进制向量 */ private long[] bits; /** * 哈希函数的个数 */ private int hashSize; /** * 布隆过滤器的构造 * @param n 数据规模 * @param p 误判率, 取值范围(0, 1) */ public BloomFilter(int n, double p){ if (n <= 0 || p <= 0 || p >= 1) { // 非法输入检测 throw new IllegalArgumentException("wrong n or p"); } // 根据公式求出对应的数据 double ln2 = Math.log(2); // 求出二进制向量的长度 bitSize = (int) (- (n * Math.log(p)) / (ln2 * ln2)); hashSize = (int) (bitSize * ln2 / n); System.err.println("bitSize: " + bitSize); System.err.println("hashSize: " + hashSize); // bits数组的长度 bits = new long[(bitSize + Long.SIZE - 1) / Long.SIZE]; // 分页公式 // 分页问题: // 每一页显示 100 条数据, pageSize = 100 // 一共有 999999 条数据, n = 999999 // 总页数: pageCount = (n + pageSize - 1) / pageSize } /** * 添加元素 */ public boolean put(@NonNull T value) { int hash1 = value.hashCode(); int hash2 = hash1 >>> 16; boolean result = false; for (int i = 1; i <= hashSize; i++) { int combinedHash = hash1 + (i * hash2); if (combinedHash < 0) { combinedHash = ~combinedHash; } int index = combinedHash % bitSize; // 设置 index 位置的 bit 为 1 if (set(index)) result = true; } return result; } /** * 判断一个元素是否存在 */ public boolean contains(@NonNull T value) { int hash1 = value.hashCode(); int hash2 = hash1 >>> 16; for (int i = 1; i <= hashSize; i++) { int combinedHash = hash1 + (i * hash2); if (combinedHash < 0) { combinedHash = ~combinedHash; } int index = combinedHash % bitSize; if (!get(index)) return false; } return true; } /** * 设置index位置的bit为1 */ private boolean set(int index){ long originValue = bits[index / Long.SIZE]; bits[index / Long.SIZE] |= (1 << (index % Long.SIZE)); return originValue != bits[index / Long.SIZE]; } /** * 查看index位置的二进制的值 * @param index * @return true代表1, false代表0 */ private boolean get(int index) { long value = bits[index / Long.SIZE]; return (value & (1 << (index % Long.SIZE))) != 0; } }