diff options
author | Allan Douglas R. de Oliveira <allandouglas@gmail.com> | 2014-06-20 11:03:03 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-06-20 11:03:03 -0700 |
commit | 6a224c31e8563156ad5732a23667e73076984ae1 (patch) | |
tree | e3364e0ab07258a483668635a90da442a6d0a8df /python/pyspark/join.py | |
parent | d484ddeff1440d8e14e05c3cd7e7a18746f1a586 (diff) | |
download | spark-6a224c31e8563156ad5732a23667e73076984ae1.tar.gz spark-6a224c31e8563156ad5732a23667e73076984ae1.tar.bz2 spark-6a224c31e8563156ad5732a23667e73076984ae1.zip |
SPARK-1868: Users should be allowed to cogroup at least 4 RDDs
Adds cogroup for 4 RDDs.
Author: Allan Douglas R. de Oliveira <allandouglas@gmail.com>
Closes #813 from douglaz/more_cogroups and squashes the following commits:
f8d6273 [Allan Douglas R. de Oliveira] Test python groupWith for one more case
0e9009c [Allan Douglas R. de Oliveira] Added scala tests
c3ffcdd [Allan Douglas R. de Oliveira] Added java tests
517a67f [Allan Douglas R. de Oliveira] Added tests for python groupWith
2f402d5 [Allan Douglas R. de Oliveira] Removed TODO
17474f4 [Allan Douglas R. de Oliveira] Use new cogroup function
7877a2a [Allan Douglas R. de Oliveira] Fixed code
ba02414 [Allan Douglas R. de Oliveira] Added varargs cogroup to pyspark
c4a8a51 [Allan Douglas R. de Oliveira] Added java cogroup 4
e94963c [Allan Douglas R. de Oliveira] Fixed spacing
f1ee57b [Allan Douglas R. de Oliveira] Fixed scala style issues
d7196f1 [Allan Douglas R. de Oliveira] Allow the cogroup of 4 RDDs
Diffstat (limited to 'python/pyspark/join.py')
-rw-r--r-- | python/pyspark/join.py | 20 |
1 files changed, 10 insertions, 10 deletions
diff --git a/python/pyspark/join.py b/python/pyspark/join.py index 6f94d26ef8..5f3a7e71f7 100644 --- a/python/pyspark/join.py +++ b/python/pyspark/join.py @@ -79,15 +79,15 @@ def python_left_outer_join(rdd, other, numPartitions): return _do_python_join(rdd, other, numPartitions, dispatch) -def python_cogroup(rdd, other, numPartitions): - vs = rdd.map(lambda (k, v): (k, (1, v))) - ws = other.map(lambda (k, v): (k, (2, v))) +def python_cogroup(rdds, numPartitions): + def make_mapper(i): + return lambda (k, v): (k, (i, v)) + vrdds = [rdd.map(make_mapper(i)) for i, rdd in enumerate(rdds)] + union_vrdds = reduce(lambda acc, other: acc.union(other), vrdds) + rdd_len = len(vrdds) def dispatch(seq): - vbuf, wbuf = [], [] + bufs = [[] for i in range(rdd_len)] for (n, v) in seq: - if n == 1: - vbuf.append(v) - elif n == 2: - wbuf.append(v) - return (ResultIterable(vbuf), ResultIterable(wbuf)) - return vs.union(ws).groupByKey(numPartitions).mapValues(dispatch) + bufs[n].append(v) + return tuple(map(ResultIterable, bufs)) + return union_vrdds.groupByKey(numPartitions).mapValues(dispatch) |