aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala38
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala10
2 files changed, 33 insertions, 15 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ea4ddcc2e2..65b903a55d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -223,6 +223,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private var _listenerBusStarted: Boolean = false
private var _jars: Seq[String] = _
private var _files: Seq[String] = _
+ private var _shutdownHookRef: AnyRef = _
/* ------------------------------------------------------------------------------------- *
| Accessors and public fields. These provide access to the internal state of the |
@@ -517,6 +518,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_taskScheduler.postStartHook()
_env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler))
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
+
+ // Make sure the context is stopped if the user forgets about it. This avoids leaving
+ // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
+ // is killed, though.
+ _shutdownHookRef = Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
+ logInfo("Invoking stop() from shutdown hook")
+ stop()
+ }
} catch {
case NonFatal(e) =>
logError("Error initializing SparkContext.", e)
@@ -1481,6 +1490,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
logInfo("SparkContext already stopped.")
return
}
+ if (_shutdownHookRef != null) {
+ Utils.removeShutdownHook(_shutdownHookRef)
+ }
postApplicationEnd()
_ui.foreach(_.stop())
@@ -1891,7 +1903,7 @@ object SparkContext extends Logging {
*
* Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK.
*/
- private val activeContext: AtomicReference[SparkContext] =
+ private val activeContext: AtomicReference[SparkContext] =
new AtomicReference[SparkContext](null)
/**
@@ -1944,11 +1956,11 @@ object SparkContext extends Logging {
}
/**
- * This function may be used to get or instantiate a SparkContext and register it as a
- * singleton object. Because we can only have one active SparkContext per JVM,
- * this is useful when applications may wish to share a SparkContext.
+ * This function may be used to get or instantiate a SparkContext and register it as a
+ * singleton object. Because we can only have one active SparkContext per JVM,
+ * this is useful when applications may wish to share a SparkContext.
*
- * Note: This function cannot be used to create multiple SparkContext instances
+ * Note: This function cannot be used to create multiple SparkContext instances
* even if multiple contexts are allowed.
*/
def getOrCreate(config: SparkConf): SparkContext = {
@@ -1961,17 +1973,17 @@ object SparkContext extends Logging {
activeContext.get()
}
}
-
+
/**
- * This function may be used to get or instantiate a SparkContext and register it as a
- * singleton object. Because we can only have one active SparkContext per JVM,
+ * This function may be used to get or instantiate a SparkContext and register it as a
+ * singleton object. Because we can only have one active SparkContext per JVM,
* this is useful when applications may wish to share a SparkContext.
- *
+ *
* This method allows not passing a SparkConf (useful if just retrieving).
- *
- * Note: This function cannot be used to create multiple SparkContext instances
- * even if multiple contexts are allowed.
- */
+ *
+ * Note: This function cannot be used to create multiple SparkContext instances
+ * even if multiple contexts are allowed.
+ */
def getOrCreate(): SparkContext = {
getOrCreate(new SparkConf())
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index c6c6df7cfa..342bc9a06d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -67,6 +67,12 @@ private[spark] object Utils extends Logging {
val DEFAULT_SHUTDOWN_PRIORITY = 100
+ /**
+ * The shutdown priority of the SparkContext instance. This is lower than the default
+ * priority, so that by default hooks are run before the context is shut down.
+ */
+ val SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50
+
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
@volatile private var localRootDirs: Array[String] = null
@@ -2116,7 +2122,7 @@ private[spark] object Utils extends Logging {
* @return A handle that can be used to unregister the shutdown hook.
*/
def addShutdownHook(hook: () => Unit): AnyRef = {
- addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY, hook)
+ addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY)(hook)
}
/**
@@ -2126,7 +2132,7 @@ private[spark] object Utils extends Logging {
* @param hook The code to run during shutdown.
* @return A handle that can be used to unregister the shutdown hook.
*/
- def addShutdownHook(priority: Int, hook: () => Unit): AnyRef = {
+ def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
shutdownHooks.add(priority, hook)
}