aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorJacek Lewandowski <lewandowski.jacek@gmail.com>2015-02-02 14:07:19 -0800
committerJosh Rosen <joshrosen@databricks.com>2015-02-02 14:07:19 -0800
commit5a5526164bdf9ecf1306d4570e816eb4df5cfd2b (patch)
tree70a30836e095dd16fce0e27bb4d2681d2cb425e6 /core
parentbff65b5cca7ae0c6c49e6a04638d18104be4be7c (diff)
downloadspark-5a5526164bdf9ecf1306d4570e816eb4df5cfd2b.tar.gz
spark-5a5526164bdf9ecf1306d4570e816eb4df5cfd2b.tar.bz2
spark-5a5526164bdf9ecf1306d4570e816eb4df5cfd2b.zip
SPARK-5425: Use synchronised methods in system properties to create SparkConf
SPARK-5425: Fixed usages of system properties This patch fixes a few problems caused by the fact that the Scala wrapper over system properties is not thread-safe and is basically invalid because it doesn't take into account the default values which could have been set in the properties object. The problem is fixed by modifying `Utils.getSystemProperties` method so that it uses `stringPropertyNames` method of the `Properties` class, which is thread-safe (internally it creates a defensive copy in a synchronized method) and returns keys of the properties which were set explicitly and which are defined as defaults. The other related problem, which is fixed here, was in `ResetSystemProperties` mix-in. It created a copy of the system properties in the wrong way. This patch also introduces a test case for thread-safety of SparkConf creation. Refer to the discussion in https://github.com/apache/spark/pull/4220 for more details. Author: Jacek Lewandowski <lewandowski.jacek@gmail.com> Closes #4222 from jacek-lewandowski/SPARK-5425-1.3 and squashes the following commits: 03da61b [Jacek Lewandowski] SPARK-5425: Modified Utils.getSystemProperties to return a map of all system properties - explicit + defaults 8faf2ea [Jacek Lewandowski] SPARK-5425: Use SerializationUtils to save properties in ResetSystemProperties trait 71aa572 [Jacek Lewandowski] SPARK-5425: Use synchronised methods in system properties to create SparkConf
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkConf.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala11
-rw-r--r--core/src/test/scala/org/apache/spark/SparkConfSuite.scala25
-rw-r--r--core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala7
4 files changed, 42 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index cd91c8f875..4d4c69d42d 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.LinkedHashSet
import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.util.Utils
/**
* Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
@@ -53,8 +54,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
if (loadDefaults) {
// Load any spark.* system properties
- for ((k, v) <- System.getProperties.asScala if k.startsWith("spark.")) {
- set(k, v)
+ for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
+ set(key, value)
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 703b23add8..31850b50bd 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1347,9 +1347,14 @@ private[spark] object Utils extends Logging {
hashAbs
}
- /** Returns a copy of the system properties that is thread-safe to iterator over. */
- def getSystemProperties(): Map[String, String] = {
- System.getProperties.clone().asInstanceOf[java.util.Properties].toMap[String, String]
+ /** Returns the system properties map that is thread-safe to iterate over. It gets the
+ * properties which have been set explicitly, as well as those for which only a default value
+ * has been defined. */
+ def getSystemProperties: Map[String, String] = {
+ val sysProps = for (key <- System.getProperties.stringPropertyNames()) yield
+ (key, System.getProperty(key))
+
+ sysProps.toMap
}
/**
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 790976a5ac..e08210ae60 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -17,6 +17,10 @@
package org.apache.spark
+import java.util.concurrent.{TimeUnit, Executors}
+
+import scala.util.{Try, Random}
+
import org.scalatest.FunSuite
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
import org.apache.spark.util.ResetSystemProperties
@@ -123,6 +127,27 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro
assert(conf.get("spark.test.a.b.c") === "a.b.c")
}
+ test("Thread safeness - SPARK-5425") {
+ import scala.collection.JavaConversions._
+ val executor = Executors.newSingleThreadScheduledExecutor()
+ val sf = executor.scheduleAtFixedRate(new Runnable {
+ override def run(): Unit =
+ System.setProperty("spark.5425." + Random.nextInt(), Random.nextInt().toString)
+ }, 0, 1, TimeUnit.MILLISECONDS)
+
+ try {
+ val t0 = System.currentTimeMillis()
+ while ((System.currentTimeMillis() - t0) < 1000) {
+ val conf = Try(new SparkConf(loadDefaults = true))
+ assert(conf.isSuccess === true)
+ }
+ } finally {
+ executor.shutdownNow()
+ for (key <- System.getProperties.stringPropertyNames() if key.startsWith("spark.5425."))
+ System.getProperties.remove(key)
+ }
+ }
+
test("register kryo classes through registerKryoClasses") {
val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")
diff --git a/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
index d4b92f33dd..bad1aa9995 100644
--- a/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
+++ b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
@@ -19,6 +19,7 @@ package org.apache.spark.util
import java.util.Properties
+import org.apache.commons.lang3.SerializationUtils
import org.scalatest.{BeforeAndAfterEach, Suite}
/**
@@ -42,7 +43,11 @@ private[spark] trait ResetSystemProperties extends BeforeAndAfterEach { this: Su
var oldProperties: Properties = null
override def beforeEach(): Unit = {
- oldProperties = new Properties(System.getProperties)
+ // we need SerializationUtils.clone instead of `new Properties(System.getProperties())` because
+ // the latter way of creating a copy does not copy the properties but it initializes a new
+ // Properties object with the given properties as defaults. They are not recognized at all
+ // by the standard Scala wrapper over Java Properties then.
+ oldProperties = SerializationUtils.clone(System.getProperties)
super.beforeEach()
}