path: root/sql/core/src
author    Wenchen Fan <wenchen@databricks.com>  2017-02-28 09:24:36 -0800
committer Xiao Li <gatorsmile@gmail.com>  2017-02-28 09:24:36 -0800
commit    7c7fc30b4ae8e4ebd4ededf92240fed10481f2dd (patch)
tree      469c01ca9d14d9a37ca6d3754af1b2dccbab1c02 /sql/core/src
parent    b405466513bcc02cadf1477b6b682ace95d81658 (diff)
[SPARK-19678][SQL] remove MetastoreRelation
## What changes were proposed in this pull request?

`MetastoreRelation` represents the table relation for Hive tables and provides some Hive-related information. We currently resolve `SimpleCatalogRelation` to `MetastoreRelation` for Hive tables, which is unnecessary because the two are essentially the same. This PR merges `SimpleCatalogRelation` and `MetastoreRelation`.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17015 from cloud-fan/table-relation.
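For context before the diff, here is a minimal, self-contained Scala sketch of the idea behind the merge. The types and names (`TableIdentifier`, `CatalogTable`, `CatalogRelation`, `FakeCatalogRelation`, `CatalogRelationSketch`) are simplified or invented stand-ins for illustration, not the real Spark Catalyst definitions; the point is only that callers now reach table metadata through a single `tableMeta` field instead of matching separately on `SimpleCatalogRelation.metadata` and `MetastoreRelation.catalogTable`.

```scala
// Minimal, hypothetical stand-ins for the Catalyst types touched by this patch;
// defined locally so the sketch compiles without a Spark dependency.
object CatalogRelationSketch {
  case class TableIdentifier(table: String, database: Option[String] = None)
  case class CatalogTable(
      identifier: TableIdentifier,
      partitionColumnNames: Seq[String] = Nil)

  // After the merge, one relation node represents catalog tables and exposes its
  // metadata as `tableMeta` (previously SimpleCatalogRelation.metadata for data
  // source tables and MetastoreRelation.catalogTable for Hive tables).
  trait CatalogRelation {
    def tableMeta: CatalogTable
  }

  // An illustrative concrete relation; the real analyzer builds these from the catalog.
  case class FakeCatalogRelation(tableMeta: CatalogTable) extends CatalogRelation

  // Callers no longer care which kind of table they see: a single pattern match
  // reaches the metadata, as in the DataFrameWriter and AnalyzeTableCommand hunks below.
  def tableIdentOf(plan: AnyRef): Option[TableIdentifier] = plan match {
    case r: CatalogRelation => Some(r.tableMeta.identifier)
    case _                  => None
  }

  def main(args: Array[String]): Unit = {
    val rel = FakeCatalogRelation(CatalogTable(TableIdentifier("t", Some("db"))))
    println(tableIdentOf(rel)) // Some(TableIdentifier(t,Some(db)))
  }
}
```

This single-node shape is what lets the diff below replace `relation.catalogTable` and `s.metadata` call sites with `relation.tableMeta` throughout.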
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala | 49
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala | 78
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala | 29
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala | 2
7 files changed, 72 insertions, 104 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 393925161f..49e85dc7b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -349,8 +349,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
// Get all input data source or hive relations of the query.
val srcRelations = df.logicalPlan.collect {
case LogicalRelation(src: BaseRelation, _, _) => src
- case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable) =>
- relation.catalogTable.identifier
+ case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) =>
+ relation.tableMeta.identifier
}
val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
@@ -360,8 +360,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
throw new AnalysisException(
s"Cannot overwrite table $tableName that is also being read from")
// check hive table relation when overwrite mode
- case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable)
- && srcRelations.contains(relation.catalogTable.identifier) =>
+ case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta)
+ && srcRelations.contains(relation.tableMeta.identifier) =>
throw new AnalysisException(
s"Cannot overwrite table $tableName that is also being read from")
case _ => // OK
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index b8ac070e3a..b02edd4c74 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -102,8 +102,8 @@ case class OptimizeMetadataOnlyQuery(
LocalRelation(partAttrs, partitionData.map(_.values))
case relation: CatalogRelation =>
- val partAttrs = getPartitionAttrs(relation.catalogTable.partitionColumnNames, relation)
- val partitionData = catalog.listPartitions(relation.catalogTable.identifier).map { p =>
+ val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
+ val partitionData = catalog.listPartitions(relation.tableMeta.identifier).map { p =>
InternalRow.fromSeq(partAttrs.map { attr =>
// TODO: use correct timezone for partition values.
Cast(Literal(p.spec(attr.name)), attr.dataType,
@@ -135,8 +135,8 @@ case class OptimizeMetadataOnlyQuery(
val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
Some(AttributeSet(partAttrs), l)
- case relation: CatalogRelation if relation.catalogTable.partitionColumnNames.nonEmpty =>
- val partAttrs = getPartitionAttrs(relation.catalogTable.partitionColumnNames, relation)
+ case relation: CatalogRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
+ val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
Some(AttributeSet(partAttrs), relation)
case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index d024a3673d..b89014ed8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -17,15 +17,12 @@
package org.apache.spark.sql.execution.command
-import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.datasources.LogicalRelation
/**
@@ -40,60 +37,40 @@ case class AnalyzeColumnCommand(
val sessionState = sparkSession.sessionState
val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
- val relation =
- EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed)
-
- // Compute total size
- val (catalogTable: CatalogTable, sizeInBytes: Long) = relation match {
- case catalogRel: CatalogRelation =>
- // This is a Hive serde format table
- (catalogRel.catalogTable,
- AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable))
-
- case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
- // This is a data source format table
- (logicalRel.catalogTable.get,
- AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get))
-
- case otherRelation =>
- throw new AnalysisException("ANALYZE TABLE is not supported for " +
- s"${otherRelation.nodeName}.")
+ val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
+ if (tableMeta.tableType == CatalogTableType.VIEW) {
+ throw new AnalysisException("ANALYZE TABLE is not supported on views.")
}
+ val sizeInBytes = AnalyzeTableCommand.calculateTotalSize(sessionState, tableMeta)
// Compute stats for each column
- val (rowCount, newColStats) =
- AnalyzeColumnCommand.computeColumnStats(sparkSession, tableIdent.table, relation, columnNames)
+ val (rowCount, newColStats) = computeColumnStats(sparkSession, tableIdentWithDB, columnNames)
// We also update table-level stats in order to keep them consistent with column-level stats.
val statistics = CatalogStatistics(
sizeInBytes = sizeInBytes,
rowCount = Some(rowCount),
// Newly computed column stats should override the existing ones.
- colStats = catalogTable.stats.map(_.colStats).getOrElse(Map.empty) ++ newColStats)
+ colStats = tableMeta.stats.map(_.colStats).getOrElse(Map.empty) ++ newColStats)
- sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics)))
+ sessionState.catalog.alterTable(tableMeta.copy(stats = Some(statistics)))
// Refresh the cached data source table in the catalog.
sessionState.catalog.refreshTable(tableIdentWithDB)
Seq.empty[Row]
}
-}
-
-object AnalyzeColumnCommand extends Logging {
/**
* Compute stats for the given columns.
* @return (row count, map from column name to ColumnStats)
- *
- * This is visible for testing.
*/
- def computeColumnStats(
+ private def computeColumnStats(
sparkSession: SparkSession,
- tableName: String,
- relation: LogicalPlan,
+ tableIdent: TableIdentifier,
columnNames: Seq[String]): (Long, Map[String, ColumnStat]) = {
+ val relation = sparkSession.table(tableIdent).logicalPlan
// Resolve the column names and dedup using AttributeSet
val resolver = sparkSession.sessionState.conf.resolver
val attributesToAnalyze = AttributeSet(columnNames.map { col =>
@@ -105,7 +82,7 @@ object AnalyzeColumnCommand extends Logging {
attributesToAnalyze.foreach { attr =>
if (!ColumnStat.supportsType(attr.dataType)) {
throw new AnalysisException(
- s"Column ${attr.name} in table $tableName is of type ${attr.dataType}, " +
+ s"Column ${attr.name} in table $tableIdent is of type ${attr.dataType}, " +
"and Spark does not support statistics collection on this column type.")
}
}
@@ -116,7 +93,7 @@ object AnalyzeColumnCommand extends Logging {
// The layout of each struct follows the layout of the ColumnStats.
val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError
val expressions = Count(Literal(1)).toAggregateExpression() +:
- attributesToAnalyze.map(ColumnStat.statExprs(_, ndvMaxErr))
+ attributesToAnalyze.map(ColumnStat.statExprs(_, ndvMaxErr))
val namedExpressions = expressions.map(e => Alias(e, e.toString)())
val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, namedExpressions, relation)).head()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 30b6cc7617..d2ea0cdf61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -22,11 +22,9 @@ import scala.util.control.NonFatal
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTableType}
import org.apache.spark.sql.internal.SessionState
@@ -41,53 +39,39 @@ case class AnalyzeTableCommand(
val sessionState = sparkSession.sessionState
val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
- val relation =
- EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed)
-
- relation match {
- case relation: CatalogRelation =>
- updateTableStats(relation.catalogTable,
- AnalyzeTableCommand.calculateTotalSize(sessionState, relation.catalogTable))
-
- // data source tables have been converted into LogicalRelations
- case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
- updateTableStats(logicalRel.catalogTable.get,
- AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get))
-
- case otherRelation =>
- throw new AnalysisException("ANALYZE TABLE is not supported for " +
- s"${otherRelation.nodeName}.")
+ val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
+ if (tableMeta.tableType == CatalogTableType.VIEW) {
+ throw new AnalysisException("ANALYZE TABLE is not supported on views.")
}
+ val newTotalSize = AnalyzeTableCommand.calculateTotalSize(sessionState, tableMeta)
- def updateTableStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
- val oldTotalSize = catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
- val oldRowCount = catalogTable.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
- var newStats: Option[CatalogStatistics] = None
- if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
- newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
- }
- // We only set rowCount when noscan is false, because otherwise:
- // 1. when total size is not changed, we don't need to alter the table;
- // 2. when total size is changed, `oldRowCount` becomes invalid.
- // This is to make sure that we only record the right statistics.
- if (!noscan) {
- val newRowCount = Dataset.ofRows(sparkSession, relation).count()
- if (newRowCount >= 0 && newRowCount != oldRowCount) {
- newStats = if (newStats.isDefined) {
- newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
- } else {
- Some(CatalogStatistics(
- sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
- }
+ val oldTotalSize = tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
+ val oldRowCount = tableMeta.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
+ var newStats: Option[CatalogStatistics] = None
+ if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
+ newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
+ }
+ // We only set rowCount when noscan is false, because otherwise:
+ // 1. when total size is not changed, we don't need to alter the table;
+ // 2. when total size is changed, `oldRowCount` becomes invalid.
+ // This is to make sure that we only record the right statistics.
+ if (!noscan) {
+ val newRowCount = sparkSession.table(tableIdentWithDB).count()
+ if (newRowCount >= 0 && newRowCount != oldRowCount) {
+ newStats = if (newStats.isDefined) {
+ newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
+ } else {
+ Some(CatalogStatistics(
+ sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
}
}
- // Update the metastore if the above statistics of the table are different from those
- // recorded in the metastore.
- if (newStats.isDefined) {
- sessionState.catalog.alterTable(catalogTable.copy(stats = newStats))
- // Refresh the cached data source table in the catalog.
- sessionState.catalog.refreshTable(tableIdentWithDB)
- }
+ }
+ // Update the metastore if the above statistics of the table are different from those
+ // recorded in the metastore.
+ if (newStats.isDefined) {
+ sessionState.catalog.alterTable(tableMeta.copy(stats = newStats))
+ // Refresh the cached data source table in the catalog.
+ sessionState.catalog.refreshTable(tableIdentWithDB)
}
Seq.empty[Row]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index f4292320e4..f694a0d6d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow, QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation}
+import org.apache.spark.sql.catalyst.catalog.CatalogRelation
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -208,16 +208,17 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
/**
- * Replaces [[SimpleCatalogRelation]] with data source table if its table provider is not hive.
+ * Replaces [[CatalogRelation]] with data source table if its table provider is not hive.
*/
class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
- private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
+ private def readDataSourceTable(r: CatalogRelation): LogicalPlan = {
+ val table = r.tableMeta
val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
val cache = sparkSession.sessionState.catalog.tableRelationCache
val withHiveSupport =
sparkSession.sparkContext.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive"
- cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
+ val plan = cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
override def call(): LogicalPlan = {
val pathOption = table.storage.locationUri.map("path" -> _)
val dataSource =
@@ -233,19 +234,25 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
// TODO: improve `InMemoryCatalog` and remove this limitation.
catalogTable = if (withHiveSupport) Some(table) else None)
- LogicalRelation(dataSource.resolveRelation(checkFilesExist = false),
+ LogicalRelation(
+ dataSource.resolveRelation(checkFilesExist = false),
catalogTable = Some(table))
}
- })
+ }).asInstanceOf[LogicalRelation]
+
+ // It's possible that the table schema is empty and need to be inferred at runtime. We should
+ // not specify expected outputs for this case.
+ val expectedOutputs = if (r.output.isEmpty) None else Some(r.output)
+ plan.copy(expectedOutputAttributes = expectedOutputs)
}
override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- case i @ InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
- if DDLUtils.isDatasourceTable(s.metadata) =>
- i.copy(table = readDataSourceTable(s.metadata))
+ case i @ InsertIntoTable(r: CatalogRelation, _, _, _, _)
+ if DDLUtils.isDatasourceTable(r.tableMeta) =>
+ i.copy(table = readDataSourceTable(r))
- case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
- readDataSourceTable(s.metadata)
+ case r: CatalogRelation if DDLUtils.isDatasourceTable(r.tableMeta) =>
+ readDataSourceTable(r)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index e7a59d4ad4..4d781b96ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -379,7 +379,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
case i @ InsertIntoTable(table, _, query, _, _) if table.resolved && query.resolved =>
table match {
case relation: CatalogRelation =>
- val metadata = relation.catalogTable
+ val metadata = relation.tableMeta
preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames)
case LogicalRelation(h: HadoopFsRelation, _, catalogTable) =>
val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index b38bbd8e7e..bbb31dbc8f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -306,7 +306,7 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
// Analyze only one column.
sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1")
val (relation, catalogTable) = spark.table(tableName).queryExecution.analyzed.collect {
- case catalogRel: CatalogRelation => (catalogRel, catalogRel.catalogTable)
+ case catalogRel: CatalogRelation => (catalogRel, catalogRel.tableMeta)
case logicalRel: LogicalRelation => (logicalRel, logicalRel.catalogTable.get)
}.head
val emptyColStat = ColumnStat(0, None, None, 0, 4, 4)