aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2014-01-06 15:47:40 +0530
committerPrashant Sharma <prashant.s@imaginea.com>2014-01-07 13:01:43 +0530
commitb3018811e106e6414816380a35c07a8564945d37 (patch)
tree3abe125d1b18eafc52fb4a4bcfd838244ec4e232
parentb97ef218f3feec6354bae6cfcd060e9b112c1779 (diff)
downloadspark-b3018811e106e6414816380a35c07a8564945d37.tar.gz
spark-b3018811e106e6414816380a35c07a8564945d37.tar.bz2
spark-b3018811e106e6414816380a35c07a8564945d37.zip
Allow users to set arbitrary akka configurations via spark conf.
-rw-r--r--core/src/main/scala/org/apache/spark/SparkConf.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/util/AkkaUtils.scala8
-rw-r--r--docs/configuration.md8
3 files changed, 20 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 55f27033b5..2d437f1b21 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -172,6 +172,13 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with
.map{case (k, v) => (k.substring(prefix.length), v)}
}
+ /** Get all akka conf variables set on this SparkConf */
+ def getAkkaConf: Seq[(String, String)] = {
+ getAll.filter {
+ case (k, v) => k.startsWith("akka.")
+ }
+ }
+
/** Does the configuration contain a given parameter? */
def contains(key: String): Boolean = settings.contains(key)
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 7df7e3d8e5..2ee37815de 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -17,12 +17,13 @@
package org.apache.spark.util
+import scala.collection.JavaConversions.mapAsJavaMap
import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
import com.typesafe.config.ConfigFactory
-import org.apache.log4j.{Level, Logger}
+import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
/**
@@ -64,7 +65,8 @@ private[spark] object AkkaUtils {
conf.get("spark.akka.failure-detector.threshold", "300.0").toDouble
val akkaHeartBeatInterval = conf.get("spark.akka.heartbeat.interval", "1000").toInt
- val akkaConf = ConfigFactory.parseString(
+ val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
+ ConfigFactory.parseString(
s"""
|akka.daemonic = on
|akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
@@ -86,7 +88,7 @@ private[spark] object AkkaUtils {
|akka.remote.log-remote-lifecycle-events = $lifecycleEvents
|akka.log-dead-letters = $lifecycleEvents
|akka.log-dead-letters-during-shutdown = $lifecycleEvents
- """.stripMargin)
+ """.stripMargin))
val actorSystem = if (indestructible) {
IndestructibleActorSystem(name, akkaConf)
diff --git a/docs/configuration.md b/docs/configuration.md
index 09342fedfc..8a8857bb3b 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -361,6 +361,14 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td>akka.x.y....</td>
+ <td>value</td>
+ <td>
+ An arbitrary akka configuration can be set directly on spark conf and it is applied for all the ActorSystems created spark wide for that spark context and its assigned executors as well.
+ </td>
+</tr>
+
+<tr>
<td>spark.shuffle.consolidateFiles</td>
<td>false</td>
<td>