From 2a51bce166b6eeef27c32b1527173d1d3d84cc90 Mon Sep 17 00:00:00 2001
From: Alex Bush <6716812+alexjbush@users.noreply.github.com>
Date: Fri, 21 Jun 2019 11:55:39 +0100
Subject: [PATCH] Fix partitioning issue (#3)

* Fix for batching

* Add extra safety around the batch partitioner
---
 .../scala/com/coxautodata/SparkDistCP.scala   |  8 +--
 .../coxautodata/objects/CopyPartitioner.scala | 11 +++--
 .../com/coxautodata/TestSparkDistCP.scala     | 49 ++++++++++++++-----
 .../objects/TestCopyPartitioner.scala         |  6 +--
 4 files changed, 52 insertions(+), 22 deletions(-)

diff --git a/src/main/scala/com/coxautodata/SparkDistCP.scala b/src/main/scala/com/coxautodata/SparkDistCP.scala
index 024f86f..ac7bf7c 100644
--- a/src/main/scala/com/coxautodata/SparkDistCP.scala
+++ b/src/main/scala/com/coxautodata/SparkDistCP.scala
@@ -6,9 +6,9 @@ import com.coxautodata.objects._
 import com.coxautodata.utils.{CopyUtils, FileListUtils, PathUtils}
 import org.apache.hadoop.fs._
 import org.apache.log4j.Level
-import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.apache.spark.{HashPartitioner, TaskContext}
 
 /**
   * Spark-based DistCp application.
@@ -188,9 +188,11 @@ object SparkDistCP extends Logging {
     * and repartition the RDD so files in the same batches are in the same partitions
     */
   private[coxautodata] def batchAndPartitionFiles(rdd: RDD[CopyDefinitionWithDependencies], maxFilesPerTask: Int, maxBytesPerTask: Long): RDD[((Int, Int), CopyDefinitionWithDependencies)] = {
-    val batched = rdd.mapPartitionsWithIndex(generateBatchedFileKeys(maxFilesPerTask, maxBytesPerTask))
+    val partitioner = rdd.partitioner.getOrElse(new HashPartitioner(rdd.partitions.length))
+    val sorted = rdd.map(v => (v.source.uri.toString, v)).repartitionAndSortWithinPartitions(partitioner).map(_._2)
+    val batched = sorted.mapPartitionsWithIndex(generateBatchedFileKeys(maxFilesPerTask, maxBytesPerTask)) //sorted
 
-    batched.partitionBy(CopyPartitioner(batched.map(_._1).reduceByKey(_ max _).collect()))
+    batched.partitionBy(CopyPartitioner(batched))
   }
 
   /**
diff --git a/src/main/scala/com/coxautodata/objects/CopyPartitioner.scala b/src/main/scala/com/coxautodata/objects/CopyPartitioner.scala
index 8f9d077..aa02d84 100644
--- a/src/main/scala/com/coxautodata/objects/CopyPartitioner.scala
+++ b/src/main/scala/com/coxautodata/objects/CopyPartitioner.scala
@@ -1,6 +1,7 @@
 package com.coxautodata.objects
 
 import org.apache.spark.Partitioner
+import org.apache.spark.rdd.RDD
 
 /**
   * Custom partitioner based on the indexes array containing (partitionid, number of batches within partition)
@@ -8,7 +9,7 @@ import org.apache.spark.Partitioner
   */
 case class CopyPartitioner(indexes: Array[(Int, Int)]) extends Partitioner {
 
-  private val indexesAsMap = indexes.toMap
+  val indexesAsMap: Map[Int, Int] = indexes.toMap
 
   override val numPartitions: Int = indexes.map(_._2).sum + indexes.length
 
@@ -19,9 +20,13 @@ case class CopyPartitioner(indexes: Array[(Int, Int)]) extends Partitioner {
   override def getPartition(key: Any): Int = key match {
     case (p: Int, i: Int) =>
       if (!indexesAsMap.keySet.contains(p)) throw new RuntimeException(s"Key partition $p of key [($p, $i)] was not found in the indexes [${indexesAsMap.keySet.mkString(", ")}].")
-      else if (i > indexesAsMap(p)) throw new RuntimeException(s"Key index $i of key [($p, $i)] is outside range [<=${indexesAsMap(p)}].")
-      else partitionOffsets(p) + i
+      // Modulo the batch id to prevent exceptions if the batch id is out of the range
+      partitionOffsets(p) + (i % (indexesAsMap(p) + 1))
     case u => throw new RuntimeException(s"Partitioned does not support key [$u]. Must be (Int, Int).")
   }
 }
+
+object CopyPartitioner {
+  def apply(rdd: RDD[((Int, Int), CopyDefinitionWithDependencies)]): CopyPartitioner = new CopyPartitioner(rdd.map(_._1).reduceByKey(_ max _).collect())
+}
diff --git a/src/test/scala/com/coxautodata/TestSparkDistCP.scala b/src/test/scala/com/coxautodata/TestSparkDistCP.scala
index a80ae98..d75d54a 100644
--- a/src/test/scala/com/coxautodata/TestSparkDistCP.scala
+++ b/src/test/scala/com/coxautodata/TestSparkDistCP.scala
@@ -1,7 +1,7 @@
 package com.coxautodata
 
 import com.coxautodata.SparkDistCP._
-import com.coxautodata.objects.{CopyDefinitionWithDependencies, Directory, File, SerializableFileStatus}
+import com.coxautodata.objects.{CopyDefinitionWithDependencies, CopyPartitioner, Directory, File, SerializableFileStatus}
 import com.coxautodata.utils.FileListUtils.listFiles
 import com.coxautodata.utils.FileListing
 import org.apache.hadoop.fs.Path
@@ -67,12 +67,12 @@ class TestSparkDistCP extends TestSpec {
           case ((pp, i), d) => ((p, pp, i), d.source.getPath.toString)
         }
       } should contain theSameElementsAs Seq(
-        ((0, 0, 0), "/one"),
-        ((2, 1, 0), "/two"),
-        ((0, 0, 0), "/three"),
-        ((2, 1, 0), "/file1"),
-        ((1, 0, 1), "/file2"),
-        ((3, 1, 1), "/file3")
+        ((0, 0, 0), "/file1"),
+        ((0, 0, 0), "/file3"),
+        ((1, 1, 0), "/file2"),
+        ((1, 1, 0), "/one"),
+        ((2, 1, 1), "/three"),
+        ((2, 1, 1), "/two")
       )
 
 
@@ -106,18 +106,41 @@ class TestSparkDistCP extends TestSpec {
           case ((pp, i), d) => ((p, pp, i), d.source.getPath.toString)
         }
       } should contain theSameElementsAs Seq(
-        ((0, 0, 0), "/one"),
-        ((0, 0, 0), "/two"),
-        ((1, 0, 1), "/three"),
-        ((1, 0, 1), "/file1"),
-        ((2, 0, 2), "/file2"),
-        ((3, 0, 3), "/file3")
+        ((0, 0, 0), "/file1"),
+        ((0, 0, 0), "/file2"),
+        ((1, 0, 1), "/file3"),
+        ((1, 0, 1), "/one"),
+        ((2, 0, 2), "/three"),
+        ((2, 0, 2), "/two")
       )
 
 
      spark.stop()
    }
 
