This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Commit

Support index creation on nested fields
andrei-ionescu committed Mar 11, 2021
1 parent c5714e4 commit 7286f7e
Showing 9 changed files with 1,089 additions and 29 deletions.
src/main/scala/com/microsoft/hyperspace/actions/CreateAction.scala

@@ -25,7 +25,7 @@ import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, CREATING, DOESNOTEXIST}
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.telemetry.{AppInfo, CreateActionEvent, HyperspaceEvent}
import com.microsoft.hyperspace.util.ResolverUtils
import com.microsoft.hyperspace.util.{ResolverUtils, SchemaUtils}

class CreateAction(
spark: SparkSession,
@@ -65,9 +65,15 @@ class CreateAction(
}

private def isValidIndexSchema(config: IndexConfig, schema: StructType): Boolean = {
// Flatten the schema to support nested fields
val fields = SchemaUtils.escapeFieldNames(SchemaUtils.flatten(schema))
// Resolve index config columns from available column names present in the schema.
ResolverUtils
.resolve(spark, config.indexedColumns ++ config.includedColumns, schema.fieldNames)
.resolve(
spark,
SchemaUtils.escapeFieldNames(config.indexedColumns)
++ SchemaUtils.escapeFieldNames(config.includedColumns),
fields)
.isDefined
}

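For illustration (this snippet is not part of the commit), here is roughly what the new SchemaUtils helpers produce for a made-up nested schema; these flattened, escaped names are what the index config columns above are resolved against:

    import org.apache.spark.sql.types._
    import com.microsoft.hyperspace.util.SchemaUtils

    val schema = StructType(Seq(
      StructField("query", StringType),
      StructField("nested", StructType(Seq(
        StructField("id", StringType),
        StructField("leaf", StructType(Seq(
          StructField("id", StringType),
          StructField("cnt", IntegerType)))))))))

    // flatten emits one dot-separated path per leaf field.
    SchemaUtils.flatten(schema)
    // => Seq("query", "nested.id", "nested.leaf.id", "nested.leaf.cnt")

    // escapeFieldNames replaces "." with "__" so the paths can be handled as plain column names.
    SchemaUtils.escapeFieldNames(SchemaUtils.flatten(schema))
    // => Seq("query", "nested__id", "nested__leaf__id", "nested__leaf__cnt")
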
src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala

@@ -19,13 +19,15 @@ package com.microsoft.hyperspace.actions
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.functions.{col, input_file_name}
import org.apache.spark.sql.types.StructType

import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer
import com.microsoft.hyperspace.index.sources.FileBasedRelation
import com.microsoft.hyperspace.util.{HyperspaceConf, PathUtils, ResolverUtils}
import com.microsoft.hyperspace.util.{HyperspaceConf, PathUtils, ResolverUtils, SchemaUtils}

/**
* CreateActionBase provides functionality to write dataframe as covering index.
@@ -73,7 +75,8 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
LogicalPlanFingerprint.Properties(Seq(Signature(signatureProvider.name, s)))))

val coveringIndexProperties =
(hasLineageProperty(spark) ++ hasParquetAsSourceFormatProperty(relation)).toMap
(hasLineageProperty(spark) ++ hasParquetAsSourceFormatProperty(relation) ++
usesNestedFieldsProperty(indexConfig)).toMap

IndexLogEntry(
indexConfig.indexName,
@@ -92,23 +95,6 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
}
}

private def hasParquetAsSourceFormatProperty(
relation: FileBasedRelation): Option[(String, String)] = {
if (relation.hasParquetAsSourceFormat) {
Some(IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY -> "true")
} else {
None
}
}

private def hasLineageProperty(spark: SparkSession): Option[(String, String)] = {
if (hasLineage(spark)) {
Some(IndexConstants.LINEAGE_PROPERTY -> "true")
} else {
None
}
}

protected def write(spark: SparkSession, df: DataFrame, indexConfig: IndexConfig): Unit = {
val numBuckets = numBucketsForIndex(spark)

@@ -117,7 +103,7 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)

// run job
val repartitionedIndexDataFrame =
indexDataFrame.repartition(numBuckets, resolvedIndexedColumns.map(df(_)): _*)
indexDataFrame.repartition(numBuckets, resolvedIndexedColumns.map(c => col(s"$c")): _*)

// Save the index with the number of buckets specified.
repartitionedIndexDataFrame.write
@@ -140,13 +126,38 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
relations.head
}

private def hasParquetAsSourceFormatProperty(
relation: FileBasedRelation): Option[(String, String)] = {
if (relation.hasParquetAsSourceFormat) {
Some(IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY -> "true")
} else {
None
}
}

private def hasLineageProperty(spark: SparkSession): Option[(String, String)] = {
if (hasLineage(spark)) {
Some(IndexConstants.LINEAGE_PROPERTY -> "true")
} else {
None
}
}

private def usesNestedFieldsProperty(indexConfig: IndexConfig): Option[(String, String)] = {
if (SchemaUtils.hasNestedFields(indexConfig.indexedColumns ++ indexConfig.includedColumns)) {
Some(IndexConstants.USES_NESTED_FIELDS_PROPERTY -> "true")
} else {
None
}
}

private def resolveConfig(
df: DataFrame,
indexConfig: IndexConfig): (Seq[String], Seq[String]) = {
val spark = df.sparkSession
val dfColumnNames = df.schema.fieldNames
val indexedColumns = indexConfig.indexedColumns
val includedColumns = indexConfig.includedColumns
val dfColumnNames = SchemaUtils.flatten(df.schema)
val indexedColumns = SchemaUtils.unescapeFieldNames(indexConfig.indexedColumns)
val includedColumns = SchemaUtils.unescapeFieldNames(indexConfig.includedColumns)
val resolvedIndexedColumns = ResolverUtils.resolve(spark, indexedColumns, dfColumnNames)
val resolvedIncludedColumns = ResolverUtils.resolve(spark, includedColumns, dfColumnNames)

@@ -177,8 +188,8 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
// 2. If source data is partitioned, all partitioning key(s) are added to index schema
// as columns if they are not already part of the schema.
val partitionColumns = relation.partitionSchema.map(_.name)
val missingPartitionColumns = partitionColumns.filter(
ResolverUtils.resolve(spark, _, columnsFromIndexConfig).isEmpty)
val missingPartitionColumns =
partitionColumns.filter(ResolverUtils.resolve(spark, _, columnsFromIndexConfig).isEmpty)
val allIndexColumns = columnsFromIndexConfig ++ missingPartitionColumns

// File id value in DATA_FILE_ID_COLUMN column (lineage column) is stored as a
@@ -202,10 +213,16 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
.select(
allIndexColumns.head,
allIndexColumns.tail :+ IndexConstants.DATA_FILE_NAME_ID: _*)
.toDF(
SchemaUtils.escapeFieldNames(allIndexColumns) :+ IndexConstants.DATA_FILE_NAME_ID: _*)
} else {
df.select(columnsFromIndexConfig.head, columnsFromIndexConfig.tail: _*)
.toDF(SchemaUtils.escapeFieldNames(columnsFromIndexConfig): _*)
}

(indexDF, resolvedIndexedColumns, resolvedIncludedColumns)
val escapedIndexedColumns = SchemaUtils.escapeFieldNames(resolvedIndexedColumns)
val escapedIncludedColumns = SchemaUtils.escapeFieldNames(resolvedIncludedColumns)

(indexDF, escapedIndexedColumns, escapedIncludedColumns)
}
}
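
To make the rename-and-repartition flow above concrete, here is a rough sketch on a toy DataFrame (names and data are made up, and this is not part of the commit). The nested leaf is selected by its dotted path and renamed to the escaped flat name; that escaped name exists only on the index DataFrame, which is presumably why the repartition switched from df(_) to col(_):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    case class Leaf(id: String, cnt: Int)
    case class Nested(id: String, leaf: Leaf)
    case class Record(query: String, clicks: Int, nested: Nested)

    val spark = SparkSession.builder.master("local[2]").appName("nested-index-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq(
      Record("facebook", 3000, Nested("id1", Leaf("leaf_id1", 1))),
      Record("donde", 1000, Nested("id2", Leaf("leaf_id7", 2)))).toDF()

    // Mirrors the projection in the hunk above: select nested leaves by dotted path,
    // then rename every selected column to its escaped flat name.
    val indexDF = df
      .select(col("nested.leaf.cnt"), col("query"))
      .toDF("nested__leaf__cnt", "query")

    // "nested__leaf__cnt" exists only on indexDF, so df("nested__leaf__cnt") would not
    // resolve; the unresolved col(...) is resolved against indexDF when repartitioning.
    val repartitioned = indexDF.repartition(10, Seq("nested__leaf__cnt").map(c => col(c)): _*)
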
src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala

@@ -109,4 +109,7 @@ object IndexConstants {
// To provide multiple paths in the globbing pattern, separate them with commas, e.g.
// "/temp/1/*, /temp/2/*"
val GLOBBING_PATTERN_KEY = "spark.hyperspace.source.globbingPattern"

// Indicate whether the index has been built over a nested field.
private[hyperspace] val USES_NESTED_FIELDS_PROPERTY = "hasNestedFields"
}
src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala

@@ -557,6 +557,11 @@ case class IndexLogEntry(
config.hashCode + signature.hashCode + numBuckets.hashCode + content.hashCode
}

def usesNestedFields: Boolean = {
derivedDataset.properties.properties.getOrElse(
IndexConstants.USES_NESTED_FIELDS_PROPERTY, "false").toBoolean
}

/**
* A mutable map for holding auxiliary information of this index log entry while applying rules.
*/
src/main/scala/com/microsoft/hyperspace/util/SchemaUtils.scala (new file, 60 additions)

@@ -0,0 +1,60 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.util

import org.apache.spark.sql.types.{ArrayType, StructField, StructType}

object SchemaUtils {

val NESTED_FIELD_NEEDLE_REGEX = "\\."
val NESTED_FIELD_REPLACEMENT = "__"

def flatten(structFields: Seq[StructField], prefix: Option[String] = None): Seq[String] = {
structFields.flatMap {
case StructField(name, StructType(fields), _, _) =>
flatten(fields, Some(prefix.map(o => s"$o.$name").getOrElse(name)))
case StructField(name, ArrayType(StructType(fields), _), _, _) =>
flatten(fields, Some(prefix.map(o => s"$o.$name").getOrElse(name)))
case other =>
Seq(prefix.map(o => s"$o.${other.name}").getOrElse(other.name))
}
}

def escapeFieldNames(fields: Seq[String]): Seq[String] = {
fields.map(escapeFieldName)
}

def escapeFieldName(field: String): String = {
field.replaceAll(NESTED_FIELD_NEEDLE_REGEX, NESTED_FIELD_REPLACEMENT)
}

def unescapeFieldNames(fields: Seq[String]): Seq[String] = {
fields.map(unescapeFieldName)
}

def unescapeFieldName(field: String): String = {
field.replaceAll(NESTED_FIELD_REPLACEMENT, NESTED_FIELD_NEEDLE_REGEX)
}

def hasNestedFields(fields: Seq[String]): Boolean = {
fields.exists(isNestedField)
}

def isNestedField(field: String): Boolean = {
NESTED_FIELD_NEEDLE_REGEX.r.findFirstIn(field).isDefined
}
}
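
A quick, illustrative use of the helpers above (not from the commit): an array of structs is flattened the same way as a struct, and escaping round-trips through unescapeFieldName.

    import org.apache.spark.sql.types._
    import com.microsoft.hyperspace.util.SchemaUtils

    // Arrays of structs are traversed like structs: only leaf paths are emitted.
    val withArray = StructType(Seq(
      StructField("items", ArrayType(StructType(Seq(
        StructField("sku", StringType),
        StructField("qty", IntegerType)))))))

    SchemaUtils.flatten(withArray)
    // => Seq("items.sku", "items.qty")

    SchemaUtils.hasNestedFields(Seq("clicks"))                     // => false
    SchemaUtils.hasNestedFields(Seq("nested.leaf.cnt", "clicks"))  // => true

    SchemaUtils.escapeFieldName("nested.leaf.cnt")      // => "nested__leaf__cnt"
    SchemaUtils.unescapeFieldName("nested__leaf__cnt")  // => "nested.leaf.cnt"
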
src/test/scala/com/microsoft/hyperspace/SampleNestedData.scala (new file, 66 additions)

@@ -0,0 +1,66 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace

import org.apache.spark.sql.SparkSession

/**
* Sample data for testing.
*/
object SampleNestedData {

val testData = Seq(
("2017-09-03", "810a20a2baa24ff3ad493bfbf064569a", "donde", 2, 1000,
SampleNestedDataStruct("id1", SampleNestedDataLeaf("leaf_id1", 1))),
("2017-09-03", "fd093f8a05604515957083e70cb3dceb", "facebook", 1, 3000,
SampleNestedDataStruct("id1", SampleNestedDataLeaf("leaf_id1", 2))),
("2017-09-03", "af3ed6a197a8447cba8bc8ea21fad208", "facebook", 1, 3000,
SampleNestedDataStruct("id2", SampleNestedDataLeaf("leaf_id7", 1))),
("2017-09-03", "975134eca06c4711a0406d0464cbe7d6", "facebook", 1, 4000,
SampleNestedDataStruct("id2", SampleNestedDataLeaf("leaf_id7", 2))),
("2018-09-03", "e90a6028e15b4f4593eef557daf5166d", "ibraco", 2, 3000,
SampleNestedDataStruct("id2", SampleNestedDataLeaf("leaf_id7", 5))),
("2018-09-03", "576ed96b0d5340aa98a47de15c9f87ce", "facebook", 2, 3000,
SampleNestedDataStruct("id2", SampleNestedDataLeaf("leaf_id9", 1))),
("2018-09-03", "50d690516ca641438166049a6303650c", "ibraco", 2, 1000,
SampleNestedDataStruct("id3", SampleNestedDataLeaf("leaf_id9", 10))),
("2019-10-03", "380786e6495d4cd8a5dd4cc8d3d12917", "facebook", 2, 3000,
SampleNestedDataStruct("id4", SampleNestedDataLeaf("leaf_id9", 12))),
("2019-10-03", "ff60e4838b92421eafc3e6ee59a9e9f1", "miperro", 2, 2000,
SampleNestedDataStruct("id5", SampleNestedDataLeaf("leaf_id9", 21))),
("2019-10-03", "187696fe0a6a40cc9516bc6e47c70bc1", "facebook", 4, 3000,
SampleNestedDataStruct("id6", SampleNestedDataLeaf("leaf_id9", 22))))

def save(
spark: SparkSession,
path: String,
columns: Seq[String],
partitionColumns: Option[Seq[String]] = None): Unit = {
val df = spark.createDataFrame(
spark.sparkContext.parallelize(testData)
).toDF(columns: _*)
partitionColumns match {
case Some(pcs) =>
df.write.partitionBy(pcs: _*).parquet(path)
case None =>
df.write.parquet(path)
}
}
}

case class SampleNestedDataStruct(id: String, leaf: SampleNestedDataLeaf)
case class SampleNestedDataLeaf(id: String, cnt: Int)
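
A sketch of how this sample data could be used to build an index over a nested field (illustrative only; the column names, path, and index name are made up, and an active SparkSession named spark is assumed):

    import com.microsoft.hyperspace.{Hyperspace, SampleNestedData}
    import com.microsoft.hyperspace.index.IndexConfig

    val dataPath = "/tmp/sampleNestedData"
    val columns = Seq("date", "guid", "query", "imprs", "clicks", "nested")
    SampleNestedData.save(spark, dataPath, columns)

    val df = spark.read.parquet(dataPath)
    val hyperspace = new Hyperspace(spark)

    // Nested fields are referenced by their dotted path in the index config.
    hyperspace.createIndex(df, IndexConfig("indexNested", Seq("nested.leaf.cnt"), Seq("query")))
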
(The remaining changed files are not shown.)
