author     Sean Owen <sowen@cloudera.com>  2015-05-21 19:42:51 +0100
committer  Sean Owen <sowen@cloudera.com>  2015-05-21 19:42:51 +0100
commit     6e534026963e567f92743c5721de16325645223e
tree       a52c4c10cb6ca8c0eec0b3b04fe8880fbd8cb494
parent     4b7ff3092c53827817079e0810563cbb0b9d0747
[SPARK-6416] [DOCS] RDD.fold() requires the operator to be commutative
Document the current limitation of rdd.fold. This does not resolve SPARK-6416; it only documents the issue. CC JoshRosen

Author: Sean Owen <sowen@cloudera.com>

Closes #6231 from srowen/SPARK-6416 and squashes the following commits:

9fef39f [Sean Owen] Add comment to other languages; reword to highlight the difference from non-distributed collections and to not suggest it is a bug that is to be fixed
da40d84 [Sean Owen] Document current limitation of rdd.fold.
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala  | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala               | 13
-rw-r--r--  python/pyspark/rdd.py                                            | 12
3 files changed, 30 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 8bf0627fc4..74db764322 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -386,9 +386,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Aggregate the elements of each partition, and then the results for all the partitions, using a
- * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
- * modify t1 and return it as its result value to avoid object allocation; however, it should not
- * modify t2.
+ * given associative and commutative function and a neutral "zero value". The function
+ * op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object
+ * allocation; however, it should not modify t2.
+ *
+ * This behaves somewhat differently from fold operations implemented for non-distributed
+ * collections in functional languages like Scala. This fold operation may be applied to
+ * partitions individually, and then fold those results into the final result, rather than
+ * apply the fold to each element sequentially in some defined ordering. For functions
+ * that are not commutative, the result may differ from that of a fold applied to a
+ * non-distributed collection.
*/
def fold(zeroValue: T)(f: JFunction2[T, T, T]): T =
rdd.fold(zeroValue)(f)
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index f7fa37e4cd..d772f03f76 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1015,9 +1015,16 @@ abstract class RDD[T: ClassTag](
/**
* Aggregate the elements of each partition, and then the results for all the partitions, using a
- * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
- * modify t1 and return it as its result value to avoid object allocation; however, it should not
- * modify t2.
+ * given associative and commutative function and a neutral "zero value". The function
+ * op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object
+ * allocation; however, it should not modify t2.
+ *
+ * This behaves somewhat differently from fold operations implemented for non-distributed
+ * collections in functional languages like Scala. This fold operation may be applied to
+ * partitions individually, and then fold those results into the final result, rather than
+ * apply the fold to each element sequentially in some defined ordering. For functions
+ * that are not commutative, the result may differ from that of a fold applied to a
+ * non-distributed collection.
*/
def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 70db4bbe4c..98a8ff8606 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -813,13 +813,21 @@ class RDD(object):
def fold(self, zeroValue, op):
"""
Aggregate the elements of each partition, and then the results for all
- the partitions, using a given associative function and a neutral "zero
- value."
+ the partitions, using a given associative and commutative function and
+ a neutral "zero value."
The function C{op(t1, t2)} is allowed to modify C{t1} and return it
as its result value to avoid object allocation; however, it should not
modify C{t2}.
+ This behaves somewhat differently from fold operations implemented
+ for non-distributed collections in functional languages like Scala.
+ This fold operation may be applied to partitions individually, and then
+ fold those results into the final result, rather than apply the fold
+ to each element sequentially in some defined ordering. For functions
+ that are not commutative, the result may differ from that of a fold
+ applied to a non-distributed collection.
+
>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
15
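The doctest above uses addition, which is commutative, so the result matches a sequential fold. To see why the new documentation insists on commutativity, here is a plain-Python sketch (no Spark required; `distributed_fold` and the partition layout are hypothetical, for illustration only) of how folding partitions independently and then combining the partial results in task-completion order can diverge from a sequential fold when the operator is associative but not commutative:

```python
from functools import reduce

def distributed_fold(partitions, zero, op, arrival_order):
    """Hypothetical model of a distributed fold: each partition is folded
    independently with its own copy of the zero value, then the partial
    results are folded in whatever order the tasks happen to finish."""
    partials = [reduce(op, part, zero) for part in partitions]
    return reduce(op, (partials[i] for i in arrival_order), zero)

# String concatenation is associative but NOT commutative.
concat = lambda x, y: x + y
parts = [["a", "b"], ["c", "d"]]

# A sequential fold over all elements in order, as on a local collection:
sequential = reduce(concat, [e for p in parts for e in p], "")  # "abcd"

# If partition 0's result arrives first, the distributed fold agrees:
same = distributed_fold(parts, "", concat, arrival_order=[0, 1])  # "abcd"

# If partition 1's result arrives first, it does not:
different = distributed_fold(parts, "", concat, arrival_order=[1, 0])  # "cdab"
```

With a commutative operator such as `+` on numbers, every arrival order yields the same partial-result combination, which is why the doctest's `fold(0, add)` is reliably 15 regardless of partitioning.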