diff options
author | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2012-10-08 18:13:00 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2012-10-13 14:59:20 -0700 |
commit | 10bcd217d2c9fcd7822d4399cfb9a0c9a05bc56e (patch) | |
tree | 4869648ee9f4466e1e5c88caae19db82d8a83af1 /core/src/test/scala | |
parent | 4775c55641f281523f105f9272f164033242a0aa (diff) | |
download | spark-10bcd217d2c9fcd7822d4399cfb9a0c9a05bc56e.tar.gz spark-10bcd217d2c9fcd7822d4399cfb9a0c9a05bc56e.tar.bz2 spark-10bcd217d2c9fcd7822d4399cfb9a0c9a05bc56e.zip |
Remove mapSideCombine field from Aggregator.
Instead, the presence or absense of a ShuffleDependency's aggregator
will control whether map-side combining is performed.
Diffstat (limited to 'core/src/test/scala')
-rw-r--r-- | core/src/test/scala/spark/ShuffleSuite.scala | 20 |
1 files changed, 5 insertions, 15 deletions
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index fc262d5c4c..397eb759c0 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -228,8 +228,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { val aggregator = new Aggregator[Int, Int, Int]( (v: Int) => v, _+_, - _+_, - false) + _+_) // Turn off map-side combine and test the results. var shuffledRdd : RDD[(Int, Int)] = @@ -237,22 +236,13 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { shuffledRdd = shuffledRdd.mapPartitions(aggregator.combineValuesByKey(_)) assert(shuffledRdd.collect().toSet === Set((1,8), (2, 1))) - // Turn map-side combine off and pass a wrong mergeCombine function. Should - // not see an exception because mergeCombine should not have been called. + // Run a wrong mergeCombine function with map-side combine on. + // We expect to see an exception thrown. val aggregatorWithException = new Aggregator[Int, Int, Int]( - (v: Int) => v, _+_, ShuffleSuite.mergeCombineException, false) - var shuffledRdd1 : RDD[(Int, Int)] = - new ShuffledRDD[Int, Int, Int](pairs, Some(aggregatorWithException), new HashPartitioner(2)) - shuffledRdd1 = shuffledRdd1.mapPartitions(aggregatorWithException.combineValuesByKey(_)) - assert(shuffledRdd1.collect().toSet === Set((1, 8), (2, 1))) - - // Now run the same mergeCombine function with map-side combine on. We - // expect to see an exception thrown. - val aggregatorWithException1 = new Aggregator[Int, Int, Int]( (v: Int) => v, _+_, ShuffleSuite.mergeCombineException) var shuffledRdd2 : RDD[(Int, Int)] = - new ShuffledRDD[Int, Int, Int](pairs, Some(aggregatorWithException1), new HashPartitioner(2)) - shuffledRdd2 = shuffledRdd2.mapPartitions(aggregatorWithException1.combineCombinersByKey(_)) + new ShuffledRDD[Int, Int, Int](pairs, Some(aggregatorWithException), new HashPartitioner(2)) + shuffledRdd2 = shuffledRdd2.mapPartitions(aggregatorWithException.combineCombinersByKey(_)) evaluating { shuffledRdd2.collect() } should produce [SparkException] } } |