diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2015-12-31 00:15:55 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-12-31 00:15:55 -0800 |
commit | 4f5a24d7e73104771f233af041eeba4f41675974 (patch) | |
tree | 57f3c139e679e476c97189ccc6d967919b3ed61d /yarn/src/main | |
parent | e6c77874b915691dead91e8d96ad9f58ba3a73db (diff) | |
download | spark-4f5a24d7e73104771f233af041eeba4f41675974.tar.gz spark-4f5a24d7e73104771f233af041eeba4f41675974.tar.bz2 spark-4f5a24d7e73104771f233af041eeba4f41675974.zip |
[SPARK-7995][SPARK-6280][CORE] Remove AkkaRpcEnv and remove systemName from setupEndpointRef
### Remove AkkaRpcEnv
Keep `SparkEnv.actorSystem` because Streaming still uses it. Will remove it and AkkaUtils after refactoring Streaming actorStream API.
### Remove systemName
There are 2 places using `systemName`:
* `RpcEnvConfig.name`. Actually, although it's used as `systemName` in `AkkaRpcEnv`, `NettyRpcEnv` uses it as the service name to output the log `Successfully started service *** on port ***`. Since the service name in log is useful, I keep `RpcEnvConfig.name`.
* `def setupEndpointRef(systemName: String, address: RpcAddress, endpointName: String)`. Each `ActorSystem` has a `systemName`. Akka requires `systemName` in its URI and will refuse a connection if `systemName` is not matched. However, `NettyRpcEnv` doesn't use it. So we can remove `systemName` from `setupEndpointRef` since we are removing `AkkaRpcEnv`.
### Remove RpcEnv.uriOf
`uriOf` exists because Akka uses different URI formats for with and without authentication, e.g., `akka.ssl.tcp...` and `akka.tcp://...`. But `NettyRpcEnv` uses the same format. So it's not necessary after removing `AkkaRpcEnv`.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #10459 from zsxwing/remove-akka-rpc-env.
Diffstat (limited to 'yarn/src/main')
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 12 |
1 files changed, 5 insertions, 7 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index a01bb267d7..cccc061647 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -29,8 +29,7 @@ import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv, - SparkException, SparkUserAppException} +import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.rpc._ @@ -281,10 +280,10 @@ private[spark] class ApplicationMaster( .getOrElse("") val _sparkConf = if (sc != null) sc.getConf else sparkConf - val driverUrl = _rpcEnv.uriOf( - SparkEnv.driverActorSystemName, - RpcAddress(_sparkConf.get("spark.driver.host"), _sparkConf.get("spark.driver.port").toInt), - CoarseGrainedSchedulerBackend.ENDPOINT_NAME) + val driverUrl = RpcEndpointAddress( + _sparkConf.get("spark.driver.host"), + _sparkConf.get("spark.driver.port").toInt, + CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString allocator = client.register(driverUrl, driverRef, yarnConf, @@ -310,7 +309,6 @@ private[spark] class ApplicationMaster( port: String, isClusterMode: Boolean): RpcEndpointRef = { val driverEndpoint = rpcEnv.setupEndpointRef( - SparkEnv.driverActorSystemName, RpcAddress(host, port.toInt), YarnSchedulerBackend.ENDPOINT_NAME) amEndpoint = |