diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-10-30 20:11:48 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-10-30 20:11:48 -0700 |
commit | 8f1098a3f0de8c9b2eb9ede91a1b01da10a525ea (patch) | |
tree | 474a241c093523268e7de40492d2cb88e48bba7c /core | |
parent | dc9ce16f6b695295c24062a76fd10f574d278cac (diff) | |
parent | 09f3b677cb7cce08882ea030e9af5798a63046ba (diff) | |
download | spark-8f1098a3f0de8c9b2eb9ede91a1b01da10a525ea.tar.gz spark-8f1098a3f0de8c9b2eb9ede91a1b01da10a525ea.tar.bz2 spark-8f1098a3f0de8c9b2eb9ede91a1b01da10a525ea.zip |
Merge pull request #117 from stephenh/avoid_concurrent_modification_exception
Handle ConcurrentModificationExceptions in SparkContext init.
System.getProperties.toMap will fail-fast when concurrently modified,
and it seems like some other thread started by SparkContext does
a System.setProperty during its initialization.
Handle this by just looping on ConcurrentModificationException, which
seems the safest, since the non-fail-fast methods (Hashtable.entrySet)
have undefined behavior under concurrent modification.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkContext.scala | 8 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/Utils.scala | 7 |
2 files changed, 12 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ade75e20d5..8d7c5a77da 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger import scala.collection.Map import scala.collection.generic.Growable -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap @@ -255,8 +255,10 @@ class SparkContext( conf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY")) } // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar" - for (key <- System.getProperties.toMap[String, String].keys if key.startsWith("spark.hadoop.")) { - conf.set(key.substring("spark.hadoop.".length), System.getProperty(key)) + Utils.getSystemProperties.foreach { case (key, value) => + if (key.startsWith("spark.hadoop.")) { + conf.set(key.substring("spark.hadoop.".length), value) + } } val bufferSize = System.getProperty("spark.buffer.size", "65536") conf.set("io.file.buffer.size", bufferSize) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index fd2811e44c..0c5c12b7a8 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -37,6 +37,7 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream, import org.apache.spark.deploy.SparkHadoopUtil import java.nio.ByteBuffer import org.apache.spark.{SparkEnv, SparkException, Logging} +import java.util.ConcurrentModificationException /** @@ -818,4 +819,10 @@ private[spark] object Utils extends Logging { // Nothing else to guard against ? hashAbs } + + /** Returns a copy of the system properties that is thread-safe to iterator over. 
*/ + def getSystemProperties(): Map[String, String] = { + return System.getProperties().clone() + .asInstanceOf[java.util.Properties].toMap[String, String] + } } |