aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/spark/ShuffleSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala/spark/ShuffleSuite.scala')
-rw-r--r--core/src/test/scala/spark/ShuffleSuite.scala13
1 files changed, 13 insertions, 0 deletions
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 3493b9511f..d6efa3db43 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -98,6 +98,19 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
val sums = pairs.reduceByKey(_+_, 10).collect()
assert(sums.toSet === Set((1, 7), (2, 1)))
}
+
+ test("reduceByKey with partitioner") {
+ sc = new SparkContext("local", "test")
+ val p = new Partitioner() {
+ def numPartitions = 2
+ def getPartition(key: Any) = key.asInstanceOf[Int]
+ }
+ val pairs = rddToPairRDDFunctions(sc.parallelize(Array((1, 1), (1, 2), (1, 1), (0, 1)))).partitionBy(p)
+ val sums = pairs.reduceByKey(p, _+_)
+ println(sums.toDebugString)
+ assert(sums.collect().toSet === Set((1, 4), (0, 1)))
+ assert(sums.partitioner === Some(p))
+ }
test("join") {
sc = new SparkContext("local", "test")