aboutsummaryrefslogtreecommitdiff
path: root/sql/hive/src/test/scala/org
diff options
context:
space:
mode:
authorEric Liang <ekl@databricks.com>2017-01-12 17:45:55 +0800
committerWenchen Fan <wenchen@databricks.com>2017-01-12 17:45:55 +0800
commitc71b25481aa5f7bc27d5c979e66bed54cd46b97e (patch)
tree16c7a3450f83aa3ea9f43d93a6ade9e9ae7cbc74 /sql/hive/src/test/scala/org
parent5db35b312e96dea07f03100c64b58723c2430cd7 (diff)
downloadspark-c71b25481aa5f7bc27d5c979e66bed54cd46b97e.tar.gz
spark-c71b25481aa5f7bc27d5c979e66bed54cd46b97e.tar.bz2
spark-c71b25481aa5f7bc27d5c979e66bed54cd46b97e.zip
[SPARK-19183][SQL] Add deleteWithJob hook to internal commit protocol API
## What changes were proposed in this pull request? Currently in SQL we implement overwrites by calling fs.delete() directly on the original data. This is not ideal since we the original files end up deleted even if the job aborts. We should extend the commit protocol to allow file overwrites to be managed as well. ## How was this patch tested? Existing tests. I also fixed a bunch of tests that were depending on the commit protocol implementation being set to the legacy mapreduce one. cc rxin cloud-fan Author: Eric Liang <ekl@databricks.com> Author: Eric Liang <ekhliang@gmail.com> Closes #16554 from ericl/add-delete-protocol.
Diffstat (limited to 'sql/hive/src/test/scala/org')
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala4
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala77
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala6
3 files changed, 48 insertions, 39 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index e678cf6f22..4f771caa1d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -93,7 +93,9 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
.orc(path)
// Check if this is compressed as ZLIB.
- val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".zlib.orc"))
+ val maybeOrcFile = new File(path).listFiles().find { f =>
+ !f.getName.startsWith("_") && f.getName.endsWith(".zlib.orc")
+ }
assert(maybeOrcFile.isDefined)
val orcFilePath = maybeOrcFile.get.toPath.toString
val expectedCompressionKind =
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
index 2446bed58a..d23b66a530 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
@@ -29,7 +29,7 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
import org.apache.spark.sql.execution.DataSourceScanExec
-import org.apache.spark.sql.execution.datasources.{FileScanRDD, HadoopFsRelation, LocalityTestFileSystem, LogicalRelation}
+import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
@@ -784,44 +784,47 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
}
test("SPARK-8578 specified custom output committer will not be used to append data") {
- val extraOptions = Map[String, String](
- SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[AlwaysFailOutputCommitter].getName,
- // Since Parquet has its own output committer setting, also set it
- // to AlwaysFailParquetOutputCommitter at here.
- "spark.sql.parquet.output.committer.class" ->
- classOf[AlwaysFailParquetOutputCommitter].getName
- )
+ withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+ classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
+ val extraOptions = Map[String, String](
+ SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[AlwaysFailOutputCommitter].getName,
+ // Since Parquet has its own output committer setting, also set it
+ // to AlwaysFailParquetOutputCommitter at here.
+ "spark.sql.parquet.output.committer.class" ->
+ classOf[AlwaysFailParquetOutputCommitter].getName
+ )
- val df = spark.range(1, 10).toDF("i")
- withTempPath { dir =>
- df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
- // Because there data already exists,
- // this append should succeed because we will use the output committer associated
- // with file format and AlwaysFailOutputCommitter will not be used.
- df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
- checkAnswer(
- spark.read
- .format(dataSourceName)
- .option("dataSchema", df.schema.json)
- .options(extraOptions)
- .load(dir.getCanonicalPath),
- df.union(df))
-
- // This will fail because AlwaysFailOutputCommitter is used when we do append.
- intercept[Exception] {
- df.write.mode("overwrite")
- .options(extraOptions).format(dataSourceName).save(dir.getCanonicalPath)
+ val df = spark.range(1, 10).toDF("i")
+ withTempPath { dir =>
+ df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+ // Because there data already exists,
+ // this append should succeed because we will use the output committer associated
+ // with file format and AlwaysFailOutputCommitter will not be used.
+ df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+ checkAnswer(
+ spark.read
+ .format(dataSourceName)
+ .option("dataSchema", df.schema.json)
+ .options(extraOptions)
+ .load(dir.getCanonicalPath),
+ df.union(df))
+
+ // This will fail because AlwaysFailOutputCommitter is used when we do append.
+ intercept[Exception] {
+ df.write.mode("overwrite")
+ .options(extraOptions).format(dataSourceName).save(dir.getCanonicalPath)
+ }
}
- }
- withTempPath { dir =>
- // Because there is no existing data,
- // this append will fail because AlwaysFailOutputCommitter is used when we do append
- // and there is no existing data.
- intercept[Exception] {
- df.write.mode("append")
- .options(extraOptions)
- .format(dataSourceName)
- .save(dir.getCanonicalPath)
+ withTempPath { dir =>
+ // Because there is no existing data,
+ // this append will fail because AlwaysFailOutputCommitter is used when we do append
+ // and there is no existing data.
+ intercept[Exception] {
+ df.write.mode("append")
+ .options(extraOptions)
+ .format(dataSourceName)
+ .save(dir.getCanonicalPath)
+ }
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index 8aa018d0a9..03207ab869 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -25,6 +25,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
+import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -125,7 +126,10 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
}
test("SPARK-8604: Parquet data source should write summary file while doing appending") {
- withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") {
+ withSQLConf(
+ ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true",
+ SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+ classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
withTempPath { dir =>
val path = dir.getCanonicalPath
val df = spark.range(0, 5).toDF()