aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-07-20 11:06:06 -0700
committerReynold Xin <rxin@apache.org>2014-07-20 11:06:06 -0700
commitfa51b0fb5bee95a402c7b7f13dcf0b46cf5bb429 (patch)
tree2c3ce13c4f7fd5e3ad1dbad6cd4201bdd0f47834 /core
parent98ab4112255d4e0fdb6e084bd3fe65807c5b209b (diff)
downloadspark-fa51b0fb5bee95a402c7b7f13dcf0b46cf5bb429.tar.gz
spark-fa51b0fb5bee95a402c7b7f13dcf0b46cf5bb429.tar.bz2
spark-fa51b0fb5bee95a402c7b7f13dcf0b46cf5bb429.zip
[SPARK-2598] RangePartitioner's binary search does not use the given Ordering
We should fix this in branch-1.0 as well. Author: Reynold Xin <rxin@apache.org> Closes #1500 from rxin/rangePartitioner and squashes the following commits: c0a94f5 [Reynold Xin] [SPARK-2598] RangePartitioner's binary search does not use the given Ordering.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/Partitioner.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/util/CollectionsUtils.scala (renamed from core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala)7
-rw-r--r--core/src/test/scala/org/apache/spark/PartitioningSuite.scala14
3 files changed, 20 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index ec99648a84..52c018baa5 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -134,8 +134,8 @@ class RangePartitioner[K : Ordering : ClassTag, V](
def getPartition(key: Any): Int = {
val k = key.asInstanceOf[K]
var partition = 0
- if (rangeBounds.length < 1000) {
- // If we have less than 100 partitions naive search
+ if (rangeBounds.length <= 128) {
+ // If we have 128 or fewer partitions, use naive search
while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
partition += 1
}
diff --git a/core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala b/core/src/main/scala/org/apache/spark/util/CollectionsUtils.scala
index e4c254b9dd..85da2842e8 100644
--- a/core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala
+++ b/core/src/main/scala/org/apache/spark/util/CollectionsUtils.scala
@@ -19,11 +19,11 @@ package org.apache.spark.util
import java.util
-import scala.Array
-import scala.reflect._
+import scala.reflect.{classTag, ClassTag}
private[spark] object CollectionsUtils {
def makeBinarySearch[K : Ordering : ClassTag] : (Array[K], K) => Int = {
+ // For primitive keys, we can use the natural ordering. Otherwise, use the Ordering comparator.
classTag[K] match {
case ClassTag.Float =>
(l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Float]], x.asInstanceOf[Float])
@@ -40,7 +40,8 @@ private[spark] object CollectionsUtils {
case ClassTag.Long =>
(l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Long]], x.asInstanceOf[Long])
case _ =>
- (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[AnyRef]], x)
+ val comparator = implicitly[Ordering[K]].asInstanceOf[java.util.Comparator[Any]]
+ (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[AnyRef]], x, comparator)
}
}
}
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 7c30626a0c..4658a08064 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -91,6 +91,17 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
}
}
+ test("RangePartitioner for keys that are not Comparable (but with Ordering)") {
+ // Row does not extend Comparable, but has an implicit Ordering defined.
+ implicit object RowOrdering extends Ordering[Row] {
+ override def compare(x: Row, y: Row) = x.value - y.value
+ }
+
+ val rdd = sc.parallelize(1 to 4500).map(x => (Row(x), Row(x)))
+ val partitioner = new RangePartitioner(1500, rdd)
+ partitioner.getPartition(Row(100))
+ }
+
test("HashPartitioner not equal to RangePartitioner") {
val rdd = sc.parallelize(1 to 10).map(x => (x, x))
val rangeP2 = new RangePartitioner(2, rdd)
@@ -177,3 +188,6 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
// Add other tests here for classes that should be able to handle empty partitions correctly
}
}
+
+
+private sealed case class Row(value: Int)