author    Wenchen Fan <wenchen@databricks.com>    2016-11-10 13:42:48 -0800
committer Reynold Xin <rxin@databricks.com>       2016-11-10 13:42:48 -0800
commit    2f7461f31331cfc37f6cfa3586b7bbefb3af5547 (patch)
tree      af2f567805b8b53c20f7a362243816028468f0df
parent    b533fa2b205544b42dcebe0a6fee9d8275f6da7d (diff)
download  spark-2f7461f31331cfc37f6cfa3586b7bbefb3af5547.tar.gz
          spark-2f7461f31331cfc37f6cfa3586b7bbefb3af5547.tar.bz2
          spark-2f7461f31331cfc37f6cfa3586b7bbefb3af5547.zip
[SPARK-17990][SPARK-18302][SQL] correct several partition related behaviours of ExternalCatalog
## What changes were proposed in this pull request?

This PR corrects several partition-related behaviors of `ExternalCatalog`:

1. the default partition location should not always lower-case the partition column names in the path string (fix `HiveExternalCatalog`; see the sketch below)
2. rename partition should not always lower-case the partition column names in the updated partition path string (fix `HiveExternalCatalog`)
3. rename partition should update the partition location only for managed tables (fix `InMemoryCatalog`)
4. create partition with an existing directory should be fine (fix `InMemoryCatalog`)
5. create partition with a non-existing directory should create that directory (fix `InMemoryCatalog`)
6. drop partition from an external table should not delete the directory (fix `InMemoryCatalog`)

## How was this patch tested?

New tests in `ExternalCatalogSuite`.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15797 from cloud-fan/partition.
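For illustration, here is a minimal sketch of the corrected default-partition-location behavior, using the `ExternalCatalogUtils.generatePartitionPath` helper added in this patch; the table path and partition values below are hypothetical:

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils

// Hypothetical table location and a mixed-case partition spec.
val tablePath = new Path("file:/warehouse/db1.db/tbl")
val spec = Map("partCol1" -> "1", "partCol2" -> "a:b")

// Column-name case is preserved and special characters in values (':' here) are
// percent-escaped, Hive-style. A null value would map to __HIVE_DEFAULT_PARTITION__.
val partPath = ExternalCatalogUtils.generatePartitionPath(
  spec, Seq("partCol1", "partCol2"), tablePath)
// expected: file:/warehouse/db1.db/tbl/partCol1=1/partCol2=a%3Ab
```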
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala  | 121
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala  | 92
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala  | 11
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala  | 150
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala  | 24
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala  | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala  | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala  | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala  | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala  | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala  | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala  | 94
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala  | 8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala  | 21
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala  | 51
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala  | 4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala  | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala  | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala  | 2
19 files changed, 397 insertions(+), 208 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
new file mode 100644
index 0000000000..b1442eec16
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.Shell
+
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+
+object ExternalCatalogUtils {
+ // This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since catalyst doesn't
+ // depend on Hive.
+ val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"
+
+ //////////////////////////////////////////////////////////////////////////////////////////////////
+ // The following string escaping code is mainly copied from Hive (o.a.h.h.common.FileUtils).
+ //////////////////////////////////////////////////////////////////////////////////////////////////
+
+ val charToEscape = {
+ val bitSet = new java.util.BitSet(128)
+
+ /**
+ * ASCII 01-1F are HTTP control characters that need to be escaped.
+ * \u000A and \u000D are \n and \r, respectively.
+ */
+ val clist = Array(
+ '\u0001', '\u0002', '\u0003', '\u0004', '\u0005', '\u0006', '\u0007', '\u0008', '\u0009',
+ '\n', '\u000B', '\u000C', '\r', '\u000E', '\u000F', '\u0010', '\u0011', '\u0012', '\u0013',
+ '\u0014', '\u0015', '\u0016', '\u0017', '\u0018', '\u0019', '\u001A', '\u001B', '\u001C',
+ '\u001D', '\u001E', '\u001F', '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F',
+ '{', '[', ']', '^')
+
+ clist.foreach(bitSet.set(_))
+
+ if (Shell.WINDOWS) {
+ Array(' ', '<', '>', '|').foreach(bitSet.set(_))
+ }
+
+ bitSet
+ }
+
+ def needsEscaping(c: Char): Boolean = {
+ c >= 0 && c < charToEscape.size() && charToEscape.get(c)
+ }
+
+ def escapePathName(path: String): String = {
+ val builder = new StringBuilder()
+ path.foreach { c =>
+ if (needsEscaping(c)) {
+ builder.append('%')
+ builder.append(f"${c.asInstanceOf[Int]}%02X")
+ } else {
+ builder.append(c)
+ }
+ }
+
+ builder.toString()
+ }
+
+
+ def unescapePathName(path: String): String = {
+ val sb = new StringBuilder
+ var i = 0
+
+ while (i < path.length) {
+ val c = path.charAt(i)
+ if (c == '%' && i + 2 < path.length) {
+ val code: Int = try {
+ Integer.parseInt(path.substring(i + 1, i + 3), 16)
+ } catch {
+ case _: Exception => -1
+ }
+ if (code >= 0) {
+ sb.append(code.asInstanceOf[Char])
+ i += 3
+ } else {
+ sb.append(c)
+ i += 1
+ }
+ } else {
+ sb.append(c)
+ i += 1
+ }
+ }
+
+ sb.toString()
+ }
+
+ def generatePartitionPath(
+ spec: TablePartitionSpec,
+ partitionColumnNames: Seq[String],
+ tablePath: Path): Path = {
+ val partitionPathStrings = partitionColumnNames.map { col =>
+ val partitionValue = spec(col)
+ val partitionString = if (partitionValue == null) {
+ DEFAULT_PARTITION_NAME
+ } else {
+ escapePathName(partitionValue)
+ }
+ escapePathName(col) + "=" + partitionString
+ }
+ partitionPathStrings.foldLeft(tablePath) { (totalPath, nextPartPath) =>
+ new Path(totalPath, nextPartPath)
+ }
+ }
+}
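A brief usage sketch of the escaping helpers defined in the new file above (illustrative values; on non-Windows platforms spaces are not escaped):

```scala
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.{escapePathName, unescapePathName}

// '=' and ':' are in charToEscape, so they are percent-encoded; the round trip is lossless.
val escaped = escapePathName("ts=2016-11-10 13:42:48")
// escaped == "ts%3D2016-11-10 13%3A42%3A48"
assert(unescapePathName(escaped) == "ts=2016-11-10 13:42:48")
```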
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 20db81e6f9..a3ffeaa63f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -231,7 +231,7 @@ class InMemoryCatalog(
assert(tableMeta.storage.locationUri.isDefined,
"Managed table should always have table location, as we will assign a default location " +
"to it if it doesn't have one.")
- val dir = new Path(tableMeta.storage.locationUri.get)
+ val dir = new Path(tableMeta.location)
try {
val fs = dir.getFileSystem(hadoopConfig)
fs.delete(dir, true)
@@ -259,7 +259,7 @@ class InMemoryCatalog(
assert(oldDesc.table.storage.locationUri.isDefined,
"Managed table should always have table location, as we will assign a default location " +
"to it if it doesn't have one.")
- val oldDir = new Path(oldDesc.table.storage.locationUri.get)
+ val oldDir = new Path(oldDesc.table.location)
val newDir = new Path(catalog(db).db.locationUri, newName)
try {
val fs = oldDir.getFileSystem(hadoopConfig)
@@ -355,25 +355,28 @@ class InMemoryCatalog(
}
}
- val tableDir = new Path(catalog(db).db.locationUri, table)
- val partitionColumnNames = getTable(db, table).partitionColumnNames
+ val tableMeta = getTable(db, table)
+ val partitionColumnNames = tableMeta.partitionColumnNames
+ val tablePath = new Path(tableMeta.location)
// TODO: we should follow hive to roll back if one partition path failed to create.
parts.foreach { p =>
- // If location is set, the partition is using an external partition location and we don't
- // need to handle its directory.
- if (p.storage.locationUri.isEmpty) {
- val partitionPath = partitionColumnNames.flatMap { col =>
- p.spec.get(col).map(col + "=" + _)
- }.mkString("/")
- try {
- val fs = tableDir.getFileSystem(hadoopConfig)
- fs.mkdirs(new Path(tableDir, partitionPath))
- } catch {
- case e: IOException =>
- throw new SparkException(s"Unable to create partition path $partitionPath", e)
+ val partitionPath = p.storage.locationUri.map(new Path(_)).getOrElse {
+ ExternalCatalogUtils.generatePartitionPath(p.spec, partitionColumnNames, tablePath)
+ }
+
+ try {
+ val fs = tablePath.getFileSystem(hadoopConfig)
+ if (!fs.exists(partitionPath)) {
+ fs.mkdirs(partitionPath)
}
+ } catch {
+ case e: IOException =>
+ throw new SparkException(s"Unable to create partition path $partitionPath", e)
}
- existingParts.put(p.spec, p)
+
+ existingParts.put(
+ p.spec,
+ p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toString))))
}
}
@@ -392,19 +395,15 @@ class InMemoryCatalog(
}
}
- val tableDir = new Path(catalog(db).db.locationUri, table)
- val partitionColumnNames = getTable(db, table).partitionColumnNames
- // TODO: we should follow hive to roll back if one partition path failed to delete.
+ val shouldRemovePartitionLocation = getTable(db, table).tableType == CatalogTableType.MANAGED
+ // TODO: we should follow hive to roll back if one partition path failed to delete, and support
+ // partial partition spec.
partSpecs.foreach { p =>
- // If location is set, the partition is using an external partition location and we don't
- // need to handle its directory.
- if (existingParts.contains(p) && existingParts(p).storage.locationUri.isEmpty) {
- val partitionPath = partitionColumnNames.flatMap { col =>
- p.get(col).map(col + "=" + _)
- }.mkString("/")
+ if (existingParts.contains(p) && shouldRemovePartitionLocation) {
+ val partitionPath = new Path(existingParts(p).location)
try {
- val fs = tableDir.getFileSystem(hadoopConfig)
- fs.delete(new Path(tableDir, partitionPath), true)
+ val fs = partitionPath.getFileSystem(hadoopConfig)
+ fs.delete(partitionPath, true)
} catch {
case e: IOException =>
throw new SparkException(s"Unable to delete partition path $partitionPath", e)
@@ -423,33 +422,34 @@ class InMemoryCatalog(
requirePartitionsExist(db, table, specs)
requirePartitionsNotExist(db, table, newSpecs)
- val tableDir = new Path(catalog(db).db.locationUri, table)
- val partitionColumnNames = getTable(db, table).partitionColumnNames
+ val tableMeta = getTable(db, table)
+ val partitionColumnNames = tableMeta.partitionColumnNames
+ val tablePath = new Path(tableMeta.location)
+ val shouldUpdatePartitionLocation = getTable(db, table).tableType == CatalogTableType.MANAGED
+ val existingParts = catalog(db).tables(table).partitions
// TODO: we should follow hive to roll back if one partition path failed to rename.
specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
- val newPart = getPartition(db, table, oldSpec).copy(spec = newSpec)
- val existingParts = catalog(db).tables(table).partitions
-
- // If location is set, the partition is using an external partition location and we don't
- // need to handle its directory.
- if (newPart.storage.locationUri.isEmpty) {
- val oldPath = partitionColumnNames.flatMap { col =>
- oldSpec.get(col).map(col + "=" + _)
- }.mkString("/")
- val newPath = partitionColumnNames.flatMap { col =>
- newSpec.get(col).map(col + "=" + _)
- }.mkString("/")
+ val oldPartition = getPartition(db, table, oldSpec)
+ val newPartition = if (shouldUpdatePartitionLocation) {
+ val oldPartPath = new Path(oldPartition.location)
+ val newPartPath = ExternalCatalogUtils.generatePartitionPath(
+ newSpec, partitionColumnNames, tablePath)
try {
- val fs = tableDir.getFileSystem(hadoopConfig)
- fs.rename(new Path(tableDir, oldPath), new Path(tableDir, newPath))
+ val fs = tablePath.getFileSystem(hadoopConfig)
+ fs.rename(oldPartPath, newPartPath)
} catch {
case e: IOException =>
- throw new SparkException(s"Unable to rename partition path $oldPath", e)
+ throw new SparkException(s"Unable to rename partition path $oldPartPath", e)
}
+ oldPartition.copy(
+ spec = newSpec,
+ storage = oldPartition.storage.copy(locationUri = Some(newPartPath.toString)))
+ } else {
+ oldPartition.copy(spec = newSpec)
}
existingParts.remove(oldSpec)
- existingParts.put(newSpec, newPart)
+ existingParts.put(newSpec, newPartition)
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 34748a0485..93c70de18a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -99,6 +99,12 @@ case class CatalogTablePartition(
output.filter(_.nonEmpty).mkString("CatalogPartition(\n\t", "\n\t", ")")
}
+ /** Return the partition location, assuming it is specified. */
+ def location: String = storage.locationUri.getOrElse {
+ val specString = spec.map { case (k, v) => s"$k=$v" }.mkString(", ")
+ throw new AnalysisException(s"Partition [$specString] did not specify locationUri")
+ }
+
/**
* Given the partition schema, returns a row with that schema holding the partition values.
*/
@@ -171,6 +177,11 @@ case class CatalogTable(
throw new AnalysisException(s"table $identifier did not specify database")
}
+ /** Return the table location, assuming it is specified. */
+ def location: String = storage.locationUri.getOrElse {
+ throw new AnalysisException(s"table $identifier did not specify locationUri")
+ }
+
/** Return the fully qualified name of this table, assuming the database was specified. */
def qualifiedName: String = identifier.unquotedString
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 34bdfc8a98..303a8662d3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -17,9 +17,8 @@
package org.apache.spark.sql.catalyst.catalog
-import java.io.File
-import java.net.URI
-
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach
import org.apache.spark.SparkFunSuite
@@ -320,6 +319,33 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = true)
}
+ test("create partitions without location") {
+ val catalog = newBasicCatalog()
+ val table = CatalogTable(
+ identifier = TableIdentifier("tbl", Some("db1")),
+ tableType = CatalogTableType.MANAGED,
+ storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+ schema = new StructType()
+ .add("col1", "int")
+ .add("col2", "string")
+ .add("partCol1", "int")
+ .add("partCol2", "string"),
+ provider = Some("hive"),
+ partitionColumnNames = Seq("partCol1", "partCol2"))
+ catalog.createTable(table, ignoreIfExists = false)
+
+ val partition = CatalogTablePartition(Map("partCol1" -> "1", "partCol2" -> "2"), storageFormat)
+ catalog.createPartitions("db1", "tbl", Seq(partition), ignoreIfExists = false)
+
+ val partitionLocation = catalog.getPartition(
+ "db1",
+ "tbl",
+ Map("partCol1" -> "1", "partCol2" -> "2")).location
+ val tableLocation = catalog.getTable("db1", "tbl").location
+ val defaultPartitionLocation = new Path(new Path(tableLocation, "partCol1=1"), "partCol2=2")
+ assert(new Path(partitionLocation) == defaultPartitionLocation)
+ }
+
test("list partitions with partial partition spec") {
val catalog = newBasicCatalog()
val parts = catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "1")))
@@ -399,6 +425,46 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
intercept[AnalysisException] { catalog.getPartition("db2", "tbl2", part2.spec) }
}
+ test("rename partitions should update the location for managed table") {
+ val catalog = newBasicCatalog()
+ val table = CatalogTable(
+ identifier = TableIdentifier("tbl", Some("db1")),
+ tableType = CatalogTableType.MANAGED,
+ storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+ schema = new StructType()
+ .add("col1", "int")
+ .add("col2", "string")
+ .add("partCol1", "int")
+ .add("partCol2", "string"),
+ provider = Some("hive"),
+ partitionColumnNames = Seq("partCol1", "partCol2"))
+ catalog.createTable(table, ignoreIfExists = false)
+
+ val tableLocation = catalog.getTable("db1", "tbl").location
+
+ val mixedCasePart1 = CatalogTablePartition(
+ Map("partCol1" -> "1", "partCol2" -> "2"), storageFormat)
+ val mixedCasePart2 = CatalogTablePartition(
+ Map("partCol1" -> "3", "partCol2" -> "4"), storageFormat)
+
+ catalog.createPartitions("db1", "tbl", Seq(mixedCasePart1), ignoreIfExists = false)
+ assert(
+ new Path(catalog.getPartition("db1", "tbl", mixedCasePart1.spec).location) ==
+ new Path(new Path(tableLocation, "partCol1=1"), "partCol2=2"))
+
+ catalog.renamePartitions("db1", "tbl", Seq(mixedCasePart1.spec), Seq(mixedCasePart2.spec))
+ assert(
+ new Path(catalog.getPartition("db1", "tbl", mixedCasePart2.spec).location) ==
+ new Path(new Path(tableLocation, "partCol1=3"), "partCol2=4"))
+
+ // For external tables, RENAME PARTITION should not update the partition location.
+ val existingPartLoc = catalog.getPartition("db2", "tbl2", part1.spec).location
+ catalog.renamePartitions("db2", "tbl2", Seq(part1.spec), Seq(part3.spec))
+ assert(
+ new Path(catalog.getPartition("db2", "tbl2", part3.spec).location) ==
+ new Path(existingPartLoc))
+ }
+
test("rename partitions when database/table does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
@@ -419,11 +485,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
test("alter partitions") {
val catalog = newBasicCatalog()
try {
- // Note: Before altering table partitions in Hive, you *must* set the current database
- // to the one that contains the table of interest. Otherwise you will end up with the
- // most helpful error message ever: "Unable to alter partition. alter is not possible."
- // See HIVE-2742 for more detail.
- catalog.setCurrentDatabase("db2")
val newLocation = newUriForDatabase()
val newSerde = "com.sparkbricks.text.EasySerde"
val newSerdeProps = Map("spark" -> "bricks", "compressed" -> "false")
@@ -571,10 +632,11 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
// --------------------------------------------------------------------------
private def exists(uri: String, children: String*): Boolean = {
- val base = new File(new URI(uri))
- children.foldLeft(base) {
- case (parent, child) => new File(parent, child)
- }.exists()
+ val base = new Path(uri)
+ val finalPath = children.foldLeft(base) {
+ case (parent, child) => new Path(parent, child)
+ }
+ base.getFileSystem(new Configuration()).exists(finalPath)
}
test("create/drop database should create/delete the directory") {
@@ -623,7 +685,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
test("create/drop/rename partitions should create/delete/rename the directory") {
val catalog = newBasicCatalog()
- val databaseDir = catalog.getDatabase("db1").locationUri
val table = CatalogTable(
identifier = TableIdentifier("tbl", Some("db1")),
tableType = CatalogTableType.MANAGED,
@@ -631,34 +692,61 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
schema = new StructType()
.add("col1", "int")
.add("col2", "string")
- .add("a", "int")
- .add("b", "string"),
+ .add("partCol1", "int")
+ .add("partCol2", "string"),
provider = Some("hive"),
- partitionColumnNames = Seq("a", "b")
- )
+ partitionColumnNames = Seq("partCol1", "partCol2"))
catalog.createTable(table, ignoreIfExists = false)
+ val tableLocation = catalog.getTable("db1", "tbl").location
+
+ val part1 = CatalogTablePartition(Map("partCol1" -> "1", "partCol2" -> "2"), storageFormat)
+ val part2 = CatalogTablePartition(Map("partCol1" -> "3", "partCol2" -> "4"), storageFormat)
+ val part3 = CatalogTablePartition(Map("partCol1" -> "5", "partCol2" -> "6"), storageFormat)
+
catalog.createPartitions("db1", "tbl", Seq(part1, part2), ignoreIfExists = false)
- assert(exists(databaseDir, "tbl", "a=1", "b=2"))
- assert(exists(databaseDir, "tbl", "a=3", "b=4"))
+ assert(exists(tableLocation, "partCol1=1", "partCol2=2"))
+ assert(exists(tableLocation, "partCol1=3", "partCol2=4"))
catalog.renamePartitions("db1", "tbl", Seq(part1.spec), Seq(part3.spec))
- assert(!exists(databaseDir, "tbl", "a=1", "b=2"))
- assert(exists(databaseDir, "tbl", "a=5", "b=6"))
+ assert(!exists(tableLocation, "partCol1=1", "partCol2=2"))
+ assert(exists(tableLocation, "partCol1=5", "partCol2=6"))
catalog.dropPartitions("db1", "tbl", Seq(part2.spec, part3.spec), ignoreIfNotExists = false,
purge = false)
- assert(!exists(databaseDir, "tbl", "a=3", "b=4"))
- assert(!exists(databaseDir, "tbl", "a=5", "b=6"))
+ assert(!exists(tableLocation, "partCol1=3", "partCol2=4"))
+ assert(!exists(tableLocation, "partCol1=5", "partCol2=6"))
- val externalPartition = CatalogTablePartition(
- Map("a" -> "7", "b" -> "8"),
+ val tempPath = Utils.createTempDir()
+ // create partition with existing directory is OK.
+ val partWithExistingDir = CatalogTablePartition(
+ Map("partCol1" -> "7", "partCol2" -> "8"),
CatalogStorageFormat(
- Some(Utils.createTempDir().getAbsolutePath),
- None, None, None, false, Map.empty)
- )
- catalog.createPartitions("db1", "tbl", Seq(externalPartition), ignoreIfExists = false)
- assert(!exists(databaseDir, "tbl", "a=7", "b=8"))
+ Some(tempPath.getAbsolutePath),
+ None, None, None, false, Map.empty))
+ catalog.createPartitions("db1", "tbl", Seq(partWithExistingDir), ignoreIfExists = false)
+
+ tempPath.delete()
+ // create partition with non-existing directory will create that directory.
+ val partWithNonExistingDir = CatalogTablePartition(
+ Map("partCol1" -> "9", "partCol2" -> "10"),
+ CatalogStorageFormat(
+ Some(tempPath.getAbsolutePath),
+ None, None, None, false, Map.empty))
+ catalog.createPartitions("db1", "tbl", Seq(partWithNonExistingDir), ignoreIfExists = false)
+ assert(tempPath.exists())
+ }
+
+ test("drop partition from external table should not delete the directory") {
+ val catalog = newBasicCatalog()
+ catalog.createPartitions("db2", "tbl1", Seq(part1), ignoreIfExists = false)
+
+ val partPath = new Path(catalog.getPartition("db2", "tbl1", part1.spec).location)
+ val fs = partPath.getFileSystem(new Configuration)
+ assert(fs.exists(partPath))
+
+ catalog.dropPartitions("db2", "tbl1", Seq(part1.spec), ignoreIfNotExists = false, purge = false)
+ assert(fs.exists(partPath))
}
}
@@ -731,7 +819,7 @@ abstract class CatalogTestUtils {
CatalogTable(
identifier = TableIdentifier(name, database),
tableType = CatalogTableType.EXTERNAL,
- storage = storageFormat,
+ storage = storageFormat.copy(locationUri = Some(Utils.createTempDir().getAbsolutePath)),
schema = new StructType()
.add("col1", "int")
.add("col2", "string")
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 001d9c4778..52385de50d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -527,13 +527,13 @@ class SessionCatalogSuite extends SparkFunSuite {
sessionCatalog.createTable(newTable("tbl", "mydb"), ignoreIfExists = false)
sessionCatalog.createPartitions(
TableIdentifier("tbl", Some("mydb")), Seq(part1, part2), ignoreIfExists = false)
- assert(catalogPartitionsEqual(externalCatalog, "mydb", "tbl", Seq(part1, part2)))
+ assert(catalogPartitionsEqual(externalCatalog.listPartitions("mydb", "tbl"), part1, part2))
// Create partitions without explicitly specifying database
sessionCatalog.setCurrentDatabase("mydb")
sessionCatalog.createPartitions(
TableIdentifier("tbl"), Seq(partWithMixedOrder), ignoreIfExists = false)
assert(catalogPartitionsEqual(
- externalCatalog, "mydb", "tbl", Seq(part1, part2, partWithMixedOrder)))
+ externalCatalog.listPartitions("mydb", "tbl"), part1, part2, partWithMixedOrder))
}
test("create partitions when database/table does not exist") {
@@ -586,13 +586,13 @@ class SessionCatalogSuite extends SparkFunSuite {
test("drop partitions") {
val externalCatalog = newBasicCatalog()
val sessionCatalog = new SessionCatalog(externalCatalog)
- assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part1, part2)))
+ assert(catalogPartitionsEqual(externalCatalog.listPartitions("db2", "tbl2"), part1, part2))
sessionCatalog.dropPartitions(
TableIdentifier("tbl2", Some("db2")),
Seq(part1.spec),
ignoreIfNotExists = false,
purge = false)
- assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part2)))
+ assert(catalogPartitionsEqual(externalCatalog.listPartitions("db2", "tbl2"), part2))
// Drop partitions without explicitly specifying database
sessionCatalog.setCurrentDatabase("db2")
sessionCatalog.dropPartitions(
@@ -604,7 +604,7 @@ class SessionCatalogSuite extends SparkFunSuite {
// Drop multiple partitions at once
sessionCatalog.createPartitions(
TableIdentifier("tbl2", Some("db2")), Seq(part1, part2), ignoreIfExists = false)
- assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part1, part2)))
+ assert(catalogPartitionsEqual(externalCatalog.listPartitions("db2", "tbl2"), part1, part2))
sessionCatalog.dropPartitions(
TableIdentifier("tbl2", Some("db2")),
Seq(part1.spec, part2.spec),
@@ -844,10 +844,11 @@ class SessionCatalogSuite extends SparkFunSuite {
test("list partitions") {
val catalog = new SessionCatalog(newBasicCatalog())
- assert(catalog.listPartitions(TableIdentifier("tbl2", Some("db2"))).toSet == Set(part1, part2))
+ assert(catalogPartitionsEqual(
+ catalog.listPartitions(TableIdentifier("tbl2", Some("db2"))), part1, part2))
// List partitions without explicitly specifying database
catalog.setCurrentDatabase("db2")
- assert(catalog.listPartitions(TableIdentifier("tbl2")).toSet == Set(part1, part2))
+ assert(catalogPartitionsEqual(catalog.listPartitions(TableIdentifier("tbl2")), part1, part2))
}
test("list partitions when database/table does not exist") {
@@ -860,6 +861,15 @@ class SessionCatalogSuite extends SparkFunSuite {
}
}
+ private def catalogPartitionsEqual(
+ actualParts: Seq[CatalogTablePartition],
+ expectedParts: CatalogTablePartition*): Boolean = {
+ // ExternalCatalog may set a default location for partitions, here we ignore the partition
+ // location when comparing them.
+ actualParts.map(p => p.copy(storage = p.storage.copy(locationUri = None))).toSet ==
+ expectedParts.map(p => p.copy(storage = p.storage.copy(locationUri = None))).toSet
+ }
+
// --------------------------------------------------------------------------
// Functions
// --------------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 8500ab460a..84a63fdb9f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.Resolver
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTablePartition, CatalogTableType, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, PartitioningUtils}
@@ -500,7 +500,7 @@ case class AlterTableRecoverPartitionsCommand(
s"location provided: $tableIdentWithDB")
}
- val root = new Path(table.storage.locationUri.get)
+ val root = new Path(table.location)
logInfo(s"Recover all the partitions in $root")
val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
@@ -558,9 +558,9 @@ case class AlterTableRecoverPartitionsCommand(
val name = st.getPath.getName
if (st.isDirectory && name.contains("=")) {
val ps = name.split("=", 2)
- val columnName = PartitioningUtils.unescapePathName(ps(0))
+ val columnName = ExternalCatalogUtils.unescapePathName(ps(0))
// TODO: Validate the value
- val value = PartitioningUtils.unescapePathName(ps(1))
+ val value = ExternalCatalogUtils.unescapePathName(ps(1))
if (resolver(columnName, partitionNames.head)) {
scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
partitionNames.drop(1), threshold, resolver)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index e49a1f5acd..119e732d02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -710,7 +710,8 @@ case class ShowPartitionsCommand(
private def getPartName(spec: TablePartitionSpec, partColNames: Seq[String]): String = {
partColNames.map { name =>
- PartitioningUtils.escapePathName(name) + "=" + PartitioningUtils.escapePathName(spec(name))
+ ExternalCatalogUtils.escapePathName(name) + "=" +
+ ExternalCatalogUtils.escapePathName(spec(name))
}.mkString(File.separator)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index 443a2ec033..4ad91dcceb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -67,7 +67,7 @@ class CatalogFileIndex(
val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
table.identifier, filters)
val partitions = selectedPartitions.map { p =>
- val path = new Path(p.storage.locationUri.get)
+ val path = new Path(p.location)
val fs = path.getFileSystem(hadoopConf)
PartitionPath(
p.toRow(partitionSchema), path.makeQualified(fs.getUri, fs.getWorkingDirectory))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 2d43a6ad09..739aeac877 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -190,7 +190,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
val effectiveOutputPath = if (overwritingSinglePartition) {
val partition = t.sparkSession.sessionState.catalog.getPartition(
l.catalogTable.get.identifier, overwrite.specificPartition.get)
- new Path(partition.storage.locationUri.get)
+ new Path(partition.location)
} else {
outputPath
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index fa7fe143da..69b3fa667e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.sql.{Dataset, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, ExternalCatalogUtils}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
@@ -281,11 +281,11 @@ object FileFormatWriter extends Logging {
private def partitionStringExpression: Seq[Expression] = {
description.partitionColumns.zipWithIndex.flatMap { case (c, i) =>
val escaped = ScalaUDF(
- PartitioningUtils.escapePathName _,
+ ExternalCatalogUtils.escapePathName _,
StringType,
Seq(Cast(c, StringType)),
Seq(StringType))
- val str = If(IsNull(c), Literal(PartitioningUtils.DEFAULT_PARTITION_NAME), escaped)
+ val str = If(IsNull(c), Literal(ExternalCatalogUtils.DEFAULT_PARTITION_NAME), escaped)
val partitionName = Literal(c.name + "=") :: str :: Nil
if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index a8a722dd3c..3740caa22c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -128,7 +128,6 @@ abstract class PartitioningAwareFileIndex(
case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
val spec = PartitioningUtils.parsePartitions(
leafDirs,
- PartitioningUtils.DEFAULT_PARTITION_NAME,
typeInference = false,
basePaths = basePaths)
@@ -148,7 +147,6 @@ abstract class PartitioningAwareFileIndex(
case _ =>
PartitioningUtils.parsePartitions(
leafDirs,
- PartitioningUtils.DEFAULT_PARTITION_NAME,
typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
basePaths = basePaths)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index b51b41869b..a28b04ca3f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -25,7 +25,6 @@ import scala.collection.mutable.ArrayBuffer
import scala.util.Try
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.util.Shell
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
@@ -56,15 +55,15 @@ object PartitionSpec {
}
object PartitioningUtils {
- // This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since sql/core doesn't
- // depend on Hive.
- val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"
private[datasources] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal])
{
require(columnNames.size == literals.size)
}
+ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.DEFAULT_PARTITION_NAME
+ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.unescapePathName
+
/**
* Given a group of qualified paths, tries to parse them and returns a partition specification.
* For example, given:
@@ -90,12 +89,11 @@ object PartitioningUtils {
*/
private[datasources] def parsePartitions(
paths: Seq[Path],
- defaultPartitionName: String,
typeInference: Boolean,
basePaths: Set[Path]): PartitionSpec = {
// First, we need to parse every partition's path and see if we can find partition values.
val (partitionValues, optDiscoveredBasePaths) = paths.map { path =>
- parsePartition(path, defaultPartitionName, typeInference, basePaths)
+ parsePartition(path, typeInference, basePaths)
}.unzip
// We create pairs of (path -> path's partition value) here
@@ -173,7 +171,6 @@ object PartitioningUtils {
*/
private[datasources] def parsePartition(
path: Path,
- defaultPartitionName: String,
typeInference: Boolean,
basePaths: Set[Path]): (Option[PartitionValues], Option[Path]) = {
val columns = ArrayBuffer.empty[(String, Literal)]
@@ -196,7 +193,7 @@ object PartitioningUtils {
// Let's say currentPath is a path of "/table/a=1/", currentPath.getName will give us a=1.
// Once we get the string, we try to parse it and find the partition column and value.
val maybeColumn =
- parsePartitionColumn(currentPath.getName, defaultPartitionName, typeInference)
+ parsePartitionColumn(currentPath.getName, typeInference)
maybeColumn.foreach(columns += _)
// Now, we determine if we should stop.
@@ -228,7 +225,6 @@ object PartitioningUtils {
private def parsePartitionColumn(
columnSpec: String,
- defaultPartitionName: String,
typeInference: Boolean): Option[(String, Literal)] = {
val equalSignIndex = columnSpec.indexOf('=')
if (equalSignIndex == -1) {
@@ -240,7 +236,7 @@ object PartitioningUtils {
val rawColumnValue = columnSpec.drop(equalSignIndex + 1)
assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'")
- val literal = inferPartitionColumnValue(rawColumnValue, defaultPartitionName, typeInference)
+ val literal = inferPartitionColumnValue(rawColumnValue, typeInference)
Some(columnName -> literal)
}
}
@@ -355,7 +351,6 @@ object PartitioningUtils {
*/
private[datasources] def inferPartitionColumnValue(
raw: String,
- defaultPartitionName: String,
typeInference: Boolean): Literal = {
val decimalTry = Try {
// `BigDecimal` conversion can fail when the `field` is not a form of number.
@@ -380,14 +375,14 @@ object PartitioningUtils {
.orElse(Try(Literal(JTimestamp.valueOf(unescapePathName(raw)))))
// Then falls back to string
.getOrElse {
- if (raw == defaultPartitionName) {
+ if (raw == DEFAULT_PARTITION_NAME) {
Literal.create(null, NullType)
} else {
Literal.create(unescapePathName(raw), StringType)
}
}
} else {
- if (raw == defaultPartitionName) {
+ if (raw == DEFAULT_PARTITION_NAME) {
Literal.create(null, NullType)
} else {
Literal.create(unescapePathName(raw), StringType)
@@ -450,77 +445,4 @@ object PartitioningUtils {
Literal.create(Cast(l, desiredType).eval(), desiredType)
}
}
-
- //////////////////////////////////////////////////////////////////////////////////////////////////
- // The following string escaping code is mainly copied from Hive (o.a.h.h.common.FileUtils).
- //////////////////////////////////////////////////////////////////////////////////////////////////
-
- val charToEscape = {
- val bitSet = new java.util.BitSet(128)
-
- /**
- * ASCII 01-1F are HTTP control characters that need to be escaped.
- * \u000A and \u000D are \n and \r, respectively.
- */
- val clist = Array(
- '\u0001', '\u0002', '\u0003', '\u0004', '\u0005', '\u0006', '\u0007', '\u0008', '\u0009',
- '\n', '\u000B', '\u000C', '\r', '\u000E', '\u000F', '\u0010', '\u0011', '\u0012', '\u0013',
- '\u0014', '\u0015', '\u0016', '\u0017', '\u0018', '\u0019', '\u001A', '\u001B', '\u001C',
- '\u001D', '\u001E', '\u001F', '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F',
- '{', '[', ']', '^')
-
- clist.foreach(bitSet.set(_))
-
- if (Shell.WINDOWS) {
- Array(' ', '<', '>', '|').foreach(bitSet.set(_))
- }
-
- bitSet
- }
-
- def needsEscaping(c: Char): Boolean = {
- c >= 0 && c < charToEscape.size() && charToEscape.get(c)
- }
-
- def escapePathName(path: String): String = {
- val builder = new StringBuilder()
- path.foreach { c =>
- if (needsEscaping(c)) {
- builder.append('%')
- builder.append(f"${c.asInstanceOf[Int]}%02X")
- } else {
- builder.append(c)
- }
- }
-
- builder.toString()
- }
-
- def unescapePathName(path: String): String = {
- val sb = new StringBuilder
- var i = 0
-
- while (i < path.length) {
- val c = path.charAt(i)
- if (c == '%' && i + 2 < path.length) {
- val code: Int = try {
- Integer.parseInt(path.substring(i + 1, i + 3), 16)
- } catch {
- case _: Exception => -1
- }
- if (code >= 0) {
- sb.append(code.asInstanceOf[Char])
- i += 3
- } else {
- sb.append(c)
- i += 1
- }
- } else {
- sb.append(c)
- i += 1
- }
- }
-
- sb.toString()
- }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index df3a3c34c3..363715c6d2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -875,7 +875,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
val part2 = Map("a" -> "2", "b" -> "6")
- val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get)
+ val root = new Path(catalog.getTableMetadata(tableIdent).location)
val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
// valid
fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
@@ -1133,7 +1133,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
assert(catalog.getTableMetadata(tableIdent).storage.locationUri.isDefined)
assert(catalog.getTableMetadata(tableIdent).storage.properties.isEmpty)
- assert(catalog.getPartition(tableIdent, partSpec).storage.locationUri.isEmpty)
+ assert(catalog.getPartition(tableIdent, partSpec).storage.locationUri.isDefined)
assert(catalog.getPartition(tableIdent, partSpec).storage.properties.isEmpty)
// Verify that the location is set to the expected string
def verifyLocation(expected: String, spec: Option[TablePartitionSpec] = None): Unit = {
@@ -1296,9 +1296,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
sql("ALTER TABLE dbx.tab1 ADD IF NOT EXISTS " +
"PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')")
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
- assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isEmpty)
+ assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isDefined)
assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option("paris"))
- assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isEmpty)
+ assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isDefined)
// add partitions without explicitly specifying database
catalog.setCurrentDatabase("dbx")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 120a3a2ef3..22e35a1bc0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -29,6 +29,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.{PartitionPath => Partition}
@@ -48,11 +49,11 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
import PartitioningUtils._
import testImplicits._
- val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__"
+ val defaultPartitionName = ExternalCatalogUtils.DEFAULT_PARTITION_NAME
test("column type inference") {
def check(raw: String, literal: Literal): Unit = {
- assert(inferPartitionColumnValue(raw, defaultPartitionName, true) === literal)
+ assert(inferPartitionColumnValue(raw, true) === literal)
}
check("10", Literal.create(10, IntegerType))
@@ -76,7 +77,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
"hdfs://host:9000/path/a=10.5/b=hello")
var exception = intercept[AssertionError] {
- parsePartitions(paths.map(new Path(_)), defaultPartitionName, true, Set.empty[Path])
+ parsePartitions(paths.map(new Path(_)), true, Set.empty[Path])
}
assert(exception.getMessage().contains("Conflicting directory structures detected"))
@@ -88,7 +89,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
parsePartitions(
paths.map(new Path(_)),
- defaultPartitionName,
true,
Set(new Path("hdfs://host:9000/path/")))
@@ -101,7 +101,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
parsePartitions(
paths.map(new Path(_)),
- defaultPartitionName,
true,
Set(new Path("hdfs://host:9000/path/something=true/table")))
@@ -114,7 +113,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
parsePartitions(
paths.map(new Path(_)),
- defaultPartitionName,
true,
Set(new Path("hdfs://host:9000/path/table=true")))
@@ -127,7 +125,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
exception = intercept[AssertionError] {
parsePartitions(
paths.map(new Path(_)),
- defaultPartitionName,
true,
Set(new Path("hdfs://host:9000/path/")))
}
@@ -147,7 +144,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
exception = intercept[AssertionError] {
parsePartitions(
paths.map(new Path(_)),
- defaultPartitionName,
true,
Set(new Path("hdfs://host:9000/tmp/tables/")))
}
@@ -156,13 +152,13 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
test("parse partition") {
def check(path: String, expected: Option[PartitionValues]): Unit = {
- val actual = parsePartition(new Path(path), defaultPartitionName, true, Set.empty[Path])._1
+ val actual = parsePartition(new Path(path), true, Set.empty[Path])._1
assert(expected === actual)
}
def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
val message = intercept[T] {
- parsePartition(new Path(path), defaultPartitionName, true, Set.empty[Path])
+ parsePartition(new Path(path), true, Set.empty[Path])
}.getMessage
assert(message.contains(expected))
@@ -204,7 +200,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
// when the basePaths is the same as the path to a leaf directory
val partitionSpec1: Option[PartitionValues] = parsePartition(
path = new Path("file://path/a=10"),
- defaultPartitionName = defaultPartitionName,
typeInference = true,
basePaths = Set(new Path("file://path/a=10")))._1
@@ -213,7 +208,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
// when the basePaths is the path to a base directory of leaf directories
val partitionSpec2: Option[PartitionValues] = parsePartition(
path = new Path("file://path/a=10"),
- defaultPartitionName = defaultPartitionName,
typeInference = true,
basePaths = Set(new Path("file://path")))._1
@@ -231,7 +225,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
val actualSpec =
parsePartitions(
paths.map(new Path(_)),
- defaultPartitionName,
true,
rootPaths)
assert(actualSpec === spec)
@@ -314,7 +307,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
test("parse partitions with type inference disabled") {
def check(paths: Seq[String], spec: PartitionSpec): Unit = {
val actualSpec =
- parsePartitions(paths.map(new Path(_)), defaultPartitionName, false, Set.empty[Path])
+ parsePartitions(paths.map(new Path(_)), false, Set.empty[Path])
assert(actualSpec === spec)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index b537061d0d..42ce1a88a2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.hive
+import java.io.IOException
import java.util
import scala.util.control.NonFatal
@@ -26,7 +27,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.ql.metadata.HiveException
import org.apache.thrift.TException
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -255,7 +256,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// compatible format, which means the data source is file-based and must have a `path`.
require(tableDefinition.storage.locationUri.isDefined,
"External file-based data source table must have a `path` entry in storage properties.")
- Some(new Path(tableDefinition.storage.locationUri.get).toUri.toString)
+ Some(new Path(tableDefinition.location).toUri.toString)
} else {
None
}
@@ -789,7 +790,21 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = withClient {
requireTableExists(db, table)
- val lowerCasedParts = parts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
+
+ val tableMeta = getTable(db, table)
+ val partitionColumnNames = tableMeta.partitionColumnNames
+ val tablePath = new Path(tableMeta.location)
+ val partsWithLocation = parts.map { p =>
+    // Ideally we could leave the partition location empty and let the Hive metastore set it.
+    // However, the Hive metastore is not case preserving and would generate a wrong partition
+    // location with lower-cased partition column names. Here we set the default partition
+    // location manually to avoid this problem.
+ val partitionPath = p.storage.locationUri.map(new Path(_)).getOrElse {
+ ExternalCatalogUtils.generatePartitionPath(p.spec, partitionColumnNames, tablePath)
+ }
+ p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toString)))
+ }
+ val lowerCasedParts = partsWithLocation.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
}
@@ -810,6 +825,31 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
newSpecs: Seq[TablePartitionSpec]): Unit = withClient {
client.renamePartitions(
db, table, specs.map(lowerCasePartitionSpec), newSpecs.map(lowerCasePartitionSpec))
+
+ val tableMeta = getTable(db, table)
+ val partitionColumnNames = tableMeta.partitionColumnNames
+ // Hive metastore is not case preserving and keeps partition columns with lower cased names.
+  // When Hive renames a partition of a managed table, it creates the new partition location from
+  // a default path generated from the new spec with lower-cased partition column names. This is
+  // unexpected, so we rename the directory manually and alter the partition location accordingly.
+ val hasUpperCasePartitionColumn = partitionColumnNames.exists(col => col.toLowerCase != col)
+ if (tableMeta.tableType == MANAGED && hasUpperCasePartitionColumn) {
+ val tablePath = new Path(tableMeta.location)
+ val newParts = newSpecs.map { spec =>
+ val partition = client.getPartition(db, table, lowerCasePartitionSpec(spec))
+ val wrongPath = new Path(partition.location)
+ val rightPath = ExternalCatalogUtils.generatePartitionPath(
+ spec, partitionColumnNames, tablePath)
+ try {
+ tablePath.getFileSystem(hadoopConf).rename(wrongPath, rightPath)
+ } catch {
+ case e: IOException => throw new SparkException(
+ s"Unable to rename partition path from $wrongPath to $rightPath", e)
+ }
+ partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toString)))
+ }
+ alterPartitions(db, table, newParts)
+ }
}
override def alterPartitions(
@@ -817,6 +857,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
table: String,
newParts: Seq[CatalogTablePartition]): Unit = withClient {
val lowerCasedParts = newParts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
+ // Note: Before altering table partitions in Hive, you *must* set the current database
+ // to the one that contains the table of interest. Otherwise you will end up with the
+ // most helpful error message ever: "Unable to alter partition. alter is not possible."
+ // See HIVE-2742 for more detail.
+ client.setCurrentDatabase(db)
client.alterPartitions(db, table, lowerCasedParts)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index d3873cf6c8..fbd705172c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -445,7 +445,7 @@ object SetWarehouseLocationTest extends Logging {
catalog.getTableMetadata(TableIdentifier("testLocation", Some("default")))
val expectedLocation =
"file:" + expectedWarehouseLocation.toString + "/testlocation"
- val actualLocation = tableMetadata.storage.locationUri.get
+ val actualLocation = tableMetadata.location
if (actualLocation != expectedLocation) {
throw new Exception(
s"Expected table location is $expectedLocation. But, it is actually $actualLocation")
@@ -461,7 +461,7 @@ object SetWarehouseLocationTest extends Logging {
catalog.getTableMetadata(TableIdentifier("testLocation", Some("testLocationDB")))
val expectedLocation =
"file:" + expectedWarehouseLocation.toString + "/testlocationdb.db/testlocation"
- val actualLocation = tableMetadata.storage.locationUri.get
+ val actualLocation = tableMetadata.location
if (actualLocation != expectedLocation) {
throw new Exception(
s"Expected table location is $expectedLocation. But, it is actually $actualLocation")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index cfc1d81d54..9f4401ae22 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -29,7 +29,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
val expectedPath =
spark.sharedState.externalCatalog.getDatabase(dbName).locationUri + "/" + tableName
- assert(metastoreTable.storage.locationUri.get === expectedPath)
+ assert(metastoreTable.location === expectedPath)
}
private def getTableNames(dbName: Option[String] = None): Array[String] = {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 0076a77868..6efae13ddf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -425,7 +425,7 @@ class HiveDDLSuite
sql("CREATE TABLE tab1 (height INT, length INT) PARTITIONED BY (a INT, b INT)")
val part1 = Map("a" -> "1", "b" -> "5")
val part2 = Map("a" -> "2", "b" -> "6")
- val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get)
+ val root = new Path(catalog.getTableMetadata(tableIdent).location)
val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
// valid
fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index c21db3595f..e607af67f9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -542,7 +542,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
userSpecifiedLocation match {
case Some(location) =>
- assert(r.catalogTable.storage.locationUri.get === location)
+ assert(r.catalogTable.location === location)
case None => // OK.
}
// Also make sure that the format and serde are as desired.