Diffstat (limited to 'sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala | 35 +-
1 file changed, 34 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index c36609586c..2efff3f57d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -23,7 +23,7 @@ import java.sql.Timestamp
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.hadoop.ParquetOutputFormat

-import org.apache.spark.SparkException
+import org.apache.spark.{DebugFilesystem, SparkException}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
@@ -316,6 +316,39 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
}
}
+ /**
+ * This is part of the test 'Enabling/disabling ignoreCorruptFiles', but run in a loop
+ * to increase the chance of failure
+ */
+ ignore("SPARK-20407 ParquetQuerySuite 'Enabling/disabling ignoreCorruptFiles' flaky test") {
+ def testIgnoreCorruptFiles(): Unit = {
+ withTempDir { dir =>
+ val basePath = dir.getCanonicalPath
+ spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
+ spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
+ spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
+ val df = spark.read.parquet(
+ new Path(basePath, "first").toString,
+ new Path(basePath, "second").toString,
+ new Path(basePath, "third").toString)
+ checkAnswer(
+ df,
+ Seq(Row(0), Row(1)))
+ }
+ }
+
+ for (_ <- 1 to 100) {
+ DebugFilesystem.clearOpenStreams()
+ withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+ val exception = intercept[SparkException] {
+ testIgnoreCorruptFiles()
+ }
+ assert(exception.getMessage().contains("is not a Parquet file"))
+ }
+ DebugFilesystem.assertNoOpenStreams()
+ }
+ }
+
test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") {
withTempPath { dir =>
val basePath = dir.getCanonicalPath
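
Note: the loop in the hunk above stresses only the failing half of the original 'Enabling/disabling ignoreCorruptFiles' test; clearing and asserting on DebugFilesystem's open streams around each iteration is what surfaces a file handle leaked when the scan aborts on the corrupt file (the flakiness tracked by SPARK-20407). For contrast, here is a minimal sketch of the passing half, under the same ParquetTest/SharedSQLContext harness assumptions, with the flag flipped so the non-Parquet directory is skipped instead of failing the read:

    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
      withTempDir { dir =>
        val basePath = dir.getCanonicalPath
        spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
        spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
        // JSON output stands in for a corrupt Parquet file.
        spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
        val df = spark.read.parquet(
          new Path(basePath, "first").toString,
          new Path(basePath, "second").toString,
          new Path(basePath, "third").toString)
        // Rows from the unreadable directory are dropped rather than raising a SparkException.
        checkAnswer(df, Seq(Row(0), Row(1)))
      }
    }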