aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2014-12-15 16:06:15 -0800
committerJosh Rosen <joshrosen@databricks.com>2014-12-17 12:20:29 -0800
commit76c88c6687033bd3cb9a686ea922098f4c0212ad (patch)
tree91d6f32d6dfcdf79139dab715bda96b8bd5d8fd9 /core
parente635168556025c36f31ee68ccd0144a5179855bb (diff)
downloadspark-76c88c6687033bd3cb9a686ea922098f4c0212ad.tar.gz
spark-76c88c6687033bd3cb9a686ea922098f4c0212ad.tar.bz2
spark-76c88c6687033bd3cb9a686ea922098f4c0212ad.zip
SPARK-785 [CORE] ClosureCleaner not invoked on most PairRDDFunctions
This looked like perhaps a simple and important one. `combineByKey` looks like it should clean its arguments' closures, and that in turn covers apparently all remaining functions in `PairRDDFunctions` which delegate to it. Author: Sean Owen <sowen@cloudera.com> Closes #3690 from srowen/SPARK-785 and squashes the following commits: 8df68fe [Sean Owen] Clean context of most remaining functions in PairRDDFunctions, which ultimately call combineByKey (cherry picked from commit 2a28bc61009a170af3853c78f7f36970898a6d56) Signed-off-by: Josh Rosen <joshrosen@databricks.com>
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala5
1 files changed, 4 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 8c2c959e73..0a0f0c36b5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -86,7 +86,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
throw new SparkException("Default partitioner cannot partition array keys.")
}
}
- val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
+ val aggregator = new Aggregator[K, V, C](
+ self.context.clean(createCombiner),
+ self.context.clean(mergeValue),
+ self.context.clean(mergeCombiners))
if (self.partitioner == Some(partitioner)) {
self.mapPartitions(iter => {
val context = TaskContext.get()