aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala38
1 files changed, 18 insertions, 20 deletions
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 5522b40782..89b6df76c2 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -19,7 +19,6 @@ package org.apache.spark.rpc.netty
import java.io._
import java.net.{InetSocketAddress, URI}
import java.nio.ByteBuffer
-import java.util.Arrays
import java.util.concurrent._
import javax.annotation.concurrent.GuardedBy
@@ -77,19 +76,19 @@ private[netty] class NettyRpcEnv(
@volatile private var server: TransportServer = _
def start(port: Int): Unit = {
- val bootstraps: Seq[TransportServerBootstrap] =
+ val bootstraps: java.util.List[TransportServerBootstrap] =
if (securityManager.isAuthenticationEnabled()) {
- Seq(new SaslServerBootstrap(transportConf, securityManager))
+ java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager))
} else {
- Nil
+ java.util.Collections.emptyList()
}
- server = transportContext.createServer(port, bootstraps.asJava)
+ server = transportContext.createServer(port, bootstraps)
dispatcher.registerRpcEndpoint(IDVerifier.NAME, new IDVerifier(this, dispatcher))
}
override lazy val address: RpcAddress = {
require(server != null, "NettyRpcEnv has not yet started")
- RpcAddress(host, server.getPort())
+ RpcAddress(host, server.getPort)
}
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
@@ -119,7 +118,7 @@ private[netty] class NettyRpcEnv(
val remoteAddr = message.receiver.address
if (remoteAddr == address) {
val promise = Promise[Any]()
- dispatcher.postMessage(message, promise)
+ dispatcher.postLocalMessage(message, promise)
promise.future.onComplete {
case Success(response) =>
val ack = response.asInstanceOf[Ack]
@@ -148,10 +147,9 @@ private[netty] class NettyRpcEnv(
}
})
} catch {
- case e: RejectedExecutionException => {
+ case e: RejectedExecutionException =>
// `send` after shutting clientConnectionExecutor down, ignore it
- logWarning(s"Cannot send ${message} because RpcEnv is stopped")
- }
+ logWarning(s"Cannot send $message because RpcEnv is stopped")
}
}
}
@@ -161,7 +159,7 @@ private[netty] class NettyRpcEnv(
val remoteAddr = message.receiver.address
if (remoteAddr == address) {
val p = Promise[Any]()
- dispatcher.postMessage(message, p)
+ dispatcher.postLocalMessage(message, p)
p.future.onComplete {
case Success(response) =>
val reply = response.asInstanceOf[AskResponse]
@@ -218,7 +216,7 @@ private[netty] class NettyRpcEnv(
private[netty] def serialize(content: Any): Array[Byte] = {
val buffer = javaSerializerInstance.serialize(content)
- Arrays.copyOfRange(
+ java.util.Arrays.copyOfRange(
buffer.array(), buffer.arrayOffset + buffer.position, buffer.arrayOffset + buffer.limit)
}
@@ -425,7 +423,7 @@ private[netty] class NettyRpcHandler(
assert(addr != null)
val remoteEnvAddress = requestMessage.senderAddress
val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
- val broadcastMessage =
+ val broadcastMessage: Option[RemoteProcessConnected] =
synchronized {
// If the first connection to a remote RpcEnv is found, we should broadcast "Associated"
if (remoteAddresses.put(clientAddr, remoteEnvAddress).isEmpty) {
@@ -435,7 +433,7 @@ private[netty] class NettyRpcHandler(
remoteConnectionCount.put(remoteEnvAddress, count + 1)
if (count == 0) {
// This is the first connection, so fire "Associated"
- Some(Associated(remoteEnvAddress))
+ Some(RemoteProcessConnected(remoteEnvAddress))
} else {
None
}
@@ -443,8 +441,8 @@ private[netty] class NettyRpcHandler(
None
}
}
- broadcastMessage.foreach(dispatcher.broadcastMessage)
- dispatcher.postMessage(requestMessage, callback)
+ broadcastMessage.foreach(dispatcher.postToAll)
+ dispatcher.postRemoteMessage(requestMessage, callback)
}
override def getStreamManager: StreamManager = new OneForOneStreamManager
@@ -455,12 +453,12 @@ private[netty] class NettyRpcHandler(
val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
val broadcastMessage =
synchronized {
- remoteAddresses.get(clientAddr).map(AssociationError(cause, _))
+ remoteAddresses.get(clientAddr).map(RemoteProcessConnectionError(cause, _))
}
if (broadcastMessage.isEmpty) {
logError(cause.getMessage, cause)
} else {
- dispatcher.broadcastMessage(broadcastMessage.get)
+ dispatcher.postToAll(broadcastMessage.get)
}
} else {
// If the channel is closed before connecting, its remoteAddress will be null.
@@ -485,7 +483,7 @@ private[netty] class NettyRpcHandler(
if (count - 1 == 0) {
// We lost all clients, so clean up and fire "Disassociated"
remoteConnectionCount.remove(remoteEnvAddress)
- Some(Disassociated(remoteEnvAddress))
+ Some(RemoteProcessDisconnected(remoteEnvAddress))
} else {
// Decrease the connection number of remoteEnvAddress
remoteConnectionCount.put(remoteEnvAddress, count - 1)
@@ -493,7 +491,7 @@ private[netty] class NettyRpcHandler(
}
}
}
- broadcastMessage.foreach(dispatcher.broadcastMessage)
+ broadcastMessage.foreach(dispatcher.postToAll)
} else {
// If the channel is closed before connecting, its remoteAddress will be null. In this case,
// we can ignore it since we don't fire "Associated".