path: root/sql/core
author     Shixiong Zhu <shixiong@databricks.com>  2016-10-12 13:51:53 -0700
committer  Mridul Muralidharan <mmuralidharan@HW11853.local>  2016-10-12 13:51:53 -0700
commit     47776e7c0c68590fe446cef910900b1aaead06f9 (patch)
tree       62a7e0aae43310e3c598f1fd160fc2b2c8c93016  /sql/core
parent     eb69335cdbce54f943ae6168aed39687f40e53ed (diff)
[SPARK-17850][CORE] Add a flag to ignore corrupt files
## What changes were proposed in this pull request?

Add a flag to ignore corrupt files. For Spark core, the configuration is `spark.files.ignoreCorruptFiles`. For Spark SQL, it's `spark.sql.files.ignoreCorruptFiles`.

## How was this patch tested?

The added unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15422 from zsxwing/SPARK-17850.
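Usage (illustrative): a minimal sketch of how the two flags are set, not part of this patch; the app name and input path below are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Core flag: applied to SparkContext-level file reads (e.g. sc.textFile).
val conf = new SparkConf()
  .setAppName("ignore-corrupt-files-demo") // placeholder app name
  .set("spark.files.ignoreCorruptFiles", "true")

val spark = SparkSession.builder().config(conf).getOrCreate()

// SQL flag: applied to file-based data sources; can also be toggled per session.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

// Rows read before a corruption point are kept; the rest of the corrupt file is
// skipped instead of failing the job.
val df = spark.read.text("/data/possibly-corrupt") // placeholder path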
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala  30
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala  8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala  37
3 files changed, 73 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index c66da3a831..89944570df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.datasources
+import java.io.IOException
+
import scala.collection.mutable
import org.apache.spark.{Partition => RDDPartition, TaskContext}
@@ -25,6 +27,7 @@ import org.apache.spark.rdd.{InputFileNameHolder, RDD}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.vectorized.ColumnarBatch
+import org.apache.spark.util.NextIterator
/**
* A part (i.e. "block") of a single file that should be read, along with partition column values
@@ -62,6 +65,8 @@ class FileScanRDD(
@transient val filePartitions: Seq[FilePartition])
extends RDD[InternalRow](sparkSession.sparkContext, Nil) {
+ private val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
+
override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = {
val iterator = new Iterator[Object] with AutoCloseable {
private val inputMetrics = context.taskMetrics().inputMetrics
@@ -119,7 +124,30 @@ class FileScanRDD(
InputFileNameHolder.setInputFileName(currentFile.filePath)
try {
- currentIterator = readFunction(currentFile)
+ if (ignoreCorruptFiles) {
+ currentIterator = new NextIterator[Object] {
+ private val internalIter = readFunction(currentFile)
+
+ override def getNext(): AnyRef = {
+ try {
+ if (internalIter.hasNext) {
+ internalIter.next()
+ } else {
+ finished = true
+ null
+ }
+ } catch {
+ case e: IOException =>
+ finished = true
+ null
+ }
+ }
+
+ override def close(): Unit = {}
+ }
+ } else {
+ currentIterator = readFunction(currentFile)
+ }
} catch {
case e: java.io.FileNotFoundException =>
throw new java.io.FileNotFoundException(
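Note on the iterator wrapping above: Spark's NextIterator contract has getNext() either return the next element or set finished = true, so an IOException from the underlying reader simply ends the file early while keeping the rows already returned. A hedged, standalone sketch of the same wrap-and-swallow pattern over a plain Scala Iterator (readLines is a hypothetical stand-in for readFunction(currentFile)):

import java.io.IOException

// Hedged sketch (not Spark code): wrap an iterator so that an IOException is
// treated as end-of-data instead of failing the consumer.
object TolerantIteratorSketch {
  // `readLines` is a hypothetical stand-in for readFunction(currentFile).
  def tolerantIterator[T](readLines: () => Iterator[T]): Iterator[T] = new Iterator[T] {
    private val underlying = readLines()
    private var buffered: Option[T] = None
    private var done = false

    // Pull one element ahead, converting an IOException into "finished".
    private def advance(): Unit =
      if (!done && buffered.isEmpty) {
        try {
          if (underlying.hasNext) buffered = Some(underlying.next()) else done = true
        } catch {
          case _: IOException => done = true // corrupt input: keep rows read so far
        }
      }

    override def hasNext: Boolean = { advance(); buffered.isDefined }

    override def next(): T = {
      advance()
      val elem = buffered.getOrElse(throw new NoSuchElementException("next on empty iterator"))
      buffered = None
      elem
    }
  }
}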
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 8cbfc4c762..9e7c1ec211 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -576,6 +576,12 @@ object SQLConf {
.doubleConf
.createWithDefault(0.05)
+ val IGNORE_CORRUPT_FILES = SQLConfigBuilder("spark.sql.files.ignoreCorruptFiles")
+ .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
+ "encountering corrupt files and contents that have been read will still be returned.")
+ .booleanConf
+ .createWithDefault(false)
+
object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
@@ -743,6 +749,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def warehousePath: String = new Path(getConf(WAREHOUSE_PATH)).toString
+ def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES)
+
override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)
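For reference, a hedged sketch of toggling the new entry at runtime; setting a conf key through spark.conf.set or a SQL SET statement is standard Spark behavior, and the "false" read-back matches createWithDefault(false) above.

// `spark` is an existing SparkSession.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
// or, equivalently, from SQL:
spark.sql("SET spark.sql.files.ignoreCorruptFiles=true")

// Reading it back; without either call above this returns "false",
// matching createWithDefault(false).
val enabled = spark.conf.get("spark.sql.files.ignoreCorruptFiles")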
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 45411fa065..c5deb31fec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -17,8 +17,9 @@
package org.apache.spark.sql.execution.datasources
-import java.io.File
+import java.io._
import java.util.concurrent.atomic.AtomicInteger
+import java.util.zip.GZIPOutputStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path, RawLocalFileSystem}
@@ -441,6 +442,40 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
}
}
+ test("spark.files.ignoreCorruptFiles should work in SQL") {
+ val inputFile = File.createTempFile("input-", ".gz")
+ try {
+ // Create a corrupt gzip file
+ val byteOutput = new ByteArrayOutputStream()
+ val gzip = new GZIPOutputStream(byteOutput)
+ try {
+ gzip.write(Array[Byte](1, 2, 3, 4))
+ } finally {
+ gzip.close()
+ }
+ val bytes = byteOutput.toByteArray
+ val o = new FileOutputStream(inputFile)
+ try {
+ // It's corrupt since we only write half of bytes into the file.
+ o.write(bytes.take(bytes.length / 2))
+ } finally {
+ o.close()
+ }
+ withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+ val e = intercept[SparkException] {
+ spark.read.text(inputFile.toURI.toString).collect()
+ }
+ assert(e.getCause.isInstanceOf[EOFException])
+ assert(e.getCause.getMessage === "Unexpected end of input stream")
+ }
+ withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+ assert(spark.read.text(inputFile.toURI.toString).collect().isEmpty)
+ }
+ } finally {
+ inputFile.delete()
+ }
+ }
+
// Helpers for checking the arguments passed to the FileFormat.
protected val checkPartitionSchema =