diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2014-02-06 14:58:35 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@apache.org> | 2014-02-06 14:58:35 -0800 |
commit | 084839ba357e03bb56517620123682b50a91cb0b (patch) | |
tree | f2c98d03c5e6492812e04df474228e7b06de55e5 /python | |
parent | 79c95527a77af32bd83a968c1a56feb22e441b7d (diff) | |
download | spark-084839ba357e03bb56517620123682b50a91cb0b.tar.gz spark-084839ba357e03bb56517620123682b50a91cb0b.tar.bz2 spark-084839ba357e03bb56517620123682b50a91cb0b.zip |
Merge pull request #498 from ScrapCodes/python-api. Closes #498.
Python API additions
Author: Prashant Sharma <prashant.s@imaginea.com>
== Merge branch commits ==
commit 8b51591f1a7a79a62c13ee66ff8d83040f7eccd8
Author: Prashant Sharma <prashant.s@imaginea.com>
Date: Fri Jan 24 11:50:29 2014 +0530
Josh's and Patrick's review comments.
commit d37f9677838e43bef6c18ef61fbf08055ba6d1ca
Author: Prashant Sharma <prashant.s@imaginea.com>
Date: Thu Jan 23 17:27:17 2014 +0530
fixed doc tests
commit 27cb54bf5c99b1ea38a73858c291d0a1c43d8b7c
Author: Prashant Sharma <prashant.s@imaginea.com>
Date: Thu Jan 23 16:48:43 2014 +0530
Added keys and values methods for PairFunctions in python
commit 4ce76b396fbaefef2386d7a36d611572bdef9b5d
Author: Prashant Sharma <prashant.s@imaginea.com>
Date: Thu Jan 23 13:51:26 2014 +0530
Added foreachPartition
commit 05f05341a187cba829ac0e6c2bdf30be49948c89
Author: Prashant Sharma <prashant.s@imaginea.com>
Date: Thu Jan 23 13:02:59 2014 +0530
Added coalesce function to Python API
commit 6568d2c2fa14845dc56322c0f39ba2e13b3b26dd
Author: Prashant Sharma <prashant.s@imaginea.com>
Date: Thu Jan 23 12:52:44 2014 +0530
Added repartition function to Python API.
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/rdd.py | 60 |
1 files changed, 60 insertions, 0 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 1ad4b52987..90f93a1926 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -455,6 +455,18 @@ class RDD(object): yield None self.mapPartitions(processPartition).collect() # Force evaluation + def foreachPartition(self, f): + """ + Applies a function to each partition of this RDD. + + >>> def f(iterator): + ... for x in iterator: + ... print x + ... yield None + >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f) + """ + self.mapPartitions(f).collect() # Force evaluation + def collect(self): """ Return a list that contains all of the elements in this RDD. @@ -695,6 +707,24 @@ class RDD(object): """ return dict(self.collect()) + def keys(self): + """ + Return an RDD with the keys of each tuple. + >>> m = sc.parallelize([(1, 2), (3, 4)]).keys() + >>> m.collect() + [1, 3] + """ + return self.map(lambda (k, v): k) + + def values(self): + """ + Return an RDD with the values of each tuple. + >>> m = sc.parallelize([(1, 2), (3, 4)]).values() + >>> m.collect() + [2, 4] + """ + return self.map(lambda (k, v): v) + def reduceByKey(self, func, numPartitions=None): """ Merge the values for each key using an associative reduce function. @@ -987,6 +1017,36 @@ class RDD(object): """ return self.map(lambda x: (f(x), x)) + def repartition(self, numPartitions): + """ + Return a new RDD that has exactly numPartitions partitions. + + Can increase or decrease the level of parallelism in this RDD. Internally, this uses + a shuffle to redistribute data. + If you are decreasing the number of partitions in this RDD, consider using `coalesce`, + which can avoid performing a shuffle. 
+ >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4) + >>> sorted(rdd.glom().collect()) + [[1], [2, 3], [4, 5], [6, 7]] + >>> len(rdd.repartition(2).glom().collect()) + 2 + >>> len(rdd.repartition(10).glom().collect()) + 10 + """ + jrdd = self._jrdd.repartition(numPartitions) + return RDD(jrdd, self.ctx, self._jrdd_deserializer) + + def coalesce(self, numPartitions, shuffle=False): + """ + Return a new RDD that is reduced into `numPartitions` partitions. + >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect() + [[1], [2, 3], [4, 5]] + >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect() + [[1, 2, 3, 4, 5]] + """ + jrdd = self._jrdd.coalesce(numPartitions) + return RDD(jrdd, self.ctx, self._jrdd_deserializer) + # TODO: `lookup` is disabled because we can't make direct comparisons based # on the key; we need to compare the hash of the key to the hash of the # keys in the pairs. This could be an expensive operation, since those |