aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/join.py
diff options
context:
space:
mode:
authorAaron Staple <aaron.staple@gmail.com>2014-09-24 20:39:09 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-09-24 20:39:09 -0700
commit8ca4ecb6a56b96bae21b33e27f6abdb53676683a (patch)
treefce8a6b30398815c0b3f7d03db38824d6af9e1a3 /python/pyspark/join.py
parent74fb2ecf7afc2d314f6477f8f2e6134614387453 (diff)
downloadspark-8ca4ecb6a56b96bae21b33e27f6abdb53676683a.tar.gz
spark-8ca4ecb6a56b96bae21b33e27f6abdb53676683a.tar.bz2
spark-8ca4ecb6a56b96bae21b33e27f6abdb53676683a.zip
[SPARK-546] Add full outer join to RDD and DStream.
leftOuterJoin and rightOuterJoin are already implemented. This patch adds fullOuterJoin. Author: Aaron Staple <aaron.staple@gmail.com> Closes #1395 from staple/SPARK-546 and squashes the following commits: 1f5595c [Aaron Staple] Fix python style 7ac0aa9 [Aaron Staple] [SPARK-546] Add full outer join to RDD and DStream. 3b5d137 [Aaron Staple] In JavaPairDStream, make class tag specification in rightOuterJoin consistent with other functions. 31f2956 [Aaron Staple] Fix left outer join documentation comments.
Diffstat (limited to 'python/pyspark/join.py')
-rw-r--r--python/pyspark/join.py16
1 files changed, 16 insertions, 0 deletions
diff --git a/python/pyspark/join.py b/python/pyspark/join.py
index b0f1cc1927..b4a8447137 100644
--- a/python/pyspark/join.py
+++ b/python/pyspark/join.py
@@ -80,6 +80,22 @@ def python_left_outer_join(rdd, other, numPartitions):
return _do_python_join(rdd, other, numPartitions, dispatch)
+def python_full_outer_join(rdd, other, numPartitions):
+ def dispatch(seq):
+ vbuf, wbuf = [], []
+ for (n, v) in seq:
+ if n == 1:
+ vbuf.append(v)
+ elif n == 2:
+ wbuf.append(v)
+ if not vbuf:
+ vbuf.append(None)
+ if not wbuf:
+ wbuf.append(None)
+ return [(v, w) for v in vbuf for w in wbuf]
+ return _do_python_join(rdd, other, numPartitions, dispatch)
+
+
def python_cogroup(rdds, numPartitions):
def make_mapper(i):
return lambda (k, v): (k, (i, v))