path: root/sql/core/src
author     Wenchen Fan <wenchen@databricks.com>  2017-03-12 23:16:45 -0700
committer  gatorsmile <gatorsmile@gmail.com>     2017-03-12 23:16:45 -0700
commit     05887fc3d8d517b416992ee870d0f865b1f9a3d0 (patch)
tree       9575e1f3ec0f67a965d93fc257d2666a00dfddab /sql/core/src
parent     9456688547522a62f1e7520e9b3564550c57aa5d (diff)
[SPARK-19916][SQL] simplify bad file handling
## What changes were proposed in this pull request?

We should have only one central place to try-catch the exception for corrupted files.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17253 from cloud-fan/bad-file.
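As a rough, standalone sketch of the "one central place" idea (the object and method names below are illustrative and not part of this patch; only the use of a read function and the hint text mirror the diff that follows), the same wrapper can serve every read path:

```scala
import java.io.FileNotFoundException

// Sketch only: a single helper owns the FileNotFoundException handling,
// so every code path that opens a file shares one try-catch.
object CentralBadFileHandling {
  def readWithHint[T](path: String, readFunction: String => Iterator[T]): Iterator[T] = {
    try {
      readFunction(path)
    } catch {
      case e: FileNotFoundException =>
        // Re-thrown with the hint this patch attaches for stale cached file listings.
        throw new FileNotFoundException(
          e.getMessage + "\n" +
            "It is possible the underlying files have been updated. " +
            "You can explicitly invalidate the cache in Spark by " +
            "running 'REFRESH TABLE tableName' command in SQL or " +
            "by recreating the Dataset/DataFrame involved.")
    }
  }
}
```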
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala                  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala                88
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala  14
3 files changed, 43 insertions, 63 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index 6784ee243c..dacf462953 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -90,7 +90,7 @@ trait FileFormat {
    * @param options A set of string -> string configuration options.
    * @return
    */
-  def buildReader(
+  protected def buildReader(
       sparkSession: SparkSession,
       dataSchema: StructType,
       partitionSchema: StructType,
@@ -98,8 +98,6 @@ trait FileFormat {
       filters: Seq[Filter],
       options: Map[String, String],
       hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    // TODO: Remove this default implementation when the other formats have been ported
-    // Until then we guard in [[FileSourceStrategy]] to only call this method on supported formats.
     throw new UnsupportedOperationException(s"buildReader is not supported for $this")
   }
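The FileFormat.scala hunk above narrows `buildReader` from public to `protected`, so callers must go through `buildReaderWithPartitionValues` and only concrete formats supply the raw per-file reader. A stripped-down, standalone model of that contract, a sketch with placeholder types rather than Spark's real trait, might look like this:

```scala
// Hedged sketch: a simplified stand-in for the FileFormat contract after this
// patch. InternalRow and PartitionedFile here are placeholders, not Spark's types.
object FileFormatContractSketch {
  type InternalRow = Seq[Any]
  final case class PartitionedFile(filePath: String, start: Long, length: Long)

  trait FileFormat {
    // Public entry point the planner uses; in Spark it also appends partition
    // values before handing rows back.
    def buildReaderWithPartitionValues(
        options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
      buildReader(options)
    }

    // Now protected: only subclasses provide the raw reader, so external code
    // can no longer call it directly and bypass the partition-value handling.
    protected def buildReader(
        options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
      throw new UnsupportedOperationException(s"buildReader is not supported for $this")
    }
  }
}
```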
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 14f721d6a7..a89d172a91 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.datasources
-import java.io.IOException
+import java.io.{FileNotFoundException, IOException}
import scala.collection.mutable
@@ -44,7 +44,7 @@ case class PartitionedFile(
     filePath: String,
     start: Long,
     length: Long,
-    locations: Array[String] = Array.empty) {
+    @transient locations: Array[String] = Array.empty) {
   override def toString: String = {
     s"path: $filePath, range: $start-${start + length}, partition values: $partitionValues"
   }
@@ -121,6 +121,20 @@ class FileScanRDD(
         nextElement
       }
 
+      private def readCurrentFile(): Iterator[InternalRow] = {
+        try {
+          readFunction(currentFile)
+        } catch {
+          case e: FileNotFoundException =>
+            throw new FileNotFoundException(
+              e.getMessage + "\n" +
+                "It is possible the underlying files have been updated. " +
+                "You can explicitly invalidate the cache in Spark by " +
+                "running 'REFRESH TABLE tableName' command in SQL or " +
+                "by recreating the Dataset/DataFrame involved.")
+        }
+      }
+
       /** Advances to the next file. Returns true if a new non-empty iterator is available. */
       private def nextIterator(): Boolean = {
         updateBytesReadWithFileSize()
@@ -130,54 +144,36 @@ class FileScanRDD(
           // Sets InputFileBlockHolder for the file block's information
           InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length)
 
-          try {
-            if (ignoreCorruptFiles) {
-              currentIterator = new NextIterator[Object] {
-                private val internalIter = {
-                  try {
-                    // The readFunction may read files before consuming the iterator.
-                    // E.g., vectorized Parquet reader.
-                    readFunction(currentFile)
-                  } catch {
-                    case e @(_: RuntimeException | _: IOException) =>
-                      logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
-                      Iterator.empty
-                  }
-                }
-
-                override def getNext(): AnyRef = {
-                  try {
-                    if (internalIter.hasNext) {
-                      internalIter.next()
-                    } else {
-                      finished = true
-                      null
-                    }
-                  } catch {
-                    case e: IOException =>
-                      logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
-                      finished = true
-                      null
+          if (ignoreCorruptFiles) {
+            currentIterator = new NextIterator[Object] {
+              // The readFunction may read some bytes before consuming the iterator, e.g.,
+              // vectorized Parquet reader. Here we use lazy val to delay the creation of
+              // iterator so that we will throw exception in `getNext`.
+              private lazy val internalIter = readCurrentFile()
+
+              override def getNext(): AnyRef = {
+                try {
+                  if (internalIter.hasNext) {
+                    internalIter.next()
+                  } else {
+                    finished = true
+                    null
                   }
+                } catch {
+                  // Throw FileNotFoundException even `ignoreCorruptFiles` is true
+                  case e: FileNotFoundException => throw e
+                  case e @ (_: RuntimeException | _: IOException) =>
+                    logWarning(
+                      s"Skipped the rest of the content in the corrupted file: $currentFile", e)
+                    finished = true
+                    null
                 }
-
-                override def close(): Unit = {}
               }
-            } else {
-              currentIterator = readFunction(currentFile)
+
+              override def close(): Unit = {}
             }
-          } catch {
-            case e: IOException if ignoreCorruptFiles =>
-              logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
-              currentIterator = Iterator.empty
-            case e: java.io.FileNotFoundException =>
-              throw new java.io.FileNotFoundException(
-                e.getMessage + "\n" +
-                  "It is possible the underlying files have been updated. " +
-                  "You can explicitly invalidate the cache in Spark by " +
-                  "running 'REFRESH TABLE tableName' command in SQL or " +
-                  "by recreating the Dataset/DataFrame involved."
-              )
+          } else {
+            currentIterator = readCurrentFile()
           }
 
           hasNext
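The comment added inside the new `NextIterator` above is the heart of the FileScanRDD change: an eager reader such as the vectorized Parquet reader may touch the file as soon as the read function is invoked, so `internalIter` becomes a `lazy val` and the first file access is pushed into `getNext()`, where the try-catch can turn a corrupted file into a logged skip. A standalone sketch of that deferral, using no Spark classes (names and the path are illustrative):

```scala
import java.io.IOException

object LazyDeferralSketch extends App {
  // Mimics an eager reader that opens (and here, fails on) the file the
  // moment it is called, before any rows are pulled.
  def eagerRead(path: String): Iterator[String] =
    throw new IOException(s"corrupted file: $path")

  // lazy val defers the call: the failure now happens inside getNext(),
  // where it can be caught and turned into "skip the rest of this file".
  lazy val internalIter: Iterator[String] = eagerRead("/tmp/part-00000")

  def getNext(): Option[String] =
    try {
      if (internalIter.hasNext) Some(internalIter.next()) else None
    } catch {
      case e: IOException =>
        println(s"Skipped the rest of the content in the corrupted file: ${e.getMessage}")
        None
    }

  println(getNext()) // prints the warning and None instead of failing the task
}
```

In Spark the skipping branch is gated by the `spark.sql.files.ignoreCorruptFiles` configuration, and, as the added `case e: FileNotFoundException => throw e` shows, a file that has disappeared still fails the query even when that flag is enabled.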
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 5313c2f374..062aa5c8ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -283,20 +283,6 @@ class ParquetFileFormat
       filters: Seq[Filter],
       options: Map[String, String],
       hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
-    // For Parquet data source, `buildReader` already handles partition values appending. Here we
-    // simply delegate to `buildReader`.
-    buildReader(
-      sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
-  }
-
-  override def buildReader(
-      sparkSession: SparkSession,
-      dataSchema: StructType,
-      partitionSchema: StructType,
-      requiredSchema: StructType,
-      filters: Seq[Filter],
-      options: Map[String, String],
-      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
     hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
     hadoopConf.set(
       ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,