author     Shixiong Zhu <shixiong@databricks.com>    2016-12-08 11:54:04 -0800
committer  Shixiong Zhu <shixiong@databricks.com>    2016-12-08 11:54:04 -0800
commit     26432df9cc6ffe569583aa628c6ecd7050b38316 (patch)
tree       a5dd7d05716e88c44b791d267c40e058460089f0 /core/src/test/scala/org
parent     c3d3a9d0e85b834abef87069e4edd27db87fc607 (diff)
[SPARK-18751][CORE] Fix deadlock when SparkContext.stop is called in Utils.tryOrStopSparkContext
## What changes were proposed in this pull request?

When `SparkContext.stop` is called in `Utils.tryOrStopSparkContext` (in the following three places), it will cause a deadlock because the `stop` method needs to wait for the very thread that is running `stop` to exit:

- ContextCleaner.keepCleaning
- LiveListenerBus.listenerThread.run
- TaskSchedulerImpl.start

This PR adds `SparkContext.stopInNewThread` and uses it to eliminate the potential deadlock. I also removed my changes in #15775 since they are not necessary now.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16178 from zsxwing/fix-stop-deadlock.
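The deadlock and the workaround can be illustrated with a self-contained sketch. This is not Spark's actual code: `Cleaner`, its worker thread, and the method bodies below are hypothetical stand-ins for components like ContextCleaner/SparkContext, and only the shape of `stopInNewThread` follows the idea described in the PR.

```scala
// Minimal sketch of the deadlock and the stop-in-new-thread workaround.
// All names here (Cleaner, "stop-in-new-thread", ...) are illustrative only.
object StopDeadlockSketch {

  class Cleaner {
    @volatile private var stopped = false

    // Background worker, loosely analogous to ContextCleaner.keepCleaning:
    // on a fatal error it wants to shut the whole component down.
    private val worker: Thread = new Thread("cleaner") {
      override def run(): Unit = {
        while (!stopped) {
          try {
            Thread.sleep(100) // stand-in for real cleaning work
          } catch {
            case _: InterruptedException if stopped =>
              () // normal shutdown requested via stop()
            case _: Throwable =>
              // Calling stop() directly here would deadlock: stop() joins this
              // very thread, which would then be blocked inside stop() itself.
              stopInNewThread()
              return
          }
        }
      }
    }
    worker.setDaemon(true)

    def start(): Unit = worker.start()

    // Blocking shutdown: waits for the worker to finish. Safe from any thread
    // except the worker itself.
    def stop(): Unit = {
      stopped = true
      worker.interrupt()
      worker.join() // deadlocks if invoked from the worker thread
    }

    // The pattern the PR describes as SparkContext.stopInNewThread: run the
    // blocking stop() on a fresh thread so the current (worker) thread can exit.
    def stopInNewThread(): Unit = {
      val t = new Thread("stop-in-new-thread") {
        override def run(): Unit = stop()
      }
      t.setDaemon(true)
      t.start()
    }
  }

  def main(args: Array[String]): Unit = {
    val cleaner = new Cleaner
    cleaner.start()
    Thread.sleep(300)
    cleaner.stop() // external callers can still use the blocking stop()
  }
}
```

The key point is that the blocking `stop()` is never executed on the thread it joins; any code path that may run on the worker hands the shutdown off to a fresh thread instead.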
Diffstat (limited to 'core/src/test/scala/org')
-rw-r--r--  core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala  13
1 file changed, 0 insertions(+), 13 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index aa0705987d..acdf21df9a 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -870,19 +870,6 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
verify(endpoint, never()).onDisconnected(any())
verify(endpoint, never()).onNetworkError(any(), any())
}
-
- test("isInRPCThread") {
- val rpcEndpointRef = env.setupEndpoint("isInRPCThread", new RpcEndpoint {
- override val rpcEnv = env
-
- override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case m => context.reply(rpcEnv.isInRPCThread)
- }
- })
- assert(rpcEndpointRef.askWithRetry[Boolean]("hello") === true)
- assert(env.isInRPCThread === false)
- env.stop(rpcEndpointRef)
- }
}
class UnserializableClass