path: root/python/pyspark/rdd.py
author    Aaron Staple <aaron.staple@gmail.com>    2014-09-24 20:39:09 -0700
committer Patrick Wendell <pwendell@gmail.com>     2014-09-24 20:39:09 -0700
commit    8ca4ecb6a56b96bae21b33e27f6abdb53676683a (patch)
tree      fce8a6b30398815c0b3f7d03db38824d6af9e1a3 /python/pyspark/rdd.py
parent    74fb2ecf7afc2d314f6477f8f2e6134614387453 (diff)
[SPARK-546] Add full outer join to RDD and DStream.
leftOuterJoin and rightOuterJoin are already implemented. This patch adds fullOuterJoin.

Author: Aaron Staple <aaron.staple@gmail.com>

Closes #1395 from staple/SPARK-546 and squashes the following commits:

1f5595c [Aaron Staple] Fix python style
7ac0aa9 [Aaron Staple] [SPARK-546] Add full outer join to RDD and DStream.
3b5d137 [Aaron Staple] In JavaPairDStream, make class tag specification in rightOuterJoin consistent with other functions.
31f2956 [Aaron Staple] Fix left outer join documentation comments.
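As a quick usage sketch (not part of the patch itself), the new RDD method is called like the existing leftOuterJoin and rightOuterJoin. The snippet below assumes an active SparkContext bound to sc, mirroring the doctest added in the diff further down.

# Sketch only: assumes an active SparkContext bound to `sc`.
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("c", 8)])

# Every key present on either side appears in the result; the missing side is None.
sorted(x.fullOuterJoin(y).collect())
# [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]

# As with the other join variants, an explicit partition count can be supplied.
x.fullOuterJoin(y, numPartitions=4)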
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--  python/pyspark/rdd.py  25
1 files changed, 23 insertions, 2 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 8ef233bc80..680140d72d 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -36,7 +36,7 @@ from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
PickleSerializer, pack_long, AutoBatchedSerializer
from pyspark.join import python_join, python_left_outer_join, \
- python_right_outer_join, python_cogroup
+ python_right_outer_join, python_full_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler, RDDStratifiedSampler
from pyspark.storagelevel import StorageLevel
@@ -1375,7 +1375,7 @@ class RDD(object):
For each element (k, v) in C{self}, the resulting RDD will either
contain all pairs (k, (v, w)) for w in C{other}, or the pair
- (k, (v, None)) if no elements in other have key k.
+ (k, (v, None)) if no elements in C{other} have key k.
Hash-partitions the resulting RDD into the given number of partitions.
@@ -1403,6 +1403,27 @@ class RDD(object):
"""
return python_right_outer_join(self, other, numPartitions)
+ def fullOuterJoin(self, other, numPartitions=None):
+ """
+ Perform a full outer join of C{self} and C{other}.
+
+ For each element (k, v) in C{self}, the resulting RDD will either
+ contain all pairs (k, (v, w)) for w in C{other}, or the pair
+ (k, (v, None)) if no elements in C{other} have key k.
+
+ Similarly, for each element (k, w) in C{other}, the resulting RDD will
+ either contain all pairs (k, (v, w)) for v in C{self}, or the pair
+ (k, (None, w)) if no elements in C{self} have key k.
+
+ Hash-partitions the resulting RDD into the given number of partitions.
+
+ >>> x = sc.parallelize([("a", 1), ("b", 4)])
+ >>> y = sc.parallelize([("a", 2), ("c", 8)])
+ >>> sorted(x.fullOuterJoin(y).collect())
+ [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]
+ """
+ return python_full_outer_join(self, other, numPartitions)
+
# TODO: add option to control map-side combining
# portable_hash is used as default, because builtin hash of None is different
# cross machines.
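For readers who want the semantics spelled out, the sketch below expresses a full outer join in terms of cogroup. This is an illustration only, not the implementation added by this patch (which delegates to the python_full_outer_join helper imported above); the function name full_outer_join_via_cogroup is made up for this example, and an active SparkContext bound to sc is assumed.

# Illustration only: equivalent semantics via cogroup, not the patch's implementation.
def full_outer_join_via_cogroup(left, right):
    def dispatch(pair_of_iterables):
        vs = list(pair_of_iterables[0]) or [None]   # values from `left` for this key
        ws = list(pair_of_iterables[1]) or [None]   # values from `right` for this key
        return [(v, w) for v in vs for w in ws]
    # cogroup yields (key, (iterable_of_left_values, iterable_of_right_values))
    return left.cogroup(right).flatMapValues(dispatch)

x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("c", 8)])
sorted(full_outer_join_via_cogroup(x, y).collect())
# [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]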