From 303f00a4bf6660dd83c8bd9e3a107bb3438a421b Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 9 Feb 2017 11:16:51 -0800 Subject: [SPARK-19481] [REPL] [MAVEN] Avoid to leak SparkContext in Signaling.cancelOnInterrupt ## What changes were proposed in this pull request? `Signaling.cancelOnInterrupt` leaks a SparkContext per call and it makes ReplSuite unstable. This PR adds `SparkContext.getActive` to allow `Signaling.cancelOnInterrupt` to get the active `SparkContext` to avoid the leak. ## How was this patch tested? Jenkins Author: Shixiong Zhu Closes #16825 from zsxwing/SPARK-19481. --- .../src/main/scala/org/apache/spark/repl/Main.scala | 1 + .../scala/org/apache/spark/repl/SparkILoop.scala | 1 - .../src/main/scala/org/apache/spark/repl/Main.scala | 2 +- .../main/scala/org/apache/spark/repl/Signaling.scala | 20 +++++++++++--------- 4 files changed, 13 insertions(+), 11 deletions(-) (limited to 'repl') diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala index 7b4e14bb6a..fba321be91 100644 --- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala +++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala @@ -22,6 +22,7 @@ import org.apache.spark.internal.Logging object Main extends Logging { initializeLogIfNecessary(true) + Signaling.cancelOnInterrupt() private var _interp: SparkILoop = _ diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala index e017aa42a4..b7237a6ce8 100644 --- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -1027,7 +1027,6 @@ class SparkILoop( builder.getOrCreate() } sparkContext = sparkSession.sparkContext - Signaling.cancelOnInterrupt(sparkContext) sparkSession } diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala index fec4d49379..7f2ec01cc9 100644 --- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala +++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala @@ -30,6 +30,7 @@ import org.apache.spark.util.Utils object Main extends Logging { initializeLogIfNecessary(true) + Signaling.cancelOnInterrupt() val conf = new SparkConf() val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf)) @@ -108,7 +109,6 @@ object Main extends Logging { logInfo("Created Spark session") } sparkContext = sparkSession.sparkContext - Signaling.cancelOnInterrupt(sparkContext) sparkSession } diff --git a/repl/src/main/scala/org/apache/spark/repl/Signaling.scala b/repl/src/main/scala/org/apache/spark/repl/Signaling.scala index 202febf144..9577e0ecaa 100644 --- a/repl/src/main/scala/org/apache/spark/repl/Signaling.scala +++ b/repl/src/main/scala/org/apache/spark/repl/Signaling.scala @@ -28,15 +28,17 @@ private[repl] object Signaling extends Logging { * when no jobs are currently running. * This makes it possible to interrupt a running shell job by pressing Ctrl+C. */ - def cancelOnInterrupt(ctx: SparkContext): Unit = SignalUtils.register("INT") { - if (!ctx.statusTracker.getActiveJobIds().isEmpty) { - logWarning("Cancelling all active jobs, this can take a while. " + - "Press Ctrl+C again to exit now.") - ctx.cancelAllJobs() - true - } else { - false - } + def cancelOnInterrupt(): Unit = SignalUtils.register("INT") { + SparkContext.getActive.map { ctx => + if (!ctx.statusTracker.getActiveJobIds().isEmpty) { + logWarning("Cancelling all active jobs, this can take a while. " + + "Press Ctrl+C again to exit now.") + ctx.cancelAllJobs() + true + } else { + false + } + }.getOrElse(false) } } -- cgit v1.2.3