+    it("produce predictable batching") {
+      val spark = new SparkContext(new SparkConf().setAppName("test").setMaster("local[1]"))
+
+      val in = List(
+        CopyDefinitionWithDependencies(SerializableFileStatus(new Path("/1").toUri, 1, File), new Path("/dest/file1").toUri, Seq.empty),
+        CopyDefinitionWithDependencies(SerializableFileStatus(new Path("/3").toUri, 3000, File), new Path("/dest/file3").toUri, Seq.empty),
+        CopyDefinitionWithDependencies(SerializableFileStatus(new Path("/2").toUri, 1, File), new Path("/dest/file2").toUri, Seq.empty)
+      )
+
+      val inRDD = spark
+        .parallelize(in)
+        .repartition(1)
+
+
+      val unsorted = batchAndPartitionFiles(inRDD, 3, 2000).partitioner.get.asInstanceOf[CopyPartitioner]
+
+      val sorted = batchAndPartitionFiles(inRDD.sortBy(_.source.uri.toString), 3, 2000).partitioner.get.asInstanceOf[CopyPartitioner]
+
+      unsorted.indexesAsMap should be (sorted.indexesAsMap)
+
+      spark.stop()
+    }
+
   }
 
   describe("run") {
diff --git a/src/test/scala/com/coxautodata/objects/TestCopyPartitioner.scala b/src/test/scala/com/coxautodata/objects/TestCopyPartitioner.scala
index c423d37..f22ea9f 100644
--- a/src/test/scala/com/coxautodata/objects/TestCopyPartitioner.scala
+++ b/src/test/scala/com/coxautodata/objects/TestCopyPartitioner.scala
@@ -30,9 +30,9 @@ class TestCopyPartitioner extends FunSpec with Matchers {
       partitioner.getPartition((4, 1))
     }.getMessage should be("Key partition 4 of key [(4, 1)] was not found in the indexes [0, 1, 2, 3].")
 
-    intercept[RuntimeException] {
-      partitioner.getPartition((2, 1))
-    }.getMessage should be("Key index 1 of key [(2, 1)] is outside range [<=0].")
+    partitioner.getPartition((2, 0)) should be(5)
+
+    partitioner.getPartition((2, 1)) should be(5)
 
   }