author    Liang-Chi Hsieh <viirya@gmail.com>    2017-01-16 15:26:41 +0800
committer Wenchen Fan <wenchen@databricks.com>  2017-01-16 15:26:41 +0800
commit    61e48f52d1d8c7431707bd3511b6fe9f0ae996c0 (patch)
tree      4d44623793560ed50771db7e9f3905e60a48f435 /core
parent    de62ddf7ff42bdc383da127e6b1155897565354c (diff)
download  spark-61e48f52d1d8c7431707bd3511b6fe9f0ae996c0.tar.gz
          spark-61e48f52d1d8c7431707bd3511b6fe9f0ae996c0.tar.bz2
          spark-61e48f52d1d8c7431707bd3511b6fe9f0ae996c0.zip
[SPARK-19082][SQL] Make ignoreCorruptFiles work for Parquet
## What changes were proposed in this pull request?

We have a config `spark.sql.files.ignoreCorruptFiles` that can be used to ignore corrupt files when reading files in SQL. Currently the `ignoreCorruptFiles` config has two issues and doesn't work for Parquet:

1. We only ignore corrupt files in `FileScanRDD`. In fact, we begin reading those files as early as inferring the data schema from them: for a corrupt file we can't read the schema, and the program fails. A related issue was reported at http://apache-spark-developers-list.1001551.n3.nabble.com/Skip-Corrupted-Parquet-blocks-footer-tc20418.html
2. In `FileScanRDD`, we assume we only begin to read the files when starting to consume the iterator. However, the files may be read before that, in which case the `ignoreCorruptFiles` config doesn't work either.

This patch targets the Parquet datasource. If this direction is OK, we can address the same issue for other datasources like ORC. There are two main changes in this patch (both sketched after this description):

1. Replace `ParquetFileReader.readAllFootersInParallel` by implementing the footer-reading logic in a multi-threaded manner ourselves. We can't ignore corrupt files if we use `ParquetFileReader.readAllFootersInParallel`, so this patch implements similar logic in `readParquetFootersInParallel`.
2. In `FileScanRDD`, ignore a corrupt file not only while consuming the iterator but also when calling `readFunction` to create it.

One thing to notice: we read the schema from a Parquet file's footer, and the footer-reading method `ParquetFileReader.readFooter` throws `RuntimeException`, instead of `IOException`, if it can't read the footer successfully (see https://github.com/apache/parquet-mr/blob/df9d8e415436292ae33e1ca0b8da256640de9710/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L470). So this patch catches `RuntimeException`. One concern is that this might also shadow runtime exceptions with causes other than a corrupt file.

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #16474 from viirya/fix-ignorecorrupted-parquet-files.
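The first change can be sketched as follows. This is a minimal illustration of the approach described above, not the exact patch: the method name `readParquetFootersInParallel` comes from the description, while the signature, the pool size, and the error handling here are assumptions.

```scala
import java.io.IOException

import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop.{Footer, ParquetFileReader}

// Sketch only: reads footers in parallel, skipping files whose footers
// can't be read when ignoreCorruptFiles is set.
def readParquetFootersInParallel(
    conf: Configuration,
    partFiles: Seq[FileStatus],
    ignoreCorruptFiles: Boolean): Seq[Footer] = {
  val parFiles = partFiles.par
  // The pool size of 8 is an arbitrary choice for this sketch.
  parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
  parFiles.flatMap { currentFile =>
    try {
      // Only the footer is needed for schema inference, so row groups
      // are skipped.
      Some(new Footer(currentFile.getPath,
        ParquetFileReader.readFooter(conf, currentFile, SKIP_ROW_GROUPS)))
    } catch {
      // readFooter signals an unreadable footer with RuntimeException
      // rather than IOException, so RuntimeException is caught here.
      case e: RuntimeException =>
        if (ignoreCorruptFiles) {
          None // skip the corrupt file
        } else {
          throw new IOException(s"Could not read footer for file: $currentFile", e)
        }
    }
  }.seq
}
```

The second change follows the same catch-and-skip pattern when `FileScanRDD` calls `readFunction` to create a file's iterator. Below is a hedged sketch; `tryReadFile` is a hypothetical helper, and the real change lives inline in `FileScanRDD` with its own member names.

```scala
import java.io.IOException

// Hypothetical helper: guards the call that creates a file's iterator, so a
// file that fails as soon as it is opened (not only while its iterator is
// consumed) can also be skipped.
def tryReadFile[T](file: String, ignoreCorruptFiles: Boolean)(
    readFunction: String => Iterator[T]): Iterator[T] = {
  if (!ignoreCorruptFiles) {
    readFunction(file)
  } else {
    try {
      readFunction(file)
    } catch {
      // Some sources read the file eagerly here; treat an unreadable file
      // as empty instead of failing the task.
      case _: IOException | _: RuntimeException => Iterator.empty
    }
  }
}
```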
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index ad1fddbde7..60e383afad 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -20,7 +20,7 @@ package org.apache.spark.rdd
 import java.io.{IOException, ObjectOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.parallel.{ForkJoinTaskSupport, ThreadPoolTaskSupport}
+import scala.collection.parallel.ForkJoinTaskSupport
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.reflect.ClassTag