Diffstat (limited to 'sql')
-rw-r--r--    sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala    101
1 file changed, 50 insertions(+), 51 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 0c1b41e337..012634cb5a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -309,69 +309,68 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
}
// TODO: Support persisting partitioned data source relations in Hive compatible format
- val hiveTable = (maybeSerDe, dataSource.relation) match {
+ val qualifiedTableName = tableIdent.quotedString
+ val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.relation) match {
case (Some(serde), relation: HadoopFsRelation)
- if relation.paths.length == 1 && relation.partitionColumns.isEmpty =>
- // Hive ParquetSerDe doesn't support decimal type until 1.2.0.
- val isParquetSerDe = serde.inputFormat.exists(_.toLowerCase.contains("parquet"))
- val hasDecimalFields = relation.schema.existsRecursively(_.isInstanceOf[DecimalType])
-
- val hiveParquetSupportsDecimal = client.version match {
- case org.apache.spark.sql.hive.client.hive.v1_2 => true
- case _ => false
- }
-
- if (isParquetSerDe && !hiveParquetSupportsDecimal && hasDecimalFields) {
- // If Hive version is below 1.2.0, we cannot save Hive compatible schema to
- // metastore when the file format is Parquet and the schema has DecimalType.
- logWarning {
- "Persisting Parquet relation with decimal field(s) into Hive metastore in Spark SQL " +
- "specific format, which is NOT compatible with Hive. Because ParquetHiveSerDe in " +
- s"Hive ${client.version.fullVersion} doesn't support decimal type. See HIVE-6384."
- }
- newSparkSQLSpecificMetastoreTable()
- } else {
- logInfo {
- "Persisting data source relation with a single input path into Hive metastore in " +
- s"Hive compatible format. Input path: ${relation.paths.head}"
- }
- newHiveCompatibleMetastoreTable(relation, serde)
- }
+ if relation.paths.length == 1 && relation.partitionColumns.isEmpty =>
+ val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
+ val message =
+ s"Persisting data source relation $qualifiedTableName with a single input path " +
+ s"into Hive metastore in Hive compatible format. Input path: ${relation.paths.head}."
+ (Some(hiveTable), message)
case (Some(serde), relation: HadoopFsRelation) if relation.partitionColumns.nonEmpty =>
- logWarning {
- "Persisting partitioned data source relation into Hive metastore in " +
- s"Spark SQL specific format, which is NOT compatible with Hive. Input path(s): " +
- relation.paths.mkString("\n", "\n", "")
- }
- newSparkSQLSpecificMetastoreTable()
+ val message =
+ s"Persisting partitioned data source relation $qualifiedTableName into " +
+ "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
+ "Input path(s): " + relation.paths.mkString("\n", "\n", "")
+ (None, message)
case (Some(serde), relation: HadoopFsRelation) =>
- logWarning {
- "Persisting data source relation with multiple input paths into Hive metastore in " +
- s"Spark SQL specific format, which is NOT compatible with Hive. Input paths: " +
- relation.paths.mkString("\n", "\n", "")
- }
- newSparkSQLSpecificMetastoreTable()
+ val message =
+ s"Persisting data source relation $qualifiedTableName with multiple input paths into " +
+ "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
+ s"Input paths: " + relation.paths.mkString("\n", "\n", "")
+ (None, message)
case (Some(serde), _) =>
- logWarning {
- s"Data source relation is not a ${classOf[HadoopFsRelation].getSimpleName}. " +
- "Persisting it into Hive metastore in Spark SQL specific format, " +
- "which is NOT compatible with Hive."
- }
- newSparkSQLSpecificMetastoreTable()
+ val message =
+ s"Data source relation $qualifiedTableName is not a " +
+ s"${classOf[HadoopFsRelation].getSimpleName}. Persisting it into Hive metastore " +
+ "in Spark SQL specific format, which is NOT compatible with Hive."
+ (None, message)
case _ =>
- logWarning {
+ val message =
s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
- "Persisting data source relation into Hive metastore in Spark SQL specific format, " +
- "which is NOT compatible with Hive."
- }
- newSparkSQLSpecificMetastoreTable()
+ s"Persisting data source relation $qualifiedTableName into Hive metastore in " +
+ s"Spark SQL specific format, which is NOT compatible with Hive."
+ (None, message)
}
- client.createTable(hiveTable)
+ (hiveCompatibleTable, logMessage) match {
+ case (Some(table), message) =>
+ // We first try to save the metadata of the table in a Hive compatible way.
+ // If Hive throws an error, we fall back to saving its metadata in the Spark SQL
+ // specific way.
+ try {
+ logInfo(message)
+ client.createTable(table)
+ } catch {
+ case throwable: Throwable =>
+ val warningMessage =
+ s"Could not persist $qualifiedTableName in a Hive compatible way. Persisting " +
+ s"it into Hive metastore in Spark SQL specific format."
+ logWarning(warningMessage, throwable)
+ val sparkSqlSpecificTable = newSparkSQLSpecificMetastoreTable()
+ client.createTable(sparkSqlSpecificTable)
+ }
+
+ case (None, message) =>
+ logWarning(message)
+ val hiveTable = newSparkSQLSpecificMetastoreTable()
+ client.createTable(hiveTable)
+ }
}
def hiveDefaultTableFilePath(tableName: String): String = {
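
For readers of this patch, the new control flow can be summarized as a standalone sketch. The types and method names below (HiveTable, createTable, newSparkSQLSpecificMetastoreTable, persist) are simplified stand-ins rather than the real Spark internals; the sketch only illustrates the compute-message-then-try/fallback pattern that the hunk above introduces.

object MetastorePersistenceSketch {
  // Simplified stand-in for Spark's internal Hive table metadata class.
  case class HiveTable(name: String, hiveCompatible: Boolean)

  // Stand-in for ClientInterface.createTable; Hive may reject incompatible metadata.
  def createTable(table: HiveTable): Unit =
    if (table.name.isEmpty) throw new IllegalArgumentException("invalid table metadata")

  // Stand-in for newSparkSQLSpecificMetastoreTable() in the patch above.
  def newSparkSQLSpecificMetastoreTable(name: String): HiveTable =
    HiveTable(name, hiveCompatible = false)

  // Mirrors the (Option[HiveTable], String) match introduced by the patch:
  // Some(table) means a Hive compatible representation exists and is tried first;
  // None means we persist the Spark SQL specific representation directly.
  def persist(qualifiedTableName: String,
              hiveCompatibleTable: Option[HiveTable],
              logMessage: String): Unit = hiveCompatibleTable match {
    case Some(table) =>
      try {
        println(s"INFO: $logMessage")
        createTable(table)
      } catch {
        case e: Throwable =>
          println(s"WARN: Could not persist $qualifiedTableName in a Hive compatible way. " +
            s"Persisting it into Hive metastore in Spark SQL specific format. (${e.getMessage})")
          createTable(newSparkSQLSpecificMetastoreTable(qualifiedTableName))
      }
    case None =>
      println(s"WARN: $logMessage")
      createTable(newSparkSQLSpecificMetastoreTable(qualifiedTableName))
  }
}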