aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorReynold Xin <rxin@cs.berkeley.edu>2013-06-14 00:10:54 -0700
committerReynold Xin <rxin@cs.berkeley.edu>2013-06-14 00:10:54 -0700
commit2cc188fd546fa061812f9fd4f72cf936bd01a0e6 (patch)
tree768aa1af9ae3d3d008a21864eb5872a870572427 /core
parent6738178d0daf1bbe7441db7c0c773a29bb2ec388 (diff)
downloadspark-2cc188fd546fa061812f9fd4f72cf936bd01a0e6.tar.gz
spark-2cc188fd546fa061812f9fd4f72cf936bd01a0e6.tar.bz2
spark-2cc188fd546fa061812f9fd4f72cf936bd01a0e6.zip
SPARK-774: cogroup should also disable map side combine by default
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 10
1 file changed, 7 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index 7599ba1a02..8966f9f86e 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -6,7 +6,7 @@ import java.util.{HashMap => JHashMap}
import scala.collection.JavaConversions
import scala.collection.mutable.ArrayBuffer
-import spark.{Aggregator, Logging, Partition, Partitioner, RDD, SparkEnv, TaskContext}
+import spark.{Aggregator, Partition, Partitioner, RDD, SparkEnv, TaskContext}
import spark.{Dependency, OneToOneDependency, ShuffleDependency}
@@ -49,12 +49,16 @@ private[spark] class CoGroupAggregator
*
* @param rdds parent RDDs.
* @param part partitioner used to partition the shuffle output.
- * @param mapSideCombine flag indicating whether to merge values before shuffle step.
+ * @param mapSideCombine flag indicating whether to merge values before shuffle step. If the flag
+ * is on, Spark does an extra pass over the data on the map side to merge
+ * all values belonging to the same key together. This can reduce the amount
+ * of data shuffled if and only if the number of distinct keys is very small,
+ * and the ratio of key size to value size is also very small.
*/
class CoGroupedRDD[K](
@transient var rdds: Seq[RDD[(K, _)]],
part: Partitioner,
- val mapSideCombine: Boolean = true,
+ val mapSideCombine: Boolean = false,
val serializerClass: String = null)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {