author    Reynold Xin <reynoldx@gmail.com>    2013-08-17 21:07:34 -0700
committer Reynold Xin <reynoldx@gmail.com>    2013-08-17 21:07:34 -0700
commit    10af952a3d81c8d7a1178c61cbcd1b5269dc1fe8 (patch)
tree      e9ffa8caa7b2b30af53ba7abea4740692a9346ee /core
parent    5d050a3e1f6cb79d1f10a4d2d4207b04107b2078 (diff)
Removed the mapSideCombine option in CoGroupedRDD.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 38 +++++---------------------------------
1 file changed, 5 insertions(+), 33 deletions(-)
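
For context on what the removed option did: with mapSideCombine on, CoGroupedRDD ran an extra map-side pass that folded each partition's (key, value) pairs into per-key buffers before shuffling, using the three closures of the CoGroupAggregator deleted below ({ x => ArrayBuffer(x) }, { (b, x) => b += x }, { (b1, b2) => b1 ++ b2 }). A minimal standalone sketch of that pattern follows; the object name, helper, and driver are illustrative, not Spark code.

import scala.collection.mutable.{ArrayBuffer, HashMap}

// Sketch of the map-side combine pass the removed CoGroupAggregator performed:
// createCombiner starts a buffer for a new key, mergeValue appends to it.
// Each key then ships one buffer through the shuffle instead of many records.
object MapSideCombineSketch {
  def combineValuesByKey[K, V](iter: Iterator[(K, V)]): Iterator[(K, ArrayBuffer[V])] = {
    val combiners = new HashMap[K, ArrayBuffer[V]]
    for ((k, v) <- iter) {
      combiners.get(k) match {
        case Some(buf) => buf += v                       // mergeValue
        case None      => combiners(k) = ArrayBuffer(v)  // createCombiner
      }
    }
    combiners.iterator
  }

  def main(args: Array[String]): Unit = {
    val partition = Iterator(("a", 1), ("b", 2), ("a", 3))
    combineValuesByKey(partition).foreach(println)
    // (a,ArrayBuffer(1, 3))  (b,ArrayBuffer(2))  -- order may vary
  }
}

As the deleted doc comment below notes, this pre-merge pays off only when there are few distinct keys and keys are small relative to values; for cogroup-style workloads it mostly adds an extra pass, which is why the commit drops it.
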
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index c540cd36eb..019b12d2d5 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -23,7 +23,7 @@ import java.util.{HashMap => JHashMap}
import scala.collection.JavaConversions
import scala.collection.mutable.ArrayBuffer
-import spark.{Aggregator, Partition, Partitioner, RDD, SparkEnv, TaskContext}
+import spark.{Partition, Partitioner, RDD, SparkEnv, TaskContext}
import spark.{Dependency, OneToOneDependency, ShuffleDependency}
@@ -52,13 +52,6 @@ class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
override def hashCode(): Int = idx
}
-private[spark] class CoGroupAggregator
- extends Aggregator[Any, Any, ArrayBuffer[Any]](
- { x => ArrayBuffer(x) },
- { (b, x) => b += x },
- { (b1, b2) => b1 ++ b2 })
- with Serializable
-
/**
* A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
@@ -66,34 +59,21 @@ private[spark] class CoGroupAggregator
*
* @param rdds parent RDDs.
* @param part partitioner used to partition the shuffle output.
- * @param mapSideCombine flag indicating whether to merge values before shuffle step. If the flag
- * is on, Spark does an extra pass over the data on the map side to merge
- * all values belonging to the same key together. This can reduce the amount
- * of data shuffled if and only if the number of distinct keys is very small,
- * and the ratio of key size to value size is also very small.
*/
class CoGroupedRDD[K](
@transient var rdds: Seq[RDD[(K, _)]],
part: Partitioner,
- val mapSideCombine: Boolean = false,
val serializerClass: String = null)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
- private val aggr = new CoGroupAggregator
-
override def getDependencies: Seq[Dependency[_]] = {
- rdds.map { rdd =>
+ rdds.map { rdd: RDD[(K, _)] =>
if (rdd.partitioner == Some(part)) {
logInfo("Adding one-to-one dependency with " + rdd)
new OneToOneDependency(rdd)
} else {
logInfo("Adding shuffle dependency with " + rdd)
- if (mapSideCombine) {
- val mapSideCombinedRDD = rdd.mapPartitions(aggr.combineValuesByKey(_), true)
- new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part, serializerClass)
- } else {
- new ShuffleDependency[Any, Any](rdd.asInstanceOf[RDD[(Any, Any)]], part, serializerClass)
- }
+ new ShuffleDependency[Any, Any](rdd.asInstanceOf[RDD[(Any, Any)]], part, serializerClass)
}
}
}
@@ -145,16 +125,8 @@ class CoGroupedRDD[K](
case ShuffleCoGroupSplitDep(shuffleId) => {
// Read map outputs of shuffle
val fetcher = SparkEnv.get.shuffleFetcher
- if (mapSideCombine) {
- // With map side combine on, for each key, the shuffle fetcher returns a list of values.
- fetcher.fetch[K, Seq[Any]](shuffleId, split.index, context.taskMetrics, ser).foreach {
- case (key, values) => getSeq(key)(depNum) ++= values
- }
- } else {
- // With map side combine off, for each key the shuffle fetcher returns a single value.
- fetcher.fetch[K, Any](shuffleId, split.index, context.taskMetrics, ser).foreach {
- case (key, value) => getSeq(key)(depNum) += value
- }
+ fetcher.fetch[K, Any](shuffleId, split.index, context.taskMetrics, ser).foreach {
+ case (key, value) => getSeq(key)(depNum) += value
}
}
}
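
On the read side, the simplification above means every fetched record is now a single (key, value) pair appended to the per-key buffer for the dependency it came from, rather than a pre-combined Seq of values. A small self-contained sketch of that merge step; the object name, mergeFetched, and the table/getSeq helpers stand in for Spark's internals and are hypothetical.

import scala.collection.mutable.{ArrayBuffer, HashMap}

// Sketch of the post-change fetch path: one buffer per parent dependency,
// single values appended as they arrive from the shuffle fetcher.
object FetchSideMergeSketch {
  def mergeFetched[K](records: Iterator[(K, Any)],
                      depNum: Int,
                      numDeps: Int,
                      table: HashMap[K, Array[ArrayBuffer[Any]]]): Unit = {
    // Mirrors getSeq in the patched code: lazily create the per-key row
    // of buffers, one slot per parent RDD.
    def getSeq(k: K): Array[ArrayBuffer[Any]] =
      table.getOrElseUpdate(k, Array.fill(numDeps)(ArrayBuffer.empty[Any]))
    records.foreach { case (key, value) => getSeq(key)(depNum) += value }
  }

  def main(args: Array[String]): Unit = {
    val table = HashMap.empty[String, Array[ArrayBuffer[Any]]]
    mergeFetched(Iterator(("a", 1), ("a", 2)), depNum = 0, numDeps = 2, table)
    mergeFetched(Iterator(("a", "x")), depNum = 1, numDeps = 2, table)
    println(table("a").mkString(", ")) // ArrayBuffer(1, 2), ArrayBuffer(x)
  }
}
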