Diffstat:
 l---------  core/FileServerSuite.txt                                                      |  1
 -rw-r--r--  core/src/main/scala/spark/PairRDDFunctions.scala                              | 16
 -rw-r--r--  core/src/main/scala/spark/Partitioner.scala                                   | 19
 -rw-r--r--  core/src/main/scala/spark/RDD.scala                                           | 20
 -rw-r--r--  core/src/main/scala/spark/SparkContext.scala                                  |  2
 -rw-r--r--  core/src/main/scala/spark/api/java/JavaPairRDD.scala                          |  2
 -rw-r--r--  core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala  |  3
 -rw-r--r--  docs/tuning.md                                                                |  8
 8 files changed, 43 insertions(+), 28 deletions(-)
diff --git a/core/FileServerSuite.txt b/core/FileServerSuite.txt
new file mode 120000
index 0000000000..0a21b7bf25
--- /dev/null
+++ b/core/FileServerSuite.txt
@@ -0,0 +1 @@
+/tmp/1359046053333-0/test/FileServerSuite.txt
\ No newline at end of file
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 18b4a1eca4..d840118b82 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -23,6 +23,7 @@ import spark.partial.BoundedDouble
import spark.partial.PartialResult
import spark.rdd._
import spark.SparkContext._
+import spark.Partitioner._
/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -437,21 +438,6 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
/**
- * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. If any of
- * the RDDs already has a partitioner, choose that one, otherwise use a default HashPartitioner.
- *
- * The number of partitions will be the same as the number of partitions in the largest upstream
- * RDD, as this should be least likely to cause out-of-memory errors.
- */
- def defaultPartitioner(rdds: RDD[_]*): Partitioner = {
- val bySize = rdds.sortBy(_.splits.size).reverse
- for (r <- bySize if r.partitioner != None) {
- return r.partitioner.get
- }
- return new HashPartitioner(bySize.head.splits.size)
- }
-
- /**
* Return the list of values in the RDD for key `key`. This operation is done efficiently if the
* RDD has a known partitioner by only searching the partition that the key maps to.
*/
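The `lookup` docstring retained as context above describes a partitioner-aware fast path. A small illustrative sketch, assuming a running SparkContext named `sc` (the data and variable names are hypothetical, not part of this patch):

    import spark.HashPartitioner
    import spark.SparkContext._   // brings in the PairRDDFunctions implicits

    val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (1, "c")))
                  .partitionBy(new HashPartitioner(4))   // known partitioner
    pairs.lookup(1)   // values for key 1; only the partition that 1 hashes to is scanned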
diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala
index 9d5b966e1e..69f9534d23 100644
--- a/core/src/main/scala/spark/Partitioner.scala
+++ b/core/src/main/scala/spark/Partitioner.scala
@@ -9,6 +9,25 @@ abstract class Partitioner extends Serializable {
def getPartition(key: Any): Int
}
+object Partitioner {
+ /**
+ * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. If any of
+ * the RDDs already has a partitioner, choose that one, otherwise use a default HashPartitioner.
+ *
+ * The number of partitions will be the same as the number of partitions in the largest upstream
+ * RDD, as this should be least likely to cause out-of-memory errors.
+ *
+ * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
+ */
+ def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
+ val bySize = (Seq(rdd) ++ others).sortBy(_.splits.size).reverse
+ for (r <- bySize if r.partitioner != None) {
+ return r.partitioner.get
+ }
+ return new HashPartitioner(bySize.head.splits.size)
+ }
+}
+
/**
* A [[spark.Partitioner]] that implements hash-based partitioning using Java's `Object.hashCode`.
*
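A minimal sketch of what the relocated `defaultPartitioner` helper does, assuming a SparkContext named `sc` (the RDDs below are illustrative only): it reuses an existing partitioner if any input RDD has one, otherwise it hash-partitions with as many partitions as the largest input RDD.

    import spark.{HashPartitioner, Partitioner}
    import spark.SparkContext._

    val a = sc.parallelize(1 to 100).map(i => (i, i))   // no partitioner set
    val b = a.partitionBy(new HashPartitioner(4))       // carries a HashPartitioner
    Partitioner.defaultPartitioner(a, b)                // reuses b's HashPartitioner(4)
    Partitioner.defaultPartitioner(a)                   // new HashPartitioner sized to a's split count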
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 6abb5c4792..b3188956d9 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -16,6 +16,7 @@ import org.apache.hadoop.mapred.TextOutputFormat
import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+import spark.Partitioner._
import spark.partial.BoundedDouble
import spark.partial.CountEvaluator
import spark.partial.GroupedCountEvaluator
@@ -300,18 +301,25 @@ abstract class RDD[T: ClassManifest](
def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
/**
+ * Return an RDD of grouped items.
+ */
+ def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] =
+ groupBy[K](f, defaultPartitioner(this))
+
+ /**
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
- def groupBy[K: ClassManifest](f: T => K, numSplits: Int): RDD[(K, Seq[T])] = {
- val cleanF = sc.clean(f)
- this.map(t => (cleanF(t), t)).groupByKey(numSplits)
- }
-
+ def groupBy[K: ClassManifest](f: T => K, numSplits: Int): RDD[(K, Seq[T])] =
+ groupBy(f, new HashPartitioner(numSplits))
+
/**
* Return an RDD of grouped items.
*/
- def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] = groupBy[K](f, sc.defaultParallelism)
+ def groupBy[K: ClassManifest](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = {
+ val cleanF = sc.clean(f)
+ this.map(t => (cleanF(t), t)).groupByKey(p)
+ }
/**
* Return an RDD created by piping elements to a forked external process.
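A hedged usage sketch of the three `groupBy` overloads after this change, again assuming a SparkContext named `sc`:

    val nums = sc.parallelize(1 to 10)
    nums.groupBy((n: Int) => n % 2)                                 // partitioner chosen by defaultPartitioner(this)
    nums.groupBy((n: Int) => n % 2, 4)                              // hash-partitioned into 4 splits
    nums.groupBy((n: Int) => n % 2, new spark.HashPartitioner(4))   // explicit Partitioner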
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 0efc00d5dd..ff367eafb4 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -693,7 +693,7 @@ class SparkContext(
checkpointDir = Some(dir)
}
- /** Default level of parallelism to use when not given by user (e.g. for reduce tasks) */
+ /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
def defaultParallelism: Int = taskScheduler.defaultParallelism
/** Default min number of splits for Hadoop RDDs when not given by user */
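For context, a short sketch of where `defaultParallelism` still applies after this commit (hypothetical data, SparkContext named `sc`): it sizes `parallelize`/`makeRDD` when no slice count is given, while shuffle operations now take their partitioning from the parent RDDs.

    val data = 1 to 1000
    sc.parallelize(data)        // sliced into sc.defaultParallelism partitions
    sc.parallelize(data, 32)    // an explicit slice count overrides the default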
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index 8a123bdb47..4fba8b858c 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -237,7 +237,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* "combiner" in MapReduce. Output will be hash-partitioned with the default parallelism level.
*/
def reduceByKey(func: JFunction2[V, V, V]): JavaPairRDD[K, V] = {
- val partitioner = rdd.defaultPartitioner(rdd)
+ val partitioner = Partitioner.defaultPartitioner(rdd)
fromRDD(reduceByKey(partitioner, func))
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 082022be1c..537d9c2e41 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -147,7 +147,8 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
driverActor ! ReviveOffers
}
- override def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2)
+ override def defaultParallelism() = Option(System.getProperty("spark.default.parallelism"))
+ .map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2))
}
private[spark] object StandaloneSchedulerBackend {
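A rough illustration of the new fallback order in the standalone backend (master URL and app name are placeholders, and the two-argument SparkContext constructor is used only for brevity): if `spark.default.parallelism` is set before the context is created it wins, otherwise the backend falls back to max(total core count, 2).

    System.setProperty("spark.default.parallelism", "16")
    val sc = new spark.SparkContext("spark://host:7077", "demo")
    sc.defaultParallelism   // 16, taken from the property rather than the core count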
diff --git a/docs/tuning.md b/docs/tuning.md
index 9aaa53cd65..e9b4d6717c 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -213,10 +213,10 @@ but at a high level, managing how frequently full GC takes place can help in red
Clusters will not be fully utilized unless you set the level of parallelism for each operation high
enough. Spark automatically sets the number of "map" tasks to run on each file according to its size
-(though you can control it through optional parameters to `SparkContext.textFile`, etc), but for
-distributed "reduce" operations, such as `groupByKey` and `reduceByKey`, it uses a default value of 8.
-You can pass the level of parallelism as a second argument (see the
-[`spark.PairRDDFunctions`](api/core/index.html#spark.PairRDDFunctions) documentation),
+(though you can control it through optional parameters to `SparkContext.textFile`, etc), and for
+distributed "reduce" operations, such as `groupByKey` and `reduceByKey`, it uses the largest
+parent RDD's number of partitions. You can pass the level of parallelism as a second argument
+(see the [`spark.PairRDDFunctions`](api/core/index.html#spark.PairRDDFunctions) documentation),
or set the system property `spark.default.parallelism` to change the default.
In general, we recommend 2-3 tasks per CPU core in your cluster.
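The two knobs this paragraph refers to, sketched against a hypothetical pair RDD named `pairs` (e.g. an RDD[(String, Int)]):

    System.setProperty("spark.default.parallelism", "100")   // global default (standalone mode)
    pairs.reduceByKey(_ + _, 100)                             // per-operation level of parallelism
    pairs.groupByKey(100)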