| author | Aaron Staple <aaron.staple@gmail.com> | 2014-09-24 20:39:09 -0700 |
|---|---|---|
| committer | Patrick Wendell <pwendell@gmail.com> | 2014-09-24 20:39:09 -0700 |
| commit | 8ca4ecb6a56b96bae21b33e27f6abdb53676683a | |
| tree | fce8a6b30398815c0b3f7d03db38824d6af9e1a3 /python | |
| parent | 74fb2ecf7afc2d314f6477f8f2e6134614387453 | |
[SPARK-546] Add full outer join to RDD and DStream.
leftOuterJoin and rightOuterJoin are already implemented. This patch adds fullOuterJoin.
Author: Aaron Staple <aaron.staple@gmail.com>
Closes #1395 from staple/SPARK-546 and squashes the following commits:
1f5595c [Aaron Staple] Fix python style
7ac0aa9 [Aaron Staple] [SPARK-546] Add full outer join to RDD and DStream.
3b5d137 [Aaron Staple] In JavaPairDStream, make class tag specification in rightOuterJoin consistent with other functions.
31f2956 [Aaron Staple] Fix left outer join documentation comments.
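For reference, the semantics the patch implements can be sketched in plain Python, without Spark. The sketch below mirrors the `dispatch()` helper added to python/pyspark/join.py: each value is tagged 1 (left side) or 2 (right side), values are grouped by key, and an empty side is padded with a single None. The `full_outer_join` name here is illustrative only, not part of PySpark.

```python
# Minimal, Spark-free sketch of the full outer join semantics in this patch.
# full_outer_join is an illustrative name, not a PySpark API.
from collections import defaultdict

def full_outer_join(left, right):
    # Tag each value with its side, as the patch's dispatch() expects:
    # 1 for values from the left dataset, 2 for values from the right.
    tagged = defaultdict(list)
    for k, v in left:
        tagged[k].append((1, v))
    for k, w in right:
        tagged[k].append((2, w))
    result = []
    for k, seq in tagged.items():
        vbuf = [v for n, v in seq if n == 1] or [None]  # pad missing left side
        wbuf = [w for n, w in seq if n == 2] or [None]  # pad missing right side
        result.extend((k, (v, w)) for v in vbuf for w in wbuf)
    return result

print(sorted(full_outer_join([("a", 1), ("b", 4)], [("a", 2), ("c", 8)])))
# [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]
```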
Diffstat (limited to 'python')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | python/pyspark/join.py | 16 |
| -rw-r--r-- | python/pyspark/rdd.py | 25 |

2 files changed, 39 insertions, 2 deletions
```diff
diff --git a/python/pyspark/join.py b/python/pyspark/join.py
index b0f1cc1927..b4a8447137 100644
--- a/python/pyspark/join.py
+++ b/python/pyspark/join.py
@@ -80,6 +80,22 @@ def python_left_outer_join(rdd, other, numPartitions):
     return _do_python_join(rdd, other, numPartitions, dispatch)
 
 
+def python_full_outer_join(rdd, other, numPartitions):
+    def dispatch(seq):
+        vbuf, wbuf = [], []
+        for (n, v) in seq:
+            if n == 1:
+                vbuf.append(v)
+            elif n == 2:
+                wbuf.append(v)
+        if not vbuf:
+            vbuf.append(None)
+        if not wbuf:
+            wbuf.append(None)
+        return [(v, w) for v in vbuf for w in wbuf]
+    return _do_python_join(rdd, other, numPartitions, dispatch)
+
+
 def python_cogroup(rdds, numPartitions):
     def make_mapper(i):
         return lambda (k, v): (k, (i, v))
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 8ef233bc80..680140d72d 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -36,7 +36,7 @@ from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
     BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
     PickleSerializer, pack_long, AutoBatchedSerializer
 from pyspark.join import python_join, python_left_outer_join, \
-    python_right_outer_join, python_cogroup
+    python_right_outer_join, python_full_outer_join, python_cogroup
 from pyspark.statcounter import StatCounter
 from pyspark.rddsampler import RDDSampler, RDDStratifiedSampler
 from pyspark.storagelevel import StorageLevel
@@ -1375,7 +1375,7 @@ class RDD(object):
 
         For each element (k, v) in C{self}, the resulting RDD will either
         contain all pairs (k, (v, w)) for w in C{other}, or the pair
-        (k, (v, None)) if no elements in other have key k.
+        (k, (v, None)) if no elements in C{other} have key k.
 
         Hash-partitions the resulting RDD into the given number of
         partitions.
@@ -1403,6 +1403,27 @@ class RDD(object):
         """
         return python_right_outer_join(self, other, numPartitions)
 
+    def fullOuterJoin(self, other, numPartitions=None):
+        """
+        Perform a full outer join of C{self} and C{other}.
+
+        For each element (k, v) in C{self}, the resulting RDD will either
+        contain all pairs (k, (v, w)) for w in C{other}, or the pair
+        (k, (v, None)) if no elements in C{other} have key k.
+
+        Similarly, for each element (k, w) in C{other}, the resulting RDD will
+        either contain all pairs (k, (v, w)) for v in C{self}, or the pair
+        (k, (None, w)) if no elements in C{self} have key k.
+
+        Hash-partitions the resulting RDD into the given number of partitions.
+
+        >>> x = sc.parallelize([("a", 1), ("b", 4)])
+        >>> y = sc.parallelize([("a", 2), ("c", 8)])
+        >>> sorted(x.fullOuterJoin(y).collect())
+        [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]
+        """
+        return python_full_outer_join(self, other, numPartitions)
+
     # TODO: add option to control map-side combining
     # portable_hash is used as default, because builtin hash of None is different
     # cross machines.
```
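A short usage sketch against the new RDD method, mirroring the doctest added to rdd.py; it assumes a local PySpark build that includes this patch, and the application name is arbitrary. As with join, leftOuterJoin, and rightOuterJoin, the optional numPartitions argument controls the hash partitioning of the result.

```python
# Usage sketch, assuming a local PySpark installation that includes this patch.
from pyspark import SparkContext

sc = SparkContext("local", "fullOuterJoinDemo")
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("c", 8)])

# Keys present on only one side are padded with None on the other side.
print(sorted(x.fullOuterJoin(y).collect()))
# [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]

# numPartitions hash-partitions the result, as with the other joins.
print(sorted(x.fullOuterJoin(y, numPartitions=4).collect()))

sc.stop()
```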