diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala | 8 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala | 18 |
2 files changed, 23 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index b302b51999..5a072de400 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -126,7 +126,13 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi val stageId = stageSubmitted.stageInfo.stageId val stageAttemptId = stageSubmitted.stageInfo.attemptId // Always override metrics for old stage attempt - _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId) + if (_stageIdToStageMetrics.contains(stageId)) { + _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId) + } else { + // If a stage belongs to some SQL execution, its stageId will be put in "onJobStart". + // Since "_stageIdToStageMetrics" doesn't contain it, it must not belong to any SQL execution. + // So we can ignore it. Otherwise, this may lead to memory leaks (SPARK-11126). + } } override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala index cc1c1e10e9..03bcee94a2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala @@ -313,7 +313,22 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { assert(executionUIData.failedJobs === Seq(0)) } - ignore("no memory leak") { + test("SPARK-11126: no memory leak when running non SQL jobs") { + val previousStageNumber = sqlContext.listener.stageIdToStageMetrics.size + sqlContext.sparkContext.parallelize(1 to 10).foreach(i => ()) + // listener should ignore the non SQL stage + assert(sqlContext.listener.stageIdToStageMetrics.size == previousStageNumber) + + sqlContext.sparkContext.parallelize(1 to 10).toDF().foreach(i => ()) + // listener should save the SQL stage + assert(sqlContext.listener.stageIdToStageMetrics.size == previousStageNumber + 1) + } + +} + +class SQLListenerMemoryLeakSuite extends SparkFunSuite { + + test("no memory leak") { val conf = new SparkConf() .setMaster("local") .setAppName("test") @@ -348,5 +363,4 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { sc.stop() } } - } |