author    Davies Liu <davies@databricks.com>  2015-01-26 12:51:32 -0800
committer Josh Rosen <joshrosen@databricks.com>  2015-01-26 13:22:17 -0800
commit    ef6fe84dcbfff2d937088ea179889752317896e5 (patch)
tree      61af3c066488cc3f52afa532aea0ee7d798ba4b4
parent    cf65620f51d9e85ebe99c50ae65e9814af2b4928 (diff)
[SPARK-5355] use j.u.c.ConcurrentHashMap instead of TrieMap
j.u.c.ConcurrentHashMap is more battle tested.

cc rxin JoshRosen pwendell

Author: Davies Liu <davies@databricks.com>

Closes #4208 from davies/safe-conf and squashes the following commits:

c2182dc [Davies Liu] address comments, fix tests
3a1d821 [Davies Liu] fix test
da14ced [Davies Liu] Merge branch 'master' of github.com:apache/spark into safe-conf
ae4d305 [Davies Liu] change to j.u.c.ConcurrentMap
f8fa1cf [Davies Liu] change to TrieMap
a1d769a [Davies Liu] make SparkConf thread-safe

(cherry picked from commit 142093179a4c40bdd90744191034de7b94a963ff)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
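The heart of the patch, as a minimal standalone sketch (the SafeConf class below is illustrative, not the actual SparkConf code): java.util.concurrent.ConcurrentHashMap returns null for missing keys and provides atomic per-key operations, so reads get wrapped in Option(...) and the racy contains-then-put pair becomes a single putIfAbsent.

    import java.util.concurrent.ConcurrentHashMap

    // Illustrative sketch of the pattern this commit adopts; not SparkConf itself.
    class SafeConf {
      private val settings = new ConcurrentHashMap[String, String]()

      def set(key: String, value: String): this.type = {
        // ConcurrentHashMap rejects null keys and values, hence the explicit check.
        if (key == null || value == null) throw new NullPointerException("null key/value")
        settings.put(key, value)
        this
      }

      // Option(null) == None, so a missing key cleanly becomes None.
      def getOption(key: String): Option[String] = Option(settings.get(key))

      // Atomic check-and-set; the old "if (!contains(key)) set(key, value)" was a race.
      def setIfMissing(key: String, value: String): this.type = {
        settings.putIfAbsent(key, value)
        this
      }
    }

The trade-off: TrieMap offers lock-free snapshot iteration, but ConcurrentHashMap's weakly consistent iterators are sufficient for getAll and toDebugString and, as the commit message says, far more battle tested.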
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala                          | 38
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala  |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala             |  2
3 files changed, 23 insertions(+), 21 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index dd80013757..3337974978 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -17,9 +17,11 @@
package org.apache.spark
+import java.util.concurrent.ConcurrentHashMap
+
import scala.collection.JavaConverters._
-import scala.collection.concurrent.TrieMap
-import scala.collection.mutable.{HashMap, LinkedHashSet}
+import scala.collection.mutable.LinkedHashSet
+
import org.apache.spark.serializer.KryoSerializer
/**
@@ -47,12 +49,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
/** Create a SparkConf that loads defaults from system properties and the classpath */
def this() = this(true)
- private[spark] val settings = new TrieMap[String, String]()
+ private val settings = new ConcurrentHashMap[String, String]()
if (loadDefaults) {
// Load any spark.* system properties
for ((k, v) <- System.getProperties.asScala if k.startsWith("spark.")) {
- settings(k) = v
+ set(k, v)
}
}
@@ -64,7 +66,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
if (value == null) {
throw new NullPointerException("null value")
}
- settings(key) = value
+ settings.put(key, value)
this
}
@@ -130,15 +132,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
/** Set multiple parameters together */
def setAll(settings: Traversable[(String, String)]) = {
- this.settings ++= settings
+ this.settings.putAll(settings.toMap.asJava)
this
}
/** Set a parameter if it isn't already configured */
def setIfMissing(key: String, value: String): SparkConf = {
- if (!settings.contains(key)) {
- settings(key) = value
- }
+ settings.putIfAbsent(key, value)
this
}
@@ -164,21 +164,23 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
/** Get a parameter; throws a NoSuchElementException if it's not set */
def get(key: String): String = {
- settings.getOrElse(key, throw new NoSuchElementException(key))
+ getOption(key).getOrElse(throw new NoSuchElementException(key))
}
/** Get a parameter, falling back to a default if not set */
def get(key: String, defaultValue: String): String = {
- settings.getOrElse(key, defaultValue)
+ getOption(key).getOrElse(defaultValue)
}
/** Get a parameter as an Option */
def getOption(key: String): Option[String] = {
- settings.get(key)
+ Option(settings.get(key))
}
/** Get all parameters as a list of pairs */
- def getAll: Array[(String, String)] = settings.toArray
+ def getAll: Array[(String, String)] = {
+ settings.entrySet().asScala.map(x => (x.getKey, x.getValue)).toArray
+ }
/** Get a parameter as an integer, falling back to a default if not set */
def getInt(key: String, defaultValue: Int): Int = {
@@ -225,11 +227,11 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
def getAppId: String = get("spark.app.id")
/** Does the configuration contain a given parameter? */
- def contains(key: String): Boolean = settings.contains(key)
+ def contains(key: String): Boolean = settings.containsKey(key)
/** Copy this object */
override def clone: SparkConf = {
- new SparkConf(false).setAll(settings)
+ new SparkConf(false).setAll(getAll)
}
/**
@@ -241,7 +243,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
/** Checks for illegal or deprecated config settings. Throws an exception for the former. Not
* idempotent - may mutate this conf object to convert deprecated settings to supported ones. */
private[spark] def validateSettings() {
- if (settings.contains("spark.local.dir")) {
+ if (contains("spark.local.dir")) {
val msg = "In Spark 1.0 and later spark.local.dir will be overridden by the value set by " +
"the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN)."
logWarning(msg)
@@ -266,7 +268,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
}
// Validate spark.executor.extraJavaOptions
- settings.get(executorOptsKey).map { javaOpts =>
+ getOption(executorOptsKey).map { javaOpts =>
if (javaOpts.contains("-Dspark")) {
val msg = s"$executorOptsKey is not allowed to set Spark options (was '$javaOpts'). " +
"Set them directly on a SparkConf or in a properties file when using ./bin/spark-submit."
@@ -346,7 +348,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
* configuration out for debugging.
*/
def toDebugString: String = {
- settings.toArray.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
+ getAll.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
}
}
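Two ripple effects of the new backing map are worth noting before the test diffs below. First, settings is narrowed from private[spark] to private, so the clone overrides (here and in the test subclasses) must go through the public getAll snapshot. Second, setAll takes a Scala Traversable, which a java.util map no longer satisfies; getAll's Array[(String, String)] fills that role. A hedged usage sketch (the SparkConf calls are the ones in this patch; the script around them is illustrative):

    import org.apache.spark.SparkConf

    val conf = new SparkConf(false)            // skip loading spark.* system properties
    conf.set("spark.app.name", "demo")
    conf.setIfMissing("spark.app.name", "x")   // atomic putIfAbsent: no effect

    // getAll iterates the ConcurrentHashMap's entry set; iteration is weakly
    // consistent, so cloning while another thread writes cannot throw.
    val copy = conf.clone                      // internally new SparkConf(false).setAll(getAll)
    assert(copy.get("spark.app.name") == "demo")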
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
index 1a28a9a187..372d7aa453 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
@@ -43,7 +43,7 @@ class WorkerArgumentsTest extends FunSuite {
}
override def clone: SparkConf = {
- new MySparkConf().setAll(settings)
+ new MySparkConf().setAll(getAll)
}
}
val conf = new MySparkConf()
@@ -62,7 +62,7 @@ class WorkerArgumentsTest extends FunSuite {
}
override def clone: SparkConf = {
- new MySparkConf().setAll(settings)
+ new MySparkConf().setAll(getAll)
}
}
val conf = new MySparkConf()
diff --git a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
index dae7bf0e33..8cf951adb3 100644
--- a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
@@ -49,7 +49,7 @@ class LocalDirsSuite extends FunSuite {
}
override def clone: SparkConf = {
- new MySparkConf().setAll(settings)
+ new MySparkConf().setAll(getAll)
}
}
// spark.local.dir only contains invalid directories, but that's not a problem since