aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-12-02 13:44:01 -0800
committerJosh Rosen <joshrosen@databricks.com>2015-12-02 13:44:01 -0800
commit452690ba1cc3c667bdd9f3022c43c9a10267880b (patch)
treece347137b6b47ecdb62131e3b2133cbab5fb958e /streaming
parenta1542ce2f33ad365ff437d2d3014b9de2f6670e5 (diff)
downloadspark-452690ba1cc3c667bdd9f3022c43c9a10267880b.tar.gz
spark-452690ba1cc3c667bdd9f3022c43c9a10267880b.tar.bz2
spark-452690ba1cc3c667bdd9f3022c43c9a10267880b.zip
[SPARK-12001] Allow partially-stopped StreamingContext to be completely stopped
If `StreamingContext.stop()` is interrupted midway through the call, the context will be marked as stopped but certain state will have not been cleaned up. Because `state = STOPPED` will be set, subsequent `stop()` calls will be unable to finish stopping the context, preventing any new StreamingContexts from being created. This patch addresses this issue by only marking the context as `STOPPED` once the `stop()` has successfully completed which allows `stop()` to be called a second time in order to finish stopping the context in case the original `stop()` call was interrupted. I discovered this issue by examining logs from a failed Jenkins run in which this race condition occurred in `FailureSuite`, leaking an unstoppable context and causing all subsequent tests to fail. Author: Josh Rosen <joshrosen@databricks.com> Closes #9982 from JoshRosen/SPARK-12001.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala49
1 files changed, 27 insertions, 22 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 6fb8ad38ab..cf843e3e8b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -699,28 +699,33 @@ class StreamingContext private[streaming] (
" AsynchronousListenerBus")
}
synchronized {
- try {
- state match {
- case INITIALIZED =>
- logWarning("StreamingContext has not been started yet")
- case STOPPED =>
- logWarning("StreamingContext has already been stopped")
- case ACTIVE =>
- scheduler.stop(stopGracefully)
- // Removing the streamingSource to de-register the metrics on stop()
- env.metricsSystem.removeSource(streamingSource)
- uiTab.foreach(_.detach())
- StreamingContext.setActiveContext(null)
- waiter.notifyStop()
- if (shutdownHookRef != null) {
- shutdownHookRefToRemove = shutdownHookRef
- shutdownHookRef = null
- }
- logInfo("StreamingContext stopped successfully")
- }
- } finally {
- // The state should always be Stopped after calling `stop()`, even if we haven't started yet
- state = STOPPED
+ // The state should always be Stopped after calling `stop()`, even if we haven't started yet
+ state match {
+ case INITIALIZED =>
+ logWarning("StreamingContext has not been started yet")
+ state = STOPPED
+ case STOPPED =>
+ logWarning("StreamingContext has already been stopped")
+ state = STOPPED
+ case ACTIVE =>
+ // It's important that we don't set state = STOPPED until the very end of this case,
+ // since we need to ensure that we're still able to call `stop()` to recover from
+ // a partially-stopped StreamingContext which resulted from this `stop()` call being
+ // interrupted. See SPARK-12001 for more details. Because the body of this case can be
+ // executed twice in the case of a partial stop, all methods called here need to be
+ // idempotent.
+ scheduler.stop(stopGracefully)
+ // Removing the streamingSource to de-register the metrics on stop()
+ env.metricsSystem.removeSource(streamingSource)
+ uiTab.foreach(_.detach())
+ StreamingContext.setActiveContext(null)
+ waiter.notifyStop()
+ if (shutdownHookRef != null) {
+ shutdownHookRefToRemove = shutdownHookRef
+ shutdownHookRef = null
+ }
+ logInfo("StreamingContext stopped successfully")
+ state = STOPPED
}
}
if (shutdownHookRefToRemove != null) {