Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/spark/PairRDDFunctions.scala                              16
-rw-r--r--  core/src/main/scala/spark/Partitioner.scala                                   19
-rw-r--r--  core/src/main/scala/spark/RDD.scala                                           20
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                                   2
-rw-r--r--  core/src/main/scala/spark/api/java/JavaPairRDD.scala                           2
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala   3
6 files changed, 38 insertions, 24 deletions
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 18b4a1eca4..d840118b82 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -23,6 +23,7 @@ import spark.partial.BoundedDouble
import spark.partial.PartialResult
import spark.rdd._
import spark.SparkContext._
+import spark.Partitioner._
/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -437,21 +438,6 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
/**
- * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. If any of
- * the RDDs already has a partitioner, choose that one, otherwise use a default HashPartitioner.
- *
- * The number of partitions will be the same as the number of partitions in the largest upstream
- * RDD, as this should be least likely to cause out-of-memory errors.
- */
- def defaultPartitioner(rdds: RDD[_]*): Partitioner = {
- val bySize = rdds.sortBy(_.splits.size).reverse
- for (r <- bySize if r.partitioner != None) {
- return r.partitioner.get
- }
- return new HashPartitioner(bySize.head.splits.size)
- }
-
- /**
* Return the list of values in the RDD for key `key`. This operation is done efficiently if the
* RDD has a known partitioner by only searching the partition that the key maps to.
*/
diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala
index 9d5b966e1e..69f9534d23 100644
--- a/core/src/main/scala/spark/Partitioner.scala
+++ b/core/src/main/scala/spark/Partitioner.scala
@@ -9,6 +9,25 @@ abstract class Partitioner extends Serializable {
def getPartition(key: Any): Int
}
+object Partitioner {
+ /**
+ * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. If any of
+ * the RDDs already has a partitioner, choose that one, otherwise use a default HashPartitioner.
+ *
+ * The number of partitions will be the same as the number of partitions in the largest upstream
+ * RDD, as this should be least likely to cause out-of-memory errors.
+ *
+ * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
+ */
+ def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
+ val bySize = (Seq(rdd) ++ others).sortBy(_.splits.size).reverse
+ for (r <- bySize if r.partitioner != None) {
+ return r.partitioner.get
+ }
+ return new HashPartitioner(bySize.head.splits.size)
+ }
+}
+
/**
* A [[spark.Partitioner]] that implements hash-based partitioning using Java's `Object.hashCode`.
*
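
The defaultPartitioner helper now lives on the Partitioner companion object, so any code with RDDs in scope can call it directly instead of going through PairRDDFunctions. A minimal usage sketch (local mode, and the sample data plus the HashPartitioner(4) on `other` are illustrative assumptions, not part of the commit):

import spark.{SparkContext, Partitioner, HashPartitioner}
import spark.SparkContext._

val sc = new SparkContext("local", "defaultPartitioner example")
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val other = sc.parallelize(Seq(("a", "x"), ("c", "y"))).partitionBy(new HashPartitioner(4))

// `other` already carries a partitioner, so defaultPartitioner reuses it; with no
// pre-partitioned input it falls back to a HashPartitioner sized to the largest
// RDD's split count, as the doc comment above describes.
val p = Partitioner.defaultPartitioner(pairs, other)
val grouped = pairs.groupByKey(p)

Splitting the signature into (rdd, others*) means a call with zero RDDs no longer compiles, which is the point of the "at least 1 RDD" note in the doc comment.
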
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 6abb5c4792..b3188956d9 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -16,6 +16,7 @@ import org.apache.hadoop.mapred.TextOutputFormat
import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+import spark.Partitioner._
import spark.partial.BoundedDouble
import spark.partial.CountEvaluator
import spark.partial.GroupedCountEvaluator
@@ -300,18 +301,25 @@ abstract class RDD[T: ClassManifest](
def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
/**
+ * Return an RDD of grouped items.
+ */
+ def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] =
+ groupBy[K](f, defaultPartitioner(this))
+
+ /**
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
- def groupBy[K: ClassManifest](f: T => K, numSplits: Int): RDD[(K, Seq[T])] = {
- val cleanF = sc.clean(f)
- this.map(t => (cleanF(t), t)).groupByKey(numSplits)
- }
-
+ def groupBy[K: ClassManifest](f: T => K, numSplits: Int): RDD[(K, Seq[T])] =
+ groupBy(f, new HashPartitioner(numSplits))
+
/**
* Return an RDD of grouped items.
*/
- def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] = groupBy[K](f, sc.defaultParallelism)
+ def groupBy[K: ClassManifest](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = {
+ val cleanF = sc.clean(f)
+ this.map(t => (cleanF(t), t)).groupByKey(p)
+ }
/**
* Return an RDD created by piping elements to a forked external process.
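
After this change groupBy has three overloads that all funnel into the Partitioner-based one. A hedged sketch of how they relate (local mode and the even/odd keying function are illustrative assumptions):

import spark.{SparkContext, HashPartitioner}

val sc = new SparkContext("local", "groupBy example")
val nums = sc.parallelize(1 to 10)

val byDefault = nums.groupBy(n => n % 2)                         // delegates to defaultPartitioner(this)
val bySplits  = nums.groupBy(n => n % 2, 4)                      // wraps the count in a HashPartitioner(4)
val byCustom  = nums.groupBy(n => n % 2, new HashPartitioner(8)) // does the map + groupByKey itself
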
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 0efc00d5dd..ff367eafb4 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -693,7 +693,7 @@ class SparkContext(
checkpointDir = Some(dir)
}
- /** Default level of parallelism to use when not given by user (e.g. for reduce tasks) */
+ /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
def defaultParallelism: Int = taskScheduler.defaultParallelism
/** Default min number of splits for Hadoop RDDs when not given by user */
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index 8a123bdb47..4fba8b858c 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -237,7 +237,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* "combiner" in MapReduce. Output will be hash-partitioned with the default parallelism level.
*/
def reduceByKey(func: JFunction2[V, V, V]): JavaPairRDD[K, V] = {
- val partitioner = rdd.defaultPartitioner(rdd)
+ val partitioner = Partitioner.defaultPartitioner(rdd)
fromRDD(reduceByKey(partitioner, func))
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 082022be1c..537d9c2e41 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -147,7 +147,8 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
driverActor ! ReviveOffers
}
- override def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2)
+ override def defaultParallelism() = Option(System.getProperty("spark.default.parallelism"))
+ .map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2))
}
private[spark] object StandaloneSchedulerBackend {
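
The standalone backend now consults the spark.default.parallelism system property before falling back to max(totalCoreCount, 2). A hedged configuration sketch (the master URL and the value 16 are placeholders, not values from the commit):

// Set before the SparkContext is created so the scheduler backend sees it.
System.setProperty("spark.default.parallelism", "16")

val sc = new spark.SparkContext("spark://master:7077", "parallelism example")

// Anything that defaults to sc.defaultParallelism, e.g. parallelize without an
// explicit slice count, now uses 16 partitions instead of the core-based default.
val data = sc.parallelize(1 to 1000)
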