path: root/sql/hive
author    Xiao Li <gatorsmile@gmail.com>    2017-04-01 00:56:18 +0800
committer Wenchen Fan <wenchen@databricks.com>    2017-04-01 00:56:18 +0800
commit    b2349e6a00d569851f0ca91a60e9299306208e92 (patch)
tree      ca7de87f56ab0baf9ef715ae6b5921a189afbb3b /sql/hive
parent    c4c03eed67c05a78dc8944f6119ea708d6b955be (diff)
download  spark-b2349e6a00d569851f0ca91a60e9299306208e92.tar.gz
          spark-b2349e6a00d569851f0ca91a60e9299306208e92.tar.bz2
          spark-b2349e6a00d569851f0ca91a60e9299306208e92.zip
[SPARK-20160][SQL] Move ParquetConversions and OrcConversions Out Of HiveSessionCatalog
### What changes were proposed in this pull request?

`ParquetConversions` and `OrcConversions` should be treated as regular `Analyzer` rules; it is not reasonable for them to be part of `HiveSessionCatalog`. This PR also combines the two rules `ParquetConversions` and `OrcConversions` into a new rule, `RelationConversions`. After moving these two rules out of `HiveSessionCatalog`, the next step is to clean up, rename and move `HiveMetastoreCatalog`, because it is no longer tied to the hive package.

### How was this patch tested?

The existing test cases.

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17484 from gatorsmile/cleanup.
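For context, a minimal sketch (not part of this patch) of how the existing conversion switches relate to the new combined rule; the table name `parquet_hive_tbl` is hypothetical:

```scala
// Illustrative sketch only. RelationConversions is gated by the existing configs behind
// HiveUtils.CONVERT_METASTORE_PARQUET and HiveUtils.CONVERT_METASTORE_ORC.
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")

// With conversion enabled, a scan of a non-partitioned Hive-serde Parquet/ORC table
// should plan as a data source relation (e.g. a FileScan) rather than a HiveTableScan.
spark.table("parquet_hive_tbl").explain()
```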
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala        | 96
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala          | 25
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala     |  3
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala              | 56
-rw-r--r--  sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java |  3
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala             |  4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala               |  5
7 files changed, 70 insertions(+), 122 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 305bd007c9..10f432570e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -28,11 +28,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
-import org.apache.spark.sql.hive.orc.OrcFileFormat
import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._
import org.apache.spark.sql.types._
@@ -48,14 +44,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
private def tableRelationCache = sparkSession.sessionState.catalog.tableRelationCache
import HiveMetastoreCatalog._
- private def getCurrentDatabase: String = sessionState.catalog.getCurrentDatabase
-
- def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = {
- QualifiedTableName(
- tableIdent.database.getOrElse(getCurrentDatabase).toLowerCase,
- tableIdent.table.toLowerCase)
- }
-
/** These locks guard against multiple attempts to instantiate a table, which wastes memory. */
private val tableCreationLocks = Striped.lazyWeakLock(100)
@@ -68,11 +56,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}
}
- def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
- // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
- val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
- val dbLocation = sparkSession.sharedState.externalCatalog.getDatabase(dbName).locationUri
- new Path(new Path(dbLocation), tblName).toString
+ // For testing only
+ private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
+ val key = QualifiedTableName(
+ table.database.getOrElse(sessionState.catalog.getCurrentDatabase).toLowerCase,
+ table.table.toLowerCase)
+ tableRelationCache.getIfPresent(key)
}
private def getCached(
@@ -122,7 +111,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}
}
- private def convertToLogicalRelation(
+ def convertToLogicalRelation(
relation: CatalogRelation,
options: Map[String, String],
fileFormatClass: Class[_ <: FileFormat],
@@ -273,78 +262,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
case NonFatal(ex) =>
logWarning(s"Unable to save case-sensitive schema for table ${identifier.unquotedString}", ex)
}
-
- /**
- * When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet
- * data source relations for better performance.
- */
- object ParquetConversions extends Rule[LogicalPlan] {
- private def shouldConvertMetastoreParquet(relation: CatalogRelation): Boolean = {
- relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet") &&
- sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)
- }
-
- private def convertToParquetRelation(relation: CatalogRelation): LogicalRelation = {
- val fileFormatClass = classOf[ParquetFileFormat]
- val mergeSchema = sessionState.conf.getConf(
- HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING)
- val options = Map(ParquetOptions.MERGE_SCHEMA -> mergeSchema.toString)
-
- convertToLogicalRelation(relation, options, fileFormatClass, "parquet")
- }
-
- override def apply(plan: LogicalPlan): LogicalPlan = {
- plan transformUp {
- // Write path
- case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
- // Inserting into partitioned table is not supported in Parquet data source (yet).
- if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
- !r.isPartitioned && shouldConvertMetastoreParquet(r) =>
- InsertIntoTable(convertToParquetRelation(r), partition, query, overwrite, ifNotExists)
-
- // Read path
- case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) &&
- shouldConvertMetastoreParquet(relation) =>
- convertToParquetRelation(relation)
- }
- }
- }
-
- /**
- * When scanning Metastore ORC tables, convert them to ORC data source relations
- * for better performance.
- */
- object OrcConversions extends Rule[LogicalPlan] {
- private def shouldConvertMetastoreOrc(relation: CatalogRelation): Boolean = {
- relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("orc") &&
- sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
- }
-
- private def convertToOrcRelation(relation: CatalogRelation): LogicalRelation = {
- val fileFormatClass = classOf[OrcFileFormat]
- val options = Map[String, String]()
-
- convertToLogicalRelation(relation, options, fileFormatClass, "orc")
- }
-
- override def apply(plan: LogicalPlan): LogicalPlan = {
- plan transformUp {
- // Write path
- case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
- // Inserting into partitioned table is not supported in Orc data source (yet).
- if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
- !r.isPartitioned && shouldConvertMetastoreOrc(r) =>
- InsertIntoTable(convertToOrcRelation(r), partition, query, overwrite, ifNotExists)
-
- // Read path
- case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) &&
- shouldConvertMetastoreOrc(relation) =>
- convertToOrcRelation(relation)
- }
- }
- }
}
+
private[hive] object HiveMetastoreCatalog {
def mergeWithMetastoreSchema(
metastoreSchema: StructType,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 2cc20a791d..9e3eb2dd82 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -26,14 +26,12 @@ import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DecimalType, DoubleType}
@@ -43,7 +41,7 @@ import org.apache.spark.util.Utils
private[sql] class HiveSessionCatalog(
externalCatalog: HiveExternalCatalog,
globalTempViewManager: GlobalTempViewManager,
- private val metastoreCatalog: HiveMetastoreCatalog,
+ val metastoreCatalog: HiveMetastoreCatalog,
functionRegistry: FunctionRegistry,
conf: SQLConf,
hadoopConf: Configuration,
@@ -58,25 +56,6 @@ private[sql] class HiveSessionCatalog(
parser,
functionResourceLoader) {
- // ----------------------------------------------------------------
- // | Methods and fields for interacting with HiveMetastoreCatalog |
- // ----------------------------------------------------------------
-
- // These 2 rules must be run before all other DDL post-hoc resolution rules, i.e.
- // `PreprocessTableCreation`, `PreprocessTableInsertion`, `DataSourceAnalysis` and `HiveAnalysis`.
- val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions
- val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions
-
- def hiveDefaultTableFilePath(name: TableIdentifier): String = {
- metastoreCatalog.hiveDefaultTableFilePath(name)
- }
-
- // For testing only
- private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
- val key = metastoreCatalog.getQualifiedTableName(table)
- tableRelationCache.getIfPresent(key)
- }
-
override def makeFunctionBuilder(funcName: String, className: String): FunctionBuilder = {
makeFunctionBuilder(funcName, Utils.classForName(className))
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index 2f3dfa05e9..9d3b31f39c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -75,8 +75,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
new DetermineTableStats(session) +:
- catalog.ParquetConversions +:
- catalog.OrcConversions +:
+ RelationConversions(conf, catalog) +:
PreprocessTableCreation(session) +:
PreprocessTableInsertion(conf) +:
DataSourceAnalysis(conf) +:
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index b5ce027d51..0465e9c031 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.hive
import java.io.IOException
-import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.StatsSetupConst
@@ -31,9 +30,11 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
-import org.apache.spark.sql.execution.datasources.CreateTable
+import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.hive.execution._
-import org.apache.spark.sql.internal.HiveSerDe
+import org.apache.spark.sql.hive.orc.OrcFileFormat
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
/**
@@ -170,6 +171,55 @@ object HiveAnalysis extends Rule[LogicalPlan] {
}
}
+/**
+ * Relation conversion from metastore relations to data source relations for better performance
+ *
+ * - When writing to non-partitioned Hive-serde Parquet/Orc tables
+ * - When scanning Hive-serde Parquet/ORC tables
+ *
+ * This rule must be run before all other DDL post-hoc resolution rules, i.e.
+ * `PreprocessTableCreation`, `PreprocessTableInsertion`, `DataSourceAnalysis` and `HiveAnalysis`.
+ */
+case class RelationConversions(
+ conf: SQLConf,
+ sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
+ private def isConvertible(relation: CatalogRelation): Boolean = {
+ (relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet") &&
+ conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)) ||
+ (relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("orc") &&
+ conf.getConf(HiveUtils.CONVERT_METASTORE_ORC))
+ }
+
+ private def convert(relation: CatalogRelation): LogicalRelation = {
+ if (relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet")) {
+ val options = Map(ParquetOptions.MERGE_SCHEMA ->
+ conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
+ sessionCatalog.metastoreCatalog
+ .convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
+ } else {
+ val options = Map[String, String]()
+ sessionCatalog.metastoreCatalog
+ .convertToLogicalRelation(relation, options, classOf[OrcFileFormat], "orc")
+ }
+ }
+
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ plan transformUp {
+ // Write path
+ case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
+ // Inserting into partitioned table is not supported in Parquet/Orc data source (yet).
+ if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
+ !r.isPartitioned && isConvertible(r) =>
+ InsertIntoTable(convert(r), partition, query, overwrite, ifNotExists)
+
+ // Read path
+ case relation: CatalogRelation
+ if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
+ convert(relation)
+ }
+ }
+}
+
private[hive] trait HiveStrategies {
// Possibly being too clever with types here... or not clever enough.
self: SparkPlanner =>
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
index 0b157a45e6..25bd4d0017 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
@@ -72,8 +72,7 @@ public class JavaMetastoreDataSourcesSuite {
path.delete();
}
HiveSessionCatalog catalog = (HiveSessionCatalog) sqlContext.sessionState().catalog();
- hiveManagedPath = new Path(
- catalog.hiveDefaultTableFilePath(new TableIdentifier("javaSavedTable")));
+ hiveManagedPath = new Path(catalog.defaultTablePath(new TableIdentifier("javaSavedTable")));
fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
fs.delete(hiveManagedPath, true);
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 962998ea6f..3191b9975f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -413,7 +413,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
}
// Table lookup will make the table cached.
spark.table(tableIndent)
- statsBeforeUpdate = catalog.getCachedDataSourceTable(tableIndent)
+ statsBeforeUpdate = catalog.metastoreCatalog.getCachedDataSourceTable(tableIndent)
.asInstanceOf[LogicalRelation].catalogTable.get.stats.get
sql(s"INSERT INTO $tableName SELECT 2")
@@ -423,7 +423,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
}
spark.table(tableIndent)
- statsAfterUpdate = catalog.getCachedDataSourceTable(tableIndent)
+ statsAfterUpdate = catalog.metastoreCatalog.getCachedDataSourceTable(tableIndent)
.asInstanceOf[LogicalRelation].catalogTable.get.stats.get
}
(statsBeforeUpdate, statsAfterUpdate)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 9fc2923bb6..23f21e6b99 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -449,8 +449,9 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
}
}
- private def getCachedDataSourceTable(id: TableIdentifier): LogicalPlan = {
- sessionState.catalog.asInstanceOf[HiveSessionCatalog].getCachedDataSourceTable(id)
+ private def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
+ sessionState.catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog
+ .getCachedDataSourceTable(table)
}
test("Caching converted data source Parquet Relations") {