Diffstat (limited to 'core/src/test/scala/org/apache/spark/PartitioningSuite.scala')
-rw-r--r--  core/src/test/scala/org/apache/spark/PartitioningSuite.scala | 64
1 file changed, 60 insertions(+), 4 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 4658a08064..fc0cee3e87 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark
+import scala.collection.mutable.ArrayBuffer
import scala.math.abs
import org.scalatest.{FunSuite, PrivateMethodTester}
@@ -52,14 +53,12 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMethodTester
assert(p2 === p2)
assert(p4 === p4)
- assert(p2 != p4)
- assert(p4 != p2)
+ assert(p2 === p4)
assert(p4 === anotherP4)
assert(anotherP4 === p4)
assert(descendingP2 === descendingP2)
assert(descendingP4 === descendingP4)
- assert(descendingP2 != descendingP4)
- assert(descendingP4 != descendingP2)
+ assert(descendingP2 === descendingP4)
assert(p2 != descendingP2)
assert(p4 != descendingP4)
assert(descendingP2 != p2)
@@ -102,6 +101,63 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMethodTester
partitioner.getPartition(Row(100))
}
+ test("RangPartitioner.sketch") {
+ val rdd = sc.makeRDD(0 until 20, 20).flatMap { i =>
+ val random = new java.util.Random(i)
+ Iterator.fill(i)(random.nextDouble())
+ }.cache()
+ val sampleSizePerPartition = 10
+ val (count, sketched) = RangePartitioner.sketch(rdd, sampleSizePerPartition)
+ assert(count === rdd.count())
+ sketched.foreach { case (idx, n, sample) =>
+ assert(n === idx)
+ assert(sample.size === math.min(n, sampleSizePerPartition))
+ }
+ }
+
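(Aside on the API under test: RangePartitioner.sketch samples up to sampleSizePerPartition items from every partition and returns the total element count together with one (partitionIndex, partitionSize, sample) triple per partition, which is exactly what the assertions above check. The stand-alone sketch below illustrates that contract with per-partition reservoir sampling over plain Scala collections; sketchLocal is a hypothetical helper, not Spark's implementation.)

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Hypothetical stand-in for RangePartitioner.sketch: draw up to k items from
// each "partition" (modeled as a local Seq) via reservoir sampling, returning
// the total count plus one (partitionIndex, partitionSize, sample) triple each.
def sketchLocal[T](partitions: Seq[Seq[T]], k: Int, seed: Long = 42L)
    : (Long, Seq[(Int, Long, Seq[T])]) = {
  val rng = new Random(seed)
  val sketched = partitions.zipWithIndex.map { case (part, idx) =>
    val reservoir = ArrayBuffer.empty[T]
    var n = 0L                                // items seen so far in this partition
    part.foreach { item =>
      if (n < k) {
        reservoir += item                     // fill the reservoir first
      } else {
        val j = (rng.nextDouble() * (n + 1)).toLong
        if (j < k) reservoir(j.toInt) = item  // replace with probability k/(n+1)
      }
      n += 1
    }
    (idx, n, reservoir.toSeq)
  }
  (sketched.map(_._2).sum, sketched)
}

// Mirrors the test's assertions: every sample holds min(n, k) elements.
val (total, sk) = sketchLocal(Seq(Seq(1, 2, 3), 1 to 100), 10)
assert(total == 103 && sk.forall { case (_, n, s) => s.size == math.min(n, 10) })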
+ test("RangePartitioner.determineBounds") {
+ assert(RangePartitioner.determineBounds(ArrayBuffer.empty[(Int, Float)], 10).isEmpty,
+ "Bounds on an empty candidates set should be empty.")
+ val candidates = ArrayBuffer(
+ (0.7, 2.0f), (0.1, 1.0f), (0.4, 1.0f), (0.3, 1.0f), (0.2, 1.0f), (0.5, 1.0f), (1.0, 3.0f))
+ assert(RangePartitioner.determineBounds(candidates, 3) === Array(0.4, 0.7))
+ }
+
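(The expected bounds Array(0.4, 0.7) can be re-derived by hand: sort the weighted candidates by key, then emit a key as a bound each time the running weight crosses the next multiple of totalWeight / partitions = 10.0 / 3. The snippet below walks that computation; it mirrors the behaviour the test expects of determineBounds but is not Spark's code.)

// Re-derivation of the bounds asserted above from the same candidate set.
val cands = Seq(
  (0.7, 2.0f), (0.1, 1.0f), (0.4, 1.0f), (0.3, 1.0f), (0.2, 1.0f), (0.5, 1.0f), (1.0, 3.0f))
val partitions = 3
val sorted = cands.sortBy(_._1)
val step = sorted.map(_._2.toDouble).sum / partitions  // 10.0 / 3 ≈ 3.33
var cum = 0.0
var target = step
val bounds = scala.collection.mutable.ArrayBuffer.empty[Double]
for ((key, weight) <- sorted if bounds.size < partitions - 1) {
  cum += weight                      // cumulative weights: 1, 2, 3, 4, 5, 7, 10
  if (cum >= target) {               // crossed 3.33 at 0.4, crossed 6.67 at 0.7
    bounds += key
    target += step
  }
}
assert(bounds == Seq(0.4, 0.7))      // matches the test's expectation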
+ test("RangePartitioner should run only one job if data is roughly balanced") {
+ val rdd = sc.makeRDD(0 until 20, 20).flatMap { i =>
+ val random = new java.util.Random(i)
+ Iterator.fill(5000 * i)((random.nextDouble() + i, i))
+ }.cache()
+ for (numPartitions <- Seq(10, 20, 40)) {
+ val partitioner = new RangePartitioner(numPartitions, rdd)
+ assert(partitioner.numPartitions === numPartitions)
+ val counts = rdd.keys.map(key => partitioner.getPartition(key)).countByValue().values
+ assert(counts.max < 3.0 * counts.min)
+ }
+ }
+
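(This test and the next one repeat the same 3x-skew check inline: no partition may receive more than three times as many keys as the smallest. A hypothetical helper, with a name and signature of our choosing rather than the suite's, that packages that check:)

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Hypothetical test helper: count keys per target partition under the given
// partitioner and assert the spread stays under maxSkew, as both tests do inline.
def assertRoughlyBalanced[K](keys: RDD[K], partitioner: Partitioner,
    maxSkew: Double = 3.0): Unit = {
  val counts = keys.map(partitioner.getPartition).countByValue().values
  assert(counts.max < maxSkew * counts.min,
    s"partition sizes too skewed: max=${counts.max}, min=${counts.min}")
}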
+ test("RangePartitioner should work well on unbalanced data") {
+ val rdd = sc.makeRDD(0 until 20, 20).flatMap { i =>
+ val random = new java.util.Random(i)
+ Iterator.fill(20 * i * i * i)((random.nextDouble() + i, i))
+ }.cache()
+ for (numPartitions <- Seq(2, 4, 8)) {
+ val partitioner = new RangePartitioner(numPartitions, rdd)
+ assert(partitioner.numPartitions === numPartitions)
+ val counts = rdd.keys.map(key => partitioner.getPartition(key)).countByValue().values
+ assert(counts.max < 3.0 * counts.min)
+ }
+ }
+
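(For scale: partition i contributes 20 * i^3 pairs, so the last input partition alone holds 20 * 19^3 = 137,180 of the 722,000 total pairs, roughly 19 percent of the data. In this regime a fixed per-partition sample under-represents the heavy partitions, so the partitioner is expected to sample them again at a higher rate before the 3x balance bound above can hold.)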
+ test("RangePartitioner should return a single partition for empty RDDs") {
+ val empty1 = sc.emptyRDD[(Int, Double)]
+ val partitioner1 = new RangePartitioner(0, empty1)
+ assert(partitioner1.numPartitions === 1)
+ val empty2 = sc.makeRDD(0 until 2, 2).flatMap(i => Seq.empty[(Int, Double)])
+ val partitioner2 = new RangePartitioner(2, empty2)
+ assert(partitioner2.numPartitions === 1)
+ }
+
test("HashPartitioner not equal to RangePartitioner") {
val rdd = sc.parallelize(1 to 10).map(x => (x, x))
val rangeP2 = new RangePartitioner(2, rdd)
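(Closing usage note: RangePartitioner is the partitioner behind sortByKey, assigning keys to ordered, roughly equal-sized partitions. A minimal local-mode sketch, assuming Spark 1.x-era imports for the pair-RDD implicits; the suite above gets its SparkContext from SharedSparkContext instead.)

import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // pair-RDD implicits on Spark 1.x

object RangePartitionerDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("demo"))
    try {
      val pairs = sc.parallelize(1 to 1000).map(x => (x, x))
      // Sample `pairs` to pick 3 range bounds, then repartition by key range.
      val partitioned = pairs.partitionBy(new RangePartitioner(4, pairs))
      // Each partition now covers a contiguous, roughly equal key range.
      println(partitioned.mapPartitions(it => Iterator(it.size)).collect().mkString(", "))
    } finally {
      sc.stop()
    }
  }
}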