aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/util/AkkaUtils.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/util/AkkaUtils.scala17
1 files changed, 14 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index f26ed47e58..a6c9a9aaba 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -24,12 +24,12 @@ import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
import com.typesafe.config.ConfigFactory
import org.apache.log4j.{Level, Logger}
-import org.apache.spark.SparkConf
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
/**
* Various utility classes for working with Akka.
*/
-private[spark] object AkkaUtils {
+private[spark] object AkkaUtils extends Logging {
/**
* Creates an ActorSystem ready for remoting, with various Spark features. Returns both the
@@ -42,7 +42,7 @@ private[spark] object AkkaUtils {
* of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
*/
def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false,
- conf: SparkConf): (ActorSystem, Int) = {
+ conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = {
val akkaThreads = conf.getInt("spark.akka.threads", 4)
val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
@@ -65,6 +65,15 @@ private[spark] object AkkaUtils {
conf.getDouble("spark.akka.failure-detector.threshold", 300.0)
val akkaHeartBeatInterval = conf.getInt("spark.akka.heartbeat.interval", 1000)
+ val secretKey = securityManager.getSecretKey()
+ val isAuthOn = securityManager.isAuthenticationEnabled()
+ if (isAuthOn && secretKey == null) {
+ throw new Exception("Secret key is null with authentication on")
+ }
+ val requireCookie = if (isAuthOn) "on" else "off"
+ val secureCookie = if (isAuthOn) secretKey else ""
+ logDebug("In createActorSystem, requireCookie is: " + requireCookie)
+
val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
ConfigFactory.parseString(
s"""
@@ -72,6 +81,8 @@ private[spark] object AkkaUtils {
|akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
|akka.stdout-loglevel = "ERROR"
|akka.jvm-exit-on-fatal-error = off
+ |akka.remote.require-cookie = "$requireCookie"
+ |akka.remote.secure-cookie = "$secureCookie"
|akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
|akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
|akka.remote.transport-failure-detector.threshold = $akkaFailureDetector