path: root/core
author    Reynold Xin <rxin@apache.org>  2014-03-26 00:09:44 -0700
committer Reynold Xin <rxin@apache.org>  2014-03-26 00:09:44 -0700
commit b859853ba47b6323af0e31a4e2099e943221e1b1 (patch)
tree   50ab4dd5357e772354c7f3d2dd5f19c47c79a630 /core
parent 4f7d547b85ed89ba4706e05d7d0984f16749120e (diff)
SPARK-1321 Use Guava's top k implementation rather than our BoundedPriorityQueue based implementation
Also updated the documentation for top and takeOrdered.

On my simple test of sorting 100 million (Int, Int) tuples using Spark, Guava's top k
implementation (in Ordering) is much faster than the BoundedPriorityQueue implementation
for roughly sorted input (10 - 20X faster), and still faster for purely random input (2 - 5X).

Author: Reynold Xin <rxin@apache.org>

Closes #229 from rxin/takeOrdered and squashes the following commits:

0d11844 [Reynold Xin] Use Guava's top k implementation rather than our BoundedPriorityQueue based implementation. Also updated the documentation for top and takeOrdered.
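For context, a minimal standalone sketch (not part of the patch) of the Guava primitive this commit adopts: Ordering.leastOf scans its input once, keeping only the k smallest elements, and returns them in ascending order. Plain Scala, no Spark required:

    import scala.collection.JavaConverters._
    import com.google.common.collect.{Ordering => GuavaOrdering}

    // Wrap a Scala comparison in a Guava Ordering.
    val ord = new GuavaOrdering[Int] {
      override def compare(l: Int, r: Int): Int = l.compareTo(r)
    }
    // leastOf keeps only the k smallest elements while scanning the input.
    val smallest = ord.leastOf(Seq(10, 4, 2, 12, 3).asJava, 2).asScala
    // smallest: Buffer(2, 3)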
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala                 49
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/Utils.scala   39
2 files changed, 72 insertions, 16 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 4f9d39f865..6af42248a5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -927,32 +927,49 @@ abstract class RDD[T: ClassTag](
}
/**
- * Returns the top K elements from this RDD as defined by
- * the specified implicit Ordering[T].
+ * Returns the top K (largest) elements from this RDD as defined by the specified
+ * implicit Ordering[T]. This does the opposite of [[takeOrdered]]. For example:
+ * {{{
+ * sc.parallelize(Seq(10, 4, 2, 12, 3)).top(1)
+ * // returns Array(12)
+ *
+ * sc.parallelize(Seq(2, 3, 4, 5, 6)).top(2)
+ * // returns Array(6, 5)
+ * }}}
+ *
* @param num the number of top elements to return
* @param ord the implicit ordering for T
* @return an array of top elements
*/
- def top(num: Int)(implicit ord: Ordering[T]): Array[T] = {
- mapPartitions { items =>
- val queue = new BoundedPriorityQueue[T](num)
- queue ++= items
- Iterator.single(queue)
- }.reduce { (queue1, queue2) =>
- queue1 ++= queue2
- queue1
- }.toArray.sorted(ord.reverse)
- }
+ def top(num: Int)(implicit ord: Ordering[T]): Array[T] = takeOrdered(num)(ord.reverse)
/**
- * Returns the first K elements from this RDD as defined by
- * the specified implicit Ordering[T] and maintains the
- * ordering.
+ * Returns the first K (smallest) elements from this RDD as defined by the specified
+ * implicit Ordering[T] and maintains the ordering. This does the opposite of [[top]].
+ * For example:
+ * {{{
+ * sc.parallelize(Seq(10, 4, 2, 12, 3)).takeOrdered(1)
+ * // returns Array(2)
+ *
+ * sc.parallelize(Seq(2, 3, 4, 5, 6)).takeOrdered(2)
+ * // returns Array(2, 3)
+ * }}}
+ *
* @param num the number of top elements to return
* @param ord the implicit ordering for T
* @return an array of top elements
*/
- def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)
+ def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = {
+ mapPartitions { items =>
+ // The priority queue keeps the largest elements, so reverse the ordering.
+ val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
+ queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
+ Iterator.single(queue)
+ }.reduce { (queue1, queue2) =>
+ queue1 ++= queue2
+ queue1
+ }.toArray.sorted(ord)
+ }
/**
* Returns the max of this RDD as defined by the implicit Ordering[T].
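The new takeOrdered is a two-phase algorithm: each partition keeps its num smallest elements via the Guava-backed helper, the per-partition winners are merged in a reduce, and the merged result is sorted once at the end. A local sketch of the same shape, assuming plain Scala Seqs stand in for RDD partitions (sorted.take stands in for both the helper and the queue merge):

    // Assumption: each inner Seq plays the role of one RDD partition.
    def takeOrderedLocal[T](partitions: Seq[Seq[T]], num: Int)
                           (implicit ord: Ordering[T]): Seq[T] = {
      // Phase 1: each "partition" keeps only its num smallest elements.
      val perPartition = partitions.map(_.sorted(ord).take(num))
      // Phase 2: merge the winners and keep the global num smallest, in order.
      perPartition.flatten.sorted(ord).take(num)
    }

    takeOrderedLocal(Seq(Seq(10, 4, 2), Seq(12, 3, 7)), 2)  // List(2, 3)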
diff --git a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
new file mode 100644
index 0000000000..c5268c0fae
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.collection.JavaConversions.{collectionAsScalaIterable, asJavaIterator}
+
+import com.google.common.collect.{Ordering => GuavaOrdering}
+
+/**
+ * Utility functions for collections.
+ */
+private[spark] object Utils {
+
+ /**
+ * Returns the first K elements from the input as defined by the specified implicit Ordering[T]
+ * and maintains the ordering.
+ */
+ def takeOrdered[T](input: Iterator[T], num: Int)(implicit ord: Ordering[T]): Iterator[T] = {
+ val ordering = new GuavaOrdering[T] {
+ override def compare(l: T, r: T) = ord.compare(l, r)
+ }
+ collectionAsScalaIterable(ordering.leastOf(asJavaIterator(input), num)).iterator
+ }
+}
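A usage sketch for the new helper, assuming a plain Scala iterator as input (identifiers match the file above):

    import org.apache.spark.util.collection.Utils

    // The two smallest elements, returned in ascending order.
    val smallestTwo = Utils.takeOrdered(Iterator(10, 4, 2, 12, 3), 2).toArray
    // smallestTwo: Array(2, 3)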