| field | value | date |
|---|---|---|
| author | tedyu <yuzhihong@gmail.com> | 2015-07-31 18:16:55 +0100 |
| committer | Sean Owen <sowen@cloudera.com> | 2015-07-31 18:16:55 +0100 |
| commit | 27ae851ce16082775ffbcb5b8fc6bdbe65dc70fc (patch) | |
| tree | 331da7fe7038e80a98ef7f251db863b8bb18eb1d /core | |
| parent | 04a49edfdb606c01fa4f8ae6e730ec4f9bd0cb6d (diff) | |
[SPARK-9446] Clear Active SparkContext in stop() method
In the mailing-list thread 'stopped SparkContext remaining active', Andres observed the following in the driver log:
```
15/07/29 15:17:09 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster has disassociated: <address removed>
15/07/29 15:17:09 INFO YarnClientSchedulerBackend: Shutting down all executors
Exception in thread "Yarn application state monitor" org.apache.spark.SparkException: Error asking standalone scheduler to shut down executors
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stopExecutors(CoarseGrainedSchedulerBackend.scala:261)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stop(CoarseGrainedSchedulerBackend.scala:266)
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.stop(YarnClientSchedulerBackend.scala:158)
at org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:416)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1411)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1644)
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$$anon$1.run(YarnClientSchedulerBackend.scala:139)
Caused by: java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
15/07/29 15:17:09 INFO YarnClientSchedulerBackend: Asking each executor to shut down
at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stopExecutors(CoarseGrainedSchedulerBackend.scala:257)
... 6 more
```
The effect of the above exception is that stop() aborts part-way through, so the SparkContext.clearActiveContext() call at its end is never reached; the stopped SparkContext therefore remains registered as the active context and is handed back to the user.
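The fix wraps each shutdown step in Utils.tryLogNonFatalError so that a failure in one step can no longer abort the rest of stop(). Below is a minimal, self-contained sketch of that pattern, not Spark's actual code: the guard only mimics what Utils.tryLogNonFatalError appears to do (log non-fatal throwables instead of rethrowing), and StopSketch, activeContext, and scheduler are illustrative stand-ins.
```
import scala.util.control.NonFatal

object StopSketch {
  // Stand-in for SparkContext's process-wide active-context slot
  // (illustrative; the real one lives in SparkContext's companion object).
  private var activeContext: Option[String] = Some("context")

  // Mimics Utils.tryLogNonFatalError: run the block, log non-fatal
  // throwables instead of letting them propagate to the caller.
  private def tryLogNonFatalError(block: => Unit): Unit =
    try block catch {
      case NonFatal(t) => Console.err.println(s"Ignored during stop(): $t")
    }

  // Stand-in for a subsystem whose stop() can fail, like the
  // scheduler backend in the log above.
  private var scheduler: AnyRef = new AnyRef {
    override def toString = "scheduler"
  }

  def stop(): Unit = {
    // A failing step is logged, not propagated...
    tryLogNonFatalError {
      throw new RuntimeException("Error asking scheduler to shut down executors")
    }
    if (scheduler != null) {
      tryLogNonFatalError {
        println(s"stopping $scheduler")
      }
      // The field reset stays outside the guard (squashed commit 7339ff2
      // below): it cannot fail and must run whether or not the guarded
      // call succeeded.
      scheduler = null
    }
    // ...so this line is always reached, and a stopped context is
    // never left registered as the active one.
    activeContext = None
    println(s"active context after stop(): $activeContext")
  }

  def main(args: Array[String]): Unit = stop()
}
```
One subtlety: scala.util.control.NonFatal deliberately does not match a raw InterruptedException. The guard still helps in the scenario above because stopExecutors() had already wrapped the interrupt in a SparkException, which is non-fatal.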
Author: tedyu <yuzhihong@gmail.com>
Closes #7756 from tedyu/master and squashes the following commits:
7339ff2 [tedyu] Move null assignment out of tryLogNonFatalError block
6e02cd9 [tedyu] Use Utils.tryLogNonFatalError to guard resource release
f5fb519 [tedyu] Clear Active SparkContext in stop() method using finally
Diffstat (limited to 'core')
| mode | path | lines changed |
|---|---|---|
| -rw-r--r-- | core/src/main/scala/org/apache/spark/SparkContext.scala | 50 |

1 file changed, 37 insertions, 13 deletions
```
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ac6ac6c216..2d8aa25d81 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1689,33 +1689,57 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       Utils.removeShutdownHook(_shutdownHookRef)
     }
 
-    postApplicationEnd()
-    _ui.foreach(_.stop())
+    Utils.tryLogNonFatalError {
+      postApplicationEnd()
+    }
+    Utils.tryLogNonFatalError {
+      _ui.foreach(_.stop())
+    }
     if (env != null) {
-      env.metricsSystem.report()
+      Utils.tryLogNonFatalError {
+        env.metricsSystem.report()
+      }
     }
     if (metadataCleaner != null) {
-      metadataCleaner.cancel()
+      Utils.tryLogNonFatalError {
+        metadataCleaner.cancel()
+      }
+    }
+    Utils.tryLogNonFatalError {
+      _cleaner.foreach(_.stop())
+    }
+    Utils.tryLogNonFatalError {
+      _executorAllocationManager.foreach(_.stop())
     }
-    _cleaner.foreach(_.stop())
-    _executorAllocationManager.foreach(_.stop())
     if (_dagScheduler != null) {
-      _dagScheduler.stop()
+      Utils.tryLogNonFatalError {
+        _dagScheduler.stop()
+      }
       _dagScheduler = null
     }
     if (_listenerBusStarted) {
-      listenerBus.stop()
-      _listenerBusStarted = false
+      Utils.tryLogNonFatalError {
+        listenerBus.stop()
+        _listenerBusStarted = false
+      }
+    }
+    Utils.tryLogNonFatalError {
+      _eventLogger.foreach(_.stop())
     }
-    _eventLogger.foreach(_.stop())
     if (env != null && _heartbeatReceiver != null) {
-      env.rpcEnv.stop(_heartbeatReceiver)
+      Utils.tryLogNonFatalError {
+        env.rpcEnv.stop(_heartbeatReceiver)
+      }
+    }
+    Utils.tryLogNonFatalError {
+      _progressBar.foreach(_.stop())
     }
-    _progressBar.foreach(_.stop())
     _taskScheduler = null
     // TODO: Cache.stop()?
     if (_env != null) {
-      _env.stop()
+      Utils.tryLogNonFatalError {
+        _env.stop()
+      }
       SparkEnv.set(null)
     }
     SparkContext.clearActiveContext()
```
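Note that SparkContext.clearActiveContext() appears as an unchanged context line at the bottom of the hunk: it was already the last step of stop(), and the per-step guards (which superseded the try/finally approach of the earlier squashed commit f5fb519) ensure execution actually reaches it. From the user's side, a stopped context is therefore also deregistered, so asking for a context again yields a fresh one. A hedged sketch of that behavior (assumes a local-mode Spark of this era on the classpath; SparkContext.getOrCreate is public API since Spark 1.4):
```
import org.apache.spark.{SparkConf, SparkContext}

object StopAndRecreate {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("stop-demo")

    val sc1 = SparkContext.getOrCreate(conf)
    sc1.stop() // runs the guarded shutdown sequence from the diff above

    // Because stop() now always reaches SparkContext.clearActiveContext(),
    // getOrCreate constructs a new context instead of handing back sc1.
    val sc2 = SparkContext.getOrCreate(conf)
    assert(sc2 ne sc1, "expected a fresh SparkContext after stop()")
    sc2.stop()
  }
}
```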