diff options
Diffstat (limited to 'pyspark')
-rw-r--r-- | pyspark/pyspark/rdd.py | 20 |
1 files changed, 19 insertions, 1 deletions
diff --git a/pyspark/pyspark/rdd.py b/pyspark/pyspark/rdd.py index 7d280d8844..af7703fdfc 100644 --- a/pyspark/pyspark/rdd.py +++ b/pyspark/pyspark/rdd.py @@ -141,7 +141,25 @@ class RDD(object): vals = PipelinedRDD(self, func).collect() return reduce(f, vals) - # TODO: fold + def fold(self, zeroValue, op): + """ + Aggregate the elements of each partition, and then the results for all + the partitions, using a given associative function and a neutral "zero + value." The function op(t1, t2) is allowed to modify t1 and return it + as its result value to avoid object allocation; however, it should not + modify t2. + + >>> from operator import add + >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add) + 15 + """ + def func(iterator): + acc = zeroValue + for obj in iterator: + acc = op(obj, acc) + yield acc + vals = PipelinedRDD(self, func).collect() + return reduce(op, vals, zeroValue) # TODO: aggregate |