Diffstat (limited to 'sql/hive/src/test/scala/org')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala    |  4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala         | 77
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala |  6
3 files changed, 48 insertions, 39 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index e678cf6f22..4f771caa1d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -93,7 +93,9 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
         .orc(path)
 
       // Check if this is compressed as ZLIB.
-      val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".zlib.orc"))
+      val maybeOrcFile = new File(path).listFiles().find { f =>
+        !f.getName.startsWith("_") && f.getName.endsWith(".zlib.orc")
+      }
       assert(maybeOrcFile.isDefined)
       val orcFilePath = maybeOrcFile.get.toPath.toString
       val expectedCompressionKind =
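
Note on the change above: the new predicate skips names that start with "_", presumably so that committer metadata such as _SUCCESS is not mistaken for a data file when the test looks for the ".zlib.orc" output. A minimal sketch of the same filtering idea, outside Spark; the directory path and the suffix here are illustrative assumptions, not part of the commit:

import java.io.File

// Sketch: list only data files in a committed output directory, ignoring
// committer metadata (names starting with "_", e.g. _SUCCESS).
object FindCompressedOrcFiles {
  def dataFiles(dir: File, suffix: String): Seq[File] =
    Option(dir.listFiles()).getOrElse(Array.empty[File])
      .filter(f => !f.getName.startsWith("_") && f.getName.endsWith(suffix))
      .toSeq

  def main(args: Array[String]): Unit = {
    // Illustrative directory; in the test above this is the temp ORC output path.
    val found = dataFiles(new File("/tmp/orc-output"), ".zlib.orc")
    println(found.map(_.getName))
  }
}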
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
index 2446bed58a..d23b66a530 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
@@ -29,7 +29,7 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.DataSourceScanExec
-import org.apache.spark.sql.execution.datasources.{FileScanRDD, HadoopFsRelation, LocalityTestFileSystem, LogicalRelation}
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
@@ -784,44 +784,47 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
   }
 
   test("SPARK-8578 specified custom output committer will not be used to append data") {
-    val extraOptions = Map[String, String](
-      SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[AlwaysFailOutputCommitter].getName,
-      // Since Parquet has its own output committer setting, also set it
-      // to AlwaysFailParquetOutputCommitter at here.
-      "spark.sql.parquet.output.committer.class" ->
-        classOf[AlwaysFailParquetOutputCommitter].getName
-    )
+    withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+      classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
+      val extraOptions = Map[String, String](
+        SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[AlwaysFailOutputCommitter].getName,
+        // Since Parquet has its own output committer setting, also set it
+        // to AlwaysFailParquetOutputCommitter at here.
+        "spark.sql.parquet.output.committer.class" ->
+          classOf[AlwaysFailParquetOutputCommitter].getName
+      )
 
-    val df = spark.range(1, 10).toDF("i")
-    withTempPath { dir =>
-      df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
-      // Because there data already exists,
-      // this append should succeed because we will use the output committer associated
-      // with file format and AlwaysFailOutputCommitter will not be used.
-      df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
-      checkAnswer(
-        spark.read
-          .format(dataSourceName)
-          .option("dataSchema", df.schema.json)
-          .options(extraOptions)
-          .load(dir.getCanonicalPath),
-        df.union(df))
-
-      // This will fail because AlwaysFailOutputCommitter is used when we do append.
-      intercept[Exception] {
-        df.write.mode("overwrite")
-          .options(extraOptions).format(dataSourceName).save(dir.getCanonicalPath)
+      val df = spark.range(1, 10).toDF("i")
+      withTempPath { dir =>
+        df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+        // Because there data already exists,
+        // this append should succeed because we will use the output committer associated
+        // with file format and AlwaysFailOutputCommitter will not be used.
+        df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+        checkAnswer(
+          spark.read
+            .format(dataSourceName)
+            .option("dataSchema", df.schema.json)
+            .options(extraOptions)
+            .load(dir.getCanonicalPath),
+          df.union(df))
+
+        // This will fail because AlwaysFailOutputCommitter is used when we do append.
+        intercept[Exception] {
+          df.write.mode("overwrite")
+            .options(extraOptions).format(dataSourceName).save(dir.getCanonicalPath)
+        }
       }
-    }
-    withTempPath { dir =>
-      // Because there is no existing data,
-      // this append will fail because AlwaysFailOutputCommitter is used when we do append
-      // and there is no existing data.
-      intercept[Exception] {
-        df.write.mode("append")
-          .options(extraOptions)
-          .format(dataSourceName)
-          .save(dir.getCanonicalPath)
-      }
+      withTempPath { dir =>
+        // Because there is no existing data,
+        // this append will fail because AlwaysFailOutputCommitter is used when we do append
+        // and there is no existing data.
+        intercept[Exception] {
+          df.write.mode("append")
+            .options(extraOptions)
+            .format(dataSourceName)
+            .save(dir.getCanonicalPath)
+        }
+      }
     }
   }
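
Note on the change above: wrapping the test in withSQLConf pins SQLConf.FILE_COMMIT_PROTOCOL_CLASS to SQLHadoopMapReduceCommitProtocol, which, judging from this change, is the commit protocol that still consults the committer classes the test configures (SQLConf.OUTPUT_COMMITTER_CLASS and the Parquet committer option). A hedged, standalone sketch of the same configuration outside the test harness; the local master, app name, output path, and the literal key string (assumed to be what FILE_COMMIT_PROTOCOL_CLASS resolves to) are illustrative assumptions:

import org.apache.spark.sql.SparkSession

// Sketch: set the commit-protocol conf on a session instead of via withSQLConf.
object CommitProtocolSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")                       // illustrative local master
      .appName("commit-protocol-sketch")
      .getOrCreate()

    spark.conf.set(
      "spark.sql.sources.commitProtocolClass",  // assumed key of SQLConf.FILE_COMMIT_PROTOCOL_CLASS
      "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")

    // With this protocol in place, writes go through the Hadoop output-committer
    // machinery, which is what lets committer-related options take effect.
    spark.range(1, 10).toDF("i")
      .write.mode("append")
      .parquet("/tmp/commit-protocol-sketch")   // illustrative output path

    spark.stop()
  }
}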
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index 8aa018d0a9..03207ab869 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -25,6 +25,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
+import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -125,7 +126,10 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
   }
 
   test("SPARK-8604: Parquet data source should write summary file while doing appending") {
-    withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") {
+    withSQLConf(
+        ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true",
+        SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+          classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
       withTempPath { dir =>
         val path = dir.getCanonicalPath
         val df = spark.range(0, 5).toDF()
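
Note on the change above (the hunk is truncated here): ENABLE_JOB_SUMMARY asks Parquet's output committer to write _metadata / _common_metadata summary files at job commit, and the extra FILE_COMMIT_PROTOCOL_CLASS setting suggests this only happens when the SQL commit protocol hands the job to that committer. A hedged sketch of observing those summary files after an append; the config key string, output path, and local master are illustrative assumptions:

import java.io.File
import org.apache.spark.sql.SparkSession

// Sketch: enable Parquet job summaries, append twice, then list the metadata
// files the committer leaves next to the part files.
object SummaryFileSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("summary-file-sketch")
      .getOrCreate()

    // Assumed key string of ParquetOutputFormat.ENABLE_JOB_SUMMARY.
    spark.conf.set("parquet.enable.summary-metadata", "true")

    val path = "/tmp/parquet-summary-sketch"    // illustrative output path
    spark.range(0, 5).toDF().write.mode("append").parquet(path)
    spark.range(5, 10).toDF().write.mode("append").parquet(path)

    // Expect committer metadata such as _SUCCESS and, with summaries enabled
    // and a committer-backed protocol, _metadata / _common_metadata.
    val metadataFiles =
      Option(new File(path).listFiles()).getOrElse(Array.empty[File])
        .map(_.getName)
        .filter(_.startsWith("_"))
    println(metadataFiles.sorted.mkString(", "))

    spark.stop()
  }
}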