diff options
author | Reynold Xin <rxin@cs.berkeley.edu> | 2012-09-19 12:31:45 -0700 |
---|---|---|
committer | Reynold Xin <rxin@cs.berkeley.edu> | 2012-09-19 12:31:45 -0700 |
commit | 397d3816e1ee0351cd2814dc081f368fe45094c0 (patch) | |
tree | b88c400e65ca68478d27fd9b7671e56e76b04693 /core/src/test | |
parent | 9a449e00633d4028652e8ccc46486bb6ce886635 (diff) | |
download | spark-397d3816e1ee0351cd2814dc081f368fe45094c0.tar.gz spark-397d3816e1ee0351cd2814dc081f368fe45094c0.tar.bz2 spark-397d3816e1ee0351cd2814dc081f368fe45094c0.zip |
Separated ShuffledRDD into multiple classes: RepartitionShuffledRDD,
ShuffledSortedRDD, and ShuffledAggregatedRDD.
Diffstat (limited to 'core/src/test')
-rw-r--r-- | core/src/test/scala/spark/ShuffleSuite.scala | 18 |
1 file changed, 9 insertions(+), 9 deletions(-)
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index f622c413f7..9d7e2591f1 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -15,16 +15,16 @@ import scala.collection.mutable.ArrayBuffer import SparkContext._ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { - + var sc: SparkContext = _ - + after { if (sc != null) { sc.stop() sc = null } } - + test("groupByKey") { sc = new SparkContext("local", "test") val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))) @@ -57,7 +57,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { val valuesFor2 = groups.find(_._1 == 2).get._2 assert(valuesFor2.toList.sorted === List(1)) } - + test("groupByKey with many output partitions") { sc = new SparkContext("local", "test") val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))) @@ -187,7 +187,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { (4, (ArrayBuffer(), ArrayBuffer('w'))) )) } - + test("zero-partition RDD") { sc = new SparkContext("local", "test") val emptyDir = Files.createTempDir() @@ -195,7 +195,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { assert(file.splits.size == 0) assert(file.collect().toList === Nil) // Test that a shuffle on the file works, because this used to be a bug - assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil) + assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil) } test("map-side combine") { @@ -212,7 +212,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { _+_, _+_, false) - val shuffledRdd = new ShuffledRDD( + val shuffledRdd = new ShuffledAggregatedRDD( pairs, aggregator, new HashPartitioner(2)) assert(shuffledRdd.collect().toSet === Set((1, 8), (2, 1))) @@ -220,7 +220,7 @@ class ShuffleSuite extends 
FunSuite with ShouldMatchers with BeforeAndAfter { // not see an exception because mergeCombine should not have been called. val aggregatorWithException = new Aggregator[Int, Int, Int]( (v: Int) => v, _+_, ShuffleSuite.mergeCombineException, false) - val shuffledRdd1 = new ShuffledRDD( + val shuffledRdd1 = new ShuffledAggregatedRDD( pairs, aggregatorWithException, new HashPartitioner(2)) assert(shuffledRdd1.collect().toSet === Set((1, 8), (2, 1))) @@ -228,7 +228,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { // expect to see an exception thrown. val aggregatorWithException1 = new Aggregator[Int, Int, Int]( (v: Int) => v, _+_, ShuffleSuite.mergeCombineException) - val shuffledRdd2 = new ShuffledRDD( + val shuffledRdd2 = new ShuffledAggregatedRDD( pairs, aggregatorWithException1, new HashPartitioner(2)) evaluating { shuffledRdd2.collect() } should produce [SparkException] } |