diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/util/AkkaUtils.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 139 |
1 files changed, 0 insertions, 139 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala deleted file mode 100644 index 3f4ac9b2f1..0000000000 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util - -import scala.collection.JavaConverters._ - -import akka.actor.{ActorSystem, ExtendedActorSystem} -import com.typesafe.config.ConfigFactory -import org.apache.log4j.{Level, Logger} - -import org.apache.spark.{Logging, SecurityManager, SparkConf} - -/** - * Various utility classes for working with Akka. - */ -private[spark] object AkkaUtils extends Logging { - - /** - * Creates an ActorSystem ready for remoting, with various Spark features. Returns both the - * ActorSystem itself and its port (which is hard to get from Akka). - * - * Note: the `name` parameter is important, as even if a client sends a message to right - * host + port, if the system name is incorrect, Akka will drop the message. - * - * If indestructible is set to true, the Actor System will continue running in the event - * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]]. - */ - def createActorSystem( - name: String, - host: String, - port: Int, - conf: SparkConf, - securityManager: SecurityManager): (ActorSystem, Int) = { - val startService: Int => (ActorSystem, Int) = { actualPort => - doCreateActorSystem(name, host, actualPort, conf, securityManager) - } - Utils.startServiceOnPort(port, startService, conf, name) - } - - private def doCreateActorSystem( - name: String, - host: String, - port: Int, - conf: SparkConf, - securityManager: SecurityManager): (ActorSystem, Int) = { - - val akkaThreads = conf.getInt("spark.akka.threads", 4) - val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15) - val akkaTimeoutS = conf.getTimeAsSeconds("spark.akka.timeout", - conf.get("spark.network.timeout", "120s")) - val akkaFrameSize = maxFrameSizeBytes(conf) - val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false) - val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off" - if (!akkaLogLifecycleEvents) { - // As a workaround for Akka issue #3787, we coerce the "EndpointWriter" log to be silent. - // See: https://www.assembla.com/spaces/akka/tickets/3787#/ - Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL)) - } - - val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off" - - val akkaHeartBeatPausesS = conf.getTimeAsSeconds("spark.akka.heartbeat.pauses", "6000s") - val akkaHeartBeatIntervalS = conf.getTimeAsSeconds("spark.akka.heartbeat.interval", "1000s") - - val secretKey = securityManager.getSecretKey() - val isAuthOn = securityManager.isAuthenticationEnabled() - if (isAuthOn && secretKey == null) { - throw new Exception("Secret key is null with authentication on") - } - val requireCookie = if (isAuthOn) "on" else "off" - val secureCookie = if (isAuthOn) secretKey else "" - logDebug(s"In createActorSystem, requireCookie is: $requireCookie") - - val akkaSslConfig = securityManager.getSSLOptions("akka").createAkkaConfig - .getOrElse(ConfigFactory.empty()) - - val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap.asJava) - .withFallback(akkaSslConfig).withFallback(ConfigFactory.parseString( - s""" - |akka.daemonic = on - |akka.loggers = [""akka.event.slf4j.Slf4jLogger""] - |akka.stdout-loglevel = "ERROR" - |akka.jvm-exit-on-fatal-error = off - |akka.remote.require-cookie = "$requireCookie" - |akka.remote.secure-cookie = "$secureCookie" - |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatIntervalS s - |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPausesS s - |akka.actor.provider = "akka.remote.RemoteActorRefProvider" - |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport" - |akka.remote.netty.tcp.hostname = "$host" - |akka.remote.netty.tcp.port = $port - |akka.remote.netty.tcp.tcp-nodelay = on - |akka.remote.netty.tcp.connection-timeout = $akkaTimeoutS s - |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B - |akka.remote.netty.tcp.execution-pool-size = $akkaThreads - |akka.actor.default-dispatcher.throughput = $akkaBatchSize - |akka.log-config-on-start = $logAkkaConfig - |akka.remote.log-remote-lifecycle-events = $lifecycleEvents - |akka.log-dead-letters = $lifecycleEvents - |akka.log-dead-letters-during-shutdown = $lifecycleEvents - """.stripMargin)) - - val actorSystem = ActorSystem(name, akkaConf) - val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider - val boundPort = provider.getDefaultAddress.port.get - (actorSystem, boundPort) - } - - private val AKKA_MAX_FRAME_SIZE_IN_MB = Int.MaxValue / 1024 / 1024 - - /** Returns the configured max frame size for Akka messages in bytes. */ - def maxFrameSizeBytes(conf: SparkConf): Int = { - val frameSizeInMB = conf.getInt("spark.akka.frameSize", 128) - if (frameSizeInMB > AKKA_MAX_FRAME_SIZE_IN_MB) { - throw new IllegalArgumentException( - s"spark.akka.frameSize should not be greater than $AKKA_MAX_FRAME_SIZE_IN_MB MB") - } - frameSizeInMB * 1024 * 1024 - } - - /** Space reserved for extra data in an Akka message besides serialized task or task result. */ - val reservedSizeBytes = 200 * 1024 - -} |