diff options
author | zsxwing <zsxwing@gmail.com> | 2015-10-18 13:51:45 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2015-10-18 13:51:45 -0700 |
commit | 94c8fef296e5cdac9a93ed34acc079e51839caa7 (patch) | |
tree | 0963012cade287b289a8a91effc41d25735810bf /sql | |
parent | a337c235a12d4ea6a7d6db457acc6b32f1915241 (diff) | |
download | spark-94c8fef296e5cdac9a93ed34acc079e51839caa7.tar.gz spark-94c8fef296e5cdac9a93ed34acc079e51839caa7.tar.bz2 spark-94c8fef296e5cdac9a93ed34acc079e51839caa7.zip |
[SPARK-11126][SQL] Fix a memory leak in SQLListener._stageIdToStageMetrics
SQLListener adds all stage infos to `_stageIdToStageMetrics`, but only removes stage infos belonging to SQL executions. This PR fixed it by ignoring stages that don't belong to SQL executions.
Reported by Terry Hoo in https://www.mail-archive.com/userspark.apache.org/msg38810.html
Author: zsxwing <zsxwing@gmail.com>
Closes #9132 from zsxwing/SPARK-11126.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala | 8 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala | 18 |
2 files changed, 23 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index b302b51999..5a072de400 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -126,7 +126,13 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi val stageId = stageSubmitted.stageInfo.stageId val stageAttemptId = stageSubmitted.stageInfo.attemptId // Always override metrics for old stage attempt - _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId) + if (_stageIdToStageMetrics.contains(stageId)) { + _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId) + } else { + // If a stage belongs to some SQL execution, its stageId will be put in "onJobStart". + // Since "_stageIdToStageMetrics" doesn't contain it, it must not belong to any SQL execution. + // So we can ignore it. Otherwise, this may lead to memory leaks (SPARK-11126). + } } override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala index cc1c1e10e9..03bcee94a2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala @@ -313,7 +313,22 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { assert(executionUIData.failedJobs === Seq(0)) } - ignore("no memory leak") { + test("SPARK-11126: no memory leak when running non SQL jobs") { + val previousStageNumber = sqlContext.listener.stageIdToStageMetrics.size + sqlContext.sparkContext.parallelize(1 to 10).foreach(i => ()) + // listener should ignore the non SQL stage + assert(sqlContext.listener.stageIdToStageMetrics.size == previousStageNumber) + + sqlContext.sparkContext.parallelize(1 to 10).toDF().foreach(i => ()) + // listener should save the SQL stage + assert(sqlContext.listener.stageIdToStageMetrics.size == previousStageNumber + 1) + } + +} + +class SQLListenerMemoryLeakSuite extends SparkFunSuite { + + test("no memory leak") { val conf = new SparkConf() .setMaster("local") .setAppName("test") @@ -348,5 +363,4 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { sc.stop() } } - } |