From 6cd68c31cbebb1e3c6b35026f067a3c82ce9fdfb Mon Sep 17 00:00:00 2001
From: Stephen Haberman
Date: Sat, 16 Feb 2013 00:29:11 -0600
Subject: Update default.parallelism docs, have StandaloneSchedulerBackend use
 it.

Only brand new RDDs (e.g. parallelize and makeRDD) now use default
parallelism; everything else uses their largest parent's partitioner or
partition size.
---
 core/src/main/scala/spark/PairRDDFunctions.scala     | 16 +---------------
 core/src/main/scala/spark/Partitioner.scala          | 19 +++++++++++++++++++
 core/src/main/scala/spark/RDD.scala                  | 20 ++++++++++++++------
 core/src/main/scala/spark/SparkContext.scala         |  2 +-
 core/src/main/scala/spark/api/java/JavaPairRDD.scala |  2 +-
 .../cluster/StandaloneSchedulerBackend.scala         |  3 ++-
 6 files changed, 38 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 18b4a1eca4..d840118b82 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -23,6 +23,7 @@ import spark.partial.BoundedDouble
 import spark.partial.PartialResult
 import spark.rdd._
 import spark.SparkContext._
+import spark.Partitioner._
 
 /**
  * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -436,21 +437,6 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     cogroup(other1, other2, defaultPartitioner(self, other1, other2))
   }
 
-  /**
-   * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. If any of
-   * the RDDs already has a partitioner, choose that one, otherwise use a default HashPartitioner.
-   *
-   * The number of partitions will be the same as the number of partitions in the largest upstream
-   * RDD, as this should be least likely to cause out-of-memory errors.
-   */
-  def defaultPartitioner(rdds: RDD[_]*): Partitioner = {
-    val bySize = rdds.sortBy(_.splits.size).reverse
-    for (r <- bySize if r.partitioner != None) {
-      return r.partitioner.get
-    }
-    return new HashPartitioner(bySize.head.splits.size)
-  }
-
   /**
    * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
    * RDD has a known partitioner by only searching the partition that the key maps to.
diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala
index 9d5b966e1e..69f9534d23 100644
--- a/core/src/main/scala/spark/Partitioner.scala
+++ b/core/src/main/scala/spark/Partitioner.scala
@@ -9,6 +9,25 @@ abstract class Partitioner extends Serializable {
   def getPartition(key: Any): Int
 }
 
+object Partitioner {
+  /**
+   * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. If any of
+   * the RDDs already has a partitioner, choose that one, otherwise use a default HashPartitioner.
+   *
+   * The number of partitions will be the same as the number of partitions in the largest upstream
+   * RDD, as this should be least likely to cause out-of-memory errors.
+   *
+   * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
+   */
+  def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
+    val bySize = (Seq(rdd) ++ others).sortBy(_.splits.size).reverse
+    for (r <- bySize if r.partitioner != None) {
+      return r.partitioner.get
+    }
+    return new HashPartitioner(bySize.head.splits.size)
+  }
+}
+
 /**
  * A [[spark.Partitioner]] that implements hash-based partitioning using Java's `Object.hashCode`.
  *
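As a usage sketch of the relocated helper (not part of the patch; `sc` and the sample RDDs are assumed), an existing partitioner on any input wins, and otherwise a new HashPartitioner is sized by the largest input's split count rather than by the default parallelism:

    import spark.{HashPartitioner, SparkContext}
    import spark.SparkContext._  // implicit conversion to PairRDDFunctions

    val sc = new SparkContext("local", "defaultPartitioner-sketch")

    // 8 splits; map() does not preserve a partitioner, so there is none here
    val big = sc.parallelize(1 to 100000, 8).map(n => (n % 10, n))
    // 2 splits, then explicitly hash-partitioned into 4
    val small = sc.parallelize(1 to 100, 2).map(n => (n % 10, n))
                  .partitionBy(new HashPartitioner(4))

    // join calls defaultPartitioner(big, small): small's HashPartitioner(4) is
    // reused, because an existing partitioner beats creating a new one.
    val joined = big.join(small)
    // Had neither input been partitioned, join would get a new
    // HashPartitioner(8), sized by big, the largest parent.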
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 6abb5c4792..b3188956d9 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -16,6 +16,7 @@ import org.apache.hadoop.mapred.TextOutputFormat
 
 import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
 
+import spark.Partitioner._
 import spark.partial.BoundedDouble
 import spark.partial.CountEvaluator
 import spark.partial.GroupedCountEvaluator
@@ -299,19 +300,26 @@ abstract class RDD[T: ClassManifest](
    */
   def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
 
+  /**
+   * Return an RDD of grouped items.
+   */
+  def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] =
+    groupBy[K](f, defaultPartitioner(this))
+
   /**
    * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
    * mapping to that key.
    */
-  def groupBy[K: ClassManifest](f: T => K, numSplits: Int): RDD[(K, Seq[T])] = {
-    val cleanF = sc.clean(f)
-    this.map(t => (cleanF(t), t)).groupByKey(numSplits)
-  }
-
+  def groupBy[K: ClassManifest](f: T => K, numSplits: Int): RDD[(K, Seq[T])] =
+    groupBy(f, new HashPartitioner(numSplits))
+
   /**
    * Return an RDD of grouped items.
    */
-  def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] = groupBy[K](f, sc.defaultParallelism)
+  def groupBy[K: ClassManifest](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = {
+    val cleanF = sc.clean(f)
+    this.map(t => (cleanF(t), t)).groupByKey(p)
+  }
 
   /**
    * Return an RDD created by piping elements to a forked external process.
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 0efc00d5dd..ff367eafb4 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -693,7 +693,7 @@ class SparkContext(
     checkpointDir = Some(dir)
   }
 
-  /** Default level of parallelism to use when not given by user (e.g. for reduce tasks) */
+  /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
   def defaultParallelism: Int = taskScheduler.defaultParallelism
 
   /** Default min number of splits for Hadoop RDDs when not given by user */
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index 8a123bdb47..4fba8b858c 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -237,7 +237,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
    * "combiner" in MapReduce. Output will be hash-partitioned with the default parallelism level.
    */
   def reduceByKey(func: JFunction2[V, V, V]): JavaPairRDD[K, V] = {
-    val partitioner = rdd.defaultPartitioner(rdd)
+    val partitioner = Partitioner.defaultPartitioner(rdd)
     fromRDD(reduceByKey(partitioner, func))
   }
 
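For reference, a sketch of how the three groupBy overloads now chain (the sample data is assumed; the first two delegate to the Partitioner-taking one):

    // Reusing the assumed `sc` from the sketch above.
    val words = sc.parallelize(Seq("apple", "avocado", "banana", "blueberry"), 4)

    // Delegates to groupBy(f, defaultPartitioner(this)): 4 partitions here,
    // inherited from this RDD rather than taken from sc.defaultParallelism.
    val byInitial = words.groupBy(w => w.charAt(0))

    // Delegates to groupBy(f, new HashPartitioner(2)).
    val byInitialIn2 = words.groupBy(w => w.charAt(0), 2)

    // The overload that does the work: pair each element with its key, then groupByKey(p).
    val byInitialExplicit = words.groupBy(w => w.charAt(0), new HashPartitioner(2))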
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 082022be1c..537d9c2e41 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -147,7 +147,8 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
     driverActor ! ReviveOffers
   }
 
-  override def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2)
+  override def defaultParallelism() = Option(System.getProperty("spark.default.parallelism"))
+      .map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2))
 }
 
 private[spark] object StandaloneSchedulerBackend {
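Finally, a sketch of what the scheduler change means in practice (the master URL and property value are hypothetical; only the standalone backend reads the property as of this patch):

    // Must be set before the SparkContext is created.
    System.setProperty("spark.default.parallelism", "16")
    val sc = new SparkContext("spark://master:7077", "parallelism-sketch")

    // Only brand-new RDDs consult the default:
    val fresh = sc.parallelize(1 to 1000)  // 16 splits, from the property
    val doubled = fresh.map(_ * 2)         // still 16 splits, inherited from its parent

    // With the property unset, defaultParallelism() falls back to
    // math.max(totalCoreCount.get(), 2), i.e. at least 2 even on a single core.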