path: root/python
author     Syed Hashmi <shashmi@cloudera.com>  2014-06-09 00:08:40 -0700
committer  Reynold Xin <rxin@apache.org>       2014-06-09 00:08:40 -0700
commit     6113ac1559d62c828dfbf08ef0f7f172c24cf7f5 (patch)
tree       fce848181264903ef869ef891819c15b122e56a9 /python
parent     32ee9f0668e50083e415b0662b18f5d2581413f0 (diff)
download   spark-6113ac1559d62c828dfbf08ef0f7f172c24cf7f5.tar.gz
           spark-6113ac1559d62c828dfbf08ef0f7f172c24cf7f5.tar.bz2
           spark-6113ac1559d62c828dfbf08ef0f7f172c24cf7f5.zip
[SPARK-1308] Add getNumPartitions to pyspark RDD
Add getNumPartitions to the PySpark RDD API to provide an intuitive way to get the number of partitions of an RDD, as we already can in Scala today.

Author: Syed Hashmi <shashmi@cloudera.com>

Closes #995 from syedhashmi/master and squashes the following commits:

de0ed5e [Syed Hashmi] [SPARK-1308] Add getNumPartitions to pyspark RDD
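For readers who want to try the change, here is a minimal usage sketch. It assumes a PySpark build that includes this patch; the SparkContext setup is illustrative and not part of the commit itself.

    from pyspark import SparkContext

    sc = SparkContext("local[2]", "getNumPartitions-demo")

    # Ask for 2 partitions explicitly, then read the count back through
    # the new public accessor added by this commit.
    rdd = sc.parallelize([1, 2, 3, 4], 2)
    print(rdd.getNumPartitions())  # 2

    sc.stop()

As the diff below shows, the method simply wraps the py4j bridge call self._jrdd.splits().size(), which users previously had to reach through a private attribute.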
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/rdd.py | 45
1 file changed, 27 insertions(+), 18 deletions(-)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index ca0a95578f..9c69c79236 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -250,7 +250,7 @@ class RDD(object):
    def map(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each element of this RDD.
-
+
        >>> rdd = sc.parallelize(["b", "a", "c"])
        >>> sorted(rdd.map(lambda x: (x, 1)).collect())
        [('a', 1), ('b', 1), ('c', 1)]
@@ -312,6 +312,15 @@ class RDD(object):
                      "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2)
        return self.mapPartitionsWithIndex(f, preservesPartitioning)

+    def getNumPartitions(self):
+        """
+        Returns the number of partitions in RDD
+        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
+        >>> rdd.getNumPartitions()
+        2
+        """
+        return self._jrdd.splits().size()
+
    def filter(self, f):
        """
        Return a new RDD containing only the elements that satisfy a predicate.
@@ -413,9 +422,9 @@

    def intersection(self, other):
        """
-        Return the intersection of this RDD and another one. The output will not
+        Return the intersection of this RDD and another one. The output will not
        contain any duplicate elements, even if the input RDDs did.
-
+
        Note that this method performs a shuffle internally.

        >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
@@ -571,14 +580,14 @@ class RDD(object):
        """
        Applies a function to each partition of this RDD.

-        >>> def f(iterator):
-        ...      for x in iterator:
-        ...           print x
+        >>> def f(iterator):
+        ...      for x in iterator:
+        ...           print x
        ...      yield None
        >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
        """
        self.mapPartitions(f).collect()  # Force evaluation
-
+
    def collect(self):
        """
        Return a list that contains all of the elements in this RDD.
@@ -673,7 +682,7 @@ class RDD(object):
            yield acc

        return self.mapPartitions(func).fold(zeroValue, combOp)
-
+
    def max(self):
        """
        Find the maximum item in this RDD.
@@ -692,7 +701,7 @@ class RDD(object):
        1.0
        """
        return self.reduce(min)
-
+
    def sum(self):
        """
        Add up the elements in this RDD.
@@ -786,7 +795,7 @@ class RDD(object):
                m1[k] += v
            return m1
        return self.mapPartitions(countPartition).reduce(mergeMaps)
-
+
    def top(self, num):
        """
        Get the top N elements from a RDD.
@@ -814,7 +823,7 @@ class RDD(object):
    def takeOrdered(self, num, key=None):
        """
        Get the N elements from a RDD ordered in ascending order or as specified
-        by the optional key function.
+        by the optional key function.

        >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
        [1, 2, 3, 4, 5, 6]
@@ -834,7 +843,7 @@ class RDD(object):
            if key_ != None:
                x = [i[1] for i in x]
            return x
-
+
        def merge(a, b):
            return next(topNKeyedElems(a + b))
        result = self.mapPartitions(lambda i: topNKeyedElems(i, key)).reduce(merge)
@@ -1169,12 +1178,12 @@ class RDD(object):
                combiners[k] = mergeCombiners(combiners[k], v)
            return combiners.iteritems()
        return shuffled.mapPartitions(_mergeCombiners)
-
+
    def foldByKey(self, zeroValue, func, numPartitions=None):
        """
        Merge the values for each key using an associative function "func" and a neutral "zeroValue"
-        which may be added to the result an arbitrary number of times, and must not change
-        the result (e.g., 0 for addition, or 1 for multiplication.).
+        which may be added to the result an arbitrary number of times, and must not change
+        the result (e.g., 0 for addition, or 1 for multiplication.).

        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> from operator import add
@@ -1182,8 +1191,8 @@ class RDD(object):
        [('a', 2), ('b', 1)]
        """
        return self.combineByKey(lambda v: func(zeroValue, v), func, func, numPartitions)
-
-
+
+
    # TODO: support variant with custom partitioner
    def groupByKey(self, numPartitions=None):
        """
@@ -1302,7 +1311,7 @@ class RDD(object):
    def repartition(self, numPartitions):
        """
        Return a new RDD that has exactly numPartitions partitions.
-
+
        Can increase or decrease the level of parallelism in this RDD. Internally, this uses
        a shuffle to redistribute data.
        If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
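The repartition docstring above pairs naturally with the new accessor: getNumPartitions is the cheap way to confirm that a repartition or coalesce had the intended effect. A short sketch, assuming an active SparkContext sc as in the doctests:

    rdd = sc.parallelize(range(100), 4)
    print(rdd.getNumPartitions())  # 4

    # Increasing the partition count goes through a shuffle.
    grown = rdd.repartition(8)
    print(grown.getNumPartitions())  # 8

    # Decreasing can merge existing partitions without a full shuffle,
    # which is why the docstring recommends coalesce for that case.
    shrunk = rdd.coalesce(2)
    print(shrunk.getNumPartitions())  # 2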