aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/SparkEnv.scala
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2015-12-31 00:15:55 -0800
committerReynold Xin <rxin@databricks.com>2015-12-31 00:15:55 -0800
commit4f5a24d7e73104771f233af041eeba4f41675974 (patch)
tree57f3c139e679e476c97189ccc6d967919b3ed61d /core/src/main/scala/org/apache/spark/SparkEnv.scala
parente6c77874b915691dead91e8d96ad9f58ba3a73db (diff)
downloadspark-4f5a24d7e73104771f233af041eeba4f41675974.tar.gz
spark-4f5a24d7e73104771f233af041eeba4f41675974.tar.bz2
spark-4f5a24d7e73104771f233af041eeba4f41675974.zip
[SPARK-7995][SPARK-6280][CORE] Remove AkkaRpcEnv and remove systemName from setupEndpointRef
### Remove AkkaRpcEnv Keep `SparkEnv.actorSystem` because Streaming still uses it. Will remove it and AkkaUtils after refactoring Streaming actorStream API. ### Remove systemName There are 2 places using `systemName`: * `RpcEnvConfig.name`. Actually, although it's used as `systemName` in `AkkaRpcEnv`, `NettyRpcEnv` uses it as the service name to output the log `Successfully started service *** on port ***`. Since the service name in log is useful, I keep `RpcEnvConfig.name`. * `def setupEndpointRef(systemName: String, address: RpcAddress, endpointName: String)`. Each `ActorSystem` has a `systemName`. Akka requires `systemName` in its URI and will refuse a connection if `systemName` is not matched. However, `NettyRpcEnv` doesn't use it. So we can remove `systemName` from `setupEndpointRef` since we are removing `AkkaRpcEnv`. ### Remove RpcEnv.uriOf `uriOf` exists because Akka uses different URI formats for with and without authentication, e.g., `akka.ssl.tcp...` and `akka.tcp://...`. But `NettyRpcEnv` uses the same format. So it's not necessary after removing `AkkaRpcEnv`. Author: Shixiong Zhu <shixiong@databricks.com> Closes #10459 from zsxwing/remove-akka-rpc-env.
Diffstat (limited to 'core/src/main/scala/org/apache/spark/SparkEnv.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala12
1 files changed, 3 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 52acde1b41..b98cc964ed 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -34,7 +34,6 @@ import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemor
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.{RpcEndpointRef, RpcEndpoint, RpcEnv}
-import org.apache.spark.rpc.akka.AkkaRpcEnv
import org.apache.spark.scheduler.{OutputCommitCoordinator, LiveListenerBus}
import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorEndpoint
import org.apache.spark.serializer.Serializer
@@ -97,9 +96,7 @@ class SparkEnv (
blockManager.master.stop()
metricsSystem.stop()
outputCommitCoordinator.stop()
- if (!rpcEnv.isInstanceOf[AkkaRpcEnv]) {
- actorSystem.shutdown()
- }
+ actorSystem.shutdown()
rpcEnv.shutdown()
// Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
@@ -248,14 +245,11 @@ object SparkEnv extends Logging {
val securityManager = new SecurityManager(conf)
- // Create the ActorSystem for Akka and get the port it binds to.
val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
+ // Create the ActorSystem for Akka and get the port it binds to.
val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager,
clientMode = !isDriver)
- val actorSystem: ActorSystem =
- if (rpcEnv.isInstanceOf[AkkaRpcEnv]) {
- rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem
- } else {
+ val actorSystem: ActorSystem = {
val actorSystemPort =
if (port == 0 || rpcEnv.address == null) {
port