diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2015-12-31 00:15:55 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-12-31 00:15:55 -0800 |
commit | 4f5a24d7e73104771f233af041eeba4f41675974 (patch) | |
tree | 57f3c139e679e476c97189ccc6d967919b3ed61d /core/src/main/scala/org/apache/spark/SparkEnv.scala | |
parent | e6c77874b915691dead91e8d96ad9f58ba3a73db (diff) | |
download | spark-4f5a24d7e73104771f233af041eeba4f41675974.tar.gz spark-4f5a24d7e73104771f233af041eeba4f41675974.tar.bz2 spark-4f5a24d7e73104771f233af041eeba4f41675974.zip |
[SPARK-7995][SPARK-6280][CORE] Remove AkkaRpcEnv and remove systemName from setupEndpointRef
### Remove AkkaRpcEnv
Keep `SparkEnv.actorSystem` because Streaming still uses it. Will remove it and AkkaUtils after refactoring Streaming actorStream API.
### Remove systemName
There are 2 places using `systemName`:
* `RpcEnvConfig.name`. Actually, although it's used as `systemName` in `AkkaRpcEnv`, `NettyRpcEnv` uses it as the service name to output the log `Successfully started service *** on port ***`. Since the service name in log is useful, I keep `RpcEnvConfig.name`.
* `def setupEndpointRef(systemName: String, address: RpcAddress, endpointName: String)`. Each `ActorSystem` has a `systemName`. Akka requires `systemName` in its URI and will refuse a connection if `systemName` is not matched. However, `NettyRpcEnv` doesn't use it. So we can remove `systemName` from `setupEndpointRef` since we are removing `AkkaRpcEnv`.
### Remove RpcEnv.uriOf
`uriOf` exists because Akka uses different URI formats for with and without authentication, e.g., `akka.ssl.tcp...` and `akka.tcp://...`. But `NettyRpcEnv` uses the same format. So it's not necessary after removing `AkkaRpcEnv`.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #10459 from zsxwing/remove-akka-rpc-env.
Diffstat (limited to 'core/src/main/scala/org/apache/spark/SparkEnv.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkEnv.scala | 12 |
1 files changed, 3 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 52acde1b41..b98cc964ed 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -34,7 +34,6 @@ import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemor import org.apache.spark.network.BlockTransferService import org.apache.spark.network.netty.NettyBlockTransferService import org.apache.spark.rpc.{RpcEndpointRef, RpcEndpoint, RpcEnv} -import org.apache.spark.rpc.akka.AkkaRpcEnv import org.apache.spark.scheduler.{OutputCommitCoordinator, LiveListenerBus} import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorEndpoint import org.apache.spark.serializer.Serializer @@ -97,9 +96,7 @@ class SparkEnv ( blockManager.master.stop() metricsSystem.stop() outputCommitCoordinator.stop() - if (!rpcEnv.isInstanceOf[AkkaRpcEnv]) { - actorSystem.shutdown() - } + actorSystem.shutdown() rpcEnv.shutdown() // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut @@ -248,14 +245,11 @@ object SparkEnv extends Logging { val securityManager = new SecurityManager(conf) - // Create the ActorSystem for Akka and get the port it binds to. val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName + // Create the ActorSystem for Akka and get the port it binds to. val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager, clientMode = !isDriver) - val actorSystem: ActorSystem = - if (rpcEnv.isInstanceOf[AkkaRpcEnv]) { - rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem - } else { + val actorSystem: ActorSystem = { val actorSystemPort = if (port == 0 || rpcEnv.address == null) { port |