diff options
author | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2012-10-08 18:13:00 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2012-10-13 14:59:20 -0700 |
commit | 10bcd217d2c9fcd7822d4399cfb9a0c9a05bc56e (patch) | |
tree | 4869648ee9f4466e1e5c88caae19db82d8a83af1 /core/src/main | |
parent | 4775c55641f281523f105f9272f164033242a0aa (diff) | |
download | spark-10bcd217d2c9fcd7822d4399cfb9a0c9a05bc56e.tar.gz spark-10bcd217d2c9fcd7822d4399cfb9a0c9a05bc56e.tar.bz2 spark-10bcd217d2c9fcd7822d4399cfb9a0c9a05bc56e.zip |
Remove mapSideCombine field from Aggregator.
Instead, the presence or absense of a ShuffleDependency's aggregator
will control whether map-side combining is performed.
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/scala/spark/Aggregator.scala | 6 | ||||
-rw-r--r-- | core/src/main/scala/spark/Dependency.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/spark/rdd/ShuffledRDD.scala | 7 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/ShuffleMapTask.scala | 2 |
4 files changed, 10 insertions, 7 deletions
diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala index 8d4f982413..df8ce9c054 100644 --- a/core/src/main/scala/spark/Aggregator.scala +++ b/core/src/main/scala/spark/Aggregator.scala @@ -9,15 +9,11 @@ import scala.collection.JavaConversions._ * @param createCombiner function to create the initial value of the aggregation. * @param mergeValue function to merge a new value into the aggregation result. * @param mergeCombiners function to merge outputs from multiple mergeValue function. - * @param mapSideCombine whether to apply combiners on map partitions, also - * known as map-side aggregations. When set to false, - * mergeCombiners function is not used. */ case class Aggregator[K, V, C] ( val createCombiner: V => C, val mergeValue: (C, V) => C, - val mergeCombiners: (C, C) => C, - val mapSideCombine: Boolean = true) { + val mergeCombiners: (C, C) => C) { def combineValuesByKey(iter: Iterator[(K, V)]) : Iterator[(K, C)] = { val combiners = new JHashMap[K, C] diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala index 19a51dd5b8..5a67073ef4 100644 --- a/core/src/main/scala/spark/Dependency.scala +++ b/core/src/main/scala/spark/Dependency.scala @@ -22,7 +22,7 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) { * Represents a dependency on the output of a shuffle stage. * @param shuffleId the shuffle id * @param rdd the parent RDD - * @param aggregator optional aggregator; this allows for map-side combining + * @param aggregator optional aggregator; if provided, map-side combining will be performed * @param partitioner partitioner used to partition the shuffle output */ class ShuffleDependency[K, V, C]( diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala index 04234491a6..8b1c29b065 100644 --- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala @@ -14,6 +14,13 @@ private[spark] class ShuffledRDDSplit(val idx: Int) extends Split { /** * The resulting RDD from a shuffle (e.g. repartitioning of data). + * @param parent the parent RDD. + * @param aggregator if provided, this aggregator will be used to perform map-side combining. + * @param part the partitioner used to partition the RDD + * @tparam K the key class. + * @tparam V the value class. + * @tparam C if map side combiners are used, then this is the combiner type; otherwise, + * this is the same as V. */ class ShuffledRDD[K, V, C]( @transient parent: RDD[(K, V)], diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala index 86796d3677..c97be18844 100644 --- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala @@ -114,7 +114,7 @@ private[spark] class ShuffleMapTask( val partitioner = dep.partitioner val bucketIterators = - if (dep.aggregator.isDefined && dep.aggregator.get.mapSideCombine) { + if (dep.aggregator.isDefined) { val aggregator = dep.aggregator.get.asInstanceOf[Aggregator[Any, Any, Any]] // Apply combiners (map-side aggregation) to the map output. val buckets = Array.tabulate(numOutputSplits)(_ => new JHashMap[Any, Any]) |