diff options
Diffstat (limited to 'sql/hive')
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 2 | ||||
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala | 16 | ||||
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala (renamed from sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala) | 127 |
3 files changed, 127 insertions, 18 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index c909eb5d20..44089335e1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -235,7 +235,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log if (lazyPruningEnabled) { catalog } else { - catalog.allPartitions + catalog.filterPartitions(Nil) // materialize all the partitions in memory } } val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index 81337493c7..d13e29b302 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -577,5 +577,19 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle assert(output == Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")) assert(serde == Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")) } - } + } + + test("table name with schema") { + // regression test for SPARK-11778 + spark.sql("create schema usrdb") + spark.sql("create table usrdb.test(c int)") + spark.read.table("usrdb.test") + spark.sql("drop table usrdb.test") + spark.sql("drop schema usrdb") + } + + test("SPARK-15887: hive-site.xml should be loaded") { + val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client + assert(hiveClient.getConf("hive.in.test", "") == "true") + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala index 15523437a3..82ee813c6a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala @@ -19,25 +19,26 @@ package org.apache.spark.sql.hive import java.io.File +import org.scalatest.BeforeAndAfterEach + import org.apache.spark.metrics.source.HiveCatalogMetrics +import org.apache.spark.sql.execution.datasources.FileStatusCache import org.apache.spark.sql.QueryTest import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils -class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { - test("table name with schema") { - // regression test for SPARK-11778 - spark.sql("create schema usrdb") - spark.sql("create table usrdb.test(c int)") - spark.read.table("usrdb.test") - spark.sql("drop table usrdb.test") - spark.sql("drop schema usrdb") +class HiveTablePerfStatsSuite + extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach { + + override def beforeEach(): Unit = { + super.beforeEach() + FileStatusCache.resetForTesting() } - test("SPARK-15887: hive-site.xml should be loaded") { - val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client - assert(hiveClient.getConf("hive.in.test", "") == "true") + override def afterEach(): Unit = { + super.afterEach() + FileStatusCache.resetForTesting() } private def setupPartitionedTable(tableName: String, dir: File): Unit = { @@ -79,7 +80,9 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUt } test("lazy partition pruning reads only necessary partition data") { - withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "true") { + withSQLConf( + SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "true", + SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE.key -> "0") { withTable("test") { withTempDir { dir => setupPartitionedTable("test", dir) @@ -104,11 +107,103 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUt assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5) assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5) - // read all should be cached + // read all should not be cached HiveCatalogMetrics.reset() spark.sql("select * from test").count() + assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5) + + // cache should be disabled + assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) + } + } + } + } + + test("lazy partition pruning with file status caching enabled") { + withSQLConf( + "spark.sql.hive.filesourcePartitionPruning" -> "true", + "spark.sql.hive.filesourcePartitionFileCacheSize" -> "9999999") { + withTable("test") { + withTempDir { dir => + setupPartitionedTable("test", dir) + HiveCatalogMetrics.reset() + assert(spark.sql("select * from test where partCol1 = 999").count() == 0) assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0) assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0) + assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) + + HiveCatalogMetrics.reset() + assert(spark.sql("select * from test where partCol1 < 2").count() == 2) + assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 2) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2) + assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) + + HiveCatalogMetrics.reset() + assert(spark.sql("select * from test where partCol1 < 3").count() == 3) + assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 3) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 1) + assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 2) + + HiveCatalogMetrics.reset() + assert(spark.sql("select * from test").count() == 5) + assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2) + assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 3) + + HiveCatalogMetrics.reset() + assert(spark.sql("select * from test").count() == 5) + assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0) + assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 5) + } + } + } + } + + test("file status caching respects refresh table and refreshByPath") { + withSQLConf( + "spark.sql.hive.filesourcePartitionPruning" -> "true", + "spark.sql.hive.filesourcePartitionFileCacheSize" -> "9999999") { + withTable("test") { + withTempDir { dir => + setupPartitionedTable("test", dir) + HiveCatalogMetrics.reset() + assert(spark.sql("select * from test").count() == 5) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5) + assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) + + HiveCatalogMetrics.reset() + spark.sql("refresh table test") + assert(spark.sql("select * from test").count() == 5) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5) + assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) + + spark.catalog.cacheTable("test") + HiveCatalogMetrics.reset() + spark.catalog.refreshByPath(dir.getAbsolutePath) + assert(spark.sql("select * from test").count() == 5) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5) + assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) + } + } + } + } + + test("file status cache respects size limit") { + withSQLConf( + "spark.sql.hive.filesourcePartitionPruning" -> "true", + "spark.sql.hive.filesourcePartitionFileCacheSize" -> "1" /* 1 byte */) { + withTable("test") { + withTempDir { dir => + setupPartitionedTable("test", dir) + HiveCatalogMetrics.reset() + assert(spark.sql("select * from test").count() == 5) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5) + assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) + assert(spark.sql("select * from test").count() == 5) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 10) + assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) } } } @@ -124,18 +219,18 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUt // mode. This is kind of terrible, but is needed to preserve the legacy behavior // of doing plan cache validation based on the entire partition set. HiveCatalogMetrics.reset() - spark.sql("select * from test where partCol1 = 999").count() + assert(spark.sql("select * from test where partCol1 = 999").count() == 0) // 5 from table resolution, another 5 from ListingFileCatalog assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 10) assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5) HiveCatalogMetrics.reset() - spark.sql("select * from test where partCol1 < 2").count() + assert(spark.sql("select * from test where partCol1 < 2").count() == 2) assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5) assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0) HiveCatalogMetrics.reset() - spark.sql("select * from test").count() + assert(spark.sql("select * from test").count() == 5) assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5) assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0) } |