diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-12-08 11:54:04 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-12-08 11:54:04 -0800 |
commit | 26432df9cc6ffe569583aa628c6ecd7050b38316 (patch) | |
tree | a5dd7d05716e88c44b791d267c40e058460089f0 /core/src/test/scala/org | |
parent | c3d3a9d0e85b834abef87069e4edd27db87fc607 (diff) | |
download | spark-26432df9cc6ffe569583aa628c6ecd7050b38316.tar.gz spark-26432df9cc6ffe569583aa628c6ecd7050b38316.tar.bz2 spark-26432df9cc6ffe569583aa628c6ecd7050b38316.zip |
[SPARK-18751][CORE] Fix deadlock when SparkContext.stop is called in Utils.tryOrStopSparkContext
## What changes were proposed in this pull request?
When `SparkContext.stop` is called in `Utils.tryOrStopSparkContext` (the following three places), it will cause deadlock because the `stop` method needs to wait for the thread running `stop` to exit.
- ContextCleaner.keepCleaning
- LiveListenerBus.listenerThread.run
- TaskSchedulerImpl.start
This PR adds `SparkContext.stopInNewThread` and uses it to eliminate the potential deadlock. I also removed my changes in #15775 since they are not necessary now.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #16178 from zsxwing/fix-stop-deadlock.
Diffstat (limited to 'core/src/test/scala/org')
-rw-r--r-- | core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala | 13 |
1 files changed, 0 insertions, 13 deletions
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala index aa0705987d..acdf21df9a 100644 --- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala @@ -870,19 +870,6 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { verify(endpoint, never()).onDisconnected(any()) verify(endpoint, never()).onNetworkError(any(), any()) } - - test("isInRPCThread") { - val rpcEndpointRef = env.setupEndpoint("isInRPCThread", new RpcEndpoint { - override val rpcEnv = env - - override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case m => context.reply(rpcEnv.isInRPCThread) - } - }) - assert(rpcEndpointRef.askWithRetry[Boolean]("hello") === true) - assert(env.isInRPCThread === false) - env.stop(rpcEndpointRef) - } } class UnserializableClass |