diff options
author | Prabin Banka <prabin.banka@imaginea.com> | 2014-03-10 13:27:00 -0700 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2014-03-10 13:27:00 -0700 |
commit | e1e09e0ef6b18e034727403d81747d899b042219 (patch) | |
tree | c9bc6792ef9f25e4f78e32755ef8c84d9f82ffb0 /python/pyspark/rdd.py | |
parent | 5d98cfc1c8fb17fbbeacc7192ac21c0b038cbd16 (diff) | |
download | spark-e1e09e0ef6b18e034727403d81747d899b042219.tar.gz spark-e1e09e0ef6b18e034727403d81747d899b042219.tar.bz2 spark-e1e09e0ef6b18e034727403d81747d899b042219.zip |
SPARK-977 Added Python RDD.zip function
was raised earlier as a part of apache/incubator-spark#486
Author: Prabin Banka <prabin.banka@imaginea.com>
Closes #76 from prabinb/python-api-zip and squashes the following commits:
b1a31a0 [Prabin Banka] Added Python RDD.zip function
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r-- | python/pyspark/rdd.py | 20 |
1 files changed, 19 insertions, 1 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index e72f57d9d1..5ab27ff402 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -30,7 +30,7 @@ from threading import Thread import warnings from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ - BatchedSerializer, CloudPickleSerializer, pack_long + BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long from pyspark.join import python_join, python_left_outer_join, \ python_right_outer_join, python_cogroup from pyspark.statcounter import StatCounter @@ -1081,6 +1081,24 @@ class RDD(object): jrdd = self._jrdd.coalesce(numPartitions) return RDD(jrdd, self.ctx, self._jrdd_deserializer) + def zip(self, other): + """ + Zips this RDD with another one, returning key-value pairs with the first element in each RDD + second element in each RDD, etc. Assumes that the two RDDs have the same number of + partitions and the same number of elements in each partition (e.g. one was made through + a map on the other). + + >>> x = sc.parallelize(range(0,5)) + >>> y = sc.parallelize(range(1000, 1005)) + >>> x.zip(y).collect() + [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)] + """ + pairRDD = self._jrdd.zip(other._jrdd) + deserializer = PairDeserializer(self._jrdd_deserializer, + other._jrdd_deserializer) + return RDD(pairRDD, self.ctx, deserializer) + + # TODO: `lookup` is disabled because we can't make direct comparisons based # on the key; we need to compare the hash of the key to the hash of the # keys in the pairs. This could be an expensive operation, since those |