Check and remove unnecessary shuffle added by Hybrid Scan #331
e082581 to 8bec7c5
Measured:
val filter1 = linetable.filter(linetable("l_orderkey") isin (1234, 12341234, 123456)).select("l_orderkey")
val filter2 = linetable.filter(linetable("l_orderkey") isin (1234,12341234)).select("l_orderkey")
val join = filter1.join(filter2, "l_orderkey")
val plan = join.queryExecution.optimizedPlan
measure(spark.sessionState.executePlan(plan).executedPlan)
val join = filter1.join(filter2, "l_orderkey")
val plan = join.queryExecution.optimizedPlan
measure(spark.sessionState.executePlan(plan).executedPlan)
val join = filter1.join(filter2, "l_orderkey")
val plan = join.queryExecution.optimizedPlan
measure(spark.sessionState.executePlan(plan).executedPlan)
// result (unit: ms)
duration: 233
duration: 97
duration: 97
Could you explain the importance of this benchmark? (I wasn't sure what to get out of this benchmark.)
I just checked the code, and you meant to measure re-executing the updated plan?
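The `measure` helper used in the benchmark above is not shown in this thread. As a minimal sketch of what such a helper might look like, assuming it simply times one evaluation of its argument and prints the elapsed milliseconds (the names `MeasureSketch` and `timed` are hypothetical):

```scala
object MeasureSketch {
  // Hypothetical helper: evaluates `block` exactly once and returns
  // the result together with the elapsed wall-clock time in milliseconds.
  def timed[A](block: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = block
    val elapsedMs = (System.nanoTime() - start) / 1000000L
    (result, elapsedMs)
  }

  // Assumed shape of `measure`: prints "duration: <ms>" and passes the result through.
  def measure[A](block: => A): A = {
    val (result, elapsedMs) = timed(block)
    println(s"duration: $elapsedMs")
    result
  }
}
```

Because the argument is passed by name, the work being timed (for example, generating the executed plan) happens inside the timed region rather than before the call.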
// If the number of shuffle is 2, the candidate index pair cannot remove
// the shuffles in both left and right for join and also Hybrid Scan causes
// an additional shuffle for merging appended files. We don't apply the index
I am a bit confused with this comment. For example, is the additional shuffle by hybrid scan also counting toward 2?
One for the sort merge join and one for Hybrid Scan's on-the-fly shuffle, both in the same (left or right) child plan.
Yea please reword the comments.
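To make the "2 shuffles in one child" case concrete, here is a small sketch that counts shuffle nodes per join child. It uses a toy plan tree rather than Spark's physical plan classes, so every type here (`Node`, `Scan`, `Shuffle`, `Union`, `SortMergeJoin`) is a hypothetical stand-in, and the tree shape is an assumption about how a Hybrid Scan child could look:

```scala
object ShuffleCountSketch {
  // Toy plan nodes; these are illustrative stand-ins, not Spark classes.
  sealed trait Node { def children: Seq[Node] }
  final case class Scan(name: String) extends Node { val children: Seq[Node] = Nil }
  final case class Shuffle(child: Node) extends Node { val children: Seq[Node] = Seq(child) }
  final case class Union(inputs: Seq[Node]) extends Node { val children: Seq[Node] = inputs }
  final case class SortMergeJoin(left: Node, right: Node) extends Node {
    val children: Seq[Node] = Seq(left, right)
  }

  // Counts every Shuffle node in the subtree rooted at `node`.
  def countShuffles(node: Node): Int = {
    val self = node match {
      case _: Shuffle => 1
      case _ => 0
    }
    self + node.children.map(countShuffles).sum
  }

  // Shuffle counts for the (left, right) children of a join.
  def shuffleCountPair(join: SortMergeJoin): (Int, Int) =
    (countShuffles(join.left), countShuffles(join.right))

  // Assumed shape: the left child shuffles appended files on the fly and is
  // then shuffled again for the join; the right child needs no shuffle.
  val example: SortMergeJoin = SortMergeJoin(
    left = Shuffle(Union(Seq(Scan("index"), Shuffle(Scan("appended"))))),
    right = Scan("index"))
}
```

In this sketch the left child counts 2 (one shuffle for the join, one for the appended files), which is the situation the code comment under review describes.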
// the shuffles in both left and right for join and also Hybrid Scan causes
// an additional shuffle for merging appended files. We don't apply the index
// for the child with 2 shuffle nodes using JoinIndexRule.
// However, the child node is still applicable for FilterIndexRule.
Can you clarify the reference to FilterIndexRule?
// an additional shuffle for merging appended files. We don't apply the index
// for the child with 2 shuffle nodes using JoinIndexRule.
// However, the child node is still applicable for FilterIndexRule.
case (leftCnt, _) if leftCnt == 2 => Some(updatedPlan.copy(left = l))
Is it possible that rightCnt is also 2 for this case?
No. Spark shuffles only the child with the smaller bucket number, using the larger bucket number, so at most one shuffle is required for the sort merge join.
Yea, let's add this to the comment.
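The pattern match discussed in this thread can be sketched in isolation as follows. This is a simplified model, not the PR's code: `JoinPlan` is a hypothetical stand-in for the join plan node, and the symmetric right-hand case and the fallthrough case are assumptions, since only the left-hand case appears in the diff above.

```scala
object ShuffleCheckSketch {
  // Hypothetical stand-in for a join plan; children are plain labels here.
  final case class JoinPlan(left: String, right: String)

  // `original` is the join before indexes were applied; `updated` has both
  // children replaced by index scans. A child with 2 shuffle nodes falls back
  // to its original form; otherwise the index-based child is kept.
  def chooseChildren(
      original: JoinPlan,
      updated: JoinPlan,
      leftCnt: Int,
      rightCnt: Int): JoinPlan =
    (leftCnt, rightCnt) match {
      // Shown in the diff: revert only the left child.
      case (2, _) => updated.copy(left = original.left)
      // Assumed symmetric case: revert only the right child.
      case (_, 2) => updated.copy(right = original.right)
      // Assumed default: keep the transformed plan as is.
      case _ => updated
    }
}
```

Since Spark reshuffles only one side when bucket numbers differ, both counts should not be 2 at the same time, so the order of the first two cases should not matter in practice.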
Here is what I suggest on how to write the benchmark paragraph (my thoughts in parentheses): This PR introduces triggering .. result here ... I see the results to be 233ms, 97ms, 97ms (explain the reasoning on the high latency on the first iteration - prob. caching), and this doesn't look like introducing much overhead (but I would also compare when
.hybridScanShuffleCheckEnabled(spark)) {
updatedPlan
} else {
val shuffleCntPair = spark.sessionState
Is it possible that this will be called recursively, since it is invoked inside a rule? (To be honest, I am not sure whether this is a good pattern.)
Good point. How about just using JoinSelection and checking the bucket numbers of the candidate index pair if it's SortMergeJoin?
Or we could generate the plan without the Hyperspace rules.
What is the context for this pull request?
What changes were proposed in this pull request?
This PR removes the additional shuffle node added by Hybrid Scan by generating the executedPlan for the transformed join query plan in the optimizer.
(FYI, there can be at most 3 shuffle nodes in the left and right children, because if both children of the join are bucketed with different bucket numbers, Spark shuffles the child with the smaller bucket number, using the larger bucket number. For example,
In this case, the appended files do not need their own shuffle, because they will be shuffled for SortMergeJoin anyway.
So Hyperspace doesn't use the transformed plan for that child.
But indexes can still be applied to that child using the Filter Rule.
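The bucket-number behavior described above can be illustrated with a small sketch. It only models the stated rule (the child with the smaller bucket number is reshuffled to the larger one) and does not call Spark; `Side`, `LeftChild`, `RightChild`, and `sideToShuffle` are hypothetical names.

```scala
object BucketShuffleSketch {
  sealed trait Side
  case object LeftChild extends Side
  case object RightChild extends Side

  // Returns the child that would be reshuffled and its target bucket number,
  // or None when both children already use the same bucket number.
  def sideToShuffle(leftBuckets: Int, rightBuckets: Int): Option[(Side, Int)] =
    if (leftBuckets == rightBuckets) None
    else if (leftBuckets < rightBuckets) Some((LeftChild, rightBuckets))
    else Some((RightChild, leftBuckets))
}
```

For instance, with 100 buckets on the left and 200 on the right, this model reports that the left child is reshuffled into 200 partitions, so a separate shuffle for that child's appended files would be redundant.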
Does this PR introduce any user-facing change?
Yes
With this change:
Without this change:
How was this patch tested?
Unit test