aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/SparkConf.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/util/AkkaUtils.scala8
-rw-r--r--docs/configuration.md8
3 files changed, 20 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 55f27033b5..2d437f1b21 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -172,6 +172,13 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with
.map{case (k, v) => (k.substring(prefix.length), v)}
}
+ /** Get all akka conf variables set on this SparkConf */
+ def getAkkaConf: Seq[(String, String)] = {
+ getAll.filter {
+ case (k, v) => k.startsWith("akka.")
+ }
+ }
+
/** Does the configuration contain a given parameter? */
def contains(key: String): Boolean = settings.contains(key)
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 7df7e3d8e5..2ee37815de 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -17,12 +17,13 @@
package org.apache.spark.util
+import scala.collection.JavaConversions.mapAsJavaMap
import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
import com.typesafe.config.ConfigFactory
-import org.apache.log4j.{Level, Logger}
+import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
/**
@@ -64,7 +65,8 @@ private[spark] object AkkaUtils {
conf.get("spark.akka.failure-detector.threshold", "300.0").toDouble
val akkaHeartBeatInterval = conf.get("spark.akka.heartbeat.interval", "1000").toInt
- val akkaConf = ConfigFactory.parseString(
+ val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
+ ConfigFactory.parseString(
s"""
|akka.daemonic = on
|akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
@@ -86,7 +88,7 @@ private[spark] object AkkaUtils {
|akka.remote.log-remote-lifecycle-events = $lifecycleEvents
|akka.log-dead-letters = $lifecycleEvents
|akka.log-dead-letters-during-shutdown = $lifecycleEvents
- """.stripMargin)
+ """.stripMargin))
val actorSystem = if (indestructible) {
IndestructibleActorSystem(name, akkaConf)
diff --git a/docs/configuration.md b/docs/configuration.md
index 09342fedfc..8a8857bb3b 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -361,6 +361,14 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td>akka.x.y....</td>
+ <td>value</td>
+ <td>
+ An arbitrary akka configuration can be set directly on spark conf and it is applied for all the ActorSystems created spark wide for that spark context and its assigned executors as well.
+ </td>
+</tr>
+
+<tr>
<td>spark.shuffle.consolidateFiles</td>
<td>false</td>
<td>