def intersection(self, other):
    """
    Build an RDD holding the elements that occur in both this RDD and
    C{other}. Each common element is emitted once, even when it appears
    several times in either input.

    Note that this method performs a shuffle internally.

    >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
    >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
    >>> rdd1.intersection(rdd2).collect()
    [1, 2, 3]
    """
    def as_pair(element):
        # cogroup operates on (key, value) pairs; only the key matters here,
        # so the value slot is filled with a None placeholder.
        return (element, None)

    def seen_on_both_sides(entry):
        # entry is (key, groups); groups[0] and groups[1] are the sized
        # collections cogroup produced for the two inputs. Keep the key
        # only when neither of them is empty.
        groups = entry[1]
        return not (len(groups[0]) == 0 or len(groups[1]) == 0)

    keyed_self = self.map(as_pair)
    keyed_other = other.map(as_pair)
    grouped = keyed_self.cogroup(keyed_other)
    return grouped.filter(seen_on_both_sides).keys()