From 65e8406029a0fe1e1c5c5d033d335b43f6743a04 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 24 Aug 2012 21:07:26 -0700 Subject: Implement fold() in Python API. --- pyspark/pyspark/rdd.py | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) (limited to 'pyspark') diff --git a/pyspark/pyspark/rdd.py b/pyspark/pyspark/rdd.py index 7d280d8844..af7703fdfc 100644 --- a/pyspark/pyspark/rdd.py +++ b/pyspark/pyspark/rdd.py @@ -141,7 +141,25 @@ class RDD(object): vals = PipelinedRDD(self, func).collect() return reduce(f, vals) - # TODO: fold + def fold(self, zeroValue, op): + """ + Aggregate the elements of each partition, and then the results for all + the partitions, using a given associative function and a neutral "zero + value." The function op(t1, t2) is allowed to modify t1 and return it + as its result value to avoid object allocation; however, it should not + modify t2. + + >>> from operator import add + >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add) + 15 + """ + def func(iterator): + acc = zeroValue + for obj in iterator: + acc = op(obj, acc) + yield acc + vals = PipelinedRDD(self, func).collect() + return reduce(op, vals, zeroValue) # TODO: aggregate -- cgit v1.2.3