aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authornravi <nravi@c1704.halxg.cloudera.com>2014-06-13 10:52:21 -0700
committerReynold Xin <rxin@apache.org>2014-06-13 10:52:21 -0700
commit70c8116c0aecba293234edc44a7f8e58e5008649 (patch)
treee1b0f7ce6afbed02b2567d4bd45e8b43f4dd2fb0
parentb3736e3d2ff9ccb83a18eefec661739105a38df5 (diff)
downloadspark-70c8116c0aecba293234edc44a7f8e58e5008649.tar.gz
spark-70c8116c0aecba293234edc44a7f8e58e5008649.tar.bz2
spark-70c8116c0aecba293234edc44a7f8e58e5008649.zip
Workaround in Spark for ConcurrentModification issue (JIRA Hadoop-10456, Spark-1097)
This fix has gone into Hadoop 2.4.1. For developers using < 2.4.1, it would be good to have a workaround in Spark as well. Fix has been tested for performance as well, no regressions found. Author: nravi <nravi@c1704.halxg.cloudera.com> Closes #1000 from nishkamravi2/master and squashes the following commits: eb663ca [nravi] Merge branch 'master' of https://github.com/apache/spark df2aeb1 [nravi] Improved fix for ConcurrentModificationIssue (Spark-1097, Hadoop-10456) 6b840f0 [nravi] Undo the fix for SPARK-1758 (the problem is fixed) 5108700 [nravi] Fix in Spark for the Concurrent thread modification issue (SPARK-1097, HADOOP-10456) 681b36f [nravi] Fix for SPARK-1758: failing test org.apache.spark.JavaAPISuite.wholeTextFiles
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala11
1 files changed, 7 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 6547755764..2aa111d600 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -139,10 +139,13 @@ class HadoopRDD[K, V](
// Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
// local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
// The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
- val newJobConf = new JobConf(broadcastedConf.value.value)
- initLocalJobConfFuncOpt.map(f => f(newJobConf))
- HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
- newJobConf
+ // synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456)
+ broadcastedConf.synchronized {
+ val newJobConf = new JobConf(broadcastedConf.value.value)
+ initLocalJobConfFuncOpt.map(f => f(newJobConf))
+ HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+ newJobConf
+ }
}
}