From 4f5a24d7e73104771f233af041eeba4f41675974 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 31 Dec 2015 00:15:55 -0800 Subject: [SPARK-7995][SPARK-6280][CORE] Remove AkkaRpcEnv and remove systemName from setupEndpointRef ### Remove AkkaRpcEnv Keep `SparkEnv.actorSystem` because Streaming still uses it. Will remove it and AkkaUtils after refactoring Streaming actorStream API. ### Remove systemName There are 2 places using `systemName`: * `RpcEnvConfig.name`. Actually, although it's used as `systemName` in `AkkaRpcEnv`, `NettyRpcEnv` uses it as the service name to output the log `Successfully started service *** on port ***`. Since the service name in log is useful, I keep `RpcEnvConfig.name`. * `def setupEndpointRef(systemName: String, address: RpcAddress, endpointName: String)`. Each `ActorSystem` has a `systemName`. Akka requires `systemName` in its URI and will refuse a connection if `systemName` is not matched. However, `NettyRpcEnv` doesn't use it. So we can remove `systemName` from `setupEndpointRef` since we are removing `AkkaRpcEnv`. ### Remove RpcEnv.uriOf `uriOf` exists because Akka uses different URI formats for with and without authentication, e.g., `akka.ssl.tcp...` and `akka.tcp://...`. But `NettyRpcEnv` uses the same format. So it's not necessary after removing `AkkaRpcEnv`. Author: Shixiong Zhu Closes #10459 from zsxwing/remove-akka-rpc-env. --- .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) (limited to 'yarn') diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index a01bb267d7..cccc061647 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -29,8 +29,7 @@ import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv, - SparkException, SparkUserAppException} +import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.rpc._ @@ -281,10 +280,10 @@ private[spark] class ApplicationMaster( .getOrElse("") val _sparkConf = if (sc != null) sc.getConf else sparkConf - val driverUrl = _rpcEnv.uriOf( - SparkEnv.driverActorSystemName, - RpcAddress(_sparkConf.get("spark.driver.host"), _sparkConf.get("spark.driver.port").toInt), - CoarseGrainedSchedulerBackend.ENDPOINT_NAME) + val driverUrl = RpcEndpointAddress( + _sparkConf.get("spark.driver.host"), + _sparkConf.get("spark.driver.port").toInt, + CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString allocator = client.register(driverUrl, driverRef, yarnConf, @@ -310,7 +309,6 @@ private[spark] class ApplicationMaster( port: String, isClusterMode: Boolean): RpcEndpointRef = { val driverEndpoint = rpcEnv.setupEndpointRef( - SparkEnv.driverActorSystemName, RpcAddress(host, port.toInt), YarnSchedulerBackend.ENDPOINT_NAME) amEndpoint = -- cgit v1.2.3