aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/rdd.py
diff options
context:
space:
mode:
authorNicholas Hwang <moogling@gmail.com>2015-07-19 10:30:28 -0700
committerDavies Liu <davies.liu@gmail.com>2015-07-19 10:30:28 -0700
commita803ac3e060d181c7b34d9501c9350e5f215ba85 (patch)
tree0142302afbe93b2fff738909dfb775b4b9ceff54 /python/pyspark/rdd.py
parent34ed82bb44c4519819695ddc760e6c9a98bc2e40 (diff)
downloadspark-a803ac3e060d181c7b34d9501c9350e5f215ba85.tar.gz
spark-a803ac3e060d181c7b34d9501c9350e5f215ba85.tar.bz2
spark-a803ac3e060d181c7b34d9501c9350e5f215ba85.zip
[SPARK-9021] [PYSPARK] Change RDD.aggregate() to do reduce(mapPartitions()) instead of mapPartitions.fold()
I'm relatively new to Spark and functional programming, so forgive me if this pull request is just a result of my misunderstanding of how Spark should be used. Currently, if one happens to use a mutable object as `zeroValue` for `RDD.aggregate()`, possibly unexpected behavior can occur. This is because pyspark's current implementation of `RDD.aggregate()` does not serialize or make a copy of `zeroValue` before handing it off to `RDD.mapPartitions(...).fold(...)`. This results in a single reference to `zeroValue` being used for both `RDD.mapPartitions()` and `RDD.fold()` on each partition. This can result in strange accumulator values being fed into each partition's call to `RDD.fold()`, as the `zeroValue` may have been changed in-place during the `RDD.mapPartitions()` call. As an illustrative example, submit the following to `spark-submit`: ``` from pyspark import SparkConf, SparkContext import collections def updateCounter(acc, val): print 'update acc:', acc print 'update val:', val acc[val] += 1 return acc def comboCounter(acc1, acc2): print 'combo acc1:', acc1 print 'combo acc2:', acc2 acc1.update(acc2) return acc1 def main(): conf = SparkConf().setMaster("local").setAppName("Aggregate with Counter") sc = SparkContext(conf = conf) print '======= AGGREGATING with ONE PARTITION =======' print sc.parallelize(range(1,10), 1).aggregate(collections.Counter(), updateCounter, comboCounter) print '======= AGGREGATING with TWO PARTITIONS =======' print sc.parallelize(range(1,10), 2).aggregate(collections.Counter(), updateCounter, comboCounter) if __name__ == "__main__": main() ``` One probably expects this to output the following: ``` Counter({1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1, 9: 1}) ``` But it instead outputs this (regardless of the number of partitions): ``` Counter({1: 2, 2: 2, 3: 2, 4: 2, 5: 2, 6: 2, 7: 2, 8: 2, 9: 2}) ``` This is because (I believe) `zeroValue` gets passed correctly to each partition, but after `RDD.mapPartitions()` completes, the `zeroValue` object has been updated and is then passed to `RDD.fold()`, which results in all items being double-counted within each partition before being finally reduced at the calling node. I realize that this type of calculation is typically done by `RDD.mapPartitions(...).reduceByKey(...)`, but hopefully this illustrates some potentially confusing behavior. I also noticed that other `RDD` methods use this `deepcopy` approach to creating unique copies of `zeroValue` (i.e., `RDD.aggregateByKey()` and `RDD.foldByKey()`), and that the Scala implementations do seem to serialize the `zeroValue` object appropriately to prevent this type of behavior. Author: Nicholas Hwang <moogling@gmail.com> Closes #7378 from njhwang/master and squashes the following commits: 659bb27 [Nicholas Hwang] Fixed RDD.aggregate() to perform a reduce operation on collected mapPartitions results, similar to how fold currently is implemented. This prevents an initial combOp being performed on each partition with zeroValue (which leads to unexpected behavior if zeroValue is a mutable object) before being combOp'ed with other partition results. 8d8d694 [Nicholas Hwang] Changed dict construction to be compatible with Python 2.6 (cannot use list comprehensions to make dicts) 56eb2ab [Nicholas Hwang] Fixed whitespace after colon to conform with PEP8 391de4a [Nicholas Hwang] Removed used of collections.Counter from RDD tests for Python 2.6 compatibility; used defaultdict(int) instead. Merged treeAggregate test with mutable zero value into aggregate test to reduce code duplication. 2fa4e4b [Nicholas Hwang] Merge branch 'master' of https://github.com/njhwang/spark ba528bd [Nicholas Hwang] Updated comments regarding protection of zeroValue from mutation in RDD.aggregate(). Added regression tests for aggregate(), fold(), aggregateByKey(), foldByKey(), and treeAggregate(), all with both 1 and 2 partition RDDs. Confirmed that aggregate() is the only problematic implementation as of commit 257236c3e17906098f801cbc2059e7a9054e8cab. Also replaced some parallelizations of ranges with xranges, per the documentation's recommendations of preferring xrange over range. 7820391 [Nicholas Hwang] Updated comments regarding protection of zeroValue from mutation in RDD.aggregate(). Added regression tests for aggregate(), fold(), aggregateByKey(), foldByKey(), and treeAggregate(), all with both 1 and 2 partition RDDs. Confirmed that aggregate() is the only problematic implementation as of commit 257236c3e17906098f801cbc2059e7a9054e8cab. 90d1544 [Nicholas Hwang] Made sure RDD.aggregate() makes a deepcopy of zeroValue for all partitions; this ensures that the mapPartitions call works with unique copies of zeroValue in each partition, and prevents a single reference to zeroValue being used for both map and fold calls on each partition (resulting in possibly unexpected behavior).
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--python/pyspark/rdd.py10
1 files changed, 8 insertions, 2 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 3218bed5c7..7e788148d9 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -862,6 +862,9 @@ class RDD(object):
for obj in iterator:
acc = op(obj, acc)
yield acc
+ # collecting result of mapPartitions here ensures that the copy of
+ # zeroValue provided to each partition is unique from the one provided
+ # to the final reduce call
vals = self.mapPartitions(func).collect()
return reduce(op, vals, zeroValue)
@@ -891,8 +894,11 @@ class RDD(object):
for obj in iterator:
acc = seqOp(acc, obj)
yield acc
-
- return self.mapPartitions(func).fold(zeroValue, combOp)
+ # collecting result of mapPartitions here ensures that the copy of
+ # zeroValue provided to each partition is unique from the one provided
+ # to the final reduce call
+ vals = self.mapPartitions(func).collect()
+ return reduce(combOp, vals, zeroValue)
def treeAggregate(self, zeroValue, seqOp, combOp, depth=2):
"""