diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/rdd.py | 17 |
1 files changed, 17 insertions, 0 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 097a0a236b..e72f57d9d1 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -326,6 +326,23 @@ class RDD(object): return RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx, self.ctx.serializer) + def intersection(self, other): + """ + Return the intersection of this RDD and another one. The output will not + contain any duplicate elements, even if the input RDDs did. + + Note that this method performs a shuffle internally. + + >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5]) + >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8]) + >>> rdd1.intersection(rdd2).collect() + [1, 2, 3] + """ + return self.map(lambda v: (v, None)) \ + .cogroup(other.map(lambda v: (v, None))) \ + .filter(lambda x: (len(x[1][0]) != 0) and (len(x[1][1]) != 0)) \ + .keys() + def _reserialize(self): if self._jrdd_deserializer == self.ctx.serializer: return self |