Support adaptive bucketed scan for FilterIndexRule #332
base: master

Conversation
Force-pushed 55f2b62 to 2f0b295
val foundPrunedBuckets = fileSourceScanNodes.head.collect {
  case _ @ FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _)
      if optionalBucketSet.isDefined && (optionalBucketSet.get
        .cardinality() * 1.0 < index.bucketSpec.numBuckets * 0.8) =>
Can you explain how you came up with 0.8?
Is it possible to get the list of files after bucket pruning here and use it with useBucketSpec = false? For example, let's say only one bucket is selected. In this case, only one task will be launched to process the file (and the file can be huge). It may be better to split the file into multiple tasks.
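The split-based alternative can be sketched in plain Scala (`FileChunk` and `splitFile` are hypothetical names for illustration, not Spark or Hyperspace APIs): slicing one large bucket file into task-sized chunks yields several tasks instead of one.

```scala
// Hypothetical sketch: split a single large file into task-sized chunks,
// as a non-bucketed scan would, instead of launching one task per bucket file.
case class FileChunk(path: String, start: Long, length: Long)

def splitFile(path: String, fileSize: Long, maxSplitBytes: Long): Seq[FileChunk] =
  (0L until fileSize by maxSplitBytes).map { offset =>
    FileChunk(path, offset, math.min(maxSplitBytes, fileSize - offset))
  }

// A 1 GiB bucket file with 128 MiB splits becomes 8 chunks, i.e. 8 tasks.
val chunks = splitFile("bucket-00000.parquet", 1024L * 1024 * 1024, 128L * 1024 * 1024)
```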
I think it would be possible if we extract genBucketSet
https://github.com/apache/spark/blob/d1177b52304217f4cb86506fd1887ec98879ed16/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala#L116
and use it to filter the file paths before transforming the plan. That way, we don't need to generate the plan twice, so I think it would be good.
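As a rough illustration of this idea (plain Scala; `IndexFile` and the bucket-id bookkeeping are hypothetical stand-ins, not Spark's actual file naming or the real genBucketSet API), the pruned bucket set could filter the file list like so:

```scala
import java.util.BitSet

// Hypothetical sketch: keep only the files whose bucket id survives pruning,
// using a BitSet of selected bucket ids (what genBucketSet would compute).
case class IndexFile(path: String, bucketId: Int)

def filterByBuckets(files: Seq[IndexFile], selected: BitSet): Seq[IndexFile] =
  files.filter(f => selected.get(f.bucketId))

val selected = new BitSet(8)
selected.set(3) // pretend pruning kept only bucket 3

val files = (0 until 8).map(i => IndexFile(s"part-0000$i", i))
val pruned = filterByBuckets(files, selected)
// Only the bucket-3 file remains; the plan is then transformed once, with this list.
```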
sgtm
@imback82 I tried several options.
- Create InMemoryFileIndex with a filtered file list - this isn't compatible with the INMEMORYFILEINDEX_INDEX_ONLY tag, and it incurs additional overhead from the newly created InMemoryFileIndex. I'd like to keep the current tag behavior.
- Have IndexHadoopFsRelation return the filtered file list - we would need to rewrite InMemoryFileIndex, as most of the Spark code refers to relation.location.xxx to get the file list.

A possibly better(?) option is introducing a custom FileSourceScanExec or InMemoryFileIndex, but it would add some complexity to the codebase.
(BTW, I found a related issue: https://issues.apache.org/jira/browse/SPARK-32986)
So, how about setting bucketSpec for FilterIndexRule if bucket pruning is applicable, for now?
Sorry, maybe I wasn't clear. Is there a scenario where bucket read performs better when no buckets are pruned?
@imback82 If the config "bucketCheck.enabled" is enabled, bucket scan won't be performed if there are no pruned buckets.
Will #329 be used as a static config, for experimental purposes?
Bucket read might be faster when job scheduling matters (e.g. with caching). Also, since a non-bucketed scan depends on the default Spark configs - https://github.com/apache/spark/blob/2145f07e22a53e2da2f6a424c6fd169c2bb63a48/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala#L86 - it might not work well with every dataset/landscape/env setup.
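The linked FilePartition logic boils down to a split-size formula; here is a paraphrased sketch (the conf names in the comments are the relevant Spark settings, but the function itself is a simplification, not Spark's exact code):

```scala
// Paraphrase of Spark's FilePartition.maxSplitBytes: the split size for a
// non-bucketed scan depends on total bytes, core count, and two configs.
def maxSplitBytes(maxPartitionBytes: Long, // spark.sql.files.maxPartitionBytes
                  openCostInBytes: Long,   // spark.sql.files.openCostInBytes
                  totalBytes: Long,
                  minPartitionNum: Long): Long = {
  val bytesPerCore = totalBytes / minPartitionNum
  math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerCore))
}

// With defaults (128 MiB cap, 4 MiB open cost), a 10 GiB scan on 200 cores
// gets a split size driven by bytesPerCore, so task sizing tracks the cluster.
val split = maxSplitBytes(128L << 20, 4L << 20, 10L << 30, 200L)
```

This is why the same query can produce very different task counts in different environments: the split size moves with the configured defaults and the data/core ratio.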
Bucket read might be faster when job scheduling matters (e.g. with caching)

Hmm, usually scheduling is fast enough that it shouldn't be a bottleneck; how much (as a percentage) was the scheduling overhead in the cache scenario?
OK, let me finish #329 and let me think if we can remove the inter-dependent configs.
- Create InMemoryFileIndex with a filtered file list - this isn't compatible with the INMEMORYFILEINDEX_INDEX_ONLY tag, and it incurs additional overhead from the newly created InMemoryFileIndex. I'd like to keep the current tag behavior.
BTW, this seems like a better option as Spark 3.2 will also decouple bucket scan vs. bucket filter: apache/spark@76baaf7
This will require some changes in RuleUtils to support INMEMORYFILEINDEX_INDEX_ONLY, but it seems doable?
Yes, plan + INMEMORYFILEINDEX_INDEX_ONLY will cache it properly, but the different file lists might incur unnecessary overhead. Filtering files in DataSourceScanExec is clear and doable, as in the link.
Force-pushed 15ab449 to 63dfc53
Force-pushed 689cef9 to e548325
Force-pushed e548325 to 04aace9
What is the context for this pull request?
What changes were proposed in this pull request?
This PR applies bucketing for Filter index if the given query is applicable for bucket pruning.
We introduced a switchable config for this in #329, but it's a global config, and it might be difficult to set the flag for each query.
To do this, I extracted the bucket pruning logic (genBucketSet) from FileSourceStrategy in Spark. I introduced the following config to work with the static config (spark.hyperspace.index.filterRule.useBucketSpec):
Note: we could exclude unnecessary index files using the selected bucket ids, but that would incur some cost, as the InMemoryFileIndex can differ for each index file list (refer to #324). Creating a new custom strategy or another class could filter the paths internally, but it would add some complexity to the codebase. Thus, for now, we set bucketSpec for pruning unnecessary data.
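The chosen approach reduces to a small decision; a minimal sketch follows (the `BucketSpec` case class and the `isPruningApplicable` flag are simplified stand-ins for Hyperspace's actual types and the extracted genBucketSet check, not the real rule code):

```scala
// Simplified sketch of the FilterIndexRule decision: only attach a bucketSpec
// to the index relation when bucket pruning would actually eliminate buckets.
case class BucketSpec(numBuckets: Int, bucketColumnNames: Seq[String])

def chooseBucketSpec(spec: BucketSpec, isPruningApplicable: Boolean): Option[BucketSpec] =
  if (isPruningApplicable) Some(spec) else None // None => plain non-bucketed scan

val spec = BucketSpec(200, Seq("id"))
val withPruning = chooseBucketSpec(spec, isPruningApplicable = true)     // bucketed scan
val withoutPruning = chooseBucketSpec(spec, isPruningApplicable = false) // non-bucketed scan
```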
Does this PR introduce any user-facing change?
Yes, bucketSpec for filter index is applied if bucket pruning is applicable for the given query.
How was this patch tested?
Unit test