author    zsxwing <zsxwing@gmail.com>    2015-08-11 14:06:23 -0700
committer Reynold Xin <rxin@databricks.com>    2015-08-11 14:06:23 -0700
commit    f16bc68dfb25c7b746ae031a57840ace9bafa87f (patch)
tree      4d5d61a036b7a0de11eb480fc934fb6bdc526407 /core
parent    00c02728a6c6c4282c389ca90641dd78dd5e3d32 (diff)
[SPARK-9824] [CORE] Fix the issue that InternalAccumulator leaks WeakReference
`InternalAccumulator.create` doesn't call `registerAccumulatorForCleanup` to register the accumulators it creates with `ContextCleaner`, so the `WeakReference`s for these accumulators in `Accumulators.originals` are never removed. This PR adds a `registerAccumulatorForCleanup` call for the internal accumulators to avoid the memory leak.

Author: zsxwing <zsxwing@gmail.com>

Closes #8108 from zsxwing/internal-accumulators-leak.
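To illustrate why the missing registration leaks, here is a minimal, standalone sketch of the bookkeeping involved (`Registry` is a hypothetical stand-in for the role played by `Accumulators.originals`, not Spark code):

    import java.lang.ref.WeakReference
    import scala.collection.mutable

    object Registry {
      // Plays the role of Accumulators.originals: accumulator id -> weak reference.
      val originals = mutable.Map[Long, WeakReference[AnyRef]]()

      def register(id: Long, acc: AnyRef): Unit =
        originals(id) = new WeakReference(acc)

      // In Spark this removal is driven by ContextCleaner, which only knows about
      // accumulators passed to registerAccumulatorForCleanup. If an accumulator is
      // never registered, this is never called for it, so the map entry and its
      // WeakReference wrapper stay around even after the accumulator itself has
      // been garbage-collected.
      def remove(id: Long): Unit = originals -= id
    }

The accumulator objects themselves can still be collected through the weak references; what grows without bound is the map of ids to dead references, one set per stage (and per stage retry).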
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/Accumulators.scala       22
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Stage.scala     2
-rw-r--r--  core/src/test/scala/org/apache/spark/AccumulatorSuite.scala    3
3 files changed, 16 insertions, 11 deletions
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 064246dfa7..c39c8667d0 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -382,14 +382,18 @@ private[spark] object InternalAccumulator {
    * add to the same set of accumulators. We do this to report the distribution of accumulator
    * values across all tasks within each stage.
    */
-  def create(): Seq[Accumulator[Long]] = {
-    Seq(
-      // Execution memory refers to the memory used by internal data structures created
-      // during shuffles, aggregations and joins. The value of this accumulator should be
-      // approximately the sum of the peak sizes across all such data structures created
-      // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort.
-      new Accumulator(
-        0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true)
-    ) ++ maybeTestAccumulator.toSeq
+  def create(sc: SparkContext): Seq[Accumulator[Long]] = {
+    val internalAccumulators = Seq(
+      // Execution memory refers to the memory used by internal data structures created
+      // during shuffles, aggregations and joins. The value of this accumulator should be
+      // approximately the sum of the peak sizes across all such data structures created
+      // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort.
+      new Accumulator(
+        0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true)
+    ) ++ maybeTestAccumulator.toSeq
+    internalAccumulators.foreach { accumulator =>
+      sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator))
+    }
+    internalAccumulators
   }
 }
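With the new signature, callers hand `create` the active `SparkContext` so it can reach the cleaner; a minimal usage sketch (mirroring the call sites changed below, with `sc` assumed to be a live context):

    // Builds a fresh set of per-stage internal accumulators and registers each one
    // with the ContextCleaner (when a cleaner is present), so their WeakReference
    // entries in Accumulators.originals are eventually removed.
    val accums: Seq[Accumulator[Long]] = InternalAccumulator.create(sc)

Note that `sc.cleaner` is an `Option`: the cleaner can be disabled (e.g. when `spark.cleaner.referenceTracking` is false), which is why the registration is wrapped in a `foreach` rather than calling the cleaner directly.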
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index de05ee256d..1cf06856ff 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -81,7 +81,7 @@ private[spark] abstract class Stage(
    * accumulators here again will override partial values from the finished tasks.
    */
   def resetInternalAccumulators(): Unit = {
-    _internalAccumulators = InternalAccumulator.create()
+    _internalAccumulators = InternalAccumulator.create(rdd.sparkContext)
   }

   /**
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 48f549575f..0eb2293a9d 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -160,7 +160,8 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
   }

   test("internal accumulators in TaskContext") {
-    val accums = InternalAccumulator.create()
+    sc = new SparkContext("local", "test")
+    val accums = InternalAccumulator.create(sc)
     val taskContext = new TaskContextImpl(0, 0, 0, 0, null, null, accums)
     val internalMetricsToAccums = taskContext.internalMetricsToAccumulators
     val collectedInternalAccums = taskContext.collectInternalAccumulators()