From 7c7fc30b4ae8e4ebd4ededf92240fed10481f2dd Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Tue, 28 Feb 2017 09:24:36 -0800
Subject: [SPARK-19678][SQL] remove MetastoreRelation

## What changes were proposed in this pull request?

`MetastoreRelation` is used to represent table relations for hive tables, and provides some hive-related information. We resolve `SimpleCatalogRelation` to `MetastoreRelation` for hive tables, which is unnecessary as the two are essentially the same. This PR merges `SimpleCatalogRelation` and `MetastoreRelation`.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan

Closes #17015 from cloud-fan/table-relation.
---
 .../org/apache/spark/sql/DataFrameWriter.scala      |  8 +--
 .../sql/execution/OptimizeMetadataOnlyQuery.scala   |  8 +--
 .../execution/command/AnalyzeColumnCommand.scala    | 49 ++++----------
 .../execution/command/AnalyzeTableCommand.scala     | 78 +++++++++-------------
 .../execution/datasources/DataSourceStrategy.scala  | 29 +++++---
 .../spark/sql/execution/datasources/rules.scala     |  2 +-
 6 files changed, 71 insertions(+), 103 deletions(-)

(limited to 'sql/core/src/main')

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 393925161f..49e85dc7b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -349,8 +349,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     // Get all input data source or hive relations of the query.
     val srcRelations = df.logicalPlan.collect {
       case LogicalRelation(src: BaseRelation, _, _) => src
-      case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable) =>
-        relation.catalogTable.identifier
+      case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) =>
+        relation.tableMeta.identifier
     }

     val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
@@ -360,8 +360,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         throw new AnalysisException(
           s"Cannot overwrite table $tableName that is also being read from")
       // check hive table relation when overwrite mode
-      case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable)
-          && srcRelations.contains(relation.catalogTable.identifier) =>
+      case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta)
+          && srcRelations.contains(relation.tableMeta.identifier) =>
         throw new AnalysisException(
           s"Cannot overwrite table $tableName that is also being read from")
       case _ => // OK
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index b8ac070e3a..b02edd4c74 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -102,8 +102,8 @@ case class OptimizeMetadataOnlyQuery(
         LocalRelation(partAttrs, partitionData.map(_.values))

       case relation: CatalogRelation =>
-        val partAttrs = getPartitionAttrs(relation.catalogTable.partitionColumnNames, relation)
-        val partitionData = catalog.listPartitions(relation.catalogTable.identifier).map { p =>
+        val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
+        val partitionData = catalog.listPartitions(relation.tableMeta.identifier).map { p =>
           InternalRow.fromSeq(partAttrs.map { attr =>
             // TODO: use correct timezone for partition values.
             Cast(Literal(p.spec(attr.name)), attr.dataType,
@@ -135,8 +135,8 @@ case class OptimizeMetadataOnlyQuery(
         val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
         Some(AttributeSet(partAttrs), l)

-      case relation: CatalogRelation if relation.catalogTable.partitionColumnNames.nonEmpty =>
-        val partAttrs = getPartitionAttrs(relation.catalogTable.partitionColumnNames, relation)
+      case relation: CatalogRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
+        val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
         Some(AttributeSet(partAttrs), relation)

       case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index d024a3673d..b89014ed8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -17,15 +17,12 @@

 package org.apache.spark.sql.execution.command

-import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.datasources.LogicalRelation


 /**
@@ -40,60 +37,40 @@ case class AnalyzeColumnCommand(
     val sessionState = sparkSession.sessionState
     val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
     val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
-    val relation =
-      EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed)
-
-    // Compute total size
-    val (catalogTable: CatalogTable, sizeInBytes: Long) = relation match {
-      case catalogRel: CatalogRelation =>
-        // This is a Hive serde format table
-        (catalogRel.catalogTable,
-          AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable))
-
-      case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
-        // This is a data source format table
-        (logicalRel.catalogTable.get,
-          AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get))
-
-      case otherRelation =>
-        throw new AnalysisException("ANALYZE TABLE is not supported for " +
-          s"${otherRelation.nodeName}.")
+    val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
+    if (tableMeta.tableType == CatalogTableType.VIEW) {
+      throw new AnalysisException("ANALYZE TABLE is not supported on views.")
     }
+    val sizeInBytes = AnalyzeTableCommand.calculateTotalSize(sessionState, tableMeta)

     // Compute stats for each column
-    val (rowCount, newColStats) =
-      AnalyzeColumnCommand.computeColumnStats(sparkSession, tableIdent.table, relation, columnNames)
+    val (rowCount, newColStats) = computeColumnStats(sparkSession, tableIdentWithDB, columnNames)

     // We also update table-level stats in order to keep them consistent with column-level stats.
     val statistics = CatalogStatistics(
       sizeInBytes = sizeInBytes,
       rowCount = Some(rowCount),
       // Newly computed column stats should override the existing ones.
-      colStats = catalogTable.stats.map(_.colStats).getOrElse(Map.empty) ++ newColStats)
+      colStats = tableMeta.stats.map(_.colStats).getOrElse(Map.empty) ++ newColStats)

-    sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics)))
+    sessionState.catalog.alterTable(tableMeta.copy(stats = Some(statistics)))

     // Refresh the cached data source table in the catalog.
     sessionState.catalog.refreshTable(tableIdentWithDB)

     Seq.empty[Row]
   }
-}
-
-object AnalyzeColumnCommand extends Logging {

   /**
    * Compute stats for the given columns.
    * @return (row count, map from column name to ColumnStats)
-   *
-   * This is visible for testing.
    */
-  def computeColumnStats(
+  private def computeColumnStats(
       sparkSession: SparkSession,
-      tableName: String,
-      relation: LogicalPlan,
+      tableIdent: TableIdentifier,
       columnNames: Seq[String]): (Long, Map[String, ColumnStat]) = {
+    val relation = sparkSession.table(tableIdent).logicalPlan
     // Resolve the column names and dedup using AttributeSet
     val resolver = sparkSession.sessionState.conf.resolver
     val attributesToAnalyze = AttributeSet(columnNames.map { col =>
@@ -105,7 +82,7 @@ object AnalyzeColumnCommand extends Logging {
     attributesToAnalyze.foreach { attr =>
       if (!ColumnStat.supportsType(attr.dataType)) {
         throw new AnalysisException(
-          s"Column ${attr.name} in table $tableName is of type ${attr.dataType}, " +
+          s"Column ${attr.name} in table $tableIdent is of type ${attr.dataType}, " +
             "and Spark does not support statistics collection on this column type.")
       }
     }
@@ -116,7 +93,7 @@ object AnalyzeColumnCommand extends Logging {
     // The layout of each struct follows the layout of the ColumnStats.
     val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError
     val expressions = Count(Literal(1)).toAggregateExpression() +:
-        attributesToAnalyze.map(ColumnStat.statExprs(_, ndvMaxErr))
+      attributesToAnalyze.map(ColumnStat.statExprs(_, ndvMaxErr))

     val namedExpressions = expressions.map(e => Alias(e, e.toString)())
     val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, namedExpressions, relation)).head()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 30b6cc7617..d2ea0cdf61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -22,11 +22,9 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.fs.{FileSystem, Path}

 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.internal.SessionState


@@ -41,53 +39,39 @@ case class AnalyzeTableCommand(
     val sessionState = sparkSession.sessionState
     val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
     val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
-    val relation =
-      EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed)
-
-    relation match {
-      case relation: CatalogRelation =>
-        updateTableStats(relation.catalogTable,
-          AnalyzeTableCommand.calculateTotalSize(sessionState, relation.catalogTable))
-
-      // data source tables have been converted into LogicalRelations
-      case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
-        updateTableStats(logicalRel.catalogTable.get,
-          AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get))
-
-      case otherRelation =>
-        throw new AnalysisException("ANALYZE TABLE is not supported for " +
-          s"${otherRelation.nodeName}.")
+    val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
+    if (tableMeta.tableType == CatalogTableType.VIEW) {
+      throw new AnalysisException("ANALYZE TABLE is not supported on views.")
     }
+    val newTotalSize = AnalyzeTableCommand.calculateTotalSize(sessionState, tableMeta)

-    def updateTableStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
-      val oldTotalSize = catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
-      val oldRowCount = catalogTable.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
-      var newStats: Option[CatalogStatistics] = None
-      if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
-        newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
-      }
-      // We only set rowCount when noscan is false, because otherwise:
-      // 1. when total size is not changed, we don't need to alter the table;
-      // 2. when total size is changed, `oldRowCount` becomes invalid.
-      // This is to make sure that we only record the right statistics.
-      if (!noscan) {
-        val newRowCount = Dataset.ofRows(sparkSession, relation).count()
-        if (newRowCount >= 0 && newRowCount != oldRowCount) {
-          newStats = if (newStats.isDefined) {
-            newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
-          } else {
-            Some(CatalogStatistics(
-              sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
-          }
+    val oldTotalSize = tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
+    val oldRowCount = tableMeta.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
+    var newStats: Option[CatalogStatistics] = None
+    if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
+      newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
+    }
+    // We only set rowCount when noscan is false, because otherwise:
+    // 1. when total size is not changed, we don't need to alter the table;
+    // 2. when total size is changed, `oldRowCount` becomes invalid.
+    // This is to make sure that we only record the right statistics.
+    if (!noscan) {
+      val newRowCount = sparkSession.table(tableIdentWithDB).count()
+      if (newRowCount >= 0 && newRowCount != oldRowCount) {
+        newStats = if (newStats.isDefined) {
+          newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
+        } else {
+          Some(CatalogStatistics(
+            sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
         }
       }
-      // Update the metastore if the above statistics of the table are different from those
-      // recorded in the metastore.
-      if (newStats.isDefined) {
-        sessionState.catalog.alterTable(catalogTable.copy(stats = newStats))
-        // Refresh the cached data source table in the catalog.
-        sessionState.catalog.refreshTable(tableIdentWithDB)
-      }
+    }
+    // Update the metastore if the above statistics of the table are different from those
+    // recorded in the metastore.
+    if (newStats.isDefined) {
+      sessionState.catalog.alterTable(tableMeta.copy(stats = newStats))
+      // Refresh the cached data source table in the catalog.
+      sessionState.catalog.refreshTable(tableIdentWithDB)
     }

     Seq.empty[Row]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index f4292320e4..f694a0d6d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow, QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation}
+import org.apache.spark.sql.catalyst.catalog.CatalogRelation
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -208,16 +208,17 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {


 /**
- * Replaces [[SimpleCatalogRelation]] with data source table if its table provider is not hive.
+ * Replaces [[CatalogRelation]] with data source table if its table provider is not hive.
  */
 class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
-  private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
+  private def readDataSourceTable(r: CatalogRelation): LogicalPlan = {
+    val table = r.tableMeta
     val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
     val cache = sparkSession.sessionState.catalog.tableRelationCache
     val withHiveSupport =
       sparkSession.sparkContext.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive"

-    cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
+    val plan = cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
       override def call(): LogicalPlan = {
         val pathOption = table.storage.locationUri.map("path" -> _)
         val dataSource =
@@ -233,19 +234,25 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
             // TODO: improve `InMemoryCatalog` and remove this limitation.
             catalogTable = if (withHiveSupport) Some(table) else None)

-        LogicalRelation(dataSource.resolveRelation(checkFilesExist = false),
+        LogicalRelation(
+          dataSource.resolveRelation(checkFilesExist = false),
           catalogTable = Some(table))
       }
-    })
+    }).asInstanceOf[LogicalRelation]
+
+    // It's possible that the table schema is empty and need to be inferred at runtime. We should
+    // not specify expected outputs for this case.
+    val expectedOutputs = if (r.output.isEmpty) None else Some(r.output)
+    plan.copy(expectedOutputAttributes = expectedOutputs)
   }

   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case i @ InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
-        if DDLUtils.isDatasourceTable(s.metadata) =>
-      i.copy(table = readDataSourceTable(s.metadata))
+    case i @ InsertIntoTable(r: CatalogRelation, _, _, _, _)
+        if DDLUtils.isDatasourceTable(r.tableMeta) =>
+      i.copy(table = readDataSourceTable(r))

-    case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
-      readDataSourceTable(s.metadata)
+    case r: CatalogRelation if DDLUtils.isDatasourceTable(r.tableMeta) =>
+      readDataSourceTable(r)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index e7a59d4ad4..4d781b96ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -379,7 +379,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
     case i @ InsertIntoTable(table, _, query, _, _) if table.resolved && query.resolved =>
       table match {
         case relation: CatalogRelation =>
-          val metadata = relation.catalogTable
+          val metadata = relation.tableMeta
           preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames)
         case LogicalRelation(h: HadoopFsRelation, _, catalogTable) =>
           val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
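
For readers who want the gist without walking the hunks: after this patch both hive and data source tables are represented by a single `CatalogRelation` whose `tableMeta` field holds the raw `CatalogTable`, and rules such as `FindDataSourceTable` match on that one node and branch on the table provider (`DDLUtils.isDatasourceTable` / `isHiveTable`) instead of handling `SimpleCatalogRelation` and `MetastoreRelation` separately. The sketch below illustrates that shape only; `TableMeta`, `TableRelation`, `isDatasourceTable`, and `describe` are simplified stand-in names, not Spark's actual classes or API.

```scala
// Minimal, self-contained sketch (stand-in types, not Spark's) of the post-patch shape:
// one relation node carrying the catalog metadata as `tableMeta`, with rules matching on
// that single node and branching on the table provider.
object TableRelationSketch {

  // Stand-in for CatalogTable, reduced to the fields the rules in this patch read.
  case class TableMeta(
      database: String,
      table: String,
      provider: Option[String],
      partitionColumnNames: Seq[String])

  // Stand-in for the merged CatalogRelation: used for both hive and data source tables.
  case class TableRelation(tableMeta: TableMeta)

  // Rough analogue of DDLUtils.isDatasourceTable: decided purely from the provider.
  def isDatasourceTable(meta: TableMeta): Boolean =
    meta.provider.exists(p => !p.equalsIgnoreCase("hive"))

  // With one node type, a rule needs a single case for catalog tables and reads everything
  // from tableMeta, rather than parallel SimpleCatalogRelation/MetastoreRelation cases.
  def describe(plan: Any): String = plan match {
    case r: TableRelation if isDatasourceTable(r.tableMeta) =>
      s"data source table ${r.tableMeta.database}.${r.tableMeta.table}"
    case r: TableRelation =>
      s"hive table ${r.tableMeta.database}.${r.tableMeta.table}" +
        s" partitioned by [${r.tableMeta.partitionColumnNames.mkString(", ")}]"
    case other =>
      s"not a catalog relation: $other"
  }

  def main(args: Array[String]): Unit = {
    val hiveTable = TableRelation(TableMeta("db", "logs", Some("hive"), Seq("ds")))
    val parquetTable = TableRelation(TableMeta("db", "users", Some("parquet"), Nil))
    println(describe(hiveTable))    // hive table db.logs partitioned by [ds]
    println(describe(parquetTable)) // data source table db.users
  }
}
```

This collapse is the point of the merge: the data-source conversion, the metadata-only optimization, and the ANALYZE commands above all stop needing separate code paths for the two relation types.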