diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-06-15 00:53:41 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-06-15 00:53:41 -0700 |
commit | f961aac8b28dd537efe7a2b245794a6bd310e497 (patch) | |
tree | 60e414137fdcccab26e061d90649385e817b87da | |
parent | 6602d9497332457633905677b08093e98983c3e4 (diff) | |
parent | e8801d44900153eae6412963d2f3e2f19bfdc4e9 (diff) | |
download | spark-f961aac8b28dd537efe7a2b245794a6bd310e497.tar.gz spark-f961aac8b28dd537efe7a2b245794a6bd310e497.tar.bz2 spark-f961aac8b28dd537efe7a2b245794a6bd310e497.zip |
Merge pull request #649 from ryanlecompte/master
Add top K method to RDD using a bounded priority queue
-rw-r--r-- | core/src/main/scala/spark/RDD.scala | 19 | ||||
-rw-r--r-- | core/src/main/scala/spark/api/java/JavaRDD.scala | 1 | ||||
-rw-r--r-- | core/src/main/scala/spark/api/java/JavaRDDLike.scala | 27 | ||||
-rw-r--r-- | core/src/main/scala/spark/util/BoundedPriorityQueue.scala | 45 | ||||
-rw-r--r-- | core/src/test/scala/spark/RDDSuite.scala | 19 |
5 files changed, 109 insertions, 2 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index e5995bea22..7cb9bfebeb 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -36,6 +36,7 @@ import spark.rdd.ZippedPartitionsRDD2
 import spark.rdd.ZippedPartitionsRDD3
 import spark.rdd.ZippedPartitionsRDD4
 import spark.storage.StorageLevel
+import spark.util.BoundedPriorityQueue
 
 import SparkContext._
 
@@ -724,6 +725,31 @@ abstract class RDD[T: ClassManifest](
   }
 
   /**
+   * Returns the top K elements from this RDD, as defined by the specified
+   * implicit Ordering[T], sorted in descending order.
+   * @param num the number of top elements to return
+   * @param ord the implicit ordering for T
+   * @return an array of at most num elements, largest first
+   */
+  def top(num: Int)(implicit ord: Ordering[T]): Array[T] = {
+    if (num == 0) {
+      // Nothing can be selected, so skip running a job altogether.
+      new Array[T](0)
+    } else {
+      mapPartitions { items =>
+        // Retain only the num largest elements of each partition.
+        val queue = new BoundedPriorityQueue[T](num)
+        queue ++= items
+        Iterator.single(queue)
+      }.reduce { (queue1, queue2) =>
+        // Merging two bounded queues keeps the num largest of both.
+        queue1 ++= queue2
+        queue1
+      }.toArray.sorted(ord.reverse)
+    }
+  }
+
+  /**
    * Save this RDD as a text file, using string representations of elements.
*/ def saveAsTextFile(path: String) {
diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala
index eb81ed64cd..626b499454 100644
--- a/core/src/main/scala/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDD.scala
@@ -86,7 +86,6 @@ JavaRDDLike[T, JavaRDD[T]] {
    */
   def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
     wrapRDD(rdd.subtract(other, p))
-
 }
 
 object JavaRDD {
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index 76b14e2e04..b555f2030a 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -1,6 +1,6 @@
 package spark.api.java
 
-import java.util.{List => JList}
+import java.util.{List => JList, Comparator}
 
 import scala.Tuple2
 import scala.collection.JavaConversions._
@@ -359,4 +359,29 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def toDebugString(): String = {
     rdd.toDebugString
   }
+
+  /**
+   * Returns the top K elements from this RDD as defined by
+   * the specified Comparator[T].
+   * @param num the number of top elements to return
+   * @param comp the comparator that defines the order
+   * @return a list of top elements
+   */
+  def top(num: Int, comp: Comparator[T]): JList[T] = {
+    import scala.collection.JavaConversions._
+    val topElems = rdd.top(num)(Ordering.comparatorToOrdering(comp))
+    val arr: java.util.Collection[T] = topElems.toSeq
+    new java.util.ArrayList(arr)
+  }
+
+  /**
+   * Returns the top K elements from this RDD using the
+   * natural ordering for T.
+   * @param num the number of top elements to return
+   * @return a list of top elements
+   */
+  def top(num: Int): JList[T] = {
+    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]]
+    top(num, comp)
+  }
 }
