-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala  53
-rw-r--r--  docs/configuration.md                                       9
2 files changed, 47 insertions, 15 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 8010dd9008..775141775e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -132,27 +132,47 @@ class HadoopRDD[K, V](
// used to build JobTracker ID
private val createTime = new Date()
+ private val shouldCloneJobConf = sc.conf.get("spark.hadoop.cloneConf", "false").toBoolean
+
// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
val conf: Configuration = broadcastedConf.value.value
- if (conf.isInstanceOf[JobConf]) {
- // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
- conf.asInstanceOf[JobConf]
- } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
- // getJobConf() has been called previously, so there is already a local cache of the JobConf
- // needed by this RDD.
- HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
- } else {
- // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
- // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
- // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
- // Synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456).
+ if (shouldCloneJobConf) {
+ // Hadoop Configuration objects are not thread-safe, which may lead to various problems if
+ // one job modifies a configuration while another reads it (SPARK-2546). This problem occurs
+ // somewhat rarely because most jobs treat the configuration as though it's immutable. One
+ // solution, implemented here, is to clone the Configuration object. Unfortunately, this
+ // clone can be very expensive. To avoid unexpected performance regressions for workloads and
+ // Hadoop versions that do not suffer from these thread-safety issues, this cloning is
+ // disabled by default.
HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+ logDebug("Cloning Hadoop Configuration")
val newJobConf = new JobConf(conf)
- initLocalJobConfFuncOpt.map(f => f(newJobConf))
- HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+ if (!conf.isInstanceOf[JobConf]) {
+ initLocalJobConfFuncOpt.map(f => f(newJobConf))
+ }
newJobConf
}
+ } else {
+ if (conf.isInstanceOf[JobConf]) {
+ logDebug("Re-using user-broadcasted JobConf")
+ conf.asInstanceOf[JobConf]
+ } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+ logDebug("Re-using cached JobConf")
+ HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+ } else {
+ // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
+ // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
+ // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
+ // Synchronize to prevent ConcurrentModificationException (SPARK-1097, HADOOP-10456).
+ HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+ logDebug("Creating new JobConf and caching it for later re-use")
+ val newJobConf = new JobConf(conf)
+ initLocalJobConfFuncOpt.map(f => f(newJobConf))
+ HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+ newJobConf
+ }
+ }
}
}
@@ -276,7 +296,10 @@ class HadoopRDD[K, V](
}
private[spark] object HadoopRDD extends Logging {
- /** Constructing Configuration objects is not threadsafe, use this lock to serialize. */
+ /**
+ * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
+ * Therefore, we synchronize on this lock before calling new JobConf() or new Configuration().
+ */
val CONFIGURATION_INSTANTIATION_LOCK = new Object()
/**
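
For context (this is not part of the patch), here is a minimal sketch of how an application might opt into the new behavior. The application name and input path are hypothetical; the only assumption taken from the patch is that `spark.hadoop.cloneConf` is read from SparkConf and defaults to `false`.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CloneConfExample {
  def main(args: Array[String]): Unit = {
    // Opt in to per-task cloning of the Hadoop Configuration (disabled by default)
    // to work around the Configuration thread-safety issues described in SPARK-2546.
    val conf = new SparkConf()
      .setAppName("hadoop-rdd-cloneconf-example") // hypothetical app name
      .set("spark.hadoop.cloneConf", "true")

    val sc = new SparkContext(conf)
    try {
      // textFile() is backed by HadoopRDD, so getJobConf() now takes the cloning branch
      // and builds a fresh JobConf under CONFIGURATION_INSTANTIATION_LOCK instead of
      // re-using a cached one.
      val lines = sc.textFile("hdfs:///tmp/input") // hypothetical input path
      println(s"line count: ${lines.count()}")
    } finally {
      sc.stop()
    }
  }
}
```

Since getJobConf() reads the flag from SparkConf rather than from the Hadoop configuration itself, it should also be settable like any other Spark property, for example via `--conf spark.hadoop.cloneConf=true` on spark-submit or in spark-defaults.conf.
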
diff --git a/docs/configuration.md b/docs/configuration.md
index f0204c640b..96fa1377ec 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -620,6 +620,15 @@ Apart from these, the following properties are also available, and may be useful
previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.</td>
</tr>
<tr>
+ <td><code>spark.hadoop.cloneConf</code></td>
+ <td>false</td>
+ <td>If set to true, clones a new Hadoop <code>Configuration</code> object for each task. This
+ option should be enabled to work around <code>Configuration</code> thread-safety issues (see
+ <a href="https://issues.apache.org/jira/browse/SPARK-2546">SPARK-2546</a> for more details).
+ This is disabled by default in order to avoid unexpected performance regressions for jobs that
+ are not affected by these issues.</td>
+</tr>
+<tr>
<td><code>spark.executor.heartbeatInterval</code></td>
<td>10000</td>
<td>Interval (milliseconds) between each executor's heartbeats to the driver. Heartbeats let