aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-06-15 00:53:41 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-06-15 00:53:41 -0700
commitf961aac8b28dd537efe7a2b245794a6bd310e497 (patch)
tree60e414137fdcccab26e061d90649385e817b87da /core
parent6602d9497332457633905677b08093e98983c3e4 (diff)
parente8801d44900153eae6412963d2f3e2f19bfdc4e9 (diff)
downloadspark-f961aac8b28dd537efe7a2b245794a6bd310e497.tar.gz
spark-f961aac8b28dd537efe7a2b245794a6bd310e497.tar.bz2
spark-f961aac8b28dd537efe7a2b245794a6bd310e497.zip
Merge pull request #649 from ryanlecompte/master
Add top K method to RDD using a bounded priority queue
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/RDD.scala19
-rw-r--r--core/src/main/scala/spark/api/java/JavaRDD.scala1
-rw-r--r--core/src/main/scala/spark/api/java/JavaRDDLike.scala27
-rw-r--r--core/src/main/scala/spark/util/BoundedPriorityQueue.scala45
-rw-r--r--core/src/test/scala/spark/RDDSuite.scala19
5 files changed, 109 insertions, 2 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index e5995bea22..7cb9bfebeb 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -36,6 +36,7 @@ import spark.rdd.ZippedPartitionsRDD2
import spark.rdd.ZippedPartitionsRDD3
import spark.rdd.ZippedPartitionsRDD4
import spark.storage.StorageLevel
+import spark.util.BoundedPriorityQueue
import SparkContext._
@@ -724,6 +725,24 @@ abstract class RDD[T: ClassManifest](
}
/**
+ * Returns the top K elements from this RDD as defined by
+ * the specified implicit Ordering[T].
+ * @param num the number of top elements to return
+ * @param ord the implicit ordering for T
+ * @return an array of top elements
+ */
+ def top(num: Int)(implicit ord: Ordering[T]): Array[T] = {
+ mapPartitions { items =>
+ val queue = new BoundedPriorityQueue[T](num)
+ queue ++= items
+ Iterator.single(queue)
+ }.reduce { (queue1, queue2) =>
+ queue1 ++= queue2
+ queue1
+ }.toArray
+ }
+
+ /**
* Save this RDD as a text file, using string representations of elements.
*/
def saveAsTextFile(path: String) {
diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala
index eb81ed64cd..626b499454 100644
--- a/core/src/main/scala/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDD.scala
@@ -86,7 +86,6 @@ JavaRDDLike[T, JavaRDD[T]] {
*/
def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
wrapRDD(rdd.subtract(other, p))
-
}
object JavaRDD {
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index 76b14e2e04..b555f2030a 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -1,6 +1,6 @@
package spark.api.java
-import java.util.{List => JList}
+import java.util.{List => JList, Comparator}
import scala.Tuple2
import scala.collection.JavaConversions._
@@ -359,4 +359,29 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def toDebugString(): String = {
rdd.toDebugString
}
+
+ /**
+ * Returns the top K elements from this RDD as defined by
+ * the specified Comparator[T].
+ * @param num the number of top elements to return
+ * @param comp the comparator that defines the order
+ * @return an array of top elements
+ */
+ def top(num: Int, comp: Comparator[T]): JList[T] = {
+ import scala.collection.JavaConversions._
+ val topElems = rdd.top(num)(Ordering.comparatorToOrdering(comp))
+ val arr: java.util.Collection[T] = topElems.toSeq
+ new java.util.ArrayList(arr)
+ }
+
+ /**
+ * Returns the top K elements from this RDD using the
+ * natural ordering for T.
+ * @param num the number of top elements to return
+ * @return an array of top elements
+ */
+ def top(num: Int): JList[T] = {
+ val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]]
+ top(num, comp)
+ }
}
diff --git a/core/src/main/scala/spark/util/BoundedPriorityQueue.scala b/core/src/main/scala/spark/util/BoundedPriorityQueue.scala
new file mode 100644
index 0000000000..4bc5db8bb7
--- /dev/null
+++ b/core/src/main/scala/spark/util/BoundedPriorityQueue.scala
@@ -0,0 +1,45 @@
+package spark.util
+
+import java.io.Serializable
+import java.util.{PriorityQueue => JPriorityQueue}
+import scala.collection.generic.Growable
+import scala.collection.JavaConverters._
+
+/**
+ * Bounded priority queue. This class wraps the original PriorityQueue
+ * class and modifies it such that only the top K elements are retained.
+ * The top K elements are defined by an implicit Ordering[A].
+ */
+class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A])
+ extends Iterable[A] with Growable[A] with Serializable {
+
+ private val underlying = new JPriorityQueue[A](maxSize, ord)
+
+ override def iterator: Iterator[A] = underlying.iterator.asScala
+
+ override def ++=(xs: TraversableOnce[A]): this.type = {
+ xs.foreach { this += _ }
+ this
+ }
+
+ override def +=(elem: A): this.type = {
+ if (size < maxSize) underlying.offer(elem)
+ else maybeReplaceLowest(elem)
+ this
+ }
+
+ override def +=(elem1: A, elem2: A, elems: A*): this.type = {
+ this += elem1 += elem2 ++= elems
+ }
+
+ override def clear() { underlying.clear() }
+
+ private def maybeReplaceLowest(a: A): Boolean = {
+ val head = underlying.peek()
+ if (head != null && ord.gt(a, head)) {
+ underlying.poll()
+ underlying.offer(a)
+ } else false
+ }
+}
+
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 3f69e99780..67f3332d44 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -317,4 +317,23 @@ class RDDSuite extends FunSuite with LocalSparkContext {
assert(sample.size === checkSample.size)
for (i <- 0 until sample.size) assert(sample(i) === checkSample(i))
}
+
+ test("top with predefined ordering") {
+ sc = new SparkContext("local", "test")
+ val nums = Array.range(1, 100000)
+ val ints = sc.makeRDD(scala.util.Random.shuffle(nums), 2)
+ val topK = ints.top(5)
+ assert(topK.size === 5)
+ assert(topK.sorted === nums.sorted.takeRight(5))
+ }
+
+ test("top with custom ordering") {
+ sc = new SparkContext("local", "test")
+ val words = Vector("a", "b", "c", "d")
+ implicit val ord = implicitly[Ordering[String]].reverse
+ val rdd = sc.makeRDD(words, 2)
+ val topK = rdd.top(2)
+ assert(topK.size === 2)
+ assert(topK.sorted === Array("b", "a"))
+ }
}