diff options
author | seanm <sean.mcnamara@webtrends.com> | 2013-07-10 10:33:11 -0700 |
---|---|---|
committer | seanm <sean.mcnamara@webtrends.com> | 2013-07-10 10:33:11 -0700 |
commit | 24705d0f46ce536bf829660f4506dcffd9ff799a (patch) | |
tree | 6aaac7a09d4da3a236beec04c8d4239463bd1c6e | |
parent | 7dcda9ae74818f17b57448acab8bd1dc40d78c19 (diff) | |
download | spark-24705d0f46ce536bf829660f4506dcffd9ff799a.tar.gz spark-24705d0f46ce536bf829660f4506dcffd9ff799a.tar.bz2 spark-24705d0f46ce536bf829660f4506dcffd9ff799a.zip |
adding takeOrdered() to RDD
-rw-r--r-- | core/src/main/scala/spark/RDD.scala | 12 | ||||
-rw-r--r-- | core/src/test/scala/spark/RDDSuite.scala | 18 |
2 files changed, 30 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 106fb2960f..af52040fa6 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -782,6 +782,18 @@ abstract class RDD[T: ClassManifest]( } /** + * Returns the top K elements from this RDD as defined by + * the specified implicit Ordering[T] and maintains the + * ordering. + * @param num the number of top elements to return + * @param ord the implicit ordering for T + * @return an array of top elements + */ + def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = { + top(num)(ord.reverse).sorted(ord) + } + + /** * Save this RDD as a text file, using string representations of elements. */ def saveAsTextFile(path: String) { diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index e41ae385c0..fe17d1d5e7 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -252,6 +252,24 @@ class RDDSuite extends FunSuite with SharedSparkContext { assert(topK.sorted === Array("b", "a")) } + test("takeOrdered with predefined ordering") { + val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + val rdd = sc.makeRDD(nums, 2) + val sortedTopK = rdd.takeOrdered(5) + assert(sortedTopK.size === 5) + assert(sortedTopK === Array(1, 2, 3, 4, 5)) + } + + test("takeOrdered with custom ordering") { + val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + implicit val ord = implicitly[Ordering[Int]].reverse + val rdd = sc.makeRDD(nums, 2) + val sortedTopK = rdd.takeOrdered(5) + assert(sortedTopK.size === 5) + assert(sortedTopK === Array(10, 9, 8, 7, 6)) + assert(sortedTopK === nums.sorted(ord).take(5)) + } + test("takeSample") { val data = sc.parallelize(1 to 100, 2) for (seed <- 1 to 5) { |