From 71d1c907dec446db566b19f912159fd8f46deb7d Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 2 Nov 2015 10:26:36 -0800 Subject: [SPARK-10997][CORE] Add "client mode" to netty rpc env. "Client mode" means the RPC env will not listen for incoming connections. This allows certain processes in the Spark stack (such as Executors or tha YARN client-mode AM) to act as pure clients when using the netty-based RPC backend, reducing the number of sockets needed by the app and also the number of open ports. Client connections are also preferred when endpoints that actually have a listening socket are involved; so, for example, if a Worker connects to a Master and the Master needs to send a message to a Worker endpoint, that client connection will be used, even though the Worker is also listening for incoming connections. With this change, the workaround for SPARK-10987 isn't necessary anymore, and is removed. The AM connects to the driver in "client mode", and that connection is used for all driver <-> AM communication, and so the AM is properly notified when the connection goes down. Author: Marcelo Vanzin Closes #9210 from vanzin/SPARK-10997. --- .../main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) (limited to 'yarn') diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index c6a6d7ac56..12ae350e4c 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -321,7 +321,8 @@ private[spark] class ApplicationMaster( private def runExecutorLauncher(securityMgr: SecurityManager): Unit = { val port = sparkConf.getInt("spark.yarn.am.port", 0) - rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr) + rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr, + clientMode = true) val driverRef = waitForSparkDriver() addAmIpFilter() registerAM(rpcEnv, driverRef, sparkConf.get("spark.driver.appUIAddress", ""), securityMgr) @@ -574,9 +575,6 @@ private[spark] class ApplicationMaster( case x: AddWebUIFilter => logInfo(s"Add WebUI Filter. $x") driver.send(x) - - case DriverHello => - // SPARK-10987: no action needed for this message. } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { -- cgit v1.2.3