aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
authorgatorsmile <gatorsmile@gmail.com>2017-02-08 10:11:44 -0500
committergatorsmile <gatorsmile@gmail.com>2017-02-08 10:11:44 -0500
commit4d4d0de7f64cefbca28dc532b7864de9626aa241 (patch)
treecb18df849f06b90cdaae81c0a89ab0f55c4edb41 /sql/hive
parent0077bfcb93832d93009f73f4b80f2e3d98fd2fa4 (diff)
downloadspark-4d4d0de7f64cefbca28dc532b7864de9626aa241.tar.gz
spark-4d4d0de7f64cefbca28dc532b7864de9626aa241.tar.bz2
spark-4d4d0de7f64cefbca28dc532b7864de9626aa241.zip
[SPARK-19279][SQL][FOLLOW-UP] Infer Schema for Hive Serde Tables
### What changes were proposed in this pull request? `table.schema` is always not empty for partitioned tables, because `table.schema` also contains the partitioned columns, even if the original table does not have any column. This PR is to fix the issue. ### How was this patch tested? Added a test case Author: gatorsmile <gatorsmile@gmail.com> Closes #16848 from gatorsmile/inferHiveSerdeSchema.
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala2
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala44
2 files changed, 45 insertions, 1 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 30abc62803..312ec6776b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -580,7 +580,7 @@ private[spark] object HiveUtils extends Logging {
* CatalogTable.
*/
def inferSchema(table: CatalogTable): CatalogTable = {
- if (DDLUtils.isDatasourceTable(table) || table.schema.nonEmpty) {
+ if (DDLUtils.isDatasourceTable(table) || table.dataSchema.nonEmpty) {
table
} else {
val hiveTable = toHiveTable(table)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index c262095df6..cf1fe2bc70 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.execution.command.CreateTableCommand
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.hive.HiveExternalCatalog._
import org.apache.spark.sql.hive.client.HiveClient
@@ -1308,6 +1309,49 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}
}
+ test("Infer schema for Hive serde tables") {
+ val tableName = "tab1"
+ val avroSchema =
+ """{
+ | "name": "test_record",
+ | "type": "record",
+ | "fields": [ {
+ | "name": "f0",
+ | "type": "int"
+ | }]
+ |}
+ """.stripMargin
+
+ Seq(true, false).foreach { isPartitioned =>
+ withTable(tableName) {
+ val partitionClause = if (isPartitioned) "PARTITIONED BY (ds STRING)" else ""
+ // Creates the (non-)partitioned Avro table
+ val plan = sql(
+ s"""
+ |CREATE TABLE $tableName
+ |$partitionClause
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+ |TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
+ """.stripMargin
+ ).queryExecution.analyzed
+
+ assert(plan.isInstanceOf[CreateTableCommand] &&
+ plan.asInstanceOf[CreateTableCommand].table.dataSchema.nonEmpty)
+
+ if (isPartitioned) {
+ sql(s"INSERT OVERWRITE TABLE $tableName partition (ds='a') SELECT 1")
+ checkAnswer(spark.table(tableName), Row(1, "a"))
+ } else {
+ sql(s"INSERT OVERWRITE TABLE $tableName SELECT 1")
+ checkAnswer(spark.table(tableName), Row(1))
+ }
+ }
+ }
+ }
+
private def withDebugMode(f: => Unit): Unit = {
val previousValue = sparkSession.sparkContext.conf.get(DEBUG_MODE)
try {