diff --git a/core/src/main/scala/spark/util/BoundedPriorityQueue.scala b/core/src/main/scala/spark/util/BoundedPriorityQueue.scala
new file mode 100644
index 0000000000..4bc5db8bb7
--- /dev/null
+++ b/core/src/main/scala/spark/util/BoundedPriorityQueue.scala
@@ -0,0 +1,60 @@
+package spark.util
+
+import java.io.Serializable
+import java.util.{PriorityQueue => JPriorityQueue}
+import scala.collection.generic.Growable
+import scala.collection.JavaConverters._
+
+/**
+ * Bounded priority queue. This class wraps the original PriorityQueue
+ * class and modifies it such that only the top K elements are retained.
+ * The top K elements are defined by an implicit Ordering[A].
+ *
+ * @param maxSize the maximum number of elements to retain; must not be negative
+ * @param ord the ordering that ranks elements; larger elements are kept
+ */
+class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A])
+  extends Iterable[A] with Growable[A] with Serializable {
+
+  require(maxSize >= 0, "maxSize must not be negative: " + maxSize)
+
+  // java.util.PriorityQueue rejects an initial capacity below 1, so a queue
+  // bounded at zero elements is still backed by a one-slot heap.
+  private val underlying = new JPriorityQueue[A](math.max(maxSize, 1), ord)
+
+  override def iterator: Iterator[A] = underlying.iterator.asScala
+
+  // Constant time, unlike the inherited size, which walks the iterator.
+  override def size: Int = underlying.size
+
+  override def ++=(xs: TraversableOnce[A]): this.type = {
+    xs.foreach { this += _ }
+    this
+  }
+
+  /** Adds elem while below capacity; afterwards it can only displace the lowest element. */
+  override def +=(elem: A): this.type = {
+    if (size < maxSize) underlying.offer(elem)
+    else maybeReplaceLowest(elem)
+    this
+  }
+
+  override def +=(elem1: A, elem2: A, elems: A*): this.type = {
+    this += elem1 += elem2 ++= elems
+  }
+
+  override def clear() { underlying.clear() }
+
+  /**
+   * Replaces the lowest retained element with a if a ranks strictly higher.
+   * @return true if a was inserted, false otherwise
+   */
+  private def maybeReplaceLowest(a: A): Boolean = {
+    val head = underlying.peek()
+    if (head != null && ord.gt(a, head)) {
+      underlying.poll()
+      underlying.offer(a)
+    } else false
+  }
+}
+
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 3f69e99780..67f3332d44 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -317,4 
+317,23 @@ class RDDSuite extends FunSuite with LocalSparkContext {
     assert(sample.size === checkSample.size)
     for (i <- 0 until sample.size) assert(sample(i) === checkSample(i))
   }
+
+  test("top with predefined ordering") {
+    sc = new SparkContext("local", "test")
+    val values = Array.range(1, 100000)
+    val shuffled = sc.makeRDD(scala.util.Random.shuffle(values), 2)
+    val largest = shuffled.top(5)
+    assert(largest.size === 5)
+    assert(largest.sorted === values.sorted.takeRight(5))
+  }
+
+  test("top with custom ordering") {
+    sc = new SparkContext("local", "test")
+    implicit val descending: Ordering[String] = Ordering.String.reverse
+    val letters = sc.makeRDD(Vector("a", "b", "c", "d"), 2)
+    val smallest = letters.top(2)
+    assert(smallest.size === 2)
+    // sorted also picks up the reversed ordering, hence "b" before "a".
+    assert(smallest.sorted === Array("b", "a"))
+  }
 } |