path: root/core
author     Mingyu Kim <mkim@palantir.com>           2015-09-18 15:40:58 -0700
committer  Josh Rosen <joshrosen@databricks.com>    2015-09-18 15:40:58 -0700
commit     8074208fa47fa654c1055c48cfa0d923edeeb04f (patch)
tree       455a0c56c66ae1477087a09dc68a6d2bd3fd6b79 /core
parent     348d7c9a93dd00d3d1859342a8eb0aea2e77f597 (diff)
[SPARK-10611] Clone Configuration for each task for NewHadoopRDD
This patch attempts to fix the Hadoop Configuration thread safety issue for NewHadoopRDD in the same way SPARK-2546 fixed the issue for HadoopRDD.

Author: Mingyu Kim <mkim@palantir.com>

Closes #8763 from mingyukim/mkim/SPARK-10611.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala  | 37
2 files changed, 34 insertions(+), 8 deletions(-)
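The change is opt-in via the spark.hadoop.cloneConf flag read in the diff below. As a usage sketch only (not part of this commit), a job could enable per-task cloning like this; the application name, input path, and the CloneConfExample object are illustrative assumptions:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object CloneConfExample {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("cloneconf-example")
      // Disabled by default because the clone can be expensive; enable it only if
      // jobs mutate the Hadoop Configuration while other jobs are reading it.
      .set("spark.hadoop.cloneConf", "true")
    val sc = new SparkContext(sparkConf)
    // newAPIHadoopFile is backed by NewHadoopRDD, so with the flag set each task
    // reads from its own cloned Configuration rather than the shared broadcast copy.
    val lines = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///path/to/input")
    println(lines.count())
    sc.stop()
  }
}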
diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
index 6fec00dcd0..aedced7408 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -34,12 +34,13 @@ private[spark] class BinaryFileRDD[T](
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
+    val conf = getConf
     inputFormat match {
       case configurable: Configurable =>
-        configurable.setConf(getConf)
+        configurable.setConf(conf)
       case _ =>
     }
-    val jobContext = newJobContext(getConf, jobId)
+    val jobContext = newJobContext(conf, jobId)
     inputFormat.setMinPartitions(jobContext, minPartitions)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 174979aaeb..2872b93b87 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -44,7 +44,6 @@ private[spark] class NewHadoopPartition(
   extends Partition {
 
   val serializableHadoopSplit = new SerializableWritable(rawSplit)
-
   override def hashCode(): Int = 41 * (41 + rddId) + index
 }
@@ -84,6 +83,27 @@ class NewHadoopRDD[K, V](
   @transient protected val jobId = new JobID(jobTrackerId, id)
 
+  private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
+
+  def getConf: Configuration = {
+    val conf: Configuration = confBroadcast.value.value
+    if (shouldCloneJobConf) {
+      // Hadoop Configuration objects are not thread-safe, which may lead to various problems if
+      // one job modifies a configuration while another reads it (SPARK-2546, SPARK-10611). This
+      // problem occurs somewhat rarely because most jobs treat the configuration as though it's
+      // immutable. One solution, implemented here, is to clone the Configuration object.
+      // Unfortunately, this clone can be very expensive. To avoid unexpected performance
+      // regressions for workloads and Hadoop versions that do not suffer from these thread-safety
+      // issues, this cloning is disabled by default.
+      NewHadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+        logDebug("Cloning Hadoop Configuration")
+        new Configuration(conf)
+      }
+    } else {
+      conf
+    }
+  }
+
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
     inputFormat match {
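For illustration only (not part of the patch): a minimal, self-contained sketch of the isolation that the per-task clone in getConf above provides. Only hadoop-common is assumed on the classpath; ConfCloneSketch and "example.key" are hypothetical names.

import org.apache.hadoop.conf.Configuration

object ConfCloneSketch {
  def main(args: Array[String]): Unit = {
    val shared = new Configuration()
    shared.set("example.key", "original")

    // Equivalent in spirit to getConf with spark.hadoop.cloneConf=true: the task
    // works against a copy made via Configuration's copy constructor.
    val taskConf = new Configuration(shared)

    // Another job mutating the shared object no longer affects the running task.
    shared.set("example.key", "mutated-by-another-job")

    println(taskConf.get("example.key")) // prints "original"
    println(shared.get("example.key"))   // prints "mutated-by-another-job"
  }
}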
@@ -104,7 +124,7 @@ class NewHadoopRDD[K, V](
     val iter = new Iterator[(K, V)] {
       val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
-      val conf = confBroadcast.value.value
+      val conf = getConf
 
       val inputMetrics = context.taskMetrics
         .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
@@ -230,12 +250,16 @@ class NewHadoopRDD[K, V](
     super.persist(storageLevel)
   }
-
-  def getConf: Configuration = confBroadcast.value.value
 }
 
 private[spark] object NewHadoopRDD {
   /**
+   * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
+   * Therefore, we synchronize on this lock before calling new Configuration().
+   */
+  val CONFIGURATION_INSTANTIATION_LOCK = new Object()
+
+  /**
    * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an InputSplit to
    * the given function rather than the index of the partition.
    */
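Also for illustration (not part of the patch): a sketch of the instantiation-lock pattern that CONFIGURATION_INSTANTIATION_LOCK documents above, funneling construction of new Configuration objects through one shared lock. LockedConfFactory, LockedConfDemo, and the thread count are hypothetical.

import org.apache.hadoop.conf.Configuration

object LockedConfFactory {
  private val lock = new Object

  // On affected Hadoop versions (SPARK-1097, HADOOP-10456), unsynchronized concurrent
  // construction of Configuration can fail (e.g. ConcurrentModificationException),
  // so all construction goes through the lock.
  def newConf(base: Configuration): Configuration = lock.synchronized {
    new Configuration(base)
  }
}

object LockedConfDemo {
  def main(args: Array[String]): Unit = {
    val base = new Configuration()
    val threads = (1 to 4).map { i =>
      new Thread(new Runnable {
        override def run(): Unit = {
          val conf = LockedConfFactory.newConf(base)
          println(s"thread $i created a Configuration with ${conf.size()} entries")
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
  }
}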
@@ -268,12 +292,13 @@ private[spark] class WholeTextFileRDD(
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
+    val conf = getConf
     inputFormat match {
       case configurable: Configurable =>
-        configurable.setConf(getConf)
+        configurable.setConf(conf)
       case _ =>
     }
-    val jobContext = newJobContext(getConf, jobId)
+    val jobContext = newJobContext(conf, jobId)
     inputFormat.setMinPartitions(jobContext, minPartitions)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)