Commit

address comments
Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>
binmahone committed Dec 17, 2024
1 parent 7c05a0a commit 3298da2
Showing 1 changed file with 7 additions and 7 deletions.
@@ -281,16 +281,16 @@ object AggregateUtils extends Logging {
     if (needRepartitionAgain(bucket)) {
       if (recursiveDepth >= maxRecursiveDepth) {
         // Normally this should not happen, because we are repartitioning data that has
-        // already went through first round of aggregation, so there shouldn't be too many
+        // already gone through first round of aggregation, so there shouldn't be too many
         // duplicated rows (the duplication only happens in different batches) to prevent
         // repartitioning out (considering we're changing seed each time we repartition).
-        // However for some test cases with really small batch size, this can happen. So
+        // However, for some test cases with really small batch size, this can happen. So
         // we're just logging some warnings here.
-        log.warn("The bucket is still too large after " + recursiveDepth +
-          " times of repartition. Size for each batch in " +
-          "current bucket: " + bucket.map(_.sizeInBytes).mkString(", ") + " rows: " +
-          bucket.map(_.numRows()).mkString(", ") + " targetMergeBatchSize: "
-          + targetMergeBatchSize)
+        log.warn(s"The bucket is still too large after $recursiveDepth repartitions. " +
+          s"See https://github.com/NVIDIA/spark-rapids/issues/11834. " +
+          s"Sizes for each batch in current bucket: ${bucket.map(_.sizeInBytes).mkString(", ")}"
+          + s" rows: ${bucket.map(_.numRows()).mkString(", ")}" +
+          s" targetMergeBatchSize: $targetMergeBatchSize")
         ArrayBuffer(bucket)
       } else {
         val nextLayerBuckets =
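The comment in the diff describes a bounded recursive repartition: oversized buckets are split again and again (with a new seed each round) until they fit the target merge size, and after a maximum recursion depth the code gives up and just warns. Below is a minimal, self-contained sketch of that pattern; the object name, the fixed split-in-half strategy, and the size constants are all hypothetical simplifications, not the actual spark-rapids implementation (which re-hashes batches on the GPU).

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch: a "bucket" is modeled as a sequence of batch sizes.
object RepartitionSketch {
  val maxRecursiveDepth = 10        // assumed cap, mirrors recursiveDepth check
  val targetMergeBatchSize = 100L   // assumed target, mirrors targetMergeBatchSize

  def repartition(bucket: Seq[Long], depth: Int): ArrayBuffer[Seq[Long]] = {
    if (bucket.map(identity).sum <= targetMergeBatchSize || bucket.size <= 1) {
      // Small enough (or indivisible): keep as a single bucket.
      ArrayBuffer(bucket)
    } else if (depth >= maxRecursiveDepth) {
      // Normally unreachable after a first round of aggregation; the real
      // code logs a warning here instead of failing.
      System.err.println(
        s"The bucket is still too large after $depth repartitions. " +
        s"Sizes: ${bucket.mkString(", ")} targetMergeBatchSize: $targetMergeBatchSize")
      ArrayBuffer(bucket)
    } else {
      // Split and recurse; the real code re-partitions by hash with a
      // fresh seed rather than splitting positionally.
      val (left, right) = bucket.splitAt(bucket.size / 2)
      repartition(left, depth + 1) ++ repartition(right, depth + 1)
    }
  }

  def main(args: Array[String]): Unit = {
    val out = repartition(Seq(120L, 90L, 40L, 30L), 0)
    println(out.map(_.sum).mkString(", "))
  }
}
```

The depth cap matters because pathological inputs (e.g. the tiny batch sizes mentioned in the comment) could otherwise recurse without converging; emitting the oversized bucket with a warning keeps the query running.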
