author    Prashant Sharma <prashant.s@imaginea.com>    2014-02-06 14:58:35 -0800
committer Josh Rosen <joshrosen@apache.org>            2014-02-06 14:58:35 -0800
commit 084839ba357e03bb56517620123682b50a91cb0b (patch)
tree   f2c98d03c5e6492812e04df474228e7b06de55e5 /python
parent 79c95527a77af32bd83a968c1a56feb22e441b7d (diff)
Merge pull request #498 from ScrapCodes/python-api. Closes #498.
Python API additions

Author: Prashant Sharma <prashant.s@imaginea.com>

== Merge branch commits ==

commit 8b51591f1a7a79a62c13ee66ff8d83040f7eccd8
Author: Prashant Sharma <prashant.s@imaginea.com>
Date:   Fri Jan 24 11:50:29 2014 +0530

    Josh's and Patrick's review comments.

commit d37f9677838e43bef6c18ef61fbf08055ba6d1ca
Author: Prashant Sharma <prashant.s@imaginea.com>
Date:   Thu Jan 23 17:27:17 2014 +0530

    Fixed doc tests.

commit 27cb54bf5c99b1ea38a73858c291d0a1c43d8b7c
Author: Prashant Sharma <prashant.s@imaginea.com>
Date:   Thu Jan 23 16:48:43 2014 +0530

    Added keys and values methods for PairFunctions in Python.

commit 4ce76b396fbaefef2386d7a36d611572bdef9b5d
Author: Prashant Sharma <prashant.s@imaginea.com>
Date:   Thu Jan 23 13:51:26 2014 +0530

    Added foreachPartition.

commit 05f05341a187cba829ac0e6c2bdf30be49948c89
Author: Prashant Sharma <prashant.s@imaginea.com>
Date:   Thu Jan 23 13:02:59 2014 +0530

    Added coalesce function to the Python API.

commit 6568d2c2fa14845dc56322c0f39ba2e13b3b26dd
Author: Prashant Sharma <prashant.s@imaginea.com>
Date:   Thu Jan 23 12:52:44 2014 +0530

    Added repartition function to the Python API.
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/rdd.py | 60
1 file changed, 60 insertions(+), 0 deletions(-)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 1ad4b52987..90f93a1926 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -455,6 +455,18 @@ class RDD(object):
             yield None
         self.mapPartitions(processPartition).collect()  # Force evaluation
 
+    def foreachPartition(self, f):
+        """
+        Applies a function to each partition of this RDD.
+
+        >>> def f(iterator):
+        ...     for x in iterator:
+        ...         print x
+        ...     yield None
+        >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
+        """
+        self.mapPartitions(f).collect()  # Force evaluation
+
     def collect(self):
         """
         Return a list that contains all of the elements in this RDD.
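
Below is a minimal usage sketch for the new foreachPartition (not part of the patch; it assumes a local SparkContext named `sc`, and `save_partition` is a hypothetical helper). Because this implementation routes through mapPartitions(f).collect(), the supplied function must return an iterable, which is why the doctest above ends with `yield None`:

    # Sketch only: do per-partition work once instead of once per element.
    # Assumes:  from pyspark import SparkContext
    #           sc = SparkContext("local", "demo")
    def save_partition(iterator):
        batch = [x * 2 for x in iterator]         # drain the partition once
        print("saving %d records" % len(batch))   # stand-in for a real sink
        yield None  # required here: mapPartitions iterates the function's result

    sc.parallelize(range(100), 4).foreachPartition(save_partition)
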
@@ -695,6 +707,24 @@ class RDD(object):
         """
         return dict(self.collect())
 
+    def keys(self):
+        """
+        Return an RDD with the keys of each tuple.
+        >>> m = sc.parallelize([(1, 2), (3, 4)]).keys()
+        >>> m.collect()
+        [1, 3]
+        """
+        return self.map(lambda (k, v): k)
+
+    def values(self):
+        """
+        Return an RDD with the values of each tuple.
+        >>> m = sc.parallelize([(1, 2), (3, 4)]).values()
+        >>> m.collect()
+        [2, 4]
+        """
+        return self.map(lambda (k, v): v)
+
     def reduceByKey(self, func, numPartitions=None):
         """
         Merge the values for each key using an associative reduce function.
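
One portability note on the lambdas above: `lambda (k, v): k` relies on tuple-parameter unpacking, which exists only in Python 2 (PEP 3113 removed it in Python 3). A sketch of the equivalent, version-agnostic form, again assuming a SparkContext `sc`:

    pairs = sc.parallelize([(1, 2), (3, 4)])

    # Index the pair instead of unpacking it in the parameter list.
    ks = pairs.map(lambda kv: kv[0])   # same result as keys()
    vs = pairs.map(lambda kv: kv[1])   # same result as values()

    print(ks.collect())   # [1, 3]
    print(vs.collect())   # [2, 4]
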
@@ -987,6 +1017,36 @@ class RDD(object):
         """
         return self.map(lambda x: (f(x), x))
 
+    def repartition(self, numPartitions):
+        """
+        Return a new RDD that has exactly numPartitions partitions.
+
+        Can increase or decrease the level of parallelism in this RDD. Internally, this uses
+        a shuffle to redistribute data.
+        If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
+        which can avoid performing a shuffle.
+        >>> rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7], 4)
+        >>> sorted(rdd.glom().collect())
+        [[1], [2, 3], [4, 5], [6, 7]]
+        >>> len(rdd.repartition(2).glom().collect())
+        2
+        >>> len(rdd.repartition(10).glom().collect())
+        10
+        """
+        jrdd = self._jrdd.repartition(numPartitions)
+        return RDD(jrdd, self.ctx, self._jrdd_deserializer)
+
+    def coalesce(self, numPartitions, shuffle=False):
+        """
+        Return a new RDD that is reduced into `numPartitions` partitions.
+        >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
+        [[1], [2, 3], [4, 5]]
+        >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
+        [[1, 2, 3, 4, 5]]
+        """
+        jrdd = self._jrdd.coalesce(numPartitions)
+        return RDD(jrdd, self.ctx, self._jrdd_deserializer)
+
     # TODO: `lookup` is disabled because we can't make direct comparisons based
     # on the key; we need to compare the hash of the key to the hash of the
     # keys in the pairs. This could be an expensive operation, since those
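
A short sketch (not from the patch) contrasting the two new methods, again assuming a SparkContext `sc`. Note that this version of coalesce accepts a `shuffle` flag but never forwards it to the JVM call, so the underlying coalesce runs with its default of no shuffle; without a shuffle it can only merge partitions, never add them, while repartition always shuffles and can move the count in either direction. Partition counts are observed with glom, as in the doctests:

    rdd = sc.parallelize(range(8), 4)

    print(len(rdd.glom().collect()))                  # 4 partitions to start
    print(len(rdd.repartition(8).glom().collect()))   # 8: a shuffle can grow the count
    print(len(rdd.coalesce(2).glom().collect()))      # 2: partitions merged, no shuffle
    print(len(rdd.coalesce(8).glom().collect()))      # still 4: cannot grow without a shuffle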