diff options
author | zsxwing <zsxwing@gmail.com> | 2015-08-11 14:06:23 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-08-11 14:06:23 -0700 |
commit | f16bc68dfb25c7b746ae031a57840ace9bafa87f (patch) | |
tree | 4d5d61a036b7a0de11eb480fc934fb6bdc526407 /core | |
parent | 00c02728a6c6c4282c389ca90641dd78dd5e3d32 (diff) | |
download | spark-f16bc68dfb25c7b746ae031a57840ace9bafa87f.tar.gz spark-f16bc68dfb25c7b746ae031a57840ace9bafa87f.tar.bz2 spark-f16bc68dfb25c7b746ae031a57840ace9bafa87f.zip |
[SPARK-9824] [CORE] Fix the issue that InternalAccumulator leaks WeakReference
`InternalAccumulator.create` doesn't call `registerAccumulatorForCleanup` to register itself with ContextCleaner, so `WeakReference`s for these accumulators in `Accumulators.originals` won't be removed.
This PR added `registerAccumulatorForCleanup` for internal accumulators to avoid the memory leak.
Author: zsxwing <zsxwing@gmail.com>
Closes #8108 from zsxwing/internal-accumulators-leak.
Diffstat (limited to 'core')
3 files changed, 16 insertions, 11 deletions
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala index 064246dfa7..c39c8667d0 100644 --- a/core/src/main/scala/org/apache/spark/Accumulators.scala +++ b/core/src/main/scala/org/apache/spark/Accumulators.scala @@ -382,14 +382,18 @@ private[spark] object InternalAccumulator { * add to the same set of accumulators. We do this to report the distribution of accumulator * values across all tasks within each stage. */ - def create(): Seq[Accumulator[Long]] = { - Seq( - // Execution memory refers to the memory used by internal data structures created - // during shuffles, aggregations and joins. The value of this accumulator should be - // approximately the sum of the peak sizes across all such data structures created - // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort. - new Accumulator( - 0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true) - ) ++ maybeTestAccumulator.toSeq + def create(sc: SparkContext): Seq[Accumulator[Long]] = { + val internalAccumulators = Seq( + // Execution memory refers to the memory used by internal data structures created + // during shuffles, aggregations and joins. The value of this accumulator should be + // approximately the sum of the peak sizes across all such data structures created + // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort. + new Accumulator( + 0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true) + ) ++ maybeTestAccumulator.toSeq + internalAccumulators.foreach { accumulator => + sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator)) + } + internalAccumulators } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index de05ee256d..1cf06856ff 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -81,7 +81,7 @@ private[spark] abstract class Stage( * accumulators here again will override partial values from the finished tasks. */ def resetInternalAccumulators(): Unit = { - _internalAccumulators = InternalAccumulator.create() + _internalAccumulators = InternalAccumulator.create(rdd.sparkContext) } /** diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala index 48f549575f..0eb2293a9d 100644 --- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala @@ -160,7 +160,8 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex } test("internal accumulators in TaskContext") { - val accums = InternalAccumulator.create() + sc = new SparkContext("local", "test") + val accums = InternalAccumulator.create(sc) val taskContext = new TaskContextImpl(0, 0, 0, 0, null, null, accums) val internalMetricsToAccums = taskContext.internalMetricsToAccumulators val collectedInternalAccums = taskContext.collectInternalAccumulators() |