aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pyspark/pyspark/rdd.py20
1 files changed, 19 insertions, 1 deletions
diff --git a/pyspark/pyspark/rdd.py b/pyspark/pyspark/rdd.py
index 7d280d8844..af7703fdfc 100644
--- a/pyspark/pyspark/rdd.py
+++ b/pyspark/pyspark/rdd.py
@@ -141,7 +141,25 @@ class RDD(object):
vals = PipelinedRDD(self, func).collect()
return reduce(f, vals)
- # TODO: fold
+ def fold(self, zeroValue, op):
+ """
+ Aggregate the elements of each partition, and then the results for all
+ the partitions, using a given associative function and a neutral "zero
+ value." The function op(t1, t2) is allowed to modify t1 and return it
+ as its result value to avoid object allocation; however, it should not
+ modify t2.
+
+ >>> from operator import add
+ >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
+ 15
+ """
+ def func(iterator):
+ acc = zeroValue
+ for obj in iterator:
+ acc = op(obj, acc)
+ yield acc
+ vals = PipelinedRDD(self, func).collect()
+ return reduce(op, vals, zeroValue)
# TODO: aggregate