author     Patrick Wendell <pwendell@gmail.com>  2014-01-07 00:54:25 -0800
committer  Patrick Wendell <pwendell@gmail.com>  2014-01-07 00:54:25 -0800
commit     c3cf0475e8f5fc1a5322ed12ffebca80a1dd0824 (patch)
tree       dfc5f221868f8e83430d4d3de3d4b37f9f042802
parent     a862cafacf555373b5fdbafb4c9c4d712b191648 (diff)
parent     c729fa7c8ed733a778a7201ed17bf74f3e132845 (diff)
Merge pull request #339 from ScrapCodes/conf-improvements

Conf improvements

There are two new features:
1. Allow users to set arbitrary akka configurations via Spark conf.
2. Allow the configuration to be printed in the logs for diagnosis.
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala       |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala    |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala  |  8
-rw-r--r--  docs/configuration.md                                      | 15
4 files changed, 27 insertions, 3 deletions
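
Taken together, both features are driven entirely from SparkConf. A minimal usage sketch (the master URL, app name, and the particular akka key are illustrative, not part of this patch):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("conf-improvements-demo")
      // Feature 1: any key with the "akka." prefix is forwarded to the
      // ActorSystems that Spark creates.
      .set("akka.loglevel", "DEBUG")
      // Feature 2: dump the full SparkConf to the logs at startup.
      .set("spark.log-conf", "true")

    val sc = new SparkContext(conf)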
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 55f27033b5..b166527614 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -172,6 +172,9 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with
       .map{case (k, v) => (k.substring(prefix.length), v)}
   }

+  /** Get all akka conf variables set on this SparkConf */
+  def getAkkaConf: Seq[(String, String)] = getAll.filter {case (k, v) => k.startsWith("akka.")}
+
   /** Does the configuration contain a given parameter? */
   def contains(key: String): Boolean = settings.contains(key)
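
The new getAkkaConf is a plain prefix filter over the stored settings. A small sketch of the expected behavior (key names illustrative):

    val conf = new SparkConf()
      .set("akka.loglevel", "DEBUG")        // matched: starts with "akka."
      .set("spark.executor.memory", "1g")   // skipped: no "akka." prefix

    // Expected result: Seq(("akka.loglevel", "DEBUG"))
    println(conf.getAkkaConf)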
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e80e43af6d..99dcced7d7 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -116,6 +116,10 @@ class SparkContext(
     throw new SparkException("An application must be set in your configuration")
   }

+  if (conf.get("spark.log-conf", "false").toBoolean) {
+    logInfo("Spark configuration:\n" + conf.toDebugString)
+  }
+
   // Set Spark driver host and port system properties
   conf.setIfMissing("spark.driver.host", Utils.localHostName())
   conf.setIfMissing("spark.driver.port", "0")
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 7df7e3d8e5..2ee37815de 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -17,12 +17,13 @@

 package org.apache.spark.util

+import scala.collection.JavaConversions.mapAsJavaMap
 import scala.concurrent.duration.{Duration, FiniteDuration}

 import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
 import com.typesafe.config.ConfigFactory
-import org.apache.log4j.{Level, Logger}

+import org.apache.log4j.{Level, Logger}
 import org.apache.spark.SparkConf

 /**
@@ -64,7 +65,8 @@ private[spark] object AkkaUtils {
       conf.get("spark.akka.failure-detector.threshold", "300.0").toDouble
     val akkaHeartBeatInterval = conf.get("spark.akka.heartbeat.interval", "1000").toInt

-    val akkaConf = ConfigFactory.parseString(
+    val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
+      ConfigFactory.parseString(
       s"""
       |akka.daemonic = on
       |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
@@ -86,7 +88,7 @@ private[spark] object AkkaUtils {
       |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
       |akka.log-dead-letters = $lifecycleEvents
       |akka.log-dead-letters-during-shutdown = $lifecycleEvents
-      """.stripMargin)
+      """.stripMargin))

     val actorSystem = if (indestructible) {
       IndestructibleActorSystem(name, akkaConf)
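
The parseMap(...).withFallback(parseString(...)) arrangement is what gives user-supplied akka.* keys precedence: withFallback only consults the string block for keys the user map does not define. A self-contained sketch of that precedence using Typesafe Config (values illustrative):

    import scala.collection.JavaConversions.mapAsJavaMap
    import com.typesafe.config.ConfigFactory

    val userKeys = Map("akka.loglevel" -> "DEBUG")  // as getAkkaConf would return
    val merged = ConfigFactory.parseMap(userKeys).withFallback(
      ConfigFactory.parseString("""
        |akka.loglevel = INFO
        |akka.daemonic = on
        """.stripMargin))

    println(merged.getString("akka.loglevel"))   // DEBUG: the user key wins
    println(merged.getBoolean("akka.daemonic"))  // true: filled from the fallback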
diff --git a/docs/configuration.md b/docs/configuration.md
index 09342fedfc..1d36ecb9c1 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -361,6 +361,14 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td>akka.x.y....</td>
+  <td>value</td>
+  <td>
+    Arbitrary akka configuration options can be set directly on the Spark conf. They are applied to every ActorSystem created for that SparkContext, including those on its executors.
+  </td>
+</tr>
+
+<tr>
   <td>spark.shuffle.consolidateFiles</td>
   <td>false</td>
   <td>
@@ -395,6 +403,13 @@ Apart from these, the following properties are also available, and may be useful
How many times slower a task is than the median to be considered for speculation.
</td>
</tr>
+<tr>
+  <td>spark.log-conf</td>
+  <td>false</td>
+  <td>
+    Whether to log the supplied SparkConf at INFO level when a SparkContext is started.
+  </td>
+</tr>
</table>
## Viewing Spark Properties