aboutsummaryrefslogtreecommitdiff
path: root/repl
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2017-02-09 11:16:51 -0800
committerDavies Liu <davies.liu@gmail.com>2017-02-09 11:16:51 -0800
commit303f00a4bf6660dd83c8bd9e3a107bb3438a421b (patch)
treedf6209c244f94d4ea7782e1ace2b1099e8f50d58 /repl
parent6287c94f08200d548df5cc0a401b73b84f9968c4 (diff)
downloadspark-303f00a4bf6660dd83c8bd9e3a107bb3438a421b.tar.gz
spark-303f00a4bf6660dd83c8bd9e3a107bb3438a421b.tar.bz2
spark-303f00a4bf6660dd83c8bd9e3a107bb3438a421b.zip
[SPARK-19481] [REPL] [MAVEN] Avoid to leak SparkContext in Signaling.cancelOnInterrupt
## What changes were proposed in this pull request? `Signaling.cancelOnInterrupt` leaks a SparkContext per call and it makes ReplSuite unstable. This PR adds `SparkContext.getActive` to allow `Signaling.cancelOnInterrupt` to get the active `SparkContext` to avoid the leak. ## How was this patch tested? Jenkins Author: Shixiong Zhu <shixiong@databricks.com> Closes #16825 from zsxwing/SPARK-19481.
Diffstat (limited to 'repl')
-rw-r--r--repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala1
-rw-r--r--repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala1
-rw-r--r--repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala2
-rw-r--r--repl/src/main/scala/org/apache/spark/repl/Signaling.scala20
4 files changed, 13 insertions, 11 deletions
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
index 7b4e14bb6a..fba321be91 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
@@ -22,6 +22,7 @@ import org.apache.spark.internal.Logging
object Main extends Logging {
initializeLogIfNecessary(true)
+ Signaling.cancelOnInterrupt()
private var _interp: SparkILoop = _
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index e017aa42a4..b7237a6ce8 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -1027,7 +1027,6 @@ class SparkILoop(
builder.getOrCreate()
}
sparkContext = sparkSession.sparkContext
- Signaling.cancelOnInterrupt(sparkContext)
sparkSession
}
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
index fec4d49379..7f2ec01cc9 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.Utils
object Main extends Logging {
initializeLogIfNecessary(true)
+ Signaling.cancelOnInterrupt()
val conf = new SparkConf()
val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf))
@@ -108,7 +109,6 @@ object Main extends Logging {
logInfo("Created Spark session")
}
sparkContext = sparkSession.sparkContext
- Signaling.cancelOnInterrupt(sparkContext)
sparkSession
}
diff --git a/repl/src/main/scala/org/apache/spark/repl/Signaling.scala b/repl/src/main/scala/org/apache/spark/repl/Signaling.scala
index 202febf144..9577e0ecaa 100644
--- a/repl/src/main/scala/org/apache/spark/repl/Signaling.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/Signaling.scala
@@ -28,15 +28,17 @@ private[repl] object Signaling extends Logging {
* when no jobs are currently running.
* This makes it possible to interrupt a running shell job by pressing Ctrl+C.
*/
- def cancelOnInterrupt(ctx: SparkContext): Unit = SignalUtils.register("INT") {
- if (!ctx.statusTracker.getActiveJobIds().isEmpty) {
- logWarning("Cancelling all active jobs, this can take a while. " +
- "Press Ctrl+C again to exit now.")
- ctx.cancelAllJobs()
- true
- } else {
- false
- }
+ def cancelOnInterrupt(): Unit = SignalUtils.register("INT") {
+ SparkContext.getActive.map { ctx =>
+ if (!ctx.statusTracker.getActiveJobIds().isEmpty) {
+ logWarning("Cancelling all active jobs, this can take a while. " +
+ "Press Ctrl+C again to exit now.")
+ ctx.cancelAllJobs()
+ true
+ } else {
+ false
+ }
+ }.getOrElse(false)
}
}