diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketch/ValueListSketch.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketch/ValueListSketch.scala new file mode 100644 index 000000000..18701b633 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketch/ValueListSketch.scala @@ -0,0 +1,96 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.sketch + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.CollectSet +import org.apache.spark.sql.catalyst.util.TypeUtils +import org.apache.spark.sql.types.{ArrayType, DataType} + +import com.microsoft.hyperspace.index.dataskipping.util._ +import com.microsoft.hyperspace.index.dataskipping.util.ArrayUtils.toArray + +/** + * Sketch based on distinct values for a given expression. + * + * This is not really a sketch, as it stores all distinct values for a given + * expression. It can be useful when the number of distinct values is expected to + * be small and each file tends to store only a subset of the values. + */ +case class ValueListSketch( + override val expr: String, + override val dataType: Option[DataType] = None) + extends SingleExprSketch[ValueListSketch](expr, dataType) { + override def name: String = "ValueList" + + override def withNewExpression(newExpr: (String, Option[DataType])): ValueListSketch = { + copy(expr = newExpr._1, dataType = newExpr._2) + } + + override def aggregateFunctions: Seq[Expression] = + new ArraySort(CollectSet(parsedExpr).toAggregateExpression()) :: Nil + + override def convertPredicate( + predicate: Expression, + sketchValues: Seq[Expression], + nameMap: Map[ExprId, String], + resolvedExprs: Seq[Expression]): Option[Expression] = { + val valueList = sketchValues(0) + val min = ElementAt(valueList, Literal(1)) + val max = ElementAt(valueList, Literal(-1)) + // TODO: Consider shared sketches + // HasNullSketch as described in MinMaxSketch.convertPredicate + // can be useful for ValueListSketch too, as it can be used to + // to optimize Not(EqualTo) as well as IsNull. + val resolvedExpr = resolvedExprs.head + val dataType = resolvedExpr.dataType + val ordering = TypeUtils.getInterpretedOrdering(dataType) + val exprMatcher = NormalizedExprMatcher(resolvedExpr, nameMap) + val ExprIsTrue = IsTrueExtractor(exprMatcher) + val ExprIsFalse = IsFalseExtractor(exprMatcher) + val ExprIsNotNull = IsNotNullExtractor(exprMatcher) + val ExprEqualTo = EqualToExtractor(exprMatcher) + val ExprLessThan = LessThanExtractor(exprMatcher) + val ExprLessThanOrEqual = LessThanOrEqualToExtractor(exprMatcher) + val ExprGreaterThan = GreaterThanExtractor(exprMatcher) + val ExprGreaterThanOrEqual = GreaterThanOrEqualToExtractor(exprMatcher) + val ExprIn = InExtractor(exprMatcher) + val ExprInSet = InSetExtractor(exprMatcher) + def Empty(arr: Expression) = EqualTo(Size(arr), Literal(0)) + Option(predicate).collect { + case ExprIsTrue() => ArrayContains(valueList, Literal(true)) + case ExprIsFalse() => ArrayContains(valueList, Literal(false)) + case ExprIsNotNull() => Not(Empty(valueList)) + case ExprEqualTo(v) => SortedArrayContains(valueList, v) + case Not(ExprEqualTo(v)) => + Or( + GreaterThan(Size(valueList), Literal(1)), + Not(EqualTo(ElementAt(valueList, Literal(1)), v))) + case ExprLessThan(v) => LessThan(min, v) + case ExprLessThanOrEqual(v) => LessThanOrEqual(min, v) + case ExprGreaterThan(v) => GreaterThan(max, v) + case ExprGreaterThanOrEqual(v) => GreaterThanOrEqual(max, v) + case ExprIn(vs) => + SortedArrayContainsAny(valueList, toArray(vs.map(_.eval()).sorted(ordering), dataType)) + case ExprInSet(vs) => + SortedArrayContainsAny( + valueList, + toArray(vs.filter(_ != null).toArray.sorted(ordering), dataType)) + // TODO: StartsWith, Like with constant prefix + } + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/SortedArrayContains.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/SortedArrayContains.scala new file mode 100644 index 000000000..871c3de19 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/SortedArrayContains.scala @@ -0,0 +1,85 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression, Predicate} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode, FalseLiteral} +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.util.{ArrayData, TypeUtils} +import org.apache.spark.sql.types.BooleanType + +/** + * Returns true if the sorted array (left) might contain the value (right). + * + * The array must not be null. + * Elements in the array must be in ascending order. + * The array must not contain null elements. + * The array must not contain duplicate elements. + * The value must not be null. + */ +case class SortedArrayContains(left: Expression, right: Expression) + extends BinaryExpression + with Predicate { + + override def prettyName: String = "sorted_array_contains" + + @transient private lazy val ordering: Ordering[Any] = + TypeUtils.getInterpretedOrdering(right.dataType) + + override def nullable: Boolean = false + + override def eval(input: InternalRow): Boolean = { + val arr = left.eval(input).asInstanceOf[ArrayData] + val value = right.eval(input) + val dt = right.dataType + val n = arr.numElements() + if (n > 0 && + ordering.lteq(arr.get(0, dt), value) && + ordering.lteq(value, arr.get(n - 1, dt))) { + val (found, _) = SortedArrayUtils.binarySearch(arr, dt, ordering, 0, n, value) + if (found) return true + } + false + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val leftGen = left.genCode(ctx) + val arr = leftGen.value + val rightGen = right.genCode(ctx) + val value = rightGen.value + val dt = right.dataType + val n = ctx.freshName("n") + val binarySearch = SortedArrayUtils.binarySearchCodeGen(ctx, dt) + val resultCode = + s""" + |int $n = $arr.numElements(); + |if ($n > 0 && + | !(${ctx.genGreater(dt, CodeGenerator.getValue(arr, dt, "0"), value)}) && + | !(${ctx.genGreater(dt, value, CodeGenerator.getValue(arr, dt, s"$n - 1"))})) { + | ${ev.value} = $binarySearch($arr, 0, $n, $value).found(); + |} + """.stripMargin + ev.copy( + code = code""" + ${leftGen.code} + ${rightGen.code} + boolean ${ev.value} = false; + $resultCode""", + isNull = FalseLiteral) + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/SortedArrayContainsAny.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/SortedArrayContainsAny.scala new file mode 100644 index 000000000..199397a95 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/SortedArrayContainsAny.scala @@ -0,0 +1,141 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Expression, Predicate, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode, FalseLiteral} +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.util.{ArrayData, TypeUtils} +import org.apache.spark.sql.types.{ArrayType, BooleanType} + +/** + * Returns true if the sorted array (child) contains any of the values. + * + * If either array is empty, false is returned. + * + * Both arrays must not be null. + * Elements in the arrays must be in ascending order. + * The left array should not contain duplicate elements. + * The arrays must not contain null elements. + * + * If the element type can be represented as a primitive type in Scala, + * then the right array must be an array of the primitive type. + */ +case class SortedArrayContainsAny(child: Expression, values: Any) + extends UnaryExpression + with Predicate { + + override def prettyName: String = "sorted_array_contains_any" + + @transient private lazy val ordering: Ordering[Any] = + TypeUtils.getInterpretedOrdering(child.dataType.asInstanceOf[ArrayType].elementType) + + override def nullable: Boolean = false + + override def eval(input: InternalRow): Boolean = { + val arr1 = child.eval(input).asInstanceOf[ArrayData] + val arr2 = values.asInstanceOf[Array[_]] + val dt = child.dataType.asInstanceOf[ArrayType].elementType + val n = arr1.numElements() + val m = arr2.length + if (n > 0 && m > 0 && + ordering.lteq(arr1.get(0, dt), arr2(m - 1)) && + ordering.lteq(arr2(0), arr1.get(n - 1, dt))) { + var i = 0 + var j = 0 + do { + val v = arr1.get(i, dt) + while (j < m && ordering.lt(arr2(j), v)) j += 1 + if (j == m) return false + val u = arr2(j) + j += 1 + val (found, k) = SortedArrayUtils.binarySearch(arr1, dt, ordering, i, n, u) + if (found) return true + if (k == n) return false + i = k + } while (j < m) + } + false + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val childGen = child.genCode(ctx) + val arr1 = childGen.value + val arr2 = ctx.freshName("values") + val dt = child.dataType.asInstanceOf[ArrayType].elementType + val javaType = CodeGenerator.javaType(dt) + val arrayType = if (values.isInstanceOf[Array[Any]]) "java.lang.Object[]" else s"$javaType[]" + val valuesRef = ctx.addReferenceObj("values", values, arrayType) + val n = ctx.freshName("n") + val m = ctx.freshName("m") + val i = ctx.freshName("i") + val j = ctx.freshName("j") + val v = ctx.freshName("v") + val u = ctx.freshName("u") + val result = ctx.freshName("result") + val binarySearchResultType = + SortedArrayUtils.BinarySearchResult.getClass.getCanonicalName.stripSuffix("$") + val binarySearch = SortedArrayUtils.binarySearchCodeGen(ctx, dt) + import CodeGenerator.getValue + val resultCode = + s""" + |int $n = $arr1.numElements(); + |int $m = $arr2.length; + |if ($n > 0 && $m > 0 && + | !(${ctx.genGreater(dt, getValue(arr1, dt, "0"), s"(($javaType) $arr2[$m - 1])")}) && + | !(${ctx.genGreater(dt, s"(($javaType) $arr2[0])", getValue(arr1, dt, s"$n - 1"))})) { + | int $i = 0; + | int $j = 0; + | do { + | $javaType $v = ${getValue(arr1, dt, i)}; + | while ($j < $m && ${ctx.genGreater(dt, v, s"(($javaType) $arr2[$j])")}) $j += 1; + | if ($j == $m) break; + | $javaType $u = ($javaType) $arr2[$j]; + | $j += 1; + | $binarySearchResultType $result = $binarySearch($arr1, $i, $n, $u); + | if ($result.found()) { + | ${ev.value} = true; + | break; + | } + | if ($result.index() == $n) break; + | $i = $result.index(); + | } while ($j < $m); + |} + """.stripMargin + ev.copy( + code = code""" + ${childGen.code} + $arrayType $arr2 = $valuesRef; + boolean ${ev.value} = false; + $resultCode""", + isNull = FalseLiteral) + } + + override def equals(that: Any): Boolean = { + that match { + case SortedArrayContainsAny(thatChild, thatValues) => + child == thatChild && + values.asInstanceOf[Array[_]].sameElements(thatValues.asInstanceOf[Array[_]]) + case _ => false + } + } + + override def hashCode: Int = { + (child, values.asInstanceOf[Array[_]].toSeq).hashCode + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexConfigTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexConfigTest.scala index e418cfe10..25b3890d3 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexConfigTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexConfigTest.scala @@ -17,8 +17,8 @@ package com.microsoft.hyperspace.index.dataskipping import org.apache.hadoop.fs.Path -import org.apache.spark.sql.functions.{input_file_name, max, min} -import org.apache.spark.sql.types.{LongType, StringType} +import org.apache.spark.sql.functions.{array_sort, collect_set, input_file_name, max, min} +import org.apache.spark.sql.types.{IntegerType, LongType, StringType} import org.apache.spark.util.sketch.BloomFilter import com.microsoft.hyperspace.HyperspaceException @@ -86,6 +86,19 @@ class DataSkippingIndexConfigTest extends DataSkippingSuite with BloomFilterTest checkAnswer(indexData, withFileId(expectedSketchValues)) } + test("createIndex works correctly with a ValueListSketch.") { + val sourceData = + createSourceData(spark.range(100).selectExpr("cast(id / 10 as int) as A").toDF) + val indexConfig = DataSkippingIndexConfig("MyIndex", ValueListSketch("A")) + val (index, indexData) = indexConfig.createIndex(ctx, sourceData, Map()) + assert(index.sketches === Seq(ValueListSketch("A", Some(IntegerType)))) + val expectedSketchValues = sourceData + .groupBy(input_file_name().as(fileNameCol)) + .agg(array_sort(collect_set("A"))) + checkAnswer(indexData, withFileId(expectedSketchValues)) + assert(indexData.columns === Seq(IndexConstants.DATA_FILE_NAME_ID, "ValueList_A__0")) + } + test("createIndex works correctly with a BloomFilterSketch.") { val sourceData = createSourceData(spark.range(100).toDF("A")) val indexConfig = DataSkippingIndexConfig("MyIndex", BloomFilterSketch("A", 0.001, 20)) diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexIntegrationTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexIntegrationTest.scala index 1c8f323c1..5d493f333 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexIntegrationTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexIntegrationTest.scala @@ -228,6 +228,125 @@ class DataSkippingIndexIntegrationTest extends DataSkippingSuite { assert(ex.getCause().getMessage().contains("BloomFilter does not support DoubleType")) } + test("ValueList index is applied for a filter query (EqualTo).") { + withAndWithoutCodegen { + withIndex("myind") { + val df = createSourceData(spark.range(100).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", ValueListSketch("A"))) + def query: DataFrame = df.filter("A = 1") + checkIndexApplied(query, 1) + } + } + } + + test("ValueList index is applied for a filter query (Not(EqualTo)).") { + withAndWithoutCodegen { + withIndex("myind") { + val df = createSourceData(spark.range(10).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", ValueListSketch("A"))) + def query: DataFrame = df.filter("A != 1") + checkIndexApplied(query, 9) + } + } + } + + test( + "ValueList index is applied for a filter query (EqualTo) " + + "where some source data files has only null values.") { + withAndWithoutCodegen { + withIndex("myind") { + val df = createSourceData(Seq[Integer](1, 2, 3, null, 5, null, 7, 8, 9, null).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", ValueListSketch("A"))) + def query: DataFrame = df.filter("A = 1") + checkIndexApplied(query, 1) + } + } + } + + test("ValueList index is applied for a filter query (multiple EqualTo's).") { + withAndWithoutCodegen { + withIndex("myind") { + val df = createSourceData(spark.range(100).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", ValueListSketch("A"))) + def query: DataFrame = df.filter("A = 1 or A = 12 or A = 20") + checkIndexApplied(query, 3) + } + } + } + + test("ValueList index is applied for a filter query (In).") { + withAndWithoutCodegen { + withIndex("myind") { + val df = createSourceData(spark.range(100).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", ValueListSketch("A"))) + def query: DataFrame = df.filter("A in (20, 30, 10, 20)") + checkIndexApplied(query, 3) + } + } + } + + test("ValueList index is applied for a filter query (In) - string type.") { + withAndWithoutCodegen { + withIndex("myind") { + val df = createSourceData(Seq.range(0, 100).map(n => s"foo$n").toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", ValueListSketch("A"))) + def query: DataFrame = df.filter("A in ('foo31', 'foo12', 'foo1')") + checkIndexApplied(query, 3) + } + } + } + + test("ValueList index is applied for a filter query with UDF returning boolean.") { + withAndWithoutCodegen { + withIndex("myind") { + val df = createSourceData(spark.range(100).toDF("A")) + spark.udf.register("F", (a: Int) => a < 15) + hs.createIndex(df, DataSkippingIndexConfig("myind", ValueListSketch("F(A)"))) + def query: DataFrame = df.filter("F(A)") + checkIndexApplied(query, 2) + } + } + } + + test( + "ValueList index is applied for a filter query with UDF " + + "taking two arguments and returning boolean.") { + withAndWithoutCodegen { + withIndex("myind") { + val df = createSourceData(spark.range(100).selectExpr("id as A", "id * 2 as B")) + spark.udf.register("F", (a: Int, b: Int) => a < 15 || b > 190) + hs.createIndex(df, DataSkippingIndexConfig("myind", ValueListSketch("F(A, B)"))) + def query: DataFrame = df.filter("F(A, B)") + checkIndexApplied(query, 3) + } + } + } + + test( + "ValueList index is applied for a filter query with UDF " + + "taking binary and returning boolean.") { + withAndWithoutCodegen { + withIndex("myind") { + val df = createSourceData( + Seq( + Array[Byte](0, 0, 0, 0), + Array[Byte](0, 1, 0, 1), + Array[Byte](1, 2, 3, 4), + Array[Byte](5, 6, 7, 8), + Array[Byte](32, 32, 32, 32), + Array[Byte](64, 64, 64, 64), + Array[Byte](1, 1, 1, 1), + Array[Byte](-128, -128, -128, -128), + Array[Byte](127, 127, 127, 127), + Array[Byte](-1, 1, 0, 0)).toDF("A")) + spark.udf.register("F", (a: Array[Byte]) => a.sum == 0) + hs.createIndex(df, DataSkippingIndexConfig("myind", ValueListSketch("F(A)"))) + def query: DataFrame = df.filter("F(A)") + checkIndexApplied(query, 4) + } + } + } + test( "DataSkippingIndex works correctly for CSV where the same source data files can be " + "interpreted differently.") { diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexTest.scala index 95634f43f..7165da53f 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexTest.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.functions.{input_file_name, max, min} import org.apache.spark.sql.types.IntegerType import com.microsoft.hyperspace.index.{Content, FileInfo, Index, IndexConstants} -import com.microsoft.hyperspace.index.dataskipping.sketch.MinMaxSketch +import com.microsoft.hyperspace.index.dataskipping.sketch.{MinMaxSketch, ValueListSketch} import com.microsoft.hyperspace.util.JsonUtils class DataSkippingIndexTest extends DataSkippingSuite { @@ -44,11 +44,27 @@ class DataSkippingIndexTest extends DataSkippingSuite { assert(index.indexedColumns === Seq("A", "B")) } + test("indexedColumns returns indexed columns of sketches (mixed sketch types).") { + val index = DataSkippingIndex(Seq(MinMaxSketch("A"), ValueListSketch("B"))) + assert(index.indexedColumns === Seq("A", "B")) + } + + test("indexedColumns returns indexed columns without duplicates.") { + val index = + DataSkippingIndex(Seq(MinMaxSketch("A"), ValueListSketch("A"))) + assert(index.indexedColumns === Seq("A")) + } + test("referencedColumns returns indexed columns of sketches.") { val index = DataSkippingIndex(Seq(MinMaxSketch("A"), MinMaxSketch("B"))) assert(index.referencedColumns === Seq("A", "B")) } + test("referencedColumns returns indexed columns of sketches (mixed sketch types).") { + val index = DataSkippingIndex(Seq(MinMaxSketch("A"), ValueListSketch("B"))) + assert(index.referencedColumns === Seq("A", "B")) + } + test( "withNewProperties returns a new index which copies the original index except the " + "properties.") { @@ -63,11 +79,22 @@ class DataSkippingIndexTest extends DataSkippingSuite { assert(index.statistics() === Map("sketches" -> "MinMax(A), MinMax(B)")) } + test("statistics returns a string-formatted list of sketches (mixed sketch types).") { + val index = DataSkippingIndex(Seq(MinMaxSketch("A"), ValueListSketch("B"))) + assert(index.statistics() === Map("sketches" -> "MinMax(A), ValueList(B)")) + } + test("canHandleDeletedFiles returns true.") { val index = DataSkippingIndex(Seq(MinMaxSketch("A"))) assert(index.canHandleDeletedFiles === true) } + test("Two indexes are equal if they have the same set of sketches.") { + val index1 = DataSkippingIndex(Seq(MinMaxSketch("A"), ValueListSketch("B"))) + val index2 = DataSkippingIndex(Seq(ValueListSketch("B"), MinMaxSketch("A"))) + assert(index1 === index2) + } + test("write writes the index data in a Parquet format.") { val sourceData = createSourceData(spark.range(100).toDF("A")) val indexConfig = DataSkippingIndexConfig("myIndex", MinMaxSketch("A")) @@ -238,11 +265,25 @@ class DataSkippingIndexTest extends DataSkippingSuite { assert(ds1.hashCode === ds2.hashCode) } + test("Indexes are equal if they have the same sketches and data types (mixed sketch types).") { + val ds1 = DataSkippingIndex(Seq(MinMaxSketch("A"), ValueListSketch("B"))) + val ds2 = DataSkippingIndex(Seq(ValueListSketch("B"), MinMaxSketch("A"))) + assert(ds1 === ds2) + assert(ds1.hashCode === ds2.hashCode) + } + test("Indexes are not equal to objects which are not indexes.") { val ds = DataSkippingIndex(Seq(MinMaxSketch("A"))) assert(ds !== "ds") } + test("Indexes are not equal if they don't have the same sketches.") { + val ds1 = DataSkippingIndex(Seq(MinMaxSketch("A"))) + val ds2 = DataSkippingIndex(Seq(ValueListSketch("A"))) + assert(ds1 !== ds2) + assert(ds1.hashCode !== ds2.hashCode) + } + test("Index can be serialized.") { val ds = DataSkippingIndex(Seq(MinMaxSketch("A", Some(IntegerType))), Map("a" -> "b")) val json = JsonUtils.toJson(ds) diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/rule/ApplyDataSkippingIndexTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/rule/ApplyDataSkippingIndexTest.scala index 2e64c57ed..84a8e4a21 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/rule/ApplyDataSkippingIndexTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/rule/ApplyDataSkippingIndexTest.scala @@ -104,6 +104,9 @@ class ApplyDataSkippingIndexTest extends DataSkippingSuite { 17, null, 19, 20).toDF("A")), "source [A:Int] with nulls") + def dataIS: SourceData = + SourceData(() => createSourceData(spark.range(10).toDF("A")), "source [A:Int] small") + def dataIIP: SourceData = SourceData( () => @@ -235,12 +238,24 @@ class ApplyDataSkippingIndexTest extends DataSkippingSuite { Param(dataI, "!(A < 20)", MinMaxSketch("A"), 8), Param(dataI, "not (A not in (1, 2, 3))", MinMaxSketch("A"), 1), Param(dataS, "A < 'foo'", MinMaxSketch("A"), 1), + Param(dataS, "A in ('foo1', 'foo9')", ValueListSketch("A"), 2), Param(dataS, "A in ('foo1', 'foo5', 'foo9')", BloomFilterSketch("A", 0.01, 10), 3), Param( dataS, "A in ('foo1','goo1','hoo1','i1','j','k','l','m','n','o','p')", BloomFilterSketch("A", 0.01, 10), 1), + Param(dataI, "A = 10", ValueListSketch("A"), 1), + Param(dataI, "10 = A", ValueListSketch("a"), 1), + Param(dataIS, "A != 5", ValueListSketch("A"), 9), + Param(dataIS, "5 != A", ValueListSketch("A"), 9), + Param(dataIN, "a!=9", ValueListSketch("a"), 6), + Param(dataIN, "9 != A", ValueListSketch("A"), 6), + Param(dataI, "A != 5", ValueListSketch("A"), 10), + Param(dataI, "A < 34", ValueListSketch("A"), 4), + Param(dataI, "34 > A", ValueListSketch("A"), 4), + Param(dataIN, "A < 9", ValueListSketch("a"), 2), + Param(dataIN, "9 > A", ValueListSketch("A"), 2), Param(dataI, "A = 10", BloomFilterSketch("A", 0.01, 10), 1), Param(dataI, "A <=> 20", BloomFilterSketch("A", 0.01, 10), 1), Param(dataI, "A <=> null", BloomFilterSketch("A", 0.01, 10), 10), @@ -258,6 +273,7 @@ class ApplyDataSkippingIndexTest extends DataSkippingSuite { Param(dataI, "A != 10", BloomFilterSketch("A", 0.001, 10), 10), Param(dataI, "a = 10", MinMaxSketch("A"), 1), Param(dataI, "A = 10", MinMaxSketch("a"), 1), + Param(dataI, "A in (2, 3, 10, 99)", ValueListSketch("a"), 3), Param(dataI, "A in (1, 2, 3, 10)", MinMaxSketch("A"), 2), Param(dataI, "A in (10,9,8,7,6,5,4,3,2,1,50,49,48,47,46,45)", MinMaxSketch("A"), 4), Param(dataS, "A in ('foo1', 'foo5', 'foo9')", MinMaxSketch("A"), 3), @@ -274,6 +290,21 @@ class ApplyDataSkippingIndexTest extends DataSkippingSuite { "A in (x'00',x'01',x'02',x'03',x'04',x'05',x'06',x'07',x'08',x'09',x'0a',x'20202020')", MinMaxSketch("A"), 1), + Param(dataI, "A in (10,9,8,7,6,5,4,3,2,1,50,49,48,47,46,45)", ValueListSketch("A"), 4), + Param(dataS, "A in ('foo1', 'foo5', 'foo9')", ValueListSketch("A"), 3), + Param( + dataS, + "A in ('foo1','a','b','c','d','e','f','g','h','i','j','k')", + ValueListSketch("A"), + 1), + Param(dataD, "A in (1,2,3,15,16,17)", ValueListSketch("A"), 2), + Param(dataD, "A in (1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16)", ValueListSketch("A"), 2), + Param(dataB, "A in (x'00000000', x'0001', x'0002', x'05060708')", ValueListSketch("A"), 2), + Param( + dataB, + "A in (x'00',x'01',x'02',x'03',x'04',x'05',x'06',x'07',x'08',x'09',x'0a',x'20202020')", + ValueListSketch("A"), + 1), Param(dataI, "A BETWEEN 27 AND 51", MinMaxSketch("A"), 4), Param(dataI, "IF(A=1,2,3)=2", MinMaxSketch("A"), 10), Param(dataII, "A = 10 OR B = 50", Seq(MinMaxSketch("A"), MinMaxSketch("B")), 2), @@ -322,6 +353,12 @@ class ApplyDataSkippingIndexTest extends DataSkippingSuite { MinMaxSketch("is_less_than_23(A)"), 8, () => spark.udf.register("is_less_than_23", (a: Int) => a < 23)), + Param( + dataI, + "!is_less_than_23(A)", + ValueListSketch("is_less_than_23(A)"), + 8, + () => spark.udf.register("is_less_than_23", (a: Int) => a < 23)), Param( dataII, "A < 50 and F(A,B) < 20", @@ -339,7 +376,13 @@ class ApplyDataSkippingIndexTest extends DataSkippingSuite { "IF(A IS NULL,NULL,F(A))=2", MinMaxSketch("A"), 10, - () => spark.udf.register("F", (a: Int) => a * 2))).foreach { + () => spark.udf.register("F", (a: Int) => a * 2)), + Param( + dataB, + "F(A)", + ValueListSketch("f(A)"), + 4, + () => spark.udf.register("F", (a: Array[Byte]) => a.sum == 0))).foreach { case Param(sourceData, filter, sketches, numExpectedFiles, setup) => test( s"applyIndex works as expected for ${sourceData.description}: " + diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/sketch/ValueListSketchTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/sketch/ValueListSketchTest.scala new file mode 100644 index 000000000..3cc4cede1 --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/sketch/ValueListSketchTest.scala @@ -0,0 +1,249 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.sketch + +import org.apache.spark.sql.{Column, QueryTest} +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +import com.microsoft.hyperspace.index.HyperspaceSuite +import com.microsoft.hyperspace.index.dataskipping.util._ + +class ValueListSketchTest extends QueryTest with HyperspaceSuite { + import spark.implicits._ + + test("indexedColumns returns the indexed column.") { + val sketch = ValueListSketch("A") + assert(sketch.indexedColumns === Seq("A")) + } + + test("referencedColumns returns the indexed column.") { + val sketch = ValueListSketch("A") + assert(sketch.referencedColumns === Seq("A")) + } + + test("aggregateFunctions returns an aggregation function that collects all unique values.") { + val sketch = ValueListSketch("A") + val aggrs = sketch.aggregateFunctions.map(new Column(_)) + val data = Seq(1, -1, 10, 2, 4, 2, 0, 10).toDF("A") + checkAnswer(data.select(aggrs: _*), Seq(Array(-1, 0, 1, 2, 4, 10)).toDF) + } + + test("toString returns a reasonable string.") { + val sketch = ValueListSketch("A") + assert(sketch.toString === "ValueList(A)") + } + + test("Two sketches are equal if their columns are equal.") { + assert(ValueListSketch("A") === ValueListSketch("A")) + assert(ValueListSketch("A") !== ValueListSketch("a")) + assert(ValueListSketch("b") !== ValueListSketch("B")) + assert(ValueListSketch("B") === ValueListSketch("B")) + } + + test("hashCode is reasonably implemented.") { + assert(ValueListSketch("A").hashCode === ValueListSketch("A").hashCode) + assert(ValueListSketch("A").hashCode !== ValueListSketch("a").hashCode) + } + + test("covertPredicate converts EqualTo(, ).") { + val sketch = ValueListSketch("A") + val predicate = EqualTo(AttributeReference("A", IntegerType)(ExprId(0)), Literal(42)) + val sketchValues = Seq(UnresolvedAttribute("valueList")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", IntegerType)(ExpressionUtils.nullExprId))) + val expected = Some(SortedArrayContains(sketchValues(0), Literal(42))) + assert(result === expected) + } + + test("covertPredicate converts EqualTo(, ).") { + val sketch = ValueListSketch("A") + val predicate = EqualTo(Literal(42), AttributeReference("A", IntegerType)(ExprId(0))) + val sketchValues = Seq(UnresolvedAttribute("valueList")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", IntegerType)(ExpressionUtils.nullExprId))) + val expected = Some(SortedArrayContains(sketchValues(0), Literal(42))) + assert(result === expected) + } + + test("covertPredicate converts EqualTo(, ) - string type.") { + val sketch = ValueListSketch("A") + val predicate = + EqualTo(AttributeReference("A", StringType)(ExprId(0)), Literal.create("hello", StringType)) + val sketchValues = Seq(UnresolvedAttribute("valueList")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", StringType)(ExpressionUtils.nullExprId))) + val expected = Some(SortedArrayContains(sketchValues(0), Literal.create("hello", StringType))) + assert(result === expected) + } + + test("covertPredicate converts EqualTo(, ) - double type.") { + val sketch = ValueListSketch("A") + val predicate = + EqualTo(AttributeReference("A", StringType)(ExprId(0)), Literal(3.14, DoubleType)) + val sketchValues = Seq(UnresolvedAttribute("valueList")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", StringType)(ExpressionUtils.nullExprId))) + val expected = Some(SortedArrayContains(sketchValues(0), Literal(3.14, DoubleType))) + assert(result === expected) + } + + test("covertPredicate converts Not(EqualTo(, )).") { + val sketch = ValueListSketch("A") + val predicate = Not(EqualTo(AttributeReference("A", IntegerType)(ExprId(0)), Literal(42))) + val sketchValues = Seq(UnresolvedAttribute("valueList")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", IntegerType)(ExpressionUtils.nullExprId))) + val expected = Some( + Or( + GreaterThan(Size(sketchValues(0)), Literal(1)), + Not(EqualTo(ElementAt(sketchValues(0), Literal(1)), Literal(42))))) + assert(result === expected) + } + + test("covertPredicate converts LessThan.") { + val sketch = ValueListSketch("A") + val predicate = LessThan(AttributeReference("A", IntegerType)(ExprId(0)), Literal(42)) + val sketchValues = Seq(UnresolvedAttribute("valueList")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", IntegerType)(ExpressionUtils.nullExprId))) + val expected = Some(LessThan(ElementAt(sketchValues(0), Literal(1)), Literal(42))) + assert(result === expected) + } + + test("covertPredicate converts LessThan - string type.") { + val sketch = ValueListSketch("A") + val predicate = LessThan( + AttributeReference("A", StringType)(ExprId(0)), + Literal.create("hello", StringType)) + val sketchValues = Seq(UnresolvedAttribute("valueList")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", StringType)(ExpressionUtils.nullExprId))) + val expected = + Some(LessThan(ElementAt(sketchValues(0), Literal(1)), Literal.create("hello", StringType))) + assert(result === expected) + } + + test("covertPredicate converts LessThanOrEqual.") { + val sketch = ValueListSketch("A") + val predicate = LessThanOrEqual(AttributeReference("A", IntegerType)(ExprId(0)), Literal(42)) + val sketchValues = Seq(UnresolvedAttribute("valueList")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", IntegerType)(ExpressionUtils.nullExprId))) + val expected = Some(LessThanOrEqual(ElementAt(sketchValues(0), Literal(1)), Literal(42))) + assert(result === expected) + } + + test("covertPredicate converts GreaterThan.") { + val sketch = ValueListSketch("A") + val predicate = GreaterThan(AttributeReference("A", IntegerType)(ExprId(0)), Literal(42)) + val sketchValues = Seq(UnresolvedAttribute("valueList")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", IntegerType)(ExpressionUtils.nullExprId))) + val expected = Some(GreaterThan(ElementAt(sketchValues(0), Literal(-1)), Literal(42))) + assert(result === expected) + } + + test("covertPredicate converts GreaterThanOrEqual.") { + val sketch = ValueListSketch("A") + val predicate = + GreaterThanOrEqual(AttributeReference("A", IntegerType)(ExprId(0)), Literal(42)) + val sketchValues = Seq(UnresolvedAttribute("valueList")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", IntegerType)(ExpressionUtils.nullExprId))) + val expected = Some(GreaterThanOrEqual(ElementAt(sketchValues(0), Literal(-1)), Literal(42))) + assert(result === expected) + } + + test("covertPredicate converts In.") { + val sketch = ValueListSketch("A") + val predicate = + In(AttributeReference("A", IntegerType)(ExprId(0)), Seq(Literal(42), Literal(23))) + val sketchValues = Seq(UnresolvedAttribute("valueList")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", IntegerType)(ExpressionUtils.nullExprId))) + val expected = Some(SortedArrayContainsAny(sketchValues(0), Array(23, 42))) + assert(result === expected) + } + + test("covertPredicate converts In - string type.") { + val sketch = ValueListSketch("A") + val predicate = + In( + AttributeReference("A", StringType)(ExprId(0)), + Seq(Literal.create("world", StringType), Literal.create("hello", StringType))) + val sketchValues = Seq(UnresolvedAttribute("valueList")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", StringType)(ExpressionUtils.nullExprId))) + val expected = + Some( + SortedArrayContainsAny( + sketchValues(0), + Array(UTF8String.fromString("hello"), UTF8String.fromString("world")))) + assert(result === expected) + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/SortedArrayContainsAnyTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/SortedArrayContainsAnyTest.scala new file mode 100644 index 000000000..2af99630e --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/SortedArrayContainsAnyTest.scala @@ -0,0 +1,82 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.util + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.types._ + +import com.microsoft.hyperspace.index.HyperspaceSuite +import com.microsoft.hyperspace.index.dataskipping.ArrayTestUtils +import com.microsoft.hyperspace.index.dataskipping.util.ArrayUtils.toArray + +class SortedArrayContainsAnyTest extends HyperspaceSuite with ArrayTestUtils { + def test(arr1: Expression, arr2: Expression, expected: Boolean): Unit = { + val elementType = arr2.dataType.asInstanceOf[ArrayType].elementType + assert( + SortedArrayContainsAny( + arr1, + toArray( + arr2.asInstanceOf[Literal].value.asInstanceOf[ArrayData].toObjectArray(elementType), + elementType)).eval() === + expected) + } + + test("SortedArrayContainsAny returns true if two arrays intersect.") { + val array1 = createArray(Seq.range(0, 100000).map(_ * 2), IntegerType) + val array2 = createArray(Seq(0), IntegerType) + val array3 = createArray(Seq(2), IntegerType) + val array4 = createArray(Seq(199998), IntegerType) + val array5 = createArray(Seq(2, 4, 5), IntegerType) + val array6 = createArray(Seq(1, 3, 199998), IntegerType) + val array7 = createArray(Seq(-1, 100000), IntegerType) + val array8 = createArray(Seq(100000, 200001), IntegerType) + test(array1, array2, true) + test(array1, array3, true) + test(array1, array4, true) + test(array1, array5, true) + test(array1, array6, true) + test(array1, array7, true) + test(array1, array8, true) + test(array3, array5, true) + test(array4, array6, true) + test(array7, array8, true) + } + + test("SortedArrayContainsAny returns false if two arrays don't intersect.") { + val array1 = createArray(Seq.range(0, 100000).map(_ * 2), IntegerType) + val array2 = createArray(Seq(), IntegerType) + val array3 = createArray(Seq(-1), IntegerType) + val array4 = createArray(Seq(1), IntegerType) + val array5 = createArray(Seq(200001), IntegerType) + val array6 = createArray(Seq(1, 3, 199999), IntegerType) + val array7 = createArray(Seq(-1, 100001), IntegerType) + val array8 = createArray(Seq(49999, 100001), IntegerType) + val array9 = createArray(Seq(-3, 1, 1), IntegerType) + test(array1, array2, false) + test(array1, array3, false) + test(array1, array4, false) + test(array1, array5, false) + test(array1, array6, false) + test(array1, array7, false) + test(array1, array9, false) + test(array2, array3, false) + test(array3, array4, false) + test(array5, array6, false) + test(array6, array7, false) + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/SortedArrayContainsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/SortedArrayContainsTest.scala new file mode 100644 index 000000000..bb4e3eae4 --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/util/SortedArrayContainsTest.scala @@ -0,0 +1,77 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.util + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ + +import com.microsoft.hyperspace.index.HyperspaceSuite +import com.microsoft.hyperspace.index.dataskipping.ArrayTestUtils + +class SortedArrayContainsTest extends HyperspaceSuite with ArrayTestUtils { + test("SortedArrayContains works correctly for an empty array.") { + val array = createArray(Nil, IntegerType) + assert(SortedArrayContains(array, Literal(0, IntegerType)).eval() === false) + } + + test("SortedArrayContains works correctly for a array of size 1.") { + val array = createArray(Seq(1), IntegerType) + assert(SortedArrayContains(array, Literal(0, IntegerType)).eval() === false) + assert(SortedArrayContains(array, Literal(1, IntegerType)).eval() === true) + assert(SortedArrayContains(array, Literal(2, IntegerType)).eval() === false) + } + + test("SortedArrayContains works correctly for a array of size 2.") { + val array = createArray(Seq(1, 3), IntegerType) + assert(SortedArrayContains(array, Literal(0, IntegerType)).eval() === false) + assert(SortedArrayContains(array, Literal(1, IntegerType)).eval() === true) + assert(SortedArrayContains(array, Literal(2, IntegerType)).eval() === false) + assert(SortedArrayContains(array, Literal(3, IntegerType)).eval() === true) + assert(SortedArrayContains(array, Literal(4, IntegerType)).eval() === false) + } + + test("SortedArrayContains works correctly for an int array.") { + val values = Seq.range(0, 100000).map(_ * 2) + val array = createArray(values, IntegerType) + values.foreach(v => + assert(SortedArrayContains(array, Literal(v, IntegerType)).eval() === true)) + assert(SortedArrayContains(array, Literal(-10, IntegerType)).eval() === false) + assert(SortedArrayContains(array, Literal(1, IntegerType)).eval() === false) + assert(SortedArrayContains(array, Literal(999, IntegerType)).eval() === false) + assert(SortedArrayContains(array, Literal(1000000000, IntegerType)).eval() === false) + } + + test("SortedArrayContains works correctly for a long array.") { + val values = Seq.range(0L, 100000L).map(_ * 2) + val array = createArray(values, LongType) + values.foreach(v => assert(SortedArrayContains(array, Literal(v, LongType)).eval() === true)) + assert(SortedArrayContains(array, Literal(-10L, LongType)).eval() === false) + assert(SortedArrayContains(array, Literal(1L, LongType)).eval() === false) + assert(SortedArrayContains(array, Literal(999L, LongType)).eval() === false) + assert(SortedArrayContains(array, Literal(1000000000L, LongType)).eval() === false) + } + + test("SortedArrayContains works correctly for a string array.") { + val values = Seq("hello", "world", "foo", "bar", "footrix").sorted + val array = createArray(values, StringType) + values.foreach(v => + assert(SortedArrayContains(array, Literal.create(v, StringType)).eval() === true)) + assert(SortedArrayContains(array, Literal.create("abc", StringType)).eval() === false) + assert(SortedArrayContains(array, Literal.create("fooo", StringType)).eval() === false) + assert(SortedArrayContains(array, Literal.create("zoo", StringType)).eval() === false) + } +}