path: root/core/src
author     Josh Rosen <joshrosen@apache.org>        2014-10-19 00:31:06 -0700
committer  Josh Rosen <joshrosen@databricks.com>    2014-10-19 00:35:05 -0700
commit     7e63bb49c526c3f872619ae14e4b5273f4c535e9 (patch)
tree       241f07bb2627381f75b0b3791d0dbbac35baa5ea /core/src
parent     05db2da7dc256822cdb602c4821cbb9fb84dac98 (diff)
[SPARK-2546] Clone JobConf for each task (branch-1.0 / 1.1 backport)
This patch attempts to fix SPARK-2546 in `branch-1.0` and `branch-1.1`. The underlying problem is that thread-safety issues in Hadoop Configuration objects may cause Spark tasks to get stuck in infinite loops. The approach taken here is to clone a new copy of the JobConf for each task rather than sharing a single copy between tasks. Note that there are still Configuration thread-safety issues that may affect the driver, but these seem much less likely to occur in practice and will be more complex to fix (see discussion on the SPARK-2546 ticket).

This cloning is guarded by a new configuration option (`spark.hadoop.cloneConf`) and is disabled by default in order to avoid unexpected performance regressions for workloads that are unaffected by the Configuration thread-safety issues.

Author: Josh Rosen <joshrosen@apache.org>

Closes #2684 from JoshRosen/jobconf-fix-backport and squashes the following commits:

f14f259 [Josh Rosen] Add configuration option to control cloning of Hadoop JobConf.
b562451 [Josh Rosen] Remove unused jobConfCacheKey field.
dd25697 [Josh Rosen] [SPARK-2546] [1.0 / 1.1 backport] Clone JobConf for each task.

(cherry picked from commit 2cd40db2b3ab5ddcb323fd05c171dbd9025f9e71)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>

Conflicts:
    core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
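As a usage sketch (not part of the patch; the application name below is just a placeholder), a job affected by these Configuration thread-safety problems could opt in to the new behavior by setting the option on its SparkConf before creating the SparkContext:

    import org.apache.spark.{SparkConf, SparkContext}

    // Opt in to per-task cloning of the Hadoop JobConf (the option defaults to "false").
    val sparkConf = new SparkConf()
      .setAppName("cloneconf-example")          // placeholder app name
      .set("spark.hadoop.cloneConf", "true")
    val sc = new SparkContext(sparkConf)

The value is read through sc.conf when the HadoopRDD is constructed, so it can equally be supplied externally, for example via spark-defaults.conf.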
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala  |  53
1 file changed, 38 insertions(+), 15 deletions(-)
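To make the underlying problem concrete before the diff, here is an illustrative, self-contained sketch (not taken from the patch) of the kind of race that arises when one JobConf is shared between threads, as it was when tasks shared the broadcasted copy:

    import org.apache.hadoop.mapred.JobConf

    object SharedJobConfHazard {
      def main(args: Array[String]): Unit = {
        val shared = new JobConf()
        // Writer mutates the shared configuration...
        val writer = new Thread(new Runnable {
          def run(): Unit = (1 to 100000).foreach(i => shared.set("demo.key." + i, "v"))
        })
        // ...while the reader looks values up in it concurrently. With an unlucky
        // interleaving the reader can observe a corrupted internal map and hang or
        // throw ConcurrentModificationException -- the behavior reported in SPARK-2546.
        val reader = new Thread(new Runnable {
          def run(): Unit = (1 to 100000).foreach(_ => shared.get("demo.lookup", "default"))
        })
        writer.start(); reader.start()
        writer.join(); reader.join()
      }
    }

Cloning per task (new JobConf(conf), taken under CONFIGURATION_INSTANTIATION_LOCK because the constructor itself is not thread-safe) gives each task a private copy and sidesteps the race, which is the code path the hunk below adds.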
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 8010dd9008..775141775e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -132,27 +132,47 @@ class HadoopRDD[K, V](
   // used to build JobTracker ID
   private val createTime = new Date()
 
+  private val shouldCloneJobConf = sc.conf.get("spark.hadoop.cloneConf", "false").toBoolean
+
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
   protected def getJobConf(): JobConf = {
     val conf: Configuration = broadcastedConf.value.value
-    if (conf.isInstanceOf[JobConf]) {
-      // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
-      conf.asInstanceOf[JobConf]
-    } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
-      // getJobConf() has been called previously, so there is already a local cache of the JobConf
-      // needed by this RDD.
-      HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
-    } else {
-      // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
-      // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
-      // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
-      // Synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456).
+    if (shouldCloneJobConf) {
+      // Hadoop Configuration objects are not thread-safe, which may lead to various problems if
+      // one job modifies a configuration while another reads it (SPARK-2546). This problem occurs
+      // somewhat rarely because most jobs treat the configuration as though it's immutable. One
+      // solution, implemented here, is to clone the Configuration object. Unfortunately, this
+      // clone can be very expensive. To avoid unexpected performance regressions for workloads and
+      // Hadoop versions that do not suffer from these thread-safety issues, this cloning is
+      // disabled by default.
       HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+        logDebug("Cloning Hadoop Configuration")
         val newJobConf = new JobConf(conf)
-        initLocalJobConfFuncOpt.map(f => f(newJobConf))
-        HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+        if (!conf.isInstanceOf[JobConf]) {
+          initLocalJobConfFuncOpt.map(f => f(newJobConf))
+        }
         newJobConf
       }
+    } else {
+      if (conf.isInstanceOf[JobConf]) {
+        logDebug("Re-using user-broadcasted JobConf")
+        conf.asInstanceOf[JobConf]
+      } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+        logDebug("Re-using cached JobConf")
+        HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+      } else {
+        // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
+        // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
+        // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
+        // Synchronize to prevent ConcurrentModificationException (SPARK-1097, HADOOP-10456).
+        HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+          logDebug("Creating new JobConf and caching it for later re-use")
+          val newJobConf = new JobConf(conf)
+          initLocalJobConfFuncOpt.map(f => f(newJobConf))
+          HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+          newJobConf
+        }
+      }
     }
   }
 
@@ -276,7 +296,10 @@ class HadoopRDD[K, V](
 }
 
 private[spark] object HadoopRDD extends Logging {
-  /** Constructing Configuration objects is not threadsafe, use this lock to serialize. */
+  /**
+   * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
+   * Therefore, we synchronize on this lock before calling new JobConf() or new Configuration().
+   */
   val CONFIGURATION_INSTANTIATION_LOCK = new Object()
 
   /**