path: root/core/src/test/scala/org/apache
author    Nezih Yigitbasi <nyigitbasi@netflix.com>  2016-04-19 14:35:26 -0700
committer Reynold Xin <rxin@databricks.com>  2016-04-19 14:35:26 -0700
commit    3c91afec20607e0d853433a904105ee22df73c73 (patch)
tree      428fd278fbffed6115e6fbf6b6b2bd9a3903c0f4 /core/src/test/scala/org/apache
parent    0b8369d8548c0204b9c24d826c731063b72360b8 (diff)
[SPARK-14042][CORE] Add custom coalescer support
## What changes were proposed in this pull request?

This PR adds support for specifying an optional custom coalescer to the `coalesce()` method. Currently I have only added this feature to the `RDD` interface, and once we sort out the details we can proceed with adding it to the other APIs (`Dataset` etc.).

## How was this patch tested?

Added a unit test for this functionality.

/cc rxin (per our discussion on the mailing list)

Author: Nezih Yigitbasi <nyigitbasi@netflix.com>

Closes #11865 from nezihyigitbasi/custom_coalesce_policy.
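For context, here is a minimal sketch of what the new hook enables from user code. It assumes the `PartitionCoalescer` trait and `PartitionGroup` class exercised by the test below are accessible to callers; `EvenChunkCoalescer` and its packing strategy are illustrative only, not part of this patch:

```scala
import org.apache.spark.rdd.{PartitionCoalescer, PartitionGroup, RDD}

// Hypothetical coalescer (not from this patch): packs the parent's partitions
// into contiguous chunks so that at most maxPartitions groups are produced.
// Serializable for the same reason as SizeBasedCoalescer in the diff below.
class EvenChunkCoalescer extends PartitionCoalescer with Serializable {
  override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
    val numParents = parent.partitions.length
    // Ceiling division: chunk size needed to stay within maxPartitions groups.
    val chunkSize = math.max(1, (numParents + maxPartitions - 1) / maxPartitions)
    parent.partitions.grouped(chunkSize).map { chunk =>
      val group = new PartitionGroup()
      chunk.foreach(p => group.partitions += p)
      group
    }.toArray
  }
}

// Passing the coalescer through the new optional parameter on coalesce():
// rdd.coalesce(4, partitionCoalescer = Option(new EvenChunkCoalescer))
```

This mirrors the contract the diff below relies on: `coalesce()` hands the target partition count and the parent RDD to the coalescer, which returns the groups that become the coalesced RDD's partitions.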
Diffstat (limited to 'core/src/test/scala/org/apache')
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 99
1 file changed, 98 insertions(+), 1 deletion(-)
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 24daedab20..8dc463d56d 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -17,13 +17,15 @@
package org.apache.spark.rdd
-import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+import java.io.{File, IOException, ObjectInputStream, ObjectOutputStream}
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.reflect.ClassTag
import com.esotericsoftware.kryo.KryoException
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
import org.apache.spark._
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
@@ -31,6 +33,20 @@ import org.apache.spark.rdd.RDDSuiteUtils._
import org.apache.spark.util.Utils
class RDDSuite extends SparkFunSuite with SharedSparkContext {
+  // Temp directory that holds the input files for the custom coalescer test below.
+  var tempDir: File = _
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    tempDir = Utils.createTempDir()
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      Utils.deleteRecursively(tempDir)
+    } finally {
+      super.afterAll()
+    }
+  }
test("basic operations") {
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
@@ -951,6 +967,32 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
    assert(thrown.getMessage.contains("SPARK-5063"))
  }
+ test("custom RDD coalescer") {
+ val maxSplitSize = 512
+ val outDir = new File(tempDir, "output").getAbsolutePath
+ sc.makeRDD(1 to 1000, 10).saveAsTextFile(outDir)
+ val hadoopRDD =
+ sc.hadoopFile(outDir, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
+ val coalescedHadoopRDD =
+ hadoopRDD.coalesce(2, partitionCoalescer = Option(new SizeBasedCoalescer(maxSplitSize)))
+ assert(coalescedHadoopRDD.partitions.size <= 10)
+ var totalPartitionCount = 0L
+ coalescedHadoopRDD.partitions.foreach(partition => {
+ var splitSizeSum = 0L
+ partition.asInstanceOf[CoalescedRDDPartition].parents.foreach(partition => {
+ val split = partition.asInstanceOf[HadoopPartition].inputSplit.value.asInstanceOf[FileSplit]
+ splitSizeSum += split.getLength
+ totalPartitionCount += 1
+ })
+ assert(splitSizeSum <= maxSplitSize)
+ })
+ assert(totalPartitionCount == 10)
+ }
+
+  // NOTE
+  // The tests below, which call sc.stop(), have to be the last tests in this suite. Any test
+  // that runs after them and accesses sc will fail, because sc is shared across tests in this
+  // suite (it mixes in SharedSparkContext) and is already stopped at that point.
test("cannot run actions after SparkContext has been stopped (SPARK-5063)") {
val existingRDD = sc.parallelize(1 to 100)
sc.stop()
@@ -971,5 +1013,60 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
    assertFails { sc.parallelize(1 to 100) }
    assertFails { sc.textFile("/nonexistent-path") }
  }
+}
+/**
+ * Coalesces partitions based on their size, assuming that the parent RDD is a [[HadoopRDD]].
+ * This class is defined outside of the test suite to prevent "Task not serializable" exceptions.
+ */
+class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Serializable {
+  override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
+    val partitions: Array[Partition] = parent.asInstanceOf[HadoopRDD[Any, Any]].getPartitions
+    val groups = ArrayBuffer[PartitionGroup]()
+    var currentGroup = new PartitionGroup()
+    var currentSum = 0L
+    var totalSum = 0L
+    var index = 0
+
+    // Sort partitions by the size of the corresponding input splits. Note that sortWith
+    // does not sort in place, so the returned array must be the one used below.
+    val sortedPartitions = partitions.sortWith((partition1, partition2) => {
+      val partition1Size = partition1.asInstanceOf[HadoopPartition].inputSplit.value.getLength
+      val partition2Size = partition2.asInstanceOf[HadoopPartition].inputSplit.value.getLength
+      partition1Size < partition2Size
+    })
+
+    // Closes the group being filled and starts a new, empty one.
+    def updateGroups(): Unit = {
+      groups += currentGroup
+      currentGroup = new PartitionGroup()
+      currentSum = 0
+    }
+
+    def addPartition(partition: Partition, splitSize: Long): Unit = {
+      currentGroup.partitions += partition
+      currentSum += splitSize
+      totalSum += splitSize
+    }
+
+    // Greedily pack partitions into groups whose total split size stays below maxSize;
+    // a split that does not fit into an empty group still gets a group of its own.
+    while (index < sortedPartitions.length) {
+      val partition = sortedPartitions(index)
+      val fileSplit =
+        partition.asInstanceOf[HadoopPartition].inputSplit.value.asInstanceOf[FileSplit]
+      val splitSize = fileSplit.getLength
+      if (currentSum + splitSize < maxSize || currentGroup.partitions.isEmpty) {
+        addPartition(partition, splitSize)
+        index += 1
+      } else {
+        updateGroups()
+      }
+    }
+    // Close the final group; the loop above only closes a group when it overflows.
+    if (currentGroup.partitions.nonEmpty) {
+      updateGroups()
+    }
+    groups.toArray
+  }
+}