author     Stephen Haberman <stephen@exigencecorp.com>  2013-10-27 13:35:04 -0500
committer  Stephen Haberman <stephen@exigencecorp.com>  2013-10-27 14:08:32 -0500
commit     a6ae2b48320d367be5fede60687331ce0d563d00 (patch)
tree       3577da40da0b9b9498382d7aae85d55b93c153c6 /core
parent     1dc776b863663af713920d18cecaf57762c2fd77 (diff)
Handle ConcurrentModificationExceptions in SparkContext init.
System.getProperties.toMap will fail fast when the properties are concurrently modified, and it seems that some other thread started by SparkContext calls System.setProperty during its initialization. Handle this by simply looping on ConcurrentModificationException, which seems the safest option, since the non-fail-fast methods (Hashtable.entrySet) have undefined behavior under concurrent modification.
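To make the race concrete, here is a minimal, illustrative Scala sketch (not part of this commit; the thread and property names are made up): one thread keeps calling System.setProperty while another snapshots the properties with toMap, whose underlying Hashtable iterator is fail-fast and may throw ConcurrentModificationException.

import java.util.ConcurrentModificationException
import scala.collection.JavaConversions._  // implicit Properties -> Map conversion needed for toMap

object CmeRaceSketch {
  def main(args: Array[String]): Unit = {
    // Writer: mutates system properties, like a background thread started during SparkContext init.
    val writer = new Thread {
      override def run(): Unit =
        for (i <- 1 to 1000000) System.setProperty("sketch.key." + (i % 16), i.toString)
    }
    writer.start()

    // Reader: repeatedly snapshot the properties; some attempts may fail fast with a CME.
    var failures = 0
    for (_ <- 1 to 1000) {
      try {
        System.getProperties.toMap[String, String]
      } catch {
        case _: ConcurrentModificationException => failures += 1
      }
    }
    writer.join()
    println(s"snapshots that hit ConcurrentModificationException: $failures")
  }
}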
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala   | 10
2 files changed, 13 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 564466cfd5..d694dfe4d9 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.Map
import scala.collection.generic.Growable
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
@@ -248,8 +248,8 @@ class SparkContext(
conf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
}
// Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
- for (key <- System.getProperties.toMap[String, String].keys if key.startsWith("spark.hadoop.")) {
- conf.set(key.substring("spark.hadoop.".length), System.getProperty(key))
+ for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.hadoop.")) {
+   conf.set(key.substring("spark.hadoop.".length), value)
}
val bufferSize = System.getProperty("spark.buffer.size", "65536")
conf.set("io.file.buffer.size", bufferSize)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a3b3968c5e..d637a0a91d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -37,6 +37,7 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream,
import org.apache.spark.deploy.SparkHadoopUtil
import java.nio.ByteBuffer
import org.apache.spark.{SparkEnv, SparkException, Logging}
+import java.util.ConcurrentModificationException
/**
@@ -819,4 +820,13 @@ private[spark] object Utils extends Logging {
// Nothing else to guard against ?
hashAbs
}
+
+ /** Returns a copy of the system properties that is thread-safe to iterate over. */
+ def getSystemProperties(): Map[String, String] = {
+ try {
+ return System.getProperties().toMap[String, String]
+ } catch {
+ case e: ConcurrentModificationException => getSystemProperties()
+ }
+ }
}
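A note on the design choice: the helper simply retries until a copy of the properties succeeds, written recursively above or, equivalently, as the loop the commit message describes; each retry only has to outrun a brief System.setProperty window, so in practice it terminates quickly. An illustrative sketch of the iterative form (not part of the commit):

import java.util.ConcurrentModificationException
import scala.collection.JavaConversions._  // implicit Properties -> Map conversion needed for toMap

object RetrySnapshotSketch {
  /** Loop until System.getProperties can be copied without a concurrent modification. */
  def systemPropertiesSnapshot(): Map[String, String] = {
    var snapshot: Option[Map[String, String]] = None
    while (snapshot.isEmpty) {
      try {
        snapshot = Some(System.getProperties.toMap[String, String])
      } catch {
        case _: ConcurrentModificationException => // another thread called setProperty; try again
      }
    }
    snapshot.get
  }

  def main(args: Array[String]): Unit =
    println(s"captured ${systemPropertiesSnapshot().size} system properties")
}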