This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Support index creation on nested fields #379

Merged (2 commits, Mar 27, 2021)

Changes from all commits
@@ -19,7 +19,6 @@ package com.microsoft.hyperspace.actions
import scala.util.Try

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.StructType

import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, CREATING, DOESNOTEXIST}
@@ -51,7 +50,7 @@ class CreateAction(
}

// schema validity checks
if (!isValidIndexSchema(indexConfig, df.schema)) {
if (!isValidIndexSchema(indexConfig, df)) {
throw HyperspaceException("Index config is not applicable to dataframe schema.")
}

@@ -65,10 +64,13 @@
}
}

private def isValidIndexSchema(config: IndexConfig, schema: StructType): Boolean = {
// Resolve index config columns from available column names present in the schema.
private def isValidIndexSchema(config: IndexConfig, dataFrame: DataFrame): Boolean = {
// Resolve index config columns from available column names present in the dataframe.
ResolverUtils
.resolve(spark, config.indexedColumns ++ config.includedColumns, schema.fieldNames)
.resolve(
spark,
config.indexedColumns ++ config.includedColumns,
dataFrame.queryExecution.analyzed)
.isDefined
}
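
A minimal standalone sketch (not part of the diff) of why the check now takes the DataFrame instead of its StructType: schema.fieldNames only exposes top-level names, so a nested path such as `nested.value` can never resolve against it, whereas the analyzed plan can. The schema below is hypothetical.

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Hypothetical schema used only for illustration.
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("nested", StructType(Seq(StructField("value", StringType))))))

// Only top-level names are visible here; "nested.value" is not among them.
println(schema.fieldNames.mkString(", ")) // prints: id, nested
```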

@@ -17,15 +17,15 @@
package com.microsoft.hyperspace.actions

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.{col, input_file_name}

import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer
import com.microsoft.hyperspace.index.sources.FileBasedRelation
import com.microsoft.hyperspace.util.{HyperspaceConf, PathUtils, ResolverUtils}
import com.microsoft.hyperspace.util.{HyperspaceConf, PathUtils, ResolverUtils, SchemaUtils}

/**
* CreateActionBase provides functionality to write dataframe as covering index.
@@ -104,32 +104,19 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
}
}

private def hasParquetAsSourceFormatProperty(
relation: FileBasedRelation): Option[(String, String)] = {
if (relation.hasParquetAsSourceFormat) {
Some(IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY -> "true")
} else {
None
}
}

private def hasLineageProperty(spark: SparkSession): Option[(String, String)] = {
if (hasLineage(spark)) {
Some(IndexConstants.LINEAGE_PROPERTY -> "true")
} else {
None
}
}

protected def write(spark: SparkSession, df: DataFrame, indexConfig: IndexConfig): Unit = {
val numBuckets = numBucketsForIndex(spark)

val (indexDataFrame, resolvedIndexedColumns, _) =
prepareIndexDataFrame(spark, df, indexConfig)

// run job
val repartitionedIndexDataFrame =
indexDataFrame.repartition(numBuckets, resolvedIndexedColumns.map(df(_)): _*)
// Run job
val repartitionedIndexDataFrame = {
// For nested fields, resolvedIndexedColumns will have flattened names with `.` (dots),
// thus they need to be enclosed in backticks to access them as top-level columns.
// Note that backticking the non-nested columns is a no-op.
indexDataFrame.repartition(numBuckets, resolvedIndexedColumns.map(c => col(s"`$c`")): _*)
}

// Save the index with the number of buckets specified.
repartitionedIndexDataFrame.write
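
As a small, self-contained sketch of the backtick behavior described in the comment above (the local session and toy data are assumptions, not part of the PR):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("backtick-sketch").getOrCreate()
import spark.implicits._

// A column whose (flattened) name literally contains a dot.
val flattened = Seq((1, "a"), (2, "b")).toDF("id", "nested.value")

// col("nested.value") would be parsed as a struct access and fail to resolve;
// backticks make Spark treat the dotted string as a single top-level column name.
val repartitioned = flattened.repartition(2, col("`nested.value`"))
```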
@@ -152,25 +139,42 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
relations.head
}

private def hasParquetAsSourceFormatProperty(
relation: FileBasedRelation): Option[(String, String)] = {
if (relation.hasParquetAsSourceFormat) {
Some(IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY -> "true")
} else {
None
}
}

private def hasLineageProperty(spark: SparkSession): Option[(String, String)] = {
if (hasLineage(spark)) {
Some(IndexConstants.LINEAGE_PROPERTY -> "true")
} else {
None
}
}

private def resolveConfig(
df: DataFrame,
indexConfig: IndexConfig): (Seq[String], Seq[String]) = {
indexConfig: IndexConfig): (Seq[(String, Boolean)], Seq[(String, Boolean)]) = {
val spark = df.sparkSession
val dfColumnNames = df.schema.fieldNames
val plan = df.queryExecution.analyzed
val indexedColumns = indexConfig.indexedColumns
val includedColumns = indexConfig.includedColumns
val resolvedIndexedColumns = ResolverUtils.resolve(spark, indexedColumns, dfColumnNames)
val resolvedIncludedColumns = ResolverUtils.resolve(spark, includedColumns, dfColumnNames)
val resolvedIndexedColumns = ResolverUtils.resolve(spark, indexedColumns, plan)
val resolvedIncludedColumns = ResolverUtils.resolve(spark, includedColumns, plan)

(resolvedIndexedColumns, resolvedIncludedColumns) match {
case (Some(indexed), Some(included)) => (indexed, included)
case _ =>
val unresolvedColumns = (indexedColumns ++ includedColumns)
.map(c => (c, ResolverUtils.resolve(spark, c, dfColumnNames)))
.collect { case c if c._2.isEmpty => c._1 }
.map(c => (c, ResolverUtils.resolve(spark, Seq(c), plan).map(_.map(_._1))))
.collect { case (c, r) if r.isEmpty => c }
throw HyperspaceException(
s"Columns '${unresolvedColumns.mkString(",")}' could not be resolved " +
s"from available source columns '${dfColumnNames.mkString(",")}'")
s"from available source columns:\n${df.schema.treeString}")
}
}
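
For reference, a sketch of the extra detail the new error message carries: StructType.treeString renders the full nested structure rather than only top-level names. The DDL string below is a made-up example.

```scala
import org.apache.spark.sql.types.StructType

// Hypothetical schema, only to show what df.schema.treeString prints in the message.
val schema = StructType.fromDDL("id BIGINT, nested STRUCT<value: STRING>")
println(schema.treeString)
// root
//  |-- id: long (nullable = true)
//  |-- nested: struct (nullable = true)
//  |    |-- value: string (nullable = true)
```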

@@ -179,7 +183,12 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
df: DataFrame,
indexConfig: IndexConfig): (DataFrame, Seq[String], Seq[String]) = {
val (resolvedIndexedColumns, resolvedIncludedColumns) = resolveConfig(df, indexConfig)
val columnsFromIndexConfig = resolvedIndexedColumns ++ resolvedIncludedColumns
val columnsFromIndexConfig =
resolvedIndexedColumns.map(_._1) ++ resolvedIncludedColumns.map(_._1)

val prefixedIndexedColumns = SchemaUtils.prefixNestedFieldNames(resolvedIndexedColumns)
val prefixedIncludedColumns = SchemaUtils.prefixNestedFieldNames(resolvedIncludedColumns)
val prefixedColumnsFromIndexConfig = prefixedIndexedColumns ++ prefixedIncludedColumns

val indexDF = if (hasLineage(spark)) {
val relation = getRelation(spark, df)
@@ -208,16 +217,25 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
val dataPathColumn = "_data_path"
val lineagePairs = relation.lineagePairs(fileIdTracker)
val lineageDF = lineagePairs.toDF(dataPathColumn, IndexConstants.DATA_FILE_NAME_ID)
val prefixedAllIndexColumns = prefixedColumnsFromIndexConfig ++ missingPartitionColumns
Reviewer comment (Contributor): prefixedAllIndexColumns may not be the right name since missingPartitionColumns are not "prefixed"?

df.withColumn(dataPathColumn, input_file_name())
.join(lineageDF.hint("broadcast"), dataPathColumn)
.select(
allIndexColumns.head,
allIndexColumns.tail :+ IndexConstants.DATA_FILE_NAME_ID: _*)
.select(prepareColumns(allIndexColumns, prefixedAllIndexColumns) :+
col(IndexConstants.DATA_FILE_NAME_ID): _*)
} else {
df.select(columnsFromIndexConfig.head, columnsFromIndexConfig.tail: _*)
df.select(prepareColumns(columnsFromIndexConfig, prefixedColumnsFromIndexConfig): _*)
}

(indexDF, resolvedIndexedColumns, resolvedIncludedColumns)
(indexDF, prefixedIndexedColumns, prefixedIncludedColumns)
}

private def prepareColumns(
Reviewer comment (Contributor): nit: aliasColumns?
originalColumns: Seq[String],
prefixedColumns: Seq[String]): Seq[Column] = {
originalColumns.zip(prefixedColumns).map {
case (original, prefixed) =>
col(original).as(prefixed)
}
}
}
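
SchemaUtils.prefixNestedFieldNames and removePrefixNestedFieldNames are referenced here, but their implementation is not part of this diff. A plausible sketch follows, assuming nested names are tagged with a fixed marker prefix; the prefix string and object name are placeholders, not confirmed by this PR.

```scala
// Sketch only; signatures inferred from the call sites in this diff.
object SchemaUtilsSketch {
  private val NestedPrefix = "__hs_nested." // assumed marker, not confirmed by this PR

  // Tag resolved nested columns so their flattened names survive in the index log entry.
  def prefixNestedFieldNames(resolved: Seq[(String, Boolean)]): Seq[String] =
    resolved.map {
      case (name, true)  => s"$NestedPrefix$name"
      case (name, false) => name
    }

  // Inverse used on refresh: strip the marker and report whether the column was nested.
  def removePrefixNestedFieldNames(names: Seq[String]): Seq[(String, Boolean)] =
    names.map {
      case n if n.startsWith(NestedPrefix) => (n.stripPrefix(NestedPrefix), true)
      case n                               => (n, false)
    }
}
```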
@@ -22,6 +22,7 @@ import org.apache.spark.sql.types.{DataType, StructType}
import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, REFRESHING}
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.util.SchemaUtils

/**
* Base abstract class containing common code for different types of index refresh actions.
@@ -90,7 +91,12 @@ private[actions] abstract class RefreshActionBase(

protected lazy val indexConfig: IndexConfig = {
val ddColumns = previousIndexLogEntry.derivedDataset.properties.columns
IndexConfig(previousIndexLogEntry.name, ddColumns.indexed, ddColumns.included)
IndexConfig(
previousIndexLogEntry.name,
// As indexed & included columns in previousLogEntry are resolved & prefixed names,
// need to remove the prefix to resolve with the dataframe for refresh.
SchemaUtils.removePrefixNestedFieldNames(ddColumns.indexed).map(_._1),
SchemaUtils.removePrefixNestedFieldNames(ddColumns.included).map(_._1))
}

final override val transientState: String = REFRESHING
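
Continuing the SchemaUtilsSketch assumption above, the round trip RefreshActionBase relies on would look roughly like this (the stored form is illustrative):

```scala
// The log entry stores prefixed names; refresh strips the marker so the plain
// "a.b" form can be re-resolved against the source dataframe.
val persisted = Seq("__hs_nested.nested.value", "id") // assumed stored form
val restored  = SchemaUtilsSketch.removePrefixNestedFieldNames(persisted).map(_._1)
// restored == Seq("nested.value", "id")
```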
@@ -91,7 +91,8 @@ class RefreshIncrementalAction(
refreshDF,
indexDataPath.toString,
previousIndexLogEntry.numBuckets,
indexConfig.indexedColumns,
// previousIndexLogEntry should contain the resolved and prefixed field names.
previousIndexLogEntry.derivedDataset.properties.columns.indexed,
writeMode)
}
}
@@ -115,10 +116,6 @@
}
}

override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = {
RefreshIncrementalActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message)
}

/**
* Create a log entry with all source data files, and all required index content. This contains
* ALL source data files (files which were indexed previously, and files which are being indexed
@@ -143,4 +140,8 @@
entry
}
}

override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = {
RefreshIncrementalActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message)
}
}

@@ -31,7 +31,7 @@ import org.apache.spark.sql.types.{DataType, StructType}

import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.util.PathUtils
import com.microsoft.hyperspace.util.{PathUtils, SchemaUtils}
Reviewer comment (Contributor): change not needed.

// IndexLogEntry-specific fingerprint to be temporarily used where fingerprint is not defined.
case class NoOpFingerprint() {

@@ -305,8 +305,8 @@ object JoinIndexRule
val rRequiredIndexedCols = lRMap.values.toSeq

// All required columns resolved with base relation.
val lRequiredAllCols = resolve(spark, allRequiredCols(left), lBaseAttrs).get
val rRequiredAllCols = resolve(spark, allRequiredCols(right), rBaseAttrs).get
val lRequiredAllCols = resolve(spark, allRequiredCols(left), leftRelation.plan).get.map(_._1)
Reviewer comment (Contributor): is this change required in this PR?

val rRequiredAllCols = resolve(spark, allRequiredCols(right), rightRelation.plan).get.map(_._1)

// Make sure required indexed columns are subset of all required columns for a subplan
require(resolve(spark, lRequiredIndexedCols, lRequiredAllCols).isDefined)

src/main/scala/com/microsoft/hyperspace/util/ResolverUtils.scala (88 additions, 0 deletions)

@@ -18,6 +18,11 @@ package com.microsoft.hyperspace.util

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, ExtractValue, GetArrayStructFields, GetMapValue, GetStructField}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}

import com.microsoft.hyperspace.HyperspaceException

/**
* [[ResolverUtils]] provides utility functions to resolve strings based on spark's resolver.
@@ -71,4 +76,87 @@ object ResolverUtils
availableStrings: Seq[String]): Option[Seq[String]] = {
Some(requiredStrings.map(resolve(spark, _, availableStrings).getOrElse { return None }))
}

/**
* Finds all resolved strings for requiredStrings, from the given logical plan. Returns
* optional seq of resolved strings if all required strings are resolved, otherwise None.
*
* @param spark Spark session.
* @param requiredStrings List of strings to resolve.
* @param plan Logical plan to resolve against.
* @return Optional sequence of tuples of resolved name string and nested state boolean
* if all required strings are resolved. Else, None.
*/
def resolve(
spark: SparkSession,
requiredStrings: Seq[String],
plan: LogicalPlan): Option[Seq[(String, Boolean)]] = {
val schema = plan.schema
val resolver = spark.sessionState.conf.resolver
val resolved = requiredStrings.map { requiredField =>
plan
.resolveQuoted(requiredField, resolver)
.map { expr =>
val resolvedColNameParts = extractColumnName(expr)
validateResolvedColumnName(requiredField, resolvedColNameParts)
getColumnNameFromSchema(schema, resolvedColNameParts, resolver)
.foldLeft(("", false)) { (acc, i) =>
val name = Seq(acc._1, i._1).filter(_.nonEmpty).mkString(".")
val isNested = acc._2 || i._2
(name, isNested)
}
}
.getOrElse { return None }
}
Some(resolved)
}
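
A hedged usage sketch of the new overload, assuming an active SparkSession `spark`; the input path and schema are illustrative. Resolving a mix of top-level and nested columns against the analyzed plan returns the original-cased names together with a nested flag.

```scala
// Assumed input: /tmp/example.json with schema id BIGINT, nested STRUCT<value: STRING>.
val df = spark.read.json("/tmp/example.json")

val resolved = ResolverUtils.resolve(spark, Seq("ID", "Nested.Value"), df.queryExecution.analyzed)
// With Spark's default case-insensitive resolver:
//   resolved == Some(Seq(("id", false), ("nested.value", true)))
```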

// Extracts the parts of a nested field access path from an expression.
private def extractColumnName(expr: Expression): Seq[String] = {
expr match {
case a: Attribute =>
Seq(a.name)
case GetStructField(child, _, Some(name)) =>
extractColumnName(child) :+ name
case _: GetArrayStructFields =>
// TODO: Nested arrays will be supported later
throw HyperspaceException("Array types are not supported.")
case _: GetMapValue =>
// TODO: Nested maps will be supported later
throw HyperspaceException("Map types are not supported.")
case Alias(nested: ExtractValue, _) =>
extractColumnName(nested)
}
}
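
For orientation, the expression shapes resolveQuoted typically hands to extractColumnName, assuming the example schema used in the sketches above (not exhaustive):

```scala
// "id"           resolves to AttributeReference("id", ...)                          => Seq("id")
// "nested.value" resolves to Alias(GetStructField(AttributeReference("nested", ...),
//                              ordinal = 0, Some("value")), "value")                => Seq("nested", "value")
```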

// Validate the resolved column name by checking if nested columns have dots in its field names.
private def validateResolvedColumnName(
origCol: String,
resolvedColNameParts: Seq[String]): Unit = {
if (resolvedColNameParts.length > 1 && resolvedColNameParts.exists(_.contains("."))) {
throw HyperspaceException(
s"Hyperspace does not support the nested column whose name contains dots: $origCol")
}
}

// Given resolved column name parts, return new column name parts using the name in the given
// schema to use the original name in order to preserve the casing.
private def getColumnNameFromSchema(
schema: StructType,
resolvedColNameParts: Seq[String],
resolver: Resolver): Seq[(String, Boolean)] = resolvedColNameParts match {
Reviewer comment (Contributor): I think this could just return Seq[String]. The caller will just check if the nameParts.length > 1 to see if it's a nested column, which should make resolve logic simpler?

case h :: tail =>
val field = schema.find(f => resolver(f.name, h)).get
field match {
case StructField(name, s: StructType, _, _) =>
(name, true) +: getColumnNameFromSchema(s, tail, resolver)
case StructField(_, _: ArrayType, _, _) =>
// TODO: Nested arrays will be supported later
throw HyperspaceException("Array types are not supported.")
case StructField(_, _: MapType, _, _) =>
// TODO: Nested maps will be supported later
throw HyperspaceException("Map types are not supported")
case f => Seq((f.name, false))
}
}
}