aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorMatei Zaharia <matei.zaharia@gmail.com>2013-07-12 20:23:36 -0700
committerMatei Zaharia <matei.zaharia@gmail.com>2013-07-12 20:23:36 -0700
commit90fc3f30cdc0c08e03ca60703e640d902a26583e (patch)
treee26340784875bd7e5aa294516c93f06e8574a8fd /core/src/main
parentcd7259b4b8d8abbff6db963fd8f84d4bd0b3737b (diff)
parenta1662326e9b1486361eba2f2caf903875fbba597 (diff)
downloadspark-90fc3f30cdc0c08e03ca60703e640d902a26583e.tar.gz
spark-90fc3f30cdc0c08e03ca60703e640d902a26583e.tar.bz2
spark-90fc3f30cdc0c08e03ca60703e640d902a26583e.zip
Merge pull request #692 from Reinvigorate/takeOrdered
adding takeOrdered() to RDD
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/RDD.scala12
-rw-r--r--core/src/main/scala/spark/api/java/JavaRDDLike.scala25
2 files changed, 36 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 106fb2960f..8aa77266bc 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -778,10 +778,20 @@ abstract class RDD[T: ClassManifest](
}.reduce { (queue1, queue2) =>
queue1 ++= queue2
queue1
- }.toArray
+ }.toArray.sorted(ord.reverse)
}
/**
+ * Returns the first K elements from this RDD as defined by
+ * the specified implicit Ordering[T] and maintains the
+ * ordering.
+ * @param num the number of top elements to return
+ * @param ord the implicit ordering for T
+ * @return an array of top elements
+ */
+ def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)
+
+ /**
* Save this RDD as a text file, using string representations of elements.
*/
def saveAsTextFile(path: String) {
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index b555f2030a..27f40ecdfd 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -384,4 +384,29 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]]
top(num, comp)
}
+
+ /**
+ * Returns the first K elements from this RDD as defined by
+ * the specified Comparator[T] and maintains the order.
+ * @param num the number of top elements to return
+ * @param comp the comparator that defines the order
+ * @return an array of top elements
+ */
+ def takeOrdered(num: Int, comp: Comparator[T]): JList[T] = {
+ import scala.collection.JavaConversions._
+ val topElems = rdd.takeOrdered(num)(Ordering.comparatorToOrdering(comp))
+ val arr: java.util.Collection[T] = topElems.toSeq
+ new java.util.ArrayList(arr)
+ }
+
+ /**
+ * Returns the first K elements from this RDD using the
+ * natural ordering for T while maintain the order.
+ * @param num the number of top elements to return
+ * @return an array of top elements
+ */
+ def takeOrdered(num: Int): JList[T] = {
+ val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]]
+ takeOrdered(num, comp)
+ }
}