diff options
author | Eric Liang <ekl@databricks.com> | 2016-12-02 21:59:02 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2016-12-02 21:59:02 +0800 |
commit | 7935c8470c5c162ef7213e394fe8588e5dd42ca2 (patch) | |
tree | 0b35862d5113a400a2464c22282f6040b6bcc0d5 | |
parent | 55d528f2ba0ba689dbb881616d9436dc7958e943 (diff) | |
download | spark-7935c8470c5c162ef7213e394fe8588e5dd42ca2.tar.gz spark-7935c8470c5c162ef7213e394fe8588e5dd42ca2.tar.bz2 spark-7935c8470c5c162ef7213e394fe8588e5dd42ca2.zip |
[SPARK-18659][SQL] Incorrect behaviors in overwrite table for datasource tables
## What changes were proposed in this pull request?
Two bugs are addressed here
1. INSERT OVERWRITE TABLE sometimes crashed when catalog partition management was enabled. This was because when dropping partitions after an overwrite operation, the Hive client would attempt to delete the partition files. If the entire partition directory had been dropped, this would fail. The PR fixes this by adding a flag to control whether the Hive client should attempt to delete files.
2. The static partition spec for OVERWRITE TABLE was not correctly resolved to the case-sensitive original partition names. This resulted in the entire table being overwritten if you did not correctly capitalize your partition names.
cc yhuai cloud-fan
## How was this patch tested?
Unit tests. Surprisingly, the existing overwrite table tests did not catch these edge cases.
Author: Eric Liang <ekl@databricks.com>
Closes #16088 from ericl/spark-18659.
14 files changed, 110 insertions, 37 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index 14dd707fa0..259008f183 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -154,7 +154,8 @@ abstract class ExternalCatalog { table: String, parts: Seq[TablePartitionSpec], ignoreIfNotExists: Boolean, - purge: Boolean): Unit + purge: Boolean, + retainData: Boolean): Unit /** * Override the specs of one or many existing table partitions, assuming they exist. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index a3ffeaa63f..880a7a0dc4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -385,7 +385,8 @@ class InMemoryCatalog( table: String, partSpecs: Seq[TablePartitionSpec], ignoreIfNotExists: Boolean, - purge: Boolean): Unit = synchronized { + purge: Boolean, + retainData: Boolean): Unit = synchronized { requireTableExists(db, table) val existingParts = catalog(db).tables(table).partitions if (!ignoreIfNotExists) { @@ -395,7 +396,12 @@ class InMemoryCatalog( } } - val shouldRemovePartitionLocation = getTable(db, table).tableType == CatalogTableType.MANAGED + val shouldRemovePartitionLocation = if (retainData) { + false + } else { + getTable(db, table).tableType == CatalogTableType.MANAGED + } + // TODO: we should follow hive to roll back if one partition path failed to delete, and support // partial partition spec. 
partSpecs.foreach { p => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 0b6a91fff7..da3a2079f4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -687,13 +687,14 @@ class SessionCatalog( tableName: TableIdentifier, specs: Seq[TablePartitionSpec], ignoreIfNotExists: Boolean, - purge: Boolean): Unit = { + purge: Boolean, + retainData: Boolean): Unit = { val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase)) val table = formatTableName(tableName.table) requireDbExists(db) requireTableExists(TableIdentifier(table, Option(db))) requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName)) - externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge) + externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge, retainData) } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index 303a8662d3..3b39f420af 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -361,13 +361,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac val catalog = newBasicCatalog() assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2))) catalog.dropPartitions( - "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false, purge = false) + "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false, purge = false, retainData = false) assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2))) 
resetState() val catalog2 = newBasicCatalog() assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2))) catalog2.dropPartitions( - "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false, purge = false) + "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false, purge = false, + retainData = false) assert(catalog2.listPartitions("db2", "tbl2").isEmpty) } @@ -375,11 +376,13 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac val catalog = newBasicCatalog() intercept[AnalysisException] { catalog.dropPartitions( - "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false, purge = false) + "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false, purge = false, + retainData = false) } intercept[AnalysisException] { catalog.dropPartitions( - "db2", "does_not_exist", Seq(), ignoreIfNotExists = false, purge = false) + "db2", "does_not_exist", Seq(), ignoreIfNotExists = false, purge = false, + retainData = false) } } @@ -387,10 +390,11 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac val catalog = newBasicCatalog() intercept[AnalysisException] { catalog.dropPartitions( - "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false, purge = false) + "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false, purge = false, + retainData = false) } catalog.dropPartitions( - "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true, purge = false) + "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true, purge = false, retainData = false) } test("get partition") { @@ -713,7 +717,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac assert(exists(tableLocation, "partCol1=5", "partCol2=6")) catalog.dropPartitions("db1", "tbl", Seq(part2.spec, part3.spec), ignoreIfNotExists = false, - purge = false) + purge = false, retainData = false) assert(!exists(tableLocation, "partCol1=3", "partCol2=4")) assert(!exists(tableLocation, "partCol1=5", 
"partCol2=6")) @@ -745,7 +749,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac val fs = partPath.getFileSystem(new Configuration) assert(fs.exists(partPath)) - catalog.dropPartitions("db2", "tbl1", Seq(part1.spec), ignoreIfNotExists = false, purge = false) + catalog.dropPartitions( + "db2", "tbl1", Seq(part1.spec), ignoreIfNotExists = false, purge = false, retainData = false) assert(fs.exists(partPath)) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 3f27160d63..f9c4b2687b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -618,7 +618,8 @@ class SessionCatalogSuite extends SparkFunSuite { TableIdentifier("tbl2", Some("db2")), Seq(part1.spec), ignoreIfNotExists = false, - purge = false) + purge = false, + retainData = false) assert(catalogPartitionsEqual(externalCatalog.listPartitions("db2", "tbl2"), part2)) // Drop partitions without explicitly specifying database sessionCatalog.setCurrentDatabase("db2") @@ -626,7 +627,8 @@ class SessionCatalogSuite extends SparkFunSuite { TableIdentifier("tbl2"), Seq(part2.spec), ignoreIfNotExists = false, - purge = false) + purge = false, + retainData = false) assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty) // Drop multiple partitions at once sessionCatalog.createPartitions( @@ -636,7 +638,8 @@ class SessionCatalogSuite extends SparkFunSuite { TableIdentifier("tbl2", Some("db2")), Seq(part1.spec, part2.spec), ignoreIfNotExists = false, - purge = false) + purge = false, + retainData = false) assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty) } @@ -647,14 +650,16 @@ class SessionCatalogSuite extends SparkFunSuite { TableIdentifier("tbl1", Some("unknown_db")), 
Seq(), ignoreIfNotExists = false, - purge = false) + purge = false, + retainData = false) } intercept[NoSuchTableException] { catalog.dropPartitions( TableIdentifier("does_not_exist", Some("db2")), Seq(), ignoreIfNotExists = false, - purge = false) + purge = false, + retainData = false) } } @@ -665,13 +670,15 @@ class SessionCatalogSuite extends SparkFunSuite { TableIdentifier("tbl2", Some("db2")), Seq(part3.spec), ignoreIfNotExists = false, - purge = false) + purge = false, + retainData = false) } catalog.dropPartitions( TableIdentifier("tbl2", Some("db2")), Seq(part3.spec), ignoreIfNotExists = true, - purge = false) + purge = false, + retainData = false) } test("drop partitions with invalid partition spec") { @@ -681,7 +688,8 @@ class SessionCatalogSuite extends SparkFunSuite { TableIdentifier("tbl2", Some("db2")), Seq(partWithMoreColumns.spec), ignoreIfNotExists = false, - purge = false) + purge = false, + retainData = false) } assert(e.getMessage.contains( "Partition spec is invalid. The spec (a, b, c) must be contained within " + @@ -691,7 +699,8 @@ class SessionCatalogSuite extends SparkFunSuite { TableIdentifier("tbl2", Some("db2")), Seq(partWithUnknownColumns.spec), ignoreIfNotExists = false, - purge = false) + purge = false, + retainData = false) } assert(e.getMessage.contains( "Partition spec is invalid. 
The spec (a, unknown) must be contained within " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index ffd6b0146b..4400174e92 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -843,8 +843,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { AlterTableDropPartitionCommand( visitTableIdentifier(ctx.tableIdentifier), ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec), - ctx.EXISTS != null, - ctx.PURGE != null) + ifExists = ctx.EXISTS != null, + purge = ctx.PURGE != null, + retainData = false) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 0f126d0200..c62c14200c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -421,7 +421,8 @@ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, specs: Seq[TablePartitionSpec], ifExists: Boolean, - purge: Boolean) + purge: Boolean, + retainData: Boolean) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { @@ -439,7 +440,8 @@ case class AlterTableDropPartitionCommand( } catalog.dropPartitions( - table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge) + table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge, + retainData = retainData) Seq.empty[Row] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index f3d92bf7cc..4468dc58e4 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -217,16 +217,25 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { if (deletedPartitions.nonEmpty) { AlterTableDropPartitionCommand( l.catalogTable.get.identifier, deletedPartitions.toSeq, - ifExists = true, purge = true).run(t.sparkSession) + ifExists = true, purge = false, + retainData = true /* already deleted */).run(t.sparkSession) } } } t.location.refresh() } + val staticPartitionKeys: TablePartitionSpec = if (overwrite.enabled) { + overwrite.staticPartitionKeys.map { case (k, v) => + (partitionSchema.map(_.name).find(_.equalsIgnoreCase(k)).get, v) + } + } else { + Map.empty + } + val insertCmd = InsertIntoHadoopFsRelationCommand( outputPath, - if (overwrite.enabled) overwrite.staticPartitionKeys else Map.empty, + staticPartitionKeys, customPartitionLocations, partitionSchema, t.bucketSpec, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index d31e7aeb3a..5ef5f8ee77 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -615,7 +615,8 @@ class DDLCommandSuite extends PlanTest { Map("dt" -> "2008-08-08", "country" -> "us"), Map("dt" -> "2009-09-09", "country" -> "uk")), ifExists = true, - purge = false) + purge = false, + retainData = false) val expected2_table = expected1_table.copy(ifExists = false) val expected1_purge = expected1_table.copy(purge = true) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 065883234a..c213e8e0b2 100644 --- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -850,9 +850,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat table: String, parts: Seq[TablePartitionSpec], ignoreIfNotExists: Boolean, - purge: Boolean): Unit = withClient { + purge: Boolean, + retainData: Boolean): Unit = withClient { requireTableExists(db, table) - client.dropPartitions(db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge) + client.dropPartitions( + db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge, retainData) } override def renamePartitions( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index 569a9c1139..4c76932b61 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -125,7 +125,8 @@ private[hive] trait HiveClient { table: String, specs: Seq[TablePartitionSpec], ignoreIfNotExists: Boolean, - purge: Boolean): Unit + purge: Boolean, + retainData: Boolean): Unit /** * Rename one or many existing table partitions, assuming they exist. 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 590029a517..bd840af5b1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -453,7 +453,8 @@ private[hive] class HiveClientImpl( table: String, specs: Seq[TablePartitionSpec], ignoreIfNotExists: Boolean, - purge: Boolean): Unit = withHiveState { + purge: Boolean, + retainData: Boolean): Unit = withHiveState { // TODO: figure out how to drop multiple partitions in one call val hiveTable = client.getTable(db, table, true /* throw exception */) // do the check at first and collect all the matching partitions @@ -473,8 +474,7 @@ private[hive] class HiveClientImpl( var droppedParts = ArrayBuffer.empty[java.util.List[String]] matchingParts.foreach { partition => try { - val deleteData = true - shim.dropPartition(client, db, table, partition, deleteData, purge) + shim.dropPartition(client, db, table, partition, !retainData, purge) } catch { case e: Exception => val remainingParts = matchingParts.toBuffer -- droppedParts diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala index e8e4238d1c..c2ac032760 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala @@ -259,6 +259,41 @@ class PartitionProviderCompatibilitySuite } } } + + test(s"SPARK-18659 insert overwrite table files - partition management $enabled") { + withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> enabled.toString) { + withTable("test") { + spark.range(10) + .selectExpr("id", "id as A", "'x' as B") + 
.write.partitionBy("A", "B").mode("overwrite") + .saveAsTable("test") + spark.sql("insert overwrite table test select id, id, 'x' from range(1)") + assert(spark.sql("select * from test").count() == 1) + + spark.range(10) + .selectExpr("id", "id as A", "'x' as B") + .write.partitionBy("A", "B").mode("overwrite") + .saveAsTable("test") + spark.sql( + "insert overwrite table test partition (A, B) select id, id, 'x' from range(1)") + assert(spark.sql("select * from test").count() == 1) + } + } + } + + test(s"SPARK-18659 insert overwrite table with lowercase - partition management $enabled") { + withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> enabled.toString) { + withTable("test") { + spark.range(10) + .selectExpr("id", "id as A", "'x' as B") + .write.partitionBy("A", "B").mode("overwrite") + .saveAsTable("test") + // note that 'A', 'B' are lowercase instead of their original case here + spark.sql("insert overwrite table test partition (a=1, b) select id, 'x' from range(1)") + assert(spark.sql("select * from test").count() == 10) + } + } + } } /** diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 081b0ed9bd..16ae345de6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -352,13 +352,13 @@ class VersionsSuite extends SparkFunSuite with Logging { // with a version that is older than the minimum (1.2 in this case). 
try { client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true, - purge = true) + purge = true, retainData = false) assert(!versionsWithoutPurge.contains(version)) } catch { case _: UnsupportedOperationException => assert(versionsWithoutPurge.contains(version)) client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true, - purge = false) + purge = false, retainData = false) } assert(client.getPartitionOption("default", "src_part", spec).isEmpty) |