author     Tathagata Das <tathagata.das1565@gmail.com>   2015-05-21 17:41:31 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>   2015-05-21 17:41:31 -0700
commit     d68ea24d60ce1aa55b06a8c107f42544d696eb41 (patch)
tree       20a4b2683beeba001e27710066e0fe3d7b76321d /streaming
parent     347b50106bd1bcd40049f1ca29cefbb0baf53413 (diff)
[SPARK-7776] [STREAMING] Added shutdown hook to StreamingContext
A shutdown hook that stops the SparkContext was added recently. This results in ugly errors when a streaming application is terminated by Ctrl-C:

```
Exception in thread "Thread-27" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:736)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:735)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:735)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1468)
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1403)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1642)
    at org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:559)
    at org.apache.spark.util.SparkShutdownHook.run(Utils.scala:2266)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Utils.scala:2236)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:2236)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:2236)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1764)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(Utils.scala:2236)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:2236)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:2236)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.util.SparkShutdownHookManager.runAll(Utils.scala:2236)
    at org.apache.spark.util.SparkShutdownHookManager$$anon$6.run(Utils.scala:2218)
    at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
```

This happens because Spark's shutdown hook stops the SparkContext while streaming jobs are still running, so they fail in the middle. The correct solution is to stop the streaming context before the Spark context. This PR adds a shutdown hook that does so, registered with a priority higher than that of the SparkContext shutdown hook.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #6307 from tdas/SPARK-7776 and squashes the following commits:

e3d5475 [Tathagata Das] Added conf to specify graceful shutdown
4c18652 [Tathagata Das] Added shutdown hook to StreamingContext
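The fix relies on `Utils.addShutdownHook` (used in the diff below) running registered hooks in order of decreasing priority inside a single JVM shutdown hook, so a hook registered at `SPARK_CONTEXT_SHUTDOWN_PRIORITY + 1` fires before the one that stops the SparkContext. The following is a minimal, self-contained Scala sketch of that ordering mechanism only; `PriorityHooks` and the priority value `50` are illustrative stand-ins, not Spark's actual implementation:

```scala
import scala.collection.mutable

// Illustrative stand-in for Utils.addShutdownHook: hooks registered with a
// higher priority run earlier at JVM shutdown.
object PriorityHooks {
  private val hooks = mutable.ArrayBuffer.empty[(Int, () => Unit)]

  def add(priority: Int)(body: => Unit): Unit =
    hooks += ((priority, () => body))

  // Install one JVM shutdown hook that drains the queue in
  // descending priority order.
  Runtime.getRuntime.addShutdownHook(new Thread {
    override def run(): Unit =
      hooks.sortBy(-_._1).foreach { case (_, body) => body() }
  })
}

object ShutdownOrderDemo extends App {
  val SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50 // placeholder, not Spark's real constant

  PriorityHooks.add(SPARK_CONTEXT_SHUTDOWN_PRIORITY) {
    println("stopping SparkContext") // runs second
  }
  PriorityHooks.add(SPARK_CONTEXT_SHUTDOWN_PRIORITY + 1) {
    println("stopping StreamingContext first") // runs first
  }
}
```

When the JVM exits (normally or on Ctrl-C), the streaming hook prints before the Spark one, which is exactly the ordering the patch needs.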
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 18
1 file changed, 17 insertions(+), 1 deletion(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 160fc42c57..7b77d447ce 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -42,7 +42,7 @@ import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
-import org.apache.spark.util.CallSite
+import org.apache.spark.util.{CallSite, Utils}
/**
* Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -201,6 +201,8 @@ class StreamingContext private[streaming] (
private val startSite = new AtomicReference[CallSite](null)
+ private var shutdownHookRef: AnyRef = _
+
/**
* Return the associated Spark context
*/
@@ -584,6 +586,8 @@ class StreamingContext private[streaming] (
state = StreamingContextState.ACTIVE
StreamingContext.setActiveContext(this)
}
+ shutdownHookRef = Utils.addShutdownHook(
+ StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
logInfo("StreamingContext started")
case ACTIVE =>
logWarning("StreamingContext has already been started")
@@ -660,6 +664,9 @@ class StreamingContext private[streaming] (
uiTab.foreach(_.detach())
StreamingContext.setActiveContext(null)
waiter.notifyStop()
+ if (shutdownHookRef != null) {
+ Utils.removeShutdownHook(shutdownHookRef)
+ }
logInfo("StreamingContext stopped successfully")
}
// Even if we have already stopped, we still need to attempt to stop the SparkContext because
@@ -670,6 +677,13 @@ class StreamingContext private[streaming] (
state = STOPPED
}
}
+
+ private def stopOnShutdown(): Unit = {
+ val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
+ logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")
+ // Do not stop SparkContext, let its own shutdown hook stop it
+ stop(stopSparkContext = false, stopGracefully = stopGracefully)
+ }
}
/**
@@ -685,6 +699,8 @@ object StreamingContext extends Logging {
*/
private val ACTIVATION_LOCK = new Object()
+ private val SHUTDOWN_HOOK_PRIORITY = Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY + 1
+
private val activeContext = new AtomicReference[StreamingContext](null)
private def assertNoOtherContextIsActive(): Unit = {
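For reference, the new `stopOnShutdown` hook reads `spark.streaming.stopGracefullyOnShutdown` (default `false`), so applications that want in-flight batches to finish on Ctrl-C must opt in. A usage sketch follows; the master, app name, and batch interval are placeholders:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object GracefulShutdownExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")                  // placeholder master
      .setAppName("GracefulShutdownExample")  // placeholder app name
      // Opt in: on shutdown, the hook added by this patch calls
      // stop(stopSparkContext = false, stopGracefully = true),
      // letting received data be processed before stopping.
      .set("spark.streaming.stopGracefullyOnShutdown", "true")

    val ssc = new StreamingContext(conf, Seconds(1))
    // ... define input DStreams and output operations here ...
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Defaulting the conf to `false` preserves the previous fast-stop behavior; a graceful stop can make shutdown noticeably slower because it waits for already-received data to be processed.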