aboutsummaryrefslogtreecommitdiff
path: root/pyspark/pyspark/rdd.py
diff options
context:
space:
mode:
author Josh Rosen <joshrosen@eecs.berkeley.edu> 2012-08-24 21:07:26 -0700
committer Josh Rosen <joshrosen@eecs.berkeley.edu> 2012-08-27 00:16:47 -0700
commit 65e8406029a0fe1e1c5c5d033d335b43f6743a04 (patch)
tree 017316693ad75b13d4d33a973b3f07fce26c2504 /pyspark/pyspark/rdd.py
parent f3b852ce66d193e3421eeecef71ea27bff73a94b (diff)
download spark-65e8406029a0fe1e1c5c5d033d335b43f6743a04.tar.gz
spark-65e8406029a0fe1e1c5c5d033d335b43f6743a04.tar.bz2
spark-65e8406029a0fe1e1c5c5d033d335b43f6743a04.zip
Implement fold() in Python API.
Diffstat (limited to 'pyspark/pyspark/rdd.py')
-rw-r--r-- pyspark/pyspark/rdd.py 20
1 files changed, 19 insertions, 1 deletions
diff --git a/pyspark/pyspark/rdd.py b/pyspark/pyspark/rdd.py
index 7d280d8844..af7703fdfc 100644
--- a/pyspark/pyspark/rdd.py
+++ b/pyspark/pyspark/rdd.py
@@ -141,7 +141,25 @@ class RDD(object):
vals = PipelinedRDD(self, func).collect()
return reduce(f, vals)
- # TODO: fold
+    def fold(self, zeroValue, op):
+        """
+        Aggregate the elements of each partition, and then the results for all
+        the partitions, using a given associative function and a neutral "zero
+        value." op(t1, t2) is always called with the running accumulator as t1
+        and the next element (or partition result) as t2; it may modify t1 and
+        return it to avoid object allocation, but it should not modify t2.
+
+        >>> from operator import add
+        >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
+        15
+        """
+        def func(iterator):
+            acc = zeroValue
+            for obj in iterator:
+                acc = op(acc, obj)  # accumulator is t1, element is t2
+            yield acc
+        vals = PipelinedRDD(self, func).collect()
+        return reduce(op, vals, zeroValue)  # same (acc, value) argument order
# TODO: aggregate