Diffstat:
 -rw-r--r--  core/src/main/scala/org/apache/spark/internal/config/package.scala                               |  5
 -rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala                                         |  8
 -rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala                                      | 10
 -rw-r--r--  core/src/test/scala/org/apache/spark/FileSuite.scala                                             | 62
 -rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala             | 30
 -rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala                              |  8
 -rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala | 37
 7 files changed, 153 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 5a710158db..517fc3e9e9 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -156,4 +156,9 @@ package object config {
.doc("Port to use for the block managed on the driver.")
.fallbackConf(BLOCK_MANAGER_PORT)
+ private[spark] val IGNORE_CORRUPT_FILES = ConfigBuilder("spark.files.ignoreCorruptFiles")
+ .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
+ "encountering corrupt files and contents that have been read will still be returned.")
+ .booleanConf
+ .createWithDefault(false)
}
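
As a minimal usage sketch of the new core flag (assuming a local master and a hypothetical input path), enabling it lets a job over truncated .gz input complete and return whatever records were decoded before the corruption point:

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch only: the flag name comes from the patch above; the master,
    // app name and input path are hypothetical.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("ignore-corrupt-files-demo")
      .set("spark.files.ignoreCorruptFiles", "true")   // default: false
    val sc = new SparkContext(conf)

    // A truncated .gz part no longer fails the job; records read before the
    // corruption point are still returned.
    val lines = sc.textFile("/data/logs/*.gz")
    println(lines.count())
    sc.stop()
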
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 4640b5dc2f..e1cf3938de 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.rdd
-import java.io.EOFException
+import java.io.IOException
import java.text.SimpleDateFormat
import java.util.Date
@@ -43,6 +43,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
import org.apache.spark.storage.StorageLevel
@@ -139,6 +140,8 @@ class HadoopRDD[K, V](
private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
+ private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
+
// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
val conf: Configuration = broadcastedConf.value.value
@@ -253,8 +256,7 @@ class HadoopRDD[K, V](
try {
finished = !reader.next(key, value)
} catch {
- case eof: EOFException =>
- finished = true
+ case e: IOException if ignoreCorruptFiles => finished = true
}
if (!finished) {
inputMetrics.incRecordsRead(1)
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 1c7aec919b..baf31fb658 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -17,6 +17,7 @@
package org.apache.spark.rdd
+import java.io.IOException
import java.text.SimpleDateFormat
import java.util.Date
@@ -33,6 +34,7 @@ import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}
@@ -85,6 +87,8 @@ class NewHadoopRDD[K, V](
private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
+ private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
+
def getConf: Configuration = {
val conf: Configuration = confBroadcast.value.value
if (shouldCloneJobConf) {
@@ -179,7 +183,11 @@ class NewHadoopRDD[K, V](
override def hasNext: Boolean = {
if (!finished && !havePair) {
- finished = !reader.nextKeyValue
+ try {
+ finished = !reader.nextKeyValue
+ } catch {
+ case e: IOException if ignoreCorruptFiles => finished = true
+ }
if (finished) {
// Close and release the reader here; close() will also be called when the task
// completes, but for tasks that read from many files, it helps to release the
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 993834f8d7..cc52bb1d23 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -17,7 +17,8 @@
package org.apache.spark
-import java.io.{File, FileWriter}
+import java.io._
+import java.util.zip.GZIPOutputStream
import scala.io.Source
@@ -29,6 +30,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInp
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
import org.apache.spark.input.PortableDataStream
+import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
@@ -541,4 +543,62 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
}.collect()
assert(inputPaths.toSet === Set(s"$outDir/part-00000", s"$outDir/part-00001"))
}
+
+ test("spark.files.ignoreCorruptFiles should work both HadoopRDD and NewHadoopRDD") {
+ val inputFile = File.createTempFile("input-", ".gz")
+ try {
+ // Create a corrupt gzip file
+ val byteOutput = new ByteArrayOutputStream()
+ val gzip = new GZIPOutputStream(byteOutput)
+ try {
+ gzip.write(Array[Byte](1, 2, 3, 4))
+ } finally {
+ gzip.close()
+ }
+ val bytes = byteOutput.toByteArray
+ val o = new FileOutputStream(inputFile)
+ try {
+ // It's corrupt since we only write half of bytes into the file.
+ o.write(bytes.take(bytes.length / 2))
+ } finally {
+ o.close()
+ }
+
+ // Reading a corrupt gzip file should throw EOFException
+ sc = new SparkContext("local", "test")
+ // Test HadoopRDD
+ var e = intercept[SparkException] {
+ sc.textFile(inputFile.toURI.toString).collect()
+ }
+ assert(e.getCause.isInstanceOf[EOFException])
+ assert(e.getCause.getMessage === "Unexpected end of input stream")
+ // Test NewHadoopRDD
+ e = intercept[SparkException] {
+ sc.newAPIHadoopFile(
+ inputFile.toURI.toString,
+ classOf[NewTextInputFormat],
+ classOf[LongWritable],
+ classOf[Text]).collect()
+ }
+ assert(e.getCause.isInstanceOf[EOFException])
+ assert(e.getCause.getMessage === "Unexpected end of input stream")
+ sc.stop()
+
+ val conf = new SparkConf().set(IGNORE_CORRUPT_FILES, true)
+ sc = new SparkContext("local", "test", conf)
+ // Test HadoopRDD
+ assert(sc.textFile(inputFile.toURI.toString).collect().isEmpty)
+ // Test NewHadoopRDD
+ assert {
+ sc.newAPIHadoopFile(
+ inputFile.toURI.toString,
+ classOf[NewTextInputFormat],
+ classOf[LongWritable],
+ classOf[Text]).collect().isEmpty
+ }
+ } finally {
+ inputFile.delete()
+ }
+ }
+
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index c66da3a831..89944570df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.datasources
+import java.io.IOException
+
import scala.collection.mutable
import org.apache.spark.{Partition => RDDPartition, TaskContext}
@@ -25,6 +27,7 @@ import org.apache.spark.rdd.{InputFileNameHolder, RDD}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.vectorized.ColumnarBatch
+import org.apache.spark.util.NextIterator
/**
* A part (i.e. "block") of a single file that should be read, along with partition column values
@@ -62,6 +65,8 @@ class FileScanRDD(
@transient val filePartitions: Seq[FilePartition])
extends RDD[InternalRow](sparkSession.sparkContext, Nil) {
+ private val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
+
override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = {
val iterator = new Iterator[Object] with AutoCloseable {
private val inputMetrics = context.taskMetrics().inputMetrics
@@ -119,7 +124,30 @@ class FileScanRDD(
InputFileNameHolder.setInputFileName(currentFile.filePath)
try {
- currentIterator = readFunction(currentFile)
+ if (ignoreCorruptFiles) {
+ currentIterator = new NextIterator[Object] {
+ private val internalIter = readFunction(currentFile)
+
+ override def getNext(): AnyRef = {
+ try {
+ if (internalIter.hasNext) {
+ internalIter.next()
+ } else {
+ finished = true
+ null
+ }
+ } catch {
+ case e: IOException =>
+ finished = true
+ null
+ }
+ }
+
+ override def close(): Unit = {}
+ }
+ } else {
+ currentIterator = readFunction(currentFile)
+ }
} catch {
case e: java.io.FileNotFoundException =>
throw new java.io.FileNotFoundException(
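
The FileScanRDD change wraps the per-file iterator so that an IOException raised mid-stream ends that file's iteration instead of failing the task, keeping the rows already produced. A simplified, self-contained sketch of the same wrapping idea (this is not Spark's internal NextIterator, which is private to the project):

    import java.io.IOException

    // Simplified sketch: wrap an iterator so an IOException ends iteration
    // instead of propagating, mirroring the ignoreCorruptFiles behaviour.
    def tolerant[T](underlying: Iterator[T]): Iterator[T] = new Iterator[T] {
      private var finished = false
      private var nextValue: Option[T] = None

      private def advance(): Unit = {
        if (!finished && nextValue.isEmpty) {
          try {
            if (underlying.hasNext) nextValue = Some(underlying.next())
            else finished = true
          } catch {
            case _: IOException => finished = true   // swallow and stop
          }
        }
      }

      override def hasNext: Boolean = { advance(); nextValue.isDefined }

      override def next(): T = {
        advance()
        val v = nextValue.get
        nextValue = None
        v
      }
    }
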
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 8cbfc4c762..9e7c1ec211 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -576,6 +576,12 @@ object SQLConf {
.doubleConf
.createWithDefault(0.05)
+ val IGNORE_CORRUPT_FILES = SQLConfigBuilder("spark.sql.files.ignoreCorruptFiles")
+ .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
+ "encountering corrupt files and contents that have been read will still be returned.")
+ .booleanConf
+ .createWithDefault(false)
+
object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
@@ -743,6 +749,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def warehousePath: String = new Path(getConf(WAREHOUSE_PATH)).toString
+ def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES)
+
override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)
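
On the SQL side the flag is read from the session's SQLConf, so it can be toggled per session at runtime. A minimal sketch (the SparkSession setup and path are hypothetical):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("sql-ignore-corrupt-files-demo")
      .getOrCreate()

    // Session-level toggle; default is false.
    spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

    // A scan over files that include a truncated .gz part returns the rows
    // that could be decoded instead of failing the query.
    val df = spark.read.text("/data/logs/*.gz")   // path is hypothetical
    println(df.count())
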
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 45411fa065..c5deb31fec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -17,8 +17,9 @@
package org.apache.spark.sql.execution.datasources
-import java.io.File
+import java.io._
import java.util.concurrent.atomic.AtomicInteger
+import java.util.zip.GZIPOutputStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path, RawLocalFileSystem}
@@ -441,6 +442,40 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
}
}
+ test("spark.files.ignoreCorruptFiles should work in SQL") {
+ val inputFile = File.createTempFile("input-", ".gz")
+ try {
+ // Create a corrupt gzip file
+ val byteOutput = new ByteArrayOutputStream()
+ val gzip = new GZIPOutputStream(byteOutput)
+ try {
+ gzip.write(Array[Byte](1, 2, 3, 4))
+ } finally {
+ gzip.close()
+ }
+ val bytes = byteOutput.toByteArray
+ val o = new FileOutputStream(inputFile)
+ try {
+ // It's corrupt since we only write half of bytes into the file.
+ o.write(bytes.take(bytes.length / 2))
+ } finally {
+ o.close()
+ }
+ withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+ val e = intercept[SparkException] {
+ spark.read.text(inputFile.toURI.toString).collect()
+ }
+ assert(e.getCause.isInstanceOf[EOFException])
+ assert(e.getCause.getMessage === "Unexpected end of input stream")
+ }
+ withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+ assert(spark.read.text(inputFile.toURI.toString).collect().isEmpty)
+ }
+ } finally {
+ inputFile.delete()
+ }
+ }
+
// Helpers for checking the arguments passed to the FileFormat.
protected val checkPartitionSchema =