author     Josh Rosen <joshrosen@eecs.berkeley.edu>    2012-10-08 18:13:00 -0700
committer  Josh Rosen <joshrosen@eecs.berkeley.edu>    2012-10-13 14:59:20 -0700
commit     10bcd217d2c9fcd7822d4399cfb9a0c9a05bc56e (patch)
tree       4869648ee9f4466e1e5c88caae19db82d8a83af1 /core/src/test/scala
parent     4775c55641f281523f105f9272f164033242a0aa (diff)
Remove mapSideCombine field from Aggregator.
Instead, the presence or absence of a ShuffleDependency's aggregator will control whether map-side combining is performed.
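
With the boolean flag gone, the Aggregator constructor carries only its three combine functions, and whether map-side combining happens is decided by where the aggregator is applied rather than by a field on the aggregator itself. As context, here is a minimal sketch of the shape this implies; it is not the actual Spark source (the real class lives under core/src/main/scala/spark):

// Minimal sketch, assuming the post-commit three-argument shape; the
// real spark.Aggregator may differ in details.
import scala.collection.mutable

case class Aggregator[K, V, C](
    createCombiner: V => C,        // builds a combiner from a key's first value
    mergeValue: (C, V) => C,       // folds another value into a combiner (map side)
    mergeCombiners: (C, C) => C) { // merges combiners from different map tasks

  // Combine raw (K, V) pairs into (K, C) combiners within one partition.
  def combineValuesByKey(iter: Iterator[(K, V)]): Iterator[(K, C)] = {
    val combiners = mutable.HashMap.empty[K, C]
    for ((k, v) <- iter) {
      combiners(k) = combiners.get(k) match {
        case Some(c) => mergeValue(c, v)
        case None    => createCombiner(v)
      }
    }
    combiners.iterator
  }

  // Merge pre-combined (K, C) pairs, e.g. after a map-side combine.
  def combineCombinersByKey(iter: Iterator[(K, C)]): Iterator[(K, C)] = {
    val combiners = mutable.HashMap.empty[K, C]
    for ((k, c) <- iter) {
      combiners(k) = combiners.get(k) match {
        case Some(existing) => mergeCombiners(existing, c)
        case None           => c
      }
    }
    combiners.iterator
  }
}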
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--  core/src/test/scala/spark/ShuffleSuite.scala | 20
1 file changed, 5 insertions(+), 15 deletions(-)
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index fc262d5c4c..397eb759c0 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -228,8 +228,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
     val aggregator = new Aggregator[Int, Int, Int](
       (v: Int) => v,
       _+_,
-      _+_,
-      false)
+      _+_)
 
     // Turn off map-side combine and test the results.
     var shuffledRdd : RDD[(Int, Int)] =
@@ -237,22 +236,13 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
     shuffledRdd = shuffledRdd.mapPartitions(aggregator.combineValuesByKey(_))
     assert(shuffledRdd.collect().toSet === Set((1,8), (2, 1)))
 
-    // Turn map-side combine off and pass a wrong mergeCombine function. Should
-    // not see an exception because mergeCombine should not have been called.
+    // Run a wrong mergeCombine function with map-side combine on.
+    // We expect to see an exception thrown.
     val aggregatorWithException = new Aggregator[Int, Int, Int](
-      (v: Int) => v, _+_, ShuffleSuite.mergeCombineException, false)
-    var shuffledRdd1 : RDD[(Int, Int)] =
-      new ShuffledRDD[Int, Int, Int](pairs, Some(aggregatorWithException), new HashPartitioner(2))
-    shuffledRdd1 = shuffledRdd1.mapPartitions(aggregatorWithException.combineValuesByKey(_))
-    assert(shuffledRdd1.collect().toSet === Set((1, 8), (2, 1)))
-
-    // Now run the same mergeCombine function with map-side combine on. We
-    // expect to see an exception thrown.
-    val aggregatorWithException1 = new Aggregator[Int, Int, Int](
       (v: Int) => v, _+_, ShuffleSuite.mergeCombineException)
     var shuffledRdd2 : RDD[(Int, Int)] =
-      new ShuffledRDD[Int, Int, Int](pairs, Some(aggregatorWithException1), new HashPartitioner(2))
-    shuffledRdd2 = shuffledRdd2.mapPartitions(aggregatorWithException1.combineCombinersByKey(_))
+      new ShuffledRDD[Int, Int, Int](pairs, Some(aggregatorWithException), new HashPartitioner(2))
+    shuffledRdd2 = shuffledRdd2.mapPartitions(aggregatorWithException.combineCombinersByKey(_))
     evaluating { shuffledRdd2.collect() } should produce [SparkException]
   }
 }
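
As a hedged, Spark-free illustration of the two paths the updated test exercises, the following demo uses the Aggregator sketch above. The input pairs are hypothetical, chosen only so that both paths reduce to the same totals the suite asserts, Set((1, 8), (2, 1)):

// Illustrative demo only; depends on the Aggregator sketch above.
object CombinePathsDemo {
  def main(args: Array[String]): Unit = {
    val agg = new Aggregator[Int, Int, Int]((v: Int) => v, _ + _, _ + _)

    // Hypothetical input data, summing to 8 for key 1 and 1 for key 2.
    val pairs = Seq((1, 3), (1, 5), (2, 1))

    // Reduce-side-only combining: raw (K, V) pairs are combined once,
    // after the shuffle, via combineValuesByKey.
    val reduceSide = agg.combineValuesByKey(pairs.iterator).toSet
    assert(reduceSide == Set((1, 8), (2, 1)))

    // Map-side combining: each "partition" pre-aggregates with
    // combineValuesByKey, and the partial combiners are merged after
    // the shuffle with combineCombinersByKey.
    val partitions = Seq(Seq((1, 3), (2, 1)), Seq((1, 5)))
    val partials = partitions.flatMap(p => agg.combineValuesByKey(p.iterator))
    val merged = agg.combineCombinersByKey(partials.iterator).toSet
    assert(merged == Set((1, 8), (2, 1)))

    println(s"reduce-side = $reduceSide, map-side = $merged")
  }
}

This also shows why the test's throwing mergeCombiners only fails on the combineCombinersByKey path: when values are combined in a single pass, mergeCombiners is never invoked, so no exception can surface.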