aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/join.py
diff options
context:
space:
mode:
authorAllan Douglas R. de Oliveira <allandouglas@gmail.com>2014-06-20 11:03:03 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-06-20 11:03:03 -0700
commit6a224c31e8563156ad5732a23667e73076984ae1 (patch)
treee3364e0ab07258a483668635a90da442a6d0a8df /python/pyspark/join.py
parentd484ddeff1440d8e14e05c3cd7e7a18746f1a586 (diff)
downloadspark-6a224c31e8563156ad5732a23667e73076984ae1.tar.gz
spark-6a224c31e8563156ad5732a23667e73076984ae1.tar.bz2
spark-6a224c31e8563156ad5732a23667e73076984ae1.zip
SPARK-1868: Users should be allowed to cogroup at least 4 RDDs
Adds cogroup for 4 RDDs. Author: Allan Douglas R. de Oliveira <allandouglas@gmail.com> Closes #813 from douglaz/more_cogroups and squashes the following commits: f8d6273 [Allan Douglas R. de Oliveira] Test python groupWith for one more case 0e9009c [Allan Douglas R. de Oliveira] Added scala tests c3ffcdd [Allan Douglas R. de Oliveira] Added java tests 517a67f [Allan Douglas R. de Oliveira] Added tests for python groupWith 2f402d5 [Allan Douglas R. de Oliveira] Removed TODO 17474f4 [Allan Douglas R. de Oliveira] Use new cogroup function 7877a2a [Allan Douglas R. de Oliveira] Fixed code ba02414 [Allan Douglas R. de Oliveira] Added varargs cogroup to pyspark c4a8a51 [Allan Douglas R. de Oliveira] Added java cogroup 4 e94963c [Allan Douglas R. de Oliveira] Fixed spacing f1ee57b [Allan Douglas R. de Oliveira] Fixed scala style issues d7196f1 [Allan Douglas R. de Oliveira] Allow the cogroup of 4 RDDs
Diffstat (limited to 'python/pyspark/join.py')
-rw-r--r--python/pyspark/join.py20
1 files changed, 10 insertions, 10 deletions
diff --git a/python/pyspark/join.py b/python/pyspark/join.py
index 6f94d26ef8..5f3a7e71f7 100644
--- a/python/pyspark/join.py
+++ b/python/pyspark/join.py
@@ -79,15 +79,15 @@ def python_left_outer_join(rdd, other, numPartitions):
return _do_python_join(rdd, other, numPartitions, dispatch)
-def python_cogroup(rdd, other, numPartitions):
- vs = rdd.map(lambda (k, v): (k, (1, v)))
- ws = other.map(lambda (k, v): (k, (2, v)))
+def python_cogroup(rdds, numPartitions):
+ def make_mapper(i):
+ return lambda (k, v): (k, (i, v))
+ vrdds = [rdd.map(make_mapper(i)) for i, rdd in enumerate(rdds)]
+ union_vrdds = reduce(lambda acc, other: acc.union(other), vrdds)
+ rdd_len = len(vrdds)
def dispatch(seq):
- vbuf, wbuf = [], []
+ bufs = [[] for i in range(rdd_len)]
for (n, v) in seq:
- if n == 1:
- vbuf.append(v)
- elif n == 2:
- wbuf.append(v)
- return (ResultIterable(vbuf), ResultIterable(wbuf))
- return vs.union(ws).groupByKey(numPartitions).mapValues(dispatch)
+ bufs[n].append(v)
+ return tuple(map(ResultIterable, bufs))
+ return union_vrdds.groupByKey(numPartitions).mapValues(dispatch)