author     Aaron Staple <aaron.staple@gmail.com>     2014-09-24 20:39:09 -0700
committer  Patrick Wendell <pwendell@gmail.com>      2014-09-24 20:39:09 -0700
commit     8ca4ecb6a56b96bae21b33e27f6abdb53676683a (patch)
tree       fce8a6b30398815c0b3f7d03db38824d6af9e1a3 /python
parent     74fb2ecf7afc2d314f6477f8f2e6134614387453 (diff)
[SPARK-546] Add full outer join to RDD and DStream.
leftOuterJoin and rightOuterJoin are already implemented. This patch adds fullOuterJoin.

Author: Aaron Staple <aaron.staple@gmail.com>

Closes #1395 from staple/SPARK-546 and squashes the following commits:

1f5595c [Aaron Staple] Fix python style
7ac0aa9 [Aaron Staple] [SPARK-546] Add full outer join to RDD and DStream.
3b5d137 [Aaron Staple] In JavaPairDStream, make class tag specification in rightOuterJoin consistent with other functions.
31f2956 [Aaron Staple] Fix left outer join documentation comments.
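For context, the new method follows the same calling convention as the existing leftOuterJoin and rightOuterJoin. A minimal usage sketch of the added RDD API, assuming an interactive PySpark session with a SparkContext named sc (the expected output is taken from the doctest added in this patch):

    # Assumes a live SparkContext `sc`, e.g. from bin/pyspark.
    x = sc.parallelize([("a", 1), ("b", 4)])
    y = sc.parallelize([("a", 2), ("c", 8)])

    # Keys present on only one side are padded with None on the other side.
    print(sorted(x.fullOuterJoin(y).collect()))
    # [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]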
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/join.py  16
-rw-r--r--  python/pyspark/rdd.py   25
2 files changed, 39 insertions, 2 deletions
diff --git a/python/pyspark/join.py b/python/pyspark/join.py
index b0f1cc1927..b4a8447137 100644
--- a/python/pyspark/join.py
+++ b/python/pyspark/join.py
@@ -80,6 +80,22 @@ def python_left_outer_join(rdd, other, numPartitions):
     return _do_python_join(rdd, other, numPartitions, dispatch)
 
 
+def python_full_outer_join(rdd, other, numPartitions):
+    def dispatch(seq):
+        vbuf, wbuf = [], []
+        for (n, v) in seq:
+            if n == 1:
+                vbuf.append(v)
+            elif n == 2:
+                wbuf.append(v)
+        if not vbuf:
+            vbuf.append(None)
+        if not wbuf:
+            wbuf.append(None)
+        return [(v, w) for v in vbuf for w in wbuf]
+    return _do_python_join(rdd, other, numPartitions, dispatch)
+
+
 def python_cogroup(rdds, numPartitions):
     def make_mapper(i):
         return lambda (k, v): (k, (i, v))
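Judging by the n == 1 / n == 2 checks above, _do_python_join (defined earlier in join.py, not shown in this hunk) tags each left-side value with 1 and each right-side value with 2, groups the tagged values by key, and hands each key's sequence to dispatch. A minimal pure-Python sketch of the dispatch semantics for a single key, runnable without Spark (the sample inputs are ours):

    def dispatch(seq):
        # seq holds (tag, value) pairs for one key: tag 1 = left RDD, tag 2 = right RDD.
        vbuf, wbuf = [], []
        for (n, v) in seq:
            if n == 1:
                vbuf.append(v)
            elif n == 2:
                wbuf.append(v)
        # Full outer join: pad whichever side has no values with None.
        if not vbuf:
            vbuf.append(None)
        if not wbuf:
            wbuf.append(None)
        return [(v, w) for v in vbuf for w in wbuf]

    print(dispatch([(1, "x1"), (1, "x2"), (2, "y1")]))  # [('x1', 'y1'), ('x2', 'y1')]
    print(dispatch([(1, "x1")]))                        # [('x1', None)]
    print(dispatch([(2, "y1")]))                        # [(None, 'y1')]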
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 8ef233bc80..680140d72d 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -36,7 +36,7 @@ from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
     BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
     PickleSerializer, pack_long, AutoBatchedSerializer
 from pyspark.join import python_join, python_left_outer_join, \
-    python_right_outer_join, python_cogroup
+    python_right_outer_join, python_full_outer_join, python_cogroup
 from pyspark.statcounter import StatCounter
 from pyspark.rddsampler import RDDSampler, RDDStratifiedSampler
 from pyspark.storagelevel import StorageLevel
@@ -1375,7 +1375,7 @@ class RDD(object):
 
         For each element (k, v) in C{self}, the resulting RDD will either
         contain all pairs (k, (v, w)) for w in C{other}, or the pair
-        (k, (v, None)) if no elements in other have key k.
+        (k, (v, None)) if no elements in C{other} have key k.
 
         Hash-partitions the resulting RDD into the given number of partitions.
@@ -1403,6 +1403,27 @@ class RDD(object):
"""
return python_right_outer_join(self, other, numPartitions)
+ def fullOuterJoin(self, other, numPartitions=None):
+ """
+ Perform a right outer join of C{self} and C{other}.
+
+ For each element (k, v) in C{self}, the resulting RDD will either
+ contain all pairs (k, (v, w)) for w in C{other}, or the pair
+ (k, (v, None)) if no elements in C{other} have key k.
+
+ Similarly, for each element (k, w) in C{other}, the resulting RDD will
+ either contain all pairs (k, (v, w)) for v in C{self}, or the pair
+ (k, (None, w)) if no elements in C{self} have key k.
+
+ Hash-partitions the resulting RDD into the given number of partitions.
+
+ >>> x = sc.parallelize([("a", 1), ("b", 4)])
+ >>> y = sc.parallelize([("a", 2), ("c", 8)])
+ >>> sorted(x.fullOuterJoin(y).collect())
+ [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]
+ """
+ return python_full_outer_join(self, other, numPartitions)
+
# TODO: add option to control map-side combining
# portable_hash is used as default, because builtin hash of None is different
# cross machines.
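One consequence of the [(v, w) for v in vbuf for w in wbuf] comprehension worth noting: when a key occurs multiple times on both sides, fullOuterJoin emits the cross product of that key's values, matching the behavior of join, leftOuterJoin, and rightOuterJoin. A sketch under the same assumption of a live SparkContext sc (the sample data is ours, not from the patch):

    x = sc.parallelize([("a", 1), ("a", 2)])
    y = sc.parallelize([("a", 10), ("a", 20), ("b", 3)])
    print(sorted(x.fullOuterJoin(y).collect()))
    # [('a', (1, 10)), ('a', (1, 20)), ('a', (2, 10)), ('a', (2, 20)), ('b', (None, 3))]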