aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/rdd.py
diff options
context:
space:
mode:
author: Allan Douglas R. de Oliveira <allandouglas@gmail.com> 2014-06-20 11:03:03 -0700
committer: Patrick Wendell <pwendell@gmail.com> 2014-06-20 11:03:03 -0700
commit: 6a224c31e8563156ad5732a23667e73076984ae1 (patch)
tree: e3364e0ab07258a483668635a90da442a6d0a8df /python/pyspark/rdd.py
parent: d484ddeff1440d8e14e05c3cd7e7a18746f1a586 (diff)
download: spark-6a224c31e8563156ad5732a23667e73076984ae1.tar.gz
spark-6a224c31e8563156ad5732a23667e73076984ae1.tar.bz2
spark-6a224c31e8563156ad5732a23667e73076984ae1.zip
SPARK-1868: Users should be allowed to cogroup at least 4 RDDs
Adds cogroup for 4 RDDs.

Author: Allan Douglas R. de Oliveira <allandouglas@gmail.com>

Closes #813 from douglaz/more_cogroups and squashes the following commits:

f8d6273 [Allan Douglas R. de Oliveira] Test python groupWith for one more case
0e9009c [Allan Douglas R. de Oliveira] Added scala tests
c3ffcdd [Allan Douglas R. de Oliveira] Added java tests
517a67f [Allan Douglas R. de Oliveira] Added tests for python groupWith
2f402d5 [Allan Douglas R. de Oliveira] Removed TODO
17474f4 [Allan Douglas R. de Oliveira] Use new cogroup function
7877a2a [Allan Douglas R. de Oliveira] Fixed code
ba02414 [Allan Douglas R. de Oliveira] Added varargs cogroup to pyspark
c4a8a51 [Allan Douglas R. de Oliveira] Added java cogroup 4
e94963c [Allan Douglas R. de Oliveira] Fixed spacing
f1ee57b [Allan Douglas R. de Oliveira] Fixed scala style issues
d7196f1 [Allan Douglas R. de Oliveira] Allow the cogroup of 4 RDDs
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r-- python/pyspark/rdd.py 22
1 files changed, 15 insertions, 7 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 62a95c8467..1d55c35a8b 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1233,7 +1233,7 @@ class RDD(object):
combiners[k] = mergeCombiners(combiners[k], v)
return combiners.iteritems()
return shuffled.mapPartitions(_mergeCombiners)
-
+
def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None):
"""
Aggregate the values of each key, using given combine functions and a neutral "zero value".
@@ -1245,7 +1245,7 @@ class RDD(object):
"""
def createZero():
return copy.deepcopy(zeroValue)
-
+
return self.combineByKey(lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions)
def foldByKey(self, zeroValue, func, numPartitions=None):
@@ -1323,12 +1323,20 @@ class RDD(object):
map_values_fn = lambda (k, v): (k, f(v))
return self.map(map_values_fn, preservesPartitioning=True)
- # TODO: support varargs cogroup of several RDDs.
- def groupWith(self, other):
+ def groupWith(self, other, *others):
"""
- Alias for cogroup.
+ Alias for cogroup but with support for multiple RDDs.
+
+ >>> w = sc.parallelize([("a", 5), ("b", 6)])
+ >>> x = sc.parallelize([("a", 1), ("b", 4)])
+ >>> y = sc.parallelize([("a", 2)])
+ >>> z = sc.parallelize([("b", 42)])
+ >>> map((lambda (x,y): (x, (list(y[0]), list(y[1]), list(y[2]), list(y[3])))), \
+ sorted(list(w.groupWith(x, y, z).collect())))
+ [('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))]
+
"""
- return self.cogroup(other)
+ return python_cogroup((self, other) + others, numPartitions=None)
# TODO: add variant with custom parittioner
def cogroup(self, other, numPartitions=None):
@@ -1342,7 +1350,7 @@ class RDD(object):
>>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect())))
[('a', ([1], [2])), ('b', ([4], []))]
"""
- return python_cogroup(self, other, numPartitions)
+ return python_cogroup((self, other), numPartitions)
def subtractByKey(self, other, numPartitions=None):
"""