aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
authorjayadevanmurali <jayadevan.m@tcs.com>2016-01-23 11:48:48 +0000
committerSean Owen <sowen@cloudera.com>2016-01-23 11:48:48 +0000
commit5f56980127704d3c2877d0d0b5047791c00fdac9 (patch)
tree2b8b0086b47cac69b40c8fae710ed635bbb35ac8 /streaming/src/main
parentaca2a0165405b9eba27ac5e4739e36a618b96676 (diff)
downloadspark-5f56980127704d3c2877d0d0b5047791c00fdac9.tar.gz
spark-5f56980127704d3c2877d0d0b5047791c00fdac9.tar.bz2
spark-5f56980127704d3c2877d0d0b5047791c00fdac9.zip
[SPARK-11137][STREAMING] Make StreamingContext.stop() exception-safe
Make StreamingContext.stop() exception-safe Author: jayadevanmurali <jayadevan.m@tcs.com> Closes #10807 from jayadevanmurali/branch-0.1-SPARK-11137.
Diffstat (limited to 'streaming/src/main')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala16
1 files changed, 12 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ec57c05e3b..32bea88ec6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -693,12 +693,20 @@ class StreamingContext private[streaming] (
// interrupted. See SPARK-12001 for more details. Because the body of this case can be
// executed twice in the case of a partial stop, all methods called here need to be
// idempotent.
- scheduler.stop(stopGracefully)
+ Utils.tryLogNonFatalError {
+ scheduler.stop(stopGracefully)
+ }
// Removing the streamingSource to de-register the metrics on stop()
- env.metricsSystem.removeSource(streamingSource)
- uiTab.foreach(_.detach())
+ Utils.tryLogNonFatalError {
+ env.metricsSystem.removeSource(streamingSource)
+ }
+ Utils.tryLogNonFatalError {
+ uiTab.foreach(_.detach())
+ }
StreamingContext.setActiveContext(null)
- waiter.notifyStop()
+ Utils.tryLogNonFatalError {
+ waiter.notifyStop()
+ }
if (shutdownHookRef != null) {
shutdownHookRefToRemove = shutdownHookRef
shutdownHookRef = null