aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2015-03-16 23:58:52 -0700
committerReynold Xin <rxin@databricks.com>2015-03-16 23:58:52 -0700
commitb2d8c02224892192b1aa314b4265fe50845932f9 (patch)
tree4ca059befc4cedd18de07c8b2800c9f4e2d3ec7a /core
parentb3e6eca81f79ba3c9205211797fa825b199bac83 (diff)
downloadspark-b2d8c02224892192b1aa314b4265fe50845932f9.tar.gz
spark-b2d8c02224892192b1aa314b4265fe50845932f9.tar.bz2
spark-b2d8c02224892192b1aa314b4265fe50845932f9.zip
SPARK-6044 [CORE] RDD.aggregate() should not use the closure serializer on the zero value
Use configured serializer in RDD.aggregate, to match PairRDDFunctions.aggregateByKey, instead of closure serializer. Compare with https://github.com/apache/spark/blob/e60ad2f4c47b011be7a3198689ac2b82ee317d96/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L127 Author: Sean Owen <sowen@cloudera.com> Closes #5028 from srowen/SPARK-6044 and squashes the following commits: a4040a7 [Sean Owen] Use configured serializer in RDD.aggregate, to match PairRDDFunctions.aggregateByKey, instead of closure serializer
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDD.scala2
1 files changed, 1 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index cf0433010a..a139780d96 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -960,7 +960,7 @@ abstract class RDD[T: ClassTag](
*/
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
// Clone the zero value since we will also be serializing it as part of tasks
- var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
+ var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
val cleanSeqOp = sc.clean(seqOp)
val cleanCombOp = sc.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)