diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2014-03-12 15:57:44 -0700 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2014-03-12 15:57:44 -0700 |
commit | b8afe3052086547879ebf28d6e36207e0d370710 (patch) | |
tree | 0eda377f65e7f30f930713ba3e0923276c491321 /python/pyspark/rdd.py | |
parent | 5d1ec64e7934ad7f922cdab516fa5de690644780 (diff) | |
download | spark-b8afe3052086547879ebf28d6e36207e0d370710.tar.gz spark-b8afe3052086547879ebf28d6e36207e0d370710.tar.bz2 spark-b8afe3052086547879ebf28d6e36207e0d370710.zip |
SPARK-1162 Added top in python.
Author: Prashant Sharma <prashant.s@imaginea.com>
Closes #93 from ScrapCodes/SPARK-1162/pyspark-top-takeOrdered and squashes the following commits:
ece1fa4 [Prashant Sharma] Added top in python.
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r-- | python/pyspark/rdd.py | 25 |
1 files changed, 25 insertions, 0 deletions
```diff
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 0f28dbd6fc..6d549b40e5 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -29,6 +29,7 @@ from subprocess import Popen, PIPE
 from tempfile import NamedTemporaryFile
 from threading import Thread
 import warnings
+from heapq import heappush, heappop, heappushpop
 
 from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
     BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long
@@ -660,6 +661,30 @@ class RDD(object):
                 m1[k] += v
             return m1
         return self.mapPartitions(countPartition).reduce(mergeMaps)
+
+    def top(self, num):
+        """
+        Get the top N elements from a RDD.
+
+        Note: It returns the list sorted in ascending order.
+        >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
+        [12]
+        >>> sc.parallelize([2, 3, 4, 5, 6]).cache().top(2)
+        [5, 6]
+        """
+        def topIterator(iterator):
+            q = []
+            for k in iterator:
+                if len(q) < num:
+                    heappush(q, k)
+                else:
+                    heappushpop(q, k)
+            yield q
+
+        def merge(a, b):
+            return next(topIterator(a + b))
+
+        return sorted(self.mapPartitions(topIterator).reduce(merge))
 
     def take(self, num):
         """
```