Diffstat (limited to 'sql/hive/src')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala                   6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala                     3
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala                 6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala  35
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala                  4
5 files changed, 46 insertions, 8 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 065883234a..c213e8e0b2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -850,9 +850,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
table: String,
parts: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
- purge: Boolean): Unit = withClient {
+ purge: Boolean,
+ retainData: Boolean): Unit = withClient {
requireTableExists(db, table)
- client.dropPartitions(db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge)
+ client.dropPartitions(
+ db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge, retainData)
}
override def renamePartitions(
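
Note: the catalog change above only threads the new flag through to the Hive client. A hypothetical call site (names and literal values are illustrative, not taken from this patch; only the flag semantics are grounded in the diff) shows the intended use during a partition overwrite: drop the stale partition metadata while keeping the files the overwrite has just rewritten in place.

    // Sketch of a hypothetical caller, assuming an ExternalCatalog instance
    // named `externalCatalog`.
    externalCatalog.dropPartitions(
      db = "default",
      table = "test",
      parts = Seq(Map("a" -> "1", "b" -> "x")),
      ignoreIfNotExists = true,
      purge = false,      // if files were deleted, send them to the trash rather than skipping it
      retainData = true)  // drop only the metadata; leave the data files on disk
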
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 569a9c1139..4c76932b61 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -125,7 +125,8 @@ private[hive] trait HiveClient {
table: String,
specs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
- purge: Boolean): Unit
+ purge: Boolean,
+ retainData: Boolean): Unit
/**
* Rename one or many existing table partitions, assuming they exist.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 590029a517..bd840af5b1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -453,7 +453,8 @@ private[hive] class HiveClientImpl(
table: String,
specs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
- purge: Boolean): Unit = withHiveState {
+ purge: Boolean,
+ retainData: Boolean): Unit = withHiveState {
// TODO: figure out how to drop multiple partitions in one call
val hiveTable = client.getTable(db, table, true /* throw exception */)
// do the check at first and collect all the matching partitions
@@ -473,8 +474,7 @@ private[hive] class HiveClientImpl(
var droppedParts = ArrayBuffer.empty[java.util.List[String]]
matchingParts.foreach { partition =>
try {
- val deleteData = true
- shim.dropPartition(client, db, table, partition, deleteData, purge)
+ shim.dropPartition(client, db, table, partition, !retainData, purge)
} catch {
case e: Exception =>
val remainingParts = matchingParts.toBuffer -- droppedParts
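
Note: seen from Hive's side, the client maps the new flag onto the existing deleteData argument of drop-partition, passing its negation (deleteData = !retainData). The two booleans then combine as sketched below; the purge behavior is Hive's documented trash-bypass semantics, not something introduced by this patch.

    // retainData = true                 -> partition metadata dropped, files left on disk
    // retainData = false, purge = false -> files moved to the Hive trash
    // retainData = false, purge = true  -> files deleted permanently, bypassing the trash
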
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index e8e4238d1c..c2ac032760 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -259,6 +259,41 @@ class PartitionProviderCompatibilitySuite
}
}
}
+
+ test(s"SPARK-18659 insert overwrite table files - partition management $enabled") {
+ withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> enabled.toString) {
+ withTable("test") {
+ spark.range(10)
+ .selectExpr("id", "id as A", "'x' as B")
+ .write.partitionBy("A", "B").mode("overwrite")
+ .saveAsTable("test")
+ spark.sql("insert overwrite table test select id, id, 'x' from range(1)")
+ assert(spark.sql("select * from test").count() == 1)
+
+ spark.range(10)
+ .selectExpr("id", "id as A", "'x' as B")
+ .write.partitionBy("A", "B").mode("overwrite")
+ .saveAsTable("test")
+ spark.sql(
+ "insert overwrite table test partition (A, B) select id, id, 'x' from range(1)")
+ assert(spark.sql("select * from test").count() == 1)
+ }
+ }
+ }
+
+ test(s"SPARK-18659 insert overwrite table with lowercase - partition management $enabled") {
+ withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> enabled.toString) {
+ withTable("test") {
+ spark.range(10)
+ .selectExpr("id", "id as A", "'x' as B")
+ .write.partitionBy("A", "B").mode("overwrite")
+ .saveAsTable("test")
+ // note that 'A', 'B' are lowercase instead of their original case here
+ spark.sql("insert overwrite table test partition (a=1, b) select id, 'x' from range(1)")
+ assert(spark.sql("select * from test").count() == 10)
+ }
+ }
+ }
}
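
Note on the last test's expected count: the seed write creates ten partitions (A = 0..9, each with B = 'x'), and the overwrite targets only the static spec a=1, so nine partitions survive untouched and one new row replaces the old contents of A=1, leaving ten rows in total. A condensed sketch of that scenario (assuming a SparkSession named `spark`, as in the suite):

    // Seed ten (A, B) partitions, then overwrite only partition a=1.
    spark.range(10)
      .selectExpr("id", "id as A", "'x' as B")
      .write.partitionBy("A", "B").mode("overwrite")
      .saveAsTable("test")
    spark.sql("insert overwrite table test partition (a=1, b) select id, 'x' from range(1)")
    assert(spark.sql("select * from test").count() == 10)  // 9 untouched partitions + 1 rewritten row
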
/**
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 081b0ed9bd..16ae345de6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -352,13 +352,13 @@ class VersionsSuite extends SparkFunSuite with Logging {
// with a version that is older than the minimum (1.2 in this case).
try {
client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true,
- purge = true)
+ purge = true, retainData = false)
assert(!versionsWithoutPurge.contains(version))
} catch {
case _: UnsupportedOperationException =>
assert(versionsWithoutPurge.contains(version))
client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true,
- purge = false)
+ purge = false, retainData = false)
}
assert(client.getPartitionOption("default", "src_part", spec).isEmpty)