author    Marcelo Vanzin <vanzin@cloudera.com>    2015-04-27 19:46:17 -0400
committer Sean Owen <sowen@cloudera.com>          2015-04-27 19:46:17 -0400
commit    5d45e1f60059e2f2fc8ad64778b9ddcc8887c570 (patch)
tree      149be28032b602497d6d8d7adf03f965448db669 /core
parent    8e1c00dbf4b60962908626dead744e5d73c8085e (diff)
[SPARK-3090] [CORE] Stop SparkContext if user forgets to.
Set up a shutdown hook to try to stop the Spark context in case the user
forgets to do it. The main effect is that any open log files are flushed
and closed, which is particularly interesting for event logs.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #5696 from vanzin/SPARK-3090 and squashes the following commits:

3b554b5 [Marcelo Vanzin] [SPARK-3090] [core] Stop SparkContext if user forgets to.
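The mechanism is simple: register a hook when the context finishes initializing, and have the hook call the same idempotent stop() a user would. As a minimal standalone sketch of the idea (using the Scala standard library's sys.addShutdownHook rather than Spark's internal priority-ordered hook table, and assuming a hypothetical local[*] application):

    import org.apache.spark.{SparkConf, SparkContext}

    object ShutdownHookSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("shutdown-hook-sketch").setMaster("local[*]"))

        // stop() is idempotent (a second call just logs "SparkContext already
        // stopped." and returns), so calling it from a hook is safe even if
        // the user already stopped the context.
        sys.addShutdownHook {
          sc.stop()
        }

        sc.parallelize(1 to 100).count()
        // No explicit sc.stop(): on clean JVM exit the hook stops the context,
        // flushing and closing the event logs.
      }
    }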
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 38
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala   | 10
2 files changed, 33 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ea4ddcc2e2..65b903a55d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -223,6 +223,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
private var _listenerBusStarted: Boolean = false
private var _jars: Seq[String] = _
private var _files: Seq[String] = _
+ private var _shutdownHookRef: AnyRef = _
/* ------------------------------------------------------------------------------------- *
| Accessors and public fields. These provide access to the internal state of the |
@@ -517,6 +518,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
_taskScheduler.postStartHook()
_env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler))
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
+
+ // Make sure the context is stopped if the user forgets about it. This avoids leaving
+ // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
+ // is killed, though.
+ _shutdownHookRef = Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
+ logInfo("Invoking stop() from shutdown hook")
+ stop()
+ }
} catch {
case NonFatal(e) =>
logError("Error initializing SparkContext.", e)
@@ -1481,6 +1490,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
logInfo("SparkContext already stopped.")
return
}
+ if (_shutdownHookRef != null) {
+ Utils.removeShutdownHook(_shutdownHookRef)
+ }
postApplicationEnd()
_ui.foreach(_.stop())
@@ -1891,7 +1903,7 @@ object SparkContext extends Logging {
*
* Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK.
*/
- private val activeContext: AtomicReference[SparkContext] =
+ private val activeContext: AtomicReference[SparkContext] =
new AtomicReference[SparkContext](null)
/**
@@ -1944,11 +1956,11 @@ object SparkContext extends Logging {
}
/**
- * This function may be used to get or instantiate a SparkContext and register it as a
- * singleton object. Because we can only have one active SparkContext per JVM,
- * this is useful when applications may wish to share a SparkContext.
+ * This function may be used to get or instantiate a SparkContext and register it as a
+ * singleton object. Because we can only have one active SparkContext per JVM,
+ * this is useful when applications may wish to share a SparkContext.
*
- * Note: This function cannot be used to create multiple SparkContext instances
+ * Note: This function cannot be used to create multiple SparkContext instances
* even if multiple contexts are allowed.
*/
def getOrCreate(config: SparkConf): SparkContext = {
@@ -1961,17 +1973,17 @@ object SparkContext extends Logging {
activeContext.get()
}
}
-
+
/**
- * This function may be used to get or instantiate a SparkContext and register it as a
- * singleton object. Because we can only have one active SparkContext per JVM,
+ * This function may be used to get or instantiate a SparkContext and register it as a
+ * singleton object. Because we can only have one active SparkContext per JVM,
* this is useful when applications may wish to share a SparkContext.
- *
+ *
* This method allows not passing a SparkConf (useful if just retrieving).
- *
- * Note: This function cannot be used to create multiple SparkContext instances
- * even if multiple contexts are allowed.
- */
+ *
+ * Note: This function cannot be used to create multiple SparkContext instances
+ * even if multiple contexts are allowed.
+ */
def getOrCreate(): SparkContext = {
getOrCreate(new SparkConf())
}
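Note the pairing across the two SparkContext hunks above: the hook is registered at the end of initialization and unregistered at the top of stop(), so a user-initiated stop() does not leave a stale hook behind. A standalone sketch of that register/unregister pattern (illustrative names, not the real class; the actual patch goes through Utils.addShutdownHook and Utils.removeShutdownHook, which handle the in-shutdown case internally):

    import java.util.concurrent.atomic.AtomicBoolean

    class Lifecycle {
      private val stopped = new AtomicBoolean(false)
      @volatile private var hookRef: sys.ShutdownHookThread = _

      def start(): Unit = {
        // Registered up front so a forgotten stop() still runs on clean JVM exit.
        hookRef = sys.addShutdownHook { stop() }
      }

      def stop(): Unit = {
        // Idempotent, like SparkContext.stop(): only the first caller proceeds.
        if (!stopped.compareAndSet(false, true)) return
        if (hookRef != null) {
          // Unregister so an explicit stop() doesn't leave a live hook behind.
          // Runtime.removeShutdownHook throws if shutdown is already in
          // progress (i.e. when stop() is running *from* the hook), so swallow
          // that case.
          try hookRef.remove() catch { case _: IllegalStateException => }
        }
        // ... flush logs, release resources ...
      }
    }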
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index c6c6df7cfa..342bc9a06d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -67,6 +67,12 @@ private[spark] object Utils extends Logging {
val DEFAULT_SHUTDOWN_PRIORITY = 100
+ /**
+ * The shutdown priority of the SparkContext instance. This is lower than the default
+ * priority, so that by default hooks are run before the context is shut down.
+ */
+ val SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50
+
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
@volatile private var localRootDirs: Array[String] = null
@@ -2116,7 +2122,7 @@ private[spark] object Utils extends Logging {
* @return A handle that can be used to unregister the shutdown hook.
*/
def addShutdownHook(hook: () => Unit): AnyRef = {
- addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY, hook)
+ addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY)(hook)
}
/**
@@ -2126,7 +2132,7 @@ private[spark] object Utils extends Logging {
* @param hook The code to run during shutdown.
* @return A handle that can be used to unregister the shutdown hook.
*/
- def addShutdownHook(priority: Int, hook: () => Unit): AnyRef = {
+ def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
shutdownHooks.add(priority, hook)
}
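The Utils change does two things. First, SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50 sits below DEFAULT_SHUTDOWN_PRIORITY = 100; as the new doc comment says, hooks registered at the default priority run before the context is shut down. Second, currying the hook parameter lets call sites pass the hook as a trailing block, which is exactly how the SparkContext hunk uses it. A self-contained sketch of the call-site difference (toy stubs that just return the hook as the handle, not the real Utils):

    object CurriedHookSketch {
      // Stand-ins for the old and new Utils.addShutdownHook signatures.
      def addHookUncurried(priority: Int, hook: () => Unit): AnyRef = hook
      def addHookCurried(priority: Int)(hook: () => Unit): AnyRef = hook

      def main(args: Array[String]): Unit = {
        // Before the patch: the hook is an ordinary second argument.
        addHookUncurried(100, () => println("uncurried"))

        // After the patch: the trailing block reads like a control structure,
        // matching the SparkContext call site:
        //   Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () => stop() }
        addHookCurried(50) { () =>
          println("curried")
        }
      }
    }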