diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-04-21 11:51:04 -0700 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-04-21 11:51:04 -0700 |
commit | e4904d870a0e705a3a7d370320e6f8a5f23d5944 (patch) | |
tree | f93943d5bc88283a3c23ca075d79f9520be24552 | |
parent | 3a21e8d5ed640e3f82946893e24c099aa723c169 (diff) | |
download | spark-e4904d870a0e705a3a7d370320e6f8a5f23d5944.tar.gz spark-e4904d870a0e705a3a7d370320e6f8a5f23d5944.tar.bz2 spark-e4904d870a0e705a3a7d370320e6f8a5f23d5944.zip |
[SPARK-14699][CORE] Stop endpoints before closing the connections and don't stop client in Outbox
## What changes were proposed in this pull request?
In general, `onDisconnected` is for dealing with unexpected network disconnections. When `RpcEnv.shutdown` is called, the disconnections are expected, so RpcEnv should not fire these events.
This PR moves `dispatcher.stop()` ahead of closing the connections (`server.close()` and `clientFactory.close()`), so that the endpoints are already stopped and won't receive `onDisconnected` events when RpcEnv is shutting down.
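In addition, Outbox should not close the client, since the connection will be reused by others. This PR fixes that as well: `Outbox.closeClient()` now only sets `client` to null instead of calling `client.close()`.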
## How was this patch tested?
Added a unit test to `RpcEnvSuite`: test("SPARK-14699: RpcEnv.shutdown should not fire onDisconnected events")
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #12481 from zsxwing/SPARK-14699.
3 files changed, 31 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index 7f2192e1f5..7d7b4c82fa 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -287,15 +287,15 @@ private[netty] class NettyRpcEnv( if (timeoutScheduler != null) { timeoutScheduler.shutdownNow() } + if (dispatcher != null) { + dispatcher.stop() + } if (server != null) { server.close() } if (clientFactory != null) { clientFactory.close() } - if (dispatcher != null) { - dispatcher.stop() - } if (clientConnectionExecutor != null) { clientConnectionExecutor.shutdownNow() } diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala index 56499c639f..6c090ada5a 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala @@ -241,10 +241,7 @@ private[netty] class Outbox(nettyEnv: NettyRpcEnv, val address: RpcAddress) { } private def closeClient(): Unit = synchronized { - // Not sure if `client.close` is idempotent. Just for safety. - if (client != null) { - client.close() - } + // Just set client to null. Don't close it in order to reuse the connection. 
client = null } diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala index 73803ec21a..505cd476ff 100644 --- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala @@ -29,7 +29,8 @@ import scala.concurrent.duration._ import scala.language.postfixOps import com.google.common.io.Files -import org.mockito.Mockito.{mock, when} +import org.mockito.Matchers.any +import org.mockito.Mockito.{mock, never, verify, when} import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.Eventually._ @@ -844,6 +845,31 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { } } + test("SPARK-14699: RpcEnv.shutdown should not fire onDisconnected events") { + env.setupEndpoint("SPARK-14699", new RpcEndpoint { + override val rpcEnv: RpcEnv = env + + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { + case m => context.reply(m) + } + }) + + val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0) + val endpoint = mock(classOf[RpcEndpoint]) + anotherEnv.setupEndpoint("SPARK-14699", endpoint) + + val ref = anotherEnv.setupEndpointRef(env.address, "SPARK-14699") + // Make sure the connect is set up + assert(ref.askWithRetry[String]("hello") === "hello") + anotherEnv.shutdown() + anotherEnv.awaitTermination() + + env.stop(ref) + + verify(endpoint).onStop() + verify(endpoint, never()).onDisconnected(any()) + verify(endpoint, never()).onNetworkError(any(), any()) + } } class UnserializableClass |