author     Sandy Ryza <sandy@cloudera.com>          2014-06-12 08:14:25 -0700
committer  Patrick Wendell <pwendell@gmail.com>     2014-06-12 08:14:25 -0700
commit     ce92a9c18f033ac9fa2f12143fab00a90e0f4577 (patch)
tree       34507639ff2f876630c6868619844c52e13d3720 /python/pyspark/rdd.py
parent     43d53d51c9ee2626d9de91faa3b192979b86821d (diff)
SPARK-554. Add aggregateByKey.
Author: Sandy Ryza <sandy@cloudera.com>

Closes #705 from sryza/sandy-spark-554 and squashes the following commits:

2302b8f [Sandy Ryza] Add MIMA exclude
f52e0ad [Sandy Ryza] Fix Python tests for real
2f3afa3 [Sandy Ryza] Fix Python test
0b735e9 [Sandy Ryza] Fix line lengths
ae56746 [Sandy Ryza] Fix doc (replace T with V)
c2be415 [Sandy Ryza] Java and Python aggregateByKey
23bf400 [Sandy Ryza] SPARK-554. Add aggregateByKey.
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--  python/pyspark/rdd.py  19
1 file changed, 18 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 8a215fc511..735389c698 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1178,6 +1178,20 @@ class RDD(object):
combiners[k] = mergeCombiners(combiners[k], v)
return combiners.iteritems()
return shuffled.mapPartitions(_mergeCombiners)
+
+ def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None):
+ """
+ Aggregate the values of each key, using given combine functions and a neutral "zero value".
+ This function can return a different result type, U, than the type of the values in this RDD,
+ V. Thus, we need one operation for merging a V into a U and one operation for merging two U's.
+ The former operation is used for merging values within a partition, and the latter is used
+ for merging values between partitions. To avoid memory allocation, both of these functions are
+ allowed to modify and return their first argument instead of creating a new U.
+ """
+ def createZero():
+ return copy.deepcopy(zeroValue)
+
+ return self.combineByKey(lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions)
def foldByKey(self, zeroValue, func, numPartitions=None):
"""
@@ -1190,7 +1204,10 @@ class RDD(object):
>>> rdd.foldByKey(0, add).collect()
[('a', 2), ('b', 1)]
"""
- return self.combineByKey(lambda v: func(zeroValue, v), func, func, numPartitions)
+ def createZero():
+ return copy.deepcopy(zeroValue)
+
+ return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions)
# TODO: support variant with custom partitioner
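
Below is a brief usage sketch of the two methods touched by this patch. It assumes an already-created SparkContext named sc and the Python 2-era PySpark of this commit; the sample data and the per-key (sum, count) aggregation are illustrative, not part of the change.

# Usage sketch (assumes an existing SparkContext sc).
data = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])

# aggregateByKey: the zero value (0, 0) has a different type U = (sum, count)
# than the values V = int. seqFunc merges a V into a U within a partition;
# combFunc merges two U's across partitions.
sums_and_counts = data.aggregateByKey(
    (0, 0),
    lambda acc, v: (acc[0] + v, acc[1] + 1),    # seqFunc: (U, V) -> U
    lambda a, b: (a[0] + b[0], a[1] + b[1]))    # combFunc: (U, U) -> U
# sums_and_counts.collect() -> [('a', (3, 2)), ('b', (1, 1))]  (order may vary)

# foldByKey: U and V share a type, so a single function is used for both merges.
from operator import add
totals = data.foldByKey(0, add)
# totals.collect() -> [('a', 3), ('b', 1)]  (order may vary)

The createZero helper introduced in both methods deep-copies zeroValue before each use, so functions that exercise the documented freedom to mutate and return their first argument cannot corrupt a zero value shared across keys; this matters when the zero is mutable, e.g. a list or set.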