aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorYin Huai <yhuai@databricks.com>2015-09-22 13:29:39 -0700
committerYin Huai <yhuai@databricks.com>2015-09-22 13:29:39 -0700
commit2204cdb28483b249616068085d4e88554fe6acef (patch)
tree1789dc690f1fb2076ed5be8e9d81e811503d4421 /sql
parent5017c685f484ec256101d1d33bad11d9e0c0f641 (diff)
downloadspark-2204cdb28483b249616068085d4e88554fe6acef.tar.gz
spark-2204cdb28483b249616068085d4e88554fe6acef.tar.bz2
spark-2204cdb28483b249616068085d4e88554fe6acef.zip
[SPARK-10672] [SQL] Do not fail when we cannot save the metadata of a data source table in a hive compatible way
https://issues.apache.org/jira/browse/SPARK-10672 With changes in this PR, we will fallback to same the metadata of a table in Spark SQL specific way if we fail to save it in a hive compatible way (Hive throws an exception because of its internal restrictions, e.g. binary and decimal types cannot be saved to parquet if the metastore is running Hive 0.13). I manually tested the fix with the following test in `DataSourceWithHiveMetastoreCatalogSuite` (`spark.sql.hive.metastore.version=0.13` and `spark.sql.hive.metastore.jars`=`maven`). ``` test(s"fail to save metadata of a parquet table in hive 0.13") { withTempPath { dir => withTable("t") { val path = dir.getCanonicalPath sql( s"""CREATE TABLE t USING $provider |OPTIONS (path '$path') |AS SELECT 1 AS d1, cast("val_1" as binary) AS d2 """.stripMargin) sql( s"""describe formatted t """.stripMargin).collect.foreach(println) sqlContext.table("t").show } } } } ``` Without this fix, we will fail with the following error. ``` org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.UnsupportedOperationException: Unknown field type: binary at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:619) at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:576) at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$createTable$1.apply$mcV$sp(ClientWrapper.scala:359) at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$createTable$1.apply(ClientWrapper.scala:357) at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$createTable$1.apply(ClientWrapper.scala:357) at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState$1.apply(ClientWrapper.scala:256) at org.apache.spark.sql.hive.client.ClientWrapper.retryLocked(ClientWrapper.scala:211) at org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:248) at org.apache.spark.sql.hive.client.ClientWrapper.createTable(ClientWrapper.scala:357) at org.apache.spark.sql.hive.HiveMetastoreCatalog.createDataSourceTable(HiveMetastoreCatalog.scala:358) at org.apache.spark.sql.hive.execution.CreateMetastoreDataSourceAsSelect.run(commands.scala:285) at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57) at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57) at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:58) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:58) at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:144) at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:129) at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51) at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725) at org.apache.spark.sql.test.SQLTestUtils$$anonfun$sql$1.apply(SQLTestUtils.scala:56) at org.apache.spark.sql.test.SQLTestUtils$$anonfun$sql$1.apply(SQLTestUtils.scala:56) at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$2.apply$mcV$sp(HiveMetastoreCatalogSuite.scala:165) at org.apache.spark.sql.test.SQLTestUtils$class.withTable(SQLTestUtils.scala:150) at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite.withTable(HiveMetastoreCatalogSuite.scala:52) at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1$$anonfun$apply$mcV$sp$2.apply(HiveMetastoreCatalogSuite.scala:162) at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1$$anonfun$apply$mcV$sp$2.apply(HiveMetastoreCatalogSuite.scala:161) at org.apache.spark.sql.test.SQLTestUtils$class.withTempPath(SQLTestUtils.scala:125) at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite.withTempPath(HiveMetastoreCatalogSuite.scala:52) at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1.apply$mcV$sp(HiveMetastoreCatalogSuite.scala:161) at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1.apply(HiveMetastoreCatalogSuite.scala:161) at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1.apply(HiveMetastoreCatalogSuite.scala:161) at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166) at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:42) at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163) at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175) at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175) at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175) at org.scalatest.FunSuite.runTest(FunSuite.scala:1555) at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208) at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401) at scala.collection.immutable.List.foreach(List.scala:318) at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396) at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483) at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208) at org.scalatest.FunSuite.runTests(FunSuite.scala:1555) at org.scalatest.Suite$class.run(Suite.scala:1424) at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555) at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212) at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212) at org.scalatest.SuperEngine.runImpl(Engine.scala:545) at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212) at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite.org$scalatest$BeforeAndAfterAll$$super$run(HiveMetastoreCatalogSuite.scala:52) at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257) at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256) at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite.run(HiveMetastoreCatalogSuite.scala:52) at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:462) at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:671) at sbt.ForkMain$Run$2.call(ForkMain.java:294) at sbt.ForkMain$Run$2.call(ForkMain.java:284) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.UnsupportedOperationException: Unknown field type: binary at org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector.getObjectInspector(ArrayWritableObjectInspector.java:108) at org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector.<init>(ArrayWritableObjectInspector.java:60) at org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe.initialize(ParquetHiveSerDe.java:113) at org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:339) at org.apache.hadoop.hive.ql.metadata.Table.getDeserializerFromMetaStore(Table.java:288) at org.apache.hadoop.hive.ql.metadata.Table.checkValidity(Table.java:194) at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:597) ... 76 more ``` Author: Yin Huai <yhuai@databricks.com> Closes #8824 from yhuai/datasourceMetadata.
Diffstat (limited to 'sql')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala101
1 files changed, 50 insertions, 51 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 0c1b41e337..012634cb5a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -309,69 +309,68 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
}
// TODO: Support persisting partitioned data source relations in Hive compatible format
- val hiveTable = (maybeSerDe, dataSource.relation) match {
+ val qualifiedTableName = tableIdent.quotedString
+ val (hiveCompitiableTable, logMessage) = (maybeSerDe, dataSource.relation) match {
case (Some(serde), relation: HadoopFsRelation)
- if relation.paths.length == 1 && relation.partitionColumns.isEmpty =>
- // Hive ParquetSerDe doesn't support decimal type until 1.2.0.
- val isParquetSerDe = serde.inputFormat.exists(_.toLowerCase.contains("parquet"))
- val hasDecimalFields = relation.schema.existsRecursively(_.isInstanceOf[DecimalType])
-
- val hiveParquetSupportsDecimal = client.version match {
- case org.apache.spark.sql.hive.client.hive.v1_2 => true
- case _ => false
- }
-
- if (isParquetSerDe && !hiveParquetSupportsDecimal && hasDecimalFields) {
- // If Hive version is below 1.2.0, we cannot save Hive compatible schema to
- // metastore when the file format is Parquet and the schema has DecimalType.
- logWarning {
- "Persisting Parquet relation with decimal field(s) into Hive metastore in Spark SQL " +
- "specific format, which is NOT compatible with Hive. Because ParquetHiveSerDe in " +
- s"Hive ${client.version.fullVersion} doesn't support decimal type. See HIVE-6384."
- }
- newSparkSQLSpecificMetastoreTable()
- } else {
- logInfo {
- "Persisting data source relation with a single input path into Hive metastore in " +
- s"Hive compatible format. Input path: ${relation.paths.head}"
- }
- newHiveCompatibleMetastoreTable(relation, serde)
- }
+ if relation.paths.length == 1 && relation.partitionColumns.isEmpty =>
+ val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
+ val message =
+ s"Persisting data source relation $qualifiedTableName with a single input path " +
+ s"into Hive metastore in Hive compatible format. Input path: ${relation.paths.head}."
+ (Some(hiveTable), message)
case (Some(serde), relation: HadoopFsRelation) if relation.partitionColumns.nonEmpty =>
- logWarning {
- "Persisting partitioned data source relation into Hive metastore in " +
- s"Spark SQL specific format, which is NOT compatible with Hive. Input path(s): " +
- relation.paths.mkString("\n", "\n", "")
- }
- newSparkSQLSpecificMetastoreTable()
+ val message =
+ s"Persisting partitioned data source relation $qualifiedTableName into " +
+ "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
+ "Input path(s): " + relation.paths.mkString("\n", "\n", "")
+ (None, message)
case (Some(serde), relation: HadoopFsRelation) =>
- logWarning {
- "Persisting data source relation with multiple input paths into Hive metastore in " +
- s"Spark SQL specific format, which is NOT compatible with Hive. Input paths: " +
- relation.paths.mkString("\n", "\n", "")
- }
- newSparkSQLSpecificMetastoreTable()
+ val message =
+ s"Persisting data source relation $qualifiedTableName with multiple input paths into " +
+ "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
+ s"Input paths: " + relation.paths.mkString("\n", "\n", "")
+ (None, message)
case (Some(serde), _) =>
- logWarning {
- s"Data source relation is not a ${classOf[HadoopFsRelation].getSimpleName}. " +
- "Persisting it into Hive metastore in Spark SQL specific format, " +
- "which is NOT compatible with Hive."
- }
- newSparkSQLSpecificMetastoreTable()
+ val message =
+ s"Data source relation $qualifiedTableName is not a " +
+ s"${classOf[HadoopFsRelation].getSimpleName}. Persisting it into Hive metastore " +
+ "in Spark SQL specific format, which is NOT compatible with Hive."
+ (None, message)
case _ =>
- logWarning {
+ val message =
s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
- "Persisting data source relation into Hive metastore in Spark SQL specific format, " +
- "which is NOT compatible with Hive."
- }
- newSparkSQLSpecificMetastoreTable()
+ s"Persisting data source relation $qualifiedTableName into Hive metastore in " +
+ s"Spark SQL specific format, which is NOT compatible with Hive."
+ (None, message)
}
- client.createTable(hiveTable)
+ (hiveCompitiableTable, logMessage) match {
+ case (Some(table), message) =>
+ // We first try to save the metadata of the table in a Hive compatiable way.
+ // If Hive throws an error, we fall back to save its metadata in the Spark SQL
+ // specific way.
+ try {
+ logInfo(message)
+ client.createTable(table)
+ } catch {
+ case throwable: Throwable =>
+ val warningMessage =
+ s"Could not persist $qualifiedTableName in a Hive compatible way. Persisting " +
+ s"it into Hive metastore in Spark SQL specific format."
+ logWarning(warningMessage, throwable)
+ val sparkSqlSpecificTable = newSparkSQLSpecificMetastoreTable()
+ client.createTable(sparkSqlSpecificTable)
+ }
+
+ case (None, message) =>
+ logWarning(message)
+ val hiveTable = newSparkSQLSpecificMetastoreTable()
+ client.createTable(hiveTable)
+ }
}
def hiveDefaultTableFilePath(tableName: String): String = {