aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/spark/ShuffleSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala/spark/ShuffleSuite.scala')
-rw-r--r--core/src/test/scala/spark/ShuffleSuite.scala18
1 files changed, 9 insertions, 9 deletions
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index f622c413f7..9d7e2591f1 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -15,16 +15,16 @@ import scala.collection.mutable.ArrayBuffer
import SparkContext._
class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
-
+
var sc: SparkContext = _
-
+
after {
if (sc != null) {
sc.stop()
sc = null
}
}
-
+
test("groupByKey") {
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
@@ -57,7 +57,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
}
-
+
test("groupByKey with many output partitions") {
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
@@ -187,7 +187,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
(4, (ArrayBuffer(), ArrayBuffer('w')))
))
}
-
+
test("zero-partition RDD") {
sc = new SparkContext("local", "test")
val emptyDir = Files.createTempDir()
@@ -195,7 +195,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
assert(file.splits.size == 0)
assert(file.collect().toList === Nil)
// Test that a shuffle on the file works, because this used to be a bug
- assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
+ assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
}
test("map-side combine") {
@@ -212,7 +212,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
_+_,
_+_,
false)
- val shuffledRdd = new ShuffledRDD(
+ val shuffledRdd = new ShuffledAggregatedRDD(
pairs, aggregator, new HashPartitioner(2))
assert(shuffledRdd.collect().toSet === Set((1, 8), (2, 1)))
@@ -220,7 +220,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
// not see an exception because mergeCombine should not have been called.
val aggregatorWithException = new Aggregator[Int, Int, Int](
(v: Int) => v, _+_, ShuffleSuite.mergeCombineException, false)
- val shuffledRdd1 = new ShuffledRDD(
+ val shuffledRdd1 = new ShuffledAggregatedRDD(
pairs, aggregatorWithException, new HashPartitioner(2))
assert(shuffledRdd1.collect().toSet === Set((1, 8), (2, 1)))
@@ -228,7 +228,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
// expect to see an exception thrown.
val aggregatorWithException1 = new Aggregator[Int, Int, Int](
(v: Int) => v, _+_, ShuffleSuite.mergeCombineException)
- val shuffledRdd2 = new ShuffledRDD(
+ val shuffledRdd2 = new ShuffledAggregatedRDD(
pairs, aggregatorWithException1, new HashPartitioner(2))
evaluating { shuffledRdd2.collect() } should produce [SparkException]
}