author    Stephen Haberman <stephen@exigencecorp.com> 2013-02-24 23:54:03 -0600
committer Stephen Haberman <stephen@exigencecorp.com> 2013-02-24 23:54:03 -0600
commit    c44ccf2862e8be183ccecac3bf61f9651b21984a (patch)
tree      fa5f1bf6872ccf87241177d6a3cdf07d5e2e8323
parent    44032bc476be9f334d17db3b8963a8deb973123c (diff)
Use default parallelism if it's set.
-rw-r--r-- core/src/main/scala/spark/Partitioner.scala  | 23
-rw-r--r-- core/src/test/scala/spark/ShuffleSuite.scala |  2
2 files changed, 19 insertions(+), 6 deletions(-)
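The change keys off a JVM system property, read once into a private val when the Partitioner object is initialized. A minimal sketch of opting in before any Spark code loads (the property name is taken from the diff below; a -Dspark.default.parallelism=8 JVM flag would work equally well):

// Set the property before the Partitioner object is initialized,
// since the diff below caches it in a val at class-load time.
System.setProperty("spark.default.parallelism", "8")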
diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala
index 03966f1c96..eec0e8dd79 100644
--- a/core/src/main/scala/spark/Partitioner.scala
+++ b/core/src/main/scala/spark/Partitioner.scala
@@ -10,12 +10,21 @@ abstract class Partitioner extends Serializable {
}
object Partitioner {
+
+ private val useDefaultParallelism = System.getProperty("spark.default.parallelism") != null
+
/**
- * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. If any of
- * the RDDs already has a partitioner, choose that one, otherwise use a default HashPartitioner.
+ * Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
+ *
+ * If any of the RDDs already has a partitioner, choose that one.
*
- * The number of partitions will be the same as the number of partitions in the largest upstream
- * RDD, as this should be least likely to cause out-of-memory errors.
+ * Otherwise, we use a default HashPartitioner. For the number of partitions, if
+ * spark.default.parallelism is set, we use the value of SparkContext
+ * defaultParallelism; otherwise we use the max number of upstream partitions.
+ *
+ * Unless spark.default.parallelism is set, the number of partitions will be the
+ * same as the number of partitions in the largest upstream RDD, as this should
+ * be least likely to cause out-of-memory errors.
*
* We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
*/
@@ -24,7 +33,11 @@ object Partitioner {
for (r <- bySize if r.partitioner != None) {
return r.partitioner.get
}
- return new HashPartitioner(bySize.head.partitions.size)
+ if (useDefaultParallelism) {
+ return new HashPartitioner(rdd.context.defaultParallelism)
+ } else {
+ return new HashPartitioner(bySize.head.partitions.size)
+ }
}
}
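For context, a minimal sketch of the caller-visible behavior, assuming the hunk above lives in Partitioner.defaultPartitioner (the object name and RDD sizes here are illustrative, not part of this commit):

import spark.{Partitioner, SparkContext}

object DefaultPartitionerExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "example")
    val big   = sc.makeRDD(Seq((1, "a"), (2, "b")), 100) // 100 partitions
    val small = sc.makeRDD(Seq((1, "x"), (2, "y")), 4)   // 4 partitions

    // Neither RDD has a partitioner, so the fallback path runs:
    //  - spark.default.parallelism unset -> HashPartitioner(100), the
    //    largest upstream partition count (bySize.head above)
    //  - run with -Dspark.default.parallelism=8 -> HashPartitioner(sc.defaultParallelism)
    val p = Partitioner.defaultPartitioner(big, small)
    println(p.numPartitions)
    sc.stop()
  }
}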
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 2099999ed7..8411291b2c 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -235,7 +235,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
assert(rdd.values.collect().toList === List("a", "b"))
}
- test("default partitioner uses split size") {
+ test("default partitioner uses partition size") {
sc = new SparkContext("local", "test")
// specify 2000 partitions
val a = sc.makeRDD(Array(1, 2, 3, 4), 2000)