author     Shixiong Zhu <shixiong@databricks.com>    2015-12-12 21:58:55 -0800
committer  Shixiong Zhu <shixiong@databricks.com>    2015-12-12 21:58:55 -0800
commit     8af2f8c61ae4a59d129fb3530d0f6e9317f4bff8
tree       6ba020187d6514b3b13ae7b45452845d27e93dfd
parent     98b212d36b34ab490c391ea2adf5b141e4fb9289
[SPARK-12267][CORE] Store the remote RpcEnv address to send the correct disconnection message
Author: Shixiong Zhu <shixiong@databricks.com>

Closes #10261 from zsxwing/SPARK-12267.
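The gist of the fix: NettyRpcHandler knew remote peers only by the ephemeral socket address of their TransportClient, so disconnection events carried that socket address instead of the address the remote RpcEnv actually listens on, and endpoints watching the listening address never heard about the disconnect. The patch keeps a map from client socket address to the remote RpcEnv's listening address and fires each network event for both addresses. Below is a minimal sketch of the bookkeeping pattern; ConnectionTracker, NetworkEvent, and the simplified RpcAddress are hypothetical stand-ins, not the real Spark types, which appear in the NettyRpcEnv.scala hunks further down.

import java.util.concurrent.ConcurrentHashMap

// Hypothetical, simplified stand-ins for the real Spark types.
case class RpcAddress(host: String, port: Int)
sealed trait NetworkEvent
case class RemoteProcessConnected(address: RpcAddress) extends NetworkEvent
case class RemoteProcessDisconnected(address: RpcAddress) extends NetworkEvent

class ConnectionTracker(postToAll: NetworkEvent => Unit) {
  // Maps a client's ephemeral socket address to the address its RpcEnv listens on.
  private val remoteAddresses = new ConcurrentHashMap[RpcAddress, RpcAddress]()

  // Called on each message whose sender advertises a listening address.
  def onMessage(clientAddr: RpcAddress, senderAddress: RpcAddress): Unit = {
    // putIfAbsent returns null only on the first insertion, so the
    // connected event fires exactly once per remote RpcEnv.
    if (remoteAddresses.putIfAbsent(clientAddr, senderAddress) == null) {
      postToAll(RemoteProcessConnected(senderAddress))
    }
  }

  // Called when the underlying channel goes inactive.
  def onChannelInactive(clientAddr: RpcAddress): Unit = {
    postToAll(RemoteProcessDisconnected(clientAddr))
    val remoteEnvAddress = remoteAddresses.remove(clientAddr)
    if (remoteEnvAddress != null) {
      // Fire for the listening address too, so endpoints that only know
      // the remote RpcEnv by its advertised address see the disconnection.
      postToAll(RemoteProcessDisconnected(remoteEnvAddress))
    }
  }
}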
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala |  1 +
-rwxr-xr-x  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala          |  2 +-
-rw-r--r--  core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala         | 21 +++++++
-rw-r--r--  core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala               | 42 ++++++++++++
4 files changed, 65 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index ac553b7111..7e2cf956c7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -66,6 +66,7 @@ private[spark] class ApplicationInfo(
     nextExecutorId = 0
     removedExecutors = new ArrayBuffer[ExecutorDesc]
     executorLimit = Integer.MAX_VALUE
+    appUIUrlAtHistoryServer = None
   }

   private def newExecutorId(useID: Option[Int] = None): Int = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 1afc1ff59f..f41efb097b 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -690,7 +690,7 @@ private[deploy] object Worker extends Logging {
     val conf = new SparkConf
     val args = new WorkerArguments(argStrings, conf)
     val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
-      args.memory, args.masters, args.workDir)
+      args.memory, args.masters, args.workDir, conf = conf)
     rpcEnv.awaitTermination()
   }
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 68c5f44145..f82fd4eb57 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -553,6 +553,9 @@ private[netty] class NettyRpcHandler(
   // A variable to track whether we should dispatch the RemoteProcessConnected message.
   private val clients = new ConcurrentHashMap[TransportClient, JBoolean]()

+  // A map to track the remote RpcEnv listening addresses of all clients
+  private val remoteAddresses = new ConcurrentHashMap[RpcAddress, RpcAddress]()
+
   override def receive(
       client: TransportClient,
       message: ByteBuffer,
@@ -580,6 +583,12 @@ private[netty] class NettyRpcHandler(
       // Create a new message with the socket address of the client as the sender.
       RequestMessage(clientAddr, requestMessage.receiver, requestMessage.content)
     } else {
+      // The remote RpcEnv listens on some port; we should also fire a RemoteProcessConnected
+      // for the listening address
+      val remoteEnvAddress = requestMessage.senderAddress
+      if (remoteAddresses.putIfAbsent(clientAddr, remoteEnvAddress) == null) {
+        dispatcher.postToAll(RemoteProcessConnected(remoteEnvAddress))
+      }
       requestMessage
     }
   }
@@ -591,6 +600,12 @@ private[netty] class NettyRpcHandler(
     if (addr != null) {
       val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
       dispatcher.postToAll(RemoteProcessConnectionError(cause, clientAddr))
+      // If the remote RpcEnv listens on some address, we should also fire a
+      // RemoteProcessConnectionError for the remote RpcEnv listening address
+      val remoteEnvAddress = remoteAddresses.get(clientAddr)
+      if (remoteEnvAddress != null) {
+        dispatcher.postToAll(RemoteProcessConnectionError(cause, remoteEnvAddress))
+      }
     } else {
       // If the channel is closed before connecting, its remoteAddress will be null.
       // See java.net.Socket.getRemoteSocketAddress
@@ -606,6 +621,12 @@ private[netty] class NettyRpcHandler(
       val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
       nettyEnv.removeOutbox(clientAddr)
       dispatcher.postToAll(RemoteProcessDisconnected(clientAddr))
+      val remoteEnvAddress = remoteAddresses.remove(clientAddr)
+      // If the remote RpcEnv listens on some address, we should also fire a
+      // RemoteProcessDisconnected for the remote RpcEnv listening address
+      if (remoteEnvAddress != null) {
+        dispatcher.postToAll(RemoteProcessDisconnected(remoteEnvAddress))
+      }
     } else {
       // If the channel is closed before connecting, its remoteAddress will be null. In this case,
       // we can ignore it since we don't fire "Associated".
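A note on the concurrency choices above: putIfAbsent is atomic and returns null only for the first mapping of a given client address, so RemoteProcessConnected fires exactly once per remote RpcEnv even if several messages from the same client race through receive, and remove in channelInactive hands the mapping back so the disconnect can target the listening address. A standalone illustration of those ConcurrentHashMap semantics (plain Scala with made-up string keys and values, not Spark code):

import java.util.concurrent.ConcurrentHashMap

object PutIfAbsentDemo extends App {
  val remoteAddresses = new ConcurrentHashMap[String, String]()

  // First insertion returns null: this is the point where the handler
  // would post RemoteProcessConnected for the listening address.
  assert(remoteAddresses.putIfAbsent("client:54321", "env:7077") == null)

  // Later calls return the existing value and post nothing.
  assert(remoteAddresses.putIfAbsent("client:54321", "env:7077") == "env:7077")

  // remove() returns the mapping (or null), letting channelInactive fire
  // RemoteProcessDisconnected for the listening address exactly once.
  assert(remoteAddresses.remove("client:54321") == "env:7077")
}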
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index a61d0479aa..6d153eb04e 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -545,6 +545,48 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     }
   }

+  test("network events between non-client-mode RpcEnvs") {
+    val events = new mutable.ArrayBuffer[(Any, Any)] with mutable.SynchronizedBuffer[(Any, Any)]
+    env.setupEndpoint("network-events-non-client", new ThreadSafeRpcEndpoint {
+      override val rpcEnv = env
+
+      override def receive: PartialFunction[Any, Unit] = {
+        case "hello" =>
+        case m => events += "receive" -> m
+      }
+
+      override def onConnected(remoteAddress: RpcAddress): Unit = {
+        events += "onConnected" -> remoteAddress
+      }
+
+      override def onDisconnected(remoteAddress: RpcAddress): Unit = {
+        events += "onDisconnected" -> remoteAddress
+      }
+
+      override def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
+        events += "onNetworkError" -> remoteAddress
+      }
+
+    })
+
+    val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0, clientMode = false)
+    // Use anotherEnv to find out the RpcEndpointRef
+    val rpcEndpointRef = anotherEnv.setupEndpointRef(
+      "local", env.address, "network-events-non-client")
+    val remoteAddress = anotherEnv.address
+    rpcEndpointRef.send("hello")
+    eventually(timeout(5 seconds), interval(5 millis)) {
+      assert(events.contains(("onConnected", remoteAddress)))
+    }
+
+    anotherEnv.shutdown()
+    anotherEnv.awaitTermination()
+    eventually(timeout(5 seconds), interval(5 millis)) {
+      assert(events.contains(("onConnected", remoteAddress)))
+      assert(events.contains(("onDisconnected", remoteAddress)))
+    }
+  }
+
   test("sendWithReply: unserializable error") {
     env.setupEndpoint("sendWithReply-unserializable-error", new RpcEndpoint {
       override val rpcEnv = env