about summary refs log tree commit diff
path: root/core/src
diff options
context:
space:
mode:
authorAaron Davidson <aaron@databricks.com>2014-07-16 14:10:17 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-07-16 14:10:33 -0700
commit91e7a71c68eb9ff0738c21bc7525fa89bd662993 (patch)
treea0770feba9d32a7b71c0c771a3eaedd4d801b932 /core/src
parentbf1ddc7b848c9dae671d6dff7d20d160abbe7b5c (diff)
downloadspark-91e7a71c68eb9ff0738c21bc7525fa89bd662993.tar.gz
spark-91e7a71c68eb9ff0738c21bc7525fa89bd662993.tar.bz2
spark-91e7a71c68eb9ff0738c21bc7525fa89bd662993.zip
SPARK-1097: Do not introduce deadlock while fixing concurrency bug
We recently added this lock on 'conf' in order to prevent concurrent creation. However, it turns out that this can introduce a deadlock because Hadoop also synchronizes on the Configuration objects when creating new Configurations (and they do so via a static REGISTRY which contains all created Configurations). This fix forces all Spark initialization of Configuration objects to occur serially by using a static lock that we control, and thus also prevents introducing the deadlock. Author: Aaron Davidson <aaron@databricks.com> Closes #1409 from aarondav/1054 and squashes the following commits: 7d1b769 [Aaron Davidson] SPARK-1097: Do not introduce deadlock while fixing concurrency bug (cherry picked from commit 8867cd0bc2961fefed84901b8b14e9676ae6ab18) Signed-off-by: Patrick Wendell <pwendell@gmail.com>
Diffstat (limited to 'core/src')
-rw-r--r-- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 7
1 file changed, 5 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index a55b2266f8..d0a2241663 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -139,8 +139,8 @@ class HadoopRDD[K, V](
// Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
// local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
// The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
- // synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456)
- conf.synchronized {
+ // Synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456).
+ HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
val newJobConf = new JobConf(conf)
initLocalJobConfFuncOpt.map(f => f(newJobConf))
HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
@@ -231,6 +231,9 @@ class HadoopRDD[K, V](
}
private[spark] object HadoopRDD {
+ /** Constructing Configuration objects is not threadsafe, use this lock to serialize. */
+ val CONFIGURATION_INSTANTIATION_LOCK = new Object()
+
/**
* The three methods below are helpers for accessing the local map, a property of the SparkEnv of
* the local process.