aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2014-03-07 18:48:07 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-03-07 18:48:07 -0800
commit6e730edcde7ca6cbb5727dff7a42f7284b368528 (patch)
tree191f237f11ef141f03084b785fff653169c6c601 /python
parentb7cd9e992cbc2e649534a2cdf9b8bde2c1ee26bd (diff)
downloadspark-6e730edcde7ca6cbb5727dff7a42f7284b368528.tar.gz
spark-6e730edcde7ca6cbb5727dff7a42f7284b368528.tar.bz2
spark-6e730edcde7ca6cbb5727dff7a42f7284b368528.zip
Spark 1165 rdd.intersection in python and java
Author: Prashant Sharma <prashant.s@imaginea.com> Author: Prashant Sharma <scrapcodes@gmail.com> Closes #80 from ScrapCodes/SPARK-1165/RDD.intersection and squashes the following commits: 9b015e9 [Prashant Sharma] Added a note, shuffle is required for intersection. 1fea813 [Prashant Sharma] correct the lines wrapping d0c71f3 [Prashant Sharma] SPARK-1165 RDD.intersection in java d6effee [Prashant Sharma] SPARK-1165 Implemented RDD.intersection in python.
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/rdd.py17
1 files changed, 17 insertions, 0 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 097a0a236b..e72f57d9d1 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -326,6 +326,23 @@ class RDD(object):
return RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx,
self.ctx.serializer)
+ def intersection(self, other):
+ """
+ Return the intersection of this RDD and another one. The output will not
+ contain any duplicate elements, even if the input RDDs did.
+
+ Note that this method performs a shuffle internally.
+
+ >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
+ >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
+ >>> rdd1.intersection(rdd2).collect()
+ [1, 2, 3]
+ """
+ return self.map(lambda v: (v, None)) \
+ .cogroup(other.map(lambda v: (v, None))) \
+ .filter(lambda x: (len(x[1][0]) != 0) and (len(x[1][1]) != 0)) \
+ .keys()
+
def _reserialize(self):
if self._jrdd_deserializer == self.ctx.serializer:
return self