Bloom Filterを書く

「大規模データセットのアルゴリズムとデータ構造」という本があります。 私は大学で情報工学を学びました。アルゴリズムとデータ構造は必修科目でした。

その当時学んだことと、この本の内容で異なるのは、この本が「大規模データ」を対象としたアルゴリズムやデータ構造である点です。

「大規模データ」に対応する代わりに欠点も持ち合わせており、そのトレードオフをコントロールすることで実用的なアルゴリズムにしているのが面白い点ですね。

今回はこの本の第3章で紹介される「ブルームフィルタ」をRustで実装してみました。

use std::{f64::consts::LN_2, marker::PhantomData};

use murmur3;

#[derive(Debug)]
pub struct BloomFilter<T> where T: AsRef<[u8]> + Clone {
    m_bit_array_length: usize,

    k_hash_functions_count: usize,

    bit_array: Vec<bool>,

    _marker: PhantomData<T>,  
}

fn optimal_m_bit_array_size(n_max_factors: usize, p_false_positive_rate: f64) -> usize {
    let m = - (p_false_positive_rate as f64).ln() * n_max_factors as f64 / LN_2.powi(2);
    m.ceil() as usize
}

fn optimal_k_hash_functions_count(n_max_factors: usize, m_bit_array_length: usize) -> usize {
    let k = m_bit_array_length as f64 * LN_2 / n_max_factors as f64;
    k.ceil() as usize
}

impl<T> BloomFilter<T> where T: AsRef<[u8]> + Clone {
    pub fn new(n_max_factors: usize, f_false_positive_rate: f64) -> Self {
        let m_bit_array_length = optimal_m_bit_array_size(n_max_factors, f_false_positive_rate);
        let k_hash_functions_count = optimal_k_hash_functions_count(n_max_factors, m_bit_array_length);

        BloomFilter {
            m_bit_array_length,
            k_hash_functions_count,
            bit_array: vec![false; m_bit_array_length],
            _marker: PhantomData,
        }
    }

    pub fn insert(&mut self, item: &T) {
        for i in 0..self.k_hash_functions_count {
            let mut cursor = std::io::Cursor::new(item.clone());
            let index = murmur3::murmur3_x64_128(&mut cursor, i as u32).unwrap() as usize % self.m_bit_array_length;
            self.bit_array[index] = true;
        }
    }

    pub fn lookup(&self, item: &T) -> bool {
        for i in 0..self.k_hash_functions_count{
            let mut cursor = std::io::Cursor::new(item.clone());
            let index = murmur3::murmur3_x64_128(&mut cursor, i as u32).unwrap() as usize % self.m_bit_array_length;
            if !self.bit_array[index] {
                return false;
            }
        }
        true
    }

    pub fn m_bit_array_length(&self) -> usize {
        self.m_bit_array_length
    }
}

Rustで構造体の型パラメータを使用する際、構造体のフィールドでその型を使用しないとエラーとなります。しかし、今回のようにメソッド定義の一部のにも型パラメータを使用する際には、PhantomDataという長さ0の構造体を使用することができるそうです(今回の実装のために調べて知りました)。

使い方はこんな感じです。

#![cfg(test)]

use std::{collections::HashSet, io::BufRead};

use crate::bloom_filter::BloomFilter;

#[test]
fn test(){
    let mut bf = BloomFilter::new(10, 0.01);
    dbg!(&bf);
    bf.insert(&"1".to_string());
    bf.insert(&"2".to_string());
    bf.insert(&"42".to_string());

    dbg!(bf.lookup(&"1".to_string()));
    dbg!(bf.lookup(&"2".to_string()));
    dbg!(bf.lookup(&"3".to_string()));
    dbg!(bf.lookup(&"42".to_string()));
    dbg!(bf.lookup(&"43".to_string()));
}

#[test]
fn test_search_data(){
    // create bloom filter
    let mut bf = BloomFilter::new(10_usize.pow(6), 0.02);

    let mut all_data =HashSet::new();

    // import data from dummy_data.txt by steasming line by line
    let file = std::fs::File::open("dummy_data.txt").unwrap();
    let reader = std::io::BufReader::new(file);
    for line in reader.lines() {
        let line = line.unwrap();
        bf.insert(&line);
        all_data.insert(line);
    }

    let num_test = 1000;
    let mut false_positives = 0;

    println!("start search test...");
    let start = std::time::Instant::now();
    for _ in 0..num_test{
        // test by random data
        let random_number: u32 = rand::random::<u32>() % 10_000_000 ;
        let test_string = random_number.to_string();
        let exists_in_bf = bf.lookup(&test_string);
        let exists_in_set = all_data.contains(&test_string);

        if exists_in_bf && !exists_in_set {
            false_positives += 1;
        } 
    }

    println!("False positive rate: {}", false_positives as f64 / num_test as f64);
    println!("Elapsed time: {:?}", start.elapsed());


    let all_data_vec = all_data.iter().cloned().collect::<Vec<String>>();
    println!("start positive data performance test...");
    let start = std::time::Instant::now();
    for _ in 0..num_test{
        // test from existing data(all positive )
        let random_index = rand::random::<u64>() % all_data.len() as u64;
        let test_string = all_data_vec[random_index as usize].clone();
        let exists_in_bf = bf.lookup(&test_string);
        assert!(exists_in_bf);
    }

    println!("Elapsed time for positive data: {:?}", start.elapsed());
}

#[test]
fn test_size(){
    let bf_normal = BloomFilter::<String>::new(10_usize.pow(6), 0.02);
    println!("Bitsize for normal bloom filter: {}", bf_normal.m_bit_array_length());

    let bf_small_fp = BloomFilter::<String>::new(10_usize.pow(4), 0.02);
    println!("Bitsize for small false positive bloom filter: {}", bf_small_fp.m_bit_array_length());

    let bf_large_fp = BloomFilter::<String>::new(10_usize.pow(8), 0.02);
    println!("Bitsize for large false positive bloom filter: {}", bf_large_fp.m_bit_array_length());
}

実行するにはダミーデータの生成が必要です。例えばこのような生成スクリプトを考えられます。

#!/usr/bin/env python3
"""Generate 10**6 random integers in [0, 10**12] and write to dummy_data.txt."""

import random
from pathlib import Path

COUNT = 10 ** 6
LOWER = 0
UPPER = 10 ** 12
OUTPUT = Path("dummy_data.txt")
CHUNK_SIZE = 10_000


def main() -> None:
    rng = random.Random()
    with OUTPUT.open("w", encoding="utf-8") as file:
        buffer: list[str] = []
        for _ in range(COUNT):
            buffer.append(str(rng.randint(LOWER, UPPER)))
            if len(buffer) >= CHUNK_SIZE:
                file.write("\n".join(buffer))
                file.write("\n")
                buffer.clear()
        if buffer:
            file.write("\n".join(buffer))
            file.write("\n")


if __name__ == "__main__":
    main()

内容については本を読んでいただければと思います。第1部はハッシュ関数を活用した確率的データ構造について解説されています。アルゴリズム、情報科学音面白さにあふれた本だと思います。

是非手に取ってみてください。