author    Wenchen Fan <wenchen@databricks.com>   2017-01-19 00:07:48 -0800
committer gatorsmile <gatorsmile@gmail.com>      2017-01-19 00:07:48 -0800
commit    2e62560024999c215cf2373fc9a8070bb2ad5c58 (patch)
tree      75fadf15026f631e0e8195beda1d1f7c03258c1b /sql/hive
parent    0c9231858866eff16f97df073d22811176fb6b36 (diff)
[SPARK-19265][SQL] make table relation cache general and does not depend on hive
## What changes were proposed in this pull request?

We have a table relation plan cache in `HiveMetastoreCatalog`, which caches a lot of things: file status, resolved data source, inferred schema, etc. However, it doesn't make sense to tie this cache to hive support; we should move it to the SQL core module so that users can benefit from it without hive support. The move also shrinks `HiveMetastoreCatalog`, making it easier to remove eventually.

Main changes:
1. Move the table relation cache to `SessionCatalog`.
2. `SessionCatalog.lookupRelation` returns a `SimpleCatalogRelation`, which the analyzer later converts to a `LogicalRelation` or `MetastoreRelation`, so `HiveSessionCatalog` no longer needs to override `lookupRelation`.
3. `FindDataSourceTable` reads and writes the table relation cache.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16621 from cloud-fan/plan-cache.
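A minimal sketch of the relocated cache, assuming it keeps the Guava `Cache` shape and the 1000-entry bound of the `cachedDataSourceTables` it replaces (removed below in `HiveMetastoreCatalog.scala`); the wrapper object is illustrative:

```scala
import com.google.common.cache.{Cache, CacheBuilder}

import org.apache.spark.sql.catalyst.QualifiedTableName
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object TableRelationCacheSketch {
  // A general table relation cache, now hosted by SessionCatalog in SQL core:
  // keyed by database.table, caching the fully resolved plan (file status,
  // resolved data source, inferred schema, ...). Unlike the old LoadingCache,
  // a plain Cache lets each caller supply its own loader at lookup time.
  val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] =
    CacheBuilder.newBuilder()
      .maximumSize(1000) // same bound as the removed cachedDataSourceTables
      .build[QualifiedTableName, LogicalPlan]()
}
```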
Diffstat (limited to 'sql/hive')
 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala                     | 98
 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala                       | 37
 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala                         |  2
 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala                           | 17
 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala |  8
 sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala                            |  2
 sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala                | 22
 sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala                          |  8
 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala                  | 17
9 files changed, 70 insertions, 141 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index e4b1f6ae3e..faa76b73fd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -17,17 +17,15 @@
package org.apache.spark.sql.hive
-import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import com.google.common.util.concurrent.Striped
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.hive.orc.OrcFileFormat
@@ -41,9 +39,7 @@ import org.apache.spark.sql.types._
*/
private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging {
private val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
-
- /** A fully qualified identifier for a table (i.e., database.tableName) */
- case class QualifiedTableName(database: String, name: String)
+ private lazy val tableRelationCache = sparkSession.sessionState.catalog.tableRelationCache
private def getCurrentDatabase: String = sessionState.catalog.getCurrentDatabase
@@ -65,45 +61,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}
}
- /** A cache of Spark SQL data source tables that have been accessed. */
- protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = {
- val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
- override def load(in: QualifiedTableName): LogicalPlan = {
- logDebug(s"Creating new cached data source for $in")
- val table = sparkSession.sharedState.externalCatalog.getTable(in.database, in.name)
-
- val pathOption = table.storage.locationUri.map("path" -> _)
- val dataSource =
- DataSource(
- sparkSession,
- // In older versions of Spark (prior to 2.1), the table schema can be empty and should be
- // inferred at runtime. We should still support it.
- userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
- partitionColumns = table.partitionColumnNames,
- bucketSpec = table.bucketSpec,
- className = table.provider.get,
- options = table.storage.properties ++ pathOption,
- catalogTable = Some(table))
-
- LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table))
- }
- }
-
- CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader)
- }
-
- def refreshTable(tableIdent: TableIdentifier): Unit = {
- // refreshTable does not eagerly reload the cache. It just invalidates the cache.
- // The next time the table is used, the cache will be repopulated.
- // Since we also cache ParquetRelations converted from Hive Parquet tables and
- // adding converted ParquetRelations into the cache is not defined in the load function
- // of the cache (instead, we add the cache entry in convertToParquetRelation),
- // it is better to invalidate the cache here to avoid confusing warning logs from the
- // cache loader (e.g. cannot find data source provider, which is only defined for
- // data source tables).
- cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent))
- }
-
def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
// Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
@@ -111,45 +68,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
new Path(new Path(dbLocation), tblName).toString
}
- /**
- * Returns a [[LogicalPlan]] that represents the given table or view from Hive metastore.
- *
- * @param tableIdent The name of the table/view that we look up.
- * @param alias The alias name of the table/view that we look up.
- * @return a [[LogicalPlan]] that represents the given table or view from Hive metastore.
- */
- def lookupRelation(
- tableIdent: TableIdentifier,
- alias: Option[String]): LogicalPlan = {
- val qualifiedTableName = getQualifiedTableName(tableIdent)
- val table = sparkSession.sharedState.externalCatalog.getTable(
- qualifiedTableName.database, qualifiedTableName.name)
-
- if (DDLUtils.isDatasourceTable(table)) {
- val dataSourceTable = cachedDataSourceTables(qualifiedTableName)
- val qualifiedTable = SubqueryAlias(qualifiedTableName.name, dataSourceTable, None)
- // Then, if alias is specified, wrap the table with a Subquery using the alias.
- // Otherwise, wrap the table with a Subquery using the table name.
- alias.map(a => SubqueryAlias(a, qualifiedTable, None)).getOrElse(qualifiedTable)
- } else if (table.tableType == CatalogTableType.VIEW) {
- val tableIdentifier = table.identifier
- val viewText = table.viewText.getOrElse(sys.error("Invalid view without text."))
- // The relation is a view, so we wrap the relation by:
- // 1. Add a [[View]] operator over the relation to keep track of the view desc;
- // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view.
- val child = View(
- desc = table,
- output = table.schema.toAttributes,
- child = sparkSession.sessionState.sqlParser.parsePlan(viewText))
- SubqueryAlias(alias.getOrElse(tableIdentifier.table), child, Option(tableIdentifier))
- } else {
- val qualifiedTable =
- MetastoreRelation(
- qualifiedTableName.database, qualifiedTableName.name)(table, sparkSession)
- alias.map(a => SubqueryAlias(a, qualifiedTable, None)).getOrElse(qualifiedTable)
- }
- }
-
private def getCached(
tableIdentifier: QualifiedTableName,
pathsInMetastore: Seq[Path],
@@ -159,7 +77,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
expectedBucketSpec: Option[BucketSpec],
partitionSchema: Option[StructType]): Option[LogicalRelation] = {
- cachedDataSourceTables.getIfPresent(tableIdentifier) match {
+ tableRelationCache.getIfPresent(tableIdentifier) match {
case null => None // Cache miss
case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
val cachedRelationFileFormatClass = relation.fileFormat.getClass
@@ -178,7 +96,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
Some(logical)
} else {
// If the cached relation is not updated, we invalidate it right away.
- cachedDataSourceTables.invalidate(tableIdentifier)
+ tableRelationCache.invalidate(tableIdentifier)
None
}
case _ =>
@@ -187,7 +105,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
s"should be stored as $expectedFileFormat. However, we are getting " +
s"a ${relation.fileFormat} from the metastore cache. This cached " +
s"entry will be invalidated.")
- cachedDataSourceTables.invalidate(tableIdentifier)
+ tableRelationCache.invalidate(tableIdentifier)
None
}
case other =>
@@ -195,7 +113,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
s"as $expectedFileFormat. However, we are getting a $other from the metastore cache. " +
s"This cached entry will be invalidated.")
- cachedDataSourceTables.invalidate(tableIdentifier)
+ tableRelationCache.invalidate(tableIdentifier)
None
}
}
@@ -270,7 +188,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
val created = LogicalRelation(relation,
catalogTable = Some(metastoreRelation.catalogTable))
- cachedDataSourceTables.put(tableIdentifier, created)
+ tableRelationCache.put(tableIdentifier, created)
created
}
@@ -298,7 +216,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
className = fileType).resolveRelation(),
catalogTable = Some(metastoreRelation.catalogTable))
- cachedDataSourceTables.put(tableIdentifier, created)
+ tableRelationCache.put(tableIdentifier, created)
created
}
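With the move, the deleted `refreshTable` keeps its invalidate-don't-reload semantics, just against the shared cache. A method-level sketch of the `SessionCatalog` side, reusing the `formatDatabaseName`/`formatTableName` helpers visible in the `HiveSessionCatalog` diff below (the method body is an assumption, not part of this diff):

```scala
// Sketch (assumption): invalidate the shared cache entry only; the relation is
// rebuilt lazily on the next lookup, avoiding the confusing cache-loader
// warnings described in the comment removed above.
def refreshTable(name: TableIdentifier): Unit = synchronized {
  val db = formatDatabaseName(name.database.getOrElse(currentDb))
  val table = formatTableName(name.table)
  tableRelationCache.invalidate(QualifiedTableName(db, table))
}
```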
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index b3cbbedbe1..44ef5cce2e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -27,12 +27,12 @@ import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, Gener
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchTableException}
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
import org.apache.spark.sql.internal.SQLConf
@@ -58,28 +58,6 @@ private[sql] class HiveSessionCatalog(
hadoopConf,
parser) {
- override def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = {
- synchronized {
- val table = formatTableName(name.table)
- val db = formatDatabaseName(name.database.getOrElse(currentDb))
- if (db == globalTempViewManager.database) {
- val relationAlias = alias.getOrElse(table)
- globalTempViewManager.get(table).map { viewDef =>
- SubqueryAlias(relationAlias, viewDef, Some(name))
- }.getOrElse(throw new NoSuchTableException(db, table))
- } else if (name.database.isDefined || !tempTables.contains(table)) {
- val newName = name.copy(database = Some(db), table = table)
- metastoreCatalog.lookupRelation(newName, alias)
- } else {
- val relation = tempTables(table)
- val tableWithQualifiers = SubqueryAlias(table, relation, None)
- // If an alias was specified by the lookup, wrap the plan in a subquery so that
- // attributes are properly qualified with this alias.
- alias.map(a => SubqueryAlias(a, tableWithQualifiers, None)).getOrElse(tableWithQualifiers)
- }
- }
- }
-
// ----------------------------------------------------------------
// | Methods and fields for interacting with HiveMetastoreCatalog |
// ----------------------------------------------------------------
@@ -93,15 +71,6 @@ private[sql] class HiveSessionCatalog(
val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions
val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions
- override def refreshTable(name: TableIdentifier): Unit = {
- super.refreshTable(name)
- metastoreCatalog.refreshTable(name)
- }
-
- def invalidateCache(): Unit = {
- metastoreCatalog.cachedDataSourceTables.invalidateAll()
- }
-
def hiveDefaultTableFilePath(name: TableIdentifier): String = {
metastoreCatalog.hiveDefaultTableFilePath(name)
}
@@ -109,7 +78,7 @@ private[sql] class HiveSessionCatalog(
// For testing only
private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
val key = metastoreCatalog.getQualifiedTableName(table)
- metastoreCatalog.cachedDataSourceTables.getIfPresent(key)
+ sparkSession.sessionState.catalog.tableRelationCache.getIfPresent(key)
}
override def makeFunctionBuilder(funcName: String, className: String): FunctionBuilder = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 4e30d038b1..d3cef6e0cb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -67,6 +67,8 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
DataSourceAnalysis(conf) ::
new DetermineHiveSerde(conf) ::
new HiveAnalysis(sparkSession) ::
+ new FindDataSourceTable(sparkSession) ::
+ new FindHiveSerdeTable(sparkSession) ::
new ResolveDataSource(sparkSession) :: Nil
override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
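The two rules registered above replace the deleted `lookupRelation` override: `FindHiveSerdeTable` (added below in `HiveStrategies.scala`) rewrites `SimpleCatalogRelation` for hive serde tables, while `FindDataSourceTable` lives in SQL core and is the rule that, per the description, reads and writes the table relation cache. A sketch of what that core-side rule can look like, assembled from the cache loader removed from `HiveMetastoreCatalog` above — illustrative only, since its actual source is outside this sql/hive diff:

```scala
import java.util.concurrent.Callable

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.QualifiedTableName
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation}
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}

/**
 * Sketch: replaces [[SimpleCatalogRelation]] with a [[LogicalRelation]] for
 * data source tables, going through the session's table relation cache.
 */
class FindDataSourceTable(session: SparkSession) extends Rule[LogicalPlan] {
  private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
    val key = QualifiedTableName(table.database, table.identifier.table)
    // get() loads on a miss, mirroring the CacheLoader removed above.
    session.sessionState.catalog.tableRelationCache.get(key, new Callable[LogicalPlan] {
      override def call(): LogicalPlan = {
        val pathOption = table.storage.locationUri.map("path" -> _)
        val dataSource = DataSource(
          session,
          // Pre-2.1 tables may store no schema; infer it at runtime.
          userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
          partitionColumns = table.partitionColumnNames,
          bucketSpec = table.bucketSpec,
          className = table.provider.get,
          options = table.storage.properties ++ pathOption,
          catalogTable = Some(table))
        LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table))
      }
    })
  }

  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case i @ InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
        if DDLUtils.isDatasourceTable(s.metadata) =>
      i.copy(table = readDataSourceTable(s.metadata))
    case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
      readDataSourceTable(s.metadata)
  }
}
```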
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 7987a0a84c..b649612a40 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.hive
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, SimpleCatalogRelation}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation}
@@ -127,6 +127,21 @@ class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
}
}
+/**
+ * Replaces [[SimpleCatalogRelation]] with [[MetastoreRelation]] if its table provider is hive.
+ */
+class FindHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case i @ InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
+ if DDLUtils.isHiveTable(s.metadata) =>
+ i.copy(table =
+ MetastoreRelation(s.metadata.database, s.metadata.identifier.table)(s.metadata, session))
+
+ case s: SimpleCatalogRelation if DDLUtils.isHiveTable(s.metadata) =>
+ MetastoreRelation(s.metadata.database, s.metadata.identifier.table)(s.metadata, session)
+ }
+}
+
private[hive] trait HiveStrategies {
// Possibly being too clever with types here... or not clever enough.
self: SparkPlanner =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index ef5a5a001f..ccc2d64c4a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.hive.execution
import scala.util.control.NonFatal
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hive.MetastoreRelation
@@ -73,7 +73,9 @@ case class CreateHiveTableAsSelectCommand(
// Get the Metastore Relation
sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
- case r: MetastoreRelation => r
+ case SubqueryAlias(_, r: SimpleCatalogRelation, _) =>
+ val tableMeta = r.metadata
+ MetastoreRelation(tableMeta.database, tableMeta.identifier.table)(tableMeta, sparkSession)
}
}
// TODO ideally, we should get the output data ready first and then
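The rewritten match works because, with the override gone, `SessionCatalog.lookupRelation` now hands back the raw catalog entry wrapped in an alias. Roughly, for a metastore table (a fragment sketching the core-side behaviour, not code from this diff):

```scala
// Sketch (assumption): the plan shape lookupRelation now produces for a
// metastore table; `db`, `table`, and `name` stand for the resolved
// identifier parts. The analyzer rules above later rewrite the inner
// SimpleCatalogRelation into a LogicalRelation or MetastoreRelation.
val metadata = externalCatalog.getTable(db, table)
SubqueryAlias(table, SimpleCatalogRelation(db, metadata), Some(name))
```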
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index dcb8e498a4..3267c237c8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -431,7 +431,7 @@ private[hive] class TestHiveSparkSession(
sharedState.cacheManager.clearCache()
loadedTables.clear()
sessionState.catalog.clearTempTables()
- sessionState.catalog.invalidateCache()
+ sessionState.catalog.tableRelationCache.invalidateAll()
sessionState.metadataHive.reset()
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 081f6f6d82..f0e2c9369b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1322,4 +1322,26 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
sparkSession.sparkContext.conf.set(DEBUG_MODE, previousValue)
}
}
+
+ test("SPARK-18464: support old table which doesn't store schema in table properties") {
+ withTable("old") {
+ withTempPath { path =>
+ Seq(1 -> "a").toDF("i", "j").write.parquet(path.getAbsolutePath)
+ val tableDesc = CatalogTable(
+ identifier = TableIdentifier("old", Some("default")),
+ tableType = CatalogTableType.EXTERNAL,
+ storage = CatalogStorageFormat.empty.copy(
+ properties = Map("path" -> path.getAbsolutePath)
+ ),
+ schema = new StructType(),
+ properties = Map(
+ HiveExternalCatalog.DATASOURCE_PROVIDER -> "parquet"))
+ hiveClient.createTable(tableDesc, ignoreIfExists = false)
+
+ checkAnswer(spark.table("old"), Row(1, "a"))
+
+ checkAnswer(sql("DESC old"), Row("i", "int", null) :: Row("j", "string", null) :: Nil)
+ }
+ }
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 0053aa1642..e2fcd2fd41 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -62,7 +62,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
spark.conf.set(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key, true)
- val relation = spark.sessionState.catalog.lookupRelation(TableIdentifier("csv_table"))
+ val relation = spark.table("csv_table").queryExecution.analyzed.children.head
.asInstanceOf[MetastoreRelation]
val properties = relation.hiveQlTable.getParameters
@@ -80,7 +80,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
test("analyze MetastoreRelations") {
def queryTotalSize(tableName: String): BigInt =
- spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName)).stats(conf).sizeInBytes
+ spark.table(tableName).queryExecution.analyzed.stats(conf).sizeInBytes
// Non-partitioned table
sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
@@ -451,7 +451,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
}
// Table lookup will make the table cached.
- catalog.lookupRelation(tableIndent)
+ spark.table(tableIndent)
statsBeforeUpdate = catalog.getCachedDataSourceTable(tableIndent)
.asInstanceOf[LogicalRelation].catalogTable.get.stats.get
@@ -461,7 +461,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
} else {
sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
}
- catalog.lookupRelation(tableIndent)
+ spark.table(tableIndent)
statsAfterUpdate = catalog.getCachedDataSourceTable(tableIndent)
.asInstanceOf[LogicalRelation].catalogTable.get.stats.get
}
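The test updates above all follow the same recipe now that `lookupRelation` is no longer the way to reach a resolved relation: go through the public `spark.table` API and read the analyzed plan. For example (illustrative, using the suite's `csv_table`):

```scala
// The analyzed plan is a SubqueryAlias over the relation, so unwrap one level
// to reach the underlying MetastoreRelation.
val plan = spark.table("csv_table").queryExecution.analyzed
val relation = plan.children.head.asInstanceOf[MetastoreRelation]
```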
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 953e29127f..104b5250b6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry, NoSuchPartitionException}
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.{HiveUtils, MetastoreRelation}
@@ -513,8 +514,12 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
isDataSourceTable: Boolean,
format: String,
userSpecifiedLocation: Option[String] = None): Unit = {
- val relation = EliminateSubqueryAliases(
- sessionState.catalog.lookupRelation(TableIdentifier(tableName)))
+ var relation: LogicalPlan = null
+ withSQLConf(
+ HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
+ HiveUtils.CONVERT_METASTORE_ORC.key -> "false") {
+ relation = EliminateSubqueryAliases(spark.table(tableName).queryExecution.analyzed)
+ }
val catalogTable =
sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
relation match {
@@ -1021,13 +1026,11 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
// generates an invalid query plan.
val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}"""))
read.json(rdd).createOrReplaceTempView("data")
- val originalConf = sessionState.conf.convertCTAS
- setConf(SQLConf.CONVERT_CTAS, false)
- try {
+ withSQLConf(SQLConf.CONVERT_CTAS.key -> "false") {
sql("CREATE TABLE explodeTest (key bigInt)")
table("explodeTest").queryExecution.analyzed match {
- case metastoreRelation: MetastoreRelation => // OK
+ case SubqueryAlias(_, r: MetastoreRelation, _) => // OK
case _ =>
fail("To correctly test the fix of SPARK-5875, explodeTest should be a MetastoreRelation")
}
@@ -1040,8 +1043,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
sql("DROP TABLE explodeTest")
dropTempTable("data")
- } finally {
- setConf(SQLConf.CONVERT_CTAS, originalConf)
}
}