author    | Wenchen Fan <wenchen@databricks.com> | 2017-02-28 09:24:36 -0800
committer | Xiao Li <gatorsmile@gmail.com> | 2017-02-28 09:24:36 -0800
commit    | 7c7fc30b4ae8e4ebd4ededf92240fed10481f2dd (patch)
tree      | 469c01ca9d14d9a37ca6d3754af1b2dccbab1c02 /sql/hive
parent    | b405466513bcc02cadf1477b6b682ace95d81658 (diff)
[SPARK-19678][SQL] remove MetastoreRelation
## What changes were proposed in this pull request?
`MetastoreRelation` represents the table relation for Hive tables and carries some Hive-specific information. For Hive tables, `SimpleCatalogRelation` is resolved to `MetastoreRelation`, which is unnecessary because the two are essentially the same. This PR merges `SimpleCatalogRelation` and `MetastoreRelation`, so Hive tables are now represented by the generic `CatalogRelation` (sketched below).
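
To make the effect concrete without reading the full diff: after this change the analyzer rules match the generic `CatalogRelation` and check the table provider with `DDLUtils.isHiveTable`, instead of matching a dedicated `MetastoreRelation` node. The condensed sketch below mirrors the `ParquetConversions` hunk in the patch; the helper `shouldConvertMetastoreParquet` and `convertToParquetRelation` are defined elsewhere in `HiveMetastoreCatalog` and are assumed to be in scope here.

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogRelation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.DDLUtils

// Condensed, post-patch shape of the Parquet conversion rule (OrcConversions is analogous).
object ParquetConversions extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    // Write path: only non-partitioned Hive tables are converted before insertion.
    case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
        if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
          !r.isPartitioned && shouldConvertMetastoreParquet(r) =>
      InsertIntoTable(convertToParquetRelation(r), partition, query, overwrite, ifNotExists)

    // Read path: any Hive table whose serde mentions parquet becomes a data source relation.
    case relation: CatalogRelation
        if DDLUtils.isHiveTable(relation.tableMeta) && shouldConvertMetastoreParquet(relation) =>
      convertToParquetRelation(relation)
  }
}
```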
## How was this patch tested?
Existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #17015 from cloud-fan/table-relation.
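
One behavioral detail worth calling out before the diff: the size estimation that previously lived in `MetastoreRelation.computeStats` now happens in the new `DetermineTableStats` rule. The standalone sketch below paraphrases that fallback chain; the helper `estimateSizeInBytes` and its parameters are illustrative only and do not appear in the patch.

```scala
import java.io.IOException
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.StatsSetupConst

// Fallback order used when a Hive table has no stats in the catalog:
//   1. Hive's totalSize table property (cheap, but zero for external tables)
//   2. rawDataSize
//   3. an HDFS content summary, if falling back to HDFS is enabled
//   4. the session's default size in bytes
def estimateSizeInBytes(
    tableProperties: Map[String, String],
    tableLocation: String,
    fallBackToHdfs: Boolean,
    defaultSizeInBytes: Long,
    hadoopConf: Configuration): Long = {
  val totalSize = tableProperties.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
  val rawDataSize = tableProperties.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
  if (totalSize.exists(_ > 0)) {
    totalSize.get
  } else if (rawDataSize.exists(_ > 0)) {
    rawDataSize.get
  } else if (fallBackToHdfs) {
    try {
      val path = new Path(new URI(tableLocation))
      path.getFileSystem(hadoopConf).getContentSummary(path).getLength
    } catch {
      case _: IOException => defaultSizeInBytes
    }
  } else {
    defaultSizeInBytes
  }
}
```

Running this as a post-hoc resolution rule (it is added to `postHocResolutionRules` in `HiveSessionState`) means every analyzed Hive `CatalogRelation` carries a size estimate before join planning, which is why the `totalSize`/`rawDataSize` distinction matters for external tables.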
Diffstat (limited to 'sql/hive')
15 files changed, 319 insertions, 530 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 677da0dbdc..151a69aebf 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.hive +import java.net.URI + import com.google.common.util.concurrent.Striped import org.apache.hadoop.fs.Path @@ -26,6 +28,7 @@ import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ +import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions} import org.apache.spark.sql.hive.orc.OrcFileFormat @@ -71,10 +74,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log private def getCached( tableIdentifier: QualifiedTableName, pathsInMetastore: Seq[Path], - metastoreRelation: MetastoreRelation, schemaInMetastore: StructType, expectedFileFormat: Class[_ <: FileFormat], - expectedBucketSpec: Option[BucketSpec], partitionSchema: Option[StructType]): Option[LogicalRelation] = { tableRelationCache.getIfPresent(tableIdentifier) match { @@ -89,7 +90,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log val useCached = relation.location.rootPaths.toSet == pathsInMetastore.toSet && logical.schema.sameType(schemaInMetastore) && - relation.bucketSpec == expectedBucketSpec && + // We don't support hive bucketed tables. This function `getCached` is only used for + // converting supported Hive tables to data source tables. + relation.bucketSpec.isEmpty && relation.partitionSchema == partitionSchema.getOrElse(StructType(Nil)) if (useCached) { @@ -100,52 +103,48 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log None } case _ => - logWarning( - s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} " + - s"should be stored as $expectedFileFormat. However, we are getting " + - s"a ${relation.fileFormat} from the metastore cache. This cached " + - s"entry will be invalidated.") + logWarning(s"Table $tableIdentifier should be stored as $expectedFileFormat. " + + s"However, we are getting a ${relation.fileFormat} from the metastore cache. " + + "This cached entry will be invalidated.") tableRelationCache.invalidate(tableIdentifier) None } case other => - logWarning( - s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " + - s"as $expectedFileFormat. However, we are getting a $other from the metastore cache. " + - s"This cached entry will be invalidated.") + logWarning(s"Table $tableIdentifier should be stored as $expectedFileFormat. " + + s"However, we are getting a $other from the metastore cache. 
" + + "This cached entry will be invalidated.") tableRelationCache.invalidate(tableIdentifier) None } } private def convertToLogicalRelation( - metastoreRelation: MetastoreRelation, + relation: CatalogRelation, options: Map[String, String], - defaultSource: FileFormat, fileFormatClass: Class[_ <: FileFormat], fileType: String): LogicalRelation = { - val metastoreSchema = StructType.fromAttributes(metastoreRelation.output) + val metastoreSchema = relation.tableMeta.schema val tableIdentifier = - QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName) - val bucketSpec = None // We don't support hive bucketed tables, only ones we write out. + QualifiedTableName(relation.tableMeta.database, relation.tableMeta.identifier.table) val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions - val result = if (metastoreRelation.hiveQlTable.isPartitioned) { - val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys) - + val tablePath = new Path(new URI(relation.tableMeta.location)) + val result = if (relation.isPartitioned) { + val partitionSchema = relation.tableMeta.partitionSchema val rootPaths: Seq[Path] = if (lazyPruningEnabled) { - Seq(metastoreRelation.hiveQlTable.getDataLocation) + Seq(tablePath) } else { // By convention (for example, see CatalogFileIndex), the definition of a // partitioned table's paths depends on whether that table has any actual partitions. // Partitioned tables without partitions use the location of the table's base path. // Partitioned tables with partitions use the locations of those partitions' data // locations,_omitting_ the table's base path. - val paths = metastoreRelation.getHiveQlPartitions().map { p => - new Path(p.getLocation) - } + val paths = sparkSession.sharedState.externalCatalog + .listPartitions(tableIdentifier.database, tableIdentifier.name) + .map(p => new Path(new URI(p.storage.locationUri.get))) + if (paths.isEmpty) { - Seq(metastoreRelation.hiveQlTable.getDataLocation) + Seq(tablePath) } else { paths } @@ -155,39 +154,31 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log val cached = getCached( tableIdentifier, rootPaths, - metastoreRelation, metastoreSchema, fileFormatClass, - bucketSpec, Some(partitionSchema)) val logicalRelation = cached.getOrElse { - val sizeInBytes = - metastoreRelation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong + val sizeInBytes = relation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong val fileIndex = { - val index = new CatalogFileIndex( - sparkSession, metastoreRelation.catalogTable, sizeInBytes) + val index = new CatalogFileIndex(sparkSession, relation.tableMeta, sizeInBytes) if (lazyPruningEnabled) { index } else { index.filterPartitions(Nil) // materialize all the partitions in memory } } - val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet - val dataSchema = - StructType(metastoreSchema - .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase))) - val relation = HadoopFsRelation( + val fsRelation = HadoopFsRelation( location = fileIndex, partitionSchema = partitionSchema, - dataSchema = dataSchema, - bucketSpec = bucketSpec, - fileFormat = defaultSource, + dataSchema = relation.tableMeta.dataSchema, + // We don't support hive bucketed tables, only ones we write out. 
+ bucketSpec = None, + fileFormat = fileFormatClass.newInstance(), options = options)(sparkSession = sparkSession) - val created = LogicalRelation(relation, - catalogTable = Some(metastoreRelation.catalogTable)) + val created = LogicalRelation(fsRelation, catalogTable = Some(relation.tableMeta)) tableRelationCache.put(tableIdentifier, created) created } @@ -195,14 +186,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log logicalRelation }) } else { - val rootPath = metastoreRelation.hiveQlTable.getDataLocation + val rootPath = tablePath withTableCreationLock(tableIdentifier, { - val cached = getCached(tableIdentifier, + val cached = getCached( + tableIdentifier, Seq(rootPath), - metastoreRelation, metastoreSchema, fileFormatClass, - bucketSpec, None) val logicalRelation = cached.getOrElse { val created = @@ -210,11 +200,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log DataSource( sparkSession = sparkSession, paths = rootPath.toString :: Nil, - userSpecifiedSchema = Some(metastoreRelation.schema), - bucketSpec = bucketSpec, + userSpecifiedSchema = Some(metastoreSchema), + // We don't support hive bucketed tables, only ones we write out. + bucketSpec = None, options = options, className = fileType).resolveRelation(), - catalogTable = Some(metastoreRelation.catalogTable)) + catalogTable = Some(relation.tableMeta)) tableRelationCache.put(tableIdentifier, created) created @@ -223,7 +214,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log logicalRelation }) } - result.copy(expectedOutputAttributes = Some(metastoreRelation.output)) + result.copy(expectedOutputAttributes = Some(relation.output)) } /** @@ -231,33 +222,32 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log * data source relations for better performance. */ object ParquetConversions extends Rule[LogicalPlan] { - private def shouldConvertMetastoreParquet(relation: MetastoreRelation): Boolean = { - relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") && + private def shouldConvertMetastoreParquet(relation: CatalogRelation): Boolean = { + relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet") && sessionState.convertMetastoreParquet } - private def convertToParquetRelation(relation: MetastoreRelation): LogicalRelation = { - val defaultSource = new ParquetFileFormat() + private def convertToParquetRelation(relation: CatalogRelation): LogicalRelation = { val fileFormatClass = classOf[ParquetFileFormat] - val mergeSchema = sessionState.convertMetastoreParquetWithSchemaMerging val options = Map(ParquetOptions.MERGE_SCHEMA -> mergeSchema.toString) - convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "parquet") + convertToLogicalRelation(relation, options, fileFormatClass, "parquet") } override def apply(plan: LogicalPlan): LogicalPlan = { plan transformUp { // Write path - case InsertIntoTable(r: MetastoreRelation, partition, query, overwrite, ifNotExists) + case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists) // Inserting into partitioned table is not supported in Parquet data source (yet). 
- if query.resolved && !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) => + if query.resolved && DDLUtils.isHiveTable(r.tableMeta) && + !r.isPartitioned && shouldConvertMetastoreParquet(r) => InsertIntoTable(convertToParquetRelation(r), partition, query, overwrite, ifNotExists) // Read path - case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) => - val parquetRelation = convertToParquetRelation(relation) - SubqueryAlias(relation.tableName, parquetRelation, None) + case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) && + shouldConvertMetastoreParquet(relation) => + convertToParquetRelation(relation) } } } @@ -267,31 +257,31 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log * for better performance. */ object OrcConversions extends Rule[LogicalPlan] { - private def shouldConvertMetastoreOrc(relation: MetastoreRelation): Boolean = { - relation.tableDesc.getSerdeClassName.toLowerCase.contains("orc") && + private def shouldConvertMetastoreOrc(relation: CatalogRelation): Boolean = { + relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("orc") && sessionState.convertMetastoreOrc } - private def convertToOrcRelation(relation: MetastoreRelation): LogicalRelation = { - val defaultSource = new OrcFileFormat() + private def convertToOrcRelation(relation: CatalogRelation): LogicalRelation = { val fileFormatClass = classOf[OrcFileFormat] val options = Map[String, String]() - convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "orc") + convertToLogicalRelation(relation, options, fileFormatClass, "orc") } override def apply(plan: LogicalPlan): LogicalPlan = { plan transformUp { // Write path - case InsertIntoTable(r: MetastoreRelation, partition, query, overwrite, ifNotExists) + case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists) // Inserting into partitioned table is not supported in Orc data source (yet). 
- if query.resolved && !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) => + if query.resolved && DDLUtils.isHiveTable(r.tableMeta) && + !r.isPartitioned && shouldConvertMetastoreOrc(r) => InsertIntoTable(convertToOrcRelation(r), partition, query, overwrite, ifNotExists) // Read path - case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) => - val orcRelation = convertToOrcRelation(relation) - SubqueryAlias(relation.tableName, orcRelation, None) + case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) && + shouldConvertMetastoreOrc(relation) => + convertToOrcRelation(relation) } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 273cf85df3..5a08a6bc66 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -62,10 +62,10 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) override val extendedResolutionRules = new ResolveHiveSerdeTable(sparkSession) :: new FindDataSourceTable(sparkSession) :: - new FindHiveSerdeTable(sparkSession) :: new ResolveSQLOnFile(sparkSession) :: Nil override val postHocResolutionRules = + new DetermineTableStats(sparkSession) :: catalog.ParquetConversions :: catalog.OrcConversions :: PreprocessTableCreation(sparkSession) :: diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index f45532cc38..624cfa206e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -17,8 +17,14 @@ package org.apache.spark.sql.hive +import java.io.IOException +import java.net.URI + +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.hive.common.StatsSetupConst + import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, SimpleCatalogRelation} +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogStorageFormat, CatalogTable} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation} @@ -91,18 +97,56 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] { // Infers the schema, if empty, because the schema could be determined by Hive // serde. - val catalogTable = if (query.isEmpty) { - val withSchema = HiveUtils.inferSchema(withStorage) - if (withSchema.schema.length <= 0) { + val withSchema = if (query.isEmpty) { + val inferred = HiveUtils.inferSchema(withStorage) + if (inferred.schema.length <= 0) { throw new AnalysisException("Unable to infer the schema. 
" + - s"The schema specification is required to create the table ${withSchema.identifier}.") + s"The schema specification is required to create the table ${inferred.identifier}.") } - withSchema + inferred } else { withStorage } - c.copy(tableDesc = catalogTable) + c.copy(tableDesc = withSchema) + } +} + +class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case relation: CatalogRelation + if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty => + val table = relation.tableMeta + // TODO: check if this estimate is valid for tables after partition pruning. + // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be + // relatively cheap if parameters for the table are populated into the metastore. + // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys + // (see StatsSetupConst in Hive) that we can look at in the future. + // When table is external,`totalSize` is always zero, which will influence join strategy + // so when `totalSize` is zero, use `rawDataSize` instead. + val totalSize = table.properties.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) + val rawDataSize = table.properties.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) + val sizeInBytes = if (totalSize.isDefined && totalSize.get > 0) { + totalSize.get + } else if (rawDataSize.isDefined && rawDataSize.get > 0) { + rawDataSize.get + } else if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) { + try { + val hadoopConf = session.sessionState.newHadoopConf() + val tablePath = new Path(new URI(table.location)) + val fs: FileSystem = tablePath.getFileSystem(hadoopConf) + fs.getContentSummary(tablePath).getLength + } catch { + case e: IOException => + logWarning("Failed to get table size from hdfs.", e) + session.sessionState.conf.defaultSizeInBytes + } + } else { + session.sessionState.conf.defaultSizeInBytes + } + + val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes)))) + relation.copy(tableMeta = withStats) } } @@ -114,8 +158,9 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] { */ object HiveAnalysis extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists) => - InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists) + case InsertIntoTable(relation: CatalogRelation, partSpec, query, overwrite, ifNotExists) + if DDLUtils.isHiveTable(relation.tableMeta) => + InsertIntoHiveTable(relation.tableMeta, partSpec, query, overwrite, ifNotExists) case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) => CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore) @@ -125,21 +170,6 @@ object HiveAnalysis extends Rule[LogicalPlan] { } } -/** - * Replaces `SimpleCatalogRelation` with [[MetastoreRelation]] if its table provider is hive. 
- */ -class FindHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] { - override def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case i @ InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _) - if DDLUtils.isHiveTable(s.metadata) => - i.copy(table = - MetastoreRelation(s.metadata.database, s.metadata.identifier.table)(s.metadata, session)) - - case s: SimpleCatalogRelation if DDLUtils.isHiveTable(s.metadata) => - MetastoreRelation(s.metadata.database, s.metadata.identifier.table)(s.metadata, session) - } -} - private[hive] trait HiveStrategies { // Possibly being too clever with types here... or not clever enough. self: SparkPlanner => @@ -161,10 +191,10 @@ private[hive] trait HiveStrategies { */ object HiveTableScans extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case PhysicalOperation(projectList, predicates, relation: MetastoreRelation) => + case PhysicalOperation(projectList, predicates, relation: CatalogRelation) => // Filter out all predicates that only deal with partition keys, these are given to the // hive table scan operator to be used for partition pruning. - val partitionKeyIds = AttributeSet(relation.partitionKeys) + val partitionKeyIds = AttributeSet(relation.partitionCols) val (pruningPredicates, otherPredicates) = predicates.partition { predicate => !predicate.references.isEmpty && predicate.references.subsetOf(partitionKeyIds) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala deleted file mode 100644 index 97b120758b..0000000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.hive - -import java.io.IOException - -import com.google.common.base.Objects -import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.hive.common.StatsSetupConst -import org.apache.hadoop.hive.ql.metadata.{Partition, Table => HiveTable} -import org.apache.hadoop.hive.ql.plan.TableDesc - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.CatalystConf -import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression} -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} -import org.apache.spark.sql.execution.FileRelation -import org.apache.spark.sql.hive.client.HiveClientImpl -import org.apache.spark.sql.types.StructField - - -private[hive] case class MetastoreRelation( - databaseName: String, - tableName: String) - (val catalogTable: CatalogTable, - @transient private val sparkSession: SparkSession) - extends LeafNode with MultiInstanceRelation with FileRelation with CatalogRelation { - - override def equals(other: Any): Boolean = other match { - case relation: MetastoreRelation => - databaseName == relation.databaseName && - tableName == relation.tableName && - output == relation.output - case _ => false - } - - override def hashCode(): Int = { - Objects.hashCode(databaseName, tableName, output) - } - - override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil - - @transient val hiveQlTable: HiveTable = HiveClientImpl.toHiveTable(catalogTable) - - @transient override def computeStats(conf: CatalystConf): Statistics = { - catalogTable.stats.map(_.toPlanStats(output)).getOrElse(Statistics( - sizeInBytes = { - val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE) - val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE) - // TODO: check if this estimate is valid for tables after partition pruning. - // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be - // relatively cheap if parameters for the table are populated into the metastore. - // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys - // (see StatsSetupConst in Hive) that we can look at in the future. - BigInt( - // When table is external,`totalSize` is always zero, which will influence join strategy - // so when `totalSize` is zero, use `rawDataSize` instead - // when `rawDataSize` is also zero, use `HiveExternalCatalog.STATISTICS_TOTAL_SIZE`, - // which is generated by analyze command. 
- if (totalSize != null && totalSize.toLong > 0L) { - totalSize.toLong - } else if (rawDataSize != null && rawDataSize.toLong > 0) { - rawDataSize.toLong - } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) { - try { - val hadoopConf = sparkSession.sessionState.newHadoopConf() - val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf) - fs.getContentSummary(hiveQlTable.getPath).getLength - } catch { - case e: IOException => - logWarning("Failed to get table size from hdfs.", e) - sparkSession.sessionState.conf.defaultSizeInBytes - } - } else { - sparkSession.sessionState.conf.defaultSizeInBytes - }) - } - )) - } - - // When metastore partition pruning is turned off, we cache the list of all partitions to - // mimic the behavior of Spark < 1.5 - private lazy val allPartitions: Seq[CatalogTablePartition] = { - sparkSession.sharedState.externalCatalog.listPartitions( - catalogTable.database, - catalogTable.identifier.table) - } - - def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = { - val rawPartitions = if (sparkSession.sessionState.conf.metastorePartitionPruning) { - sparkSession.sharedState.externalCatalog.listPartitionsByFilter( - catalogTable.database, - catalogTable.identifier.table, - predicates) - } else { - allPartitions - } - - rawPartitions.map(HiveClientImpl.toHivePartition(_, hiveQlTable)) - } - - /** Only compare database and tablename, not alias. */ - override def sameResult(plan: LogicalPlan): Boolean = { - plan.canonicalized match { - case mr: MetastoreRelation => - mr.databaseName == databaseName && mr.tableName == tableName - case _ => false - } - } - - val tableDesc = new TableDesc( - hiveQlTable.getInputFormatClass, - // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because - // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to - // substitute some output formats, e.g. substituting SequenceFileOutputFormat to - // HiveSequenceFileOutputFormat. - hiveQlTable.getOutputFormatClass, - hiveQlTable.getMetadata - ) - - implicit class SchemaAttribute(f: StructField) { - def toAttribute: AttributeReference = AttributeReference( - f.name, - f.dataType, - // Since data can be dumped in randomly with no validation, everything is nullable. - nullable = true - )(qualifier = Some(tableName)) - } - - /** PartitionKey attributes */ - val partitionKeys = catalogTable.partitionSchema.map(_.toAttribute) - - /** Non-partitionKey attributes */ - val dataColKeys = catalogTable.schema - .filter { c => !catalogTable.partitionColumnNames.contains(c.name) } - .map(_.toAttribute) - - val output = dataColKeys ++ partitionKeys - - /** An attribute map that can be used to lookup original attributes based on expression id. */ - val attributeMap = AttributeMap(output.map(o => (o, o))) - - /** An attribute map for determining the ordinal for non-partition columns. 
*/ - val columnOrdinals = AttributeMap(dataColKeys.zipWithIndex) - - override def inputFiles: Array[String] = { - val partLocations = allPartitions - .flatMap(_.storage.locationUri) - .toArray - if (partLocations.nonEmpty) { - partLocations - } else { - Array( - catalogTable.storage.locationUri.getOrElse( - sys.error(s"Could not get the location of ${catalogTable.qualifiedName}."))) - } - } - - override def newInstance(): MetastoreRelation = { - MetastoreRelation(databaseName, tableName)(catalogTable, sparkSession) - } -} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index d48702b610..16c1103dd1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -61,7 +61,8 @@ private[hive] sealed trait TableReader { private[hive] class HadoopTableReader( @transient private val attributes: Seq[Attribute], - @transient private val relation: MetastoreRelation, + @transient private val partitionKeys: Seq[Attribute], + @transient private val tableDesc: TableDesc, @transient private val sparkSession: SparkSession, hadoopConf: Configuration) extends TableReader with Logging { @@ -88,7 +89,7 @@ class HadoopTableReader( override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] = makeRDDForTable( hiveTable, - Utils.classForName(relation.tableDesc.getSerdeClassName).asInstanceOf[Class[Deserializer]], + Utils.classForName(tableDesc.getSerdeClassName).asInstanceOf[Class[Deserializer]], filterOpt = None) /** @@ -110,7 +111,7 @@ class HadoopTableReader( // Create local references to member variables, so that the entire `this` object won't be // serialized in the closure below. - val tableDesc = relation.tableDesc + val localTableDesc = tableDesc val broadcastedHadoopConf = _broadcastedHadoopConf val tablePath = hiveTable.getPath @@ -119,7 +120,7 @@ class HadoopTableReader( // logDebug("Table input: %s".format(tablePath)) val ifc = hiveTable.getInputFormatClass .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]] - val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc) + val hadoopRDD = createHadoopRdd(localTableDesc, inputPathStr, ifc) val attrsWithIndex = attributes.zipWithIndex val mutableRow = new SpecificInternalRow(attributes.map(_.dataType)) @@ -127,7 +128,7 @@ class HadoopTableReader( val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter => val hconf = broadcastedHadoopConf.value.value val deserializer = deserializerClass.newInstance() - deserializer.initialize(hconf, tableDesc.getProperties) + deserializer.initialize(hconf, localTableDesc.getProperties) HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer) } @@ -212,8 +213,6 @@ class HadoopTableReader( partCols.map(col => new String(partSpec.get(col))).toArray } - // Create local references so that the outer object isn't serialized. - val tableDesc = relation.tableDesc val broadcastedHiveConf = _broadcastedHadoopConf val localDeserializer = partDeserializer val mutableRow = new SpecificInternalRow(attributes.map(_.dataType)) @@ -222,12 +221,12 @@ class HadoopTableReader( // Attached indices indicate the position of each attribute in the output schema. 
val (partitionKeyAttrs, nonPartitionKeyAttrs) = attributes.zipWithIndex.partition { case (attr, _) => - relation.partitionKeys.contains(attr) + partitionKeys.contains(attr) } def fillPartitionKeys(rawPartValues: Array[String], row: InternalRow): Unit = { partitionKeyAttrs.foreach { case (attr, ordinal) => - val partOrdinal = relation.partitionKeys.indexOf(attr) + val partOrdinal = partitionKeys.indexOf(attr) row(ordinal) = Cast(Literal(rawPartValues(partOrdinal)), attr.dataType).eval(null) } } @@ -235,9 +234,11 @@ class HadoopTableReader( // Fill all partition keys to the given MutableRow object fillPartitionKeys(partValues, mutableRow) - val tableProperties = relation.tableDesc.getProperties + val tableProperties = tableDesc.getProperties - createHadoopRdd(tableDesc, inputPathStr, ifc).mapPartitions { iter => + // Create local references so that the outer object isn't serialized. + val localTableDesc = tableDesc + createHadoopRdd(localTableDesc, inputPathStr, ifc).mapPartitions { iter => val hconf = broadcastedHiveConf.value.value val deserializer = localDeserializer.newInstance() // SPARK-13709: For SerDes like AvroSerDe, some essential information (e.g. Avro schema @@ -251,8 +252,8 @@ class HadoopTableReader( } deserializer.initialize(hconf, props) // get the table deserializer - val tableSerDe = tableDesc.getDeserializerClass.newInstance() - tableSerDe.initialize(hconf, tableDesc.getProperties) + val tableSerDe = localTableDesc.getDeserializerClass.newInstance() + tableSerDe.initialize(hconf, localTableDesc.getProperties) // fill the non partition key attributes HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index 140c352fa6..14b9565be0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -21,6 +21,7 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition} +import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.hadoop.hive.serde.serdeConstants import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption @@ -29,10 +30,12 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.CatalogRelation import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.hive._ +import org.apache.spark.sql.hive.client.HiveClientImpl import org.apache.spark.sql.types.{BooleanType, DataType} import org.apache.spark.util.Utils @@ -46,12 +49,12 @@ import org.apache.spark.util.Utils private[hive] case class HiveTableScanExec( requestedAttributes: Seq[Attribute], - relation: MetastoreRelation, + relation: CatalogRelation, partitionPruningPred: Seq[Expression])( @transient private val sparkSession: SparkSession) extends LeafExecNode { - require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned, + require(partitionPruningPred.isEmpty || relation.isPartitioned, "Partition 
pruning predicates only supported for partitioned tables.") override lazy val metrics = Map( @@ -60,42 +63,54 @@ case class HiveTableScanExec( override def producedAttributes: AttributeSet = outputSet ++ AttributeSet(partitionPruningPred.flatMap(_.references)) - // Retrieve the original attributes based on expression ID so that capitalization matches. - val attributes = requestedAttributes.map(relation.attributeMap) + private val originalAttributes = AttributeMap(relation.output.map(a => a -> a)) + + override val output: Seq[Attribute] = { + // Retrieve the original attributes based on expression ID so that capitalization matches. + requestedAttributes.map(originalAttributes) + } // Bind all partition key attribute references in the partition pruning predicate for later // evaluation. - private[this] val boundPruningPred = partitionPruningPred.reduceLeftOption(And).map { pred => + private val boundPruningPred = partitionPruningPred.reduceLeftOption(And).map { pred => require( pred.dataType == BooleanType, s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.") - BindReferences.bindReference(pred, relation.partitionKeys) + BindReferences.bindReference(pred, relation.partitionCols) } // Create a local copy of hadoopConf,so that scan specific modifications should not impact // other queries - @transient - private[this] val hadoopConf = sparkSession.sessionState.newHadoopConf() + @transient private val hadoopConf = sparkSession.sessionState.newHadoopConf() + + @transient private val hiveQlTable = HiveClientImpl.toHiveTable(relation.tableMeta) + @transient private val tableDesc = new TableDesc( + hiveQlTable.getInputFormatClass, + hiveQlTable.getOutputFormatClass, + hiveQlTable.getMetadata) // append columns ids and names before broadcast addColumnMetadataToConf(hadoopConf) - @transient - private[this] val hadoopReader = - new HadoopTableReader(attributes, relation, sparkSession, hadoopConf) + @transient private val hadoopReader = new HadoopTableReader( + output, + relation.partitionCols, + tableDesc, + sparkSession, + hadoopConf) - private[this] def castFromString(value: String, dataType: DataType) = { + private def castFromString(value: String, dataType: DataType) = { Cast(Literal(value), dataType).eval(null) } private def addColumnMetadataToConf(hiveConf: Configuration) { // Specifies needed column IDs for those non-partitioning columns. 
- val neededColumnIDs = attributes.flatMap(relation.columnOrdinals.get).map(o => o: Integer) + val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex) + val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: Integer) - HiveShim.appendReadColumns(hiveConf, neededColumnIDs, attributes.map(_.name)) + HiveShim.appendReadColumns(hiveConf, neededColumnIDs, output.map(_.name)) - val tableDesc = relation.tableDesc val deserializer = tableDesc.getDeserializerClass.newInstance deserializer.initialize(hiveConf, tableDesc.getProperties) @@ -113,7 +128,7 @@ case class HiveTableScanExec( .mkString(",") hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames) - hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataColKeys.map(_.name).mkString(",")) + hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataCols.map(_.name).mkString(",")) } /** @@ -126,7 +141,7 @@ case class HiveTableScanExec( boundPruningPred match { case None => partitions case Some(shouldKeep) => partitions.filter { part => - val dataTypes = relation.partitionKeys.map(_.dataType) + val dataTypes = relation.partitionCols.map(_.dataType) val castedValues = part.getValues.asScala.zip(dataTypes) .map { case (value, dataType) => castFromString(value, dataType) } @@ -138,27 +153,35 @@ case class HiveTableScanExec( } } + // exposed for tests + @transient lazy val rawPartitions = { + val prunedPartitions = if (sparkSession.sessionState.conf.metastorePartitionPruning) { + // Retrieve the original attributes based on expression ID so that capitalization matches. + val normalizedFilters = partitionPruningPred.map(_.transform { + case a: AttributeReference => originalAttributes(a) + }) + sparkSession.sharedState.externalCatalog.listPartitionsByFilter( + relation.tableMeta.database, + relation.tableMeta.identifier.table, + normalizedFilters) + } else { + sparkSession.sharedState.externalCatalog.listPartitions( + relation.tableMeta.database, + relation.tableMeta.identifier.table) + } + prunedPartitions.map(HiveClientImpl.toHivePartition(_, hiveQlTable)) + } + protected override def doExecute(): RDD[InternalRow] = { // Using dummyCallSite, as getCallSite can turn out to be expensive with // with multiple partitions. - val rdd = if (!relation.hiveQlTable.isPartitioned) { + val rdd = if (!relation.isPartitioned) { Utils.withDummyCallSite(sqlContext.sparkContext) { - hadoopReader.makeRDDForTable(relation.hiveQlTable) + hadoopReader.makeRDDForTable(hiveQlTable) } } else { - // The attribute name of predicate could be different than the one in schema in case of - // case insensitive, we should change them to match the one in schema, so we do not need to - // worry about case sensitivity anymore. 
- val normalizedFilters = partitionPruningPred.map { e => - e transform { - case a: AttributeReference => - a.withName(relation.output.find(_.semanticEquals(a)).get.name) - } - } - Utils.withDummyCallSite(sqlContext.sparkContext) { - hadoopReader.makeRDDForPartitionedTable( - prunePartitions(relation.getHiveQlPartitions(normalizedFilters))) + hadoopReader.makeRDDForPartitionedTable(prunePartitions(rawPartitions)) } } val numOutputRows = longMetric("numOutputRows") @@ -174,8 +197,6 @@ case class HiveTableScanExec( } } - override def output: Seq[Attribute] = attributes - override def sameResult(plan: SparkPlan): Boolean = plan match { case other: HiveTableScanExec => val thisPredicates = partitionPruningPred.map(cleanExpression) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 142f25defb..f107149ada 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -29,16 +29,18 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.common.FileUtils import org.apache.hadoop.hive.ql.exec.TaskRunner import org.apache.hadoop.hive.ql.ErrorMsg +import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession} +import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.execution.datasources.FileFormatWriter import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} -import org.apache.spark.sql.hive.client.HiveVersion +import org.apache.spark.sql.hive.client.{HiveClientImpl, HiveVersion} import org.apache.spark.SparkException @@ -52,9 +54,7 @@ import org.apache.spark.SparkException * In the future we should converge the write path for Hive with the normal data source write path, * as defined in `org.apache.spark.sql.execution.datasources.FileFormatWriter`. * - * @param table the logical plan representing the table. In the future this should be a - * `org.apache.spark.sql.catalyst.catalog.CatalogTable` once we converge Hive tables - * and data source tables. + * @param table the metadata of the table. * @param partition a map from the partition key to the partition value (optional). If the partition * value is optional, dynamic partition insert will be performed. * As an example, `INSERT INTO tbl PARTITION (a=1, b=2) AS ...` would have @@ -74,7 +74,7 @@ import org.apache.spark.SparkException * @param ifNotExists If true, only write if the table or partition does not exist. 
*/ case class InsertIntoHiveTable( - table: MetastoreRelation, + table: CatalogTable, partition: Map[String, Option[String]], query: LogicalPlan, overwrite: Boolean, @@ -218,10 +218,19 @@ case class InsertIntoHiveTable( val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive") + val hiveQlTable = HiveClientImpl.toHiveTable(table) // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer // instances within the closure, since Serializer is not serializable while TableDesc is. - val tableDesc = table.tableDesc - val tableLocation = table.hiveQlTable.getDataLocation + val tableDesc = new TableDesc( + hiveQlTable.getInputFormatClass, + // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because + // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to + // substitute some output formats, e.g. substituting SequenceFileOutputFormat to + // HiveSequenceFileOutputFormat. + hiveQlTable.getOutputFormatClass, + hiveQlTable.getMetadata + ) + val tableLocation = hiveQlTable.getDataLocation val tmpLocation = getExternalTmpPath(tableLocation, hiveVersion, hadoopConf, stagingDir, scratchDir) val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) @@ -254,9 +263,9 @@ case class InsertIntoHiveTable( // By this time, the partition map must match the table's partition columns if (partitionColumnNames.toSet != partition.keySet) { throw new SparkException( - s"""Requested partitioning does not match the ${table.tableName} table: + s"""Requested partitioning does not match the ${table.identifier.table} table: |Requested partitions: ${partition.keys.mkString(",")} - |Table partitions: ${table.partitionKeys.map(_.name).mkString(",")}""".stripMargin) + |Table partitions: ${table.partitionColumnNames.mkString(",")}""".stripMargin) } // Validate partition spec if there exist any dynamic partitions @@ -307,8 +316,8 @@ case class InsertIntoHiveTable( if (partition.nonEmpty) { if (numDynamicPartitions > 0) { externalCatalog.loadDynamicPartitions( - db = table.catalogTable.database, - table = table.catalogTable.identifier.table, + db = table.database, + table = table.identifier.table, tmpLocation.toString, partitionSpec, overwrite, @@ -320,8 +329,8 @@ case class InsertIntoHiveTable( // scalastyle:on val oldPart = externalCatalog.getPartitionOption( - table.catalogTable.database, - table.catalogTable.identifier.table, + table.database, + table.identifier.table, partitionSpec) var doHiveOverwrite = overwrite @@ -350,8 +359,8 @@ case class InsertIntoHiveTable( // which is currently considered as a Hive native command. val inheritTableSpecs = true externalCatalog.loadPartition( - table.catalogTable.database, - table.catalogTable.identifier.table, + table.database, + table.identifier.table, tmpLocation.toString, partitionSpec, isOverwrite = doHiveOverwrite, @@ -361,8 +370,8 @@ case class InsertIntoHiveTable( } } else { externalCatalog.loadTable( - table.catalogTable.database, - table.catalogTable.identifier.table, + table.database, + table.identifier.table, tmpLocation.toString, // TODO: URI overwrite, isSrcLocal = false) @@ -378,8 +387,8 @@ case class InsertIntoHiveTable( } // Invalidate the cache. 
- sparkSession.sharedState.cacheManager.invalidateCache(table) - sparkSession.sessionState.catalog.refreshTable(table.catalogTable.identifier) + sparkSession.catalog.uncacheTable(table.qualifiedName) + sparkSession.sessionState.catalog.refreshTable(table.identifier) // It would be nice to just return the childRdd unchanged so insert operations could be chained, // however for now we return an empty list to simplify compatibility checks with hive, which diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala deleted file mode 100644 index 91ff711445..0000000000 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.spark.sql.hive - -import org.apache.spark.sql.{QueryTest, Row} -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} -import org.apache.spark.sql.hive.test.TestHiveSingleton -import org.apache.spark.sql.test.SQLTestUtils -import org.apache.spark.sql.types.{IntegerType, StructField, StructType} - -class MetastoreRelationSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { - test("makeCopy and toJSON should work") { - val table = CatalogTable( - identifier = TableIdentifier("test", Some("db")), - tableType = CatalogTableType.VIEW, - storage = CatalogStorageFormat.empty, - schema = StructType(StructField("a", IntegerType, true) :: Nil)) - val relation = MetastoreRelation("db", "test")(table, null) - - // No exception should be thrown - relation.makeCopy(Array("db", "test")) - // No exception should be thrown - relation.toJSON - } - - test("SPARK-17409: Do Not Optimize Query in CTAS (Hive Serde Table) More Than Once") { - withTable("bar") { - withTempView("foo") { - sql("select 0 as id").createOrReplaceTempView("foo") - // If we optimize the query in CTAS more than once, the following saveAsTable will fail - // with the error: `GROUP BY position 0 is not in select list (valid range is [1, 1])` - sql("CREATE TABLE bar AS SELECT * FROM foo group by id") - checkAnswer(spark.table("bar"), Row(0) :: Nil) - val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier("bar")) - assert(tableMetadata.provider == Some("hive"), "the expected table is a Hive serde table") - } - } - } -} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index e2fcd2fd41..962998ea6f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -23,7 +23,7 @@ import scala.reflect.ClassTag import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.CatalogStatistics +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics} import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.joins._ @@ -33,52 +33,46 @@ import org.apache.spark.sql.types._ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton { - test("MetastoreRelations fallback to HDFS for size estimation") { - val enableFallBackToHdfsForStats = spark.sessionState.conf.fallBackToHdfsForStatsEnabled - try { - withTempDir { tempDir => - - // EXTERNAL OpenCSVSerde table pointing to LOCATION - - val file1 = new File(tempDir + "/data1") - val writer1 = new PrintWriter(file1) - writer1.write("1,2") - writer1.close() - - val file2 = new File(tempDir + "/data2") - val writer2 = new PrintWriter(file2) - writer2.write("1,2") - writer2.close() - - sql( - s"""CREATE EXTERNAL TABLE csv_table(page_id INT, impressions INT) - ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' - WITH SERDEPROPERTIES ( - \"separatorChar\" = \",\", - \"quoteChar\" = \"\\\"\", - \"escapeChar\" = \"\\\\\") - LOCATION '${tempDir.toURI}' - """) - - spark.conf.set(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key, true) - - val relation = spark.table("csv_table").queryExecution.analyzed.children.head - .asInstanceOf[MetastoreRelation] - - val properties = relation.hiveQlTable.getParameters - assert(properties.get("totalSize").toLong <= 0, "external table totalSize must be <= 0") - assert(properties.get("rawDataSize").toLong <= 0, "external table rawDataSize must be <= 0") - - val sizeInBytes = relation.stats(conf).sizeInBytes - assert(sizeInBytes === BigInt(file1.length() + file2.length())) + test("Hive serde tables should fallback to HDFS for size estimation") { + withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") { + withTable("csv_table") { + withTempDir { tempDir => + // EXTERNAL OpenCSVSerde table pointing to LOCATION + val file1 = new File(tempDir + "/data1") + val writer1 = new PrintWriter(file1) + writer1.write("1,2") + writer1.close() + + val file2 = new File(tempDir + "/data2") + val writer2 = new PrintWriter(file2) + writer2.write("1,2") + writer2.close() + + sql( + s""" + |CREATE EXTERNAL TABLE csv_table(page_id INT, impressions INT) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' + |WITH SERDEPROPERTIES ( + |\"separatorChar\" = \",\", + |\"quoteChar\" = \"\\\"\", + |\"escapeChar\" = \"\\\\\") + |LOCATION '${tempDir.toURI}'""".stripMargin) + + val relation = spark.table("csv_table").queryExecution.analyzed.children.head + .asInstanceOf[CatalogRelation] + + val properties = relation.tableMeta.properties + assert(properties("totalSize").toLong <= 0, "external table totalSize must be <= 0") + assert(properties("rawDataSize").toLong <= 0, "external table rawDataSize must be <= 0") + + val sizeInBytes = relation.stats(conf).sizeInBytes + assert(sizeInBytes === BigInt(file1.length() + file2.length())) + } } - } finally { - spark.conf.set(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key, enableFallBackToHdfsForStats) - sql("DROP TABLE csv_table ") } } - test("analyze MetastoreRelations") { + test("analyze Hive serde tables") { def queryTotalSize(tableName: String): BigInt = 
spark.table(tableName).queryExecution.analyzed.stats(conf).sizeInBytes @@ -152,9 +146,11 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } private def checkTableStats( - stats: Option[CatalogStatistics], + tableName: String, hasSizeInBytes: Boolean, - expectedRowCounts: Option[Int]): Unit = { + expectedRowCounts: Option[Int]): Option[CatalogStatistics] = { + val stats = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).stats + if (hasSizeInBytes || expectedRowCounts.nonEmpty) { assert(stats.isDefined) assert(stats.get.sizeInBytes > 0) @@ -162,26 +158,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } else { assert(stats.isEmpty) } - } - private def checkTableStats( - tableName: String, - isDataSourceTable: Boolean, - hasSizeInBytes: Boolean, - expectedRowCounts: Option[Int]): Option[CatalogStatistics] = { - val df = sql(s"SELECT * FROM $tableName") - val stats = df.queryExecution.analyzed.collect { - case rel: MetastoreRelation => - checkTableStats(rel.catalogTable.stats, hasSizeInBytes, expectedRowCounts) - assert(!isDataSourceTable, "Expected a Hive serde table, but got a data source table") - rel.catalogTable.stats - case rel: LogicalRelation => - checkTableStats(rel.catalogTable.get.stats, hasSizeInBytes, expectedRowCounts) - assert(isDataSourceTable, "Expected a data source table, but got a Hive serde table") - rel.catalogTable.get.stats - } - assert(stats.size == 1) - stats.head + stats } test("test table-level statistics for hive tables created in HiveExternalCatalog") { @@ -192,25 +170,23 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto sql(s"CREATE TABLE $textTable (key STRING, value STRING) STORED AS TEXTFILE") checkTableStats( textTable, - isDataSourceTable = false, hasSizeInBytes = false, expectedRowCounts = None) sql(s"INSERT INTO TABLE $textTable SELECT * FROM src") checkTableStats( textTable, - isDataSourceTable = false, hasSizeInBytes = false, expectedRowCounts = None) // noscan won't count the number of rows sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan") - val fetchedStats1 = checkTableStats( - textTable, isDataSourceTable = false, hasSizeInBytes = true, expectedRowCounts = None) + val fetchedStats1 = + checkTableStats(textTable, hasSizeInBytes = true, expectedRowCounts = None) // without noscan, we count the number of rows sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS") - val fetchedStats2 = checkTableStats( - textTable, isDataSourceTable = false, hasSizeInBytes = true, expectedRowCounts = Some(500)) + val fetchedStats2 = + checkTableStats(textTable, hasSizeInBytes = true, expectedRowCounts = Some(500)) assert(fetchedStats1.get.sizeInBytes == fetchedStats2.get.sizeInBytes) } } @@ -221,25 +197,25 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto sql(s"CREATE TABLE $textTable (key STRING, value STRING) STORED AS TEXTFILE") sql(s"INSERT INTO TABLE $textTable SELECT * FROM src") sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS") - val fetchedStats1 = checkTableStats( - textTable, isDataSourceTable = false, hasSizeInBytes = true, expectedRowCounts = Some(500)) + val fetchedStats1 = + checkTableStats(textTable, hasSizeInBytes = true, expectedRowCounts = Some(500)) sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan") // when the total size is not changed, the old row count is kept - val fetchedStats2 = checkTableStats( - textTable, isDataSourceTable = false, hasSizeInBytes = true, 
      expectedRowCounts = Some(500))
+    val fetchedStats2 =
+      checkTableStats(textTable, hasSizeInBytes = true, expectedRowCounts = Some(500))
     assert(fetchedStats1 == fetchedStats2)
     sql(s"INSERT INTO TABLE $textTable SELECT * FROM src")
     sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan")
     // update total size and remove the old and invalid row count
-    val fetchedStats3 = checkTableStats(
-      textTable, isDataSourceTable = false, hasSizeInBytes = true, expectedRowCounts = None)
+    val fetchedStats3 =
+      checkTableStats(textTable, hasSizeInBytes = true, expectedRowCounts = None)
     assert(fetchedStats3.get.sizeInBytes > fetchedStats2.get.sizeInBytes)
   }
 }
-  test("test statistics of LogicalRelation converted from MetastoreRelation") {
+  test("test statistics of LogicalRelation converted from Hive serde tables") {
     val parquetTable = "parquetTable"
     val orcTable = "orcTable"
     withTable(parquetTable, orcTable) {
@@ -251,21 +227,14 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       // the default value for `spark.sql.hive.convertMetastoreParquet` is true, here we just set it
       // for robustness
       withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") {
-        checkTableStats(
-          parquetTable, isDataSourceTable = true, hasSizeInBytes = false, expectedRowCounts = None)
+        checkTableStats(parquetTable, hasSizeInBytes = false, expectedRowCounts = None)
         sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS")
-        checkTableStats(
-          parquetTable,
-          isDataSourceTable = true,
-          hasSizeInBytes = true,
-          expectedRowCounts = Some(500))
+        checkTableStats(parquetTable, hasSizeInBytes = true, expectedRowCounts = Some(500))
       }
       withSQLConf("spark.sql.hive.convertMetastoreOrc" -> "true") {
-        checkTableStats(
-          orcTable, isDataSourceTable = true, hasSizeInBytes = false, expectedRowCounts = None)
+        checkTableStats(orcTable, hasSizeInBytes = false, expectedRowCounts = None)
         sql(s"ANALYZE TABLE $orcTable COMPUTE STATISTICS")
-        checkTableStats(
-          orcTable, isDataSourceTable = true, hasSizeInBytes = true, expectedRowCounts = Some(500))
+        checkTableStats(orcTable, hasSizeInBytes = true, expectedRowCounts = Some(500))
       }
     }
   }
@@ -385,27 +354,23 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       // Add a filter to avoid creating too many partitions
       sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src WHERE key < 10")
-      checkTableStats(
-        parquetTable, isDataSourceTable = true, hasSizeInBytes = false, expectedRowCounts = None)
+      checkTableStats(parquetTable, hasSizeInBytes = false, expectedRowCounts = None)
       // noscan won't count the number of rows
       sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS noscan")
-      val fetchedStats1 = checkTableStats(
-        parquetTable, isDataSourceTable = true, hasSizeInBytes = true, expectedRowCounts = None)
+      val fetchedStats1 =
+        checkTableStats(parquetTable, hasSizeInBytes = true, expectedRowCounts = None)
       sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src WHERE key < 10")
       sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS noscan")
-      val fetchedStats2 = checkTableStats(
-        parquetTable, isDataSourceTable = true, hasSizeInBytes = true, expectedRowCounts = None)
+      val fetchedStats2 =
+        checkTableStats(parquetTable, hasSizeInBytes = true, expectedRowCounts = None)
       assert(fetchedStats2.get.sizeInBytes > fetchedStats1.get.sizeInBytes)
       // without noscan, we count the number of rows
       sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS")
-      val fetchedStats3 = checkTableStats(
-        parquetTable,
-        isDataSourceTable = true,
-        hasSizeInBytes = true,
-        expectedRowCounts = Some(20))
+      val fetchedStats3 =
+        checkTableStats(parquetTable, hasSizeInBytes = true, expectedRowCounts = Some(20))
       assert(fetchedStats3.get.sizeInBytes == fetchedStats2.get.sizeInBytes)
     }
   }
@@ -426,11 +391,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       val dfNoCols = spark.createDataFrame(rddNoCols, StructType(Seq.empty))
       dfNoCols.write.format("json").saveAsTable(table_no_cols)
       sql(s"ANALYZE TABLE $table_no_cols COMPUTE STATISTICS")
-      checkTableStats(
-        table_no_cols,
-        isDataSourceTable = true,
-        hasSizeInBytes = true,
-        expectedRowCounts = Some(10))
+      checkTableStats(table_no_cols, hasSizeInBytes = true, expectedRowCounts = Some(10))
     }
   }
@@ -478,10 +439,10 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     assert(statsAfterUpdate.rowCount == Some(2))
   }
-  test("estimates the size of a test MetastoreRelation") {
+  test("estimates the size of a test Hive serde tables") {
     val df = sql("""SELECT * FROM src""")
-    val sizes = df.queryExecution.analyzed.collect { case mr: MetastoreRelation =>
-      mr.stats(conf).sizeInBytes
+    val sizes = df.queryExecution.analyzed.collect {
+      case relation: CatalogRelation => relation.stats(conf).sizeInBytes
     }
     assert(sizes.size === 1, s"Size wrong for:\n ${df.queryExecution}")
     assert(sizes(0).equals(BigInt(5812)),
@@ -533,7 +494,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
      after()
    }
-    /** Tests for MetastoreRelation */
+    /** Tests for Hive serde tables */
    val metastoreQuery = """SELECT * FROM src a JOIN src b ON a.key = 238 AND a.key = b.key"""
    val metastoreAnswer = Seq.fill(4)(Row(238, "val_238", 238, "val_238"))
    mkTest(
@@ -541,7 +502,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
      () => (),
      metastoreQuery,
      metastoreAnswer,
-      implicitly[ClassTag[MetastoreRelation]]
+      implicitly[ClassTag[CatalogRelation]]
    )
  }
@@ -555,9 +516,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
    // Assert src has a size smaller than the threshold.
    val sizes = df.queryExecution.analyzed.collect {
-      case r if implicitly[ClassTag[MetastoreRelation]].runtimeClass
-        .isAssignableFrom(r.getClass) =>
-        r.stats(conf).sizeInBytes
+      case relation: CatalogRelation => relation.stats(conf).sizeInBytes
    }
    assert(sizes.size === 2 && sizes(1) <= spark.sessionState.conf.autoBroadcastJoinThreshold
      && sizes(0) <= spark.sessionState.conf.autoBroadcastJoinThreshold,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index f3151d52f2..536ca8fd9d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -385,7 +385,7 @@ abstract class HiveComparisonTest
        // also print out the query plans and results for those.
        val computedTablesMessages: String = try {
          val tablesRead = new TestHiveQueryExecution(query).executedPlan.collect {
-            case ts: HiveTableScanExec => ts.relation.tableName
+            case ts: HiveTableScanExec => ts.relation.tableMeta.identifier
          }.toSet
          TestHive.reset()
@@ -393,7 +393,7 @@ abstract class HiveComparisonTest
          executions.foreach(_.toRdd)
          val tablesGenerated = queryList.zip(executions).flatMap {
            case (q, e) => e.analyzed.collect {
-              case i: InsertIntoHiveTable if tablesRead contains i.table.tableName =>
+              case i: InsertIntoHiveTable if tablesRead contains i.table.identifier =>
                (q, e, i)
            }
          }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
index 5c460d25f3..90e037e292 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.hive.execution
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.hive.MetastoreRelation
 import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton}
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
@@ -95,8 +94,7 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH
   private def checkNumScannedPartitions(stmt: String, expectedNumParts: Int): Unit = {
     val plan = sql(stmt).queryExecution.sparkPlan
     val numPartitions = plan.collectFirst {
-      case p: HiveTableScanExec =>
-        p.relation.getHiveQlPartitions(p.partitionPruningPred).length
+      case p: HiveTableScanExec => p.rawPartitions.length
     }.getOrElse(0)
     assert(numPartitions == expectedNumParts)
   }
@@ -170,11 +168,11 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH
          s"""
             |SELECT * FROM $table
          """.stripMargin).queryExecution.sparkPlan
-        val relation = plan.collectFirst {
-          case p: HiveTableScanExec => p.relation
+        val scan = plan.collectFirst {
+          case p: HiveTableScanExec => p
        }.get
-        val tableCols = relation.hiveQlTable.getCols
-        relation.getHiveQlPartitions().foreach(p => assert(p.getCols.size == tableCols.size))
+        val numDataCols = scan.relation.dataCols.length
+        scan.rawPartitions.foreach(p => assert(p.getCols.size == numDataCols))
      }
    }
  }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
index 24df73b40e..d535bef4cc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
@@ -153,8 +153,8 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
    val (actualScannedColumns, actualPartValues) = plan.collect {
      case p @ HiveTableScanExec(columns, relation, _) =>
        val columnNames = columns.map(_.name)
-        val partValues = if (relation.catalogTable.partitionColumnNames.nonEmpty) {
-          p.prunePartitions(relation.getHiveQlPartitions()).map(_.getValues)
+        val partValues = if (relation.isPartitioned) {
+          p.prunePartitions(p.rawPartitions).map(_.getValues)
        } else {
          Seq.empty
        }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 9f6176339e..ef2d451e6b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -28,12 +28,12 @@ import org.apache.spark.TestUtils
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry, NoSuchPartitionException}
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTableType}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.{HiveUtils, MetastoreRelation}
+import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
@@ -526,7 +526,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
      case LogicalRelation(r: HadoopFsRelation, _, _) =>
        if (!isDataSourceTable) {
          fail(
-            s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
+            s"${classOf[CatalogRelation].getCanonicalName} is expected, but found " +
              s"${HadoopFsRelation.getClass.getCanonicalName}.")
        }
        userSpecifiedLocation match {
@@ -536,15 +536,15 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
        }
        assert(catalogTable.provider.get === format)
-      case r: MetastoreRelation =>
+      case r: CatalogRelation =>
        if (isDataSourceTable) {
          fail(
            s"${HadoopFsRelation.getClass.getCanonicalName} is expected, but found " +
-              s"${classOf[MetastoreRelation].getCanonicalName}.")
+              s"${classOf[CatalogRelation].getCanonicalName}.")
        }
        userSpecifiedLocation match {
          case Some(location) =>
-            assert(r.catalogTable.location === location)
+            assert(r.tableMeta.location === location)
          case None => // OK.
        }
        // Also make sure that the format and serde are as desired.
@@ -1030,7 +1030,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
    withSQLConf(SQLConf.CONVERT_CTAS.key -> "false") {
      sql("CREATE TABLE explodeTest (key bigInt)")
      table("explodeTest").queryExecution.analyzed match {
-        case SubqueryAlias(_, r: MetastoreRelation, _) => // OK
+        case SubqueryAlias(_, r: CatalogRelation, _) => // OK
        case _ =>
          fail("To correctly test the fix of SPARK-5875, explodeTest should be a MetastoreRelation")
      }
@@ -2043,4 +2043,18 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
      }
    }
  }
+
+  test("SPARK-17409: Do Not Optimize Query in CTAS (Hive Serde Table) More Than Once") {
+    withTable("bar") {
+      withTempView("foo") {
+        sql("select 0 as id").createOrReplaceTempView("foo")
+        // If we optimize the query in CTAS more than once, the following saveAsTable will fail
+        // with the error: `GROUP BY position 0 is not in select list (valid range is [1, 1])`
+        sql("SELECT * FROM foo group by id").toDF().write.format("hive").saveAsTable("bar")
+        checkAnswer(spark.table("bar"), Row(0) :: Nil)
+        val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier("bar"))
+        assert(tableMetadata.provider == Some("hive"), "the expected table is a Hive serde table")
+      }
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 9fa1fb931d..38a5477796 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -26,8 +26,9 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogRelation
 import org.apache.spark.sql.execution.datasources.{LogicalRelation, RecordReaderIterator}
-import org.apache.spark.sql.hive.{HiveUtils, MetastoreRelation}
+import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.internal.SQLConf
@@ -473,7 +474,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
        }
      } else {
        queryExecution.analyzed.collectFirst {
-          case _: MetastoreRelation => ()
+          case _: CatalogRelation => ()
        }.getOrElse {
          fail(s"Expecting no conversion from orc to data sources, " +
            s"but got:\n$queryExecution")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 1a1b2571b6..3512c4a890 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -21,8 +21,8 @@ import java.io.File
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogRelation
 import org.apache.spark.sql.execution.DataSourceScanExec
-import org.apache.spark.sql.execution.command.ExecutedCommandExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.execution.HiveTableScanExec
 import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -806,7 +806,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
        }
      } else {
        queryExecution.analyzed.collectFirst {
-          case _: MetastoreRelation =>
+          case _: CatalogRelation =>
        }.getOrElse {
          fail(s"Expecting no conversion from parquet to data sources, " +
            s"but got:\n$queryExecution")
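For context, the pattern these tests converge on after this change is collecting CatalogRelation nodes from an analyzed plan and reading their metadata and statistics. The sketch below is illustrative only, not code from this commit: it assumes a Hive-enabled Spark build of this development version, an already-existing Hive serde table named `src`, and that `SparkSession.sessionState` is accessible from user code.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogRelation

// Illustrative sketch (assumed setup): a Hive-enabled session and a Hive serde table `src`.
object CatalogRelationStatsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("catalog-relation-stats-sketch")
      .enableHiveSupport()
      .getOrCreate()

    val df = spark.sql("SELECT * FROM src")

    // After SPARK-19678, Hive serde tables appear in the analyzed plan as CatalogRelation,
    // so table identity and size estimates can be read directly from the plan node.
    df.queryExecution.analyzed.collect {
      case relation: CatalogRelation =>
        val name = relation.tableMeta.identifier
        val sizeInBytes = relation.stats(spark.sessionState.conf).sizeInBytes
        println(s"$name estimated sizeInBytes = $sizeInBytes")
    }

    spark.stop()
  }
}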