-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala  3
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala  10
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala  5
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala  21
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala  27
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala  13
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala  3
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala  6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala  3
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala  6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala  35
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala  4
14 files changed, 110 insertions(+), 37 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 14dd707fa0..259008f183 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -154,7 +154,8 @@ abstract class ExternalCatalog {
table: String,
parts: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
- purge: Boolean): Unit
+ purge: Boolean,
+ retainData: Boolean): Unit
/**
* Override the specs of one or many existing table partitions, assuming they exist.
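Note on the change above: every ExternalCatalog implementation now receives a retainData flag on dropPartitions and must leave partition data in place when it is set. A minimal sketch of the extended contract, paraphrased for illustration only (this is not the full Spark trait):

```scala
// Sketch of the extended dropPartitions contract (paraphrased, not Spark source).
trait PartitionDropping {
  type TablePartitionSpec = Map[String, String]

  def dropPartitions(
      db: String,
      table: String,
      parts: Seq[TablePartitionSpec],
      ignoreIfNotExists: Boolean,
      purge: Boolean,
      retainData: Boolean): Unit // retainData = true: drop metadata only, keep files
}
```

InMemoryCatalog and HiveExternalCatalog below provide the two concrete interpretations of the flag.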
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index a3ffeaa63f..880a7a0dc4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -385,7 +385,8 @@ class InMemoryCatalog(
table: String,
partSpecs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
- purge: Boolean): Unit = synchronized {
+ purge: Boolean,
+ retainData: Boolean): Unit = synchronized {
requireTableExists(db, table)
val existingParts = catalog(db).tables(table).partitions
if (!ignoreIfNotExists) {
@@ -395,7 +396,12 @@ class InMemoryCatalog(
}
}
- val shouldRemovePartitionLocation = getTable(db, table).tableType == CatalogTableType.MANAGED
+ val shouldRemovePartitionLocation = if (retainData) {
+ false
+ } else {
+ getTable(db, table).tableType == CatalogTableType.MANAGED
+ }
+
// TODO: we should follow hive to roll back if one partition path failed to delete, and support
// partial partition spec.
partSpecs.foreach { p =>
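The decision introduced above can be restated in isolation. The following is a standalone sketch under simplified names (the TableType objects stand in for CatalogTableType; this is not the Spark source):

```scala
// Standalone restatement of the new InMemoryCatalog decision: retainData wins,
// otherwise only MANAGED tables have their partition directories removed.
object DropBehaviorSketch {
  sealed trait TableType
  case object Managed extends TableType
  case object External extends TableType

  def shouldRemovePartitionLocation(tableType: TableType, retainData: Boolean): Boolean =
    if (retainData) false else tableType == Managed

  def main(args: Array[String]): Unit = {
    assert(!shouldRemovePartitionLocation(Managed, retainData = true))   // data kept
    assert(shouldRemovePartitionLocation(Managed, retainData = false))   // managed: delete
    assert(!shouldRemovePartitionLocation(External, retainData = false)) // external: keep
  }
}
```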
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 0b6a91fff7..da3a2079f4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -687,13 +687,14 @@ class SessionCatalog(
tableName: TableIdentifier,
specs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
- purge: Boolean): Unit = {
+ purge: Boolean,
+ retainData: Boolean): Unit = {
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
- externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge)
+ externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge, retainData)
}
/**
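Callers of SessionCatalog.dropPartitions now pass three independent booleans. A purely illustrative sketch of what each one controls (the names below are hypothetical and not Spark API):

```scala
// Illustrative only: summarizes the semantics of the three flags on dropPartitions.
object DropPartitionFlagsSketch {
  final case class Flags(ignoreIfNotExists: Boolean, purge: Boolean, retainData: Boolean)

  def describe(f: Flags): String = {
    val missing = if (f.ignoreIfNotExists) "skip missing partitions" else "fail if a partition is missing"
    val data = if (f.retainData) "leave partition files in place" else "delete files for managed tables"
    val trash = if (f.purge) "skip the trash when deleting (Hive PURGE)" else "allow the trash when deleting"
    s"$missing; $data; $trash"
  }

  def main(args: Array[String]): Unit =
    // The INSERT OVERWRITE path in DataSourceStrategy uses retainData = true
    // because the partition files were already removed before the metastore
    // entries are dropped; ALTER TABLE ... DROP PARTITION keeps retainData = false.
    println(describe(Flags(ignoreIfNotExists = true, purge = false, retainData = true)))
}
```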
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 303a8662d3..3b39f420af 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -361,13 +361,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val catalog = newBasicCatalog()
assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2)))
catalog.dropPartitions(
- "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false, purge = false)
+ "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false, purge = false, retainData = false)
assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2)))
resetState()
val catalog2 = newBasicCatalog()
assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2)))
catalog2.dropPartitions(
- "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false, purge = false)
+ "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false, purge = false,
+ retainData = false)
assert(catalog2.listPartitions("db2", "tbl2").isEmpty)
}
@@ -375,11 +376,13 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.dropPartitions(
- "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false, purge = false)
+ "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false, purge = false,
+ retainData = false)
}
intercept[AnalysisException] {
catalog.dropPartitions(
- "db2", "does_not_exist", Seq(), ignoreIfNotExists = false, purge = false)
+ "db2", "does_not_exist", Seq(), ignoreIfNotExists = false, purge = false,
+ retainData = false)
}
}
@@ -387,10 +390,11 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.dropPartitions(
- "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false, purge = false)
+ "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false, purge = false,
+ retainData = false)
}
catalog.dropPartitions(
- "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true, purge = false)
+ "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true, purge = false, retainData = false)
}
test("get partition") {
@@ -713,7 +717,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
assert(exists(tableLocation, "partCol1=5", "partCol2=6"))
catalog.dropPartitions("db1", "tbl", Seq(part2.spec, part3.spec), ignoreIfNotExists = false,
- purge = false)
+ purge = false, retainData = false)
assert(!exists(tableLocation, "partCol1=3", "partCol2=4"))
assert(!exists(tableLocation, "partCol1=5", "partCol2=6"))
@@ -745,7 +749,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val fs = partPath.getFileSystem(new Configuration)
assert(fs.exists(partPath))
- catalog.dropPartitions("db2", "tbl1", Seq(part1.spec), ignoreIfNotExists = false, purge = false)
+ catalog.dropPartitions(
+ "db2", "tbl1", Seq(part1.spec), ignoreIfNotExists = false, purge = false, retainData = false)
assert(fs.exists(partPath))
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 3f27160d63..f9c4b2687b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -618,7 +618,8 @@ class SessionCatalogSuite extends SparkFunSuite {
TableIdentifier("tbl2", Some("db2")),
Seq(part1.spec),
ignoreIfNotExists = false,
- purge = false)
+ purge = false,
+ retainData = false)
assert(catalogPartitionsEqual(externalCatalog.listPartitions("db2", "tbl2"), part2))
// Drop partitions without explicitly specifying database
sessionCatalog.setCurrentDatabase("db2")
@@ -626,7 +627,8 @@ class SessionCatalogSuite extends SparkFunSuite {
TableIdentifier("tbl2"),
Seq(part2.spec),
ignoreIfNotExists = false,
- purge = false)
+ purge = false,
+ retainData = false)
assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
// Drop multiple partitions at once
sessionCatalog.createPartitions(
@@ -636,7 +638,8 @@ class SessionCatalogSuite extends SparkFunSuite {
TableIdentifier("tbl2", Some("db2")),
Seq(part1.spec, part2.spec),
ignoreIfNotExists = false,
- purge = false)
+ purge = false,
+ retainData = false)
assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
}
@@ -647,14 +650,16 @@ class SessionCatalogSuite extends SparkFunSuite {
TableIdentifier("tbl1", Some("unknown_db")),
Seq(),
ignoreIfNotExists = false,
- purge = false)
+ purge = false,
+ retainData = false)
}
intercept[NoSuchTableException] {
catalog.dropPartitions(
TableIdentifier("does_not_exist", Some("db2")),
Seq(),
ignoreIfNotExists = false,
- purge = false)
+ purge = false,
+ retainData = false)
}
}
@@ -665,13 +670,15 @@ class SessionCatalogSuite extends SparkFunSuite {
TableIdentifier("tbl2", Some("db2")),
Seq(part3.spec),
ignoreIfNotExists = false,
- purge = false)
+ purge = false,
+ retainData = false)
}
catalog.dropPartitions(
TableIdentifier("tbl2", Some("db2")),
Seq(part3.spec),
ignoreIfNotExists = true,
- purge = false)
+ purge = false,
+ retainData = false)
}
test("drop partitions with invalid partition spec") {
@@ -681,7 +688,8 @@ class SessionCatalogSuite extends SparkFunSuite {
TableIdentifier("tbl2", Some("db2")),
Seq(partWithMoreColumns.spec),
ignoreIfNotExists = false,
- purge = false)
+ purge = false,
+ retainData = false)
}
assert(e.getMessage.contains(
"Partition spec is invalid. The spec (a, b, c) must be contained within " +
@@ -691,7 +699,8 @@ class SessionCatalogSuite extends SparkFunSuite {
TableIdentifier("tbl2", Some("db2")),
Seq(partWithUnknownColumns.spec),
ignoreIfNotExists = false,
- purge = false)
+ purge = false,
+ retainData = false)
}
assert(e.getMessage.contains(
"Partition spec is invalid. The spec (a, unknown) must be contained within " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index ffd6b0146b..4400174e92 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -843,8 +843,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
AlterTableDropPartitionCommand(
visitTableIdentifier(ctx.tableIdentifier),
ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
- ctx.EXISTS != null,
- ctx.PURGE != null)
+ ifExists = ctx.EXISTS != null,
+ purge = ctx.PURGE != null,
+ retainData = false)
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 0f126d0200..c62c14200c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -421,7 +421,8 @@ case class AlterTableDropPartitionCommand(
tableName: TableIdentifier,
specs: Seq[TablePartitionSpec],
ifExists: Boolean,
- purge: Boolean)
+ purge: Boolean,
+ retainData: Boolean)
extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -439,7 +440,8 @@ case class AlterTableDropPartitionCommand(
}
catalog.dropPartitions(
- table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge)
+ table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge,
+ retainData = retainData)
Seq.empty[Row]
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index f3d92bf7cc..4468dc58e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -217,16 +217,25 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
if (deletedPartitions.nonEmpty) {
AlterTableDropPartitionCommand(
l.catalogTable.get.identifier, deletedPartitions.toSeq,
- ifExists = true, purge = true).run(t.sparkSession)
+ ifExists = true, purge = false,
+ retainData = true /* already deleted */).run(t.sparkSession)
}
}
}
t.location.refresh()
}
+ val staticPartitionKeys: TablePartitionSpec = if (overwrite.enabled) {
+ overwrite.staticPartitionKeys.map { case (k, v) =>
+ (partitionSchema.map(_.name).find(_.equalsIgnoreCase(k)).get, v)
+ }
+ } else {
+ Map.empty
+ }
+
val insertCmd = InsertIntoHadoopFsRelationCommand(
outputPath,
- if (overwrite.enabled) overwrite.staticPartitionKeys else Map.empty,
+ staticPartitionKeys,
customPartitionLocations,
partitionSchema,
t.bucketSpec,
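Two behaviors change in this hunk: partitions whose files were already deleted by the overwrite are dropped from the metastore with retainData = true (and purge = false), and user-supplied static partition keys are normalized to the table's declared column case. A self-contained sketch of that normalization, using hypothetical names (PartitionKeyNormalization is not part of Spark):

```scala
// Sketch of case-insensitive static partition key resolution: a user-supplied
// key like "a" is mapped back to the declared partition column name "A".
object PartitionKeyNormalization {
  def normalize(
      staticKeys: Map[String, String],
      partitionColumns: Seq[String]): Map[String, String] = {
    staticKeys.map { case (k, v) =>
      val resolved = partitionColumns.find(_.equalsIgnoreCase(k)).getOrElse(
        throw new IllegalArgumentException(s"$k is not a partition column"))
      resolved -> v
    }
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the lowercase test below: "a" resolves to the declared column "A".
    println(normalize(Map("a" -> "1"), Seq("A", "B"))) // Map(A -> 1)
  }
}
```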
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index d31e7aeb3a..5ef5f8ee77 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -615,7 +615,8 @@ class DDLCommandSuite extends PlanTest {
Map("dt" -> "2008-08-08", "country" -> "us"),
Map("dt" -> "2009-09-09", "country" -> "uk")),
ifExists = true,
- purge = false)
+ purge = false,
+ retainData = false)
val expected2_table = expected1_table.copy(ifExists = false)
val expected1_purge = expected1_table.copy(purge = true)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 065883234a..c213e8e0b2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -850,9 +850,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
table: String,
parts: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
- purge: Boolean): Unit = withClient {
+ purge: Boolean,
+ retainData: Boolean): Unit = withClient {
requireTableExists(db, table)
- client.dropPartitions(db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge)
+ client.dropPartitions(
+ db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge, retainData)
}
override def renamePartitions(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 569a9c1139..4c76932b61 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -125,7 +125,8 @@ private[hive] trait HiveClient {
table: String,
specs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
- purge: Boolean): Unit
+ purge: Boolean,
+ retainData: Boolean): Unit
/**
* Rename one or many existing table partitions, assuming they exist.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 590029a517..bd840af5b1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -453,7 +453,8 @@ private[hive] class HiveClientImpl(
table: String,
specs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
- purge: Boolean): Unit = withHiveState {
+ purge: Boolean,
+ retainData: Boolean): Unit = withHiveState {
// TODO: figure out how to drop multiple partitions in one call
val hiveTable = client.getTable(db, table, true /* throw exception */)
// do the check at first and collect all the matching partitions
@@ -473,8 +474,7 @@ private[hive] class HiveClientImpl(
var droppedParts = ArrayBuffer.empty[java.util.List[String]]
matchingParts.foreach { partition =>
try {
- val deleteData = true
- shim.dropPartition(client, db, table, partition, deleteData, purge)
+ shim.dropPartition(client, db, table, partition, !retainData, purge)
} catch {
case e: Exception =>
val remainingParts = matchingParts.toBuffer -- droppedParts
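The Hive shim exposes a deleteData flag rather than retainData, so the change is a simple negation of what used to be hard-coded to true. A tiny sketch under that assumption (hiveDropPartition stands in for shim.dropPartition and is hypothetical):

```scala
// Sketch (assumption): retainData maps onto Hive's deleteData flag by negation.
object HiveDropMappingSketch {
  def dropOnePartition(
      partitionValues: Seq[String],
      purge: Boolean,
      retainData: Boolean)(
      hiveDropPartition: (Seq[String], Boolean, Boolean) => Unit): Unit = {
    val deleteData = !retainData // previously always true
    hiveDropPartition(partitionValues, deleteData, purge)
  }
}
```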
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index e8e4238d1c..c2ac032760 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -259,6 +259,41 @@ class PartitionProviderCompatibilitySuite
}
}
}
+
+ test(s"SPARK-18659 insert overwrite table files - partition management $enabled") {
+ withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> enabled.toString) {
+ withTable("test") {
+ spark.range(10)
+ .selectExpr("id", "id as A", "'x' as B")
+ .write.partitionBy("A", "B").mode("overwrite")
+ .saveAsTable("test")
+ spark.sql("insert overwrite table test select id, id, 'x' from range(1)")
+ assert(spark.sql("select * from test").count() == 1)
+
+ spark.range(10)
+ .selectExpr("id", "id as A", "'x' as B")
+ .write.partitionBy("A", "B").mode("overwrite")
+ .saveAsTable("test")
+ spark.sql(
+ "insert overwrite table test partition (A, B) select id, id, 'x' from range(1)")
+ assert(spark.sql("select * from test").count() == 1)
+ }
+ }
+ }
+
+ test(s"SPARK-18659 insert overwrite table with lowercase - partition management $enabled") {
+ withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> enabled.toString) {
+ withTable("test") {
+ spark.range(10)
+ .selectExpr("id", "id as A", "'x' as B")
+ .write.partitionBy("A", "B").mode("overwrite")
+ .saveAsTable("test")
+ // note that 'A', 'B' are lowercase instead of their original case here
+ spark.sql("insert overwrite table test partition (a=1, b) select id, 'x' from range(1)")
+ assert(spark.sql("select * from test").count() == 10)
+ }
+ }
+ }
}
/**
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 081b0ed9bd..16ae345de6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -352,13 +352,13 @@ class VersionsSuite extends SparkFunSuite with Logging {
// with a version that is older than the minimum (1.2 in this case).
try {
client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true,
- purge = true)
+ purge = true, retainData = false)
assert(!versionsWithoutPurge.contains(version))
} catch {
case _: UnsupportedOperationException =>
assert(versionsWithoutPurge.contains(version))
client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true,
- purge = false)
+ purge = false, retainData = false)
}
assert(client.getPartitionOption("default", "src_part", spec).isEmpty)