From df5fd352735005ce0322d287ae27d72d12a7fc8e Mon Sep 17 00:00:00 2001
From: Stephen Haberman
Date: Sun, 8 Sep 2013 15:39:03 -0500
Subject: Add better docs for coalesce.

Include the useful tip that if shuffle=true, coalesce can actually
increase the number of partitions. This makes coalesce more like a
generic `RDD.repartition` operation.

(Ideally this `RDD.repartition` could automatically choose either a
coalesce or a shuffle if numPartitions were either less than or greater
than, respectively, the current number of partitions.)
---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala      | 17 +++++++++++++++++
 core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 14 ++++++++++----
 2 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index e143ecd096..41a90f139e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -267,6 +267,23 @@ abstract class RDD[T: ClassManifest](
 
   /**
    * Return a new RDD that is reduced into `numPartitions` partitions.
+   *
+   * This results in a narrow dependency, e.g. if you go from 1000 partitions
+   * to 100 partitions, there will not be a shuffle; instead, each of the 100
+   * new partitions will claim 10 of the current partitions.
+   *
+   * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
+   * this may result in your computation taking place on fewer nodes than
+   * you'd like (e.g. one node in the case of numPartitions = 1). To avoid
+   * this, you can pass shuffle = true. This will add a shuffle step, but
+   * means the current upstream partitions will be executed in parallel (per
+   * whatever the current partitioning is).
+   *
+   * Note: With shuffle = true, you can actually coalesce to a larger number
+   * of partitions. This is useful if you have a small number of partitions,
+   * say 100, potentially with a few partitions being abnormally large.
+   * Calling coalesce(1000, shuffle = true) will result in 1000 partitions,
+   * with the data evenly distributed into each partition.
    */
   def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {
     if (shuffle) {

diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index adc971050e..6096149b19 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -140,7 +140,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(rdd.union(emptyKv).collect().size === 2)
   }
 
-  test("cogrouped RDDs") {
+  test("coalesced RDDs") {
     val data = sc.parallelize(1 to 10, 10)
 
     val coalesced1 = data.coalesce(2)
@@ -175,8 +175,14 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     val coalesced5 = data.coalesce(1, shuffle = true)
     assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _, _]] !=
       null)
+
+    // when shuffling, we can increase the number of partitions
+    val coalesced6 = data.coalesce(20, shuffle = true)
+    assert(coalesced6.partitions.size === 20)
+    assert(coalesced6.collect().toList === (1 to 10).toList)
   }
-  test("cogrouped RDDs with locality") {
+
+  test("coalesced RDDs with locality") {
     val data3 = sc.makeRDD(List((1,List("a","c")), (2,List("a","b","c")), (3,List("b"))))
     val coal3 = data3.coalesce(3)
     val list3 = coal3.partitions.map(p => p.asInstanceOf[CoalescedRDDPartition].preferredLocation)
@@ -197,11 +203,11 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     val coalesced4 = data.coalesce(20)
     val listOfLists = coalesced4.glom().collect().map(_.toList).toList
     val sortedList = listOfLists.sortWith{ (x, y) => !x.isEmpty && (y.isEmpty || (x(0) < y(0))) }
-    assert( sortedList === (1 to 9).
+    assert(sortedList === (1 to 9).
       map{x => List(x)}.toList,
       "Tried coalescing 9 partitions to 20 but didn't get 9 back")
   }
-  test("cogrouped RDDs with locality, large scale (10K partitions)") {
+  test("coalesced RDDs with locality, large scale (10K partitions)") {
     // large scale experiment
     import collection.mutable
     val rnd = scala.util.Random
-- 
cgit v1.2.3
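
For illustration, here is a minimal sketch (not part of the patch) of the
behavior the new doc comment describes, mirroring the assertions in the
updated RDDSuite test. It assumes a Spark build that includes this change;
the object name, local master string, and app name are arbitrary.

import org.apache.spark.SparkContext

object CoalesceSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[4]", "coalesce-sketch")
    val data = sc.parallelize(1 to 10, 10) // start with 10 partitions

    // Without shuffle: a narrow dependency folds the 10 partitions into 2.
    val narrowed = data.coalesce(2)
    assert(narrowed.partitions.size == 2)

    // With shuffle = true, coalesce can increase the partition count,
    // redistributing the data across the new partitions.
    val widened = data.coalesce(20, shuffle = true)
    assert(widened.partitions.size == 20)
    assert(widened.collect().toList == (1 to 10).toList)

    sc.stop()
  }
}

Used this way, coalesce(n, shuffle = true) behaves like the generic
`RDD.repartition` operation the commit message alludes to.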