path: root/core/src/test
author    Marcelo Vanzin <vanzin@cloudera.com>    2015-11-02 10:26:36 -0800
committer Marcelo Vanzin <vanzin@cloudera.com>    2015-11-02 10:26:36 -0800
commit    71d1c907dec446db566b19f912159fd8f46deb7d (patch)
tree      8201803d422933421b5af731d61ef7dcd54ca6da /core/src/test
parent    a930e624eb9feb0f7d37d99dcb8178feb9c0f177 (diff)
[SPARK-10997][CORE] Add "client mode" to netty rpc env.
"Client mode" means the RPC env will not listen for incoming connections. This allows certain processes in the Spark stack (such as Executors or tha YARN client-mode AM) to act as pure clients when using the netty-based RPC backend, reducing the number of sockets needed by the app and also the number of open ports. Client connections are also preferred when endpoints that actually have a listening socket are involved; so, for example, if a Worker connects to a Master and the Master needs to send a message to a Worker endpoint, that client connection will be used, even though the Worker is also listening for incoming connections. With this change, the workaround for SPARK-10987 isn't necessary anymore, and is removed. The AM connects to the driver in "client mode", and that connection is used for all driver <-> AM communication, and so the AM is properly notified when the connection goes down. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #9210 from vanzin/SPARK-10997.
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala                 50
-rw-r--r--  core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala        11
-rw-r--r--  core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcAddressSuite.scala   7
-rw-r--r--  core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala       9
-rw-r--r--  core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcHandlerSuite.scala   8
5 files changed, 56 insertions(+), 29 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 3bead6395d..834e4743df 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -48,7 +48,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
}
- def createRpcEnv(conf: SparkConf, name: String, port: Int): RpcEnv
+ def createRpcEnv(conf: SparkConf, name: String, port: Int, clientMode: Boolean = false): RpcEnv
test("send a message locally") {
@volatile var message: String = null
@@ -76,7 +76,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
})
- val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345)
+ val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345, clientMode = true)
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "send-remotely")
try {
@@ -130,7 +130,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
})
- val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345)
+ val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345, clientMode = true)
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-remotely")
try {
@@ -158,7 +158,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val shortProp = "spark.rpc.short.timeout"
conf.set("spark.rpc.retry.wait", "0")
conf.set("spark.rpc.numRetries", "1")
- val anotherEnv = createRpcEnv(conf, "remote", 13345)
+ val anotherEnv = createRpcEnv(conf, "remote", 13345, clientMode = true)
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-timeout")
try {
@@ -417,7 +417,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
})
- val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345)
+ val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345, clientMode = true)
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "sendWithReply-remotely")
try {
@@ -457,7 +457,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
})
- val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345)
+ val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345, clientMode = true)
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef(
"local", env.address, "sendWithReply-remotely-error")
@@ -497,26 +497,40 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
})
- val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345)
+ val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345, clientMode = true)
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef(
"local", env.address, "network-events")
val remoteAddress = anotherEnv.address
rpcEndpointRef.send("hello")
eventually(timeout(5 seconds), interval(5 millis)) {
- assert(events === List(("onConnected", remoteAddress)))
+ // anotherEnv is connected in client mode, so the remote address may be unknown depending on
+ // the implementation. Account for that when doing checks.
+ if (remoteAddress != null) {
+ assert(events === List(("onConnected", remoteAddress)))
+ } else {
+ assert(events.size === 1)
+ assert(events(0)._1 === "onConnected")
+ }
}
anotherEnv.shutdown()
anotherEnv.awaitTermination()
eventually(timeout(5 seconds), interval(5 millis)) {
- assert(events === List(
- ("onConnected", remoteAddress),
- ("onNetworkError", remoteAddress),
- ("onDisconnected", remoteAddress)) ||
- events === List(
- ("onConnected", remoteAddress),
- ("onDisconnected", remoteAddress)))
+ // Account for anotherEnv not having an address due to running in client mode.
+ if (remoteAddress != null) {
+ assert(events === List(
+ ("onConnected", remoteAddress),
+ ("onNetworkError", remoteAddress),
+ ("onDisconnected", remoteAddress)) ||
+ events === List(
+ ("onConnected", remoteAddress),
+ ("onDisconnected", remoteAddress)))
+ } else {
+ val eventNames = events.map(_._1)
+ assert(eventNames === List("onConnected", "onNetworkError", "onDisconnected") ||
+ eventNames === List("onConnected", "onDisconnected"))
+ }
}
}
@@ -529,7 +543,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
})
- val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345)
+ val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345, clientMode = true)
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef(
"local", env.address, "sendWithReply-unserializable-error")
@@ -558,7 +572,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
conf.set("spark.authenticate.secret", "good")
val localEnv = createRpcEnv(conf, "authentication-local", 13345)
- val remoteEnv = createRpcEnv(conf, "authentication-remote", 14345)
+ val remoteEnv = createRpcEnv(conf, "authentication-remote", 14345, clientMode = true)
try {
@volatile var message: String = null
@@ -589,7 +603,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
conf.set("spark.authenticate.secret", "good")
val localEnv = createRpcEnv(conf, "authentication-local", 13345)
- val remoteEnv = createRpcEnv(conf, "authentication-remote", 14345)
+ val remoteEnv = createRpcEnv(conf, "authentication-remote", 14345, clientMode = true)
try {
localEnv.setupEndpoint("ask-authentication", new RpcEndpoint {
diff --git a/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala
index 4aa75c9230..6478ab51c4 100644
--- a/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala
@@ -22,9 +22,12 @@ import org.apache.spark.{SSLSampleConfigs, SecurityManager, SparkConf}
class AkkaRpcEnvSuite extends RpcEnvSuite {
- override def createRpcEnv(conf: SparkConf, name: String, port: Int): RpcEnv = {
+ override def createRpcEnv(conf: SparkConf,
+ name: String,
+ port: Int,
+ clientMode: Boolean = false): RpcEnv = {
new AkkaRpcEnvFactory().create(
- RpcEnvConfig(conf, name, "localhost", port, new SecurityManager(conf)))
+ RpcEnvConfig(conf, name, "localhost", port, new SecurityManager(conf), clientMode))
}
test("setupEndpointRef: systemName, address, endpointName") {
@@ -37,7 +40,7 @@ class AkkaRpcEnvSuite extends RpcEnvSuite {
})
val conf = new SparkConf()
val newRpcEnv = new AkkaRpcEnvFactory().create(
- RpcEnvConfig(conf, "test", "localhost", 12346, new SecurityManager(conf)))
+ RpcEnvConfig(conf, "test", "localhost", 12346, new SecurityManager(conf), false))
try {
val newRef = newRpcEnv.setupEndpointRef("local", ref.address, "test_endpoint")
assert(s"akka.tcp://local@${env.address}/user/test_endpoint" ===
@@ -56,7 +59,7 @@ class AkkaRpcEnvSuite extends RpcEnvSuite {
val conf = SSLSampleConfigs.sparkSSLConfig()
val securityManager = new SecurityManager(conf)
val rpcEnv = new AkkaRpcEnvFactory().create(
- RpcEnvConfig(conf, "test", "localhost", 12346, securityManager))
+ RpcEnvConfig(conf, "test", "localhost", 12346, securityManager, false))
try {
val uri = rpcEnv.uriOf("local", RpcAddress("1.2.3.4", 12345), "test_endpoint")
assert("akka.ssl.tcp://local@1.2.3.4:12345/user/test_endpoint" === uri)
diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcAddressSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcAddressSuite.scala
index 973a07a0bd..56743ba650 100644
--- a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcAddressSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcAddressSuite.scala
@@ -22,8 +22,13 @@ import org.apache.spark.SparkFunSuite
class NettyRpcAddressSuite extends SparkFunSuite {
test("toString") {
- val addr = RpcEndpointAddress("localhost", 12345, "test")
+ val addr = new RpcEndpointAddress("localhost", 12345, "test")
assert(addr.toString === "spark://test@localhost:12345")
}
+ test("toString for client mode") {
+ val addr = RpcEndpointAddress(null, "test")
+ assert(addr.toString === "spark-client://test")
+ }
+
}
diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
index be19668e17..ce83087ec0 100644
--- a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
@@ -22,8 +22,13 @@ import org.apache.spark.rpc._
class NettyRpcEnvSuite extends RpcEnvSuite {
- override def createRpcEnv(conf: SparkConf, name: String, port: Int): RpcEnv = {
- val config = RpcEnvConfig(conf, "test", "localhost", port, new SecurityManager(conf))
+ override def createRpcEnv(
+ conf: SparkConf,
+ name: String,
+ port: Int,
+ clientMode: Boolean = false): RpcEnv = {
+ val config = RpcEnvConfig(conf, "test", "localhost", port, new SecurityManager(conf),
+ clientMode)
new NettyRpcEnvFactory().create(config)
}
diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcHandlerSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcHandlerSuite.scala
index 5430e4c0c4..f9d8e80c98 100644
--- a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcHandlerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcHandlerSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.rpc._
class NettyRpcHandlerSuite extends SparkFunSuite {
val env = mock(classOf[NettyRpcEnv])
- when(env.deserialize(any(classOf[Array[Byte]]))(any())).
+ when(env.deserialize(any(classOf[TransportClient]), any(classOf[Array[Byte]]))(any())).
thenReturn(RequestMessage(RpcAddress("localhost", 12345), null, null, false))
test("receive") {
@@ -42,7 +42,7 @@ class NettyRpcHandlerSuite extends SparkFunSuite {
when(channel.remoteAddress()).thenReturn(new InetSocketAddress("localhost", 40000))
nettyRpcHandler.receive(client, null, null)
- verify(dispatcher, times(1)).postToAll(RemoteProcessConnected(RpcAddress("localhost", 12345)))
+ verify(dispatcher, times(1)).postToAll(RemoteProcessConnected(RpcAddress("localhost", 40000)))
}
test("connectionTerminated") {
@@ -57,9 +57,9 @@ class NettyRpcHandlerSuite extends SparkFunSuite {
when(channel.remoteAddress()).thenReturn(new InetSocketAddress("localhost", 40000))
nettyRpcHandler.connectionTerminated(client)
- verify(dispatcher, times(1)).postToAll(RemoteProcessConnected(RpcAddress("localhost", 12345)))
+ verify(dispatcher, times(1)).postToAll(RemoteProcessConnected(RpcAddress("localhost", 40000)))
verify(dispatcher, times(1)).postToAll(
- RemoteProcessDisconnected(RpcAddress("localhost", 12345)))
+ RemoteProcessDisconnected(RpcAddress("localhost", 40000)))
}
}