author     Bogdan Raducanu <bogdan@databricks.com>        2017-04-20 18:49:39 +0200
committer  Herman van Hovell <hvanhovell@databricks.com>  2017-04-20 18:49:39 +0200
commit     c5a31d160f47ba51bb9f8a4f3141851034640fc7 (patch)
tree       ad4eb98ddd4b85baa4f94eb95f3cd898c9dff33a
parent     b91873db0930c6fe885c27936e1243d5fabd03ed (diff)
[SPARK-20407][TESTS] ParquetQuerySuite 'Enabling/disabling ignoreCorruptFiles' flaky test
## What changes were proposed in this pull request?

SharedSQLContext.afterEach now calls DebugFilesystem.assertNoOpenStreams inside eventually. SQLTestUtils.withTempDir calls waitForTasksToFinish before deleting the directory.

## How was this patch tested?

Added a new test in ParquetQuerySuite based on the flaky test.

Author: Bogdan Raducanu <bogdan@databricks.com>

Closes #17701 from bogdanrdc/SPARK-20407.
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala  35
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala  19
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala  13
3 files changed, 60 insertions(+), 7 deletions(-)
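Both fixes rest on the same mechanism: ScalaTest's `eventually`, which retries a block until it stops throwing or a timeout expires. A minimal sketch of the pattern, assuming only ScalaTest on the classpath — the by-name count parameters below are illustrative stand-ins for what the real code reads from DebugFilesystem's open-stream set and SparkStatusTracker's running-task counts:

```scala
import scala.concurrent.duration._
import org.scalatest.concurrent.Eventually._

// Streams may be closed from other threads shortly after a test body
// returns, so a one-shot assertion races with them; a bounded retry
// keeps the check deterministic.
def assertEventuallyNoOpenStreams(openStreamCount: => Int): Unit =
  eventually(timeout(10.seconds)) {
    assert(openStreamCount == 0)
  }

// Block until no task is still running before deleting temp files, so
// a straggler task cannot hold a file open while the directory goes away.
def waitForTasksToFinish(runningTaskCount: => Int): Unit =
  eventually(timeout(10.seconds)) {
    assert(runningTaskCount == 0)
  }
```

The design point is that both conditions are cleared asynchronously by other threads; retrying with a bound makes the tests stable without masking genuine leaks, which still fail once the 10-second timeout expires. The diff below applies exactly this pattern in SharedSQLContext.afterEach and SQLTestUtils.withTempDir.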
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index c36609586c..2efff3f57d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -23,7 +23,7 @@ import java.sql.Timestamp
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.hadoop.ParquetOutputFormat
-import org.apache.spark.SparkException
+import org.apache.spark.{DebugFilesystem, SparkException}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
@@ -316,6 +316,39 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
}
}
+ /**
+ * this is part of test 'Enabling/disabling ignoreCorruptFiles' but run in a loop
+ * to increase the chance of failure
+ */
+ ignore("SPARK-20407 ParquetQuerySuite 'Enabling/disabling ignoreCorruptFiles' flaky test") {
+ def testIgnoreCorruptFiles(): Unit = {
+ withTempDir { dir =>
+ val basePath = dir.getCanonicalPath
+ spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
+ spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
+ spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
+ val df = spark.read.parquet(
+ new Path(basePath, "first").toString,
+ new Path(basePath, "second").toString,
+ new Path(basePath, "third").toString)
+ checkAnswer(
+ df,
+ Seq(Row(0), Row(1)))
+ }
+ }
+
+ for (i <- 1 to 100) {
+ DebugFilesystem.clearOpenStreams()
+ withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+ val exception = intercept[SparkException] {
+ testIgnoreCorruptFiles()
+ }
+ assert(exception.getMessage().contains("is not a Parquet file"))
+ }
+ DebugFilesystem.assertNoOpenStreams()
+ }
+ }
+
test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") {
withTempPath { dir =>
val basePath = dir.getCanonicalPath
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index b5ad73b746..44c0fc70d0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -22,11 +22,13 @@ import java.net.URI
import java.nio.file.Files
import java.util.{Locale, UUID}
+import scala.concurrent.duration._
import scala.language.implicitConversions
import scala.util.control.NonFatal
import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql._
@@ -49,7 +51,7 @@ import org.apache.spark.util.{UninterruptibleThread, Utils}
* prone to leaving multiple overlapping [[org.apache.spark.SparkContext]]s in the same JVM.
*/
private[sql] trait SQLTestUtils
- extends SparkFunSuite
+ extends SparkFunSuite with Eventually
with BeforeAndAfterAll
with SQLTestData { self =>
@@ -139,6 +141,15 @@ private[sql] trait SQLTestUtils
}
/**
+ * Waits for all tasks on all executors to be finished.
+ */
+ protected def waitForTasksToFinish(): Unit = {
+ eventually(timeout(10.seconds)) {
+ assert(spark.sparkContext.statusTracker
+ .getExecutorInfos.map(_.numRunningTasks()).sum == 0)
+ }
+ }
+ /**
* Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
* returns.
*
@@ -146,7 +157,11 @@ private[sql] trait SQLTestUtils
*/
protected def withTempDir(f: File => Unit): Unit = {
val dir = Utils.createTempDir().getCanonicalFile
- try f(dir) finally Utils.deleteRecursively(dir)
+ try f(dir) finally {
+ // wait for all tasks to finish before deleting files
+ waitForTasksToFinish()
+ Utils.deleteRecursively(dir)
+ }
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index e122b39f6f..3d76e05f61 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -17,17 +17,18 @@
package org.apache.spark.sql.test
+import scala.concurrent.duration._
+
import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually
import org.apache.spark.{DebugFilesystem, SparkConf}
import org.apache.spark.sql.{SparkSession, SQLContext}
-import org.apache.spark.sql.internal.SQLConf
-
/**
* Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
*/
-trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
+trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventually {
protected val sparkConf = new SparkConf()
@@ -84,6 +85,10 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
protected override def afterEach(): Unit = {
super.afterEach()
- DebugFilesystem.assertNoOpenStreams()
+ // files can be closed from other threads, so wait a bit
+ // normally this doesn't take more than 1s
+ eventually(timeout(10.seconds)) {
+ DebugFilesystem.assertNoOpenStreams()
+ }
}
}