Skip to content
This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Support index creation on nested fields #379

Merged
merged 2 commits into from
Mar 27, 2021

Conversation

andrei-ionescu
Copy link
Contributor

@andrei-ionescu andrei-ionescu commented Mar 11, 2021

What is the context for this pull request?

What changes were proposed in this pull request?

This PR adds support for creating indexes over nested fields (ie: structs).

Given the nestedDataset dataset with schema

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- nested: struct (nullable = true)
 |    |-- field1: string (nullable = true)
 |    |-- nst: struct (nullable = true)
 |    |    |-- field1: string (nullable = true)
 |    |    |-- field2: string (nullable = true)

and the following data

+---+-----+-----------------+
|id |name |nested           |
+---+-----+-----------------+
|2  |name2|[va2, [wa2, wb2]]|
|1  |name1|[va1, [wa1, wb1]]|
+---+-----+-----------------+

Creating the index on nested fields would be:

hs.createIndex(
  nestedDataset, 
  IndexConfig(
    "idx_nested", 
    indexedColumns = Seq("nested.nst.field1"), 
    includedColumns = Seq("id", "name", "nested.nst.field2")))

The following index dataset will be created

root
 |-- __hs_nested.nested.nst.field1: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- __hs_nested.nested.nst.field2: string (nullable = true)
+-----------------------------+---+-----+-----------------------------+
|__hs_nested.nested.nst.field1| id| name|__hs_nested.nested.nst.field2|
+-----------------------------+---+-----+-----------------------------+
|                          wa1|  1|name1|                          wb1|
|                          wa2|  2|name2|                          wb2|
+-----------------------------+---+-----+-----------------------------+

It is important to understand that the name of the field of the index column is a non-nested column and it gets prefixed by __hs_nested. prefix to recognise that the field in the index data frame is for a nested field (ie: nested.nst.field1 into __hs_nested.nested.nst.field1).

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  • Existing unit and integration tests for not breaking the existing functionalities.
  • Unit and integration test added for the new functionalities.

@andrei-ionescu
Copy link
Contributor Author

@sezruby, @imback82 Here is the first PR that add support for creating indexes over nested fields.

@andrei-ionescu andrei-ionescu force-pushed the nested_fields_actions branch 2 times, most recently from 9a860a3 to 3ff478e Compare March 12, 2021 13:48
@andrei-ionescu andrei-ionescu force-pushed the nested_fields_actions branch from 3ff478e to 8f02f54 Compare March 15, 2021 10:28
@andrei-ionescu
Copy link
Contributor Author

andrei-ionescu commented Mar 16, 2021

@imback82, @sezruby, @rapoth: There are lots of issues on addressing nested fields in Spark.

Even this example is not working in Spark:

case class Prec(s: Double)
case class Point(`coord.x`: Double, `coord.y`: Double, `prec.100`: Prec)
val colName = "__prefixed.precision.`prec.100`.s"
val df = Seq((1, "a", Point(1, 1, Prec(10)))).toDF("id", "name", "precision")
val df2 = df.select("precision.`prec.100`.s").toDF(colName)
df2.select(colName).show

It ends up with:

