diff options
author | Josh Rosen <joshrosen@databricks.com> | 2015-12-02 13:44:01 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2015-12-02 13:44:01 -0800 |
commit | 452690ba1cc3c667bdd9f3022c43c9a10267880b (patch) | |
tree | ce347137b6b47ecdb62131e3b2133cbab5fb958e /streaming | |
parent | a1542ce2f33ad365ff437d2d3014b9de2f6670e5 (diff) | |
download | spark-452690ba1cc3c667bdd9f3022c43c9a10267880b.tar.gz spark-452690ba1cc3c667bdd9f3022c43c9a10267880b.tar.bz2 spark-452690ba1cc3c667bdd9f3022c43c9a10267880b.zip |
[SPARK-12001] Allow partially-stopped StreamingContext to be completely stopped
If `StreamingContext.stop()` is interrupted midway through the call, the context will be marked as stopped but certain state will have not been cleaned up. Because `state = STOPPED` will be set, subsequent `stop()` calls will be unable to finish stopping the context, preventing any new StreamingContexts from being created.
This patch addresses this issue by only marking the context as `STOPPED` once the `stop()` has successfully completed which allows `stop()` to be called a second time in order to finish stopping the context in case the original `stop()` call was interrupted.
I discovered this issue by examining logs from a failed Jenkins run in which this race condition occurred in `FailureSuite`, leaking an unstoppable context and causing all subsequent tests to fail.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #9982 from JoshRosen/SPARK-12001.
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 49 |
1 files changed, 27 insertions, 22 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 6fb8ad38ab..cf843e3e8b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -699,28 +699,33 @@ class StreamingContext private[streaming] ( " AsynchronousListenerBus") } synchronized { - try { - state match { - case INITIALIZED => - logWarning("StreamingContext has not been started yet") - case STOPPED => - logWarning("StreamingContext has already been stopped") - case ACTIVE => - scheduler.stop(stopGracefully) - // Removing the streamingSource to de-register the metrics on stop() - env.metricsSystem.removeSource(streamingSource) - uiTab.foreach(_.detach()) - StreamingContext.setActiveContext(null) - waiter.notifyStop() - if (shutdownHookRef != null) { - shutdownHookRefToRemove = shutdownHookRef - shutdownHookRef = null - } - logInfo("StreamingContext stopped successfully") - } - } finally { - // The state should always be Stopped after calling `stop()`, even if we haven't started yet - state = STOPPED + // The state should always be Stopped after calling `stop()`, even if we haven't started yet + state match { + case INITIALIZED => + logWarning("StreamingContext has not been started yet") + state = STOPPED + case STOPPED => + logWarning("StreamingContext has already been stopped") + state = STOPPED + case ACTIVE => + // It's important that we don't set state = STOPPED until the very end of this case, + // since we need to ensure that we're still able to call `stop()` to recover from + // a partially-stopped StreamingContext which resulted from this `stop()` call being + // interrupted. See SPARK-12001 for more details. Because the body of this case can be + // executed twice in the case of a partial stop, all methods called here need to be + // idempotent. + scheduler.stop(stopGracefully) + // Removing the streamingSource to de-register the metrics on stop() + env.metricsSystem.removeSource(streamingSource) + uiTab.foreach(_.detach()) + StreamingContext.setActiveContext(null) + waiter.notifyStop() + if (shutdownHookRef != null) { + shutdownHookRefToRemove = shutdownHookRef + shutdownHookRef = null + } + logInfo("StreamingContext stopped successfully") + state = STOPPED } } if (shutdownHookRefToRemove != null) { |