diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala | 38 |
1 files changed, 18 insertions, 20 deletions
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index 5522b40782..89b6df76c2 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -19,7 +19,6 @@ package org.apache.spark.rpc.netty import java.io._ import java.net.{InetSocketAddress, URI} import java.nio.ByteBuffer -import java.util.Arrays import java.util.concurrent._ import javax.annotation.concurrent.GuardedBy @@ -77,19 +76,19 @@ private[netty] class NettyRpcEnv( @volatile private var server: TransportServer = _ def start(port: Int): Unit = { - val bootstraps: Seq[TransportServerBootstrap] = + val bootstraps: java.util.List[TransportServerBootstrap] = if (securityManager.isAuthenticationEnabled()) { - Seq(new SaslServerBootstrap(transportConf, securityManager)) + java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager)) } else { - Nil + java.util.Collections.emptyList() } - server = transportContext.createServer(port, bootstraps.asJava) + server = transportContext.createServer(port, bootstraps) dispatcher.registerRpcEndpoint(IDVerifier.NAME, new IDVerifier(this, dispatcher)) } override lazy val address: RpcAddress = { require(server != null, "NettyRpcEnv has not yet started") - RpcAddress(host, server.getPort()) + RpcAddress(host, server.getPort) } override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { @@ -119,7 +118,7 @@ private[netty] class NettyRpcEnv( val remoteAddr = message.receiver.address if (remoteAddr == address) { val promise = Promise[Any]() - dispatcher.postMessage(message, promise) + dispatcher.postLocalMessage(message, promise) promise.future.onComplete { case Success(response) => val ack = response.asInstanceOf[Ack] @@ -148,10 +147,9 @@ private[netty] class NettyRpcEnv( } }) } catch { - case e: RejectedExecutionException => { + case e: RejectedExecutionException => // `send` after shutting clientConnectionExecutor down, ignore it - logWarning(s"Cannot send ${message} because RpcEnv is stopped") - } + logWarning(s"Cannot send $message because RpcEnv is stopped") } } } @@ -161,7 +159,7 @@ private[netty] class NettyRpcEnv( val remoteAddr = message.receiver.address if (remoteAddr == address) { val p = Promise[Any]() - dispatcher.postMessage(message, p) + dispatcher.postLocalMessage(message, p) p.future.onComplete { case Success(response) => val reply = response.asInstanceOf[AskResponse] @@ -218,7 +216,7 @@ private[netty] class NettyRpcEnv( private[netty] def serialize(content: Any): Array[Byte] = { val buffer = javaSerializerInstance.serialize(content) - Arrays.copyOfRange( + java.util.Arrays.copyOfRange( buffer.array(), buffer.arrayOffset + buffer.position, buffer.arrayOffset + buffer.limit) } @@ -425,7 +423,7 @@ private[netty] class NettyRpcHandler( assert(addr != null) val remoteEnvAddress = requestMessage.senderAddress val clientAddr = RpcAddress(addr.getHostName, addr.getPort) - val broadcastMessage = + val broadcastMessage: Option[RemoteProcessConnected] = synchronized { // If the first connection to a remote RpcEnv is found, we should broadcast "Associated" if (remoteAddresses.put(clientAddr, remoteEnvAddress).isEmpty) { @@ -435,7 +433,7 @@ private[netty] class NettyRpcHandler( remoteConnectionCount.put(remoteEnvAddress, count + 1) if (count == 0) { // This is the first connection, so fire "Associated" - Some(Associated(remoteEnvAddress)) + Some(RemoteProcessConnected(remoteEnvAddress)) } else { None } @@ -443,8 +441,8 @@ private[netty] class NettyRpcHandler( None } } - broadcastMessage.foreach(dispatcher.broadcastMessage) - dispatcher.postMessage(requestMessage, callback) + broadcastMessage.foreach(dispatcher.postToAll) + dispatcher.postRemoteMessage(requestMessage, callback) } override def getStreamManager: StreamManager = new OneForOneStreamManager @@ -455,12 +453,12 @@ private[netty] class NettyRpcHandler( val clientAddr = RpcAddress(addr.getHostName, addr.getPort) val broadcastMessage = synchronized { - remoteAddresses.get(clientAddr).map(AssociationError(cause, _)) + remoteAddresses.get(clientAddr).map(RemoteProcessConnectionError(cause, _)) } if (broadcastMessage.isEmpty) { logError(cause.getMessage, cause) } else { - dispatcher.broadcastMessage(broadcastMessage.get) + dispatcher.postToAll(broadcastMessage.get) } } else { // If the channel is closed before connecting, its remoteAddress will be null. @@ -485,7 +483,7 @@ private[netty] class NettyRpcHandler( if (count - 1 == 0) { // We lost all clients, so clean up and fire "Disassociated" remoteConnectionCount.remove(remoteEnvAddress) - Some(Disassociated(remoteEnvAddress)) + Some(RemoteProcessDisconnected(remoteEnvAddress)) } else { // Decrease the connection number of remoteEnvAddress remoteConnectionCount.put(remoteEnvAddress, count - 1) @@ -493,7 +491,7 @@ private[netty] class NettyRpcHandler( } } } - broadcastMessage.foreach(dispatcher.broadcastMessage) + broadcastMessage.foreach(dispatcher.postToAll) } else { // If the channel is closed before connecting, its remoteAddress will be null. In this case, // we can ignore it since we don't fire "Associated". |