From 10bcd217d2c9fcd7822d4399cfb9a0c9a05bc56e Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 8 Oct 2012 18:13:00 -0700 Subject: Remove mapSideCombine field from Aggregator. Instead, the presence or absense of a ShuffleDependency's aggregator will control whether map-side combining is performed. --- core/src/main/scala/spark/Aggregator.scala | 6 +----- core/src/main/scala/spark/Dependency.scala | 2 +- core/src/main/scala/spark/rdd/ShuffledRDD.scala | 7 +++++++ core/src/main/scala/spark/scheduler/ShuffleMapTask.scala | 2 +- 4 files changed, 10 insertions(+), 7 deletions(-) (limited to 'core/src/main') diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala index 8d4f982413..df8ce9c054 100644 --- a/core/src/main/scala/spark/Aggregator.scala +++ b/core/src/main/scala/spark/Aggregator.scala @@ -9,15 +9,11 @@ import scala.collection.JavaConversions._ * @param createCombiner function to create the initial value of the aggregation. * @param mergeValue function to merge a new value into the aggregation result. * @param mergeCombiners function to merge outputs from multiple mergeValue function. - * @param mapSideCombine whether to apply combiners on map partitions, also - * known as map-side aggregations. When set to false, - * mergeCombiners function is not used. */ case class Aggregator[K, V, C] ( val createCombiner: V => C, val mergeValue: (C, V) => C, - val mergeCombiners: (C, C) => C, - val mapSideCombine: Boolean = true) { + val mergeCombiners: (C, C) => C) { def combineValuesByKey(iter: Iterator[(K, V)]) : Iterator[(K, C)] = { val combiners = new JHashMap[K, C] diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala index 19a51dd5b8..5a67073ef4 100644 --- a/core/src/main/scala/spark/Dependency.scala +++ b/core/src/main/scala/spark/Dependency.scala @@ -22,7 +22,7 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) { * Represents a dependency on the output of a shuffle stage. * @param shuffleId the shuffle id * @param rdd the parent RDD - * @param aggregator optional aggregator; this allows for map-side combining + * @param aggregator optional aggregator; if provided, map-side combining will be performed * @param partitioner partitioner used to partition the shuffle output */ class ShuffleDependency[K, V, C]( diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala index 04234491a6..8b1c29b065 100644 --- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala @@ -14,6 +14,13 @@ private[spark] class ShuffledRDDSplit(val idx: Int) extends Split { /** * The resulting RDD from a shuffle (e.g. repartitioning of data). + * @param parent the parent RDD. + * @param aggregator if provided, this aggregator will be used to perform map-side combining. + * @param part the partitioner used to partition the RDD + * @tparam K the key class. + * @tparam V the value class. + * @tparam C if map side combiners are used, then this is the combiner type; otherwise, + * this is the same as V. */ class ShuffledRDD[K, V, C]( @transient parent: RDD[(K, V)], diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala index 86796d3677..c97be18844 100644 --- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala @@ -114,7 +114,7 @@ private[spark] class ShuffleMapTask( val partitioner = dep.partitioner val bucketIterators = - if (dep.aggregator.isDefined && dep.aggregator.get.mapSideCombine) { + if (dep.aggregator.isDefined) { val aggregator = dep.aggregator.get.asInstanceOf[Aggregator[Any, Any, Any]] // Apply combiners (map-side aggregation) to the map output. val buckets = Array.tabulate(numOutputSplits)(_ => new JHashMap[Any, Any]) -- cgit v1.2.3