diff options
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 19 |
1 files changed, 17 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 4469c89e6b..f8df5b2a08 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -25,6 +25,7 @@ import scala.collection.{Map, mutable} import scala.collection.JavaConversions._ import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag +import scala.util.DynamicVariable import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus import org.apache.hadoop.conf.{Configurable, Configuration} @@ -964,7 +965,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) val outfmt = job.getOutputFormatClass val jobFormat = outfmt.newInstance - if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) { + if (isOutputSpecValidationEnabled) { // FileOutputFormat ignores the filesystem parameter jobFormat.checkOutputSpecs(job) } @@ -1042,7 +1043,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " + valueClass.getSimpleName + ")") - if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) { + if (isOutputSpecValidationEnabled) { // FileOutputFormat ignores the filesystem parameter val ignoredFs = FileSystem.get(hadoopConf) hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf) @@ -1117,8 +1118,22 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) private[spark] def valueClass: Class[_] = vt.runtimeClass private[spark] def keyOrdering: Option[Ordering[K]] = Option(ord) + + // Note: this needs to be a function instead of a 'val' so that the disableOutputSpecValidation + // setting can take effect: + private def isOutputSpecValidationEnabled: Boolean = { + val validationDisabled = PairRDDFunctions.disableOutputSpecValidation.value + val enabledInConf = self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) + enabledInConf && !validationDisabled + } } private[spark] object PairRDDFunctions { val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256 + + /** + * Allows for the `spark.hadoop.validateOutputSpecs` checks to be disabled on a case-by-case + * basis; see SPARK-4835 for more details. + */ + val disableOutputSpecValidation: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false) } |