path: root/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
Diffstat (limited to 'core/src/main/scala/org/apache/spark/util/AkkaUtils.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala  139
1 file changed, 0 insertions, 139 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
deleted file mode 100644
index 3f4ac9b2f1..0000000000
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import scala.collection.JavaConverters._
-
-import akka.actor.{ActorSystem, ExtendedActorSystem}
-import com.typesafe.config.ConfigFactory
-import org.apache.log4j.{Level, Logger}
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
-
-/**
- * Various utility classes for working with Akka.
- */
-private[spark] object AkkaUtils extends Logging {
-
-  /**
-   * Creates an ActorSystem ready for remoting, with various Spark features. Returns both the
-   * ActorSystem itself and its port (which is hard to get from Akka).
-   *
-   * Note: the `name` parameter is important, as even if a client sends a message to the right
-   * host + port, if the system name is incorrect, Akka will drop the message.
-   *
-   * If indestructible is set to true, the Actor System will continue running in the event
-   * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
-   */
-  def createActorSystem(
-      name: String,
-      host: String,
-      port: Int,
-      conf: SparkConf,
-      securityManager: SecurityManager): (ActorSystem, Int) = {
-    val startService: Int => (ActorSystem, Int) = { actualPort =>
-      doCreateActorSystem(name, host, actualPort, conf, securityManager)
-    }
-    Utils.startServiceOnPort(port, startService, conf, name)
-  }
-
-  private def doCreateActorSystem(
-      name: String,
-      host: String,
-      port: Int,
-      conf: SparkConf,
-      securityManager: SecurityManager): (ActorSystem, Int) = {
-
-    val akkaThreads = conf.getInt("spark.akka.threads", 4)
-    val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
-    val akkaTimeoutS = conf.getTimeAsSeconds("spark.akka.timeout",
-      conf.get("spark.network.timeout", "120s"))
-    val akkaFrameSize = maxFrameSizeBytes(conf)
-    val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
-    val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
-    if (!akkaLogLifecycleEvents) {
-      // As a workaround for Akka issue #3787, we coerce the "EndpointWriter" log to be silent.
-      // See: https://www.assembla.com/spaces/akka/tickets/3787#/
-      Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL))
-    }
-
-    val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off"
-
-    val akkaHeartBeatPausesS = conf.getTimeAsSeconds("spark.akka.heartbeat.pauses", "6000s")
-    val akkaHeartBeatIntervalS = conf.getTimeAsSeconds("spark.akka.heartbeat.interval", "1000s")
-
-    val secretKey = securityManager.getSecretKey()
-    val isAuthOn = securityManager.isAuthenticationEnabled()
-    if (isAuthOn && secretKey == null) {
-      throw new Exception("Secret key is null with authentication on")
-    }
-    val requireCookie = if (isAuthOn) "on" else "off"
-    val secureCookie = if (isAuthOn) secretKey else ""
-    logDebug(s"In createActorSystem, requireCookie is: $requireCookie")
-
-    val akkaSslConfig = securityManager.getSSLOptions("akka").createAkkaConfig
-      .getOrElse(ConfigFactory.empty())
-
-    val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap.asJava)
-      .withFallback(akkaSslConfig).withFallback(ConfigFactory.parseString(
-      s"""
-      |akka.daemonic = on
-      |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
-      |akka.stdout-loglevel = "ERROR"
-      |akka.jvm-exit-on-fatal-error = off
-      |akka.remote.require-cookie = "$requireCookie"
-      |akka.remote.secure-cookie = "$secureCookie"
-      |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatIntervalS s
-      |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPausesS s
-      |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
-      |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
-      |akka.remote.netty.tcp.hostname = "$host"
-      |akka.remote.netty.tcp.port = $port
-      |akka.remote.netty.tcp.tcp-nodelay = on
-      |akka.remote.netty.tcp.connection-timeout = $akkaTimeoutS s
-      |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B
-      |akka.remote.netty.tcp.execution-pool-size = $akkaThreads
-      |akka.actor.default-dispatcher.throughput = $akkaBatchSize
-      |akka.log-config-on-start = $logAkkaConfig
-      |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
-      |akka.log-dead-letters = $lifecycleEvents
-      |akka.log-dead-letters-during-shutdown = $lifecycleEvents
-      """.stripMargin))
-
-    val actorSystem = ActorSystem(name, akkaConf)
-    val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
-    val boundPort = provider.getDefaultAddress.port.get
-    (actorSystem, boundPort)
-  }
-
-  private val AKKA_MAX_FRAME_SIZE_IN_MB = Int.MaxValue / 1024 / 1024
-
-  /** Returns the configured max frame size for Akka messages in bytes. */
-  def maxFrameSizeBytes(conf: SparkConf): Int = {
-    val frameSizeInMB = conf.getInt("spark.akka.frameSize", 128)
-    if (frameSizeInMB > AKKA_MAX_FRAME_SIZE_IN_MB) {
-      throw new IllegalArgumentException(
-        s"spark.akka.frameSize should not be greater than $AKKA_MAX_FRAME_SIZE_IN_MB MB")
-    }
-    frameSizeInMB * 1024 * 1024
-  }
-
-  /** Space reserved for extra data in an Akka message besides serialized task or task result. */
-  val reservedSizeBytes = 200 * 1024
-
-}
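
For context on the API being removed, the sketch below shows a minimal caller. It is not taken from the Spark codebase: the "sparkDriver" and "localhost" values and the port-0-picks-a-free-port convention are illustrative assumptions, and the example is placed under org.apache.spark.util only because AkkaUtils and SecurityManager are private[spark].

package org.apache.spark.util  // needed so the private[spark] members are visible

import org.apache.spark.{SecurityManager, SparkConf}

object AkkaUtilsExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val securityMgr = new SecurityManager(conf)  // hypothetical, minimal setup

    // Port 0 is assumed to let Utils.startServiceOnPort choose a free port;
    // the actual bound port comes back as the second element of the tuple.
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
      "sparkDriver", "localhost", 0, conf, securityMgr)
    println(s"Actor system bound to port $boundPort")

    // ... register actors, exchange messages ...
    actorSystem.shutdown()
    actorSystem.awaitTermination()
  }
}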
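As a worked example of the frame-size cap enforced by maxFrameSizeBytes above (the numbers follow directly from the deleted code; nothing new is assumed):

// AKKA_MAX_FRAME_SIZE_IN_MB = Int.MaxValue / 1024 / 1024 = 2047, so spark.akka.frameSize
// (given in MB) must not exceed 2047 or the byte count would overflow a signed Int.
// With the default of 128 MB:
val defaultFrameSizeBytes = 128 * 1024 * 1024  // 134217728 bytes, the value injected into
                                               // akka.remote.netty.tcp.maximum-frame-size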