aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/rdd.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--python/pyspark/rdd.py17
1 files changed, 17 insertions, 0 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 097a0a236b..e72f57d9d1 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -326,6 +326,23 @@ class RDD(object):
return RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx,
self.ctx.serializer)
+ def intersection(self, other):
+ """
+ Return the intersection of this RDD and another one. The output will not
+ contain any duplicate elements, even if the input RDDs did.
+
+ Note that this method performs a shuffle internally.
+
+ >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
+ >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
+ >>> rdd1.intersection(rdd2).collect()
+ [1, 2, 3]
+ """
+ return self.map(lambda v: (v, None)) \
+ .cogroup(other.map(lambda v: (v, None))) \
+ .filter(lambda x: (len(x[1][0]) != 0) and (len(x[1][1]) != 0)) \
+ .keys()
+
def _reserialize(self):
if self._jrdd_deserializer == self.ctx.serializer:
return self