aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorHolden Karau <holden@pigscanfly.ca>2014-04-24 23:07:54 -0700
committerMatei Zaharia <matei@databricks.com>2014-04-24 23:08:06 -0700
commitf09a2c0af2c472f6e644e0f2a39b8d5c055edfa4 (patch)
treeac2ec6a0816140aba64fa1555860a06c8c036dbc /python
parent496b9ae180f1d9c6aac0680e953f2250e03be808 (diff)
downloadspark-f09a2c0af2c472f6e644e0f2a39b8d5c055edfa4.tar.gz
spark-f09a2c0af2c472f6e644e0f2a39b8d5c055edfa4.tar.bz2
spark-f09a2c0af2c472f6e644e0f2a39b8d5c055edfa4.zip
SPARK-1242 Add aggregate to python rdd
Author: Holden Karau <holden@pigscanfly.ca> Closes #139 from holdenk/add_aggregate_to_python_api and squashes the following commits: 0f39ae3 [Holden Karau] Merge in master 4879c75 [Holden Karau] CR feedback, fix issue with empty RDDs in aggregate 70b4724 [Holden Karau] Style fixes from code review 96b047b [Holden Karau] Add aggregate to python rdd (cherry picked from commit e03bc379ee03fde0ee4fa578d3c39aae35c63f01) Signed-off-by: Matei Zaharia <matei@databricks.com>
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/rdd.py31
1 file changed, 29 insertions, 2 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index d73ab7006e..a59778c721 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -599,7 +599,7 @@ class RDD(object):
def reduce(self, f):
"""
Reduces the elements of this RDD using the specified commutative and
- associative binary operator.
+ associative binary operator. Currently reduces partitions locally.
>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
@@ -641,7 +641,34 @@ class RDD(object):
vals = self.mapPartitions(func).collect()
return reduce(op, vals, zeroValue)
- # TODO: aggregate
+ def aggregate(self, zeroValue, seqOp, combOp):
+ """
+ Aggregate the elements of each partition, and then the results for all
+ the partitions, using the given combine functions and a neutral "zero
+ value."
+
+ The function C{op(t1, t2)} is allowed to modify C{t1} and return it
+ as its result value to avoid object allocation; however, it should not
+ modify C{t2}.
+
+ The first function (seqOp) can return a different result type, U, than
+ the type of this RDD. Thus, we need one operation for merging a T into a U
+ and one operation for merging two Us.
+
+ >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
+ >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
+ >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
+ (10, 4)
+ >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
+ (0, 0)
+ """
+ def func(iterator):
+ acc = zeroValue
+ for obj in iterator:
+ acc = seqOp(acc, obj)
+ yield acc
+
+ return self.mapPartitions(func).fold(zeroValue, combOp)
def max(self):