From fa51b0fb5bee95a402c7b7f13dcf0b46cf5bb429 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Sun, 20 Jul 2014 11:06:06 -0700
Subject: [SPARK-2598] RangePartitioner's binary search does not use the given Ordering

We should fix this in branch-1.0 as well.

Author: Reynold Xin

Closes #1500 from rxin/rangePartitioner and squashes the following commits:

c0a94f5 [Reynold Xin] [SPARK-2598] RangePartitioner's binary search does not use the given Ordering.
---
 .../main/scala/org/apache/spark/Partitioner.scala  |  4 +-
 .../org/apache/spark/util/CollectionsUtil.scala    | 46 ---------------------
 .../org/apache/spark/util/CollectionsUtils.scala   | 47 ++++++++++++++++++++++
 .../scala/org/apache/spark/PartitioningSuite.scala | 14 +++++++
 4 files changed, 63 insertions(+), 48 deletions(-)
 delete mode 100644 core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala
 create mode 100644 core/src/main/scala/org/apache/spark/util/CollectionsUtils.scala

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index ec99648a84..52c018baa5 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -134,8 +134,8 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   def getPartition(key: Any): Int = {
     val k = key.asInstanceOf[K]
     var partition = 0
-    if (rangeBounds.length < 1000) {
-      // If we have less than 100 partitions naive search
+    if (rangeBounds.length <= 128) {
+      // If we have 128 or fewer range bounds, use naive linear search
       while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
         partition += 1
       }
diff --git a/core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala b/core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala
deleted file mode 100644
index e4c254b9dd..0000000000
--- a/core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import java.util
-
-import scala.Array
-import scala.reflect._
-
-private[spark] object CollectionsUtils {
-  def makeBinarySearch[K : Ordering : ClassTag] : (Array[K], K) => Int = {
-    classTag[K] match {
-      case ClassTag.Float =>
-        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Float]], x.asInstanceOf[Float])
-      case ClassTag.Double =>
-        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Double]], x.asInstanceOf[Double])
-      case ClassTag.Byte =>
-        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Byte]], x.asInstanceOf[Byte])
-      case ClassTag.Char =>
-        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Char]], x.asInstanceOf[Char])
-      case ClassTag.Short =>
-        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Short]], x.asInstanceOf[Short])
-      case ClassTag.Int =>
-        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Int]], x.asInstanceOf[Int])
-      case ClassTag.Long =>
-        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Long]], x.asInstanceOf[Long])
-      case _ =>
-        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[AnyRef]], x)
-    }
-  }
-}
diff --git a/core/src/main/scala/org/apache/spark/util/CollectionsUtils.scala b/core/src/main/scala/org/apache/spark/util/CollectionsUtils.scala
new file mode 100644
index 0000000000..85da2842e8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/CollectionsUtils.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util
+
+import scala.reflect.{classTag, ClassTag}
+
+private[spark] object CollectionsUtils {
+  def makeBinarySearch[K : Ordering : ClassTag] : (Array[K], K) => Int = {
+    // For primitive keys, we can use the natural ordering. Otherwise, use the Ordering comparator.
+    classTag[K] match {
+      case ClassTag.Float =>
+        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Float]], x.asInstanceOf[Float])
+      case ClassTag.Double =>
+        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Double]], x.asInstanceOf[Double])
+      case ClassTag.Byte =>
+        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Byte]], x.asInstanceOf[Byte])
+      case ClassTag.Char =>
+        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Char]], x.asInstanceOf[Char])
+      case ClassTag.Short =>
+        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Short]], x.asInstanceOf[Short])
+      case ClassTag.Int =>
+        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Int]], x.asInstanceOf[Int])
+      case ClassTag.Long =>
+        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Long]], x.asInstanceOf[Long])
+      case _ =>
+        val comparator = implicitly[Ordering[K]].asInstanceOf[java.util.Comparator[Any]]
+        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[AnyRef]], x, comparator)
+    }
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 7c30626a0c..4658a08064 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -91,6 +91,17 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
     }
   }
 
+  test("RangePartitioner for keys that are not Comparable (but with Ordering)") {
+    // Row does not extend Comparable, but has an implicit Ordering defined.
+    implicit object RowOrdering extends Ordering[Row] {
+      override def compare(x: Row, y: Row) = x.value - y.value
+    }
+
+    val rdd = sc.parallelize(1 to 4500).map(x => (Row(x), Row(x)))
+    val partitioner = new RangePartitioner(1500, rdd)
+    partitioner.getPartition(Row(100))
+  }
+
   test("HashPartitioner not equal to RangePartitioner") {
     val rdd = sc.parallelize(1 to 10).map(x => (x, x))
     val rangeP2 = new RangePartitioner(2, rdd)
@@ -177,3 +188,6 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
     // Add other tests here for classes that should be able to handle empty partitions correctly
   }
 }
+
+
+private sealed case class Row(value: Int)
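The core of the fix is the catch-all case in makeBinarySearch: the old code called the two-argument Arrays.binarySearch(Object[], Object) overload, which casts elements to Comparable and therefore throws ClassCastException for key types that only supply an Ordering. The new code passes the implicit Ordering[K] to the three-argument overload, which works without an adapter because scala.math.Ordering already extends java.util.Comparator. The standalone sketch below illustrates the difference outside Spark; the Version class and its ordering are invented for illustration and are not part of the patch.

import java.util.Arrays

// A key type that, like Row in the test above, does NOT extend Comparable.
// Version is a hypothetical example class, not part of the Spark patch.
case class Version(major: Int, minor: Int)

object OrderingBinarySearchDemo {
  // scala.math.Ordering extends java.util.Comparator, so this Ordering can be
  // handed straight to Arrays.binarySearch, mirroring CollectionsUtils above.
  implicit val versionOrdering: Ordering[Version] =
    Ordering.by((v: Version) => (v.major, v.minor))

  def main(args: Array[String]): Unit = {
    val bounds = Array(Version(1, 0), Version(2, 0), Version(3, 0))

    // Pre-patch behavior: the two-argument overload ignores the Ordering and
    // casts elements to Comparable, so this line would throw ClassCastException:
    //   Arrays.binarySearch(bounds.asInstanceOf[Array[AnyRef]], Version(2, 0))

    // Post-patch behavior: supply the Ordering explicitly as the Comparator.
    val comparator = implicitly[Ordering[Version]].asInstanceOf[java.util.Comparator[Any]]
    val idx = Arrays.binarySearch(bounds.asInstanceOf[Array[AnyRef]], Version(2, 0), comparator)
    println(idx) // 1, the index of Version(2, 0) in bounds
  }
}

The test added to PartitioningSuite exercises the same path through RangePartitioner: with 1500 requested partitions the range bounds exceed the 128-element cutoff in getPartition, so the binary-search branch runs, which previously threw ClassCastException for Row keys.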