aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2014-01-06 15:47:40 +0530
committerPrashant Sharma <prashant.s@imaginea.com>2014-01-07 13:01:43 +0530
commitb3018811e106e6414816380a35c07a8564945d37 (patch)
tree3abe125d1b18eafc52fb4a4bcfd838244ec4e232 /core/src
parentb97ef218f3feec6354bae6cfcd060e9b112c1779 (diff)
downloadspark-b3018811e106e6414816380a35c07a8564945d37.tar.gz
spark-b3018811e106e6414816380a35c07a8564945d37.tar.bz2
spark-b3018811e106e6414816380a35c07a8564945d37.zip
Allow users to set arbitrary akka configurations via spark conf.
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkConf.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/util/AkkaUtils.scala8
2 files changed, 12 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 55f27033b5..2d437f1b21 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -172,6 +172,13 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with
.map{case (k, v) => (k.substring(prefix.length), v)}
}
+ /** Get all akka conf variables set on this SparkConf */
+ def getAkkaConf: Seq[(String, String)] = {
+ getAll.filter {
+ case (k, v) => k.startsWith("akka.")
+ }
+ }
+
/** Does the configuration contain a given parameter? */
def contains(key: String): Boolean = settings.contains(key)
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 7df7e3d8e5..2ee37815de 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -17,12 +17,13 @@
package org.apache.spark.util
+import scala.collection.JavaConversions.mapAsJavaMap
import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
import com.typesafe.config.ConfigFactory
-import org.apache.log4j.{Level, Logger}
+import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
/**
@@ -64,7 +65,8 @@ private[spark] object AkkaUtils {
conf.get("spark.akka.failure-detector.threshold", "300.0").toDouble
val akkaHeartBeatInterval = conf.get("spark.akka.heartbeat.interval", "1000").toInt
- val akkaConf = ConfigFactory.parseString(
+ val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
+ ConfigFactory.parseString(
s"""
|akka.daemonic = on
|akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
@@ -86,7 +88,7 @@ private[spark] object AkkaUtils {
|akka.remote.log-remote-lifecycle-events = $lifecycleEvents
|akka.log-dead-letters = $lifecycleEvents
|akka.log-dead-letters-during-shutdown = $lifecycleEvents
- """.stripMargin)
+ """.stripMargin))
val actorSystem = if (indestructible) {
IndestructibleActorSystem(name, akkaConf)