 core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala |  6 ++++--
 core/src/test/scala/org/apache/spark/FileSuite.scala            | 22 ++++++++++++++++++++++
 docs/configuration.md                                           |  8 ++++++++
 3 files changed, 34 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 223fef7926..c40564137f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -689,7 +689,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val outfmt = job.getOutputFormatClass
val jobFormat = outfmt.newInstance
- if (jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
+ if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
+ jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
// FileOutputFormat ignores the filesystem parameter
jobFormat.checkOutputSpecs(job)
}
@@ -755,7 +756,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
valueClass.getSimpleName + ")")
- if (outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
+ if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
+ outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
// FileOutputFormat ignores the filesystem parameter
val ignoredFs = FileSystem.get(conf)
conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
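Note: for reference, a minimal sketch (not part of this patch) of a driver program that opts out of the validation gated above by setting spark.hadoop.validateOutputSpecs to "false"; the app name and output path are placeholders.

    import org.apache.spark.{SparkConf, SparkContext}

    object DisableValidationSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("disable-validation-sketch")          // hypothetical app name
          .setMaster("local")
          .set("spark.hadoop.validateOutputSpecs", "false") // skips the checkOutputSpecs calls gated above
        val sc = new SparkContext(conf)
        val pairs = sc.parallelize(Seq((1, "a"), (2, "b")), 1)
        pairs.saveAsTextFile("/tmp/validate-output-sketch") // hypothetical path
        pairs.saveAsTextFile("/tmp/validate-output-sketch") // second save would normally fail the existence check
        sc.stop()
      }
    }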
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 1f2206b1f0..070e974657 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -230,6 +230,17 @@ class FileSuite extends FunSuite with LocalSparkContext {
}
}
+ test ("allow user to disable the output directory existence checking (old Hadoop API") {
+ val sf = new SparkConf()
+ sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
+ sc = new SparkContext(sf)
+ val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
+ randomRDD.saveAsTextFile(tempDir.getPath + "/output")
+ assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
+ randomRDD.saveAsTextFile(tempDir.getPath + "/output")
+ assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
+ }
+
test ("prevent user from overwriting the empty directory (new Hadoop API)") {
sc = new SparkContext("local", "test")
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
@@ -248,6 +259,17 @@ class FileSuite extends FunSuite with LocalSparkContext {
}
}
+ test ("allow user to disable the output directory existence checking (new Hadoop API") {
+ val sf = new SparkConf()
+ sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
+ sc = new SparkContext(sf)
+ val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+ randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
+ assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
+ randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
+ assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
+ }
+
test ("save Hadoop Dataset through old Hadoop API") {
sc = new SparkContext("local", "test")
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
diff --git a/docs/configuration.md b/docs/configuration.md
index 0697f7fc2f..71fafa5734 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -487,6 +487,14 @@ Apart from these, the following properties are also available, and may be useful
this duration will be cleared as well.
</td>
</tr>
+<tr>
+ <td>spark.hadoop.validateOutputSpecs</td>
+ <td>true</td>
+ <td>If set to true, validates the output specification (e.g. checking if the output directory already exists)
+ used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions due to pre-existing
+ output directories. We recommend that users do not disable this unless they need compatibility with previous
+ versions of Spark; instead, delete conflicting output directories by hand using Hadoop's FileSystem API.</td>
+</tr>
</table>
#### Networking
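Note: as a companion to the docs entry above, a minimal sketch of the recommended alternative to disabling validation: deleting a pre-existing output directory by hand with Hadoop's FileSystem API before saving. The helper name and paths are hypothetical.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object DeleteOutputDirSketch {
      // Hypothetical helper: remove a pre-existing output directory so a later
      // save passes checkOutputSpecs without touching spark.hadoop.validateOutputSpecs.
      def deleteIfExists(dir: String, hadoopConf: Configuration = new Configuration()): Unit = {
        val path = new Path(dir)
        val fs = FileSystem.get(path.toUri, hadoopConf)
        if (fs.exists(path)) {
          fs.delete(path, true) // recursive delete
        }
      }
    }

    // Usage (path is a placeholder):
    //   DeleteOutputDirSketch.deleteIfExists("/tmp/output")
    //   rdd.saveAsTextFile("/tmp/output")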