diff options
author | Allan Douglas R. de Oliveira <allandouglas@gmail.com> | 2014-06-20 11:03:03 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-06-20 11:03:03 -0700 |
commit | 6a224c31e8563156ad5732a23667e73076984ae1 (patch) | |
tree | e3364e0ab07258a483668635a90da442a6d0a8df /python/pyspark/rdd.py | |
parent | d484ddeff1440d8e14e05c3cd7e7a18746f1a586 (diff) | |
download | spark-6a224c31e8563156ad5732a23667e73076984ae1.tar.gz spark-6a224c31e8563156ad5732a23667e73076984ae1.tar.bz2 spark-6a224c31e8563156ad5732a23667e73076984ae1.zip |
SPARK-1868: Users should be allowed to cogroup at least 4 RDDs
Adds cogroup for 4 RDDs.
Author: Allan Douglas R. de Oliveira <allandouglas@gmail.com>
Closes #813 from douglaz/more_cogroups and squashes the following commits:
f8d6273 [Allan Douglas R. de Oliveira] Test python groupWith for one more case
0e9009c [Allan Douglas R. de Oliveira] Added scala tests
c3ffcdd [Allan Douglas R. de Oliveira] Added java tests
517a67f [Allan Douglas R. de Oliveira] Added tests for python groupWith
2f402d5 [Allan Douglas R. de Oliveira] Removed TODO
17474f4 [Allan Douglas R. de Oliveira] Use new cogroup function
7877a2a [Allan Douglas R. de Oliveira] Fixed code
ba02414 [Allan Douglas R. de Oliveira] Added varargs cogroup to pyspark
c4a8a51 [Allan Douglas R. de Oliveira] Added java cogroup 4
e94963c [Allan Douglas R. de Oliveira] Fixed spacing
f1ee57b [Allan Douglas R. de Oliveira] Fixed scala style issues
d7196f1 [Allan Douglas R. de Oliveira] Allow the cogroup of 4 RDDs
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r-- | python/pyspark/rdd.py | 22 |
1 files changed, 15 insertions, 7 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 62a95c8467..1d55c35a8b 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1233,7 +1233,7 @@ class RDD(object): combiners[k] = mergeCombiners(combiners[k], v) return combiners.iteritems() return shuffled.mapPartitions(_mergeCombiners) - + def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None): """ Aggregate the values of each key, using given combine functions and a neutral "zero value". @@ -1245,7 +1245,7 @@ class RDD(object): """ def createZero(): return copy.deepcopy(zeroValue) - + return self.combineByKey(lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions) def foldByKey(self, zeroValue, func, numPartitions=None): @@ -1323,12 +1323,20 @@ class RDD(object): map_values_fn = lambda (k, v): (k, f(v)) return self.map(map_values_fn, preservesPartitioning=True) - # TODO: support varargs cogroup of several RDDs. - def groupWith(self, other): + def groupWith(self, other, *others): """ - Alias for cogroup. + Alias for cogroup but with support for multiple RDDs. + + >>> w = sc.parallelize([("a", 5), ("b", 6)]) + >>> x = sc.parallelize([("a", 1), ("b", 4)]) + >>> y = sc.parallelize([("a", 2)]) + >>> z = sc.parallelize([("b", 42)]) + >>> map((lambda (x,y): (x, (list(y[0]), list(y[1]), list(y[2]), list(y[3])))), \ + sorted(list(w.groupWith(x, y, z).collect()))) + [('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))] + """ - return self.cogroup(other) + return python_cogroup((self, other) + others, numPartitions=None) # TODO: add variant with custom parittioner def cogroup(self, other, numPartitions=None): @@ -1342,7 +1350,7 @@ class RDD(object): >>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect()))) [('a', ([1], [2])), ('b', ([4], []))] """ - return python_cogroup(self, other, numPartitions) + return python_cogroup((self, other), numPartitions) def subtractByKey(self, other, numPartitions=None): """ |