about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2014-03-10 13:37:11 -0700
committerMatei Zaharia <matei@databricks.com>2014-03-10 13:37:11 -0700
commita59419c27e45f06be5143c58d48affb0a5158bdf (patch)
tree8e636eec68a9e52afc98f61cc030dacffbcfd812
parentf5518989b67a0941ca79368e73811895a5fa8669 (diff)
downloadspark-a59419c27e45f06be5143c58d48affb0a5158bdf.tar.gz
spark-a59419c27e45f06be5143c58d48affb0a5158bdf.tar.bz2
spark-a59419c27e45f06be5143c58d48affb0a5158bdf.zip
SPARK-1168, Added foldByKey to pyspark.
Author: Prashant Sharma <prashant.s@imaginea.com>

Closes #115 from ScrapCodes/SPARK-1168/pyspark-foldByKey and squashes the following commits:

db6f67e [Prashant Sharma] SPARK-1168, Added foldByKey to pyspark.
-rw-r--r--python/pyspark/rdd.py14
1 files changed, 14 insertions, 0 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index e1043ad564..39916d21c7 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -946,7 +946,21 @@ class RDD(object):
combiners[k] = mergeCombiners(combiners[k], v)
return combiners.iteritems()
return shuffled.mapPartitions(_mergeCombiners)
+
+ def foldByKey(self, zeroValue, func, numPartitions=None):
+ """
+ Merge the values for each key using an associative function "func" and a neutral "zeroValue"
+ which may be added to the result an arbitrary number of times, and must not change
+ the result (e.g., 0 for addition, or 1 for multiplication.).
+ >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
+ >>> from operator import add
+ >>> rdd.foldByKey(0, add).collect()
+ [('a', 2), ('b', 1)]
+ """
+ return self.combineByKey(lambda v: func(zeroValue, v), func, func, numPartitions)
+
+
# TODO: support variant with custom partitioner
def groupByKey(self, numPartitions=None):
"""