aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-10-18 13:51:45 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-10-18 13:51:45 -0700
commit94c8fef296e5cdac9a93ed34acc079e51839caa7 (patch)
tree0963012cade287b289a8a91effc41d25735810bf /sql
parenta337c235a12d4ea6a7d6db457acc6b32f1915241 (diff)
downloadspark-94c8fef296e5cdac9a93ed34acc079e51839caa7.tar.gz
spark-94c8fef296e5cdac9a93ed34acc079e51839caa7.tar.bz2
spark-94c8fef296e5cdac9a93ed34acc079e51839caa7.zip
[SPARK-11126][SQL] Fix a memory leak in SQLListener._stageIdToStageMetrics
SQLListener adds all stage infos to `_stageIdToStageMetrics`, but only removes stage infos belonging to SQL executions. This PR fixed it by ignoring stages that don't belong to SQL executions. Reported by Terry Hoo in https://www.mail-archive.com/userspark.apache.org/msg38810.html Author: zsxwing <zsxwing@gmail.com> Closes #9132 from zsxwing/SPARK-11126.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala8
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala18
2 files changed, 23 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index b302b51999..5a072de400 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -126,7 +126,13 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
val stageId = stageSubmitted.stageInfo.stageId
val stageAttemptId = stageSubmitted.stageInfo.attemptId
// Always override metrics for old stage attempt
- _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId)
+ if (_stageIdToStageMetrics.contains(stageId)) {
+ _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId)
+ } else {
+ // If a stage belongs to some SQL execution, its stageId will be put in "onJobStart".
+ // Since "_stageIdToStageMetrics" doesn't contain it, it must not belong to any SQL execution.
+ // So we can ignore it. Otherwise, this may lead to memory leaks (SPARK-11126).
+ }
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index cc1c1e10e9..03bcee94a2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -313,7 +313,22 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
assert(executionUIData.failedJobs === Seq(0))
}
- ignore("no memory leak") {
+ test("SPARK-11126: no memory leak when running non SQL jobs") {
+ val previousStageNumber = sqlContext.listener.stageIdToStageMetrics.size
+ sqlContext.sparkContext.parallelize(1 to 10).foreach(i => ())
+ // listener should ignore the non SQL stage
+ assert(sqlContext.listener.stageIdToStageMetrics.size == previousStageNumber)
+
+ sqlContext.sparkContext.parallelize(1 to 10).toDF().foreach(i => ())
+ // listener should save the SQL stage
+ assert(sqlContext.listener.stageIdToStageMetrics.size == previousStageNumber + 1)
+ }
+
+}
+
+class SQLListenerMemoryLeakSuite extends SparkFunSuite {
+
+ test("no memory leak") {
val conf = new SparkConf()
.setMaster("local")
.setAppName("test")
@@ -348,5 +363,4 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
sc.stop()
}
}
-
}