diff options
author | Nezih Yigitbasi <nyigitbasi@netflix.com> | 2016-04-19 14:35:26 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-04-19 14:35:26 -0700 |
commit | 3c91afec20607e0d853433a904105ee22df73c73 (patch) | |
tree | 428fd278fbffed6115e6fbf6b6b2bd9a3903c0f4 /core/src/test | |
parent | 0b8369d8548c0204b9c24d826c731063b72360b8 (diff) | |
download | spark-3c91afec20607e0d853433a904105ee22df73c73.tar.gz spark-3c91afec20607e0d853433a904105ee22df73c73.tar.bz2 spark-3c91afec20607e0d853433a904105ee22df73c73.zip |
[SPARK-14042][CORE] Add custom coalescer support
## What changes were proposed in this pull request?
This PR adds support for specifying an optional custom coalescer to the `coalesce()` method. Currently I have only added this feature to the `RDD` interface, and once we sort out the details we can proceed with adding this feature to the other APIs (`Dataset` etc.)
## How was this patch tested?
Added a unit test for this functionality.
/cc rxin (per our discussion on the mailing list)
Author: Nezih Yigitbasi <nyigitbasi@netflix.com>
Closes #11865 from nezihyigitbasi/custom_coalesce_policy.
Diffstat (limited to 'core/src/test')
-rw-r--r-- | core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 99 |
1 files changed, 98 insertions, 1 deletions
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 24daedab20..8dc463d56d 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -17,13 +17,15 @@ package org.apache.spark.rdd -import java.io.{IOException, ObjectInputStream, ObjectOutputStream} +import java.io.{File, IOException, ObjectInputStream, ObjectOutputStream} import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.reflect.ClassTag import com.esotericsoftware.kryo.KryoException +import org.apache.hadoop.io.{LongWritable, Text} +import org.apache.hadoop.mapred.{FileSplit, TextInputFormat} import org.apache.spark._ import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} @@ -31,6 +33,20 @@ import org.apache.spark.rdd.RDDSuiteUtils._ import org.apache.spark.util.Utils class RDDSuite extends SparkFunSuite with SharedSparkContext { + var tempDir: File = _ + + override def beforeAll(): Unit = { + super.beforeAll() + tempDir = Utils.createTempDir() + } + + override def afterAll(): Unit = { + try { + Utils.deleteRecursively(tempDir) + } finally { + super.afterAll() + } + } test("basic operations") { val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) @@ -951,6 +967,32 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext { assert(thrown.getMessage.contains("SPARK-5063")) } + test("custom RDD coalescer") { + val maxSplitSize = 512 + val outDir = new File(tempDir, "output").getAbsolutePath + sc.makeRDD(1 to 1000, 10).saveAsTextFile(outDir) + val hadoopRDD = + sc.hadoopFile(outDir, classOf[TextInputFormat], classOf[LongWritable], classOf[Text]) + val coalescedHadoopRDD = + hadoopRDD.coalesce(2, partitionCoalescer = Option(new SizeBasedCoalescer(maxSplitSize))) + assert(coalescedHadoopRDD.partitions.size <= 10) + var totalPartitionCount = 0L + coalescedHadoopRDD.partitions.foreach(partition => { + var splitSizeSum = 0L + partition.asInstanceOf[CoalescedRDDPartition].parents.foreach(partition => { + val split = partition.asInstanceOf[HadoopPartition].inputSplit.value.asInstanceOf[FileSplit] + splitSizeSum += split.getLength + totalPartitionCount += 1 + }) + assert(splitSizeSum <= maxSplitSize) + }) + assert(totalPartitionCount == 10) + } + + // NOTE + // Below tests calling sc.stop() have to be the last tests in this suite. If there are tests + // running after them and if they access sc those tests will fail as sc is already closed, because + // sc is shared (this suite mixins SharedSparkContext) test("cannot run actions after SparkContext has been stopped (SPARK-5063)") { val existingRDD = sc.parallelize(1 to 100) sc.stop() @@ -971,5 +1013,60 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext { assertFails { sc.parallelize(1 to 100) } assertFails { sc.textFile("/nonexistent-path") } } +} +/** + * Coalesces partitions based on their size assuming that the parent RDD is a [[HadoopRDD]]. + * Took this class out of the test suite to prevent "Task not serializable" exceptions. + */ +class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Serializable { + override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = { + val partitions: Array[Partition] = parent.asInstanceOf[HadoopRDD[Any, Any]].getPartitions + val groups = ArrayBuffer[PartitionGroup]() + var currentGroup = new PartitionGroup() + var currentSum = 0L + var totalSum = 0L + var index = 0 + + // sort partitions based on the size of the corresponding input splits + partitions.sortWith((partition1, partition2) => { + val partition1Size = partition1.asInstanceOf[HadoopPartition].inputSplit.value.getLength + val partition2Size = partition2.asInstanceOf[HadoopPartition].inputSplit.value.getLength + partition1Size < partition2Size + }) + + def updateGroups(): Unit = { + groups += currentGroup + currentGroup = new PartitionGroup() + currentSum = 0 + } + + def addPartition(partition: Partition, splitSize: Long): Unit = { + currentGroup.partitions += partition + currentSum += splitSize + totalSum += splitSize + } + + while (index < partitions.size) { + val partition = partitions(index) + val fileSplit = + partition.asInstanceOf[HadoopPartition].inputSplit.value.asInstanceOf[FileSplit] + val splitSize = fileSplit.getLength + if (currentSum + splitSize < maxSize) { + addPartition(partition, splitSize) + index += 1 + if (index == partitions.size) { + updateGroups + } + } else { + if (currentGroup.partitions.size == 0) { + addPartition(partition, splitSize) + index += 1 + } else { + updateGroups + } + } + } + groups.toArray + } } |