aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala14
1 files changed, 11 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 6e87233cd9..a83e139c13 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -248,12 +248,20 @@ class HadoopRDD[K, V](
HadoopRDD.addLocalConfiguration(
new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(createTime),
context.stageId, theSplit.index, context.attemptNumber, jobConf)
- reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
+ reader =
+ try {
+ inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
+ } catch {
+ case e: IOException if ignoreCorruptFiles =>
+ logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
+ finished = true
+ null
+ }
// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener{ context => closeIfNeeded() }
- private val key: K = reader.createKey()
- private val value: V = reader.createValue()
+ private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
+ private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()
override def getNext(): (K, V) = {
try {