aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
authorHerman van Hovell <hvanhovell@databricks.com>2017-03-28 23:14:31 +0800
committerWenchen Fan <wenchen@databricks.com>2017-03-28 23:14:31 +0800
commitf82461fc1197f6055d9cf972d82260b178e10a7c (patch)
tree36bb1f58ce3080b1b2d86cd8c2b99148d07cbf0c /sql/hive
parent4fcc214d9eb5e98b2eed3e28cc23b0c511cd9007 (diff)
downloadspark-f82461fc1197f6055d9cf972d82260b178e10a7c.tar.gz
spark-f82461fc1197f6055d9cf972d82260b178e10a7c.tar.bz2
spark-f82461fc1197f6055d9cf972d82260b178e10a7c.zip
[SPARK-20126][SQL] Remove HiveSessionState
## What changes were proposed in this pull request? Commit https://github.com/apache/spark/commit/ea361165e1ddce4d8aa0242ae3e878d7b39f1de2 moved most of the logic from the SessionState classes into an accompanying builder. This makes the existence of the `HiveSessionState` redundant. This PR removes the `HiveSessionState`. ## How was this patch tested? Existing tests. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #17457 from hvanhovell/SPARK-20126.
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala2
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala4
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala9
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala144
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala23
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala6
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala7
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala6
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala21
9 files changed, 62 insertions, 160 deletions
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index f78660f7c1..0a53aaca40 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -39,7 +39,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
private val originalLocale = Locale.getDefault
private val originalColumnBatchSize = TestHive.conf.columnBatchSize
private val originalInMemoryPartitionPruning = TestHive.conf.inMemoryPartitionPruning
- private val originalConvertMetastoreOrc = TestHive.sessionState.convertMetastoreOrc
+ private val originalConvertMetastoreOrc = TestHive.conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
private val originalCrossJoinEnabled = TestHive.conf.crossJoinEnabled
private val originalSessionLocalTimeZone = TestHive.conf.sessionLocalTimeZone
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 5393c57c9a..02a5117f00 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -48,10 +48,6 @@ class HiveContext private[hive](_sparkSession: SparkSession)
new HiveContext(sparkSession.newSession())
}
- protected[sql] override def sessionState: HiveSessionState = {
- sparkSession.sessionState.asInstanceOf[HiveSessionState]
- }
-
/**
* Invalidate and refresh all the cached the metadata of the given table. For performance reasons,
* Spark SQL or the external data source library it uses might cache certain metadata about a
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 2e060ab9f6..305bd007c9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -44,7 +44,7 @@ import org.apache.spark.sql.types._
*/
private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging {
// these are def_s and not val/lazy val since the latter would introduce circular references
- private def sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
+ private def sessionState = sparkSession.sessionState
private def tableRelationCache = sparkSession.sessionState.catalog.tableRelationCache
import HiveMetastoreCatalog._
@@ -281,12 +281,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
object ParquetConversions extends Rule[LogicalPlan] {
private def shouldConvertMetastoreParquet(relation: CatalogRelation): Boolean = {
relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet") &&
- sessionState.convertMetastoreParquet
+ sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)
}
private def convertToParquetRelation(relation: CatalogRelation): LogicalRelation = {
val fileFormatClass = classOf[ParquetFileFormat]
- val mergeSchema = sessionState.convertMetastoreParquetWithSchemaMerging
+ val mergeSchema = sessionState.conf.getConf(
+ HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING)
val options = Map(ParquetOptions.MERGE_SCHEMA -> mergeSchema.toString)
convertToLogicalRelation(relation, options, fileFormatClass, "parquet")
@@ -316,7 +317,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
object OrcConversions extends Rule[LogicalPlan] {
private def shouldConvertMetastoreOrc(relation: CatalogRelation): Boolean = {
relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("orc") &&
- sessionState.convertMetastoreOrc
+ sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
}
private def convertToOrcRelation(relation: CatalogRelation): LogicalRelation = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 49ff8478f1..f49e6bb418 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -17,121 +17,24 @@
package org.apache.spark.sql.hive
-import org.apache.spark.SparkContext
import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{QueryExecution, SparkPlanner}
+import org.apache.spark.sql.execution.SparkPlanner
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionFunctionResourceLoader, SessionState, SharedState, SQLConf}
-import org.apache.spark.sql.streaming.StreamingQueryManager
-
+import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionResourceLoader, SessionState}
/**
- * A class that holds all session-specific state in a given [[SparkSession]] backed by Hive.
- *
- * @param sparkContext The [[SparkContext]].
- * @param sharedState The shared state.
- * @param conf SQL-specific key-value configurations.
- * @param experimentalMethods The experimental methods.
- * @param functionRegistry Internal catalog for managing functions registered by the user.
- * @param catalog Internal catalog for managing table and database states that uses Hive client for
- * interacting with the metastore.
- * @param sqlParser Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
- * @param analyzer Logical query plan analyzer for resolving unresolved attributes and relations.
- * @param optimizer Logical query plan optimizer.
- * @param planner Planner that converts optimized logical plans to physical plans and that takes
- * Hive-specific strategies into account.
- * @param streamingQueryManager Interface to start and stop streaming queries.
- * @param createQueryExecution Function used to create QueryExecution objects.
- * @param createClone Function used to create clones of the session state.
- * @param metadataHive The Hive metadata client.
+ * Entry object for creating a Hive aware [[SessionState]].
*/
-private[hive] class HiveSessionState(
- sparkContext: SparkContext,
- sharedState: SharedState,
- conf: SQLConf,
- experimentalMethods: ExperimentalMethods,
- functionRegistry: FunctionRegistry,
- override val catalog: HiveSessionCatalog,
- sqlParser: ParserInterface,
- analyzer: Analyzer,
- optimizer: Optimizer,
- planner: SparkPlanner,
- streamingQueryManager: StreamingQueryManager,
- createQueryExecution: LogicalPlan => QueryExecution,
- createClone: (SparkSession, SessionState) => SessionState,
- val metadataHive: HiveClient)
- extends SessionState(
- sparkContext,
- sharedState,
- conf,
- experimentalMethods,
- functionRegistry,
- catalog,
- sqlParser,
- analyzer,
- optimizer,
- planner,
- streamingQueryManager,
- createQueryExecution,
- createClone) {
-
- // ------------------------------------------------------
- // Helper methods, partially leftover from pre-2.0 days
- // ------------------------------------------------------
-
- override def addJar(path: String): Unit = {
- metadataHive.addJar(path)
- super.addJar(path)
- }
-
- /**
- * When true, enables an experimental feature where metastore tables that use the parquet SerDe
- * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
- * SerDe.
- */
- def convertMetastoreParquet: Boolean = {
- conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)
- }
-
- /**
- * When true, also tries to merge possibly different but compatible Parquet schemas in different
- * Parquet data files.
- *
- * This configuration is only effective when "spark.sql.hive.convertMetastoreParquet" is true.
- */
- def convertMetastoreParquetWithSchemaMerging: Boolean = {
- conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING)
- }
-
- /**
- * When true, enables an experimental feature where metastore tables that use the Orc SerDe
- * are automatically converted to use the Spark SQL ORC table scan, instead of the Hive
- * SerDe.
- */
- def convertMetastoreOrc: Boolean = {
- conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
- }
-
- /**
- * When true, Hive Thrift server will execute SQL queries asynchronously using a thread pool."
- */
- def hiveThriftServerAsync: Boolean = {
- conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
- }
-}
-
private[hive] object HiveSessionState {
/**
- * Create a new [[HiveSessionState]] for the given session.
+ * Create a new Hive aware [[SessionState]]. for the given session.
*/
- def apply(session: SparkSession): HiveSessionState = {
+ def apply(session: SparkSession): SessionState = {
new HiveSessionStateBuilder(session).build()
}
}
@@ -148,6 +51,14 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
/**
+ * Create a Hive aware resource loader.
+ */
+ override protected lazy val resourceLoader: HiveSessionResourceLoader = {
+ val client: HiveClient = externalCatalog.client.newSession()
+ new HiveSessionResourceLoader(session, client)
+ }
+
+ /**
* Create a [[HiveSessionCatalog]].
*/
override protected lazy val catalog: HiveSessionCatalog = {
@@ -159,7 +70,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
conf,
SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
sqlParser,
- new SessionFunctionResourceLoader(session))
+ resourceLoader)
parentState.foreach(_.catalog.copyStateTo(catalog))
catalog
}
@@ -217,23 +128,14 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
}
override protected def newBuilder: NewBuilder = new HiveSessionStateBuilder(_, _)
+}
- override def build(): HiveSessionState = {
- val metadataHive: HiveClient = externalCatalog.client.newSession()
- new HiveSessionState(
- session.sparkContext,
- session.sharedState,
- conf,
- experimentalMethods,
- functionRegistry,
- catalog,
- sqlParser,
- analyzer,
- optimizer,
- planner,
- streamingQueryManager,
- createQueryExecution,
- createClone,
- metadataHive)
+class HiveSessionResourceLoader(
+ session: SparkSession,
+ client: HiveClient)
+ extends SessionResourceLoader(session) {
+ override def addJar(path: String): Unit = {
+ client.addJar(path)
+ super.addJar(path)
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 32ca69605e..0bcf219922 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -34,7 +34,6 @@ import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.catalog.ExternalCatalog
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.CacheTableCommand
@@ -81,7 +80,7 @@ private[hive] class TestHiveSharedState(
hiveClient: Option[HiveClient] = None)
extends SharedState(sc) {
- override lazy val externalCatalog: ExternalCatalog = {
+ override lazy val externalCatalog: TestHiveExternalCatalog = {
new TestHiveExternalCatalog(
sc.conf,
sc.hadoopConfiguration,
@@ -123,8 +122,6 @@ class TestHiveContext(
new TestHiveContext(sparkSession.newSession())
}
- override def sessionState: HiveSessionState = sparkSession.sessionState
-
def setCacheTables(c: Boolean): Unit = {
sparkSession.setCacheTables(c)
}
@@ -155,7 +152,7 @@ class TestHiveContext(
private[hive] class TestHiveSparkSession(
@transient private val sc: SparkContext,
@transient private val existingSharedState: Option[TestHiveSharedState],
- @transient private val parentSessionState: Option[HiveSessionState],
+ @transient private val parentSessionState: Option[SessionState],
private val loadTestTables: Boolean)
extends SparkSession(sc) with Logging { self =>
@@ -195,10 +192,12 @@ private[hive] class TestHiveSparkSession(
}
@transient
- override lazy val sessionState: HiveSessionState = {
+ override lazy val sessionState: SessionState = {
new TestHiveSessionStateBuilder(this, parentSessionState).build()
}
+ lazy val metadataHive: HiveClient = sharedState.externalCatalog.client.newSession()
+
override def newSession(): TestHiveSparkSession = {
new TestHiveSparkSession(sc, Some(sharedState), None, loadTestTables)
}
@@ -492,7 +491,7 @@ private[hive] class TestHiveSparkSession(
sessionState.catalog.clearTempTables()
sessionState.catalog.tableRelationCache.invalidateAll()
- sessionState.metadataHive.reset()
+ metadataHive.reset()
FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)).
foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) }
@@ -509,14 +508,14 @@ private[hive] class TestHiveSparkSession(
sessionState.conf.setConfString("fs.defaultFS", new File(".").toURI.toString)
// It is important that we RESET first as broken hooks that might have been set could break
// other sql exec here.
- sessionState.metadataHive.runSqlHive("RESET")
+ metadataHive.runSqlHive("RESET")
// For some reason, RESET does not reset the following variables...
// https://issues.apache.org/jira/browse/HIVE-9004
- sessionState.metadataHive.runSqlHive("set hive.table.parameters.default=")
- sessionState.metadataHive.runSqlHive("set datanucleus.cache.collections=true")
- sessionState.metadataHive.runSqlHive("set datanucleus.cache.collections.lazy=true")
+ metadataHive.runSqlHive("set hive.table.parameters.default=")
+ metadataHive.runSqlHive("set datanucleus.cache.collections=true")
+ metadataHive.runSqlHive("set datanucleus.cache.collections.lazy=true")
// Lots of tests fail if we do not change the partition whitelist from the default.
- sessionState.metadataHive.runSqlHive("set hive.metastore.partition.name.whitelist.pattern=.*")
+ metadataHive.runSqlHive("set hive.metastore.partition.name.whitelist.pattern=.*")
sessionState.catalog.setCurrentDatabase("default")
} catch {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 079358b29a..d8fd68b63d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -115,7 +115,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
assert(columns.map(_.dataType) === Seq(DecimalType(10, 3), StringType))
checkAnswer(table("t"), testDF)
- assert(sessionState.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2"))
+ assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2"))
}
}
@@ -147,7 +147,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
assert(columns.map(_.dataType) === Seq(DecimalType(10, 3), StringType))
checkAnswer(table("t"), testDF)
- assert(sessionState.metadataHive.runSqlHive("SELECT * FROM t") ===
+ assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") ===
Seq("1.1\t1", "2.1\t2"))
}
}
@@ -176,7 +176,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
assert(columns.map(_.dataType) === Seq(IntegerType, StringType))
checkAnswer(table("t"), Row(1, "val_1"))
- assert(sessionState.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1\tval_1"))
+ assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1\tval_1"))
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index f02b7218d6..55e02acfa4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -379,8 +379,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
|)
""".stripMargin)
- val expectedPath =
- sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable"))
+ val expectedPath = sessionState.catalog.defaultTablePath(TableIdentifier("ctasJsonTable"))
val filesystemPath = new Path(expectedPath)
val fs = filesystemPath.getFileSystem(spark.sessionState.newHadoopConf())
fs.delete(filesystemPath, true)
@@ -486,7 +485,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
sql("DROP TABLE savedJsonTable")
intercept[AnalysisException] {
read.json(
- sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier("savedJsonTable")))
+ sessionState.catalog.defaultTablePath(TableIdentifier("savedJsonTable")).toString)
}
}
@@ -756,7 +755,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
serde = None,
compressed = false,
properties = Map(
- "path" -> sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier(tableName)))
+ "path" -> sessionState.catalog.defaultTablePath(TableIdentifier(tableName)).toString)
),
properties = Map(
DATASOURCE_PROVIDER -> "json",
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 04bc79d430..f0a995c274 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -128,11 +128,11 @@ class HiveDDLSuite
dbPath: Option[String] = None): Boolean = {
val expectedTablePath =
if (dbPath.isEmpty) {
- hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdentifier)
+ hiveContext.sessionState.catalog.defaultTablePath(tableIdentifier)
} else {
- new Path(new Path(dbPath.get), tableIdentifier.table).toString
+ new Path(new Path(dbPath.get), tableIdentifier.table)
}
- val filesystemPath = new Path(expectedTablePath)
+ val filesystemPath = new Path(expectedTablePath.toString)
val fs = filesystemPath.getFileSystem(spark.sessionState.newHadoopConf())
fs.exists(filesystemPath)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 81af24979d..9fc2923bb6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -22,6 +22,7 @@ import java.io.File
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.execution.HiveTableScanExec
@@ -448,10 +449,14 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
}
}
+ private def getCachedDataSourceTable(id: TableIdentifier): LogicalPlan = {
+ sessionState.catalog.asInstanceOf[HiveSessionCatalog].getCachedDataSourceTable(id)
+ }
+
test("Caching converted data source Parquet Relations") {
def checkCached(tableIdentifier: TableIdentifier): Unit = {
// Converted test_parquet should be cached.
- sessionState.catalog.getCachedDataSourceTable(tableIdentifier) match {
+ getCachedDataSourceTable(tableIdentifier) match {
case null => fail("Converted test_parquet should be cached in the cache.")
case LogicalRelation(_: HadoopFsRelation, _, _) => // OK
case other =>
@@ -479,14 +484,14 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
var tableIdentifier = TableIdentifier("test_insert_parquet", Some("default"))
// First, make sure the converted test_parquet is not cached.
- assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
+ assert(getCachedDataSourceTable(tableIdentifier) === null)
// Table lookup will make the table cached.
table("test_insert_parquet")
checkCached(tableIdentifier)
// For insert into non-partitioned table, we will do the conversion,
// so the converted test_insert_parquet should be cached.
sessionState.refreshTable("test_insert_parquet")
- assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
+ assert(getCachedDataSourceTable(tableIdentifier) === null)
sql(
"""
|INSERT INTO TABLE test_insert_parquet
@@ -499,7 +504,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
sql("select a, b from jt").collect())
// Invalidate the cache.
sessionState.refreshTable("test_insert_parquet")
- assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
+ assert(getCachedDataSourceTable(tableIdentifier) === null)
// Create a partitioned table.
sql(
@@ -517,7 +522,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
""".stripMargin)
tableIdentifier = TableIdentifier("test_parquet_partitioned_cache_test", Some("default"))
- assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
+ assert(getCachedDataSourceTable(tableIdentifier) === null)
sql(
"""
|INSERT INTO TABLE test_parquet_partitioned_cache_test
@@ -526,14 +531,14 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
""".stripMargin)
// Right now, insert into a partitioned Parquet is not supported in data source Parquet.
// So, we expect it is not cached.
- assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
+ assert(getCachedDataSourceTable(tableIdentifier) === null)
sql(
"""
|INSERT INTO TABLE test_parquet_partitioned_cache_test
|PARTITION (`date`='2015-04-02')
|select a, b from jt
""".stripMargin)
- assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
+ assert(getCachedDataSourceTable(tableIdentifier) === null)
// Make sure we can cache the partitioned table.
table("test_parquet_partitioned_cache_test")
@@ -549,7 +554,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
""".stripMargin).collect())
sessionState.refreshTable("test_parquet_partitioned_cache_test")
- assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
+ assert(getCachedDataSourceTable(tableIdentifier) === null)
dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test")
}