Skip to content

teragrep/blf_02

Repository files navigation

BLF_02: Teragrep Bloom Filter Plugin for MariaDB

This package provides two user-defined functions (UDFs) for MySQL to efficiently work with Bloom filters:

  • bloommatch function to compare two bloom filters if one is contained in the other.

  • bloomupdate function to combine two bloom filters.

These UDFs enable efficient querying and manipulation of Bloom filters stored in MySQL. Bloom filters are represented as arrays of bytes in little-endian order.

License: Apache

Installation

Install the blf_02 package.

yum install blf_02.rpm

Enabling

Option 1 — Execute the pre-made query

mariadb < /opt/teragrep/blf_02/share/installdb.sql

Option 2 — Execute the queries manually

USE mysql;

DROP FUNCTION IF EXISTS bloommatch;
DROP FUNCTION IF EXISTS bloomupdate;
CREATE FUNCTION bloommatch RETURNS integer SONAME 'lib_mysqludf_bloom.so';
CREATE FUNCTION bloomupdate RETURNS STRING SONAME 'lib_mysqludf_bloom.so';

Disabling

Option 1 — Execute the pre-made query

mariadb < /opt/teragrep/blf_02/share/uninstalldb.sql

Option 2 — Execute the queries manually

USE mysql;

DROP FUNCTION IF EXISTS bloommatch;
DROP FUNCTION IF EXISTS bloomupdate;

Functions

Match Function

This function performs a byte-by-bytes check of (a & b == a). If true, then a may be found in b. If false then a is not in b.

Function in SQL:

bloommatch(blob a, blob b)

A Java example of how the function is used:

Connection con = ... // Get the db connection
InputStream is = ... // Input stream containing the bloom filter to locate in the table
PreparedStatement stmt = con.prepareStatement( "SELECT * FROM bloomTable WHERE bloommatch( ?, bloomTable.filter );" );
stmt.setBlob( 1, is );
ResultSet rs = stmt.executeQuery();
// Result set now contains all the matching bloom filters from the table.

Update Function

This function performs a byte-by-byte construct of a new filter where a | b.

Function in SQL:

bloomupdate( blob a, blob b )

A Java example of how the function is used:

Connection con = ... // Get the db connection
InputStream is = ... // Input stream containing the bloom filter to locate in the table
PreparedStatement stmt = con.prepareStatement( "UPDATE bloomTable SET filter=bloomupdate( ?, bloomTable.filter ) WHERE id=?;" );
stmt.setBlob( 1, is );
stmt.setInt( 2, 5 );
stmt.executeUpdate();
// Bloom filters on rows with id of 5 have been updated to include values from the blob.

Development

MySQL client and server headers are required to compile this code.

Please do the following in the root directory of the source tree:

aclocal
autoconf
autoheader
automake --add-missing

./configure
make
sudo make install
sudo make installdb

To remove the library from your system:

make uninstalldb
make uninstall

Spark Example

A short demo of how to use blf_02 in practice by using Apache Spark and Scala.

Creating and Storing Bloom Filter to a Database

In the following example, we generate a Bloom Filter from a Spark DataFrame and store its serialized form in a database for later use.

The filter is stored in a table alongside a string value. When searching for a token, we can first check the filter before checking the value.

// Generate and upload a spark bloomfilter to a database

import spark.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import java.sql.DriverManager
import org.apache.spark.util.sketch.BloomFilter
import java.io.{ByteArrayOutputStream,ByteArrayInputStream, ObjectOutputStream, InputStream}

// Filter parameters
val expected: Long = 500
val fpp: Double = 0.3

val dburl = "DATABASE_URL"
val updatesql = "INSERT INTO `example_strings` (`value`, `filter`) VALUES (?,?)"
val conn = DriverManager.getConnection(dburl,"DB_USERNAME","DB_PASSWORD")
val value = "one two three"

// Create a Spark Dataframe with values 'one', 'two' and 'three'
// This emulates a tokenized form of the value field
val in1 = spark.sparkContext.parallelize(List("one","two","three"))
val df = in1.toDF("tokens")

val ps = conn.prepareStatement(updatesql)

// Create a bloomfilter from the Dataframe
val filter = df.stat.bloomFilter($"tokens", expected, fpp)
println(filter.mightContain("one"))

// Write a filter bit array to the output stream
val baos = new ByteArrayOutputStream
filter.writeTo(baos)
val is: InputStream = new ByteArrayInputStream(baos.toByteArray())
ps.setString(1, value)
ps.setBlob(2,is)
val update = ps.executeUpdate
println("Updated rows: "+ update)
df.show()
conn.close()

Finding Matching Filters

A Bloom Filter is created from a Spark DataFrame and compared with stored filters in the database to retrieve matching string values. Note that each comparison generates a new Bloom Filter for the SQL function.

Imagine we want to search if a value contains tokens one and two from the previous example.

// Create a bloomfilter and find matches
import spark.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import java.sql.DriverManager
import org.apache.spark.util.sketch.BloomFilter
import java.io.{ByteArrayOutputStream,ByteArrayInputStream, ObjectOutputStream, InputStream}

// Generated filter array must have the same length as the one it is compared to
val expected: Long = 500
val fpp: Double = 0.3

val dburl = "DATABASE_URL"
val conn = DriverManager.getConnection(dburl,"DB_USERNAME","DB_PASSWORD")

val updatesql = "SELECT `value` FROM `example_strings` WHERE bloommatch(?, `example_strings`.`filter`);"
val ps = conn.prepareStatement(updatesql)

// Creating a filter with values 'one' and 'two'
val in2 = spark.sparkContext.parallelize(List("one","two"))
val df2 = in2.toDF("tokens")
val filter = df2.stat.bloomFilter($"tokens", expected, fpp)

val baos = new ByteArrayOutputStream
            filter.writeTo(baos)
            baos.flush()
            val is :InputStream = new ByteArrayInputStream(baos.toByteArray())
            ps.setBlob(1, is)
            val rs = ps.executeQuery

// Will find a match since tokens searched are both in the filter
val resultList = Iterator.from(0).takeWhile(_ => rs.next()).map(_ => rs.getString(1)).toList
println("Found matches: " + resultList.size)
conn.close()

Contributing

You can involve yourself with our project by opening an issue or submitting a pull request.

Contribution requirements:

  1. All changes must be accompanied by a new or changed test. If you think testing is not required in your pull request, include a sufficient explanation as why you think so.

  2. Security checks must pass

  3. Pull requests must align with the principles and values of extreme programming.

  4. Pull requests must follow the principles of Object Thinking and Elegant Objects (EO).

Read more in our Contributing Guideline.

Contributor License Agreement

Contributors must sign Teragrep Contributor License Agreement before a pull request is accepted to organization’s repositories.

You need to submit the CLA only once. After submitting the CLA you can contribute to all Teragrep’s repositories.