path: root/sql/hive
author    Herman van Hovell <hvanhovell@databricks.com>  2017-02-10 11:06:57 -0800
committer Wenchen Fan <wenchen@databricks.com>  2017-02-10 11:06:57 -0800
commit    de8a03e68202647555e30fffba551f65bc77608d (patch)
tree      f529ed7b5fe76475226cef8a99061c0bec235198 /sql/hive
parent    dadff5f0789cce7cf3728a8adaab42118e5dc019 (diff)
[SPARK-19459][SQL] Add Hive datatype (char/varchar) to StructField metadata
## What changes were proposed in this pull request?

Reading from an existing ORC table which contains `char` or `varchar` columns can fail with a `ClassCastException` if the table metadata has been created using Spark. This is caused by the fact that Spark internally replaces `char` and `varchar` columns with a `string` column. This PR fixes this by adding the Hive type to the `StructField`'s metadata under the `HIVE_TYPE_STRING` key. The key is picked up by the `HiveClient` and the ORC reader; see https://github.com/apache/spark/pull/16060 for more details on how the metadata is used.

## How was this patch tested?

Added a regression test to `OrcSourceSuite`.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #16804 from hvanhovell/SPARK-19459.
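The mechanism is easy to see in isolation. Below is a minimal sketch of how the raw Hive type survives the round trip through a `StructField`'s metadata; the local `HIVE_TYPE_STRING` constant is a stand-in for the shared key this patch introduces, and only Spark SQL classes are assumed on the classpath.

```scala
import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField}

object HiveTypeMetadataSketch extends App {
  // Local stand-in for the shared HIVE_TYPE_STRING key this patch introduces.
  val HIVE_TYPE_STRING = "HIVE_TYPE_STRING"

  // Spark maps CHAR(n)/VARCHAR(n) to StringType, so the original Hive type
  // is recorded in the field's metadata rather than in its dataType.
  val metadata = new MetadataBuilder()
    .putString(HIVE_TYPE_STRING, "varchar(10)")
    .build()
  val field = StructField("c", StringType, nullable = true, metadata)

  // A consumer (HiveClient, ORC reader) prefers the preserved Hive type and
  // falls back to the Catalyst catalog string when the key is absent.
  val hiveType =
    if (field.metadata.contains(HIVE_TYPE_STRING)) field.metadata.getString(HIVE_TYPE_STRING)
    else field.dataType.catalogString

  println(hiveType) // varchar(10)
}
```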
Diffstat (limited to 'sql/hive')
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala            | 14
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala |  8
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala    | 37
3 files changed, 39 insertions, 20 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 312ec6776b..13ab4e88e8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -61,14 +61,6 @@ private[spark] object HiveUtils extends Logging {
/** The version of hive used internally by Spark SQL. */
val hiveExecutionVersion: String = "1.2.1"
- /**
- * The property key that is used to store the raw hive type string in the metadata of StructField.
- * For example, in the case where the Hive type is varchar, the type gets mapped to a string type
- * in Spark SQL, but we need to preserve the original type in order to invoke the correct object
- * inspector in Hive.
- */
- val hiveTypeString: String = "HIVE_TYPE_STRING"
-
val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version")
.doc("Version of the Hive metastore. Available options are " +
s"<code>0.12.0</code> through <code>$hiveExecutionVersion</code>.")
@@ -465,8 +457,8 @@ private[spark] object HiveUtils extends Logging {
/** Converts the native StructField to Hive's FieldSchema. */
private def toHiveColumn(c: StructField): FieldSchema = {
- val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
- c.metadata.getString(HiveUtils.hiveTypeString)
+ val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) {
+ c.metadata.getString(HIVE_TYPE_STRING)
} else {
c.dataType.catalogString
}
@@ -482,7 +474,7 @@ private[spark] object HiveUtils extends Logging {
throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
}
- val metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, hc.getType).build()
+ val metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, hc.getType).build()
val field = StructField(
name = hc.getName,
dataType = columnType,
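Taken out of context, the patched `fromHiveColumn` path above amounts to the following standalone sketch, assuming only Catalyst on the classpath; the local `HIVE_TYPE_STRING` constant again stands in for the shared key.

```scala
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.types.{MetadataBuilder, StructField}

object FromHiveColumnSketch {
  // Local stand-in for the shared HIVE_TYPE_STRING key.
  val HIVE_TYPE_STRING = "HIVE_TYPE_STRING"

  // Mirrors the patched path above: parse the Hive type string into a
  // Catalyst DataType, and stash the raw string in the field's metadata so
  // that char/varchar information survives the StringType mapping.
  def toStructField(name: String, hiveType: String): StructField = {
    val columnType = try {
      CatalystSqlParser.parseDataType(hiveType)
    } catch {
      case e: ParseException =>
        throw new SparkException("Cannot recognize hive type string: " + hiveType, e)
    }
    val metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, hiveType).build()
    StructField(name, columnType, nullable = true, metadata)
  }

  def main(args: Array[String]): Unit = {
    val f = toStructField("c", "varchar(10)")
    println(f.dataType.catalogString)               // "string" (Spark of this era parses varchar to StringType)
    println(f.metadata.getString(HIVE_TYPE_STRING)) // "varchar(10)"
  }
}
```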
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index bf703a5ab6..f0d01ebfcf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -47,7 +47,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
+import org.apache.spark.sql.types._
import org.apache.spark.util.{CircularBuffer, Utils}
/**
@@ -790,8 +790,8 @@ private[hive] class HiveClientImpl(
.asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
private def toHiveColumn(c: StructField): FieldSchema = {
- val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
- c.metadata.getString(HiveUtils.hiveTypeString)
+ val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) {
+ c.metadata.getString(HIVE_TYPE_STRING)
} else {
c.dataType.catalogString
}
@@ -806,7 +806,7 @@ private[hive] class HiveClientImpl(
throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
}
- val metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, hc.getType).build()
+ val metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, hc.getType).build()
val field = StructField(
name = hc.getName,
dataType = columnType,
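The only other change in this file is the widened import: both files now reference a bare `HIVE_TYPE_STRING`, which `org.apache.spark.sql.types._` brings into scope. A plausible home for such a shared constant is a package object; the sketch below uses hypothetical package names, since the constant's actual definition site is outside this diff (which is limited to sql/hive).

```scala
// Hypothetical package names: the real definition site of HIVE_TYPE_STRING
// lies outside this diff.
package org.example.sql

package object types {
  // Shared key under which the raw Hive type string is preserved in the
  // metadata of a StructField.
  val HIVE_TYPE_STRING = "HIVE_TYPE_STRING"
}
```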
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index fe1e17dd08..59ea8916ef 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -152,14 +152,41 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
assert(new OrcOptions(Map("Orc.Compress" -> "NONE")).compressionCodec == "NONE")
}
- test("SPARK-18220: read Hive orc table with varchar column") {
+ test("SPARK-19459/SPARK-18220: read char/varchar column written by Hive") {
val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+ val location = Utils.createTempDir()
+ val uri = location.toURI
try {
- hiveClient.runSqlHive("CREATE TABLE orc_varchar(a VARCHAR(10)) STORED AS orc")
- hiveClient.runSqlHive("INSERT INTO TABLE orc_varchar SELECT 'a' FROM (SELECT 1) t")
- checkAnswer(spark.table("orc_varchar"), Row("a"))
+ hiveClient.runSqlHive(
+ """
+ |CREATE EXTERNAL TABLE hive_orc(
+ | a STRING,
+ | b CHAR(10),
+ | c VARCHAR(10))
+ |STORED AS orc""".stripMargin)
+ // Hive throws an exception if I assign the location in the create table statement.
+ hiveClient.runSqlHive(
+ s"ALTER TABLE hive_orc SET LOCATION '$uri'")
+ hiveClient.runSqlHive(
+ "INSERT INTO TABLE hive_orc SELECT 'a', 'b', 'c' FROM (SELECT 1) t")
+
+ // We create a different table in Spark using the same schema which points to
+ // the same location.
+ spark.sql(
+ s"""
+ |CREATE EXTERNAL TABLE spark_orc(
+ | a STRING,
+ | b CHAR(10),
+ | c VARCHAR(10))
+ |STORED AS orc
+ |LOCATION '$uri'""".stripMargin)
+ val result = Row("a", "b         ", "c")
+ checkAnswer(spark.table("hive_orc"), result)
+ checkAnswer(spark.table("spark_orc"), result)
} finally {
- hiveClient.runSqlHive("DROP TABLE IF EXISTS orc_varchar")
+ hiveClient.runSqlHive("DROP TABLE IF EXISTS hive_orc")
+ hiveClient.runSqlHive("DROP TABLE IF EXISTS spark_orc")
+ Utils.deleteRecursively(location)
}
}
}
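The expected row encodes the semantics under test: `CHAR(10)` is right-padded with spaces to its declared length, while `VARCHAR(10)` returns the value as written. A toy illustration of that contract, in plain Scala with no Spark required:

```scala
// A toy check of the padding contract the test asserts: CHAR(n) right-pads
// with spaces to the declared length, VARCHAR(n) keeps the raw value.
object CharPaddingSketch extends App {
  def charPad(value: String, n: Int): String = value.padTo(n, ' ')

  assert(charPad("b", 10) == "b         ") // "b" plus nine trailing spaces
  assert(charPad("b", 10).length == 10)
  assert("c".length == 1)                  // varchar keeps "c" as-is
}
```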