author    Wenchen Fan <wenchen@databricks.com>  2017-01-05 14:11:05 +0800
committer Wenchen Fan <wenchen@databricks.com>  2017-01-05 14:11:05 +0800
commit    30345c43b7d17bb00184b60a547225bae8ee78e7 (patch)
tree      f7bdd72e3f720c5e8491803f9246bb140c79baa7 /sql/hive/src/test/scala/org
parent    6873430cb5ec0096991a02d4e01266945e79ebb9 (diff)
[SPARK-19058][SQL] fix partition related behaviors with DataFrameWriter.saveAsTable
## What changes were proposed in this pull request?

When we append data to a partitioned table with `DataFrameWriter.saveAsTable`, there are 2 issues:
1. it doesn't work when the partition has a custom location.
2. it will recover all partitions.

This PR fixes them by moving the special partition handling code from `DataSourceAnalysis` to `InsertIntoHadoopFsRelationCommand`, so that the `DataFrameWriter.saveAsTable` code path can also benefit from it.

## How was this patch tested?

Newly added regression tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16460 from cloud-fan/append.
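For context, a minimal sketch of the append path this commit fixes, assuming an already running `SparkSession` named `spark`; the table name `demo` and the column names are illustrative and not part of the patch:

```scala
// Minimal sketch (assumptions: a running SparkSession `spark`; table/column names are made up).
import spark.implicits._

// Create a partitioned data source table.
Seq((1L, 0)).toDF("fieldOne", "partCol")
  .write.partitionBy("partCol").saveAsTable("demo")

// Append more data. Per the description above, before this patch the append either failed when a
// partition had a custom location or triggered a recovery of all partitions; after it, the write
// goes through InsertIntoHadoopFsRelationCommand and only the touched partitions are registered.
Seq((2L, 1)).toDF("fieldOne", "partCol")
  .write.partitionBy("partCol").mode("append").saveAsTable("demo")
```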
Diffstat (limited to 'sql/hive/src/test/scala/org')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala  40
1 file changed, 36 insertions(+), 4 deletions(-)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index f88fc4a2ce..44233cfbf0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.util.Utils
class PartitionProviderCompatibilitySuite
extends QueryTest with TestHiveSingleton with SQLTestUtils {
+ import testImplicits._
private def setupPartitionedDatasourceTable(tableName: String, dir: File): Unit = {
spark.range(5).selectExpr("id as fieldOne", "id as partCol").write
@@ -195,12 +196,30 @@ class PartitionProviderCompatibilitySuite
withTempDir { dir =>
setupPartitionedDatasourceTable("test", dir)
if (enabled) {
- spark.sql("msck repair table test")
+ assert(spark.table("test").count() == 0)
+ } else {
+ assert(spark.table("test").count() == 5)
}
- assert(spark.sql("select * from test").count() == 5)
- spark.range(10).selectExpr("id as fieldOne", "id as partCol")
+ // Table `test` has 5 partitions, from `partCol=0` to `partCol=4`, which are invisible
+ // because we have not run `REPAIR TABLE` yet. Here we add 10 more partitions from
+ // `partCol=3` to `partCol=12`, to test the following behaviors:
+ // 1. invisible partitions are still invisible if they are not overwritten.
+ // 2. invisible partitions become visible if they are overwritten.
+ // 3. newly added partitions should be visible.
+ spark.range(3, 13).selectExpr("id as fieldOne", "id as partCol")
.write.partitionBy("partCol").mode("append").saveAsTable("test")
- assert(spark.sql("select * from test").count() == 15)
+
+ if (enabled) {
+ // Only the newly written partitions are visible, which means the partitions
+ // `partCol=0`, `partCol=1` and `partCol=2` are still invisible, so we can only see
+ // 5 + 10 - 3 = 12 records.
+ assert(spark.table("test").count() == 12)
+ // Repair the table to make all partitions visible.
+ sql("msck repair table test")
+ assert(spark.table("test").count() == 15)
+ } else {
+ assert(spark.table("test").count() == 15)
+ }
}
}
}
@@ -449,4 +468,17 @@ class PartitionProviderCompatibilitySuite
assert(spark.sql("show partitions test").count() == 5)
}
}
+
+ test("append data with DataFrameWriter") {
+ testCustomLocations {
+ val df = Seq((1L, 0, 0), (2L, 0, 0)).toDF("id", "P1", "P2")
+ df.write.partitionBy("P1", "P2").mode("append").saveAsTable("test")
+ assert(spark.sql("select * from test").count() == 2)
+ assert(spark.sql("show partitions test").count() == 4)
+ val df2 = Seq((3L, 2, 2)).toDF("id", "P1", "P2")
+ df2.write.partitionBy("P1", "P2").mode("append").saveAsTable("test")
+ assert(spark.sql("select * from test").count() == 3)
+ assert(spark.sql("show partitions test").count() == 5)
+ }
+ }
}