From 10af952a3d81c8d7a1178c61cbcd1b5269dc1fe8 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sat, 17 Aug 2013 21:07:34 -0700 Subject: Removed the mapSideCombine option in CoGroupedRDD. --- core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 38 ++++-------------------- 1 file changed, 5 insertions(+), 33 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala index c540cd36eb..019b12d2d5 100644 --- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala @@ -23,7 +23,7 @@ import java.util.{HashMap => JHashMap} import scala.collection.JavaConversions import scala.collection.mutable.ArrayBuffer -import spark.{Aggregator, Partition, Partitioner, RDD, SparkEnv, TaskContext} +import spark.{Partition, Partitioner, RDD, SparkEnv, TaskContext} import spark.{Dependency, OneToOneDependency, ShuffleDependency} @@ -52,13 +52,6 @@ class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]) override def hashCode(): Int = idx } -private[spark] class CoGroupAggregator - extends Aggregator[Any, Any, ArrayBuffer[Any]]( - { x => ArrayBuffer(x) }, - { (b, x) => b += x }, - { (b1, b2) => b1 ++ b2 }) - with Serializable - /** * A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a @@ -66,34 +59,21 @@ private[spark] class CoGroupAggregator * * @param rdds parent RDDs. * @param part partitioner used to partition the shuffle output. - * @param mapSideCombine flag indicating whether to merge values before shuffle step. If the flag - * is on, Spark does an extra pass over the data on the map side to merge - * all values belonging to the same key together. This can reduce the amount - * of data shuffled if and only if the number of distinct keys is very small, - * and the ratio of key size to value size is also very small. */ class CoGroupedRDD[K]( @transient var rdds: Seq[RDD[(K, _)]], part: Partitioner, - val mapSideCombine: Boolean = false, val serializerClass: String = null) extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) { - private val aggr = new CoGroupAggregator - override def getDependencies: Seq[Dependency[_]] = { - rdds.map { rdd => + rdds.map { rdd: RDD[(K, _)] => if (rdd.partitioner == Some(part)) { logInfo("Adding one-to-one dependency with " + rdd) new OneToOneDependency(rdd) } else { logInfo("Adding shuffle dependency with " + rdd) - if (mapSideCombine) { - val mapSideCombinedRDD = rdd.mapPartitions(aggr.combineValuesByKey(_), true) - new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part, serializerClass) - } else { - new ShuffleDependency[Any, Any](rdd.asInstanceOf[RDD[(Any, Any)]], part, serializerClass) - } + new ShuffleDependency[Any, Any](rdd.asInstanceOf[RDD[(Any, Any)]], part, serializerClass) } } } @@ -145,16 +125,8 @@ class CoGroupedRDD[K]( case ShuffleCoGroupSplitDep(shuffleId) => { // Read map outputs of shuffle val fetcher = SparkEnv.get.shuffleFetcher - if (mapSideCombine) { - // With map side combine on, for each key, the shuffle fetcher returns a list of values. - fetcher.fetch[K, Seq[Any]](shuffleId, split.index, context.taskMetrics, ser).foreach { - case (key, values) => getSeq(key)(depNum) ++= values - } - } else { - // With map side combine off, for each key the shuffle fetcher returns a single value. - fetcher.fetch[K, Any](shuffleId, split.index, context.taskMetrics, ser).foreach { - case (key, value) => getSeq(key)(depNum) += value - } + fetcher.fetch[K, Any](shuffleId, split.index, context.taskMetrics, ser).foreach { + case (key, value) => getSeq(key)(depNum) += value } } } -- cgit v1.2.3