From b2349e6a00d569851f0ca91a60e9299306208e92 Mon Sep 17 00:00:00 2001
From: Xiao Li
Date: Sat, 1 Apr 2017 00:56:18 +0800
Subject: [SPARK-20160][SQL] Move ParquetConversions and OrcConversions Out Of
 HiveSessionCatalog

### What changes were proposed in this pull request?

`ParquetConversions` and `OrcConversions` should be treated as regular `Analyzer` rules; it is not reasonable for them to be part of `HiveSessionCatalog`. This PR also combines the two rules `ParquetConversions` and `OrcConversions` into a single new rule, `RelationConversions`.

After moving these two rules out of `HiveSessionCatalog`, the next step is to clean up, rename, and move `HiveMetastoreCatalog`, because it is no longer related to the hive package.

### How was this patch tested?

The existing test cases.

Author: Xiao Li

Closes #17484 from gatorsmile/cleanup.
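For illustration only (this snippet is not part of the patch; the table name and session setup are hypothetical), the conversion behavior that `RelationConversions` now implements in one place can be exercised through the existing `spark.sql.hive.convertMetastoreParquet` and `spark.sql.hive.convertMetastoreOrc` flags:

```scala
// Sketch: drives the metastore-to-data-source conversion path that the new
// RelationConversions rule handles for both Parquet and ORC Hive-serde tables.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("RelationConversionsDemo")                          // hypothetical app name
  .enableHiveSupport()
  .config("spark.sql.hive.convertMetastoreParquet", "true")    // HiveUtils.CONVERT_METASTORE_PARQUET
  .config("spark.sql.hive.convertMetastoreOrc", "true")        // HiveUtils.CONVERT_METASTORE_ORC
  .getOrCreate()

// A non-partitioned Hive-serde Parquet table (hypothetical name).
spark.sql("CREATE TABLE IF NOT EXISTS demo_parquet (key INT, value STRING) STORED AS PARQUET")

// Both the write path and the read path now go through the single rule.
spark.sql("INSERT INTO demo_parquet SELECT 1, 'a'")
spark.table("demo_parquet").explain(true)   // plan should show a Parquet data source relation
```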
---
 .../spark/sql/hive/HiveMetastoreCatalog.scala      | 96 ++--------------------
 .../apache/spark/sql/hive/HiveSessionCatalog.scala | 25 +-----
 .../spark/sql/hive/HiveSessionStateBuilder.scala   |  3 +-
 .../org/apache/spark/sql/hive/HiveStrategies.scala | 56 ++++++++++++-
 .../sql/hive/JavaMetastoreDataSourcesSuite.java    |  3 +-
 .../apache/spark/sql/hive/StatisticsSuite.scala    |  4 +-
 .../org/apache/spark/sql/hive/parquetSuites.scala  |  5 +-
 7 files changed, 70 insertions(+), 122 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 305bd007c9..10f432570e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -28,11 +28,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
-import org.apache.spark.sql.hive.orc.OrcFileFormat
 import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._
 import org.apache.spark.sql.types._
 
@@ -48,14 +44,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   private def tableRelationCache = sparkSession.sessionState.catalog.tableRelationCache
 
   import HiveMetastoreCatalog._
-  private def getCurrentDatabase: String = sessionState.catalog.getCurrentDatabase
-
-  def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = {
-    QualifiedTableName(
-      tableIdent.database.getOrElse(getCurrentDatabase).toLowerCase,
-      tableIdent.table.toLowerCase)
-  }
-
   /** These locks guard against multiple attempts to instantiate a table, which wastes memory.
    */
   private val tableCreationLocks = Striped.lazyWeakLock(100)
 
@@ -68,11 +56,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     }
   }
 
-  def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
-    // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
-    val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
-    val dbLocation = sparkSession.sharedState.externalCatalog.getDatabase(dbName).locationUri
-    new Path(new Path(dbLocation), tblName).toString
+  // For testing only
+  private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
+    val key = QualifiedTableName(
+      table.database.getOrElse(sessionState.catalog.getCurrentDatabase).toLowerCase,
+      table.table.toLowerCase)
+    tableRelationCache.getIfPresent(key)
   }
 
   private def getCached(
@@ -122,7 +111,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     }
   }
 
-  private def convertToLogicalRelation(
+  def convertToLogicalRelation(
       relation: CatalogRelation,
       options: Map[String, String],
       fileFormatClass: Class[_ <: FileFormat],
@@ -273,78 +262,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       case NonFatal(ex) =>
         logWarning(s"Unable to save case-sensitive schema for table ${identifier.unquotedString}", ex)
     }
-
-  /**
-   * When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet
-   * data source relations for better performance.
-   */
-  object ParquetConversions extends Rule[LogicalPlan] {
-    private def shouldConvertMetastoreParquet(relation: CatalogRelation): Boolean = {
-      relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet") &&
-        sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)
-    }
-
-    private def convertToParquetRelation(relation: CatalogRelation): LogicalRelation = {
-      val fileFormatClass = classOf[ParquetFileFormat]
-      val mergeSchema = sessionState.conf.getConf(
-        HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING)
-      val options = Map(ParquetOptions.MERGE_SCHEMA -> mergeSchema.toString)
-
-      convertToLogicalRelation(relation, options, fileFormatClass, "parquet")
-    }
-
-    override def apply(plan: LogicalPlan): LogicalPlan = {
-      plan transformUp {
-        // Write path
-        case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
-          // Inserting into partitioned table is not supported in Parquet data source (yet).
-          if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
-            !r.isPartitioned && shouldConvertMetastoreParquet(r) =>
-          InsertIntoTable(convertToParquetRelation(r), partition, query, overwrite, ifNotExists)
-
-        // Read path
-        case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) &&
-          shouldConvertMetastoreParquet(relation) =>
-          convertToParquetRelation(relation)
-      }
-    }
-  }
-
-  /**
-   * When scanning Metastore ORC tables, convert them to ORC data source relations
-   * for better performance.
-   */
-  object OrcConversions extends Rule[LogicalPlan] {
-    private def shouldConvertMetastoreOrc(relation: CatalogRelation): Boolean = {
-      relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("orc") &&
-        sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
-    }
-
-    private def convertToOrcRelation(relation: CatalogRelation): LogicalRelation = {
-      val fileFormatClass = classOf[OrcFileFormat]
-      val options = Map[String, String]()
-
-      convertToLogicalRelation(relation, options, fileFormatClass, "orc")
-    }
-
-    override def apply(plan: LogicalPlan): LogicalPlan = {
-      plan transformUp {
-        // Write path
-        case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
-          // Inserting into partitioned table is not supported in Orc data source (yet).
-          if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
-            !r.isPartitioned && shouldConvertMetastoreOrc(r) =>
-          InsertIntoTable(convertToOrcRelation(r), partition, query, overwrite, ifNotExists)
-
-        // Read path
-        case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) &&
-          shouldConvertMetastoreOrc(relation) =>
-          convertToOrcRelation(relation)
-      }
-    }
-  }
 }
 
+
 private[hive] object HiveMetastoreCatalog {
   def mergeWithMetastoreSchema(
       metastoreSchema: StructType,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 2cc20a791d..9e3eb2dd82 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -26,14 +26,12 @@ import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
 import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DecimalType, DoubleType}
@@ -43,7 +41,7 @@ import org.apache.spark.util.Utils
 private[sql] class HiveSessionCatalog(
     externalCatalog: HiveExternalCatalog,
     globalTempViewManager: GlobalTempViewManager,
-    private val metastoreCatalog: HiveMetastoreCatalog,
+    val metastoreCatalog: HiveMetastoreCatalog,
     functionRegistry: FunctionRegistry,
     conf: SQLConf,
     hadoopConf: Configuration,
@@ -58,25 +56,6 @@ private[sql] class HiveSessionCatalog(
     parser,
     functionResourceLoader) {
 
-  // ----------------------------------------------------------------
-  // | Methods and fields for interacting with HiveMetastoreCatalog |
-  // ----------------------------------------------------------------
-
-  // These 2 rules must be run before all other DDL post-hoc resolution rules, i.e.
-  // `PreprocessTableCreation`, `PreprocessTableInsertion`, `DataSourceAnalysis` and `HiveAnalysis`.
-  val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions
-  val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions
-
-  def hiveDefaultTableFilePath(name: TableIdentifier): String = {
-    metastoreCatalog.hiveDefaultTableFilePath(name)
-  }
-
-  // For testing only
-  private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
-    val key = metastoreCatalog.getQualifiedTableName(table)
-    tableRelationCache.getIfPresent(key)
-  }
-
   override def makeFunctionBuilder(funcName: String, className: String): FunctionBuilder = {
     makeFunctionBuilder(funcName, Utils.classForName(className))
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index 2f3dfa05e9..9d3b31f39c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -75,8 +75,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
 
     override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
       new DetermineTableStats(session) +:
-        catalog.ParquetConversions +:
-        catalog.OrcConversions +:
+        RelationConversions(conf, catalog) +:
         PreprocessTableCreation(session) +:
         PreprocessTableInsertion(conf) +:
         DataSourceAnalysis(conf) +:
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index b5ce027d51..0465e9c031 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -18,7 +18,6 @@ package org.apache.spark.sql.hive
 
 import java.io.IOException
-import java.net.URI
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.StatsSetupConst
 
@@ -31,9 +30,11 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
-import org.apache.spark.sql.execution.datasources.CreateTable
+import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
 import org.apache.spark.sql.hive.execution._
-import org.apache.spark.sql.internal.HiveSerDe
+import org.apache.spark.sql.hive.orc.OrcFileFormat
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 
 
 /**
@@ -170,6 +171,55 @@ object HiveAnalysis extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Relation conversion from metastore relations to data source relations for better performance
+ *
+ * - When writing to non-partitioned Hive-serde Parquet/Orc tables
+ * - When scanning Hive-serde Parquet/ORC tables
+ *
+ * This rule must be run before all other DDL post-hoc resolution rules, i.e.
+ * `PreprocessTableCreation`, `PreprocessTableInsertion`, `DataSourceAnalysis` and `HiveAnalysis`.
+ */
+case class RelationConversions(
+    conf: SQLConf,
+    sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
+  private def isConvertible(relation: CatalogRelation): Boolean = {
+    (relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet") &&
+      conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)) ||
+      (relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("orc") &&
+        conf.getConf(HiveUtils.CONVERT_METASTORE_ORC))
+  }
+
+  private def convert(relation: CatalogRelation): LogicalRelation = {
+    if (relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet")) {
+      val options = Map(ParquetOptions.MERGE_SCHEMA ->
+        conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
+      sessionCatalog.metastoreCatalog
+        .convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
+    } else {
+      val options = Map[String, String]()
+      sessionCatalog.metastoreCatalog
+        .convertToLogicalRelation(relation, options, classOf[OrcFileFormat], "orc")
+    }
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan transformUp {
+      // Write path
+      case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
+        // Inserting into partitioned table is not supported in Parquet/Orc data source (yet).
+        if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
+          !r.isPartitioned && isConvertible(r) =>
+        InsertIntoTable(convert(r), partition, query, overwrite, ifNotExists)
+
+      // Read path
+      case relation: CatalogRelation
+        if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
+        convert(relation)
+    }
+  }
+}
+
 private[hive] trait HiveStrategies {
   // Possibly being too clever with types here... or not clever enough.
   self: SparkPlanner =>
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
index 0b157a45e6..25bd4d0017 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
@@ -72,8 +72,7 @@ public class JavaMetastoreDataSourcesSuite {
       path.delete();
     }
     HiveSessionCatalog catalog = (HiveSessionCatalog) sqlContext.sessionState().catalog();
-    hiveManagedPath = new Path(
-      catalog.hiveDefaultTableFilePath(new TableIdentifier("javaSavedTable")));
+    hiveManagedPath = new Path(catalog.defaultTablePath(new TableIdentifier("javaSavedTable")));
     fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
     fs.delete(hiveManagedPath, true);
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 962998ea6f..3191b9975f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -413,7 +413,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       }
 
       // Table lookup will make the table cached.
       spark.table(tableIndent)
-      statsBeforeUpdate = catalog.getCachedDataSourceTable(tableIndent)
+      statsBeforeUpdate = catalog.metastoreCatalog.getCachedDataSourceTable(tableIndent)
         .asInstanceOf[LogicalRelation].catalogTable.get.stats.get
       sql(s"INSERT INTO $tableName SELECT 2")
@@ -423,7 +423,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
         sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
       }
       spark.table(tableIndent)
-      statsAfterUpdate = catalog.getCachedDataSourceTable(tableIndent)
+      statsAfterUpdate = catalog.metastoreCatalog.getCachedDataSourceTable(tableIndent)
         .asInstanceOf[LogicalRelation].catalogTable.get.stats.get
     }
     (statsBeforeUpdate, statsAfterUpdate)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 9fc2923bb6..23f21e6b99 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -449,8 +449,9 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
     }
   }
 
-  private def getCachedDataSourceTable(id: TableIdentifier): LogicalPlan = {
-    sessionState.catalog.asInstanceOf[HiveSessionCatalog].getCachedDataSourceTable(id)
+  private def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
+    sessionState.catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog
+      .getCachedDataSourceTable(table)
   }
 
   test("Caching converted data source Parquet Relations") {
--
cgit v1.2.3