diff options
author | Sandy Ryza <sandy@cloudera.com> | 2014-06-12 08:14:25 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-06-12 08:14:25 -0700 |
commit | ce92a9c18f033ac9fa2f12143fab00a90e0f4577 (patch) | |
tree | 34507639ff2f876630c6868619844c52e13d3720 /python/pyspark/rdd.py | |
parent | 43d53d51c9ee2626d9de91faa3b192979b86821d (diff) | |
download | spark-ce92a9c18f033ac9fa2f12143fab00a90e0f4577.tar.gz spark-ce92a9c18f033ac9fa2f12143fab00a90e0f4577.tar.bz2 spark-ce92a9c18f033ac9fa2f12143fab00a90e0f4577.zip |
SPARK-554. Add aggregateByKey.
Author: Sandy Ryza <sandy@cloudera.com>
Closes #705 from sryza/sandy-spark-554 and squashes the following commits:
2302b8f [Sandy Ryza] Add MIMA exclude
f52e0ad [Sandy Ryza] Fix Python tests for real
2f3afa3 [Sandy Ryza] Fix Python test
0b735e9 [Sandy Ryza] Fix line lengths
ae56746 [Sandy Ryza] Fix doc (replace T with V)
c2be415 [Sandy Ryza] Java and Python aggregateByKey
23bf400 [Sandy Ryza] SPARK-554. Add aggregateByKey.
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r-- | python/pyspark/rdd.py | 19 |
1 files changed, 18 insertions, 1 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 8a215fc511..735389c698 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1178,6 +1178,20 @@ class RDD(object): combiners[k] = mergeCombiners(combiners[k], v) return combiners.iteritems() return shuffled.mapPartitions(_mergeCombiners) + + def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None): + """ + Aggregate the values of each key, using given combine functions and a neutral "zero value". + This function can return a different result type, U, than the type of the values in this RDD, + V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, + The former operation is used for merging values within a partition, and the latter is used + for merging values between partitions. To avoid memory allocation, both of these functions are + allowed to modify and return their first argument instead of creating a new U. + """ + def createZero(): + return copy.deepcopy(zeroValue) + + return self.combineByKey(lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions) def foldByKey(self, zeroValue, func, numPartitions=None): """ @@ -1190,7 +1204,10 @@ class RDD(object): >>> rdd.foldByKey(0, add).collect() [('a', 2), ('b', 1)] """ - return self.combineByKey(lambda v: func(zeroValue, v), func, func, numPartitions) + def createZero(): + return copy.deepcopy(zeroValue) + + return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions) # TODO: support variant with custom partitioner |