-
Notifications
You must be signed in to change notification settings - Fork 114
Support index creation on nested fields #379
Support index creation on nested fields #379
Conversation
7286f7e
to
14c529f
Compare
src/main/scala/com/microsoft/hyperspace/actions/CreateAction.scala
Outdated
Show resolved
Hide resolved
src/main/scala/com/microsoft/hyperspace/actions/CreateAction.scala
Outdated
Show resolved
Hide resolved
src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala
Outdated
Show resolved
Hide resolved
9a860a3
to
3ff478e
Compare
src/test/scala/com/microsoft/hyperspace/index/CreateIndexNestedTest.scala
Outdated
Show resolved
Hide resolved
src/test/scala/com/microsoft/hyperspace/index/CreateIndexNestedTest.scala
Outdated
Show resolved
Hide resolved
src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala
Outdated
Show resolved
Hide resolved
src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala
Outdated
Show resolved
Hide resolved
3ff478e
to
8f02f54
Compare
src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala
Outdated
Show resolved
Hide resolved
src/main/scala/com/microsoft/hyperspace/actions/CreateAction.scala
Outdated
Show resolved
Hide resolved
@imback82, @sezruby, @rapoth: There are lots of issues on addressing nested fields in Spark. Even this example is not working in Spark:
It ends up with:
I'm able to do To access We need to create a two way convention on how to escape the field names and back. I propose the following... Given this column Steps to escape:
Steps to unescape:
The |
Hmm, how about just not supporting nested columns if their names have "." in the first cut? We can throw an exception for now, and add it later if there is a demand. So we can try the following:
Unescape: |
@imback82 I'm not sure what you mean by this:
First, and most important is that the method is NOT public API. If we are going towards this route we need to get to upper levels and use public APIs (not marked by Second, I cannot understand how Putting the two issues aside for a moment, you're suggesting the following steps:
Do I get it right? One other question for all of you @imback82, @sezruby, @rapoth: How can we speed these decisions up? It is taking a day for each back and forth message. Can we somehow speed up the process? I have some proposals:
|
@imback82, @sezruby I started implementing it in another way. I started w/o |
I think that's what's being called from |
@andrei-ionescu Our timezone is PST. 9am - 2pm PST is 6pm - 11pm EET. So I'll try to do review in the morning. You can bypass private method in this way: |
8f02f54
to
a34762b
Compare
@imback82, @sezruby: I pushed a new approach after your suggestion of using Spark's resolve mechanisms. And with this I got rid of the The following things were added:
Please have a look into this approach. I updated the PR description to reflect what we do in this index creation process. |
a34762b
to
83470ee
Compare
83470ee
to
f916e79
Compare
src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala
Outdated
Show resolved
Hide resolved
src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala
Outdated
Show resolved
Hide resolved
f916e79
to
cbfd4f6
Compare
src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala
Outdated
Show resolved
Hide resolved
src/main/scala/com/microsoft/hyperspace/actions/RefreshIncrementalAction.scala
Outdated
Show resolved
Hide resolved
@imback82: I integrated your feedback, squashed the commits into a single one, and added you as co-author on it. |
Co-Authored-By: Terry Kim <12103644+imback82@users.noreply.github.com>
9bdf16e
to
4f8e392
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a couple of comments, but generally looking fine to me.
src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
Outdated
Show resolved
Hide resolved
src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala
Outdated
Show resolved
Hide resolved
@imback82: Implemented your suggestion with Please have a look. I hope this is what you envisioned. |
src/test/scala/com/microsoft/hyperspace/util/SchemaUtilsTest.scala
Outdated
Show resolved
Hide resolved
src/main/scala/com/microsoft/hyperspace/actions/RefreshIncrementalAction.scala
Outdated
Show resolved
Hide resolved
src/main/scala/com/microsoft/hyperspace/actions/RefreshIncrementalAction.scala
Outdated
Show resolved
Hide resolved
485601a
to
edae9c0
Compare
src/main/scala/com/microsoft/hyperspace/actions/RefreshIncrementalAction.scala
Outdated
Show resolved
Hide resolved
src/main/scala/com/microsoft/hyperspace/util/ResolverUtils.scala
Outdated
Show resolved
Hide resolved
edae9c0
to
b35b8a6
Compare
src/main/scala/com/microsoft/hyperspace/actions/RefreshIncrementalAction.scala
Outdated
Show resolved
Hide resolved
e59bae3
to
0b09b1f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks @andrei-ionescu!
src/test/scala/com/microsoft/hyperspace/util/ResolverUtilsTest.scala
Outdated
Show resolved
Hide resolved
0b09b1f
to
2119750
Compare
@imback82, @sezruby: I integrated your suggestions. Can we move forward with this PR? There are a lot of new PRs that needs to happen to have the support for nested fields fully functional in Hyperspace for the first phase. Then the second phase will start to happen - to support arrays of nested fields, and maps with nested fields. |
aa5db97
to
7900f1b
Compare
7900f1b
to
93a6efd
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks @andrei-ionescu.
I have some comments, but to unblock you, I will merge this now, and I will do a follow up PR to address them.
@@ -208,16 +217,25 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager) | |||
val dataPathColumn = "_data_path" | |||
val lineagePairs = relation.lineagePairs(fileIdTracker) | |||
val lineageDF = lineagePairs.toDF(dataPathColumn, IndexConstants.DATA_FILE_NAME_ID) | |||
val prefixedAllIndexColumns = prefixedColumnsFromIndexConfig ++ missingPartitionColumns |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefixedAllIndexColumns
may not be the right name since missingPartitionColumns
are not "prefixed"?
(indexDF, prefixedIndexedColumns, prefixedIncludedColumns) | ||
} | ||
|
||
private def prepareColumns( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: aliasColumns
?
private def getColumnNameFromSchema( | ||
schema: StructType, | ||
resolvedColNameParts: Seq[String], | ||
resolver: Resolver): Seq[(String, Boolean)] = resolvedColNameParts match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this could just return Seq[String]
. The caller will just check if the nameParts.length > 1
to see if it's a nested column, which should make resolve
logic simpler?
// Check emitted events. | ||
MockEventLogger.emittedEvents match { | ||
case Seq( | ||
RefreshIncrementalActionEvent(_, _, "Operation started."), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation seems off. I ran scalafmt in this file, and 61 lines were reformatted.
|
||
package com.microsoft.hyperspace.index | ||
|
||
import scala.collection.immutable.ListMap |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not used.
@@ -31,7 +31,7 @@ import org.apache.spark.sql.types.{DataType, StructType} | |||
|
|||
import com.microsoft.hyperspace.HyperspaceException | |||
import com.microsoft.hyperspace.actions.Constants | |||
import com.microsoft.hyperspace.util.PathUtils | |||
import com.microsoft.hyperspace.util.{PathUtils, SchemaUtils} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change not needed.
@@ -305,8 +305,8 @@ object JoinIndexRule | |||
val rRequiredIndexedCols = lRMap.values.toSeq | |||
|
|||
// All required columns resolved with base relation. | |||
val lRequiredAllCols = resolve(spark, allRequiredCols(left), lBaseAttrs).get | |||
val rRequiredAllCols = resolve(spark, allRequiredCols(right), rBaseAttrs).get | |||
val lRequiredAllCols = resolve(spark, allRequiredCols(left), leftRelation.plan).get.map(_._1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this change required in this PR?
* The method prefixes the nested field names from a map where the keys are | ||
* the field names and the values are the nested state of that field |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these seem out of sync?
* @param fieldNames The collection of prefixed field names. | ||
* @return A sequence of tuples of field names and nested status. | ||
*/ | ||
def removePrefixNestedFieldNames(fieldNames: Seq[String]): Seq[(String, Boolean)] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need to return the nested status? Introduce it when it's needed?
Created #393. |
What is the context for this pull request?
What changes were proposed in this pull request?
This PR adds support for creating indexes over nested fields (ie: structs).
Given the
nestedDataset
dataset with schemaand the following data
Creating the index on nested fields would be:
The following index dataset will be created
It is important to understand that the name of the field of the index column is a non-nested column and it gets prefixed by
__hs_nested.
prefix to recognise that the field in the index data frame is for a nested field (ie:nested.nst.field1
into__hs_nested.nested.nst.field1
).Does this PR introduce any user-facing change?
No.
How was this patch tested?