diff options
author | zsxwing <zsxwing@gmail.com> | 2015-10-16 13:56:51 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-10-16 13:56:51 -0700 |
commit | e1eef248f13f6c334fe4eea8a29a1de5470a2e62 (patch) | |
tree | 8cdbc735162ba268681d2a6ad90ba90cda70e0af /streaming/src/main | |
parent | 369d786f58580e7df73e7e23f27390d37269d0de (diff) | |
download | spark-e1eef248f13f6c334fe4eea8a29a1de5470a2e62.tar.gz spark-e1eef248f13f6c334fe4eea8a29a1de5470a2e62.tar.bz2 spark-e1eef248f13f6c334fe4eea8a29a1de5470a2e62.zip |
[SPARK-11104] [STREAMING] Fix a deadlock in StreamingContex.stop
The following deadlock may happen if shutdownHook and StreamingContext.stop are running at the same time.
```
Java stack information for the threads listed above:
===================================================
"Thread-2":
at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:699)
- waiting to lock <0x00000005405a1680> (a org.apache.spark.streaming.StreamingContext)
at org.apache.spark.streaming.StreamingContext.org$apache$spark$streaming$StreamingContext$$stopOnShutdown(StreamingContext.scala:729)
at org.apache.spark.streaming.StreamingContext$$anonfun$start$1.apply$mcV$sp(StreamingContext.scala:625)
at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:266)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:236)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:236)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:236)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1697)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:236)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:236)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:236)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:236)
- locked <0x00000005405b6a00> (a org.apache.spark.util.SparkShutdownHookManager)
at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:216)
at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
"main":
at org.apache.spark.util.SparkShutdownHookManager.remove(ShutdownHookManager.scala:248)
- waiting to lock <0x00000005405b6a00> (a org.apache.spark.util.SparkShutdownHookManager)
at org.apache.spark.util.ShutdownHookManager$.removeShutdownHook(ShutdownHookManager.scala:199)
at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:712)
- locked <0x00000005405a1680> (a org.apache.spark.streaming.StreamingContext)
at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:684)
- locked <0x00000005405a1680> (a org.apache.spark.streaming.StreamingContext)
at org.apache.spark.streaming.SessionByKeyBenchmark$.main(SessionByKeyBenchmark.scala:108)
at org.apache.spark.streaming.SessionByKeyBenchmark.main(SessionByKeyBenchmark.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:680)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```
This PR just moved `ShutdownHookManager.removeShutdownHook` out of `synchronized` to avoid deadlock.
Author: zsxwing <zsxwing@gmail.com>
Closes #9116 from zsxwing/stop-deadlock.
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 55 |
1 files changed, 31 insertions, 24 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 9b2632c229..051f53de64 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -694,32 +694,39 @@ class StreamingContext private[streaming] ( * @param stopGracefully if true, stops gracefully by waiting for the processing of all * received data to be completed */ - def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized { - try { - state match { - case INITIALIZED => - logWarning("StreamingContext has not been started yet") - case STOPPED => - logWarning("StreamingContext has already been stopped") - case ACTIVE => - scheduler.stop(stopGracefully) - // Removing the streamingSource to de-register the metrics on stop() - env.metricsSystem.removeSource(streamingSource) - uiTab.foreach(_.detach()) - StreamingContext.setActiveContext(null) - waiter.notifyStop() - if (shutdownHookRef != null) { - ShutdownHookManager.removeShutdownHook(shutdownHookRef) - } - logInfo("StreamingContext stopped successfully") + def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = { + var shutdownHookRefToRemove: AnyRef = null + synchronized { + try { + state match { + case INITIALIZED => + logWarning("StreamingContext has not been started yet") + case STOPPED => + logWarning("StreamingContext has already been stopped") + case ACTIVE => + scheduler.stop(stopGracefully) + // Removing the streamingSource to de-register the metrics on stop() + env.metricsSystem.removeSource(streamingSource) + uiTab.foreach(_.detach()) + StreamingContext.setActiveContext(null) + waiter.notifyStop() + if (shutdownHookRef != null) { + shutdownHookRefToRemove = shutdownHookRef + shutdownHookRef = null + } + logInfo("StreamingContext stopped successfully") + } + } finally { + // The state should always be Stopped after calling `stop()`, even if we haven't started yet + state = STOPPED } - // Even if we have already stopped, we still need to attempt to stop the SparkContext because - // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true). - if (stopSparkContext) sc.stop() - } finally { - // The state should always be Stopped after calling `stop()`, even if we haven't started yet - state = STOPPED } + if (shutdownHookRefToRemove != null) { + ShutdownHookManager.removeShutdownHook(shutdownHookRefToRemove) + } + // Even if we have already stopped, we still need to attempt to stop the SparkContext because + // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true). + if (stopSparkContext) sc.stop() } private def stopOnShutdown(): Unit = { |