path: root/sql
author     Wenchen Fan <wenchen@databricks.com>  2017-02-28 09:24:36 -0800
committer  Xiao Li <gatorsmile@gmail.com>        2017-02-28 09:24:36 -0800
commit     7c7fc30b4ae8e4ebd4ededf92240fed10481f2dd (patch)
tree       469c01ca9d14d9a37ca6d3754af1b2dccbab1c02 /sql
parent     b405466513bcc02cadf1477b6b682ace95d81658 (diff)
[SPARK-19678][SQL] remove MetastoreRelation
## What changes were proposed in this pull request?

`MetastoreRelation` is used to represent table relations for Hive tables and to provide some Hive-related information. We currently resolve `SimpleCatalogRelation` to `MetastoreRelation` for Hive tables, which is unnecessary because the two are essentially the same. This PR merges `SimpleCatalogRelation` and `MetastoreRelation` into a single `CatalogRelation`.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17015 from cloud-fan/table-relation.
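For context, here is a minimal, standalone Scala sketch of the shape the unified relation takes after this change. The names `SketchCatalogRelation`, `TableMeta`, and `Column` are illustrative stand-ins rather than Spark APIs; the real class (shown in the diff below) additionally carries equality, statistics, and `MultiInstanceRelation` behavior.

```scala
// Minimal, self-contained sketch: `SketchCatalogRelation`, `TableMeta`, and `Column` are
// illustrative stand-ins, not Spark classes. It mirrors the shape of the unified relation
// added by this PR: catalog metadata plus separate data and partition columns, with
// partition columns placed after data columns in the output.
case class TableMeta(database: String, table: String, partitionColumnNames: Seq[String])
case class Column(name: String, dataType: String, nullable: Boolean = true)

case class SketchCatalogRelation(
    tableMeta: TableMeta,
    dataCols: Seq[Column],
    partitionCols: Seq[Column]) {
  // The partition columns always appear after the data columns.
  def output: Seq[Column] = dataCols ++ partitionCols
  def isPartitioned: Boolean = partitionCols.nonEmpty
}

object SketchCatalogRelationExample extends App {
  val relation = SketchCatalogRelation(
    TableMeta("default", "sales", partitionColumnNames = Seq("dt")),
    dataCols = Seq(Column("id", "int"), Column("amount", "double")),
    partitionCols = Seq(Column("dt", "string")))
  println(relation.output.map(_.name)) // List(id, amount, dt)
  println(relation.isPartitioned)      // true
}
```

As in the diff, `output` places partition columns after data columns; rules such as `HiveTableScans` then use the partition columns to split predicates into partition-pruning filters and other filters.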
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala |   2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala |   7
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala |  68
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala |   4
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala |   8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala |   8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala |   8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala |  49
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala |  78
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala |  29
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala |   2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala |   2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 130
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala |   2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala |  82
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala | 179
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala |  27
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala |  87
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala |  47
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala |  55
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala | 183
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala |   4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala |  12
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala |   4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala |  28
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala |   5
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala |   4
27 files changed, 442 insertions(+), 672 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 36ab8b8527..7529f90284 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -18,10 +18,8 @@
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.catalog.SimpleCatalogRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
-import org.apache.spark.sql.catalyst.plans.UsingJoin
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 0230626a66..06734891b6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -592,7 +592,12 @@ class SessionCatalog(
child = parser.parsePlan(viewText))
SubqueryAlias(table, child, Some(name.copy(table = table, database = Some(db))))
} else {
- SubqueryAlias(table, SimpleCatalogRelation(metadata), None)
+ val tableRelation = CatalogRelation(
+ metadata,
+ // we assume all the columns are nullable.
+ metadata.dataSchema.asNullable.toAttributes,
+ metadata.partitionSchema.asNullable.toAttributes)
+ SubqueryAlias(table, tableRelation, None)
}
} else {
SubqueryAlias(table, tempTables(table), None)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 2b3b575b4c..cb939026f1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -19,10 +19,11 @@ package org.apache.spark.sql.catalyst.catalog
import java.util.Date
-import scala.collection.mutable
+import com.google.common.base.Objects
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.{CatalystConf, FunctionIdentifier, InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.quoteIdentifier
@@ -349,36 +350,43 @@ object CatalogTypes {
/**
- * An interface that is implemented by logical plans to return the underlying catalog table.
- * If we can in the future consolidate SimpleCatalogRelation and MetastoreRelation, we should
- * probably remove this interface.
+ * A [[LogicalPlan]] that represents a table.
*/
-trait CatalogRelation {
- def catalogTable: CatalogTable
- def output: Seq[Attribute]
-}
+case class CatalogRelation(
+ tableMeta: CatalogTable,
+ dataCols: Seq[Attribute],
+ partitionCols: Seq[Attribute]) extends LeafNode with MultiInstanceRelation {
+ assert(tableMeta.identifier.database.isDefined)
+ assert(tableMeta.partitionSchema.sameType(partitionCols.toStructType))
+ assert(tableMeta.dataSchema.sameType(dataCols.toStructType))
+
+ // The partition column should always appear after data columns.
+ override def output: Seq[Attribute] = dataCols ++ partitionCols
+
+ def isPartitioned: Boolean = partitionCols.nonEmpty
+
+ override def equals(relation: Any): Boolean = relation match {
+ case other: CatalogRelation => tableMeta == other.tableMeta && output == other.output
+ case _ => false
+ }
+ override def hashCode(): Int = {
+ Objects.hashCode(tableMeta.identifier, output)
+ }
-/**
- * A [[LogicalPlan]] that wraps [[CatalogTable]].
- *
- * Note that in the future we should consolidate this and HiveCatalogRelation.
- */
-case class SimpleCatalogRelation(
- metadata: CatalogTable)
- extends LeafNode with CatalogRelation {
-
- override def catalogTable: CatalogTable = metadata
-
- override lazy val resolved: Boolean = false
-
- override val output: Seq[Attribute] = {
- val (partCols, dataCols) = metadata.schema.toAttributes
- // Since data can be dumped in randomly with no validation, everything is nullable.
- .map(_.withNullability(true).withQualifier(Some(metadata.identifier.table)))
- .partition { a =>
- metadata.partitionColumnNames.contains(a.name)
- }
- dataCols ++ partCols
+ /** Only compare table identifier. */
+ override lazy val cleanArgs: Seq[Any] = Seq(tableMeta.identifier)
+
+ override def computeStats(conf: CatalystConf): Statistics = {
+ // For data source tables, we will create a `LogicalRelation` and won't call this method, for
+ // hive serde tables, we will always generate a statistics.
+ // TODO: unify the table stats generation.
+ tableMeta.stats.map(_.toPlanStats(output)).getOrElse {
+ throw new IllegalStateException("table stats must be specified.")
+ }
}
+
+ override def newInstance(): LogicalPlan = copy(
+ dataCols = dataCols.map(_.newInstance()),
+ partitionCols = partitionCols.map(_.newInstance()))
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
index 26697e9867..a3cc4529b5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
@@ -63,7 +63,9 @@ case class TableIdentifier(table: String, database: Option[String])
}
/** A fully qualified identifier for a table (i.e., database.tableName) */
-case class QualifiedTableName(database: String, name: String)
+case class QualifiedTableName(database: String, name: String) {
+ override def toString: String = s"$database.$name"
+}
object TableIdentifier {
def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 44434324d3..a755231962 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -433,15 +433,15 @@ class SessionCatalogSuite extends PlanTest {
sessionCatalog.createTempView("tbl1", tempTable1, overrideIfExists = false)
sessionCatalog.setCurrentDatabase("db2")
// If we explicitly specify the database, we'll look up the relation in that database
- assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2")))
- == SubqueryAlias("tbl1", SimpleCatalogRelation(metastoreTable1), None))
+ assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))).children.head
+ .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
// Otherwise, we'll first look up a temporary table with the same name
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
== SubqueryAlias("tbl1", tempTable1, None))
// Then, if that does not exist, look up the relation in the current database
sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
- assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
- == SubqueryAlias("tbl1", SimpleCatalogRelation(metastoreTable1), None))
+ assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1")).children.head
+ .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
}
test("look up view relation") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 393925161f..49e85dc7b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -349,8 +349,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
// Get all input data source or hive relations of the query.
val srcRelations = df.logicalPlan.collect {
case LogicalRelation(src: BaseRelation, _, _) => src
- case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable) =>
- relation.catalogTable.identifier
+ case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) =>
+ relation.tableMeta.identifier
}
val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
@@ -360,8 +360,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
throw new AnalysisException(
s"Cannot overwrite table $tableName that is also being read from")
// check hive table relation when overwrite mode
- case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable)
- && srcRelations.contains(relation.catalogTable.identifier) =>
+ case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta)
+ && srcRelations.contains(relation.tableMeta.identifier) =>
throw new AnalysisException(
s"Cannot overwrite table $tableName that is also being read from")
case _ => // OK
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index b8ac070e3a..b02edd4c74 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -102,8 +102,8 @@ case class OptimizeMetadataOnlyQuery(
LocalRelation(partAttrs, partitionData.map(_.values))
case relation: CatalogRelation =>
- val partAttrs = getPartitionAttrs(relation.catalogTable.partitionColumnNames, relation)
- val partitionData = catalog.listPartitions(relation.catalogTable.identifier).map { p =>
+ val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
+ val partitionData = catalog.listPartitions(relation.tableMeta.identifier).map { p =>
InternalRow.fromSeq(partAttrs.map { attr =>
// TODO: use correct timezone for partition values.
Cast(Literal(p.spec(attr.name)), attr.dataType,
@@ -135,8 +135,8 @@ case class OptimizeMetadataOnlyQuery(
val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
Some(AttributeSet(partAttrs), l)
- case relation: CatalogRelation if relation.catalogTable.partitionColumnNames.nonEmpty =>
- val partAttrs = getPartitionAttrs(relation.catalogTable.partitionColumnNames, relation)
+ case relation: CatalogRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
+ val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
Some(AttributeSet(partAttrs), relation)
case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index d024a3673d..b89014ed8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -17,15 +17,12 @@
package org.apache.spark.sql.execution.command
-import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.datasources.LogicalRelation
/**
@@ -40,60 +37,40 @@ case class AnalyzeColumnCommand(
val sessionState = sparkSession.sessionState
val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
- val relation =
- EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed)
-
- // Compute total size
- val (catalogTable: CatalogTable, sizeInBytes: Long) = relation match {
- case catalogRel: CatalogRelation =>
- // This is a Hive serde format table
- (catalogRel.catalogTable,
- AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable))
-
- case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
- // This is a data source format table
- (logicalRel.catalogTable.get,
- AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get))
-
- case otherRelation =>
- throw new AnalysisException("ANALYZE TABLE is not supported for " +
- s"${otherRelation.nodeName}.")
+ val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
+ if (tableMeta.tableType == CatalogTableType.VIEW) {
+ throw new AnalysisException("ANALYZE TABLE is not supported on views.")
}
+ val sizeInBytes = AnalyzeTableCommand.calculateTotalSize(sessionState, tableMeta)
// Compute stats for each column
- val (rowCount, newColStats) =
- AnalyzeColumnCommand.computeColumnStats(sparkSession, tableIdent.table, relation, columnNames)
+ val (rowCount, newColStats) = computeColumnStats(sparkSession, tableIdentWithDB, columnNames)
// We also update table-level stats in order to keep them consistent with column-level stats.
val statistics = CatalogStatistics(
sizeInBytes = sizeInBytes,
rowCount = Some(rowCount),
// Newly computed column stats should override the existing ones.
- colStats = catalogTable.stats.map(_.colStats).getOrElse(Map.empty) ++ newColStats)
+ colStats = tableMeta.stats.map(_.colStats).getOrElse(Map.empty) ++ newColStats)
- sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics)))
+ sessionState.catalog.alterTable(tableMeta.copy(stats = Some(statistics)))
// Refresh the cached data source table in the catalog.
sessionState.catalog.refreshTable(tableIdentWithDB)
Seq.empty[Row]
}
-}
-
-object AnalyzeColumnCommand extends Logging {
/**
* Compute stats for the given columns.
* @return (row count, map from column name to ColumnStats)
- *
- * This is visible for testing.
*/
- def computeColumnStats(
+ private def computeColumnStats(
sparkSession: SparkSession,
- tableName: String,
- relation: LogicalPlan,
+ tableIdent: TableIdentifier,
columnNames: Seq[String]): (Long, Map[String, ColumnStat]) = {
+ val relation = sparkSession.table(tableIdent).logicalPlan
// Resolve the column names and dedup using AttributeSet
val resolver = sparkSession.sessionState.conf.resolver
val attributesToAnalyze = AttributeSet(columnNames.map { col =>
@@ -105,7 +82,7 @@ object AnalyzeColumnCommand extends Logging {
attributesToAnalyze.foreach { attr =>
if (!ColumnStat.supportsType(attr.dataType)) {
throw new AnalysisException(
- s"Column ${attr.name} in table $tableName is of type ${attr.dataType}, " +
+ s"Column ${attr.name} in table $tableIdent is of type ${attr.dataType}, " +
"and Spark does not support statistics collection on this column type.")
}
}
@@ -116,7 +93,7 @@ object AnalyzeColumnCommand extends Logging {
// The layout of each struct follows the layout of the ColumnStats.
val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError
val expressions = Count(Literal(1)).toAggregateExpression() +:
- attributesToAnalyze.map(ColumnStat.statExprs(_, ndvMaxErr))
+ attributesToAnalyze.map(ColumnStat.statExprs(_, ndvMaxErr))
val namedExpressions = expressions.map(e => Alias(e, e.toString)())
val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, namedExpressions, relation)).head()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 30b6cc7617..d2ea0cdf61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -22,11 +22,9 @@ import scala.util.control.NonFatal
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTableType}
import org.apache.spark.sql.internal.SessionState
@@ -41,53 +39,39 @@ case class AnalyzeTableCommand(
val sessionState = sparkSession.sessionState
val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
- val relation =
- EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed)
-
- relation match {
- case relation: CatalogRelation =>
- updateTableStats(relation.catalogTable,
- AnalyzeTableCommand.calculateTotalSize(sessionState, relation.catalogTable))
-
- // data source tables have been converted into LogicalRelations
- case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
- updateTableStats(logicalRel.catalogTable.get,
- AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get))
-
- case otherRelation =>
- throw new AnalysisException("ANALYZE TABLE is not supported for " +
- s"${otherRelation.nodeName}.")
+ val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
+ if (tableMeta.tableType == CatalogTableType.VIEW) {
+ throw new AnalysisException("ANALYZE TABLE is not supported on views.")
}
+ val newTotalSize = AnalyzeTableCommand.calculateTotalSize(sessionState, tableMeta)
- def updateTableStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
- val oldTotalSize = catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
- val oldRowCount = catalogTable.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
- var newStats: Option[CatalogStatistics] = None
- if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
- newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
- }
- // We only set rowCount when noscan is false, because otherwise:
- // 1. when total size is not changed, we don't need to alter the table;
- // 2. when total size is changed, `oldRowCount` becomes invalid.
- // This is to make sure that we only record the right statistics.
- if (!noscan) {
- val newRowCount = Dataset.ofRows(sparkSession, relation).count()
- if (newRowCount >= 0 && newRowCount != oldRowCount) {
- newStats = if (newStats.isDefined) {
- newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
- } else {
- Some(CatalogStatistics(
- sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
- }
+ val oldTotalSize = tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
+ val oldRowCount = tableMeta.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
+ var newStats: Option[CatalogStatistics] = None
+ if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
+ newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
+ }
+ // We only set rowCount when noscan is false, because otherwise:
+ // 1. when total size is not changed, we don't need to alter the table;
+ // 2. when total size is changed, `oldRowCount` becomes invalid.
+ // This is to make sure that we only record the right statistics.
+ if (!noscan) {
+ val newRowCount = sparkSession.table(tableIdentWithDB).count()
+ if (newRowCount >= 0 && newRowCount != oldRowCount) {
+ newStats = if (newStats.isDefined) {
+ newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
+ } else {
+ Some(CatalogStatistics(
+ sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
}
}
- // Update the metastore if the above statistics of the table are different from those
- // recorded in the metastore.
- if (newStats.isDefined) {
- sessionState.catalog.alterTable(catalogTable.copy(stats = newStats))
- // Refresh the cached data source table in the catalog.
- sessionState.catalog.refreshTable(tableIdentWithDB)
- }
+ }
+ // Update the metastore if the above statistics of the table are different from those
+ // recorded in the metastore.
+ if (newStats.isDefined) {
+ sessionState.catalog.alterTable(tableMeta.copy(stats = newStats))
+ // Refresh the cached data source table in the catalog.
+ sessionState.catalog.refreshTable(tableIdentWithDB)
}
Seq.empty[Row]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index f4292320e4..f694a0d6d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow, QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation}
+import org.apache.spark.sql.catalyst.catalog.CatalogRelation
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -208,16 +208,17 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
/**
- * Replaces [[SimpleCatalogRelation]] with data source table if its table provider is not hive.
+ * Replaces [[CatalogRelation]] with data source table if its table provider is not hive.
*/
class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
- private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
+ private def readDataSourceTable(r: CatalogRelation): LogicalPlan = {
+ val table = r.tableMeta
val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
val cache = sparkSession.sessionState.catalog.tableRelationCache
val withHiveSupport =
sparkSession.sparkContext.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive"
- cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
+ val plan = cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
override def call(): LogicalPlan = {
val pathOption = table.storage.locationUri.map("path" -> _)
val dataSource =
@@ -233,19 +234,25 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
// TODO: improve `InMemoryCatalog` and remove this limitation.
catalogTable = if (withHiveSupport) Some(table) else None)
- LogicalRelation(dataSource.resolveRelation(checkFilesExist = false),
+ LogicalRelation(
+ dataSource.resolveRelation(checkFilesExist = false),
catalogTable = Some(table))
}
- })
+ }).asInstanceOf[LogicalRelation]
+
+ // It's possible that the table schema is empty and need to be inferred at runtime. We should
+ // not specify expected outputs for this case.
+ val expectedOutputs = if (r.output.isEmpty) None else Some(r.output)
+ plan.copy(expectedOutputAttributes = expectedOutputs)
}
override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- case i @ InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
- if DDLUtils.isDatasourceTable(s.metadata) =>
- i.copy(table = readDataSourceTable(s.metadata))
+ case i @ InsertIntoTable(r: CatalogRelation, _, _, _, _)
+ if DDLUtils.isDatasourceTable(r.tableMeta) =>
+ i.copy(table = readDataSourceTable(r))
- case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
- readDataSourceTable(s.metadata)
+ case r: CatalogRelation if DDLUtils.isDatasourceTable(r.tableMeta) =>
+ readDataSourceTable(r)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index e7a59d4ad4..4d781b96ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -379,7 +379,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
case i @ InsertIntoTable(table, _, query, _, _) if table.resolved && query.resolved =>
table match {
case relation: CatalogRelation =>
- val metadata = relation.catalogTable
+ val metadata = relation.tableMeta
preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames)
case LogicalRelation(h: HadoopFsRelation, _, catalogTable) =>
val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index b38bbd8e7e..bbb31dbc8f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -306,7 +306,7 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
// Analyze only one column.
sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1")
val (relation, catalogTable) = spark.table(tableName).queryExecution.analyzed.collect {
- case catalogRel: CatalogRelation => (catalogRel, catalogRel.catalogTable)
+ case catalogRel: CatalogRelation => (catalogRel, catalogRel.tableMeta)
case logicalRel: LogicalRelation => (logicalRel, logicalRel.catalogTable.get)
}.head
val emptyColStat = ColumnStat(0, None, None, 0, 4, 4)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 677da0dbdc..151a69aebf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.hive
+import java.net.URI
+
import com.google.common.util.concurrent.Striped
import org.apache.hadoop.fs.Path
@@ -26,6 +28,7 @@ import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.hive.orc.OrcFileFormat
@@ -71,10 +74,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
private def getCached(
tableIdentifier: QualifiedTableName,
pathsInMetastore: Seq[Path],
- metastoreRelation: MetastoreRelation,
schemaInMetastore: StructType,
expectedFileFormat: Class[_ <: FileFormat],
- expectedBucketSpec: Option[BucketSpec],
partitionSchema: Option[StructType]): Option[LogicalRelation] = {
tableRelationCache.getIfPresent(tableIdentifier) match {
@@ -89,7 +90,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
val useCached =
relation.location.rootPaths.toSet == pathsInMetastore.toSet &&
logical.schema.sameType(schemaInMetastore) &&
- relation.bucketSpec == expectedBucketSpec &&
+ // We don't support hive bucketed tables. This function `getCached` is only used for
+ // converting supported Hive tables to data source tables.
+ relation.bucketSpec.isEmpty &&
relation.partitionSchema == partitionSchema.getOrElse(StructType(Nil))
if (useCached) {
@@ -100,52 +103,48 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
None
}
case _ =>
- logWarning(
- s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} " +
- s"should be stored as $expectedFileFormat. However, we are getting " +
- s"a ${relation.fileFormat} from the metastore cache. This cached " +
- s"entry will be invalidated.")
+ logWarning(s"Table $tableIdentifier should be stored as $expectedFileFormat. " +
+ s"However, we are getting a ${relation.fileFormat} from the metastore cache. " +
+ "This cached entry will be invalidated.")
tableRelationCache.invalidate(tableIdentifier)
None
}
case other =>
- logWarning(
- s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
- s"as $expectedFileFormat. However, we are getting a $other from the metastore cache. " +
- s"This cached entry will be invalidated.")
+ logWarning(s"Table $tableIdentifier should be stored as $expectedFileFormat. " +
+ s"However, we are getting a $other from the metastore cache. " +
+ "This cached entry will be invalidated.")
tableRelationCache.invalidate(tableIdentifier)
None
}
}
private def convertToLogicalRelation(
- metastoreRelation: MetastoreRelation,
+ relation: CatalogRelation,
options: Map[String, String],
- defaultSource: FileFormat,
fileFormatClass: Class[_ <: FileFormat],
fileType: String): LogicalRelation = {
- val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
+ val metastoreSchema = relation.tableMeta.schema
val tableIdentifier =
- QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
- val bucketSpec = None // We don't support hive bucketed tables, only ones we write out.
+ QualifiedTableName(relation.tableMeta.database, relation.tableMeta.identifier.table)
val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
- val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
- val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
-
+ val tablePath = new Path(new URI(relation.tableMeta.location))
+ val result = if (relation.isPartitioned) {
+ val partitionSchema = relation.tableMeta.partitionSchema
val rootPaths: Seq[Path] = if (lazyPruningEnabled) {
- Seq(metastoreRelation.hiveQlTable.getDataLocation)
+ Seq(tablePath)
} else {
// By convention (for example, see CatalogFileIndex), the definition of a
// partitioned table's paths depends on whether that table has any actual partitions.
// Partitioned tables without partitions use the location of the table's base path.
// Partitioned tables with partitions use the locations of those partitions' data
// locations, _omitting_ the table's base path.
- val paths = metastoreRelation.getHiveQlPartitions().map { p =>
- new Path(p.getLocation)
- }
+ val paths = sparkSession.sharedState.externalCatalog
+ .listPartitions(tableIdentifier.database, tableIdentifier.name)
+ .map(p => new Path(new URI(p.storage.locationUri.get)))
+
if (paths.isEmpty) {
- Seq(metastoreRelation.hiveQlTable.getDataLocation)
+ Seq(tablePath)
} else {
paths
}
@@ -155,39 +154,31 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
val cached = getCached(
tableIdentifier,
rootPaths,
- metastoreRelation,
metastoreSchema,
fileFormatClass,
- bucketSpec,
Some(partitionSchema))
val logicalRelation = cached.getOrElse {
- val sizeInBytes =
- metastoreRelation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong
+ val sizeInBytes = relation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong
val fileIndex = {
- val index = new CatalogFileIndex(
- sparkSession, metastoreRelation.catalogTable, sizeInBytes)
+ val index = new CatalogFileIndex(sparkSession, relation.tableMeta, sizeInBytes)
if (lazyPruningEnabled) {
index
} else {
index.filterPartitions(Nil) // materialize all the partitions in memory
}
}
- val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
- val dataSchema =
- StructType(metastoreSchema
- .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase)))
- val relation = HadoopFsRelation(
+ val fsRelation = HadoopFsRelation(
location = fileIndex,
partitionSchema = partitionSchema,
- dataSchema = dataSchema,
- bucketSpec = bucketSpec,
- fileFormat = defaultSource,
+ dataSchema = relation.tableMeta.dataSchema,
+ // We don't support hive bucketed tables, only ones we write out.
+ bucketSpec = None,
+ fileFormat = fileFormatClass.newInstance(),
options = options)(sparkSession = sparkSession)
- val created = LogicalRelation(relation,
- catalogTable = Some(metastoreRelation.catalogTable))
+ val created = LogicalRelation(fsRelation, catalogTable = Some(relation.tableMeta))
tableRelationCache.put(tableIdentifier, created)
created
}
@@ -195,14 +186,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
logicalRelation
})
} else {
- val rootPath = metastoreRelation.hiveQlTable.getDataLocation
+ val rootPath = tablePath
withTableCreationLock(tableIdentifier, {
- val cached = getCached(tableIdentifier,
+ val cached = getCached(
+ tableIdentifier,
Seq(rootPath),
- metastoreRelation,
metastoreSchema,
fileFormatClass,
- bucketSpec,
None)
val logicalRelation = cached.getOrElse {
val created =
@@ -210,11 +200,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
DataSource(
sparkSession = sparkSession,
paths = rootPath.toString :: Nil,
- userSpecifiedSchema = Some(metastoreRelation.schema),
- bucketSpec = bucketSpec,
+ userSpecifiedSchema = Some(metastoreSchema),
+ // We don't support hive bucketed tables, only ones we write out.
+ bucketSpec = None,
options = options,
className = fileType).resolveRelation(),
- catalogTable = Some(metastoreRelation.catalogTable))
+ catalogTable = Some(relation.tableMeta))
tableRelationCache.put(tableIdentifier, created)
created
@@ -223,7 +214,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
logicalRelation
})
}
- result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
+ result.copy(expectedOutputAttributes = Some(relation.output))
}
/**
@@ -231,33 +222,32 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
* data source relations for better performance.
*/
object ParquetConversions extends Rule[LogicalPlan] {
- private def shouldConvertMetastoreParquet(relation: MetastoreRelation): Boolean = {
- relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") &&
+ private def shouldConvertMetastoreParquet(relation: CatalogRelation): Boolean = {
+ relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet") &&
sessionState.convertMetastoreParquet
}
- private def convertToParquetRelation(relation: MetastoreRelation): LogicalRelation = {
- val defaultSource = new ParquetFileFormat()
+ private def convertToParquetRelation(relation: CatalogRelation): LogicalRelation = {
val fileFormatClass = classOf[ParquetFileFormat]
-
val mergeSchema = sessionState.convertMetastoreParquetWithSchemaMerging
val options = Map(ParquetOptions.MERGE_SCHEMA -> mergeSchema.toString)
- convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "parquet")
+ convertToLogicalRelation(relation, options, fileFormatClass, "parquet")
}
override def apply(plan: LogicalPlan): LogicalPlan = {
plan transformUp {
// Write path
- case InsertIntoTable(r: MetastoreRelation, partition, query, overwrite, ifNotExists)
+ case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
// Inserting into partitioned table is not supported in Parquet data source (yet).
- if query.resolved && !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) =>
+ if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
+ !r.isPartitioned && shouldConvertMetastoreParquet(r) =>
InsertIntoTable(convertToParquetRelation(r), partition, query, overwrite, ifNotExists)
// Read path
- case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) =>
- val parquetRelation = convertToParquetRelation(relation)
- SubqueryAlias(relation.tableName, parquetRelation, None)
+ case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) &&
+ shouldConvertMetastoreParquet(relation) =>
+ convertToParquetRelation(relation)
}
}
}
@@ -267,31 +257,31 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
* for better performance.
*/
object OrcConversions extends Rule[LogicalPlan] {
- private def shouldConvertMetastoreOrc(relation: MetastoreRelation): Boolean = {
- relation.tableDesc.getSerdeClassName.toLowerCase.contains("orc") &&
+ private def shouldConvertMetastoreOrc(relation: CatalogRelation): Boolean = {
+ relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("orc") &&
sessionState.convertMetastoreOrc
}
- private def convertToOrcRelation(relation: MetastoreRelation): LogicalRelation = {
- val defaultSource = new OrcFileFormat()
+ private def convertToOrcRelation(relation: CatalogRelation): LogicalRelation = {
val fileFormatClass = classOf[OrcFileFormat]
val options = Map[String, String]()
- convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "orc")
+ convertToLogicalRelation(relation, options, fileFormatClass, "orc")
}
override def apply(plan: LogicalPlan): LogicalPlan = {
plan transformUp {
// Write path
- case InsertIntoTable(r: MetastoreRelation, partition, query, overwrite, ifNotExists)
+ case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
// Inserting into partitioned table is not supported in Orc data source (yet).
- if query.resolved && !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
+ if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
+ !r.isPartitioned && shouldConvertMetastoreOrc(r) =>
InsertIntoTable(convertToOrcRelation(r), partition, query, overwrite, ifNotExists)
// Read path
- case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) =>
- val orcRelation = convertToOrcRelation(relation)
- SubqueryAlias(relation.tableName, orcRelation, None)
+ case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) &&
+ shouldConvertMetastoreOrc(relation) =>
+ convertToOrcRelation(relation)
}
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 273cf85df3..5a08a6bc66 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -62,10 +62,10 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
override val extendedResolutionRules =
new ResolveHiveSerdeTable(sparkSession) ::
new FindDataSourceTable(sparkSession) ::
- new FindHiveSerdeTable(sparkSession) ::
new ResolveSQLOnFile(sparkSession) :: Nil
override val postHocResolutionRules =
+ new DetermineTableStats(sparkSession) ::
catalog.ParquetConversions ::
catalog.OrcConversions ::
PreprocessTableCreation(sparkSession) ::
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index f45532cc38..624cfa206e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -17,8 +17,14 @@
package org.apache.spark.sql.hive
+import java.io.IOException
+import java.net.URI
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.common.StatsSetupConst
+
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, SimpleCatalogRelation}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogStorageFormat, CatalogTable}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation}
@@ -91,18 +97,56 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
// Infers the schema, if empty, because the schema could be determined by Hive
// serde.
- val catalogTable = if (query.isEmpty) {
- val withSchema = HiveUtils.inferSchema(withStorage)
- if (withSchema.schema.length <= 0) {
+ val withSchema = if (query.isEmpty) {
+ val inferred = HiveUtils.inferSchema(withStorage)
+ if (inferred.schema.length <= 0) {
throw new AnalysisException("Unable to infer the schema. " +
- s"The schema specification is required to create the table ${withSchema.identifier}.")
+ s"The schema specification is required to create the table ${inferred.identifier}.")
}
- withSchema
+ inferred
} else {
withStorage
}
- c.copy(tableDesc = catalogTable)
+ c.copy(tableDesc = withSchema)
+ }
+}
+
+class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+ case relation: CatalogRelation
+ if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
+ val table = relation.tableMeta
+ // TODO: check if this estimate is valid for tables after partition pruning.
+ // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
+ // relatively cheap if parameters for the table are populated into the metastore.
+ // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
+ // (see StatsSetupConst in Hive) that we can look at in the future.
+ // When table is external,`totalSize` is always zero, which will influence join strategy
+ // so when `totalSize` is zero, use `rawDataSize` instead.
+ val totalSize = table.properties.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+ val rawDataSize = table.properties.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+ val sizeInBytes = if (totalSize.isDefined && totalSize.get > 0) {
+ totalSize.get
+ } else if (rawDataSize.isDefined && rawDataSize.get > 0) {
+ rawDataSize.get
+ } else if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+ try {
+ val hadoopConf = session.sessionState.newHadoopConf()
+ val tablePath = new Path(new URI(table.location))
+ val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
+ fs.getContentSummary(tablePath).getLength
+ } catch {
+ case e: IOException =>
+ logWarning("Failed to get table size from hdfs.", e)
+ session.sessionState.conf.defaultSizeInBytes
+ }
+ } else {
+ session.sessionState.conf.defaultSizeInBytes
+ }
+
+ val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
+ relation.copy(tableMeta = withStats)
}
}
@@ -114,8 +158,9 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
*/
object HiveAnalysis extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
- case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists) =>
- InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)
+ case InsertIntoTable(relation: CatalogRelation, partSpec, query, overwrite, ifNotExists)
+ if DDLUtils.isHiveTable(relation.tableMeta) =>
+ InsertIntoHiveTable(relation.tableMeta, partSpec, query, overwrite, ifNotExists)
case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
@@ -125,21 +170,6 @@ object HiveAnalysis extends Rule[LogicalPlan] {
}
}
-/**
- * Replaces `SimpleCatalogRelation` with [[MetastoreRelation]] if its table provider is hive.
- */
-class FindHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
- override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- case i @ InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
- if DDLUtils.isHiveTable(s.metadata) =>
- i.copy(table =
- MetastoreRelation(s.metadata.database, s.metadata.identifier.table)(s.metadata, session))
-
- case s: SimpleCatalogRelation if DDLUtils.isHiveTable(s.metadata) =>
- MetastoreRelation(s.metadata.database, s.metadata.identifier.table)(s.metadata, session)
- }
-}
-
private[hive] trait HiveStrategies {
// Possibly being too clever with types here... or not clever enough.
self: SparkPlanner =>
@@ -161,10 +191,10 @@ private[hive] trait HiveStrategies {
*/
object HiveTableScans extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case PhysicalOperation(projectList, predicates, relation: MetastoreRelation) =>
+ case PhysicalOperation(projectList, predicates, relation: CatalogRelation) =>
// Filter out all predicates that only deal with partition keys, these are given to the
// hive table scan operator to be used for partition pruning.
- val partitionKeyIds = AttributeSet(relation.partitionKeys)
+ val partitionKeyIds = AttributeSet(relation.partitionCols)
val (pruningPredicates, otherPredicates) = predicates.partition { predicate =>
!predicate.references.isEmpty &&
predicate.references.subsetOf(partitionKeyIds)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
deleted file mode 100644
index 97b120758b..0000000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.io.IOException
-
-import com.google.common.base.Objects
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.hive.common.StatsSetupConst
-import org.apache.hadoop.hive.ql.metadata.{Partition, Table => HiveTable}
-import org.apache.hadoop.hive.ql.plan.TableDesc
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.CatalystConf
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression}
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
-import org.apache.spark.sql.execution.FileRelation
-import org.apache.spark.sql.hive.client.HiveClientImpl
-import org.apache.spark.sql.types.StructField
-
-
-private[hive] case class MetastoreRelation(
- databaseName: String,
- tableName: String)
- (val catalogTable: CatalogTable,
- @transient private val sparkSession: SparkSession)
- extends LeafNode with MultiInstanceRelation with FileRelation with CatalogRelation {
-
- override def equals(other: Any): Boolean = other match {
- case relation: MetastoreRelation =>
- databaseName == relation.databaseName &&
- tableName == relation.tableName &&
- output == relation.output
- case _ => false
- }
-
- override def hashCode(): Int = {
- Objects.hashCode(databaseName, tableName, output)
- }
-
- override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil
-
- @transient val hiveQlTable: HiveTable = HiveClientImpl.toHiveTable(catalogTable)
-
- @transient override def computeStats(conf: CatalystConf): Statistics = {
- catalogTable.stats.map(_.toPlanStats(output)).getOrElse(Statistics(
- sizeInBytes = {
- val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
- val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
- // TODO: check if this estimate is valid for tables after partition pruning.
- // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
- // relatively cheap if parameters for the table are populated into the metastore.
- // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
- // (see StatsSetupConst in Hive) that we can look at in the future.
- BigInt(
- // When table is external,`totalSize` is always zero, which will influence join strategy
- // so when `totalSize` is zero, use `rawDataSize` instead
- // when `rawDataSize` is also zero, use `HiveExternalCatalog.STATISTICS_TOTAL_SIZE`,
- // which is generated by analyze command.
- if (totalSize != null && totalSize.toLong > 0L) {
- totalSize.toLong
- } else if (rawDataSize != null && rawDataSize.toLong > 0) {
- rawDataSize.toLong
- } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
- try {
- val hadoopConf = sparkSession.sessionState.newHadoopConf()
- val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
- fs.getContentSummary(hiveQlTable.getPath).getLength
- } catch {
- case e: IOException =>
- logWarning("Failed to get table size from hdfs.", e)
- sparkSession.sessionState.conf.defaultSizeInBytes
- }
- } else {
- sparkSession.sessionState.conf.defaultSizeInBytes
- })
- }
- ))
- }
-
- // When metastore partition pruning is turned off, we cache the list of all partitions to
- // mimic the behavior of Spark < 1.5
- private lazy val allPartitions: Seq[CatalogTablePartition] = {
- sparkSession.sharedState.externalCatalog.listPartitions(
- catalogTable.database,
- catalogTable.identifier.table)
- }
-
- def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
- val rawPartitions = if (sparkSession.sessionState.conf.metastorePartitionPruning) {
- sparkSession.sharedState.externalCatalog.listPartitionsByFilter(
- catalogTable.database,
- catalogTable.identifier.table,
- predicates)
- } else {
- allPartitions
- }
-
- rawPartitions.map(HiveClientImpl.toHivePartition(_, hiveQlTable))
- }
-
- /** Only compare database and tablename, not alias. */
- override def sameResult(plan: LogicalPlan): Boolean = {
- plan.canonicalized match {
- case mr: MetastoreRelation =>
- mr.databaseName == databaseName && mr.tableName == tableName
- case _ => false
- }
- }
-
- val tableDesc = new TableDesc(
- hiveQlTable.getInputFormatClass,
- // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because
- // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to
- // substitute some output formats, e.g. substituting SequenceFileOutputFormat to
- // HiveSequenceFileOutputFormat.
- hiveQlTable.getOutputFormatClass,
- hiveQlTable.getMetadata
- )
-
- implicit class SchemaAttribute(f: StructField) {
- def toAttribute: AttributeReference = AttributeReference(
- f.name,
- f.dataType,
- // Since data can be dumped in randomly with no validation, everything is nullable.
- nullable = true
- )(qualifier = Some(tableName))
- }
-
- /** PartitionKey attributes */
- val partitionKeys = catalogTable.partitionSchema.map(_.toAttribute)
-
- /** Non-partitionKey attributes */
- val dataColKeys = catalogTable.schema
- .filter { c => !catalogTable.partitionColumnNames.contains(c.name) }
- .map(_.toAttribute)
-
- val output = dataColKeys ++ partitionKeys
-
- /** An attribute map that can be used to lookup original attributes based on expression id. */
- val attributeMap = AttributeMap(output.map(o => (o, o)))
-
- /** An attribute map for determining the ordinal for non-partition columns. */
- val columnOrdinals = AttributeMap(dataColKeys.zipWithIndex)
-
- override def inputFiles: Array[String] = {
- val partLocations = allPartitions
- .flatMap(_.storage.locationUri)
- .toArray
- if (partLocations.nonEmpty) {
- partLocations
- } else {
- Array(
- catalogTable.storage.locationUri.getOrElse(
- sys.error(s"Could not get the location of ${catalogTable.qualifiedName}.")))
- }
- }
-
- override def newInstance(): MetastoreRelation = {
- MetastoreRelation(databaseName, tableName)(catalogTable, sparkSession)
- }
-}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index d48702b610..16c1103dd1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -61,7 +61,8 @@ private[hive] sealed trait TableReader {
private[hive]
class HadoopTableReader(
@transient private val attributes: Seq[Attribute],
- @transient private val relation: MetastoreRelation,
+ @transient private val partitionKeys: Seq[Attribute],
+ @transient private val tableDesc: TableDesc,
@transient private val sparkSession: SparkSession,
hadoopConf: Configuration)
extends TableReader with Logging {
@@ -88,7 +89,7 @@ class HadoopTableReader(
override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] =
makeRDDForTable(
hiveTable,
- Utils.classForName(relation.tableDesc.getSerdeClassName).asInstanceOf[Class[Deserializer]],
+ Utils.classForName(tableDesc.getSerdeClassName).asInstanceOf[Class[Deserializer]],
filterOpt = None)
/**
@@ -110,7 +111,7 @@ class HadoopTableReader(
// Create local references to member variables, so that the entire `this` object won't be
// serialized in the closure below.
- val tableDesc = relation.tableDesc
+ val localTableDesc = tableDesc
val broadcastedHadoopConf = _broadcastedHadoopConf
val tablePath = hiveTable.getPath
@@ -119,7 +120,7 @@ class HadoopTableReader(
// logDebug("Table input: %s".format(tablePath))
val ifc = hiveTable.getInputFormatClass
.asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
- val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
+ val hadoopRDD = createHadoopRdd(localTableDesc, inputPathStr, ifc)
val attrsWithIndex = attributes.zipWithIndex
val mutableRow = new SpecificInternalRow(attributes.map(_.dataType))
@@ -127,7 +128,7 @@ class HadoopTableReader(
val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
val hconf = broadcastedHadoopConf.value.value
val deserializer = deserializerClass.newInstance()
- deserializer.initialize(hconf, tableDesc.getProperties)
+ deserializer.initialize(hconf, localTableDesc.getProperties)
HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer)
}
@@ -212,8 +213,6 @@ class HadoopTableReader(
partCols.map(col => new String(partSpec.get(col))).toArray
}
- // Create local references so that the outer object isn't serialized.
- val tableDesc = relation.tableDesc
val broadcastedHiveConf = _broadcastedHadoopConf
val localDeserializer = partDeserializer
val mutableRow = new SpecificInternalRow(attributes.map(_.dataType))
@@ -222,12 +221,12 @@ class HadoopTableReader(
// Attached indices indicate the position of each attribute in the output schema.
val (partitionKeyAttrs, nonPartitionKeyAttrs) =
attributes.zipWithIndex.partition { case (attr, _) =>
- relation.partitionKeys.contains(attr)
+ partitionKeys.contains(attr)
}
def fillPartitionKeys(rawPartValues: Array[String], row: InternalRow): Unit = {
partitionKeyAttrs.foreach { case (attr, ordinal) =>
- val partOrdinal = relation.partitionKeys.indexOf(attr)
+ val partOrdinal = partitionKeys.indexOf(attr)
row(ordinal) = Cast(Literal(rawPartValues(partOrdinal)), attr.dataType).eval(null)
}
}
@@ -235,9 +234,11 @@ class HadoopTableReader(
// Fill all partition keys to the given MutableRow object
fillPartitionKeys(partValues, mutableRow)
- val tableProperties = relation.tableDesc.getProperties
+ val tableProperties = tableDesc.getProperties
- createHadoopRdd(tableDesc, inputPathStr, ifc).mapPartitions { iter =>
+ // Create local references so that the outer object isn't serialized.
+ val localTableDesc = tableDesc
+ createHadoopRdd(localTableDesc, inputPathStr, ifc).mapPartitions { iter =>
val hconf = broadcastedHiveConf.value.value
val deserializer = localDeserializer.newInstance()
// SPARK-13709: For SerDes like AvroSerDe, some essential information (e.g. Avro schema
@@ -251,8 +252,8 @@ class HadoopTableReader(
}
deserializer.initialize(hconf, props)
// get the table deserializer
- val tableSerDe = tableDesc.getDeserializerClass.newInstance()
- tableSerDe.initialize(hconf, tableDesc.getProperties)
+ val tableSerDe = localTableDesc.getDeserializerClass.newInstance()
+ tableSerDe.initialize(hconf, localTableDesc.getProperties)
// fill the non partition key attributes
HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs,
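
HadoopTableReader now receives the partition-key attributes and the TableDesc directly rather than a whole relation. A hedged sketch of the new call site follows; output, relation, tableDesc, rawPartitions, sparkSession and hadoopConf are assumed to come from the enclosing scan node, as in the HiveTableScanExec hunks below.

// Assumed context: inside HiveTableScanExec, after tableDesc has been rebuilt
// from the CatalogTable.
val hadoopReader = new HadoopTableReader(
  output,                 // requested attributes, resolved by expression ID
  relation.partitionCols, // partition-key attributes from the CatalogRelation
  tableDesc,              // TableDesc derived via HiveClientImpl.toHiveTable
  sparkSession,
  hadoopConf)

// Non-partitioned tables read the whole ql Table; partitioned tables read the
// (already pruned) partition list.
val rdd =
  if (relation.isPartitioned) hadoopReader.makeRDDForPartitionedTable(rawPartitions)
  else hadoopReader.makeRDDForTable(HiveClientImpl.toHiveTable(relation.tableMeta))
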
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 140c352fa6..14b9565be0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
+import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
@@ -29,10 +30,12 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.CatalogRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.hive._
+import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.types.{BooleanType, DataType}
import org.apache.spark.util.Utils
@@ -46,12 +49,12 @@ import org.apache.spark.util.Utils
private[hive]
case class HiveTableScanExec(
requestedAttributes: Seq[Attribute],
- relation: MetastoreRelation,
+ relation: CatalogRelation,
partitionPruningPred: Seq[Expression])(
@transient private val sparkSession: SparkSession)
extends LeafExecNode {
- require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
+ require(partitionPruningPred.isEmpty || relation.isPartitioned,
"Partition pruning predicates only supported for partitioned tables.")
override lazy val metrics = Map(
@@ -60,42 +63,54 @@ case class HiveTableScanExec(
override def producedAttributes: AttributeSet = outputSet ++
AttributeSet(partitionPruningPred.flatMap(_.references))
- // Retrieve the original attributes based on expression ID so that capitalization matches.
- val attributes = requestedAttributes.map(relation.attributeMap)
+ private val originalAttributes = AttributeMap(relation.output.map(a => a -> a))
+
+ override val output: Seq[Attribute] = {
+ // Retrieve the original attributes based on expression ID so that capitalization matches.
+ requestedAttributes.map(originalAttributes)
+ }
// Bind all partition key attribute references in the partition pruning predicate for later
// evaluation.
- private[this] val boundPruningPred = partitionPruningPred.reduceLeftOption(And).map { pred =>
+ private val boundPruningPred = partitionPruningPred.reduceLeftOption(And).map { pred =>
require(
pred.dataType == BooleanType,
s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.")
- BindReferences.bindReference(pred, relation.partitionKeys)
+ BindReferences.bindReference(pred, relation.partitionCols)
}
// Create a local copy of hadoopConf, so that scan specific modifications should not impact
// other queries
- @transient
- private[this] val hadoopConf = sparkSession.sessionState.newHadoopConf()
+ @transient private val hadoopConf = sparkSession.sessionState.newHadoopConf()
+
+ @transient private val hiveQlTable = HiveClientImpl.toHiveTable(relation.tableMeta)
+ @transient private val tableDesc = new TableDesc(
+ hiveQlTable.getInputFormatClass,
+ hiveQlTable.getOutputFormatClass,
+ hiveQlTable.getMetadata)
// append columns ids and names before broadcast
addColumnMetadataToConf(hadoopConf)
- @transient
- private[this] val hadoopReader =
- new HadoopTableReader(attributes, relation, sparkSession, hadoopConf)
+ @transient private val hadoopReader = new HadoopTableReader(
+ output,
+ relation.partitionCols,
+ tableDesc,
+ sparkSession,
+ hadoopConf)
- private[this] def castFromString(value: String, dataType: DataType) = {
+ private def castFromString(value: String, dataType: DataType) = {
Cast(Literal(value), dataType).eval(null)
}
private def addColumnMetadataToConf(hiveConf: Configuration) {
// Specifies needed column IDs for those non-partitioning columns.
- val neededColumnIDs = attributes.flatMap(relation.columnOrdinals.get).map(o => o: Integer)
+ val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex)
+ val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: Integer)
- HiveShim.appendReadColumns(hiveConf, neededColumnIDs, attributes.map(_.name))
+ HiveShim.appendReadColumns(hiveConf, neededColumnIDs, output.map(_.name))
- val tableDesc = relation.tableDesc
val deserializer = tableDesc.getDeserializerClass.newInstance
deserializer.initialize(hiveConf, tableDesc.getProperties)
@@ -113,7 +128,7 @@ case class HiveTableScanExec(
.mkString(",")
hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
- hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataColKeys.map(_.name).mkString(","))
+ hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataCols.map(_.name).mkString(","))
}
/**
@@ -126,7 +141,7 @@ case class HiveTableScanExec(
boundPruningPred match {
case None => partitions
case Some(shouldKeep) => partitions.filter { part =>
- val dataTypes = relation.partitionKeys.map(_.dataType)
+ val dataTypes = relation.partitionCols.map(_.dataType)
val castedValues = part.getValues.asScala.zip(dataTypes)
.map { case (value, dataType) => castFromString(value, dataType) }
@@ -138,27 +153,35 @@ case class HiveTableScanExec(
}
}
+ // exposed for tests
+ @transient lazy val rawPartitions = {
+ val prunedPartitions = if (sparkSession.sessionState.conf.metastorePartitionPruning) {
+ // Retrieve the original attributes based on expression ID so that capitalization matches.
+ val normalizedFilters = partitionPruningPred.map(_.transform {
+ case a: AttributeReference => originalAttributes(a)
+ })
+ sparkSession.sharedState.externalCatalog.listPartitionsByFilter(
+ relation.tableMeta.database,
+ relation.tableMeta.identifier.table,
+ normalizedFilters)
+ } else {
+ sparkSession.sharedState.externalCatalog.listPartitions(
+ relation.tableMeta.database,
+ relation.tableMeta.identifier.table)
+ }
+ prunedPartitions.map(HiveClientImpl.toHivePartition(_, hiveQlTable))
+ }
+
protected override def doExecute(): RDD[InternalRow] = {
// Using dummyCallSite, as getCallSite can turn out to be expensive
// with multiple partitions.
- val rdd = if (!relation.hiveQlTable.isPartitioned) {
+ val rdd = if (!relation.isPartitioned) {
Utils.withDummyCallSite(sqlContext.sparkContext) {
- hadoopReader.makeRDDForTable(relation.hiveQlTable)
+ hadoopReader.makeRDDForTable(hiveQlTable)
}
} else {
- // The attribute name of predicate could be different than the one in schema in case of
- // case insensitive, we should change them to match the one in schema, so we do not need to
- // worry about case sensitivity anymore.
- val normalizedFilters = partitionPruningPred.map { e =>
- e transform {
- case a: AttributeReference =>
- a.withName(relation.output.find(_.semanticEquals(a)).get.name)
- }
- }
-
Utils.withDummyCallSite(sqlContext.sparkContext) {
- hadoopReader.makeRDDForPartitionedTable(
- prunePartitions(relation.getHiveQlPartitions(normalizedFilters)))
+ hadoopReader.makeRDDForPartitionedTable(prunePartitions(rawPartitions))
}
}
val numOutputRows = longMetric("numOutputRows")
@@ -174,8 +197,6 @@ case class HiveTableScanExec(
}
}
- override def output: Seq[Attribute] = attributes
-
override def sameResult(plan: SparkPlan): Boolean = plan match {
case other: HiveTableScanExec =>
val thisPredicates = partitionPruningPred.map(cleanExpression)
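
Because getHiveQlPartitions no longer exists on the relation, partition inspection moves onto the exec node through the new rawPartitions val (marked "exposed for tests" above). A small sketch of the test-side usage, mirroring the HiveTableScanSuite and PruningSuite hunks further down; stmt is a placeholder query string and sql comes from the suite's TestHive imports.

// Count how many partitions a statement scans by collecting the physical
// HiveTableScanExec node and reading its rawPartitions.
def numScannedPartitions(stmt: String): Int = {
  val plan = sql(stmt).queryExecution.sparkPlan
  plan.collectFirst {
    case scan: HiveTableScanExec => scan.rawPartitions.length
  }.getOrElse(0)
}
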
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 142f25defb..f107149ada 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -29,16 +29,18 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.exec.TaskRunner
import org.apache.hadoop.hive.ql.ErrorMsg
+import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.FileFormatWriter
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
-import org.apache.spark.sql.hive.client.HiveVersion
+import org.apache.spark.sql.hive.client.{HiveClientImpl, HiveVersion}
import org.apache.spark.SparkException
@@ -52,9 +54,7 @@ import org.apache.spark.SparkException
* In the future we should converge the write path for Hive with the normal data source write path,
* as defined in `org.apache.spark.sql.execution.datasources.FileFormatWriter`.
*
- * @param table the logical plan representing the table. In the future this should be a
- * `org.apache.spark.sql.catalyst.catalog.CatalogTable` once we converge Hive tables
- * and data source tables.
+ * @param table the metadata of the table.
* @param partition a map from the partition key to the partition value (optional). If the partition
* value is optional, dynamic partition insert will be performed.
* As an example, `INSERT INTO tbl PARTITION (a=1, b=2) AS ...` would have
@@ -74,7 +74,7 @@ import org.apache.spark.SparkException
* @param ifNotExists If true, only write if the table or partition does not exist.
*/
case class InsertIntoHiveTable(
- table: MetastoreRelation,
+ table: CatalogTable,
partition: Map[String, Option[String]],
query: LogicalPlan,
overwrite: Boolean,
@@ -218,10 +218,19 @@ case class InsertIntoHiveTable(
val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
+ val hiveQlTable = HiveClientImpl.toHiveTable(table)
// Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
// instances within the closure, since Serializer is not serializable while TableDesc is.
- val tableDesc = table.tableDesc
- val tableLocation = table.hiveQlTable.getDataLocation
+ val tableDesc = new TableDesc(
+ hiveQlTable.getInputFormatClass,
+ // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because
+ // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to
+ // substitute some output formats, e.g. substituting SequenceFileOutputFormat to
+ // HiveSequenceFileOutputFormat.
+ hiveQlTable.getOutputFormatClass,
+ hiveQlTable.getMetadata
+ )
+ val tableLocation = hiveQlTable.getDataLocation
val tmpLocation =
getExternalTmpPath(tableLocation, hiveVersion, hadoopConf, stagingDir, scratchDir)
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
@@ -254,9 +263,9 @@ case class InsertIntoHiveTable(
// By this time, the partition map must match the table's partition columns
if (partitionColumnNames.toSet != partition.keySet) {
throw new SparkException(
- s"""Requested partitioning does not match the ${table.tableName} table:
+ s"""Requested partitioning does not match the ${table.identifier.table} table:
|Requested partitions: ${partition.keys.mkString(",")}
- |Table partitions: ${table.partitionKeys.map(_.name).mkString(",")}""".stripMargin)
+ |Table partitions: ${table.partitionColumnNames.mkString(",")}""".stripMargin)
}
// Validate partition spec if there exist any dynamic partitions
@@ -307,8 +316,8 @@ case class InsertIntoHiveTable(
if (partition.nonEmpty) {
if (numDynamicPartitions > 0) {
externalCatalog.loadDynamicPartitions(
- db = table.catalogTable.database,
- table = table.catalogTable.identifier.table,
+ db = table.database,
+ table = table.identifier.table,
tmpLocation.toString,
partitionSpec,
overwrite,
@@ -320,8 +329,8 @@ case class InsertIntoHiveTable(
// scalastyle:on
val oldPart =
externalCatalog.getPartitionOption(
- table.catalogTable.database,
- table.catalogTable.identifier.table,
+ table.database,
+ table.identifier.table,
partitionSpec)
var doHiveOverwrite = overwrite
@@ -350,8 +359,8 @@ case class InsertIntoHiveTable(
// which is currently considered as a Hive native command.
val inheritTableSpecs = true
externalCatalog.loadPartition(
- table.catalogTable.database,
- table.catalogTable.identifier.table,
+ table.database,
+ table.identifier.table,
tmpLocation.toString,
partitionSpec,
isOverwrite = doHiveOverwrite,
@@ -361,8 +370,8 @@ case class InsertIntoHiveTable(
}
} else {
externalCatalog.loadTable(
- table.catalogTable.database,
- table.catalogTable.identifier.table,
+ table.database,
+ table.identifier.table,
tmpLocation.toString, // TODO: URI
overwrite,
isSrcLocal = false)
@@ -378,8 +387,8 @@ case class InsertIntoHiveTable(
}
// Invalidate the cache.
- sparkSession.sharedState.cacheManager.invalidateCache(table)
- sparkSession.sessionState.catalog.refreshTable(table.catalogTable.identifier)
+ sparkSession.catalog.uncacheTable(table.qualifiedName)
+ sparkSession.sessionState.catalog.refreshTable(table.identifier)
// It would be nice to just return the childRdd unchanged so insert operations could be chained,
// however for now we return an empty list to simplify compatibility checks with hive, which
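
InsertIntoHiveTable now addresses its target purely through the CatalogTable. A minimal sketch of the non-partitioned tail of the write path, reusing names from the hunks above (externalCatalog, tmpLocation and overwrite are assumed to be in scope exactly as shown there).

// Load the staged files into the table, then drop any cached plans and refresh
// the catalog entry so subsequent reads see the new data.
externalCatalog.loadTable(
  table.database,
  table.identifier.table,
  tmpLocation.toString, // TODO: URI
  overwrite,
  isSrcLocal = false)

sparkSession.catalog.uncacheTable(table.qualifiedName)
sparkSession.sessionState.catalog.refreshTable(table.identifier)
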
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala
deleted file mode 100644
index 91ff711445..0000000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.{QueryTest, Row}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
-
-class MetastoreRelationSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
- test("makeCopy and toJSON should work") {
- val table = CatalogTable(
- identifier = TableIdentifier("test", Some("db")),
- tableType = CatalogTableType.VIEW,
- storage = CatalogStorageFormat.empty,
- schema = StructType(StructField("a", IntegerType, true) :: Nil))
- val relation = MetastoreRelation("db", "test")(table, null)
-
- // No exception should be thrown
- relation.makeCopy(Array("db", "test"))
- // No exception should be thrown
- relation.toJSON
- }
-
- test("SPARK-17409: Do Not Optimize Query in CTAS (Hive Serde Table) More Than Once") {
- withTable("bar") {
- withTempView("foo") {
- sql("select 0 as id").createOrReplaceTempView("foo")
- // If we optimize the query in CTAS more than once, the following saveAsTable will fail
- // with the error: `GROUP BY position 0 is not in select list (valid range is [1, 1])`
- sql("CREATE TABLE bar AS SELECT * FROM foo group by id")
- checkAnswer(spark.table("bar"), Row(0) :: Nil)
- val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier("bar"))
- assert(tableMetadata.provider == Some("hive"), "the expected table is a Hive serde table")
- }
- }
- }
-}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index e2fcd2fd41..962998ea6f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -23,7 +23,7 @@ import scala.reflect.ClassTag
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.CatalogStatistics
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics}
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.joins._
@@ -33,52 +33,46 @@ import org.apache.spark.sql.types._
class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton {
- test("MetastoreRelations fallback to HDFS for size estimation") {
- val enableFallBackToHdfsForStats = spark.sessionState.conf.fallBackToHdfsForStatsEnabled
- try {
- withTempDir { tempDir =>
-
- // EXTERNAL OpenCSVSerde table pointing to LOCATION
-
- val file1 = new File(tempDir + "/data1")
- val writer1 = new PrintWriter(file1)
- writer1.write("1,2")
- writer1.close()
-
- val file2 = new File(tempDir + "/data2")
- val writer2 = new PrintWriter(file2)
- writer2.write("1,2")
- writer2.close()
-
- sql(
- s"""CREATE EXTERNAL TABLE csv_table(page_id INT, impressions INT)
- ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
- WITH SERDEPROPERTIES (
- \"separatorChar\" = \",\",
- \"quoteChar\" = \"\\\"\",
- \"escapeChar\" = \"\\\\\")
- LOCATION '${tempDir.toURI}'
- """)
-
- spark.conf.set(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key, true)
-
- val relation = spark.table("csv_table").queryExecution.analyzed.children.head
- .asInstanceOf[MetastoreRelation]
-
- val properties = relation.hiveQlTable.getParameters
- assert(properties.get("totalSize").toLong <= 0, "external table totalSize must be <= 0")
- assert(properties.get("rawDataSize").toLong <= 0, "external table rawDataSize must be <= 0")
-
- val sizeInBytes = relation.stats(conf).sizeInBytes
- assert(sizeInBytes === BigInt(file1.length() + file2.length()))
+ test("Hive serde tables should fallback to HDFS for size estimation") {
+ withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") {
+ withTable("csv_table") {
+ withTempDir { tempDir =>
+ // EXTERNAL OpenCSVSerde table pointing to LOCATION
+ val file1 = new File(tempDir + "/data1")
+ val writer1 = new PrintWriter(file1)
+ writer1.write("1,2")
+ writer1.close()
+
+ val file2 = new File(tempDir + "/data2")
+ val writer2 = new PrintWriter(file2)
+ writer2.write("1,2")
+ writer2.close()
+
+ sql(
+ s"""
+ |CREATE EXTERNAL TABLE csv_table(page_id INT, impressions INT)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+ |WITH SERDEPROPERTIES (
+ |\"separatorChar\" = \",\",
+ |\"quoteChar\" = \"\\\"\",
+ |\"escapeChar\" = \"\\\\\")
+ |LOCATION '${tempDir.toURI}'""".stripMargin)
+
+ val relation = spark.table("csv_table").queryExecution.analyzed.children.head
+ .asInstanceOf[CatalogRelation]
+
+ val properties = relation.tableMeta.properties
+ assert(properties("totalSize").toLong <= 0, "external table totalSize must be <= 0")
+ assert(properties("rawDataSize").toLong <= 0, "external table rawDataSize must be <= 0")
+
+ val sizeInBytes = relation.stats(conf).sizeInBytes
+ assert(sizeInBytes === BigInt(file1.length() + file2.length()))
+ }
}
- } finally {
- spark.conf.set(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key, enableFallBackToHdfsForStats)
- sql("DROP TABLE csv_table ")
}
}
- test("analyze MetastoreRelations") {
+ test("analyze Hive serde tables") {
def queryTotalSize(tableName: String): BigInt =
spark.table(tableName).queryExecution.analyzed.stats(conf).sizeInBytes
@@ -152,9 +146,11 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
}
private def checkTableStats(
- stats: Option[CatalogStatistics],
+ tableName: String,
hasSizeInBytes: Boolean,
- expectedRowCounts: Option[Int]): Unit = {
+ expectedRowCounts: Option[Int]): Option[CatalogStatistics] = {
+ val stats = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).stats
+
if (hasSizeInBytes || expectedRowCounts.nonEmpty) {
assert(stats.isDefined)
assert(stats.get.sizeInBytes > 0)
@@ -162,26 +158,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
} else {
assert(stats.isEmpty)
}
- }
- private def checkTableStats(
- tableName: String,
- isDataSourceTable: Boolean,
- hasSizeInBytes: Boolean,
- expectedRowCounts: Option[Int]): Option[CatalogStatistics] = {
- val df = sql(s"SELECT * FROM $tableName")
- val stats = df.queryExecution.analyzed.collect {
- case rel: MetastoreRelation =>
- checkTableStats(rel.catalogTable.stats, hasSizeInBytes, expectedRowCounts)
- assert(!isDataSourceTable, "Expected a Hive serde table, but got a data source table")
- rel.catalogTable.stats
- case rel: LogicalRelation =>
- checkTableStats(rel.catalogTable.get.stats, hasSizeInBytes, expectedRowCounts)
- assert(isDataSourceTable, "Expected a data source table, but got a Hive serde table")
- rel.catalogTable.get.stats
- }
- assert(stats.size == 1)
- stats.head
+ stats
}
test("test table-level statistics for hive tables created in HiveExternalCatalog") {
@@ -192,25 +170,23 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
sql(s"CREATE TABLE $textTable (key STRING, value STRING) STORED AS TEXTFILE")
checkTableStats(
textTable,
- isDataSourceTable = false,
hasSizeInBytes = false,
expectedRowCounts = None)
sql(s"INSERT INTO TABLE $textTable SELECT * FROM src")
checkTableStats(
textTable,
- isDataSourceTable = false,
hasSizeInBytes = false,
expectedRowCounts = None)
// noscan won't count the number of rows
sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan")
- val fetchedStats1 = checkTableStats(
- textTable, isDataSourceTable = false, hasSizeInBytes = true, expectedRowCounts = None)
+ val fetchedStats1 =
+ checkTableStats(textTable, hasSizeInBytes = true, expectedRowCounts = None)
// without noscan, we count the number of rows
sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS")
- val fetchedStats2 = checkTableStats(
- textTable, isDataSourceTable = false, hasSizeInBytes = true, expectedRowCounts = Some(500))
+ val fetchedStats2 =
+ checkTableStats(textTable, hasSizeInBytes = true, expectedRowCounts = Some(500))
assert(fetchedStats1.get.sizeInBytes == fetchedStats2.get.sizeInBytes)
}
}
@@ -221,25 +197,25 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
sql(s"CREATE TABLE $textTable (key STRING, value STRING) STORED AS TEXTFILE")
sql(s"INSERT INTO TABLE $textTable SELECT * FROM src")
sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS")
- val fetchedStats1 = checkTableStats(
- textTable, isDataSourceTable = false, hasSizeInBytes = true, expectedRowCounts = Some(500))
+ val fetchedStats1 =
+ checkTableStats(textTable, hasSizeInBytes = true, expectedRowCounts = Some(500))
sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan")
// when the total size is not changed, the old row count is kept
- val fetchedStats2 = checkTableStats(
- textTable, isDataSourceTable = false, hasSizeInBytes = true, expectedRowCounts = Some(500))
+ val fetchedStats2 =
+ checkTableStats(textTable, hasSizeInBytes = true, expectedRowCounts = Some(500))
assert(fetchedStats1 == fetchedStats2)
sql(s"INSERT INTO TABLE $textTable SELECT * FROM src")
sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan")
// update total size and remove the old and invalid row count
- val fetchedStats3 = checkTableStats(
- textTable, isDataSourceTable = false, hasSizeInBytes = true, expectedRowCounts = None)
+ val fetchedStats3 =
+ checkTableStats(textTable, hasSizeInBytes = true, expectedRowCounts = None)
assert(fetchedStats3.get.sizeInBytes > fetchedStats2.get.sizeInBytes)
}
}
- test("test statistics of LogicalRelation converted from MetastoreRelation") {
+ test("test statistics of LogicalRelation converted from Hive serde tables") {
val parquetTable = "parquetTable"
val orcTable = "orcTable"
withTable(parquetTable, orcTable) {
@@ -251,21 +227,14 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
// the default value for `spark.sql.hive.convertMetastoreParquet` is true, here we just set it
// for robustness
withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") {
- checkTableStats(
- parquetTable, isDataSourceTable = true, hasSizeInBytes = false, expectedRowCounts = None)
+ checkTableStats(parquetTable, hasSizeInBytes = false, expectedRowCounts = None)
sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS")
- checkTableStats(
- parquetTable,
- isDataSourceTable = true,
- hasSizeInBytes = true,
- expectedRowCounts = Some(500))
+ checkTableStats(parquetTable, hasSizeInBytes = true, expectedRowCounts = Some(500))
}
withSQLConf("spark.sql.hive.convertMetastoreOrc" -> "true") {
- checkTableStats(
- orcTable, isDataSourceTable = true, hasSizeInBytes = false, expectedRowCounts = None)
+ checkTableStats(orcTable, hasSizeInBytes = false, expectedRowCounts = None)
sql(s"ANALYZE TABLE $orcTable COMPUTE STATISTICS")
- checkTableStats(
- orcTable, isDataSourceTable = true, hasSizeInBytes = true, expectedRowCounts = Some(500))
+ checkTableStats(orcTable, hasSizeInBytes = true, expectedRowCounts = Some(500))
}
}
}
@@ -385,27 +354,23 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
// Add a filter to avoid creating too many partitions
sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src WHERE key < 10")
- checkTableStats(
- parquetTable, isDataSourceTable = true, hasSizeInBytes = false, expectedRowCounts = None)
+ checkTableStats(parquetTable, hasSizeInBytes = false, expectedRowCounts = None)
// noscan won't count the number of rows
sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS noscan")
- val fetchedStats1 = checkTableStats(
- parquetTable, isDataSourceTable = true, hasSizeInBytes = true, expectedRowCounts = None)
+ val fetchedStats1 =
+ checkTableStats(parquetTable, hasSizeInBytes = true, expectedRowCounts = None)
sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src WHERE key < 10")
sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS noscan")
- val fetchedStats2 = checkTableStats(
- parquetTable, isDataSourceTable = true, hasSizeInBytes = true, expectedRowCounts = None)
+ val fetchedStats2 =
+ checkTableStats(parquetTable, hasSizeInBytes = true, expectedRowCounts = None)
assert(fetchedStats2.get.sizeInBytes > fetchedStats1.get.sizeInBytes)
// without noscan, we count the number of rows
sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS")
- val fetchedStats3 = checkTableStats(
- parquetTable,
- isDataSourceTable = true,
- hasSizeInBytes = true,
- expectedRowCounts = Some(20))
+ val fetchedStats3 =
+ checkTableStats(parquetTable, hasSizeInBytes = true, expectedRowCounts = Some(20))
assert(fetchedStats3.get.sizeInBytes == fetchedStats2.get.sizeInBytes)
}
}
@@ -426,11 +391,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
val dfNoCols = spark.createDataFrame(rddNoCols, StructType(Seq.empty))
dfNoCols.write.format("json").saveAsTable(table_no_cols)
sql(s"ANALYZE TABLE $table_no_cols COMPUTE STATISTICS")
- checkTableStats(
- table_no_cols,
- isDataSourceTable = true,
- hasSizeInBytes = true,
- expectedRowCounts = Some(10))
+ checkTableStats(table_no_cols, hasSizeInBytes = true, expectedRowCounts = Some(10))
}
}
@@ -478,10 +439,10 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
assert(statsAfterUpdate.rowCount == Some(2))
}
- test("estimates the size of a test MetastoreRelation") {
+ test("estimates the size of a test Hive serde tables") {
val df = sql("""SELECT * FROM src""")
- val sizes = df.queryExecution.analyzed.collect { case mr: MetastoreRelation =>
- mr.stats(conf).sizeInBytes
+ val sizes = df.queryExecution.analyzed.collect {
+ case relation: CatalogRelation => relation.stats(conf).sizeInBytes
}
assert(sizes.size === 1, s"Size wrong for:\n ${df.queryExecution}")
assert(sizes(0).equals(BigInt(5812)),
@@ -533,7 +494,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
after()
}
- /** Tests for MetastoreRelation */
+ /** Tests for Hive serde tables */
val metastoreQuery = """SELECT * FROM src a JOIN src b ON a.key = 238 AND a.key = b.key"""
val metastoreAnswer = Seq.fill(4)(Row(238, "val_238", 238, "val_238"))
mkTest(
@@ -541,7 +502,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
() => (),
metastoreQuery,
metastoreAnswer,
- implicitly[ClassTag[MetastoreRelation]]
+ implicitly[ClassTag[CatalogRelation]]
)
}
@@ -555,9 +516,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
// Assert src has a size smaller than the threshold.
val sizes = df.queryExecution.analyzed.collect {
- case r if implicitly[ClassTag[MetastoreRelation]].runtimeClass
- .isAssignableFrom(r.getClass) =>
- r.stats(conf).sizeInBytes
+ case relation: CatalogRelation => relation.stats(conf).sizeInBytes
}
assert(sizes.size === 2 && sizes(1) <= spark.sessionState.conf.autoBroadcastJoinThreshold
&& sizes(0) <= spark.sessionState.conf.autoBroadcastJoinThreshold,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index f3151d52f2..536ca8fd9d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -385,7 +385,7 @@ abstract class HiveComparisonTest
// also print out the query plans and results for those.
val computedTablesMessages: String = try {
val tablesRead = new TestHiveQueryExecution(query).executedPlan.collect {
- case ts: HiveTableScanExec => ts.relation.tableName
+ case ts: HiveTableScanExec => ts.relation.tableMeta.identifier
}.toSet
TestHive.reset()
@@ -393,7 +393,7 @@ abstract class HiveComparisonTest
executions.foreach(_.toRdd)
val tablesGenerated = queryList.zip(executions).flatMap {
case (q, e) => e.analyzed.collect {
- case i: InsertIntoHiveTable if tablesRead contains i.table.tableName =>
+ case i: InsertIntoHiveTable if tablesRead contains i.table.identifier =>
(q, e, i)
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
index 5c460d25f3..90e037e292 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.Row
-import org.apache.spark.sql.hive.MetastoreRelation
import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton}
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
@@ -95,8 +94,7 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH
private def checkNumScannedPartitions(stmt: String, expectedNumParts: Int): Unit = {
val plan = sql(stmt).queryExecution.sparkPlan
val numPartitions = plan.collectFirst {
- case p: HiveTableScanExec =>
- p.relation.getHiveQlPartitions(p.partitionPruningPred).length
+ case p: HiveTableScanExec => p.rawPartitions.length
}.getOrElse(0)
assert(numPartitions == expectedNumParts)
}
@@ -170,11 +168,11 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH
s"""
|SELECT * FROM $table
""".stripMargin).queryExecution.sparkPlan
- val relation = plan.collectFirst {
- case p: HiveTableScanExec => p.relation
+ val scan = plan.collectFirst {
+ case p: HiveTableScanExec => p
}.get
- val tableCols = relation.hiveQlTable.getCols
- relation.getHiveQlPartitions().foreach(p => assert(p.getCols.size == tableCols.size))
+ val numDataCols = scan.relation.dataCols.length
+ scan.rawPartitions.foreach(p => assert(p.getCols.size == numDataCols))
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
index 24df73b40e..d535bef4cc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
@@ -153,8 +153,8 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
val (actualScannedColumns, actualPartValues) = plan.collect {
case p @ HiveTableScanExec(columns, relation, _) =>
val columnNames = columns.map(_.name)
- val partValues = if (relation.catalogTable.partitionColumnNames.nonEmpty) {
- p.prunePartitions(relation.getHiveQlPartitions()).map(_.getValues)
+ val partValues = if (relation.isPartitioned) {
+ p.prunePartitions(p.rawPartitions).map(_.getValues)
} else {
Seq.empty
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 9f6176339e..ef2d451e6b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -28,12 +28,12 @@ import org.apache.spark.TestUtils
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry, NoSuchPartitionException}
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTableType}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.{HiveUtils, MetastoreRelation}
+import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
@@ -526,7 +526,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
case LogicalRelation(r: HadoopFsRelation, _, _) =>
if (!isDataSourceTable) {
fail(
- s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
+ s"${classOf[CatalogRelation].getCanonicalName} is expected, but found " +
s"${HadoopFsRelation.getClass.getCanonicalName}.")
}
userSpecifiedLocation match {
@@ -536,15 +536,15 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
assert(catalogTable.provider.get === format)
- case r: MetastoreRelation =>
+ case r: CatalogRelation =>
if (isDataSourceTable) {
fail(
s"${HadoopFsRelation.getClass.getCanonicalName} is expected, but found " +
- s"${classOf[MetastoreRelation].getCanonicalName}.")
+ s"${classOf[CatalogRelation].getCanonicalName}.")
}
userSpecifiedLocation match {
case Some(location) =>
- assert(r.catalogTable.location === location)
+ assert(r.tableMeta.location === location)
case None => // OK.
}
// Also make sure that the format and serde are as desired.
@@ -1030,7 +1030,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
withSQLConf(SQLConf.CONVERT_CTAS.key -> "false") {
sql("CREATE TABLE explodeTest (key bigInt)")
table("explodeTest").queryExecution.analyzed match {
- case SubqueryAlias(_, r: MetastoreRelation, _) => // OK
+ case SubqueryAlias(_, r: CatalogRelation, _) => // OK
case _ =>
fail("To correctly test the fix of SPARK-5875, explodeTest should be a MetastoreRelation")
}
@@ -2043,4 +2043,18 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
}
+
+ test("SPARK-17409: Do Not Optimize Query in CTAS (Hive Serde Table) More Than Once") {
+ withTable("bar") {
+ withTempView("foo") {
+ sql("select 0 as id").createOrReplaceTempView("foo")
+ // If we optimize the query in CTAS more than once, the following saveAsTable will fail
+ // with the error: `GROUP BY position 0 is not in select list (valid range is [1, 1])`
+ sql("SELECT * FROM foo group by id").toDF().write.format("hive").saveAsTable("bar")
+ checkAnswer(spark.table("bar"), Row(0) :: Nil)
+ val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier("bar"))
+ assert(tableMetadata.provider == Some("hive"), "the expected table is a Hive serde table")
+ }
+ }
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 9fa1fb931d..38a5477796 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -26,8 +26,9 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogRelation
import org.apache.spark.sql.execution.datasources.{LogicalRelation, RecordReaderIterator}
-import org.apache.spark.sql.hive.{HiveUtils, MetastoreRelation}
+import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.internal.SQLConf
@@ -473,7 +474,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
}
} else {
queryExecution.analyzed.collectFirst {
- case _: MetastoreRelation => ()
+ case _: CatalogRelation => ()
}.getOrElse {
fail(s"Expecting no conversion from orc to data sources, " +
s"but got:\n$queryExecution")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 1a1b2571b6..3512c4a890 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -21,8 +21,8 @@ import java.io.File
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogRelation
import org.apache.spark.sql.execution.DataSourceScanExec
-import org.apache.spark.sql.execution.command.ExecutedCommandExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.execution.HiveTableScanExec
import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -806,7 +806,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
}
} else {
queryExecution.analyzed.collectFirst {
- case _: MetastoreRelation =>
+ case _: CatalogRelation =>
}.getOrElse {
fail(s"Expecting no conversion from parquet to data sources, " +
s"but got:\n$queryExecution")