diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-11-08 13:14:56 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-11-08 13:14:56 -0800 |
commit | b6de0c98c70960a97b07615b0b08fbd8f900fbe7 (patch) | |
tree | 9a541b2d10a0ae2f6721214c784e06611d81a944 /core/src/test/scala | |
parent | 26e1c53aceee37e3687a372ff6c6f05463fd8a94 (diff) | |
download | spark-b6de0c98c70960a97b07615b0b08fbd8f900fbe7.tar.gz spark-b6de0c98c70960a97b07615b0b08fbd8f900fbe7.tar.bz2 spark-b6de0c98c70960a97b07615b0b08fbd8f900fbe7.zip |
[SPARK-18280][CORE] Fix potential deadlock in `StandaloneSchedulerBackend.dead`
## What changes were proposed in this pull request?
"StandaloneSchedulerBackend.dead" is called in a RPC thread, so it should not call "SparkContext.stop" in the same thread. "SparkContext.stop" will block until all RPC threads exit, if it's called inside a RPC thread, it will be dead-lock.
This PR add a thread local flag inside RPC threads. `SparkContext.stop` uses it to decide if launching a new thread to stop the SparkContext.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #15775 from zsxwing/SPARK-18280.
Diffstat (limited to 'core/src/test/scala')
-rw-r--r-- | core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala | 13 |
1 files changed, 13 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala index acdf21df9a..aa0705987d 100644 --- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala @@ -870,6 +870,19 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { verify(endpoint, never()).onDisconnected(any()) verify(endpoint, never()).onNetworkError(any(), any()) } + + test("isInRPCThread") { + val rpcEndpointRef = env.setupEndpoint("isInRPCThread", new RpcEndpoint { + override val rpcEnv = env + + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { + case m => context.reply(rpcEnv.isInRPCThread) + } + }) + assert(rpcEndpointRef.askWithRetry[Boolean]("hello") === true) + assert(env.isInRPCThread === false) + env.stop(rpcEndpointRef) + } } class UnserializableClass |