author     Wenchen Fan <wenchen@databricks.com>    2015-10-26 21:14:26 -0700
committer  Yin Huai <yhuai@databricks.com>         2015-10-26 21:14:26 -0700
commit     a150e6c1b03b64a35855b8074b2fe077a6081a34 (patch)
tree       083e721011328610068bedb616368dd236212376
parent     dc3220ce11c7513b1452c82ee82cb86e908bcc2d (diff)
[SPARK-10562] [SQL] support mixed case partitionBy column names for tables stored in metastore
https://issues.apache.org/jira/browse/SPARK-10562

Author: Wenchen Fan <wenchen@databricks.com>

Closes #9226 from cloud-fan/par.
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala       | 61
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala  |  9
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala    | 11
3 files changed, 54 insertions(+), 27 deletions(-)
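
Editor's note: the mechanics of the fix are easy to see outside of Spark. Instead of handing partition column names to the Hive metastore (which normalizes column names to lower case), the patch serializes them into table properties, one spark.sql.sources.schema.partCol.$index entry per column plus a numPartCols count. Below is a minimal standalone sketch of that round trip; PartColPropertiesSketch and its method names are illustrative, not part of the patch, and a plain Map stands in for the Hive table properties.

object PartColPropertiesSketch {
  // Property keys used by the patch (copied from the diff below).
  private val NumPartColsKey = "spark.sql.sources.schema.numPartCols"
  private def partColKey(index: Int) = s"spark.sql.sources.schema.partCol.$index"

  // Write side: one property per partition column, plus a count.
  def write(partitionColumns: Seq[String]): Map[String, String] = {
    val cols = partitionColumns.zipWithIndex.map { case (col, index) =>
      partColKey(index) -> col
    }
    (cols :+ (NumPartColsKey -> partitionColumns.length.toString)).toMap
  }

  // Read side: mirrors partColsFromParts in the patch.
  def read(properties: Map[String, String]): Option[Seq[String]] = {
    properties.get(NumPartColsKey).map { numPartCols =>
      (0 until numPartCols.toInt).map { index =>
        properties.getOrElse(partColKey(index),
          throw new IllegalStateException(
            s"missing part $index, $numPartCols parts are expected"))
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val props = write(Seq("Year", "Month"))
    // The original casing survives because the names never pass through
    // Hive's case-insensitive column handling.
    assert(read(props) == Some(Seq("Year", "Month")))
    println(props)
  }
}
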
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index fdb576bedb..f4d45714fa 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -143,6 +143,21 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
           }
         }
 
+        def partColsFromParts: Option[Seq[String]] = {
+          table.properties.get("spark.sql.sources.schema.numPartCols").map { numPartCols =>
+            (0 until numPartCols.toInt).map { index =>
+              val partCol = table.properties.get(s"spark.sql.sources.schema.partCol.$index").orNull
+              if (partCol == null) {
+                throw new AnalysisException(
+                  "Could not read partitioned columns from the metastore because it is corrupted " +
+                    s"(missing part $index of it, $numPartCols parts are expected).")
+              }
+
+              partCol
+            }
+          }
+        }
+
         // Originally, we used spark.sql.sources.schema to store the schema of a data source table.
         // After SPARK-6024, we removed this flag.
         // Although we are not using spark.sql.sources.schema any more, we still need to support it.
@@ -155,7 +170,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
         // We only need names here since the userSpecifiedSchema we loaded from the metastore
         // contains partition columns. We can always get datatypes of partitioning columns
         // from userSpecifiedSchema.
-        val partitionColumns = table.partitionColumns.map(_.name)
+        val partitionColumns = partColsFromParts.getOrElse(Nil)
 
         // It does not appear that the ql client for the metastore has a way to enumerate all the
         // SerDe properties directly...
@@ -218,25 +233,21 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
       }
     }
 
-    val metastorePartitionColumns = userSpecifiedSchema.map { schema =>
-      val fields = partitionColumns.map(col => schema(col))
-      fields.map { field =>
-        HiveColumn(
-          name = field.name,
-          hiveType = HiveMetastoreTypes.toMetastoreType(field.dataType),
-          comment = "")
-      }.toSeq
-    }.getOrElse {
-      if (partitionColumns.length > 0) {
-        // The table does not have a specified schema, which means that the schema will be inferred
-        // when we load the table. So, we are not expecting partition columns and we will discover
-        // partitions when we load the table. However, if there are specified partition columns,
-        // we simply ignore them and provide a warning message.
-        logWarning(
-          s"The schema and partitions of table $tableIdent will be inferred when it is loaded. " +
-            s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.")
+    if (userSpecifiedSchema.isDefined && partitionColumns.length > 0) {
+      tableProperties.put("spark.sql.sources.schema.numPartCols", partitionColumns.length.toString)
+      partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
+        tableProperties.put(s"spark.sql.sources.schema.partCol.$index", partCol)
       }
-      Seq.empty[HiveColumn]
+    }
+
+    if (userSpecifiedSchema.isEmpty && partitionColumns.length > 0) {
+      // The table does not have a specified schema, which means that the schema will be inferred
+      // when we load the table. So, we are not expecting partition columns and we will discover
+      // partitions when we load the table. However, if there are specified partition columns,
+      // we simply ignore them and provide a warning message.
+      logWarning(
+        s"The schema and partitions of table $tableIdent will be inferred when it is loaded. " +
+          s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.")
     }
 
     val tableType = if (isExternal) {
@@ -255,8 +266,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     HiveTable(
       specifiedDatabase = Option(dbName),
       name = tblName,
-      schema = Seq.empty,
-      partitionColumns = metastorePartitionColumns,
+      schema = Nil,
+      partitionColumns = Nil,
       tableType = tableType,
       properties = tableProperties.toMap,
       serdeProperties = options)
@@ -272,14 +283,14 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
         }
       }
 
-      val partitionColumns = schemaToHiveColumn(relation.partitionColumns)
-      val dataColumns = schemaToHiveColumn(relation.schema).filterNot(partitionColumns.contains)
+      assert(partitionColumns.isEmpty)
+      assert(relation.partitionColumns.isEmpty)
 
       HiveTable(
         specifiedDatabase = Option(dbName),
         name = tblName,
-        schema = dataColumns,
-        partitionColumns = partitionColumns,
+        schema = schemaToHiveColumn(relation.schema),
+        partitionColumns = Nil,
         tableType = tableType,
         properties = tableProperties.toMap,
         serdeProperties = options,
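
Editor's note: for context on why the metastore path lost the casing, Hive stores column names case-insensitively, so a table partitioned by "Year" came back as "year", and Spark's case-sensitive StructType lookup on the user-specified schema then failed. A small sketch of that failure mode, assuming spark-sql on the classpath (CaseMismatchSketch is an illustrative name, not from the patch):

import scala.util.Try
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object CaseMismatchSketch {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("Year", IntegerType),
      StructField("val", StringType)))

    // StructType.apply(fieldName) is case sensitive. Before this patch the
    // partition column name came back from the metastore lower-cased, so the
    // lookup performed while resolving the table failed:
    println(Try(schema("year"))) // Failure(java.lang.IllegalArgumentException: ...)
    println(Try(schema("Year"))) // Success(StructField(Year,IntegerType,true))
  }
}
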
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index d292887688..f74eb1500b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -753,10 +753,15 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         invalidateTable(tableName)
         val metastoreTable = catalog.client.getTable("default", tableName)
         val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
+
+        val numPartCols = metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt
+        assert(numPartCols == 2)
+
         val actualPartitionColumns =
           StructType(
-            metastoreTable.partitionColumns.map(c =>
-              StructField(c.name, HiveMetastoreTypes.toDataType(c.hiveType))))
+            (0 until numPartCols).map { index =>
+              df.schema(metastoreTable.properties(s"spark.sql.sources.schema.partCol.$index"))
+            })
         // Make sure partition columns are correctly stored in metastore.
         assert(
           expectedPartitionColumns.sameType(actualPartitionColumns),
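
Editor's note: the new assertions rebuild the expected partition schema straight from the stored properties. The same lookup, generalized to any schema, could be written as the helper below; partitionSchemaFromProperties is a hypothetical name, not something this patch adds.

import org.apache.spark.sql.types.StructType

// Reconstruct the partition schema of `fullSchema` from the per-column
// table properties written by the patch.
def partitionSchemaFromProperties(
    fullSchema: StructType,
    properties: Map[String, String]): StructType = {
  val numPartCols = properties("spark.sql.sources.schema.numPartCols").toInt
  StructType((0 until numPartCols).map { index =>
    fullSchema(properties(s"spark.sql.sources.schema.partCol.$index"))
  })
}
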
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 396150be76..fd380641dc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1410,4 +1410,15 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
     }
   }
+
+  test("SPARK-10562: partition by column with mixed case name") {
+    withTable("tbl10562") {
+      val df = Seq(2012 -> "a").toDF("Year", "val")
+      df.write.partitionBy("Year").saveAsTable("tbl10562")
+      checkAnswer(sql("SELECT Year FROM tbl10562"), Row(2012))
+      checkAnswer(sql("SELECT yEAr FROM tbl10562"), Row(2012))
+      checkAnswer(sql("SELECT val FROM tbl10562 WHERE Year > 2015"), Nil)
+      checkAnswer(sql("SELECT val FROM tbl10562 WHERE Year == 2012"), Row("a"))
+    }
+  }
 }