From e4904d870a0e705a3a7d370320e6f8a5f23d5944 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 21 Apr 2016 11:51:04 -0700 Subject: [SPARK-14699][CORE] Stop endpoints before closing the connections and don't stop client in Outbox ## What changes were proposed in this pull request? In general, `onDisconnected` is for dealing with unexpected network disconnections. When RpcEnv.shutdown is called, the disconnections are expected so RpcEnv should not fire these events. This PR moves `dispatcher.stop()` above closing the connections so that when stopping RpcEnv, the endpoints won't receive `onDisconnected` events. In addition, Outbox should not close the client since it will be reused by others. This PR fixes it as well. ## How was this patch tested? test("SPARK-14699: RpcEnv.shutdown should not fire onDisconnected events") Author: Shixiong Zhu Closes #12481 from zsxwing/SPARK-14699. --- .../org/apache/spark/rpc/netty/NettyRpcEnv.scala | 6 ++--- .../scala/org/apache/spark/rpc/netty/Outbox.scala | 5 +--- .../scala/org/apache/spark/rpc/RpcEnvSuite.scala | 28 +++++++++++++++++++++- 3 files changed, 31 insertions(+), 8 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index 7f2192e1f5..7d7b4c82fa 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -287,15 +287,15 @@ private[netty] class NettyRpcEnv( if (timeoutScheduler != null) { timeoutScheduler.shutdownNow() } + if (dispatcher != null) { + dispatcher.stop() + } if (server != null) { server.close() } if (clientFactory != null) { clientFactory.close() } - if (dispatcher != null) { - dispatcher.stop() - } if (clientConnectionExecutor != null) { clientConnectionExecutor.shutdownNow() } diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala index 56499c639f..6c090ada5a 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala @@ -241,10 +241,7 @@ private[netty] class Outbox(nettyEnv: NettyRpcEnv, val address: RpcAddress) { } private def closeClient(): Unit = synchronized { - // Not sure if `client.close` is idempotent. Just for safety. - if (client != null) { - client.close() - } + // Just set client to null. Don't close it in order to reuse the connection. client = null } diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala index 73803ec21a..505cd476ff 100644 --- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala @@ -29,7 +29,8 @@ import scala.concurrent.duration._ import scala.language.postfixOps import com.google.common.io.Files -import org.mockito.Mockito.{mock, when} +import org.mockito.Matchers.any +import org.mockito.Mockito.{mock, never, verify, when} import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.Eventually._ @@ -844,6 +845,31 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { } } + test("SPARK-14699: RpcEnv.shutdown should not fire onDisconnected events") { + env.setupEndpoint("SPARK-14699", new RpcEndpoint { + override val rpcEnv: RpcEnv = env + + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { + case m => context.reply(m) + } + }) + + val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0) + val endpoint = mock(classOf[RpcEndpoint]) + anotherEnv.setupEndpoint("SPARK-14699", endpoint) + + val ref = anotherEnv.setupEndpointRef(env.address, "SPARK-14699") + // Make sure the connect is set up + assert(ref.askWithRetry[String]("hello") === "hello") + anotherEnv.shutdown() + anotherEnv.awaitTermination() + + env.stop(ref) + + verify(endpoint).onStop() + verify(endpoint, never()).onDisconnected(any()) + verify(endpoint, never()).onNetworkError(any(), any()) + } } class UnserializableClass -- cgit v1.2.3