From 6cd68c31cbebb1e3c6b35026f067a3c82ce9fdfb Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Sat, 16 Feb 2013 00:29:11 -0600 Subject: Update default.parallelism docs, have StandaloneSchedulerBackend use it. Only brand new RDDs (e.g. parallelize and makeRDD) now use default parallelism, everything else uses their largest parent's partitioner or partition size. --- core/FileServerSuite.txt | 1 + core/src/main/scala/spark/PairRDDFunctions.scala | 16 +--------------- core/src/main/scala/spark/Partitioner.scala | 19 +++++++++++++++++++ core/src/main/scala/spark/RDD.scala | 20 ++++++++++++++------ core/src/main/scala/spark/SparkContext.scala | 2 +- core/src/main/scala/spark/api/java/JavaPairRDD.scala | 2 +- .../cluster/StandaloneSchedulerBackend.scala | 3 ++- docs/tuning.md | 8 ++++---- 8 files changed, 43 insertions(+), 28 deletions(-) create mode 120000 core/FileServerSuite.txt diff --git a/core/FileServerSuite.txt b/core/FileServerSuite.txt new file mode 120000 index 0000000000..0a21b7bf25 --- /dev/null +++ b/core/FileServerSuite.txt @@ -0,0 +1 @@ +/tmp/1359046053333-0/test/FileServerSuite.txt \ No newline at end of file diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 18b4a1eca4..d840118b82 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -23,6 +23,7 @@ import spark.partial.BoundedDouble import spark.partial.PartialResult import spark.rdd._ import spark.SparkContext._ +import spark.Partitioner._ /** * Extra functions available on RDDs of (key, value) pairs through an implicit conversion. @@ -436,21 +437,6 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( cogroup(other1, other2, defaultPartitioner(self, other1, other2)) } - /** - * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. If any of - * the RDDs already has a partitioner, choose that one, otherwise use a default HashPartitioner. - * - * The number of partitions will be the same as the number of partitions in the largest upstream - * RDD, as this should be least likely to cause out-of-memory errors. - */ - def defaultPartitioner(rdds: RDD[_]*): Partitioner = { - val bySize = rdds.sortBy(_.splits.size).reverse - for (r <- bySize if r.partitioner != None) { - return r.partitioner.get - } - return new HashPartitioner(bySize.head.splits.size) - } - /** * Return the list of values in the RDD for key `key`. This operation is done efficiently if the * RDD has a known partitioner by only searching the partition that the key maps to. diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala index 9d5b966e1e..69f9534d23 100644 --- a/core/src/main/scala/spark/Partitioner.scala +++ b/core/src/main/scala/spark/Partitioner.scala @@ -9,6 +9,25 @@ abstract class Partitioner extends Serializable { def getPartition(key: Any): Int } +object Partitioner { + /** + * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. If any of + * the RDDs already has a partitioner, choose that one, otherwise use a default HashPartitioner. + * + * The number of partitions will be the same as the number of partitions in the largest upstream + * RDD, as this should be least likely to cause out-of-memory errors. + * + * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD. + */ + def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = { + val bySize = (Seq(rdd) ++ others).sortBy(_.splits.size).reverse + for (r <- bySize if r.partitioner != None) { + return r.partitioner.get + } + return new HashPartitioner(bySize.head.splits.size) + } +} + /** * A [[spark.Partitioner]] that implements hash-based partitioning using Java's `Object.hashCode`. * diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 6abb5c4792..b3188956d9 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -16,6 +16,7 @@ import org.apache.hadoop.mapred.TextOutputFormat import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap} +import spark.Partitioner._ import spark.partial.BoundedDouble import spark.partial.CountEvaluator import spark.partial.GroupedCountEvaluator @@ -299,19 +300,26 @@ abstract class RDD[T: ClassManifest]( */ def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other) + /** + * Return an RDD of grouped items. + */ + def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] = + groupBy[K](f, defaultPartitioner(this)) + /** * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements * mapping to that key. */ - def groupBy[K: ClassManifest](f: T => K, numSplits: Int): RDD[(K, Seq[T])] = { - val cleanF = sc.clean(f) - this.map(t => (cleanF(t), t)).groupByKey(numSplits) - } - + def groupBy[K: ClassManifest](f: T => K, numSplits: Int): RDD[(K, Seq[T])] = + groupBy(f, new HashPartitioner(numSplits)) + /** * Return an RDD of grouped items. */ - def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] = groupBy[K](f, sc.defaultParallelism) + def groupBy[K: ClassManifest](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = { + val cleanF = sc.clean(f) + this.map(t => (cleanF(t), t)).groupByKey(p) + } /** * Return an RDD created by piping elements to a forked external process. diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 0efc00d5dd..ff367eafb4 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -693,7 +693,7 @@ class SparkContext( checkpointDir = Some(dir) } - /** Default level of parallelism to use when not given by user (e.g. for reduce tasks) */ + /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */ def defaultParallelism: Int = taskScheduler.defaultParallelism /** Default min number of splits for Hadoop RDDs when not given by user */ diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala index 8a123bdb47..4fba8b858c 100644 --- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala @@ -237,7 +237,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * "combiner" in MapReduce. Output will be hash-partitioned with the default parallelism level. */ def reduceByKey(func: JFunction2[V, V, V]): JavaPairRDD[K, V] = { - val partitioner = rdd.defaultPartitioner(rdd) + val partitioner = Partitioner.defaultPartitioner(rdd) fromRDD(reduceByKey(partitioner, func)) } diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 082022be1c..537d9c2e41 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -147,7 +147,8 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor driverActor ! ReviveOffers } - override def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2) + override def defaultParallelism() = Option(System.getProperty("spark.default.parallelism")) + .map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2)) } private[spark] object StandaloneSchedulerBackend { diff --git a/docs/tuning.md b/docs/tuning.md index 9aaa53cd65..e9b4d6717c 100644 --- a/docs/tuning.md +++ b/docs/tuning.md @@ -213,10 +213,10 @@ but at a high level, managing how frequently full GC takes place can help in red Clusters will not be fully utilized unless you set the level of parallelism for each operation high enough. Spark automatically sets the number of "map" tasks to run on each file according to its size -(though you can control it through optional parameters to `SparkContext.textFile`, etc), but for -distributed "reduce" operations, such as `groupByKey` and `reduceByKey`, it uses a default value of 8. -You can pass the level of parallelism as a second argument (see the -[`spark.PairRDDFunctions`](api/core/index.html#spark.PairRDDFunctions) documentation), +(though you can control it through optional parameters to `SparkContext.textFile`, etc), and for +distributed "reduce" operations, such as `groupByKey` and `reduceByKey`, it uses the largest +parent RDD's number of partitions. You can pass the level of parallelism as a second argument +(see the [`spark.PairRDDFunctions`](api/core/index.html#spark.PairRDDFunctions) documentation), or set the system property `spark.default.parallelism` to change the default. In general, we recommend 2-3 tasks per CPU core in your cluster. -- cgit v1.2.3