布隆过滤器
{Back to Index}

Table of Contents

1 为什么要使用布隆过滤器

如果要经常判断元素是否存在,通常会使用哈希表(HashSet、HashMap),虽然时间复杂度可以达到 \(O(1)\) ,但是空间利用率不高,需要占用较多的内存资源。

当数据规模增大,甚至是达到海量级别,显然,这时 HashSet、HashMap 不再是好的选择,可以考虑使用布隆过滤器(Bloom Filter)。

2 布隆过滤器特点

布隆过滤器是一个空间效率高的 概率型 数据结构,可以用来判断一个元素 一定不存在 或者 可能存在

虽然布隆过滤器空间效率和查询时间都远远超过一般的算法,但它存在一定的 误判率 ,而且不支持或较难实现删除的功能。 尽管存在误判率,但误判率是 可以控制的

3 原理

布隆过滤器的基础操作有两个:添加,查询。

  • 添加元素
    将每一个哈希函数生成的索引位置都设为 1
  • 查询元素是否存在
    • 如果有一个哈希函数生成的索引位置不为 1,就代表不存在 (100%准确)
    • 如果每一个哈希函数生成的索引位置都为 1,就代表存在 (存在一定的误判率)

4 误判率计算

误判率 p 受 3 个因素影响:二进制位的个数 m ,哈希函数的个数 k ,数据规模 n ,误判率 p 的公式如下:

\[\left( 1 - e^{-\frac{kn}{m}} \right)\]

4.1 bit 位数计算

已知误判率 p ,数据规模 n ,则 bit 位数 m 可以以下公式计算得到:

\[ m = -\frac{n\ln p}{(\ln2)^2} \]

4.2 哈希函数个数计算

\[ k = \frac{m}{n}\ln2 \]

5 代码实现

public class BloomFilter<T> {
    /**
     * 二进制向量的长度(一共有多少个二进制位)
     */
    private int bitSize;
    /**
     * 二进制向量
     */
    private long[] bits;
    /**
     * 哈希函数的个数
     */
    private int hashSize;

    /**
     * 布隆过滤器的构造
     * @param n 数据规模
     * @param p 误判率, 取值范围(0, 1)
     */
    public BloomFilter(int n, double p){
        if (n <= 0 || p <= 0 || p >= 1) { // 非法输入检测
            throw new IllegalArgumentException("wrong n or p");
        }

        // 根据公式求出对应的数据
        double ln2 = Math.log(2);
        // 求出二进制向量的长度
        bitSize = (int) (- (n * Math.log(p)) / (ln2 * ln2));
        hashSize = (int) (bitSize * ln2 / n);
        System.err.println("bitSize: " + bitSize);
        System.err.println("hashSize: " + hashSize);
        // bits数组的长度
        bits = new long[(bitSize + Long.SIZE - 1) / Long.SIZE]; // 分页公式


        // 分页问题:
        // 每一页显示 100 条数据, pageSize = 100
        // 一共有 999999 条数据, n = 999999
        // 总页数: pageCount = (n + pageSize - 1) / pageSize
    }

    /**
     * 添加元素
     */
    public boolean put(@NonNull T value) {
        int hash1 = value.hashCode();
        int hash2 = hash1 >>> 16;

        boolean result = false;
        for (int i = 1; i <= hashSize; i++) {
            int combinedHash = hash1 + (i * hash2);
            if (combinedHash < 0) {
                combinedHash = ~combinedHash;
            }
            int index = combinedHash % bitSize;
            // 设置 index 位置的 bit 为 1
            if (set(index)) result = true;
        }
        return result;
    }

    /**
     * 判断一个元素是否存在
     */
    public boolean contains(@NonNull T value) {
        int hash1 = value.hashCode();
        int hash2 = hash1 >>> 16;

        for (int i = 1; i <= hashSize; i++) {
            int combinedHash = hash1 + (i * hash2);
            if (combinedHash < 0) {
                combinedHash = ~combinedHash;
            }
            int index = combinedHash % bitSize;
            if (!get(index)) return false;
        }
        return true;
    }

    /**
     * 设置index位置的bit为1
     */
    private boolean set(int index){
        long originValue = bits[index / Long.SIZE];
        bits[index / Long.SIZE] |= (1 << (index % Long.SIZE));
        return originValue != bits[index / Long.SIZE];
    }

    /**
     * 查看index位置的二进制的值
     * @param index
     * @return true代表1, false代表0
     */
    private boolean get(int index) {
        long value = bits[index / Long.SIZE];
        return (value & (1 << (index % Long.SIZE))) != 0;
    }

}

Author: Hao Ruan (ruanhao1116@gmail.com)

Created: 2021-03-27 Sat 16:58

Updated: 2021-08-17 Tue 11:23

Emacs 27.1 (Org mode 9.3)