Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala  53
1 file changed, 38 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 8010dd9008..775141775e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -132,27 +132,47 @@ class HadoopRDD[K, V](
// used to build JobTracker ID
private val createTime = new Date()
+ private val shouldCloneJobConf = sc.conf.get("spark.hadoop.cloneConf", "false").toBoolean
+
// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
val conf: Configuration = broadcastedConf.value.value
- if (conf.isInstanceOf[JobConf]) {
- // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
- conf.asInstanceOf[JobConf]
- } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
- // getJobConf() has been called previously, so there is already a local cache of the JobConf
- // needed by this RDD.
- HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
- } else {
- // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
- // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
- // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
- // Synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456).
+ if (shouldCloneJobConf) {
+ // Hadoop Configuration objects are not thread-safe, which may lead to various problems if
+ // one job modifies a configuration while another reads it (SPARK-2546). This problem occurs
+ // somewhat rarely because most jobs treat the configuration as though it's immutable. One
+ // solution, implemented here, is to clone the Configuration object. Unfortunately, this
+ // clone can be very expensive. To avoid unexpected performance regressions for workloads and
+ // Hadoop versions that do not suffer from these thread-safety issues, this cloning is
+ // disabled by default.
HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+ logDebug("Cloning Hadoop Configuration")
val newJobConf = new JobConf(conf)
- initLocalJobConfFuncOpt.map(f => f(newJobConf))
- HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+ if (!conf.isInstanceOf[JobConf]) {
+ initLocalJobConfFuncOpt.map(f => f(newJobConf))
+ }
newJobConf
}
+ } else {
+ if (conf.isInstanceOf[JobConf]) {
+ logDebug("Re-using user-broadcasted JobConf")
+ conf.asInstanceOf[JobConf]
+ } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+ logDebug("Re-using cached JobConf")
+ HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+ } else {
+ // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
+ // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
+ // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
+ // Synchronize to prevent ConcurrentModificationException (SPARK-1097, HADOOP-10456).
+ HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+ logDebug("Creating new JobConf and caching it for later re-use")
+ val newJobConf = new JobConf(conf)
+ initLocalJobConfFuncOpt.map(f => f(newJobConf))
+ HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+ newJobConf
+ }
+ }
}
}
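
For context on the new code path above: spark.hadoop.cloneConf is an ordinary SparkConf entry, so the cloning behavior can be opted into programmatically or with --conf spark.hadoop.cloneConf=true on spark-submit. A minimal sketch (app name, master, and input path are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

// Opt in to defensive cloning of the Hadoop JobConf; off by default because
// the clone can be expensive (see the comment in getJobConf() above).
val conf = new SparkConf()
  .setAppName("clone-conf-example") // placeholder app name
  .setMaster("local[*]")            // placeholder master
  .set("spark.hadoop.cloneConf", "true")
val sc = new SparkContext(conf)

// Hadoop-backed reads now clone the Configuration under
// CONFIGURATION_INSTANTIATION_LOCK instead of sharing a cached JobConf.
val lines = sc.textFile("hdfs:///path/to/input") // placeholder path
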
@@ -276,7 +296,10 @@ class HadoopRDD[K, V](
}
private[spark] object HadoopRDD extends Logging {
- /** Constructing Configuration objects is not threadsafe, use this lock to serialize. */
+ /**
+ * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
+ * Therefore, we synchronize on this lock before calling new JobConf() or new Configuration().
+ */
val CONFIGURATION_INSTANTIATION_LOCK = new Object()
/**
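
The updated doc comment describes the pattern getJobConf() relies on in the non-cloning branch: construct under a shared lock, then reuse through a local cache. A standalone sketch of that lock-plus-cache pattern, using hypothetical names (ConfFactory, instantiationLock, cache) rather than the actual HadoopRDD internals:

import java.util.concurrent.ConcurrentHashMap

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

object ConfFactory {
  // Configuration's constructor is not thread-safe (SPARK-1097, HADOOP-10456),
  // so every instantiation is serialized through this lock.
  private val instantiationLock = new Object()

  // Local cache so each key pays the JobConf construction cost (and its
  // ~10KB of temporary objects) only once per process.
  private val cache = new ConcurrentHashMap[String, JobConf]()

  def getOrCreate(key: String, base: Configuration): JobConf = {
    val cached = cache.get(key)
    if (cached != null) cached
    else instantiationLock.synchronized {
      // Re-check under the lock so concurrent callers build at most one copy.
      val raced = cache.get(key)
      if (raced != null) raced
      else {
        val fresh = new JobConf(base)
        cache.put(key, fresh)
        fresh
      }
    }
  }
}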