author     Reynold Xin <rxin@cs.berkeley.edu>    2012-08-30 12:34:28 -0700
committer  Reynold Xin <rxin@cs.berkeley.edu>    2012-08-30 12:34:28 -0700
commit     a8a2a08a1a7e652920702f25a89e43788d538d05 (patch)
tree       d8efd5f16b7efa4e34db2f5040b6a5156ffb5f58 /core/src/test/scala
parent     5945bcdcc56a71324357b02c21bef80dd7efd13a (diff)
Added a test for the map-side combine on/off switch.
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--  core/src/test/scala/spark/ShuffleSuite.scala | 45
1 file changed, 44 insertions(+), 1 deletion(-)
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 99d13b31ef..f622c413f7 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -2,6 +2,7 @@ package spark
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
+import org.scalatest.matchers.ShouldMatchers
import org.scalatest.prop.Checkers
import org.scalacheck.Arbitrary._
import org.scalacheck.Gen
@@ -13,7 +14,7 @@ import scala.collection.mutable.ArrayBuffer
import SparkContext._
-class ShuffleSuite extends FunSuite with BeforeAndAfter {
+class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
var sc: SparkContext = _
@@ -196,4 +197,46 @@ class ShuffleSuite extends FunSuite with BeforeAndAfter {
// Test that a shuffle on the file works, because this used to be a bug
assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
}
+
+ test("map-side combine") {
+ sc = new SparkContext("local", "test")
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1), (1, 1)), 2)
+
+ // Test with map-side combine on.
+ val sums = pairs.reduceByKey(_+_).collect()
+ assert(sums.toSet === Set((1, 8), (2, 1)))
+
+ // Turn off map-side combine and test the results.
+ val aggregator = new Aggregator[Int, Int, Int](
+ (v: Int) => v,
+ _+_,
+ _+_,
+ false)
+ val shuffledRdd = new ShuffledRDD(
+ pairs, aggregator, new HashPartitioner(2))
+ assert(shuffledRdd.collect().toSet === Set((1, 8), (2, 1)))
+
+ // Turn map-side combine off and pass a wrong mergeCombine function. Should
+ // not see an exception because mergeCombine should not have been called.
+ val aggregatorWithException = new Aggregator[Int, Int, Int](
+ (v: Int) => v, _+_, ShuffleSuite.mergeCombineException, false)
+ val shuffledRdd1 = new ShuffledRDD(
+ pairs, aggregatorWithException, new HashPartitioner(2))
+ assert(shuffledRdd1.collect().toSet === Set((1, 8), (2, 1)))
+
+ // Now run the same mergeCombine function with map-side combine on. We
+ // expect to see an exception thrown.
+ val aggregatorWithException1 = new Aggregator[Int, Int, Int](
+ (v: Int) => v, _+_, ShuffleSuite.mergeCombineException)
+ val shuffledRdd2 = new ShuffledRDD(
+ pairs, aggregatorWithException1, new HashPartitioner(2))
+ evaluating { shuffledRdd2.collect() } should produce [SparkException]
+ }
+}
+
+object ShuffleSuite {
+ def mergeCombineException(x: Int, y: Int): Int = {
+ throw new SparkException("Exception for map-side combine.")
+ x + y
+ }
}
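
For readers following the test, below is a minimal, Spark-free sketch of the semantics it checks. The MapSideCombineSketch object, its two-partition layout, and all names in it are purely illustrative and are not part of the commit; they only model how an Aggregator's createCombiner/mergeValue/mergeCombiners functions interact with the map-side combine switch, not Spark's actual shuffle implementation.

// Illustrative model only (assumption): not Spark code, just the aggregation
// semantics the ShuffleSuite test above asserts.
object MapSideCombineSketch {
  // The same three functions an Aggregator[Int, Int, Int] would carry.
  val createCombiner: Int => Int = v => v
  val mergeValue: (Int, Int) => Int = _ + _
  val mergeCombiners: (Int, Int) => Int = _ + _

  // The test's input pairs, split into two "partitions".
  val partitions: Seq[Seq[(Int, Int)]] =
    Seq(Seq((1, 1), (1, 2), (1, 3)), Seq((1, 1), (2, 1), (1, 1)))

  def main(args: Array[String]): Unit = {
    // Map-side combine ON: each partition pre-aggregates its own pairs with
    // createCombiner/mergeValue, so only per-key partial sums cross the
    // shuffle, and the reduce side merges those partials with mergeCombiners.
    val perPartition = partitions.map { part =>
      part.groupBy(_._1).map { case (k, kvs) =>
        val values = kvs.map(_._2)
        k -> values.tail.foldLeft(createCombiner(values.head))(mergeValue)
      }
    }
    val withCombine = perPartition.flatten
      .groupBy(_._1).map { case (k, kcs) =>
        k -> kcs.map(_._2).reduceLeft(mergeCombiners)
      }

    // Map-side combine OFF: raw pairs cross the shuffle and the reduce side
    // folds them with createCombiner/mergeValue; mergeCombiners is never
    // invoked, which is why the test's throwing version passes unnoticed.
    val withoutCombine = partitions.flatten
      .groupBy(_._1).map { case (k, kvs) =>
        val values = kvs.map(_._2)
        k -> values.tail.foldLeft(createCombiner(values.head))(mergeValue)
      }

    // Both paths yield Set((1, 8), (2, 1)), matching the test's assertions.
    println(withCombine.toSet)
    println(withoutCombine.toSet)
  }
}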