org.apache.spark.sql.AnalysisException: cannot resolve '`__prefixed.precision.``prec.100``.s`' given input columns: [__prefixed.precision.`prec.100`.s];;
'Project ['__prefixed.precision.`prec.100`.s]
+- Project [s#26 AS __prefixed.precision.`prec.100`.s#28]
   +- Project [precision#9.prec.100.s AS s#26]
      +- Project [_1#3 AS id#7, _2#4 AS name#8, _3#5 AS precision#9]
         +- LocalRelation [_1#3, _2#4, _3#5]

I'm able to do toDF with a column name but then I cannot select it.

To access s in df I have to use precision.`prec.100`.s in select (it needs precision.100 to be enclosed by backticks) but that column name brought up to the first level (not nested) cannot contain backticks.

We need to create a two way convention on how to escape the field names and back.

I propose the following...

Given this column precision.`prec.100`.s it will be escaped to __hs.precision.-prec.100-.s.

Steps to escape:

  1. Replace ` (backticks) with -
  2. Prepend prefix __hs
  3. Fully enclose it in ` (backticks)

Steps to unescape:

  1. Remove begin and end backticks
  2. Remove __hs prefix
  3. Replace - with ` (backticks)

The escapeFieldName("precision.`prec.100`.s") will return __hs.precision.-prec.100-.s.
The unescapeFieldName("__hs.precision.-prec.100-.s") will return precision.`prec.100`.s.

@imback82
Copy link
Contributor

Hmm, how about just not supporting nested columns if their names have "." in the first cut? We can throw an exception for now, and add it later if there is a demand.

So we can try the following:
Escape:

  1. Check if a column name given in the config is a nested column by using df.resolve.
  2. If it is a nested column,
  • Validate if it's supported: df.resolve will return Alias(GetStructField(_)), and you can traverse it to check if the name at each level contains "." or not. If it does contain, throw an exception.
  • After the validation, prepend __hs_nested to the column name (the name should be constructed by traversing GetStructField to make sure it is unquoted).

Unescape:
If the column name has __hs_nested as a prefix, just remove the prefix.

@andrei-ionescu
Copy link
Contributor Author

andrei-ionescu commented Mar 17, 2021

@imback82 I'm not sure what you mean by this:

Check if a column name given in the config is a nested column by using df.resolve.

First, and most important is that the method is NOT public API. If we are going towards this route we need to get to upper levels and use public APIs (not marked by private or protected).

Second, I cannot understand how resolve method that returns a NamedExpression can tell us if is nested or not. Just calling the method is not enough as far as I know. Is there other process that you have in mind that I'm not aware of?

Putting the two issues aside for a moment, you're suggesting the following steps:

  1. Given a new IndexConfig with a "nested construct column name" - a field name that contains a . (dot) - we need to check this
  2. Check the given "nested construct column name" if is resolvable - I guess we should do it in ResolverUtils class
  3. Check the given "nested construct column name" if the resolved parts do contain any . (dot) and if so throw exception
  4. Transform the given "nested construct column name" into a top level column name (ie: from nested.field_a.leaf_1 into `__hs_nested.nested.field_a.leaf_1` (note the required additional backticks).
  5. Whenever needed use the prefix to check inside the IndexLogEntry that a column is nested without going to the data frame that usually is not available at that level.

Do I get it right?

One other question for all of you @imback82, @sezruby, @rapoth: How can we speed these decisions up? It is taking a day for each back and forth message. Can we somehow speed up the process?

I have some proposals:

  1. What's the everyday time interval that you guys are available to review code? I'm asking because I'm thinking to be around on the PR responding to your feedback in a faster way.
  2. My timezone is EET (GMT+3). What's your timezone?

@andrei-ionescu
Copy link
Contributor Author

@imback82, @sezruby I started implementing it in another way. I started w/o flatten and instead by resolving the nested field constructs with resolveQuoted method from LogicalPlan - LogicalPlan.scala#L122-L131. Changes will come soon.

@imback82
Copy link
Contributor

@imback82, @sezruby I started implementing it in another way. I started w/o flatten and instead by resolving the nested field constructs with resolveQuoted method from LogicalPlan - LogicalPlan.scala#L122-L131. Changes will come soon.

I think that's what's being called from df.resolve: https://github.com/apache/spark/blob/569fb133d09e24e4ed56ed7efff641512d98b01b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L262, which I have been suggesting.

@sezruby
Copy link
Collaborator

sezruby commented Mar 18, 2021

@andrei-ionescu Our timezone is PST. 9am - 2pm PST is 6pm - 11pm EET. So I'll try to do review in the morning.

You can bypass private method in this way:
https://github.com/microsoft/hyperspace/blob/master/src/main/scala/org/apache/spark/sql/hyperspace/utils/package.scala

@andrei-ionescu andrei-ionescu force-pushed the nested_fields_actions branch from 8f02f54 to a34762b Compare March 18, 2021 08:57
@andrei-ionescu
Copy link
Contributor Author

andrei-ionescu commented Mar 18, 2021

@imback82, @sezruby: I pushed a new approach after your suggestion of using Spark's resolve mechanisms. And with this I got rid of the flatten method and all related methods for escaping and back.

The following things were added:

  1. New ResolverUtils.resolve method to resolve the field name constructs agains a dataframe plan.
    1. Uses LogicalPlan.resolveQuoted method
    2. Fixes the casing (ie: nesTed.fieLD into nested.field)
    3. When there are field names that contain . (dot) and are not nested an exception is thrown
  2. New prefix methods in SchemaUtils.
  3. Unit Tests for all new functionalities.

Please have a look into this approach.

I updated the PR description to reflect what we do in this index creation process.

@andrei-ionescu andrei-ionescu force-pushed the nested_fields_actions branch from a34762b to 83470ee Compare March 18, 2021 12:38
@andrei-ionescu andrei-ionescu force-pushed the nested_fields_actions branch from 83470ee to f916e79 Compare March 18, 2021 17:18
@andrei-ionescu andrei-ionescu force-pushed the nested_fields_actions branch from f916e79 to cbfd4f6 Compare March 18, 2021 18:01
@andrei-ionescu
Copy link
Contributor Author

andrei-ionescu commented Mar 22, 2021

@imback82: I integrated your feedback, squashed the commits into a single one, and added you as co-author on it.

Co-Authored-By: Terry Kim <12103644+imback82@users.noreply.github.com>
@andrei-ionescu andrei-ionescu force-pushed the nested_fields_actions branch from 9bdf16e to 4f8e392 Compare March 22, 2021 14:42
Copy link
Contributor

@imback82 imback82 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a couple of comments, but generally looking fine to me.

@andrei-ionescu
Copy link
Contributor Author

@imback82: Implemented your suggestion with resolve method returning a tuple with field name along with its nested state.

Please have a look. I hope this is what you envisioned.

@andrei-ionescu andrei-ionescu force-pushed the nested_fields_actions branch from 485601a to edae9c0 Compare March 24, 2021 09:36
@andrei-ionescu
Copy link
Contributor Author

@imback82, @sezruby Integrated your feedback. If you have time please have another look. Thanks.

@andrei-ionescu andrei-ionescu force-pushed the nested_fields_actions branch from edae9c0 to b35b8a6 Compare March 24, 2021 20:19
@andrei-ionescu andrei-ionescu force-pushed the nested_fields_actions branch 2 times, most recently from e59bae3 to 0b09b1f Compare March 24, 2021 23:19
@andrei-ionescu
Copy link
Contributor Author

@sezruby, @imback82: Integrated your feedback.

Copy link
Collaborator

@sezruby sezruby left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks @andrei-ionescu!

@andrei-ionescu andrei-ionescu force-pushed the nested_fields_actions branch from 0b09b1f to 2119750 Compare March 25, 2021 09:19
@andrei-ionescu
Copy link
Contributor Author

andrei-ionescu commented Mar 25, 2021

@imback82, @sezruby: I integrated your suggestions. Can we move forward with this PR? There are a lot of new PRs that needs to happen to have the support for nested fields fully functional in Hyperspace for the first phase. Then the second phase will start to happen - to support arrays of nested fields, and maps with nested fields.

@andrei-ionescu andrei-ionescu force-pushed the nested_fields_actions branch 2 times, most recently from aa5db97 to 7900f1b Compare March 25, 2021 10:46
@andrei-ionescu andrei-ionescu force-pushed the nested_fields_actions branch from 7900f1b to 93a6efd Compare March 25, 2021 11:00
Copy link
Contributor

@imback82 imback82 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks @andrei-ionescu.

I have some comments, but to unblock you, I will merge this now, and I will do a follow up PR to address them.

@@ -208,16 +217,25 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
val dataPathColumn = "_data_path"
val lineagePairs = relation.lineagePairs(fileIdTracker)
val lineageDF = lineagePairs.toDF(dataPathColumn, IndexConstants.DATA_FILE_NAME_ID)
val prefixedAllIndexColumns = prefixedColumnsFromIndexConfig ++ missingPartitionColumns
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefixedAllIndexColumns may not be the right name since missingPartitionColumns are not "prefixed"?

(indexDF, prefixedIndexedColumns, prefixedIncludedColumns)
}

private def prepareColumns(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: aliasColumns?

private def getColumnNameFromSchema(
schema: StructType,
resolvedColNameParts: Seq[String],
resolver: Resolver): Seq[(String, Boolean)] = resolvedColNameParts match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could just return Seq[String]. The caller will just check if the nameParts.length > 1 to see if it's a nested column, which should make resolve logic simpler?

// Check emitted events.
MockEventLogger.emittedEvents match {
case Seq(
RefreshIncrementalActionEvent(_, _, "Operation started."),
Copy link
Contributor

@imback82 imback82 Mar 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation seems off. I ran scalafmt in this file, and 61 lines were reformatted.


package com.microsoft.hyperspace.index

import scala.collection.immutable.ListMap
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not used.

@@ -31,7 +31,7 @@ import org.apache.spark.sql.types.{DataType, StructType}

import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.util.PathUtils
import com.microsoft.hyperspace.util.{PathUtils, SchemaUtils}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change not needed.

@@ -305,8 +305,8 @@ object JoinIndexRule
val rRequiredIndexedCols = lRMap.values.toSeq

// All required columns resolved with base relation.
val lRequiredAllCols = resolve(spark, allRequiredCols(left), lBaseAttrs).get
val rRequiredAllCols = resolve(spark, allRequiredCols(right), rBaseAttrs).get
val lRequiredAllCols = resolve(spark, allRequiredCols(left), leftRelation.plan).get.map(_._1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this change required in this PR?

Comment on lines +43 to +44
* The method prefixes the nested field names from a map where the keys are
* the field names and the values are the nested state of that field
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these seem out of sync?

* @param fieldNames The collection of prefixed field names.
* @return A sequence of tuples of field names and nested status.
*/
def removePrefixNestedFieldNames(fieldNames: Seq[String]): Seq[(String, Boolean)] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to return the nested status? Introduce it when it's needed?

@imback82 imback82 merged commit b28caca into microsoft:master Mar 27, 2021
@imback82 imback82 added the enhancement New feature or request label Mar 27, 2021
@imback82 imback82 added this to the February/March 2021 (v0.5.0) milestone Mar 27, 2021
@imback82
Copy link
Contributor

I will do a follow up PR to address them.

Created #393.

@andrei-ionescu andrei-ionescu deleted the nested_fields_actions branch March 29, 2021 07:59
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[PROPOSAL]: Index nested fields
3 participants