author | Josh Rosen &lt;joshrosen@eecs.berkeley.edu&gt; | 2012-10-08 18:13:00 -0700
---|---|---
committer | Josh Rosen &lt;joshrosen@eecs.berkeley.edu&gt; | 2012-10-13 14:59:20 -0700
commit | 10bcd217d2c9fcd7822d4399cfb9a0c9a05bc56e (patch) |
tree | 4869648ee9f4466e1e5c88caae19db82d8a83af1 |
parent | 4775c55641f281523f105f9272f164033242a0aa (diff) |
Remove mapSideCombine field from Aggregator.
Instead, the presence or absence of a ShuffleDependency's aggregator
will control whether map-side combining is performed.
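
For context, here is a rough sketch of how the API reads after this change. The `Aggregator` and `ShuffledRDD` signatures are taken from the diff below; `pairs` stands in for any `RDD[(Int, Int)]`, and the assumption that passing `None` skips map-side combining follows from the `ShuffleMapTask` change (the exact line where the test passes the option is truncated in the hunk):

```scala
// Hypothetical usage sketch, not code from this commit.
// The mapSideCombine flag is gone; an Aggregator is just its three functions:
val aggregator = new Aggregator[Int, Int, Int](
  (v: Int) => v, // createCombiner: build the initial combiner from a value
  _ + _,         // mergeValue: fold another value into a combiner
  _ + _)         // mergeCombiners: merge combiners from different map tasks

// Passing Some(aggregator) to ShuffledRDD requests map-side combining:
val combined =
  new ShuffledRDD[Int, Int, Int](pairs, Some(aggregator), new HashPartitioner(2))

// Passing None (assumed) ships raw pairs; combining can still be done
// after the shuffle, as the updated ShuffleSuite test does:
val uncombined =
  new ShuffledRDD[Int, Int, Int](pairs, None, new HashPartitioner(2))
    .mapPartitions(aggregator.combineValuesByKey(_))
```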
-rw-r--r-- | core/src/main/scala/spark/Aggregator.scala | 6
-rw-r--r-- | core/src/main/scala/spark/Dependency.scala | 2
-rw-r--r-- | core/src/main/scala/spark/rdd/ShuffledRDD.scala | 7
-rw-r--r-- | core/src/main/scala/spark/scheduler/ShuffleMapTask.scala | 2
-rw-r--r-- | core/src/test/scala/spark/ShuffleSuite.scala | 20
5 files changed, 15 insertions, 22 deletions
diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala
index 8d4f982413..df8ce9c054 100644
--- a/core/src/main/scala/spark/Aggregator.scala
+++ b/core/src/main/scala/spark/Aggregator.scala
@@ -9,15 +9,11 @@ import scala.collection.JavaConversions._
  * @param createCombiner function to create the initial value of the aggregation.
  * @param mergeValue function to merge a new value into the aggregation result.
  * @param mergeCombiners function to merge outputs from multiple mergeValue function.
- * @param mapSideCombine whether to apply combiners on map partitions, also
- *                       known as map-side aggregations. When set to false,
- *                       mergeCombiners function is not used.
  */
 case class Aggregator[K, V, C] (
     val createCombiner: V => C,
     val mergeValue: (C, V) => C,
-    val mergeCombiners: (C, C) => C,
-    val mapSideCombine: Boolean = true) {
+    val mergeCombiners: (C, C) => C) {
 
   def combineValuesByKey(iter: Iterator[(K, V)]) : Iterator[(K, C)] = {
     val combiners = new JHashMap[K, C]
diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala
index 19a51dd5b8..5a67073ef4 100644
--- a/core/src/main/scala/spark/Dependency.scala
+++ b/core/src/main/scala/spark/Dependency.scala
@@ -22,7 +22,7 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
  * Represents a dependency on the output of a shuffle stage.
  * @param shuffleId the shuffle id
  * @param rdd the parent RDD
- * @param aggregator optional aggregator; this allows for map-side combining
+ * @param aggregator optional aggregator; if provided, map-side combining will be performed
  * @param partitioner partitioner used to partition the shuffle output
  */
 class ShuffleDependency[K, V, C](
diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
index 04234491a6..8b1c29b065 100644
--- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -14,6 +14,13 @@ private[spark] class ShuffledRDDSplit(val idx: Int) extends Split {
 
 /**
  * The resulting RDD from a shuffle (e.g. repartitioning of data).
+ * @param parent the parent RDD.
+ * @param aggregator if provided, this aggregator will be used to perform map-side combining.
+ * @param part the partitioner used to partition the RDD
+ * @tparam K the key class.
+ * @tparam V the value class.
+ * @tparam C if map side combiners are used, then this is the combiner type; otherwise,
+ *           this is the same as V.
  */
 class ShuffledRDD[K, V, C](
     @transient parent: RDD[(K, V)],
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 86796d3677..c97be18844 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -114,7 +114,7 @@ private[spark] class ShuffleMapTask(
     val partitioner = dep.partitioner
 
     val bucketIterators =
-      if (dep.aggregator.isDefined && dep.aggregator.get.mapSideCombine) {
+      if (dep.aggregator.isDefined) {
         val aggregator = dep.aggregator.get.asInstanceOf[Aggregator[Any, Any, Any]]
         // Apply combiners (map-side aggregation) to the map output.
         val buckets = Array.tabulate(numOutputSplits)(_ => new JHashMap[Any, Any])
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index fc262d5c4c..397eb759c0 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -228,8 +228,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
     val aggregator = new Aggregator[Int, Int, Int](
       (v: Int) => v,
       _+_,
-      _+_,
-      false)
+      _+_)
 
     // Turn off map-side combine and test the results.
     var shuffledRdd : RDD[(Int, Int)] =
@@ -237,22 +236,13 @@
     shuffledRdd = shuffledRdd.mapPartitions(aggregator.combineValuesByKey(_))
     assert(shuffledRdd.collect().toSet === Set((1,8), (2, 1)))
 
-    // Turn map-side combine off and pass a wrong mergeCombine function. Should
-    // not see an exception because mergeCombine should not have been called.
+    // Run a wrong mergeCombine function with map-side combine on.
+    // We expect to see an exception thrown.
     val aggregatorWithException = new Aggregator[Int, Int, Int](
-      (v: Int) => v, _+_, ShuffleSuite.mergeCombineException, false)
-    var shuffledRdd1 : RDD[(Int, Int)] =
-      new ShuffledRDD[Int, Int, Int](pairs, Some(aggregatorWithException), new HashPartitioner(2))
-    shuffledRdd1 = shuffledRdd1.mapPartitions(aggregatorWithException.combineValuesByKey(_))
-    assert(shuffledRdd1.collect().toSet === Set((1, 8), (2, 1)))
-
-    // Now run the same mergeCombine function with map-side combine on. We
-    // expect to see an exception thrown.
-    val aggregatorWithException1 = new Aggregator[Int, Int, Int](
       (v: Int) => v, _+_, ShuffleSuite.mergeCombineException)
     var shuffledRdd2 : RDD[(Int, Int)] =
-      new ShuffledRDD[Int, Int, Int](pairs, Some(aggregatorWithException1), new HashPartitioner(2))
-    shuffledRdd2 = shuffledRdd2.mapPartitions(aggregatorWithException1.combineCombinersByKey(_))
+      new ShuffledRDD[Int, Int, Int](pairs, Some(aggregatorWithException), new HashPartitioner(2))
+    shuffledRdd2 = shuffledRdd2.mapPartitions(aggregatorWithException.combineCombinersByKey(_))
     evaluating { shuffledRdd2.collect() } should produce [SparkException]
   }
 }
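
The control flow this enables in ShuffleMapTask can be summarized with a small standalone sketch (an illustration of the `dep.aggregator.isDefined` branch above, not the actual Spark code):

```scala
// Illustrative only: map-side combining happens exactly when the
// ShuffleDependency carries an aggregator.
def mapOutput[K, V, C](
    aggregator: Option[Aggregator[K, V, C]],
    records: Iterator[(K, V)]): Iterator[(K, Any)] =
  aggregator match {
    case Some(agg) => agg.combineValuesByKey(records) // combine before shuffling
    case None      => records                         // ship raw (K, V) pairs
  }
```

With the flag gone, the state "aggregator present but map-side combine disabled" is no longer representable, which is presumably why the test exercising that combination was deleted.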