diff options
author | Reynold Xin <rxin@cs.berkeley.edu> | 2013-06-14 00:10:54 -0700 |
---|---|---|
committer | Reynold Xin <rxin@cs.berkeley.edu> | 2013-06-14 00:10:54 -0700 |
commit | 2cc188fd546fa061812f9fd4f72cf936bd01a0e6 (patch) | |
tree | 768aa1af9ae3d3d008a21864eb5872a870572427 /core | |
parent | 6738178d0daf1bbe7441db7c0c773a29bb2ec388 (diff) | |
download | spark-2cc188fd546fa061812f9fd4f72cf936bd01a0e6.tar.gz spark-2cc188fd546fa061812f9fd4f72cf936bd01a0e6.tar.bz2 spark-2cc188fd546fa061812f9fd4f72cf936bd01a0e6.zip |
SPARK-774: cogroup should also disable map side combine by default
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 10 |
1 file changed, 7 insertions, 3 deletions
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index 7599ba1a02..8966f9f86e 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -6,7 +6,7 @@ import java.util.{HashMap => JHashMap}
 import scala.collection.JavaConversions
 import scala.collection.mutable.ArrayBuffer

-import spark.{Aggregator, Logging, Partition, Partitioner, RDD, SparkEnv, TaskContext}
+import spark.{Aggregator, Partition, Partitioner, RDD, SparkEnv, TaskContext}
 import spark.{Dependency, OneToOneDependency, ShuffleDependency}
@@ -49,12 +49,16 @@ private[spark] class CoGroupAggregator
  *
  * @param rdds parent RDDs.
  * @param part partitioner used to partition the shuffle output.
- * @param mapSideCombine flag indicating whether to merge values before shuffle step.
+ * @param mapSideCombine flag indicating whether to merge values before shuffle step. If the flag
+ *                       is on, Spark does an extra pass over the data on the map side to merge
+ *                       all values belonging to the same key together. This can reduce the amount
+ *                       of data shuffled if and only if the number of distinct keys is very small,
+ *                       and the ratio of key size to value size is also very small.
  */
 class CoGroupedRDD[K](
   @transient var rdds: Seq[RDD[(K, _)]],
   part: Partitioner,
-  val mapSideCombine: Boolean = true,
+  val mapSideCombine: Boolean = false,
   val serializerClass: String = null)
   extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {