
Data Skipping Index Part 4: BloomFilterSketch
Implement BloomFilterSketch.
Chungmin Lee committed Aug 2, 2021
1 parent 6ca6cc7 commit f8202d9
Showing 23 changed files with 1,321 additions and 1 deletion.
src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketch/BloomFilterSketch.scala
@@ -0,0 +1,79 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.dataskipping.sketch

import org.apache.spark.sql.catalyst.expressions.{Expression, ExprId}
import org.apache.spark.sql.types.DataType

import com.microsoft.hyperspace.index.dataskipping.util._
import com.microsoft.hyperspace.index.dataskipping.util.ArrayUtils.toArray

/**
* Sketch based on a bloom filter for a given expression.
*
* Being a probabilistic structure, it is more efficient in terms of the index
* data size than [[ValueListSketch]] if the number of distinct values for the
* expression is large, but can be less efficient in terms of query optimization
* than [[ValueListSketch]] due to false positives.
*
* Users can specify the target false positive rate and the expected number of
* distinct values per file. These variables determine the size of the bloom
* filters and thus the size of the index data.
*
* @param expr Expression this sketch is based on
* @param fpp Target false positive rate
* @param expectedDistinctCountPerFile Expected number of distinct values per file
*/
case class BloomFilterSketch(
    override val expr: String,
    fpp: Double,
    expectedDistinctCountPerFile: Long,
    override val dataType: Option[DataType] = None)
    extends SingleExprSketch[BloomFilterSketch](expr, dataType) {
  override def name: String = "BloomFilter"

  override def toString: String = s"$name($expr, $fpp, $expectedDistinctCountPerFile)"

  override def withNewExpression(newExpr: (String, Option[DataType])): BloomFilterSketch = {
    copy(expr = newExpr._1, dataType = newExpr._2)
  }

  override def aggregateFunctions: Seq[Expression] = {
    BloomFilterAgg(parsedExpr, expectedDistinctCountPerFile, fpp).toAggregateExpression() :: Nil
  }

  override def convertPredicate(
      predicate: Expression,
      sketchValues: Seq[Expression],
      nameMap: Map[ExprId, String],
      resolvedExprs: Seq[Expression]): Option[Expression] = {
    val bf = sketchValues(0)
    val resolvedExpr = resolvedExprs.head
    val dataType = resolvedExpr.dataType
    val exprMatcher = NormalizedExprMatcher(resolvedExpr, nameMap)
    val ExprEqualTo = EqualToExtractor(exprMatcher)
    val ExprIn = InExtractor(exprMatcher)
    val ExprInSet = InSetExtractor(exprMatcher)
    Option(predicate).collect {
      case ExprEqualTo(v) => BloomFilterMightContain(bf, v)
      case ExprIn(vs) =>
        BloomFilterMightContainAny(bf, toArray(vs.map(_.eval()), dataType), dataType)
      case ExprInSet(vs) =>
        BloomFilterMightContainAny(bf, toArray(vs.filter(_ != null).toSeq, dataType), dataType)
    }
  }
}
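As a usage sketch (not part of this commit): assuming the DataSkippingIndexConfig API introduced in the earlier parts of this series, a BloomFilterSketch would be attached to an index roughly as follows. The index name, path, column, and parameter values below are illustrative only.

import org.apache.spark.sql.SparkSession

import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index.dataskipping.DataSkippingIndexConfig
import com.microsoft.hyperspace.index.dataskipping.sketch.BloomFilterSketch

object BloomFilterSketchUsage extends App {
  val spark = SparkSession.builder.appName("demo").getOrCreate()
  val hs = new Hyperspace(spark)

  val df = spark.read.parquet("/data/events") // illustrative path

  // Target a 1% false positive rate, expecting ~10,000 distinct values per file.
  hs.createIndex(
    df,
    DataSkippingIndexConfig("eventIdIndex", BloomFilterSketch("eventId", 0.01, 10000)))

  // With Hyperspace enabled, equality and IN predicates on eventId can skip
  // files whose bloom filter reports no possible match.
  spark.read.parquet("/data/events").filter("eventId = 123").show()
}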
src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterAgg.scala
@@ -0,0 +1,83 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.dataskipping.util

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
import org.apache.spark.sql.types.DataType
import org.apache.spark.util.sketch.BloomFilter

/**
* Aggregation function that collects elements in a bloom filter.
*/
case class BloomFilterAgg(
    child: Expression,
    expectedNumItems: Long,
    fpp: Double,
    override val mutableAggBufferOffset: Int = 0,
    override val inputAggBufferOffset: Int = 0)
    extends TypedImperativeAggregate[BloomFilter] {

  private def bloomFilterEncoder = BloomFilterEncoderProvider.defaultEncoder

  override def prettyName: String = "bloom_filter"

  override def dataType: DataType = bloomFilterEncoder.dataType

  override def nullable: Boolean = false

  override def children: Seq[Expression] = Seq(child)

  override def createAggregationBuffer(): BloomFilter = {
    BloomFilter.create(expectedNumItems, fpp)
  }

  override def update(buffer: BloomFilter, input: InternalRow): BloomFilter = {
    val value = child.eval(input)
    if (value != null) {
      BloomFilterUtils.put(buffer, value, child.dataType)
    }
    buffer
  }

  override def merge(buffer: BloomFilter, input: BloomFilter): BloomFilter = {
    buffer.mergeInPlace(input)
    buffer
  }

  override def eval(buffer: BloomFilter): Any = bloomFilterEncoder.encode(buffer)

  override def serialize(buffer: BloomFilter): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    buffer.writeTo(out)
    out.toByteArray
  }

  override def deserialize(bytes: Array[Byte]): BloomFilter = {
    val in = new ByteArrayInputStream(bytes)
    BloomFilter.readFrom(in)
  }

  override def withNewMutableAggBufferOffset(newOffset: Int): BloomFilterAgg =
    copy(mutableAggBufferOffset = newOffset)

  override def withNewInputAggBufferOffset(newOffset: Int): BloomFilterAgg =
    copy(inputAggBufferOffset = newOffset)
}
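For reference, here is a minimal, self-contained sketch of the buffer lifecycle this aggregate implements (create, update, serialize/deserialize, merge), using Spark's org.apache.spark.util.sketch.BloomFilter directly; all values are illustrative.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import org.apache.spark.util.sketch.BloomFilter

object BloomFilterLifecycleDemo extends App {
  // createAggregationBuffer: one filter per partition.
  val partition1 = BloomFilter.create(1000, 0.01)
  val partition2 = BloomFilter.create(1000, 0.01)

  // update: add non-null values.
  partition1.putLong(1L)
  partition2.putLong(2L)

  // serialize/deserialize: how partial buffers travel between executors.
  val out = new ByteArrayOutputStream()
  partition1.writeTo(out)
  val restored = BloomFilter.readFrom(new ByteArrayInputStream(out.toByteArray))

  // merge: combine partial results; mergeInPlace mutates the receiver.
  restored.mergeInPlace(partition2)
  assert(restored.mightContain(1L) && restored.mightContain(2L))
}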
src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterEncoder.scala
@@ -0,0 +1,42 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.dataskipping.util

import org.apache.spark.sql.types.DataType
import org.apache.spark.util.sketch.BloomFilter

/**
* Defines how [[BloomFilter]] should be represented in the Spark DataFrame.
*/
trait BloomFilterEncoder {

  /**
   * Returns the data type of the value in the DataFrame representing [[BloomFilter]].
   */
  def dataType: DataType

  /**
   * Returns a value representing the given [[BloomFilter]]
   * that can be put in the [[InternalRow]].
   */
  def encode(bf: BloomFilter): Any

  /**
   * Returns a [[BloomFilter]] from the value in the DataFrame.
   */
  def decode(value: Any): BloomFilter
}
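The default implementation (FastBloomFilterEncoder) is not part of this excerpt. As an illustration of the contract, a minimal encoder could store the filter as its serialized bytes in a BinaryType column; SimpleBinaryBloomFilterEncoder is a made-up name for this sketch.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import org.apache.spark.sql.types.{BinaryType, DataType}
import org.apache.spark.util.sketch.BloomFilter

object SimpleBinaryBloomFilterEncoder extends BloomFilterEncoder {
  override def dataType: DataType = BinaryType

  // Store the filter as its serialized bytes (BinaryType maps to Array[Byte]).
  override def encode(bf: BloomFilter): Any = {
    val out = new ByteArrayOutputStream()
    bf.writeTo(out)
    out.toByteArray
  }

  override def decode(value: Any): BloomFilter =
    BloomFilter.readFrom(new ByteArrayInputStream(value.asInstanceOf[Array[Byte]]))
}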
src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterEncoderProvider.scala
@@ -0,0 +1,30 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.dataskipping.util

/**
* Provides the default implementation of [[BloomFilterEncoder]].
*/
object BloomFilterEncoderProvider {

  /**
   * Returns the default encoder.
   *
   * The returned encoder must be a singleton declared as an "object", so that
   * generated code can reference it statically by its class name.
   */
  def defaultEncoder: BloomFilterEncoder = FastBloomFilterEncoder
}
src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterMightContain.scala
@@ -0,0 +1,58 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.dataskipping.util

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression, Predicate}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, FalseLiteral}
import org.apache.spark.sql.catalyst.expressions.codegen.Block._

/**
* Returns true if the bloom filter (left) might contain the value (right).
*
* The bloom filter and the value must not be null.
*/
case class BloomFilterMightContain(left: Expression, right: Expression)
    extends BinaryExpression
    with Predicate {

  override def prettyName: String = "bloom_filter_might_contain"

  override def nullable: Boolean = false

  override def eval(input: InternalRow): Boolean = {
    val bfData = left.eval(input)
    val bf = BloomFilterEncoderProvider.defaultEncoder.decode(bfData)
    val value = right.eval(input)
    BloomFilterUtils.mightContain(bf, value, right.dataType)
  }

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    val leftGen = left.genCode(ctx)
    val rightGen = right.genCode(ctx)
    val bloomFilterEncoder =
      BloomFilterEncoderProvider.defaultEncoder.getClass.getCanonicalName.stripSuffix("$")
    val bf = s"$bloomFilterEncoder.decode(${leftGen.value})"
    val result = BloomFilterUtils.mightContainCodegen(bf, rightGen.value, right.dataType)
    ev.copy(
      code = code"""
        ${leftGen.code}
        ${rightGen.code}
        boolean ${ev.value} = $result;""",
      isNull = FalseLiteral)
  }
}
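A note on the getCanonicalName.stripSuffix("$") trick in doGenCode: for a top-level Scala object, the JVM class name ends in "$", while the compiler also emits static forwarders on the class named without the "$", so the generated Java can invoke decode as a plain static call. A small illustration with a made-up object name:

object DemoEncoder {
  def decode(value: Any): String = value.toString
}

object CanonicalNameDemo extends App {
  // For a top-level Scala object, the JVM class name ends in "$".
  val name = DemoEncoder.getClass.getCanonicalName // e.g. "com.example.DemoEncoder$"
  // Stripping the "$" yields the class carrying the static forwarders, so
  // generated Java can call DemoEncoder.decode(...) directly.
  println(name.stripSuffix("$"))
}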
src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterMightContainAny.scala
@@ -0,0 +1,97 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.dataskipping.util

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, Predicate, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode, FalseLiteral}
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.types.DataType
import org.apache.spark.util.sketch.BloomFilter

/**
* Returns true if the bloom filter (child) might contain one of the values.
*
* The bloom filter must not be null.
* The values must be an array without nulls.
* If the element type can be represented as a primitive type in Scala,
* then the array must be an array of the primitive type.
*/
case class BloomFilterMightContainAny(child: Expression, values: Any, elementType: DataType)
    extends UnaryExpression
    with Predicate {

  override def prettyName: String = "bloom_filter_might_contain_any"

  override def nullable: Boolean = false

  override def eval(input: InternalRow): Boolean = {
    val bfData = child.eval(input)
    val bf = BloomFilterEncoderProvider.defaultEncoder.decode(bfData)
    values
      .asInstanceOf[Array[_]]
      .exists(BloomFilterUtils.mightContain(bf, _, elementType))
  }

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    val childGen = child.genCode(ctx)
    val bloomFilterEncoder =
      BloomFilterEncoderProvider.defaultEncoder.getClass.getCanonicalName.stripSuffix("$")
    val bf = ctx.freshName("bf")
    val bfType = classOf[BloomFilter].getCanonicalName
    val javaType = CodeGenerator.javaType(elementType)
    val arrayType = if (values.isInstanceOf[Array[Any]]) "java.lang.Object[]" else s"$javaType[]"
    val valuesRef = ctx.addReferenceObj("values", values, arrayType)
    val valuesArray = ctx.freshName("values")
    val i = ctx.freshName("i")
    val mightContain =
      BloomFilterUtils.mightContainCodegen(bf, s"($javaType) $valuesArray[$i]", elementType)
    val resultCode =
      s"""
         |$bfType $bf = $bloomFilterEncoder.decode(${childGen.value});
         |$arrayType $valuesArray = $valuesRef;
         |for (int $i = 0; $i < $valuesArray.length; $i++) {
         |  if ($mightContain) {
         |    ${ev.value} = true;
         |    break;
         |  }
         |}
       """.stripMargin
    ev.copy(
      code = code"""
        ${childGen.code}
        boolean ${ev.value} = false;
        $resultCode""",
      isNull = FalseLiteral)
  }

  override def equals(that: Any): Boolean = {
    that match {
      case BloomFilterMightContainAny(thatChild, thatValues, thatElementType) =>
        child == thatChild &&
          values.asInstanceOf[Array[_]].sameElements(thatValues.asInstanceOf[Array[_]]) &&
          elementType == thatElementType
      case _ => false
    }
  }

  override def hashCode: Int = {
    (child, values.asInstanceOf[Array[_]].toSeq, elementType).hashCode
  }
}
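The equals and hashCode overrides exist because values is an Array, and JVM arrays compare by reference, so the case-class defaults would treat two expressions holding identical values as distinct. A quick demonstration of the distinction the overrides rely on:

object ArrayEqualityDemo extends App {
  val a = Array(1, 2, 3)
  val b = Array(1, 2, 3)

  assert(a != b)             // arrays compare by reference on the JVM
  assert(a.sameElements(b))  // element-wise comparison, as used in equals above
  assert(a.toSeq == b.toSeq) // Seq equality is structural, as used in hashCode above
}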