aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/SparkConf.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala11
-rw-r--r--core/src/test/scala/org/apache/spark/SparkConfSuite.scala25
-rw-r--r--core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala7
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala4
5 files changed, 45 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index cd91c8f875..4d4c69d42d 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.LinkedHashSet
import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.util.Utils
/**
* Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
@@ -53,8 +54,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
if (loadDefaults) {
// Load any spark.* system properties
- for ((k, v) <- System.getProperties.asScala if k.startsWith("spark.")) {
- set(k, v)
+ for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
+ set(key, value)
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 703b23add8..31850b50bd 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1347,9 +1347,14 @@ private[spark] object Utils extends Logging {
hashAbs
}
- /** Returns a copy of the system properties that is thread-safe to iterator over. */
- def getSystemProperties(): Map[String, String] = {
- System.getProperties.clone().asInstanceOf[java.util.Properties].toMap[String, String]
+ /** Returns the system properties map that is thread-safe to iterator over. It gets the
+ * properties which have been set explicitly, as well as those for which only a default value
+ * has been defined. */
+ def getSystemProperties: Map[String, String] = {
+ val sysProps = for (key <- System.getProperties.stringPropertyNames()) yield
+ (key, System.getProperty(key))
+
+ sysProps.toMap
}
/**
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 790976a5ac..e08210ae60 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -17,6 +17,10 @@
package org.apache.spark
+import java.util.concurrent.{TimeUnit, Executors}
+
+import scala.util.{Try, Random}
+
import org.scalatest.FunSuite
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
import org.apache.spark.util.ResetSystemProperties
@@ -123,6 +127,27 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro
assert(conf.get("spark.test.a.b.c") === "a.b.c")
}
+ test("Thread safeness - SPARK-5425") {
+ import scala.collection.JavaConversions._
+ val executor = Executors.newSingleThreadScheduledExecutor()
+ val sf = executor.scheduleAtFixedRate(new Runnable {
+ override def run(): Unit =
+ System.setProperty("spark.5425." + Random.nextInt(), Random.nextInt().toString)
+ }, 0, 1, TimeUnit.MILLISECONDS)
+
+ try {
+ val t0 = System.currentTimeMillis()
+ while ((System.currentTimeMillis() - t0) < 1000) {
+ val conf = Try(new SparkConf(loadDefaults = true))
+ assert(conf.isSuccess === true)
+ }
+ } finally {
+ executor.shutdownNow()
+ for (key <- System.getProperties.stringPropertyNames() if key.startsWith("spark.5425."))
+ System.getProperties.remove(key)
+ }
+ }
+
test("register kryo classes through registerKryoClasses") {
val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")
diff --git a/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
index d4b92f33dd..bad1aa9995 100644
--- a/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
+++ b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
@@ -19,6 +19,7 @@ package org.apache.spark.util
import java.util.Properties
+import org.apache.commons.lang3.SerializationUtils
import org.scalatest.{BeforeAndAfterEach, Suite}
/**
@@ -42,7 +43,11 @@ private[spark] trait ResetSystemProperties extends BeforeAndAfterEach { this: Su
var oldProperties: Properties = null
override def beforeEach(): Unit = {
- oldProperties = new Properties(System.getProperties)
+ // we need SerializationUtils.clone instead of `new Properties(System.getProperties()` because
+ // the later way of creating a copy does not copy the properties but it initializes a new
+ // Properties object with the given properties as defaults. They are not recognized at all
+ // by standard Scala wrapper over Java Properties then.
+ oldProperties = SerializationUtils.clone(System.getProperties)
super.beforeEach()
}
diff --git a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
index 65251e9319..e757283823 100644
--- a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
@@ -19,6 +19,8 @@ package org.apache.spark.examples
import scala.collection.JavaConversions._
+import org.apache.spark.util.Utils
+
/** Prints out environmental information, sleeps, and then exits. Made to
* test driver submission in the standalone scheduler. */
object DriverSubmissionTest {
@@ -30,7 +32,7 @@ object DriverSubmissionTest {
val numSecondsToSleep = args(0).toInt
val env = System.getenv()
- val properties = System.getProperties()
+ val properties = Utils.getSystemProperties
println("Environment variables containing SPARK_TEST:")
env.filter{case (k, v) => k.contains("SPARK_TEST")}.foreach(println)