diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2014-01-06 15:47:40 +0530 |
---|---|---|
committer | Prashant Sharma <prashant.s@imaginea.com> | 2014-01-07 13:01:43 +0530 |
commit | b3018811e106e6414816380a35c07a8564945d37 (patch) | |
tree | 3abe125d1b18eafc52fb4a4bcfd838244ec4e232 /core | |
parent | b97ef218f3feec6354bae6cfcd060e9b112c1779 (diff) | |
download | spark-b3018811e106e6414816380a35c07a8564945d37.tar.gz spark-b3018811e106e6414816380a35c07a8564945d37.tar.bz2 spark-b3018811e106e6414816380a35c07a8564945d37.zip |
Allow users to set arbitrary akka configurations via spark conf.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkConf.scala | 7 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 8 |
2 files changed, 12 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 55f27033b5..2d437f1b21 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -172,6 +172,13 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with .map{case (k, v) => (k.substring(prefix.length), v)} } + /** Get all akka conf variables set on this SparkConf */ + def getAkkaConf: Seq[(String, String)] = { + getAll.filter { + case (k, v) => k.startsWith("akka.") + } + } + /** Does the configuration contain a given parameter? */ def contains(key: String): Boolean = settings.contains(key) diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 7df7e3d8e5..2ee37815de 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -17,12 +17,13 @@ package org.apache.spark.util +import scala.collection.JavaConversions.mapAsJavaMap import scala.concurrent.duration.{Duration, FiniteDuration} import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem} import com.typesafe.config.ConfigFactory -import org.apache.log4j.{Level, Logger} +import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf /** @@ -64,7 +65,8 @@ private[spark] object AkkaUtils { conf.get("spark.akka.failure-detector.threshold", "300.0").toDouble val akkaHeartBeatInterval = conf.get("spark.akka.heartbeat.interval", "1000").toInt - val akkaConf = ConfigFactory.parseString( + val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback( + ConfigFactory.parseString( s""" |akka.daemonic = on |akka.loggers = [""akka.event.slf4j.Slf4jLogger""] @@ -86,7 +88,7 @@ private[spark] object AkkaUtils { |akka.remote.log-remote-lifecycle-events = $lifecycleEvents |akka.log-dead-letters = $lifecycleEvents |akka.log-dead-letters-during-shutdown = $lifecycleEvents - """.stripMargin) + """.stripMargin)) val actorSystem = if (indestructible) { IndestructibleActorSystem(name, akkaConf) |