diff options
author | windpiger <songjun@outlook.com> | 2017-01-27 17:17:17 -0800 |
---|---|---|
committer | gatorsmile <gatorsmile@gmail.com> | 2017-01-27 17:17:17 -0800 |
commit | 1b5ee2003c368d18a5f8c17c2a869ef5770c60a1 (patch) | |
tree | d73dcb492c9e0b14e868a68eff748a498af5ecc9 /sql/hive | |
parent | bb1a1fe05e293c480c88123d4c83a6b8c25f6e2e (diff) | |
download | spark-1b5ee2003c368d18a5f8c17c2a869ef5770c60a1.tar.gz spark-1b5ee2003c368d18a5f8c17c2a869ef5770c60a1.tar.bz2 spark-1b5ee2003c368d18a5f8c17c2a869ef5770c60a1.zip |
[SPARK-19359][SQL] clear useless path after rename a partition with upper-case by HiveExternalCatalog
## What changes were proposed in this pull request?
Hive metastore is not case preserving and keep partition columns with lower case names.
If SparkSQL create a table with upper-case partion name use HiveExternalCatalog, when we rename partition, it first call the HiveClient to renamePartition, which will create a new lower case partition path, then SparkSql rename the lower case path to the upper-case.
while if the renamed partition contains more than one depth partition ,e.g. A=1/B=2, hive renamePartition change to a=1/b=2, then SparkSql rename it to A=1/B=2, but the a=1 still exists in the filesystem, we should also delete it.
## How was this patch tested?
unit test added
Author: windpiger <songjun@outlook.com>
Closes #16700 from windpiger/clearUselessPathAfterRenamPartition.
Diffstat (limited to 'sql/hive')
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala | 35 | ||||
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala | 36 |
2 files changed, 71 insertions, 0 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 208c8c9d5d..5be991b471 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -839,6 +839,26 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat spec.map { case (k, v) => partCols.find(_.equalsIgnoreCase(k)).get -> v } } + + /** + * The partition path created by Hive is in lowercase, while Spark SQL will + * rename it with the partition name in partitionColumnNames, and this function + * returns the extra lowercase path created by Hive, and then we can delete it. + * e.g. /path/A=1/B=2/C=3 is changed to /path/A=4/B=5/C=6, this function returns + * /path/a=4 + */ + def getExtraPartPathCreatedByHive( + spec: TablePartitionSpec, + partitionColumnNames: Seq[String], + tablePath: Path): Path = { + val partColumnNames = partitionColumnNames + .take(partitionColumnNames.indexWhere(col => col.toLowerCase != col) + 1) + .map(_.toLowerCase) + + ExternalCatalogUtils.generatePartitionPath(lowerCasePartitionSpec(spec), + partColumnNames, tablePath) + } + override def createPartitions( db: String, table: String, @@ -899,6 +919,21 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat spec, partitionColumnNames, tablePath) try { tablePath.getFileSystem(hadoopConf).rename(wrongPath, rightPath) + + // If the newSpec contains more than one depth partition, FileSystem.rename just deletes + // the leaf(i.e. wrongPath), we should check if wrongPath's parents need to be deleted. + // For example, give a newSpec 'A=1/B=2', after calling Hive's client.renamePartitions, + // the location path in FileSystem is changed to 'a=1/b=2', which is wrongPath, then + // although we renamed it to 'A=1/B=2', 'a=1/b=2' in FileSystem is deleted, but 'a=1' + // is still exists, which we also need to delete + val delHivePartPathAfterRename = getExtraPartPathCreatedByHive( + spec, + partitionColumnNames, + tablePath) + + if (delHivePartPathAfterRename != wrongPath) { + tablePath.getFileSystem(hadoopConf).delete(delHivePartPathAfterRename, true) + } } catch { case e: IOException => throw new SparkException( s"Unable to rename partition path from $wrongPath to $rightPath", e) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala index dca207a72d..1214a92b76 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala @@ -19,8 +19,11 @@ package org.apache.spark.sql.hive import java.io.File +import org.apache.hadoop.fs.Path + import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.{AnalysisException, QueryTest} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils @@ -481,4 +484,37 @@ class PartitionProviderCompatibilitySuite assert(spark.sql("show partitions test").count() == 5) } } + + test("partition path created by Hive should be deleted after renamePartitions with upper-case") { + withTable("t", "t1", "t2") { + Seq((1, 2, 3)).toDF("id", "A", "B").write.partitionBy("A", "B").saveAsTable("t") + spark.sql("alter table t partition(A=2, B=3) rename to partition(A=4, B=5)") + + var table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + var extraHivePath = new Path(table.location + "/a=4") + assert(!extraHivePath.getFileSystem(spark.sessionState.newHadoopConf()) + .exists(extraHivePath), "partition path created by Hive should be deleted " + + "after renamePartitions with upper-case") + + Seq((1, 2, 3, 4)).toDF("id", "A", "B", "C").write.partitionBy("A", "B", "C").saveAsTable("t1") + spark.sql("alter table t1 partition(A=2, B=3, C=4) rename to partition(A=5, B=6, C=7)") + table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) + extraHivePath = new Path(table.location + "/a=5") + assert(!extraHivePath.getFileSystem(spark.sessionState.newHadoopConf()) + .exists(extraHivePath), "partition path created by Hive should be deleted " + + "after renamePartitions with upper-case") + + Seq((1, 2, 3, 4)).toDF("id", "a", "B", "C").write.partitionBy("a", "B", "C").saveAsTable("t2") + spark.sql("alter table t2 partition(a=2, B=3, C=4) rename to partition(a=4, B=5, C=6)") + table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t2")) + val partPath = new Path(table.location + "/a=4") + assert(partPath.getFileSystem(spark.sessionState.newHadoopConf()) + .exists(partPath), "partition path of lower-case partition name should not be deleted") + + extraHivePath = new Path(table.location + "/a=4/b=5") + assert(!extraHivePath.getFileSystem(spark.sessionState.newHadoopConf()) + .exists(extraHivePath), "partition path created by Hive should be deleted " + + "after renamePartitions with upper-case") + } + } } |