author: Wenchen Fan <wenchen@databricks.com> (2017-03-12 23:16:45 -0700)
committer: gatorsmile <gatorsmile@gmail.com> (2017-03-12 23:16:45 -0700)
commit: 05887fc3d8d517b416992ee870d0f865b1f9a3d0 (patch)
tree: 9575e1f3ec0f67a965d93fc257d2666a00dfddab
parent: 9456688547522a62f1e7520e9b3564550c57aa5d (diff)
[SPARK-19916][SQL] simplify bad file handling
## What changes were proposed in this pull request?
There should be only one central place that try-catches the exception for corrupted files. This patch moves the per-file read into a single `readCurrentFile()` helper in `FileScanRDD`, which both the normal path and the `ignoreCorruptFiles` path go through.
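A minimal sketch of the centralized pattern (standalone names and signature for illustration only; the real helper lives inside `FileScanRDD` and returns `Iterator[InternalRow]`, see the diff below):

```scala
import java.io.FileNotFoundException

object ReadHelper {
  // Sketch only: every per-file read goes through one helper, so the
  // "stale file listing" hint is attached to FileNotFoundException in exactly one place.
  def readCurrentFile[T](file: String)(readFunction: String => Iterator[T]): Iterator[T] = {
    try {
      readFunction(file)
    } catch {
      case e: FileNotFoundException =>
        throw new FileNotFoundException(
          e.getMessage + "\n" +
            "It is possible the underlying files have been updated. " +
            "You can explicitly invalidate the cache in Spark by " +
            "running 'REFRESH TABLE tableName' command in SQL or " +
            "by recreating the Dataset/DataFrame involved.")
    }
  }
}
```

Callers that need to tolerate corrupt files wrap the returned iterator and downgrade `RuntimeException`/`IOException` to a warning, while `FileNotFoundException` is rethrown even when `ignoreCorruptFiles` is enabled.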
## How was this patch tested?
Existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #17253 from cloud-fan/bad-file.
3 files changed, 43 insertions, 63 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index 6784ee243c..dacf462953 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -90,7 +90,7 @@ trait FileFormat {
    * @param options A set of string -> string configuration options.
    * @return
    */
-  def buildReader(
+  protected def buildReader(
       sparkSession: SparkSession,
       dataSchema: StructType,
       partitionSchema: StructType,
@@ -98,8 +98,6 @@ trait FileFormat {
       filters: Seq[Filter],
       options: Map[String, String],
       hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    // TODO: Remove this default implementation when the other formats have been ported
-    // Until then we guard in [[FileSourceStrategy]] to only call this method on supported formats.
     throw new UnsupportedOperationException(s"buildReader is not supported for $this")
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 14f721d6a7..a89d172a91 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.execution.datasources
 
-import java.io.IOException
+import java.io.{FileNotFoundException, IOException}
 
 import scala.collection.mutable
@@ -44,7 +44,7 @@ case class PartitionedFile(
     filePath: String,
     start: Long,
     length: Long,
-    locations: Array[String] = Array.empty) {
+    @transient locations: Array[String] = Array.empty) {
   override def toString: String = {
     s"path: $filePath, range: $start-${start + length}, partition values: $partitionValues"
   }
@@ -121,6 +121,20 @@ class FileScanRDD(
         nextElement
       }
 
+      private def readCurrentFile(): Iterator[InternalRow] = {
+        try {
+          readFunction(currentFile)
+        } catch {
+          case e: FileNotFoundException =>
+            throw new FileNotFoundException(
+              e.getMessage + "\n" +
+                "It is possible the underlying files have been updated. " +
+                "You can explicitly invalidate the cache in Spark by " +
+                "running 'REFRESH TABLE tableName' command in SQL or " +
+                "by recreating the Dataset/DataFrame involved.")
+        }
+      }
+
       /** Advances to the next file. Returns true if a new non-empty iterator is available. */
       private def nextIterator(): Boolean = {
         updateBytesReadWithFileSize()
@@ -130,54 +144,36 @@ class FileScanRDD(
         // Sets InputFileBlockHolder for the file block's information
         InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length)
-        try {
-          if (ignoreCorruptFiles) {
-            currentIterator = new NextIterator[Object] {
-              private val internalIter = {
-                try {
-                  // The readFunction may read files before consuming the iterator.
-                  // E.g., vectorized Parquet reader.
-                  readFunction(currentFile)
-                } catch {
-                  case e @(_: RuntimeException | _: IOException) =>
-                    logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
-                    Iterator.empty
-                }
-              }
-
-              override def getNext(): AnyRef = {
-                try {
-                  if (internalIter.hasNext) {
-                    internalIter.next()
-                  } else {
-                    finished = true
-                    null
+        if (ignoreCorruptFiles) {
+          currentIterator = new NextIterator[Object] {
+            // The readFunction may read some bytes before consuming the iterator, e.g.,
+            // vectorized Parquet reader. Here we use lazy val to delay the creation of
+            // iterator so that we will throw exception in `getNext`.
+            private lazy val internalIter = readCurrentFile()
+
+            override def getNext(): AnyRef = {
+              try {
+                if (internalIter.hasNext) {
+                  internalIter.next()
+                } else {
+                  finished = true
+                  null
                 }
+              } catch {
+                // Throw FileNotFoundException even `ignoreCorruptFiles` is true
+                case e: FileNotFoundException => throw e
+                case e @ (_: RuntimeException | _: IOException) =>
+                  logWarning(
+                    s"Skipped the rest of the content in the corrupted file: $currentFile", e)
+                  finished = true
+                  null
              }
-
-              override def close(): Unit = {}
            }
-          } else {
-            currentIterator = readFunction(currentFile)
+
+            override def close(): Unit = {}
          }
-        } catch {
-          case e: IOException if ignoreCorruptFiles =>
-            logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
-            currentIterator = Iterator.empty
-          case e: java.io.FileNotFoundException =>
-            throw new java.io.FileNotFoundException(
-              e.getMessage + "\n" +
-                "It is possible the underlying files have been updated. " +
-                "You can explicitly invalidate the cache in Spark by " +
-                "running 'REFRESH TABLE tableName' command in SQL or " +
-                "by recreating the Dataset/DataFrame involved."
-            )
+        } else {
+          currentIterator = readCurrentFile()
        }
        hasNext
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 5313c2f374..062aa5c8ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -283,20 +283,6 @@ class ParquetFileFormat
       filters: Seq[Filter],
       options: Map[String, String],
       hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
-    // For Parquet data source, `buildReader` already handles partition values appending. Here we
-    // simply delegate to `buildReader`.
-    buildReader(
-      sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
-  }
-
-  override def buildReader(
-      sparkSession: SparkSession,
-      dataSchema: StructType,
-      partitionSchema: StructType,
-      requiredSchema: StructType,
-      filters: Seq[Filter],
-      options: Map[String, String],
-      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
     hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
     hadoopConf.set(
       ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
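For context, a small usage sketch of the behavior this code path implements (assumes an existing `SparkSession` named `spark` and a hypothetical Parquet path; `spark.sql.files.ignoreCorruptFiles` is the existing flag that `FileScanRDD` consults): with the flag on, corrupt files are skipped with a log warning, but a file that has disappeared from the listing still raises `FileNotFoundException` with the `REFRESH TABLE` hint added in `readCurrentFile()`.

```scala
// Usage sketch (hypothetical path; assumes an existing SparkSession `spark`).
// Unreadable/corrupt files are skipped with a warning when the flag is enabled,
// while a missing file still surfaces FileNotFoundException to the user.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

val df = spark.read.parquet("/path/to/table")
println(df.count())
```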