Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/join.py  16
-rw-r--r--  python/pyspark/rdd.py   25
2 files changed, 39 insertions(+), 2 deletions(-)
diff --git a/python/pyspark/join.py b/python/pyspark/join.py
index b0f1cc1927..b4a8447137 100644
--- a/python/pyspark/join.py
+++ b/python/pyspark/join.py
@@ -80,6 +80,22 @@ def python_left_outer_join(rdd, other, numPartitions):
     return _do_python_join(rdd, other, numPartitions, dispatch)
 
 
+def python_full_outer_join(rdd, other, numPartitions):
+    def dispatch(seq):
+        vbuf, wbuf = [], []
+        for (n, v) in seq:
+            if n == 1:
+                vbuf.append(v)
+            elif n == 2:
+                wbuf.append(v)
+        if not vbuf:
+            vbuf.append(None)
+        if not wbuf:
+            wbuf.append(None)
+        return [(v, w) for v in vbuf for w in wbuf]
+    return _do_python_join(rdd, other, numPartitions, dispatch)
+
+
 def python_cogroup(rdds, numPartitions):
     def make_mapper(i):
         return lambda (k, v): (k, (i, v))
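Note: the new python_full_outer_join follows the same dispatch pattern as the existing joins. _do_python_join is not shown in this diff; it is assumed (as the dispatch signature implies) to tag each value with 1 (from rdd) or 2 (from other), union the two RDDs, group by key, and flat-map dispatch over each key's tagged sequence. A minimal standalone sketch of that logic under those assumptions, with a hypothetical mock_full_outer_join standing in for _do_python_join:

from collections import defaultdict

def dispatch(seq):
    # Separate the tagged values back into the two sides of the join.
    vbuf, wbuf = [], []
    for (n, v) in seq:
        if n == 1:
            vbuf.append(v)
        elif n == 2:
            wbuf.append(v)
    # Pad an empty side with None so unmatched keys still emit a pair.
    if not vbuf:
        vbuf.append(None)
    if not wbuf:
        wbuf.append(None)
    # Cartesian product of the two sides for this key.
    return [(v, w) for v in vbuf for w in wbuf]

def mock_full_outer_join(xs, ys):
    # Hypothetical stand-in for _do_python_join: tag, group by key, dispatch.
    grouped = defaultdict(list)
    for k, v in xs:
        grouped[k].append((1, v))
    for k, w in ys:
        grouped[k].append((2, w))
    return sorted((k, pair) for k, seq in grouped.items() for pair in dispatch(seq))

print(mock_full_outer_join([("a", 1), ("b", 4)], [("a", 2), ("c", 8)]))
# [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]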
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 8ef233bc80..680140d72d 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -36,7 +36,7 @@ from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
     BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
     PickleSerializer, pack_long, AutoBatchedSerializer
 from pyspark.join import python_join, python_left_outer_join, \
-    python_right_outer_join, python_cogroup
+    python_right_outer_join, python_full_outer_join, python_cogroup
 from pyspark.statcounter import StatCounter
 from pyspark.rddsampler import RDDSampler, RDDStratifiedSampler
 from pyspark.storagelevel import StorageLevel
@@ -1375,7 +1375,7 @@ class RDD(object):
 
         For each element (k, v) in C{self}, the resulting RDD will either
         contain all pairs (k, (v, w)) for w in C{other}, or the pair
-        (k, (v, None)) if no elements in other have key k.
+        (k, (v, None)) if no elements in C{other} have key k.
 
         Hash-partitions the resulting RDD into the given number of partitions.
 
@@ -1403,6 +1403,27 @@ class RDD(object):
"""
return python_right_outer_join(self, other, numPartitions)
+ def fullOuterJoin(self, other, numPartitions=None):
+ """
+ Perform a right outer join of C{self} and C{other}.
+
+ For each element (k, v) in C{self}, the resulting RDD will either
+ contain all pairs (k, (v, w)) for w in C{other}, or the pair
+ (k, (v, None)) if no elements in C{other} have key k.
+
+ Similarly, for each element (k, w) in C{other}, the resulting RDD will
+ either contain all pairs (k, (v, w)) for v in C{self}, or the pair
+ (k, (None, w)) if no elements in C{self} have key k.
+
+ Hash-partitions the resulting RDD into the given number of partitions.
+
+ >>> x = sc.parallelize([("a", 1), ("b", 4)])
+ >>> y = sc.parallelize([("a", 2), ("c", 8)])
+ >>> sorted(x.fullOuterJoin(y).collect())
+ [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]
+ """
+ return python_full_outer_join(self, other, numPartitions)
+
# TODO: add option to control map-side combining
# portable_hash is used as default, because builtin hash of None is different
# cross machines.
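A quick usage sketch of the new method against a local SparkContext. The duplicate "a" key, the numPartitions argument, and the app name are illustrative additions, not from the patch; they show that keys present on both sides yield the cross product of their values, while one-sided keys are padded with None:

from pyspark import SparkContext

sc = SparkContext("local", "fullOuterJoinExample")

x = sc.parallelize([("a", 1), ("a", 3), ("b", 4)])
y = sc.parallelize([("a", 2), ("c", 8)])

# Keys on both sides produce every (v, w) pairing; one-sided keys get None.
print(sorted(x.fullOuterJoin(y, numPartitions=2).collect()))
# [('a', (1, 2)), ('a', (3, 2)), ('b', (4, None)), ('c', (None, 8))]

sc.stop()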