From f8202d93617050d33f235fb3353f9561752ec898 Mon Sep 17 00:00:00 2001 From: Chungmin Lee Date: Wed, 28 Jul 2021 04:22:10 +0900 Subject: [PATCH] Data Skipping Index Part 4: BloomFilterSketch Implement BloomFilterSketch. --- .../sketch/BloomFilterSketch.scala | 79 +++++++++ .../dataskipping/util/BloomFilterAgg.scala | 83 ++++++++++ .../util/BloomFilterEncoder.scala | 42 +++++ .../util/BloomFilterEncoderProvider.scala | 30 ++++ .../util/BloomFilterMightContain.scala | 58 +++++++ .../util/BloomFilterMightContainAny.scala | 97 +++++++++++ .../dataskipping/util/BloomFilterUtils.scala | 57 +++++++ .../util/FastBloomFilterEncoder.scala | 58 +++++++ .../dataskipping/util/ReflectionHelper.scala | 51 ++++++ .../util/StreamBloomFilterEncoder.scala | 40 +++++ .../hyperspace/util/SparkTestShims.scala | 6 + .../hyperspace/util/SparkTestShims.scala | 6 + .../dataskipping/BloomFilterTestUtils.scala | 40 +++++ .../DataSkippingIndexConfigTest.scala | 27 ++- .../DataSkippingIndexIntegrationTest.scala | 67 ++++++++ .../rule/ApplyDataSkippingIndexTest.scala | 26 +++ .../sketch/BloomFilterSketchTest.scala | 154 ++++++++++++++++++ .../util/BloomFilterAggTest.scala | 51 ++++++ .../util/BloomFilterMightContainAnyTest.scala | 65 ++++++++ .../util/BloomFilterMightContainTest.scala | 60 +++++++ .../util/BloomFilterUtilsTest.scala | 149 +++++++++++++++++ .../util/FastBloomFilterEncoderTest.scala | 38 +++++ .../util/StreamBloomFilterEncoderTest.scala | 38 +++++ 23 files changed, 1321 insertions(+), 1 deletion(-) create mode 100644 src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketch/BloomFilterSketch.scala create mode 100644 src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterAgg.scala create mode 100644 src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterEncoder.scala create mode 100644 src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterEncoderProvider.scala create mode 100644 
src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterMightContain.scala create mode 100644 src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterMightContainAny.scala create mode 100644 src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterUtils.scala create mode 100644 src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/FastBloomFilterEncoder.scala create mode 100644 src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/ReflectionHelper.scala create mode 100644 src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/StreamBloomFilterEncoder.scala create mode 100644 src/test/scala/com/microsoft/hyperspace/index/dataskipping/BloomFilterTestUtils.scala create mode 100644 src/test/scala/com/microsoft/hyperspace/index/dataskipping/sketch/BloomFilterSketchTest.scala create mode 100644 src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterAggTest.scala create mode 100644 src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterMightContainAnyTest.scala create mode 100644 src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterMightContainTest.scala create mode 100644 src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterUtilsTest.scala create mode 100644 src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/FastBloomFilterEncoderTest.scala create mode 100644 src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/StreamBloomFilterEncoderTest.scala diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketch/BloomFilterSketch.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketch/BloomFilterSketch.scala new file mode 100644 index 000000000..5da60e97c --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketch/BloomFilterSketch.scala @@ -0,0 +1,79 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.sketch + +import org.apache.spark.sql.catalyst.expressions.{Expression, ExprId} +import org.apache.spark.sql.types.DataType + +import com.microsoft.hyperspace.index.dataskipping.util._ +import com.microsoft.hyperspace.index.dataskipping.util.ArrayUtils.toArray + +/** + * Sketch based on a bloom filter for a given expression. + * + * Being a probabilistic structure, it is more efficient in terms of the index + * data size than [[ValueListSketch]] if the number of distinct values for the + * expression is large, but can be less efficient in terms of query optimization + * than [[ValueListSketch]] due to false positives. + * + * Users can specify the target false positive rate and the expected number of + * distinct values per file. These variables determine the size of the bloom + * filters and thus the size of the index data. 
+ * + * @param expr Expression this sketch is based on + * @param fpp Target false positive rate + * @param expectedDistinctCountPerFile Expected number of distinct values per file + */ +case class BloomFilterSketch( + override val expr: String, + fpp: Double, + expectedDistinctCountPerFile: Long, + override val dataType: Option[DataType] = None) + extends SingleExprSketch[BloomFilterSketch](expr, dataType) { + override def name: String = "BloomFilter" + /** Rendered as name(expr, fpp, expectedDistinctCountPerFile); dataType is not included. */ + override def toString: String = s"$name($expr, $fpp, $expectedDistinctCountPerFile)" + /** Copies the sketch with a new expression and data type; fpp and the count are kept. */ + override def withNewExpression(newExpr: (String, Option[DataType])): BloomFilterSketch = { + copy(expr = newExpr._1, dataType = newExpr._2) + } + /** A single [[BloomFilterAgg]] over the parsed expression, sized by the count and fpp. */ + override def aggregateFunctions: Seq[Expression] = { + BloomFilterAgg(parsedExpr, expectedDistinctCountPerFile, fpp).toAggregateExpression() :: Nil + } + /** Rewrites a matching predicate into a bloom filter probe; None when nothing matches. */ + override def convertPredicate( + predicate: Expression, + sketchValues: Seq[Expression], + nameMap: Map[ExprId, String], + resolvedExprs: Seq[Expression]): Option[Expression] = { + val bf = sketchValues(0) /* presumably the value of the aggregate above - TODO confirm */ + val resolvedExpr = resolvedExprs.head + val dataType = resolvedExpr.dataType + val exprMatcher = NormalizedExprMatcher(resolvedExpr, nameMap) + val ExprEqualTo = EqualToExtractor(exprMatcher) + val ExprIn = InExtractor(exprMatcher) + val ExprInSet = InSetExtractor(exprMatcher) + Option(predicate).collect { + case ExprEqualTo(v) => BloomFilterMightContain(bf, v) + case ExprIn(vs) => /* vs are expressions here, so they are evaluated to plain values */ + BloomFilterMightContainAny(bf, toArray(vs.map(_.eval()), dataType), dataType) + case ExprInSet(vs) => /* nulls are dropped before the values are turned into an array */ + BloomFilterMightContainAny(bf, toArray(vs.filter(_ != null).toSeq, dataType), dataType) + } + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterAgg.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterAgg.scala new file mode 100644 index 000000000..ddff9d0b0 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterAgg.scala @@ -0,0 
+1,83 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.util + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate +import org.apache.spark.sql.types.DataType +import org.apache.spark.util.sketch.BloomFilter + +/** + * Aggregation function that collects elements in a bloom filter. 
+ */ +case class BloomFilterAgg( + child: Expression, + expectedNumItems: Long, + fpp: Double, + override val mutableAggBufferOffset: Int = 0, + override val inputAggBufferOffset: Int = 0) + extends TypedImperativeAggregate[BloomFilter] { + /* The default encoder decides how the final filter is represented (see dataType and eval). */ + private def bloomFilterEncoder = BloomFilterEncoderProvider.defaultEncoder + + override def prettyName: String = "bloom_filter" + + override def dataType: DataType = bloomFilterEncoder.dataType + + override def nullable: Boolean = false + + override def children: Seq[Expression] = Seq(child) + /** Starts from a filter created with BloomFilter.create(expectedNumItems, fpp). */ + override def createAggregationBuffer(): BloomFilter = { + BloomFilter.create(expectedNumItems, fpp) + } + /** Adds the child's value for the row to the filter; rows where it is null are skipped. */ + override def update(buffer: BloomFilter, input: InternalRow): BloomFilter = { + val value = child.eval(input) + if (value != null) { + BloomFilterUtils.put(buffer, value, child.dataType) + } + buffer + } + /** Merges input into buffer in place and returns buffer. */ + override def merge(buffer: BloomFilter, input: BloomFilter): BloomFilter = { + buffer.mergeInPlace(input) + buffer + } + /** Final result: the filter converted by the default encoder. */ + override def eval(buffer: BloomFilter): Any = bloomFilterEncoder.encode(buffer) + /** Buffers are written with BloomFilter.writeTo; the encoder is not involved here. */ + override def serialize(buffer: BloomFilter): Array[Byte] = { + val out = new ByteArrayOutputStream() + buffer.writeTo(out) + out.toByteArray + } + /** Inverse of serialize, via BloomFilter.readFrom. */ + override def deserialize(bytes: Array[Byte]): BloomFilter = { + val in = new ByteArrayInputStream(bytes) + BloomFilter.readFrom(in) + } + + override def withNewMutableAggBufferOffset(newOffset: Int): BloomFilterAgg = + copy(mutableAggBufferOffset = newOffset) + + override def withNewInputAggBufferOffset(newOffset: Int): BloomFilterAgg = + copy(inputAggBufferOffset = newOffset) +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterEncoder.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterEncoder.scala new file mode 100644 index 000000000..a368319bb --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterEncoder.scala @@ -0,0 +1,42 @@ +/* + * Copyright (2021) The 
Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.util + +import org.apache.spark.sql.types.DataType +import org.apache.spark.util.sketch.BloomFilter + +/** + * Defines how [[BloomFilter]] should be represented in the Spark DataFrame. + */ +trait BloomFilterEncoder { + + /** + * Returns the data type of the value in the DataFrame representing [[BloomFilter]]. + */ + def dataType: DataType + + /** + * Returns a value of type [[dataType]] representing the given [[BloomFilter]] + * that can be put in the [[org.apache.spark.sql.catalyst.InternalRow]]. + */ + def encode(bf: BloomFilter): Any + + /** + * Returns a [[BloomFilter]] from a value in the DataFrame that was produced by [[encode]]. + */ + def decode(value: Any): BloomFilter +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterEncoderProvider.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterEncoderProvider.scala new file mode 100644 index 000000000..428d33d58 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterEncoderProvider.scala @@ -0,0 +1,30 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.util + +/** + * Provides the default implementation of [[BloomFilterEncoder]]. + */ +object BloomFilterEncoderProvider { + + /** + * Returns the default encoder used by [[BloomFilterAgg]] and the bloom filter predicates. + * + * It must be a singleton "object": generated code calls its decode via the class name. + */ + def defaultEncoder: BloomFilterEncoder = FastBloomFilterEncoder +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterMightContain.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterMightContain.scala new file mode 100644 index 000000000..16f1245cc --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterMightContain.scala @@ -0,0 +1,58 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.microsoft.hyperspace.index.dataskipping.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression, Predicate} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, FalseLiteral} +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ + +/** + * Returns true if the bloom filter (left) might contain the value (right). + * + * The bloom filter and the value must not be null. + */ +case class BloomFilterMightContain(left: Expression, right: Expression) + extends BinaryExpression + with Predicate { + + override def prettyName: String = "bloom_filter_might_contain" + + override def nullable: Boolean = false + /** Interpreted path: decodes the filter from the left value and probes it with the right one. */ + override def eval(input: InternalRow): Boolean = { + val bfData = left.eval(input) + val bf = BloomFilterEncoderProvider.defaultEncoder.decode(bfData) + val value = right.eval(input) + BloomFilterUtils.mightContain(bf, value, right.dataType) + } + /** Codegen path: emits the same decode-and-probe as Java code; isNull is always false. */ + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val leftGen = left.genCode(ctx) + val rightGen = right.genCode(ctx) + val bloomFilterEncoder = /* Java-visible name of the encoder object, without the "$" */ + BloomFilterEncoderProvider.defaultEncoder.getClass.getCanonicalName.stripSuffix("$") + val bf = s"$bloomFilterEncoder.decode(${leftGen.value})" + val result = s"${BloomFilterUtils.mightContainCodegen(bf, rightGen.value, right.dataType)}" + ev.copy( + code = code""" + ${leftGen.code} + ${rightGen.code} + boolean ${ev.value} = $result;""", + isNull = FalseLiteral) + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterMightContainAny.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterMightContainAny.scala new file mode 100644 index 000000000..1c7e3d079 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterMightContainAny.scala @@ -0,0 +1,97 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.util + +import scala.collection.immutable.TreeSet + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Expression, Predicate, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode, FalseLiteral} +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.types.DataType +import org.apache.spark.util.sketch.BloomFilter + +/** + * Returns true if the bloom filter (child) might contain one of the values. + * + * The bloom filter must not be null. + * The values must be an array without nulls. + * If the element type can be represented as a primitive type in Scala, + * then the array must be an array of the primitive type. 
+ */ +case class BloomFilterMightContainAny(child: Expression, values: Any, elementType: DataType) + extends UnaryExpression + with Predicate { + + override def prettyName: String = "bloom_filter_might_contain_any" + + override def nullable: Boolean = false + /** Interpreted path: true as soon as one of the values might be in the decoded filter. */ + override def eval(input: InternalRow): Boolean = { + val bfData = child.eval(input) + val bf = BloomFilterEncoderProvider.defaultEncoder.decode(bfData) + values + .asInstanceOf[Array[_]] + .exists(BloomFilterUtils.mightContain(bf, _, elementType)) + } + /** Codegen path: passes values as a reference object and loops over it until the first hit. */ + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val childGen = child.genCode(ctx); + val bloomFilterEncoder = /* Java-visible name of the encoder object, without the "$" */ + BloomFilterEncoderProvider.defaultEncoder.getClass.getCanonicalName.stripSuffix("$") + val bf = ctx.freshName("bf") + val bfType = classOf[BloomFilter].getCanonicalName + val javaType = CodeGenerator.javaType(elementType) + val arrayType = if (values.isInstanceOf[Array[Any]]) "java.lang.Object[]" else s"$javaType[]" + val valuesRef = ctx.addReferenceObj("values", values, arrayType) + val valuesArray = ctx.freshName("values") + val i = ctx.freshName("i") + val mightContain = /* each element is cast to the Java type of elementType before probing */ + BloomFilterUtils.mightContainCodegen(bf, s"($javaType) $valuesArray[$i]", elementType) + val resultCode = + s""" + |$bfType $bf = $bloomFilterEncoder.decode(${childGen.value}); + |$arrayType $valuesArray = $valuesRef; + |for (int $i = 0; $i < $valuesArray.length; $i++) { + | if ($mightContain) { + | ${ev.value} = true; + | break; + | } + |} + """.stripMargin + ev.copy( + code = code""" + ${childGen.code} + boolean ${ev.value} = false; + $resultCode""", + isNull = FalseLiteral) + } + /** Compares values element by element (sameElements) in addition to child and elementType. */ + override def equals(that: Any): Boolean = { + that match { + case BloomFilterMightContainAny(thatChild, thatValues, thatElementType) => + child == thatChild && + values.asInstanceOf[Array[_]].sameElements(thatValues.asInstanceOf[Array[_]]) && + elementType == thatElementType + case _ => false + } + } + /** Hashes values as a Seq so that the hash depends on the elements, matching equals. */ + override def hashCode: Int = { + (child, 
values.asInstanceOf[Array[_]].toSeq, elementType).hashCode + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterUtils.scala new file mode 100644 index 000000000..250b23008 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterUtils.scala @@ -0,0 +1,57 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.microsoft.hyperspace.index.dataskipping.util + +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.sketch.BloomFilter + +import com.microsoft.hyperspace.HyperspaceException +/** Helpers that insert values into and probe a [[BloomFilter]], dispatching on the data type. */ +object BloomFilterUtils { /* integral values go in as Long, strings as the UTF8String's bytes */ + def put(bf: BloomFilter, value: Any, dataType: DataType): Boolean = + dataType match { + case LongType => bf.putLong(value.asInstanceOf[Long]) + case IntegerType => bf.putLong(value.asInstanceOf[Int]) + case ByteType => bf.putLong(value.asInstanceOf[Byte]) + case ShortType => bf.putLong(value.asInstanceOf[Short]) + case StringType => bf.putBinary(value.asInstanceOf[UTF8String].getBytes) + case BinaryType => bf.putBinary(value.asInstanceOf[Array[Byte]]) + case _ => throw HyperspaceException(s"BloomFilter does not support ${dataType}") + } + /** Probes bf with the same per-type conversion as put; throws for any other data type. */ + def mightContain(bf: BloomFilter, value: Any, dataType: DataType): Boolean = { + dataType match { + case LongType => bf.mightContainLong(value.asInstanceOf[Long]) + case IntegerType => bf.mightContainLong(value.asInstanceOf[Int]) + case ByteType => bf.mightContainLong(value.asInstanceOf[Byte]) + case ShortType => bf.mightContainLong(value.asInstanceOf[Short]) + case StringType => bf.mightContainBinary(value.asInstanceOf[UTF8String].getBytes) + case BinaryType => bf.mightContainBinary(value.asInstanceOf[Array[Byte]]) + case _ => throw HyperspaceException(s"BloomFilter does not support ${dataType}") + } + } + /** Java source text for the probe in mightContain; bf and value are Java expressions. */ + def mightContainCodegen(bf: String, value: String, dataType: DataType): String = { + dataType match { + case LongType | IntegerType | ByteType | ShortType => s"$bf.mightContainLong($value)" + case StringType => s"$bf.mightContainBinary(($value).getBytes())" + case BinaryType => s"$bf.mightContainBinary($value)" + case _ => throw HyperspaceException(s"BloomFilter does not support ${dataType}") + } + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/FastBloomFilterEncoder.scala 
b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/FastBloomFilterEncoder.scala new file mode 100644 index 000000000..0d133b548 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/FastBloomFilterEncoder.scala @@ -0,0 +1,58 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.types.{ArrayType, IntegerType, LongType, StructField, StructType} +import org.apache.spark.util.sketch.BloomFilter + +/** + * A [[BloomFilterEncoder]] implementation that avoids copying arrays. 
+ */ +object FastBloomFilterEncoder extends BloomFilterEncoder with ReflectionHelper { + override val dataType: StructType = StructType( + StructField("numHashFunctions", IntegerType, nullable = false) :: + StructField("bitCount", LongType, nullable = false) :: + StructField("data", ArrayType(LongType, containsNull = false), nullable = false) :: Nil) + /** Reads the fields "numHashFunctions", "bits", "bitCount" and "data" reflectively into a row. */ + override def encode(bf: BloomFilter): InternalRow = { + val bloomFilterImplClass = bf.getClass + val bits = get(bloomFilterImplClass, "bits", bf) + val bitArrayClass = bits.getClass + InternalRow( + getInt(bloomFilterImplClass, "numHashFunctions", bf), + getLong(bitArrayClass, "bitCount", bits), + ArrayData.toArrayData(get(bitArrayClass, "data", bits).asInstanceOf[Array[Long]])) + } + /** Rebuilds a filter from a row laid out as in dataType, by overwriting those same fields. */ + override def decode(value: Any): BloomFilter = { + val struct = value.asInstanceOf[InternalRow] + val numHashFunctions = struct.getInt(0) + val bitCount = struct.getLong(1) + val data = struct.getArray(2).toLongArray() + /* Placeholder instance; the three fields read above are written into it reflectively. */ + val bf = BloomFilter.create(1) + val bloomFilterImplClass = bf.getClass + val bits = get(bloomFilterImplClass, "bits", bf) + val bitArrayClass = bits.getClass + setInt(bloomFilterImplClass, "numHashFunctions", bf, numHashFunctions) + setLong(bitArrayClass, "bitCount", bits, bitCount) + set(bitArrayClass, "data", bits, data) + bf + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/ReflectionHelper.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/ReflectionHelper.scala new file mode 100644 index 000000000..aecc5b9c4 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/ReflectionHelper.scala @@ -0,0 +1,51 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.util + +import java.lang.reflect.Field +/** Helpers for reading and writing fields declared on a class through Java reflection. */ +trait ReflectionHelper { /* NOTE(review): "Accesible" below is misspelt but is part of the API */ + def getAccesibleDeclaredField(clazz: Class[_], name: String): Field = { + val field = clazz.getDeclaredField(name) + field.setAccessible(true) + field + } + /** Reads the field fieldName declared on clazz from obj as an object. */ + def get(clazz: Class[_], fieldName: String, obj: Any): Any = { + getAccesibleDeclaredField(clazz, fieldName).get(obj) + } + /** Same as get, for a field read with Field.getInt. */ + def getInt(clazz: Class[_], fieldName: String, obj: Any): Int = { + getAccesibleDeclaredField(clazz, fieldName).getInt(obj) + } + /** Same as get, for a field read with Field.getLong. */ + def getLong(clazz: Class[_], fieldName: String, obj: Any): Long = { + getAccesibleDeclaredField(clazz, fieldName).getLong(obj) + } + /** Writes value into the field fieldName declared on clazz of obj. */ + def set(clazz: Class[_], fieldName: String, obj: Any, value: Any): Unit = { + getAccesibleDeclaredField(clazz, fieldName).set(obj, value) + } + /** Same as set, for a field written with Field.setInt. */ + def setInt(clazz: Class[_], fieldName: String, obj: Any, value: Int): Unit = { + getAccesibleDeclaredField(clazz, fieldName).setInt(obj, value) + } + /** Same as set, for a field written with Field.setLong. */ + def setLong(clazz: Class[_], fieldName: String, obj: Any, value: Long): Unit = { + getAccesibleDeclaredField(clazz, fieldName).setLong(obj, value) + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/StreamBloomFilterEncoder.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/StreamBloomFilterEncoder.scala new file mode 100644 index 000000000..39f6f79de --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/StreamBloomFilterEncoder.scala @@ -0,0 +1,40 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.util + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} + +import org.apache.spark.sql.types.BinaryType +import org.apache.spark.util.sketch.BloomFilter + +/** + * A [[BloomFilterEncoder]] implementation based on byte array streams. + */ +object StreamBloomFilterEncoder extends BloomFilterEncoder { + val dataType: BinaryType = BinaryType + /** Writes the filter with BloomFilter.writeTo and returns the resulting bytes. */ + def encode(bf: BloomFilter): Array[Byte] = { + val out = new ByteArrayOutputStream() + bf.writeTo(out) + out.toByteArray + } + /** Reads a filter back with BloomFilter.readFrom; value is cast to Array[Byte]. */ + def decode(value: Any): BloomFilter = { + val in = new ByteArrayInputStream(value.asInstanceOf[Array[Byte]]) + BloomFilter.readFrom(in) + } +} diff --git a/src/test/scala-spark2/com/microsoft/hyperspace/util/SparkTestShims.scala b/src/test/scala-spark2/com/microsoft/hyperspace/util/SparkTestShims.scala index 532634a3b..fe04d7f58 100644 --- a/src/test/scala-spark2/com/microsoft/hyperspace/util/SparkTestShims.scala +++ b/src/test/scala-spark2/com/microsoft/hyperspace/util/SparkTestShims.scala @@ -16,6 +16,8 @@ package com.microsoft.hyperspace.util +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.execution.command.ExplainCommand @@ -32,4 +34,8 @@ object SparkTestShims { 
ExplainCommand(logicalPlan, extended = false) } } + /** Converts row to T with the given encoder; this variant calls encoder.fromRow. */ + def fromRow[T](encoder: ExpressionEncoder[T], row: InternalRow): T = { + encoder.fromRow(row) + } } diff --git a/src/test/scala-spark3/com/microsoft/hyperspace/util/SparkTestShims.scala b/src/test/scala-spark3/com/microsoft/hyperspace/util/SparkTestShims.scala index 556b167a2..9e475f54e 100644 --- a/src/test/scala-spark3/com/microsoft/hyperspace/util/SparkTestShims.scala +++ b/src/test/scala-spark3/com/microsoft/hyperspace/util/SparkTestShims.scala @@ -16,6 +16,8 @@ package com.microsoft.hyperspace.util +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.execution.SimpleMode @@ -33,4 +35,8 @@ object SparkTestShims { ExplainCommand(logicalPlan, SimpleMode) } } + /** Converts row to T with the given encoder; this variant uses encoder.createDeserializer(). */ + def fromRow[T](encoder: ExpressionEncoder[T], row: InternalRow): T = { + encoder.createDeserializer().apply(row) + } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/BloomFilterTestUtils.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/BloomFilterTestUtils.scala new file mode 100644 index 000000000..3a1f606ab --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/BloomFilterTestUtils.scala @@ -0,0 +1,40 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.util.sketch.BloomFilter + +import com.microsoft.hyperspace.index.dataskipping.util.BloomFilterEncoderProvider +import com.microsoft.hyperspace.util.SparkTestShims +/** Test helper: encodes a filter with the default encoder and converts it through RowEncoder. */ +trait BloomFilterTestUtils { + def encodeExternal(bf: BloomFilter): Any = { + val bloomFilterEncoder = BloomFilterEncoderProvider.defaultEncoder + val data = bloomFilterEncoder.encode(bf) + val dataType = bloomFilterEncoder.dataType + dataType match { + case st: StructType => /* the encoded value is already a row of this struct type */ + SparkTestShims.fromRow(RowEncoder(st).resolveAndBind(), data.asInstanceOf[InternalRow]) + case _ => /* wrap the value in a one-field struct, convert it, then take field 0 */ + val encoder = RowEncoder(StructType(StructField("x", dataType) :: Nil)).resolveAndBind() + SparkTestShims.fromRow(encoder, InternalRow(data)).get(0) + } + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexConfigTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexConfigTest.scala index 944fb8eca..e418cfe10 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexConfigTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexConfigTest.scala @@ -19,12 +19,13 @@ package com.microsoft.hyperspace.index.dataskipping import org.apache.hadoop.fs.Path import org.apache.spark.sql.functions.{input_file_name, max, min} import org.apache.spark.sql.types.{LongType, StringType} +import org.apache.spark.util.sketch.BloomFilter import com.microsoft.hyperspace.HyperspaceException import com.microsoft.hyperspace.index.IndexConstants import com.microsoft.hyperspace.index.dataskipping.sketch._ -class DataSkippingIndexConfigTest extends 
DataSkippingSuite { +class DataSkippingIndexConfigTest extends DataSkippingSuite with BloomFilterTestUtils { test("indexName returns the index name.") { val indexConfig = DataSkippingIndexConfig("myIndex", MinMaxSketch("A")) assert(indexConfig.indexName === "myIndex") @@ -85,6 +86,30 @@ class DataSkippingIndexConfigTest extends DataSkippingSuite { checkAnswer(indexData, withFileId(expectedSketchValues)) } + test("createIndex works correctly with a BloomFilterSketch.") { + val sourceData = createSourceData(spark.range(100).toDF("A")) + val indexConfig = DataSkippingIndexConfig("MyIndex", BloomFilterSketch("A", 0.001, 20)) + val (index, indexData) = indexConfig.createIndex(ctx, sourceData, Map()) + assert(index.sketches === Seq(BloomFilterSketch("A", 0.001, 20, Some(LongType)))) + val valuesAndBloomFilters = indexData + .collect() + .map { row => + val fileId = row.getAs[Long](IndexConstants.DATA_FILE_NAME_ID) + val filePath = fileIdTracker.getIdToFileMapping().toMap.apply(fileId) + val values = spark.read.parquet(filePath).collect().toSeq.map(_.getLong(0)) + val bfData = row.getAs[Any]("BloomFilter_A__0.001__20__0") + (values, bfData) + } + valuesAndBloomFilters.foreach { + case (values, bfData) => + val bf = BloomFilter.create(20, 0.001) + values.foreach(bf.put) + assert(bfData === encodeExternal(bf)) + } + assert( + indexData.columns === Seq(IndexConstants.DATA_FILE_NAME_ID, "BloomFilter_A__0.001__20__0")) + } + test("createIndex resolves column names and data types.") { val sourceData = createSourceData(spark.range(10).toDF("Foo")) val indexConfig = DataSkippingIndexConfig("MyIndex", MinMaxSketch("foO")) diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexIntegrationTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexIntegrationTest.scala index 137f12315..1c8f323c1 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexIntegrationTest.scala +++ 
b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexIntegrationTest.scala @@ -175,6 +175,59 @@ class DataSkippingIndexIntegrationTest extends DataSkippingSuite { checkIndexApplied(query, numParallelism + 1) } + test("BloomFilter index is applied for a filter query (EqualTo).") { + withAndWithoutCodegen { + withIndex("myind") { + val df = createSourceData(spark.range(100).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", BloomFilterSketch("A", 0.01, 10))) + def query: DataFrame = df.filter("A = 1") + checkIndexApplied(query, 1) + } + } + } + + test( + "BloomFilter index is applied for a filter query (EqualTo) " + + "where some source data files has only null values.") { + withAndWithoutCodegen { + withIndex("myind") { + val df = createSourceData(Seq[Integer](1, 2, 3, null, 5, null, 7, 8, 9, null).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", BloomFilterSketch("A", 0.01, 10))) + def query: DataFrame = df.filter("A = 1") + checkIndexApplied(query, 1) + } + } + } + + test("BloomFilter index is applied for a filter query (In).") { + withAndWithoutCodegen { + withIndex("myind") { + val df = createSourceData(spark.range(100).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", BloomFilterSketch("A", 0.01, 10))) + def query: DataFrame = df.filter("A in (1, 11, 19)") + checkIndexApplied(query, 2) + } + } + } + + test("BloomFilter index support string type.") { + withAndWithoutCodegen { + withIndex("myind") { + val df = createSourceData(('a' to 'z').map(_.toString).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", BloomFilterSketch("A", 0.01, 10))) + def query: DataFrame = df.filter("A = 'a'") + checkIndexApplied(query, 1) + } + } + } + + test("BloomFilter index does not support double type.") { + val df = createSourceData((0 until 10).map(_.toDouble).toDF("A")) + val ex = intercept[SparkException]( + hs.createIndex(df, DataSkippingIndexConfig("myind", BloomFilterSketch("A", 0.01, 10)))) + 
assert(ex.getCause().getMessage().contains("BloomFilter does not support DoubleType")) + } + test( "DataSkippingIndex works correctly for CSV where the same source data files can be " + "interpreted differently.") { @@ -253,6 +306,20 @@ class DataSkippingIndexIntegrationTest extends DataSkippingSuite { } } + test( + "BloomFilter index can be applied without refresh when source files are deleted " + + "if hybrid scan is enabled.") { + withSQLConf( + IndexConstants.INDEX_HYBRID_SCAN_ENABLED -> "true", + IndexConstants.INDEX_HYBRID_SCAN_DELETED_RATIO_THRESHOLD -> "1") { + val df = createSourceData(spark.range(100).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", BloomFilterSketch("A", 0.001, 10))) + deleteFile(listFiles(dataPath()).filter(isParquet).head.getPath) + def query: DataFrame = spark.read.parquet(dataPath().toString).filter("A in (25, 50, 75)") + checkIndexApplied(query, 3) + } + } + test("Empty source data does not cause an error.") { val df = createSourceData(spark.range(0).toDF("A")) hs.createIndex(df, DataSkippingIndexConfig("myind", MinMaxSketch("A"))) diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/rule/ApplyDataSkippingIndexTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/rule/ApplyDataSkippingIndexTest.scala index b51381e15..2e64c57ed 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/rule/ApplyDataSkippingIndexTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/rule/ApplyDataSkippingIndexTest.scala @@ -235,6 +235,27 @@ class ApplyDataSkippingIndexTest extends DataSkippingSuite { Param(dataI, "!(A < 20)", MinMaxSketch("A"), 8), Param(dataI, "not (A not in (1, 2, 3))", MinMaxSketch("A"), 1), Param(dataS, "A < 'foo'", MinMaxSketch("A"), 1), + Param(dataS, "A in ('foo1', 'foo5', 'foo9')", BloomFilterSketch("A", 0.01, 10), 3), + Param( + dataS, + "A in ('foo1','goo1','hoo1','i1','j','k','l','m','n','o','p')", + BloomFilterSketch("A", 0.01, 10), + 
1), + Param(dataI, "A = 10", BloomFilterSketch("A", 0.01, 10), 1), + Param(dataI, "A <=> 20", BloomFilterSketch("A", 0.01, 10), 1), + Param(dataI, "A <=> null", BloomFilterSketch("A", 0.01, 10), 10), + Param(dataI, "A in (2, 3, 5, 7, 11, 13, 17, 19)", BloomFilterSketch("A", 0.001, 10), 2), + Param( + dataI, + "A in (0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20)", + BloomFilterSketch("A", 0.001, 10), + 3), + Param( + dataIN, + "A in (0,1,10,100,1000,10000,100000,1000000,-1,-2,-3,-4,-5,-6,-7,-8,null)", + BloomFilterSketch("A", 0.001, 10), + 1), + Param(dataI, "A != 10", BloomFilterSketch("A", 0.001, 10), 10), Param(dataI, "a = 10", MinMaxSketch("A"), 1), Param(dataI, "A = 10", MinMaxSketch("a"), 1), Param(dataI, "A in (1, 2, 3, 10)", MinMaxSketch("A"), 2), @@ -262,6 +283,11 @@ class ApplyDataSkippingIndexTest extends DataSkippingSuite { Param(dataII, "A < 30 and B > 20", MinMaxSketch("A"), 3), Param(dataII, "A < 30 and b > 40", Seq(MinMaxSketch("a"), MinMaxSketch("B")), 1), Param(dataII, "A = 10 and B = 90", Seq(MinMaxSketch("A"), MinMaxSketch("B")), 0), + Param( + dataII, + "A < 31 and B in (1, 2, 11, 12, 21, 22)", + Seq(MinMaxSketch("A"), BloomFilterSketch("B", 0.001, 10)), + 2), Param(dataIN, "A is not null", MinMaxSketch("A"), 7), Param(dataIN, "!(A <=> null)", MinMaxSketch("A"), 7), Param(dataIN, "A = 2", MinMaxSketch("A"), 1), diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/sketch/BloomFilterSketchTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/sketch/BloomFilterSketchTest.scala new file mode 100644 index 000000000..9e469e2ca --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/sketch/BloomFilterSketchTest.scala @@ -0,0 +1,154 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.sketch + +import org.apache.spark.sql.{Column, QueryTest} +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.sketch.BloomFilter + +import com.microsoft.hyperspace.index.HyperspaceSuite +import com.microsoft.hyperspace.index.dataskipping.BloomFilterTestUtils +import com.microsoft.hyperspace.index.dataskipping.util._ + +class BloomFilterSketchTest extends QueryTest with HyperspaceSuite with BloomFilterTestUtils { + import spark.implicits._ + + test("indexedColumns returns the indexed column.") { + val sketch = BloomFilterSketch("A", 0.01, 100) + assert(sketch.indexedColumns === Seq("A")) + } + + test("referencedColumns returns the indexed column.") { + val sketch = BloomFilterSketch("A", 0.01, 100) + assert(sketch.referencedColumns === Seq("A")) + } + + test( + "aggregateFunctions returns an aggregation function that collects values in a bloom filter.") { + val sketch = BloomFilterSketch("A", 0.01, 100) + val aggrs = sketch.aggregateFunctions.map(new Column(_)) + assert(aggrs.length === 1) + val data = Seq(1, -1, 10, 2, 4, 2, 0, 10) + val bf = BloomFilter.create(100, 0.01) + data.foreach(bf.put) + val bfData = data.toDF("A").select(aggrs.head).collect()(0).getAs[Any](0) + assert(bfData === encodeExternal(bf)) + } + + test("toString returns a reasonable string.") { + val sketch = BloomFilterSketch("A", 0.01, 
100) + assert(sketch.toString === "BloomFilter(A, 0.01, 100)") + } + + test("Two sketches are equal if their columns are equal.") { + assert(BloomFilterSketch("A", 0.01, 100) === BloomFilterSketch("A", 0.001, 1000)) + assert(BloomFilterSketch("A", 0.01, 100) !== BloomFilterSketch("a", 0.01, 100)) + assert(BloomFilterSketch("b", 0.01, 100) !== BloomFilterSketch("B", 0.01, 100)) + assert(BloomFilterSketch("B", 0.01, 100) === BloomFilterSketch("B", 0.001, 1000)) + } + + test("hashCode is reasonably implemented.") { + assert( + BloomFilterSketch("A", 0.01, 100).hashCode === BloomFilterSketch("A", 0.001, 1000).hashCode) + assert( + BloomFilterSketch("A", 0.01, 100).hashCode !== BloomFilterSketch("a", 0.001, 1000).hashCode) + } + + test("convertPredicate converts EqualTo.") { + val sketch = BloomFilterSketch("A", 0.01, 100) + val predicate = EqualTo(AttributeReference("A", IntegerType)(ExprId(0)), Literal(42)) + val sketchValues = Seq(UnresolvedAttribute("bf")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", IntegerType)(ExpressionUtils.nullExprId))) + val expected = Some(BloomFilterMightContain(sketchValues(0), Literal(42))) + assert(result === expected) + } + + test("convertPredicate converts EqualTo - string type.") { + val sketch = BloomFilterSketch("A", 0.01, 100) + val predicate = + EqualTo(AttributeReference("A", StringType)(ExprId(0)), Literal.create("hello", StringType)) + val sketchValues = Seq(UnresolvedAttribute("bf")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", StringType)(ExpressionUtils.nullExprId))) + val expected = + Some(BloomFilterMightContain(sketchValues(0), Literal.create("hello", StringType))) + assert(result === expected) + } + + test("convertPredicate converts In.") { + val sketch = BloomFilterSketch("A", 0.01, 100) + val 
predicate = + In(AttributeReference("A", IntegerType)(ExprId(0)), Seq(Literal(42), Literal(23))) + val sketchValues = Seq(UnresolvedAttribute("bf")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", IntegerType)(ExpressionUtils.nullExprId))) + val expected = + Some(BloomFilterMightContainAny(sketchValues(0), Array(42, 23), IntegerType)) + assert(result === expected) + } + + test("convertPredicate converts In - string type.") { + val sketch = BloomFilterSketch("A", 0.01, 100) + val predicate = + In( + AttributeReference("A", StringType)(ExprId(0)), + Seq(Literal.create("hello", StringType), Literal.create("world", StringType))) + val sketchValues = Seq(UnresolvedAttribute("bf")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", StringType)(ExpressionUtils.nullExprId))) + val expected = Some( + BloomFilterMightContainAny( + sketchValues(0), + Array(UTF8String.fromString("hello"), UTF8String.fromString("world")), + StringType)) + assert(result === expected) + } + + test("convertPredicate does not convert Not(EqualTo(, )).") { + val sketch = BloomFilterSketch("A", 0.01, 100) + val predicate = Not(EqualTo(AttributeReference("A", IntegerType)(ExprId(0)), Literal(42))) + val sketchValues = Seq(UnresolvedAttribute("bf")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", IntegerType)(ExpressionUtils.nullExprId))) + val expected = None + assert(result === expected) + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterAggTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterAggTest.scala new file mode 100644 index 000000000..54e842e69 --- /dev/null +++ 
b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterAggTest.scala @@ -0,0 +1,51 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.util + +import org.apache.spark.sql.Column +import org.apache.spark.sql.functions.col +import org.apache.spark.util.sketch.BloomFilter + +import com.microsoft.hyperspace.index.HyperspaceSuite +import com.microsoft.hyperspace.index.dataskipping.BloomFilterTestUtils + +class BloomFilterAggTest extends HyperspaceSuite with BloomFilterTestUtils { + import spark.implicits._ + + test("BloomFilterAgg computes BloomFilter correctly.") { + val n = 10000 + val m = 3000 + val fpp = 0.01 + + val agg = new Column(BloomFilterAgg(col("a").expr, m, fpp).toAggregateExpression()) + val df = spark + .range(n) + .toDF("a") + .filter(col("a") % 3 === 0) + .union(Seq[Integer](null).toDF("a")) + .agg(agg) + val bfData = df.collect()(0).getAs[Any](0) + + val expectedBf = BloomFilter.create(m, fpp) + for (i <- 0 until n) { + if (i % 3 == 0) { + expectedBf.put(i.toLong) + } + } + assert(bfData === encodeExternal(expectedBf)) + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterMightContainAnyTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterMightContainAnyTest.scala new file mode 100644 index 000000000..14bc84857 --- /dev/null +++ 
b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterMightContainAnyTest.scala @@ -0,0 +1,71 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.util + +import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.types._ +import org.apache.spark.util.sketch.BloomFilter + +import com.microsoft.hyperspace.index.HyperspaceSuite +import com.microsoft.hyperspace.index.dataskipping.util.ArrayUtils.toArray + +class BloomFilterMightContainAnyTest extends HyperspaceSuite { + /** + * Asserts that BloomFilterMightContainAny agrees with BloomFilter.mightContain for + * every chunk produced by values.grouped(k), k = 1 to 3. + * NOTE(review): bf is never populated here, so both sides are presumably always false; + * consider putting some of the values into bf before it is encoded. + */ + def test(values: Seq[Any], dataType: DataType): Unit = { + val bf = BloomFilter.create(values.length, 0.01) + val bfData = Literal( + BloomFilterEncoderProvider.defaultEncoder.encode(bf), + BloomFilterEncoderProvider.defaultEncoder.dataType) + for (k <- 1 to 3) { + values.grouped(k).foreach { vs => + val valuesArray = toArray(vs.map(Literal.create(_, dataType).eval()), dataType) + assert( + BloomFilterMightContainAny(bfData, valuesArray, dataType).eval() === vs.exists( + bf.mightContain(_))) + } + } + } + + test("BloomFilterMightContainAny works correctly for an int array.") { + test((0 until 1000).map(_ * 2), IntegerType) + } + + test("BloomFilterMightContainAny works correctly for a long array.") { + test((0L until 1000L).map(_ * 2), LongType) + } + + test("BloomFilterMightContainAny works correctly for a byte array.") { + 
test(Seq(0, 1, 3, 7, 15, 31, 63, 127).map(_.toByte), ByteType) + } + + test("BloomFilterMightContainAny works correctly for a short array.") { + test(Seq(1, 3, 5, 7, 9).map(_.toShort), ShortType) + } + + test("BloomFilterMightContainAny works correctly for a string array.") { + test(Seq("hello", "world", "foo", "bar"), StringType) + } + + test("BloomFilterMightContainAny works correctly for a binary array.") { + test(Seq(Array[Byte](1, 2), Array[Byte](3, 4)), BinaryType) + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterMightContainTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterMightContainTest.scala new file mode 100644 index 000000000..e133d5f16 --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterMightContainTest.scala @@ -0,0 +1,60 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.microsoft.hyperspace.index.dataskipping.util + +import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.types._ +import org.apache.spark.util.sketch.BloomFilter + +import com.microsoft.hyperspace.index.HyperspaceSuite + +class BloomFilterMightContainTest extends HyperspaceSuite { + def test(values: Seq[Any], dataType: DataType): Unit = { + val bf = BloomFilter.create(values.length, 0.01) + val bfData = Literal( + BloomFilterEncoderProvider.defaultEncoder.encode(bf), + BloomFilterEncoderProvider.defaultEncoder.dataType) + values.foreach { v => + val lit = Literal.create(v, dataType) + assert(BloomFilterMightContain(bfData, lit).eval() === bf.mightContain(v)) + } + } + + test("BloomFilterMightContain works correctly for an int array.") { + test((0 until 100000).map(_ * 2), IntegerType) + } + + test("BloomFilterMightContain works correctly for a long array.") { + test((0L until 100000L).map(_ * 2), LongType) + } + + test("BloomFilterMightContain works correctly for a byte array.") { + test(Seq(0, 1, 3, 7, 15, 31, 63, 127).map(_.toByte), ByteType) + } + + test("BloomFilterMightContain works correctly for a short array.") { + test(Seq(1, 3, 5, 7, 9).map(_.toShort), ShortType) + } + + test("BloomFilterMightContain works correctly for a string array.") { + test(Seq("hello", "world", "foo", "bar"), StringType) + } + + test("BloomFilterMightContain works correctly for a binary array.") { + test(Seq(Array[Byte](1, 2), Array[Byte](3, 4)), BinaryType) + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterUtilsTest.scala new file mode 100644 index 000000000..c9a369543 --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/BloomFilterUtilsTest.scala @@ -0,0 +1,149 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.util + +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.sketch.BloomFilter + +import com.microsoft.hyperspace.HyperspaceException +import com.microsoft.hyperspace.index.HyperspaceSuite + +class BloomFilterUtilsTest extends HyperspaceSuite { + + def testPut(value: Any, dataType: DataType): Unit = { + val bf = BloomFilter.create(100, 0.01) + BloomFilterUtils.put(bf, value, dataType) + val expected = BloomFilter.create(100, 0.01) + expected.put(value) + assert(bf === expected) + } + + test("put: long") { + testPut(10L, LongType) + } + + test("put: int") { + testPut(10, IntegerType) + } + + test("put: byte") { + testPut(10.toByte, ByteType) + } + + test("put: short") { + testPut(10.toShort, ShortType) + } + + test("put: string") { + val value = UTF8String.fromString("hello") + val bf = BloomFilter.create(100, 0.01) + BloomFilterUtils.put(bf, value, StringType) + val expected = BloomFilter.create(100, 0.01) + expected.put(value.getBytes) + assert(bf === expected) + } + + test("put: binary") { + testPut(Array[Byte](1, 2, 3, 4), BinaryType) + } + + test("put throws an exception for unsupported types.") { + val ex = intercept[HyperspaceException](testPut(3.14, DoubleType)) + assert(ex.msg.contains("BloomFilter does not support DoubleType")) + } + + def testMightContain(value: Any, value2: Any, 
dataType: DataType): Unit = { + val bf = BloomFilter.create(100, 0.01) + BloomFilterUtils.put(bf, value, dataType) + assert(BloomFilterUtils.mightContain(bf, value, dataType) === bf.mightContain(value)) + assert(BloomFilterUtils.mightContain(bf, value2, dataType) === bf.mightContain(value2)) + } + + test("mightContain: int") { + testMightContain(1, 0, IntegerType) + } + + test("mightContain: long") { + testMightContain(1L, 0L, LongType) + } + + test("mightContain: byte") { + testMightContain(1.toByte, 0.toByte, ByteType) + } + + test("mightContain: short") { + testMightContain(1.toShort, 0.toShort, ShortType) + } + + test("mightContain: string") { + val value = UTF8String.fromString("hello") + val value2 = UTF8String.fromString("world") + val bf = BloomFilter.create(100, 0.01) + BloomFilterUtils.put(bf, value, StringType) + assert( + BloomFilterUtils.mightContain(bf, value, StringType) === bf.mightContain(value.getBytes)) + assert( + BloomFilterUtils.mightContain(bf, value2, StringType) === bf.mightContain(value2.getBytes)) + } + + test("mightContain: binary") { + testMightContain(Array[Byte](1, 2), Array[Byte](3, 4), BinaryType) + } + + test("mightContain throws an exception for unsupported types.") { + val bf = BloomFilter.create(100, 0.01) + val ex = intercept[HyperspaceException](BloomFilterUtils.mightContain(bf, 3.14, DoubleType)) + assert(ex.msg.contains("BloomFilter does not support DoubleType")) + } + + test("mightContainCodegen: int") { + val code = BloomFilterUtils.mightContainCodegen("fb", "vl", IntegerType) + assert(code === "fb.mightContainLong(vl)") + } + + test("mightContainCodegen: long") { + val code = BloomFilterUtils.mightContainCodegen("fb", "vl", LongType) + assert(code === "fb.mightContainLong(vl)") + } + + test("mightContainCodegen: byte") { + val code = BloomFilterUtils.mightContainCodegen("fb", "vl", ByteType) + assert(code === "fb.mightContainLong(vl)") + } + + test("mightContainCodegen: short") { + val code = 
BloomFilterUtils.mightContainCodegen("fb", "vl", ShortType) + assert(code === "fb.mightContainLong(vl)") + } + + test("mightContainCodegen: string") { + val code = BloomFilterUtils.mightContainCodegen("fb", "vl", StringType) + assert(code === "fb.mightContainBinary((vl).getBytes())") + } + + test("mightContainCodegen: binary") { + val code = BloomFilterUtils.mightContainCodegen("fb", "vl", BinaryType) + assert(code === "fb.mightContainBinary(vl)") + } + + test("mightContainCodegen throws an exception for unsupported types.") { + val ex = + intercept[HyperspaceException](BloomFilterUtils.mightContainCodegen("fb", "vl", DoubleType)) + assert(ex.msg.contains("BloomFilter does not support DoubleType")) + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/FastBloomFilterEncoderTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/FastBloomFilterEncoderTest.scala new file mode 100644 index 000000000..d49bca3d9 --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/FastBloomFilterEncoderTest.scala @@ -0,0 +1,38 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.microsoft.hyperspace.index.dataskipping.util + +import org.apache.spark.util.sketch.BloomFilter + +import com.microsoft.hyperspace.index.HyperspaceSuite + +class FastBloomFilterEncoderTest extends HyperspaceSuite { + test("encode and decode restores empty bloom filter.") { + val bf = BloomFilter.create(100, 0.01) + val data = FastBloomFilterEncoder.encode(bf) + val bf2 = FastBloomFilterEncoder.decode(data) + assert(bf2 === bf) + } + + test("encode and decode restores the original bloom filter.") { + val bf = BloomFilter.create(100, 0.01) + bf.put(42) + val data = FastBloomFilterEncoder.encode(bf) + val bf2 = FastBloomFilterEncoder.decode(data) + assert(bf2 === bf) + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/StreamBloomFilterEncoderTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/StreamBloomFilterEncoderTest.scala new file mode 100644 index 000000000..8b459fc0a --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/StreamBloomFilterEncoderTest.scala @@ -0,0 +1,38 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.microsoft.hyperspace.index.dataskipping.util + +import org.apache.spark.util.sketch.BloomFilter + +import com.microsoft.hyperspace.index.HyperspaceSuite + +class StreamBloomFilterEncoderTest extends HyperspaceSuite { + test("encode and decode restores empty bloom filter.") { + val bf = BloomFilter.create(100, 0.01) + val data = StreamBloomFilterEncoder.encode(bf) + val bf2 = StreamBloomFilterEncoder.decode(data) + assert(bf2 === bf) + } + + test("encode and decode restores the original bloom filter.") { + val bf = BloomFilter.create(100, 0.01) + bf.put(42) + val data = StreamBloomFilterEncoder.encode(bf) + val bf2 = StreamBloomFilterEncoder.decode(data) + assert(bf2 === bf) + } +}