aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala18
1 files changed, 17 insertions, 1 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 160fc42c57..7b77d447ce 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -42,7 +42,7 @@ import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
-import org.apache.spark.util.CallSite
+import org.apache.spark.util.{CallSite, Utils}
/**
* Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -201,6 +201,8 @@ class StreamingContext private[streaming] (
private val startSite = new AtomicReference[CallSite](null)
+ private var shutdownHookRef: AnyRef = _
+
/**
* Return the associated Spark context
*/
@@ -584,6 +586,8 @@ class StreamingContext private[streaming] (
state = StreamingContextState.ACTIVE
StreamingContext.setActiveContext(this)
}
+ shutdownHookRef = Utils.addShutdownHook(
+ StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
logInfo("StreamingContext started")
case ACTIVE =>
logWarning("StreamingContext has already been started")
@@ -660,6 +664,9 @@ class StreamingContext private[streaming] (
uiTab.foreach(_.detach())
StreamingContext.setActiveContext(null)
waiter.notifyStop()
+ if (shutdownHookRef != null) {
+ Utils.removeShutdownHook(shutdownHookRef)
+ }
logInfo("StreamingContext stopped successfully")
}
// Even if we have already stopped, we still need to attempt to stop the SparkContext because
@@ -670,6 +677,13 @@ class StreamingContext private[streaming] (
state = STOPPED
}
}
+
+ private def stopOnShutdown(): Unit = {
+ val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
+ logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")
+ // Do not stop SparkContext, let its own shutdown hook stop it
+ stop(stopSparkContext = false, stopGracefully = stopGracefully)
+ }
}
/**
@@ -685,6 +699,8 @@ object StreamingContext extends Logging {
*/
private val ACTIVATION_LOCK = new Object()
+ private val SHUTDOWN_HOOK_PRIORITY = Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY + 1
+
private val activeContext = new AtomicReference[StreamingContext](null)
private def assertNoOtherContextIsActive(): Unit = {