aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2014-03-12 15:57:44 -0700
committerMatei Zaharia <matei@databricks.com>2014-03-12 15:57:44 -0700
commitb8afe3052086547879ebf28d6e36207e0d370710 (patch)
tree0eda377f65e7f30f930713ba3e0923276c491321 /python
parent5d1ec64e7934ad7f922cdab516fa5de690644780 (diff)
downloadspark-b8afe3052086547879ebf28d6e36207e0d370710.tar.gz
spark-b8afe3052086547879ebf28d6e36207e0d370710.tar.bz2
spark-b8afe3052086547879ebf28d6e36207e0d370710.zip
SPARK-1162 Added top in python.
Author: Prashant Sharma <prashant.s@imaginea.com> Closes #93 from ScrapCodes/SPARK-1162/pyspark-top-takeOrdered and squashes the following commits: ece1fa4 [Prashant Sharma] Added top in python.
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/rdd.py25
1 files changed, 25 insertions, 0 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 0f28dbd6fc..6d549b40e5 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -29,6 +29,7 @@ from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from threading import Thread
import warnings
+from heapq import heappush, heappop, heappushpop
from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long
@@ -660,6 +661,30 @@ class RDD(object):
m1[k] += v
return m1
return self.mapPartitions(countPartition).reduce(mergeMaps)
+
+ def top(self, num):
+ """
+ Get the top N elements from a RDD.
+
+ Note: It returns the list sorted in ascending order.
+ >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
+ [12]
+ >>> sc.parallelize([2, 3, 4, 5, 6]).cache().top(2)
+ [5, 6]
+ """
+ def topIterator(iterator):
+ q = []
+ for k in iterator:
+ if len(q) < num:
+ heappush(q, k)
+ else:
+ heappushpop(q, k)
+ yield q
+
+ def merge(a, b):
+ return next(topIterator(a + b))
+
+ return sorted(self.mapPartitions(topIterator).reduce(merge))
def take(self, num):
"""