author     Shixiong Zhu <shixiong@databricks.com>   2016-12-07 22:37:04 -0800
committer  Reynold Xin <rxin@databricks.com>        2016-12-07 22:37:04 -0800
commit     b47b892e4579b7b06b4b2837ee4b614e517789f9
tree       5230de9b21a4f8f5fe3a620b1e732f6f3995674e
parent     330fda8aa289e0142e174ed6f03b8fa28d08470f
[SPARK-18774][CORE][SQL] Ignore non-existing files when ignoreCorruptFiles is enabled
## What changes were proposed in this pull request?

When `ignoreCorruptFiles` is enabled, it's better to also ignore non-existing files.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16203 from zsxwing/ignore-file-not-found.
-rw-r--r--  core/src/main/scala/org/apache/spark/internal/config/package.scala                   3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala                             14
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala                          19
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala                  3
5 files changed, 33 insertions(+), 9 deletions(-)
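For context, a minimal sketch of the user-facing behavior this patch targets, using the core flag changed below (the path and file names are hypothetical):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// A minimal sketch, assuming some files under /data/logs may be deleted
// between job planning and task execution.
val conf = new SparkConf()
  .setAppName("ignore-missing-files-demo")
  .set("spark.files.ignoreCorruptFiles", "true")
val sc = new SparkContext(conf)

// Before this patch, a file that vanished after the splits were computed
// failed the task with FileNotFoundException even with the flag enabled;
// with this patch the missing file is logged and skipped.
val lines = sc.textFile("/data/logs/*")
println(lines.count())
```

This works because `FileNotFoundException` is a subclass of `IOException`, which is exactly the exception type the new catch clauses below handle.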
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index a69a2b5645..78aed4fb58 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -203,7 +203,8 @@ package object config {
   private[spark] val IGNORE_CORRUPT_FILES = ConfigBuilder("spark.files.ignoreCorruptFiles")
     .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
-      "encountering corrupt files and contents that have been read will still be returned.")
+      "encountering corrupted or non-existing files and contents that have been read will still " +
+      "be returned.")
     .booleanConf
     .createWithDefault(false)
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 6e87233cd9..a83e139c13 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -248,12 +248,20 @@ class HadoopRDD[K, V](
       HadoopRDD.addLocalConfiguration(
         new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(createTime),
         context.stageId, theSplit.index, context.attemptNumber, jobConf)
-      reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
+      reader =
+        try {
+          inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
+        } catch {
+          case e: IOException if ignoreCorruptFiles =>
+            logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
+            finished = true
+            null
+        }
       // Register an on-task-completion callback to close the input stream.
       context.addTaskCompletionListener{ context => closeIfNeeded() }
-      private val key: K = reader.createKey()
-      private val value: V = reader.createValue()
+      private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
+      private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()
       override def getNext(): (K, V) = {
         try {
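The shape of this change, extracted into a self-contained sketch: opening the record reader can itself throw (for a deleted file, a `FileNotFoundException`, which is an `IOException`), so the open call is wrapped and a failed open marks the iterator as already finished. `SkippingIterator` is an illustrative name, not a Spark class; it uses `Iterator.empty` where the patch uses a null reader.

```scala
import java.io.IOException

// Illustrative only: mirrors the finished-flag pattern in the hunk above.
class SkippingIterator[T](openReader: => Iterator[T], ignoreCorruptFiles: Boolean)
  extends Iterator[T] {

  private var finished = false
  private val underlying: Iterator[T] =
    try {
      openReader
    } catch {
      case e: IOException if ignoreCorruptFiles =>
        // Treat an unreadable or missing file as an empty input.
        finished = true
        Iterator.empty
    }

  override def hasNext: Boolean = !finished && underlying.hasNext
  override def next(): T = underlying.next()
}
```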
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index e805192bb6..733e85f305 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -174,14 +174,25 @@ class NewHadoopRDD[K, V](
       }
       private val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
       private val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-      private var reader = format.createRecordReader(
-        split.serializableHadoopSplit.value, hadoopAttemptContext)
-      reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
+      private var finished = false
+      private var reader =
+        try {
+          val _reader = format.createRecordReader(
+            split.serializableHadoopSplit.value, hadoopAttemptContext)
+          _reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
+          _reader
+        } catch {
+          case e: IOException if ignoreCorruptFiles =>
+            logWarning(
+              s"Skipped the rest content in the corrupted file: ${split.serializableHadoopSplit}",
+              e)
+            finished = true
+            null
+        }
       // Register an on-task-completion callback to close the input stream.
       context.addTaskCompletionListener(context => close())
       private var havePair = false
-      private var finished = false
       private var recordsSinceMetricsUpdate = 0
       override def hasNext: Boolean = {
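Note why `private var finished = false` moves above the reader initializer in this hunk: Scala evaluates constructor statements top to bottom, so if the declaration stayed in its old position, its `= false` initializer would run after the catch block had set the flag and silently reset it. A hypothetical reproduction of that pitfall (not Spark code):

```scala
import java.io.IOException

// Demonstrates the initialization-order bug the reordering above avoids.
class OrderingPitfall {
  var reader: AnyRef =
    try {
      throw new IOException("file was deleted")
    } catch {
      case _: IOException =>
        finished = true  // legal forward reference to a class field...
        null
    }
  var finished = false   // ...but this initializer runs later and resets it
}

// new OrderingPitfall().finished == false, even though the catch ran.
```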
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 6d8cd81431..e753cd962a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -151,6 +151,9 @@ class FileScanRDD(
             currentIterator = readFunction(currentFile)
           }
         } catch {
+          case e: IOException if ignoreCorruptFiles =>
+            logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
+            currentIterator = Iterator.empty
           case e: java.io.FileNotFoundException =>
             throw new java.io.FileNotFoundException(
               e.getMessage + "\n" +
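The ordering of the catch cases here matters: since `java.io.FileNotFoundException` extends `java.io.IOException`, a vanished file matches the new first case and is skipped when the flag is on, and the same exception still falls through to the existing case (which rethrows with the REFRESH TABLE hint) when the flag is off. A standalone sketch of that dispatch, with illustrative names:

```scala
import java.io.{FileNotFoundException, IOException}

// Illustrative dispatch only; not the FileScanRDD code itself.
def onReadError(e: Throwable, ignoreCorruptFiles: Boolean): String = e match {
  case _: IOException if ignoreCorruptFiles =>
    "log a warning and continue with an empty iterator"
  case _: FileNotFoundException =>
    "rethrow with a hint to run REFRESH TABLE"
  case other =>
    throw other
}

// onReadError(new FileNotFoundException("gone"), ignoreCorruptFiles = true)
//   => matched by the first case, so the file is skipped
```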
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 91f3fe0fe9..c03e88b60e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -632,7 +632,8 @@ object SQLConf {
   val IGNORE_CORRUPT_FILES = SQLConfigBuilder("spark.sql.files.ignoreCorruptFiles")
     .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
-      "encountering corrupt files and contents that have been read will still be returned.")
+      "encountering corrupted or non-existing files and contents that have been read will still " +
+      "be returned.")
     .booleanConf
     .createWithDefault(false)
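And the SQL-side counterpart in use, as a minimal sketch on an existing SparkSession (`spark` is assumed to be in scope, as in spark-shell; the path is hypothetical):

```scala
// Enable the SQL flag at runtime; equivalent to
// SET spark.sql.files.ignoreCorruptFiles=true in SQL.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

// A scan whose input files may have been deleted after planning now logs
// and skips the missing files instead of failing the query.
val df = spark.read.parquet("/data/events")
println(df.count())
```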