aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/spark/ShuffleSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala/spark/ShuffleSuite.scala')
-rw-r--r--core/src/test/scala/spark/ShuffleSuite.scala39
1 files changed, 2 insertions, 37 deletions
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 7f8ec5d48f..8170100f1d 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -12,8 +12,8 @@ import org.scalacheck.Prop._
import com.google.common.io.Files
-import spark.rdd.ShuffledAggregatedRDD
-import SparkContext._
+import spark.rdd.ShuffledRDD
+import spark.SparkContext._
class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
@@ -216,41 +216,6 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
// Test that a shuffle on the file works, because this used to be a bug
assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
}
-
- test("map-side combine") {
- sc = new SparkContext("local", "test")
- val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1), (1, 1)), 2)
-
- // Test with map-side combine on.
- val sums = pairs.reduceByKey(_+_).collect()
- assert(sums.toSet === Set((1, 8), (2, 1)))
-
- // Turn off map-side combine and test the results.
- val aggregator = new Aggregator[Int, Int, Int](
- (v: Int) => v,
- _+_,
- _+_,
- false)
- val shuffledRdd = new ShuffledAggregatedRDD(
- pairs, aggregator, new HashPartitioner(2))
- assert(shuffledRdd.collect().toSet === Set((1, 8), (2, 1)))
-
- // Turn map-side combine off and pass a wrong mergeCombine function. Should
- // not see an exception because mergeCombine should not have been called.
- val aggregatorWithException = new Aggregator[Int, Int, Int](
- (v: Int) => v, _+_, ShuffleSuite.mergeCombineException, false)
- val shuffledRdd1 = new ShuffledAggregatedRDD(
- pairs, aggregatorWithException, new HashPartitioner(2))
- assert(shuffledRdd1.collect().toSet === Set((1, 8), (2, 1)))
-
- // Now run the same mergeCombine function with map-side combine on. We
- // expect to see an exception thrown.
- val aggregatorWithException1 = new Aggregator[Int, Int, Int](
- (v: Int) => v, _+_, ShuffleSuite.mergeCombineException)
- val shuffledRdd2 = new ShuffledAggregatedRDD(
- pairs, aggregatorWithException1, new HashPartitioner(2))
- evaluating { shuffledRdd2.collect() } should produce [SparkException]
- }
}
object ShuffleSuite {