author    Shixiong Zhu <shixiong@databricks.com>    2015-12-31 00:15:55 -0800
committer Reynold Xin <rxin@databricks.com>    2015-12-31 00:15:55 -0800
commit    4f5a24d7e73104771f233af041eeba4f41675974 (patch)
tree      57f3c139e679e476c97189ccc6d967919b3ed61d /yarn
parent    e6c77874b915691dead91e8d96ad9f58ba3a73db (diff)
[SPARK-7995][SPARK-6280][CORE] Remove AkkaRpcEnv and remove systemName from setupEndpointRef
### Remove AkkaRpcEnv

Keep `SparkEnv.actorSystem` because Streaming still uses it. It will be removed, along with `AkkaUtils`, after the Streaming `actorStream` API is refactored.

### Remove systemName

There are two places that use `systemName`:

* `RpcEnvConfig.name`. Although `AkkaRpcEnv` uses it as the `systemName`, `NettyRpcEnv` uses it as the service name when logging `Successfully started service *** on port ***`. Since the service name in the log is useful, `RpcEnvConfig.name` is kept.
* `def setupEndpointRef(systemName: String, address: RpcAddress, endpointName: String)`. Each `ActorSystem` has a `systemName`; Akka requires it in its URIs and refuses a connection if the `systemName` does not match. `NettyRpcEnv`, however, doesn't use it, so `systemName` can be removed from `setupEndpointRef` now that `AkkaRpcEnv` is going away.

### Remove RpcEnv.uriOf

`uriOf` exists because Akka uses different URI formats with and without authentication, e.g., `akka.ssl.tcp://...` and `akka.tcp://...`. `NettyRpcEnv` uses a single format, so `uriOf` is no longer necessary once `AkkaRpcEnv` is removed.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #10459 from zsxwing/remove-akka-rpc-env.
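As an illustration (not part of the original commit message), here is a minimal sketch of the new call shapes the diff below adopts. It assumes the package layout at this commit, where these `private[spark]` rpc classes are used from within Spark's own source tree; the object and helper names are hypothetical.

```scala
// Imports mirror ApplicationMaster.scala at this commit.
import org.apache.spark.rpc._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}

object RpcApiSketch {
  // After this commit: the driver URL is built from a plain endpoint
  // address; toString renders roughly spark://<endpointName>@<host>:<port>,
  // with no Akka systemName involved.
  def driverUrl(host: String, port: Int): String =
    RpcEndpointAddress(host, port, CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString

  // setupEndpointRef now takes only the remote address and endpoint name;
  // the systemName parameter is gone along with AkkaRpcEnv.
  def driverRef(rpcEnv: RpcEnv, host: String, port: Int): RpcEndpointRef =
    rpcEnv.setupEndpointRef(RpcAddress(host, port), YarnSchedulerBackend.ENDPOINT_NAME)
}
```

The real call sites for both helpers appear in the hunks below.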
Diffstat (limited to 'yarn')
-rw-r--r-- yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 12
1 file changed, 5 insertions(+), 7 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index a01bb267d7..cccc061647 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -29,8 +29,7 @@ import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv,
- SparkException, SparkUserAppException}
+import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.rpc._
@@ -281,10 +280,10 @@ private[spark] class ApplicationMaster(
.getOrElse("")
val _sparkConf = if (sc != null) sc.getConf else sparkConf
- val driverUrl = _rpcEnv.uriOf(
- SparkEnv.driverActorSystemName,
- RpcAddress(_sparkConf.get("spark.driver.host"), _sparkConf.get("spark.driver.port").toInt),
- CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
+ val driverUrl = RpcEndpointAddress(
+ _sparkConf.get("spark.driver.host"),
+ _sparkConf.get("spark.driver.port").toInt,
+ CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
allocator = client.register(driverUrl,
driverRef,
yarnConf,
@@ -310,7 +309,6 @@ private[spark] class ApplicationMaster(
port: String,
isClusterMode: Boolean): RpcEndpointRef = {
val driverEndpoint = rpcEnv.setupEndpointRef(
- SparkEnv.driverActorSystemName,
RpcAddress(host, port.toInt),
YarnSchedulerBackend.ENDPOINT_NAME)
amEndpoint =