From 680f42e6cd1ee8593136323a539dc5117b165377 Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Sun, 10 Feb 2013 02:27:03 -0600 Subject: Change defaultPartitioner to use upstream split size. Previously it used the SparkContext.defaultParallelism, which occasionally ended up being a very bad guess. Looking at upstream RDDs seems to make better use of the context. Also sorted the upstream RDDs by partition size first, as if we have a hugely-partitioned RDD and a tiny-partitioned RDD, it is unlikely we want the resulting RDD to be tiny-partitioned. --- core/src/main/scala/spark/PairRDDFunctions.scala | 8 ++++++-- core/src/test/scala/spark/PartitioningSuite.scala | 8 ++++---- core/src/test/scala/spark/ShuffleSuite.scala | 19 +++++++++++++++++++ 3 files changed, 29 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index cc3cca2571..18b4a1eca4 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -439,12 +439,16 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( /** * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. If any of * the RDDs already has a partitioner, choose that one, otherwise use a default HashPartitioner. + * + * The number of partitions will be the same as the number of partitions in the largest upstream + * RDD, as this should be least likely to cause out-of-memory errors. */ def defaultPartitioner(rdds: RDD[_]*): Partitioner = { - for (r <- rdds if r.partitioner != None) { + val bySize = rdds.sortBy(_.splits.size).reverse + for (r <- bySize if r.partitioner != None) { return r.partitioner.get } - return new HashPartitioner(self.context.defaultParallelism) + return new HashPartitioner(bySize.head.splits.size) } /** diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala index af1107cd19..60db759c25 100644 --- a/core/src/test/scala/spark/PartitioningSuite.scala +++ b/core/src/test/scala/spark/PartitioningSuite.scala @@ -84,10 +84,10 @@ class PartitioningSuite extends FunSuite with LocalSparkContext { assert(grouped4.groupByKey(3).partitioner != grouped4.partitioner) assert(grouped4.groupByKey(4).partitioner === grouped4.partitioner) - assert(grouped2.join(grouped4).partitioner === grouped2.partitioner) - assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped2.partitioner) - assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped2.partitioner) - assert(grouped2.cogroup(grouped4).partitioner === grouped2.partitioner) + assert(grouped2.join(grouped4).partitioner === grouped4.partitioner) + assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped4.partitioner) + assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped4.partitioner) + assert(grouped2.cogroup(grouped4).partitioner === grouped4.partitioner) assert(grouped2.join(reduced2).partitioner === grouped2.partitioner) assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner) diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index 3493b9511f..ab7060a1ac 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -211,6 +211,25 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { assert(rdd.keys.collect().toList === List(1, 2)) assert(rdd.values.collect().toList === List("a", "b")) 
} + + test("default partition size uses split size") { + sc = new SparkContext("local", "test") + // specify 2000 splits + val a = sc.makeRDD(Array(1, 2, 3, 4), 2000) + // do a map, which loses the partitioner + val b = a.map(a => (a, (a * 2).toString)) + // then a group by, and see we didn't revert to 2 splits + val c = b.groupByKey() + assert(c.splits.size === 2000) + } + + test("default partition uses largest partitioner") { + sc = new SparkContext("local", "test") + val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2) + val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000) + val c = a.join(b) + assert(c.splits.size === 2000) + } } object ShuffleSuite { -- cgit v1.2.3 From 6cd68c31cbebb1e3c6b35026f067a3c82ce9fdfb Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Sat, 16 Feb 2013 00:29:11 -0600 Subject: Update default.parallelism docs, have StandaloneSchedulerBackend use it. Only brand new RDDs (e.g. parallelize and makeRDD) now use default parallelism, everything else uses their largest parent's partitioner or partition size. --- core/FileServerSuite.txt | 1 + core/src/main/scala/spark/PairRDDFunctions.scala | 16 +--------------- core/src/main/scala/spark/Partitioner.scala | 19 +++++++++++++++++++ core/src/main/scala/spark/RDD.scala | 20 ++++++++++++++------ core/src/main/scala/spark/SparkContext.scala | 2 +- core/src/main/scala/spark/api/java/JavaPairRDD.scala | 2 +- .../cluster/StandaloneSchedulerBackend.scala | 3 ++- docs/tuning.md | 8 ++++---- 8 files changed, 43 insertions(+), 28 deletions(-) create mode 120000 core/FileServerSuite.txt diff --git a/core/FileServerSuite.txt b/core/FileServerSuite.txt new file mode 120000 index 0000000000..0a21b7bf25 --- /dev/null +++ b/core/FileServerSuite.txt @@ -0,0 +1 @@ +/tmp/1359046053333-0/test/FileServerSuite.txt \ No newline at end of file diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 18b4a1eca4..d840118b82 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -23,6 +23,7 @@ import spark.partial.BoundedDouble import spark.partial.PartialResult import spark.rdd._ import spark.SparkContext._ +import spark.Partitioner._ /** * Extra functions available on RDDs of (key, value) pairs through an implicit conversion. @@ -436,21 +437,6 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( cogroup(other1, other2, defaultPartitioner(self, other1, other2)) } - /** - * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. If any of - * the RDDs already has a partitioner, choose that one, otherwise use a default HashPartitioner. - * - * The number of partitions will be the same as the number of partitions in the largest upstream - * RDD, as this should be least likely to cause out-of-memory errors. - */ - def defaultPartitioner(rdds: RDD[_]*): Partitioner = { - val bySize = rdds.sortBy(_.splits.size).reverse - for (r <- bySize if r.partitioner != None) { - return r.partitioner.get - } - return new HashPartitioner(bySize.head.splits.size) - } - /** * Return the list of values in the RDD for key `key`. This operation is done efficiently if the * RDD has a known partitioner by only searching the partition that the key maps to. 
diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala index 9d5b966e1e..69f9534d23 100644 --- a/core/src/main/scala/spark/Partitioner.scala +++ b/core/src/main/scala/spark/Partitioner.scala @@ -9,6 +9,25 @@ abstract class Partitioner extends Serializable { def getPartition(key: Any): Int } +object Partitioner { + /** + * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. If any of + * the RDDs already has a partitioner, choose that one, otherwise use a default HashPartitioner. + * + * The number of partitions will be the same as the number of partitions in the largest upstream + * RDD, as this should be least likely to cause out-of-memory errors. + * + * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD. + */ + def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = { + val bySize = (Seq(rdd) ++ others).sortBy(_.splits.size).reverse + for (r <- bySize if r.partitioner != None) { + return r.partitioner.get + } + return new HashPartitioner(bySize.head.splits.size) + } +} + /** * A [[spark.Partitioner]] that implements hash-based partitioning using Java's `Object.hashCode`. * diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 6abb5c4792..b3188956d9 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -16,6 +16,7 @@ import org.apache.hadoop.mapred.TextOutputFormat import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap} +import spark.Partitioner._ import spark.partial.BoundedDouble import spark.partial.CountEvaluator import spark.partial.GroupedCountEvaluator @@ -299,19 +300,26 @@ abstract class RDD[T: ClassManifest]( */ def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other) + /** + * Return an RDD of grouped items. + */ + def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] = + groupBy[K](f, defaultPartitioner(this)) + /** * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements * mapping to that key. */ - def groupBy[K: ClassManifest](f: T => K, numSplits: Int): RDD[(K, Seq[T])] = { - val cleanF = sc.clean(f) - this.map(t => (cleanF(t), t)).groupByKey(numSplits) - } - + def groupBy[K: ClassManifest](f: T => K, numSplits: Int): RDD[(K, Seq[T])] = + groupBy(f, new HashPartitioner(numSplits)) + /** * Return an RDD of grouped items. */ - def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] = groupBy[K](f, sc.defaultParallelism) + def groupBy[K: ClassManifest](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = { + val cleanF = sc.clean(f) + this.map(t => (cleanF(t), t)).groupByKey(p) + } /** * Return an RDD created by piping elements to a forked external process. diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 0efc00d5dd..ff367eafb4 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -693,7 +693,7 @@ class SparkContext( checkpointDir = Some(dir) } - /** Default level of parallelism to use when not given by user (e.g. for reduce tasks) */ + /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). 
*/ def defaultParallelism: Int = taskScheduler.defaultParallelism /** Default min number of splits for Hadoop RDDs when not given by user */ diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala index 8a123bdb47..4fba8b858c 100644 --- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala @@ -237,7 +237,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * "combiner" in MapReduce. Output will be hash-partitioned with the default parallelism level. */ def reduceByKey(func: JFunction2[V, V, V]): JavaPairRDD[K, V] = { - val partitioner = rdd.defaultPartitioner(rdd) + val partitioner = Partitioner.defaultPartitioner(rdd) fromRDD(reduceByKey(partitioner, func)) } diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 082022be1c..537d9c2e41 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -147,7 +147,8 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor driverActor ! ReviveOffers } - override def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2) + override def defaultParallelism() = Option(System.getProperty("spark.default.parallelism")) + .map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2)) } private[spark] object StandaloneSchedulerBackend { diff --git a/docs/tuning.md b/docs/tuning.md index 9aaa53cd65..e9b4d6717c 100644 --- a/docs/tuning.md +++ b/docs/tuning.md @@ -213,10 +213,10 @@ but at a high level, managing how frequently full GC takes place can help in red Clusters will not be fully utilized unless you set the level of parallelism for each operation high enough. Spark automatically sets the number of "map" tasks to run on each file according to its size -(though you can control it through optional parameters to `SparkContext.textFile`, etc), but for -distributed "reduce" operations, such as `groupByKey` and `reduceByKey`, it uses a default value of 8. -You can pass the level of parallelism as a second argument (see the -[`spark.PairRDDFunctions`](api/core/index.html#spark.PairRDDFunctions) documentation), +(though you can control it through optional parameters to `SparkContext.textFile`, etc), and for +distributed "reduce" operations, such as `groupByKey` and `reduceByKey`, it uses the largest +parent RDD's number of partitions. You can pass the level of parallelism as a second argument +(see the [`spark.PairRDDFunctions`](api/core/index.html#spark.PairRDDFunctions) documentation), or set the system property `spark.default.parallelism` to change the default. In general, we recommend 2-3 tasks per CPU core in your cluster. -- cgit v1.2.3 From 37397106ce64106b1d0dedd6ab428649e08a56cd Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Sat, 16 Feb 2013 00:31:07 -0600 Subject: Remove fileServerSuite.txt. 
--- core/FileServerSuite.txt | 1 - 1 file changed, 1 deletion(-) delete mode 120000 core/FileServerSuite.txt diff --git a/core/FileServerSuite.txt b/core/FileServerSuite.txt deleted file mode 120000 index 0a21b7bf25..0000000000 --- a/core/FileServerSuite.txt +++ /dev/null @@ -1 +0,0 @@ -/tmp/1359046053333-0/test/FileServerSuite.txt \ No newline at end of file -- cgit v1.2.3 From 6a2d95784366c8e7e54140ae6482e14acbd4d759 Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Sat, 16 Feb 2013 00:33:49 -0600 Subject: Tweak test names. --- core/src/test/scala/spark/ShuffleSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index ab7060a1ac..0cff5b7ff7 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -212,7 +212,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { assert(rdd.values.collect().toList === List("a", "b")) } - test("default partition size uses split size") { + test("default partitioner uses split size") { sc = new SparkContext("local", "test") // specify 2000 splits val a = sc.makeRDD(Array(1, 2, 3, 4), 2000) @@ -223,7 +223,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { assert(c.splits.size === 2000) } - test("default partition uses largest partitioner") { + test("default partitioner uses largest partitioner") { sc = new SparkContext("local", "test") val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2) val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000) -- cgit v1.2.3 From 4281e579c236d0125f44f5ca1d999adb5f894c24 Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Sat, 16 Feb 2013 00:45:03 -0600 Subject: Update more javadocs. --- core/src/main/scala/spark/PairRDDFunctions.scala | 13 +++++++------ core/src/main/scala/spark/api/java/JavaPairRDD.scala | 19 ++++++++++--------- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index d840118b82..4c4ab60f43 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -247,8 +247,8 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( } /** - * Simplified version of combineByKey that hash-partitions the resulting RDD using the default - * parallelism level. + * Simplified version of combineByKey that hash-partitions the resulting RDD using the + * existing partitioner/parallelism level. */ def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) : RDD[(K, C)] = { @@ -258,7 +258,8 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( /** * Merge the values for each key using an associative reduce function. This will also perform * the merging locally on each mapper before sending results to a reducer, similarly to a - * "combiner" in MapReduce. Output will be hash-partitioned with the default parallelism level. + * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/ + * parallelism level. */ def reduceByKey(func: (V, V) => V): RDD[(K, V)] = { reduceByKey(defaultPartitioner(self), func) @@ -266,7 +267,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( /** * Group the values for each key in the RDD into a single sequence. Hash-partitions the - * resulting RDD with the default parallelism level. 
+ * resulting RDD with the existing partitioner/parallelism level. */ def groupByKey(): RDD[(K, Seq[V])] = { groupByKey(defaultPartitioner(self)) @@ -294,7 +295,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output - * using the default level of parallelism. + * using the existing partitioner/parallelism level. */ def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = { leftOuterJoin(other, defaultPartitioner(self, other)) @@ -314,7 +315,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting - * RDD using the default parallelism level. + * RDD using the existing partitioner/parallelism level. */ def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = { rightOuterJoin(other, defaultPartitioner(self, other)) diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala index 4fba8b858c..cdfb8930fd 100644 --- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala @@ -19,6 +19,7 @@ import spark.OrderedRDDFunctions import spark.storage.StorageLevel import spark.HashPartitioner import spark.Partitioner +import spark.Partitioner._ import spark.RDD import spark.SparkContext.rddToPairRDDFunctions @@ -220,30 +221,30 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif fromRDD(rdd.rightOuterJoin(other, partitioner)) /** - * Simplified version of combineByKey that hash-partitions the resulting RDD using the default - * parallelism level. + * Simplified version of combineByKey that hash-partitions the resulting RDD using the existing + * partitioner/parallelism level. */ def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], mergeCombiners: JFunction2[C, C, C]): JavaPairRDD[K, C] = { implicit val cm: ClassManifest[C] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]] - fromRDD(combineByKey(createCombiner, mergeValue, mergeCombiners)) + fromRDD(combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(rdd))) } /** * Merge the values for each key using an associative reduce function. This will also perform * the merging locally on each mapper before sending results to a reducer, similarly to a - * "combiner" in MapReduce. Output will be hash-partitioned with the default parallelism level. + * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/ + * parallelism level. */ def reduceByKey(func: JFunction2[V, V, V]): JavaPairRDD[K, V] = { - val partitioner = Partitioner.defaultPartitioner(rdd) - fromRDD(reduceByKey(partitioner, func)) + fromRDD(reduceByKey(defaultPartitioner(rdd), func)) } /** * Group the values for each key in the RDD into a single sequence. Hash-partitions the - * resulting RDD with the default parallelism level. + * resulting RDD with the existing partitioner/parallelism level. 
*/ def groupByKey(): JavaPairRDD[K, JList[V]] = fromRDD(groupByResultToJava(rdd.groupByKey())) @@ -268,7 +269,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output - * using the default level of parallelism. + * using the existing partitioner/parallelism level. */ def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Option[W])] = fromRDD(rdd.leftOuterJoin(other)) @@ -286,7 +287,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting - * RDD using the default parallelism level. + * RDD using the existing partitioner/parallelism level. */ def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Option[V], W)] = fromRDD(rdd.rightOuterJoin(other)) -- cgit v1.2.3 From c44ccf2862e8be183ccecac3bf61f9651b21984a Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Sun, 24 Feb 2013 23:54:03 -0600 Subject: Use default parallelism if it's set. --- core/src/main/scala/spark/Partitioner.scala | 23 ++++++++++++++++++----- core/src/test/scala/spark/ShuffleSuite.scala | 2 +- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala index 03966f1c96..eec0e8dd79 100644 --- a/core/src/main/scala/spark/Partitioner.scala +++ b/core/src/main/scala/spark/Partitioner.scala @@ -10,12 +10,21 @@ abstract class Partitioner extends Serializable { } object Partitioner { + + private val useDefaultParallelism = System.getProperty("spark.default.parallelism") != null + /** - * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. If any of - * the RDDs already has a partitioner, choose that one, otherwise use a default HashPartitioner. + * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. + * + * If any of the RDDs already has a partitioner, choose that one. * - * The number of partitions will be the same as the number of partitions in the largest upstream - * RDD, as this should be least likely to cause out-of-memory errors. + * Otherwise, we use a default HashPartitioner. For the number of partitions, if + * spark.default.parallelism is set, then we'll use the value from SparkContext + * defaultParallelism, otherwise we'll use the max number of upstream partitions. + * + * Unless spark.default.parallelism is set, the number of partitions will be the + * same as the number of partitions in the largest upstream RDD, as this should + * be least likely to cause out-of-memory errors. + * + * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD. 
*/ @@ -24,7 +33,11 @@ object Partitioner { for (r <- bySize if r.partitioner != None) { return r.partitioner.get } - return new HashPartitioner(bySize.head.partitions.size) + if (useDefaultParallelism) { + return new HashPartitioner(rdd.context.defaultParallelism) + } else { + return new HashPartitioner(bySize.head.partitions.size) + } } } diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index 2099999ed7..8411291b2c 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -235,7 +235,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { assert(rdd.values.collect().toList === List("a", "b")) } - test("default partitioner uses split size") { + test("default partitioner uses partition size") { sc = new SparkContext("local", "test") // specify 2000 partitions val a = sc.makeRDD(Array(1, 2, 3, 4), 2000) -- cgit v1.2.3
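The net effect of the series above, shown as a small, hypothetical usage sketch (not part of the patches): cogroup-like operations such as join and groupByKey now size their HashPartitioner from the largest upstream RDD rather than from SparkContext.defaultParallelism. The object name, app name, and data below are made up; the sketch assumes the 2013-era `spark` package API used in these diffs, where the partition list is called `splits` in the earlier commits and `partitions` by the last one.

    import spark.SparkContext
    import spark.SparkContext._

    object DefaultPartitionerSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "default-partitioner-sketch")

        // Two pair RDDs without partitioners: one with 2 partitions, one with 2000.
        val small = sc.makeRDD(Array((1, "a"), (2, "b")), 2)
        val large = sc.makeRDD(Array((1, "x"), (2, "y")), 2000)

        // The join's HashPartitioner is sized from the largest parent (2000 partitions),
        // not from the scheduler's default parallelism.
        assert(small.join(large).partitions.size == 2000)

        // A map drops any partitioner, but groupByKey still keeps the upstream partition count.
        val mapped = large.map { case (k, v) => (k, v + "!") }
        assert(mapped.groupByKey().partitions.size == 2000)

        sc.stop()
      }
    }

This mirrors the two ShuffleSuite tests added in the first commit (and renamed later in the series).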
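The last commit adds an escape hatch: when the spark.default.parallelism system property is set, Partitioner.defaultPartitioner falls back to SparkContext.defaultParallelism instead of the largest parent, and StandaloneSchedulerBackend reports that property as its default parallelism. A hedged sketch of how a user might opt back into the old behavior (the property name comes from the patch; the object name and the value 4 are illustrative):

    import spark.SparkContext
    import spark.SparkContext._

    object DefaultParallelismOverrideSketch {
      def main(args: Array[String]) {
        // The Partitioner object captures this property in a val, so set it before
        // any shuffle operation runs.
        System.setProperty("spark.default.parallelism", "4")

        val sc = new SparkContext("local", "default-parallelism-override")
        val small = sc.makeRDD(Array((1, "a"), (2, "b")), 2)
        val large = sc.makeRDD(Array((1, "x"), (2, "y")), 2000)

        // Neither parent has a partitioner, so with the property set the join is sized
        // from sc.defaultParallelism rather than from the 2000-partition parent.
        assert(small.join(large).partitions.size == sc.defaultParallelism)

        sc.stop()
      }
    }

On a standalone cluster the same property also feeds StandaloneSchedulerBackend.defaultParallelism, so parallelize/makeRDD defaults and shuffle defaults stay consistent; these patches only wire the property into the standalone backend, so in local mode defaultParallelism still comes from the local scheduler.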