Diffstat (limited to 'sql/hive')
 -rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala    |   2
 -rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala     |  16
 -rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala (renamed from sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala) | 127
 3 files changed, 127 insertions(+), 18 deletions(-)
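
Taken together, these changes materialize partitions through the filtering code path in HiveMetastoreCatalog and rename HiveDataFrameSuite to HiveTablePerfStatsSuite, which now asserts file-status-cache hit and miss counts under the two relevant settings. As a rough sketch of how those settings would be applied in user code (the conf keys are taken verbatim from the tests below; the builder values and app name are illustrative):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("partition-file-cache-demo") // hypothetical app name
      .enableHiveSupport()
      // Prune partitions lazily rather than fetching all of them at planning time.
      .config("spark.sql.hive.filesourcePartitionPruning", "true")
      // Byte budget for cached file status entries; "0" disables the cache.
      .config("spark.sql.hive.filesourcePartitionFileCacheSize", "262144000")
      .getOrCreate()
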
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c909eb5d20..44089335e1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -235,7 +235,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         if (lazyPruningEnabled) {
           catalog
         } else {
-          catalog.allPartitions
+          catalog.filterPartitions(Nil) // materialize all the partitions in memory
         }
       }
     val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
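
The one-line change above swaps catalog.allPartitions for catalog.filterPartitions(Nil): filtering on an empty predicate list still selects every partition, but it goes through the same listing path that materializes the partitions in memory, as the inline comment notes. A minimal sketch of the idea, with a simplified stand-in trait rather than Spark's actual catalog classes:

    // Simplified stand-in for the catalog type used above; not Spark's real API.
    trait PartitionedCatalog {
      // An empty predicate list matches all partitions.
      def filterPartitions(predicates: Seq[String]): PartitionedCatalog
    }

    // Materialize every partition by filtering with no predicates, mirroring
    // the replacement line in the hunk above.
    def materializeAll(catalog: PartitionedCatalog): PartitionedCatalog =
      catalog.filterPartitions(Nil)
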
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 81337493c7..d13e29b302 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -577,5 +577,19 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
       assert(output == Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
       assert(serde == Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
     }
-  }
+  }
+
+  test("table name with schema") {
+    // regression test for SPARK-11778
+    spark.sql("create schema usrdb")
+    spark.sql("create table usrdb.test(c int)")
+    spark.read.table("usrdb.test")
+    spark.sql("drop table usrdb.test")
+    spark.sql("drop schema usrdb")
+  }
+
+  test("SPARK-15887: hive-site.xml should be loaded") {
+    val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+    assert(hiveClient.getConf("hive.in.test", "") == "true")
+  }
 }
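
Both tests added here are moved in from HiveDataFrameSuite (see the rename below). The SPARK-15887 check reads a property straight off the Hive client; a hedged sketch of the same pattern for an arbitrary property (the property name here is illustrative, while the cast and the getConf(key, default) call mirror the test verbatim):

    val hiveClient = spark.sharedState.externalCatalog
      .asInstanceOf[HiveExternalCatalog].client
    // Returns the value loaded from hive-site.xml, or the default if unset.
    val warehouseDir = hiveClient.getConf("hive.metastore.warehouse.dir", "<unset>")
    println(s"warehouse dir: $warehouseDir")
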
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala
index 15523437a3..82ee813c6a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala
@@ -19,25 +19,26 @@ package org.apache.spark.sql.hive
 import java.io.File
 
+import org.scalatest.BeforeAndAfterEach
+
 import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.execution.datasources.FileStatusCache
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
 
-class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
-  test("table name with schema") {
-    // regression test for SPARK-11778
-    spark.sql("create schema usrdb")
-    spark.sql("create table usrdb.test(c int)")
-    spark.read.table("usrdb.test")
-    spark.sql("drop table usrdb.test")
-    spark.sql("drop schema usrdb")
+class HiveTablePerfStatsSuite
+  extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach {
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    FileStatusCache.resetForTesting()
   }
 
-  test("SPARK-15887: hive-site.xml should be loaded") {
-    val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
-    assert(hiveClient.getConf("hive.in.test", "") == "true")
+  override def afterEach(): Unit = {
+    super.afterEach()
+    FileStatusCache.resetForTesting()
   }
 
   private def setupPartitionedTable(tableName: String, dir: File): Unit = {
@@ -79,7 +80,9 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUt
   }
 
   test("lazy partition pruning reads only necessary partition data") {
-    withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "true") {
+    withSQLConf(
+        SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "true",
+        SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE.key -> "0") {
       withTable("test") {
         withTempDir { dir =>
           setupPartitionedTable("test", dir)
@@ -104,11 +107,103 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUt
           assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
           assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
 
-          // read all should be cached
+          // read all should not be cached
           HiveCatalogMetrics.reset()
           spark.sql("select * from test").count()
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+
+          // cache should be disabled
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+        }
+      }
+    }
+  }
+
+  test("lazy partition pruning with file status caching enabled") {
+    withSQLConf(
+        "spark.sql.hive.filesourcePartitionPruning" -> "true",
+        "spark.sql.hive.filesourcePartitionFileCacheSize" -> "9999999") {
+      withTable("test") {
+        withTempDir { dir =>
+          setupPartitionedTable("test", dir)
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test where partCol1 = 999").count() == 0)
           assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
           assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test where partCol1 < 2").count() == 2)
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 2)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test where partCol1 < 3").count() == 3)
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 3)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 1)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 2)
+
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test").count() == 5)
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 3)
+
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test").count() == 5)
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 5)
+        }
+      }
+    }
+  }
+
+  test("file status caching respects refresh table and refreshByPath") {
+    withSQLConf(
+        "spark.sql.hive.filesourcePartitionPruning" -> "true",
+        "spark.sql.hive.filesourcePartitionFileCacheSize" -> "9999999") {
+      withTable("test") {
+        withTempDir { dir =>
+          setupPartitionedTable("test", dir)
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test").count() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+
+          HiveCatalogMetrics.reset()
+          spark.sql("refresh table test")
+          assert(spark.sql("select * from test").count() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+
+          spark.catalog.cacheTable("test")
+          HiveCatalogMetrics.reset()
+          spark.catalog.refreshByPath(dir.getAbsolutePath)
+          assert(spark.sql("select * from test").count() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+        }
+      }
+    }
+  }
+
+  test("file status cache respects size limit") {
+    withSQLConf(
+        "spark.sql.hive.filesourcePartitionPruning" -> "true",
+        "spark.sql.hive.filesourcePartitionFileCacheSize" -> "1" /* 1 byte */) {
+      withTable("test") {
+        withTempDir { dir =>
+          setupPartitionedTable("test", dir)
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test").count() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+          assert(spark.sql("select * from test").count() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 10)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
         }
       }
     }
@@ -124,18 +219,18 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUt
       // mode. This is kind of terrible, but is needed to preserve the legacy behavior
       // of doing plan cache validation based on the entire partition set.
       HiveCatalogMetrics.reset()
-      spark.sql("select * from test where partCol1 = 999").count()
+      assert(spark.sql("select * from test where partCol1 = 999").count() == 0)
       // 5 from table resolution, another 5 from ListingFileCatalog
       assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 10)
       assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
 
       HiveCatalogMetrics.reset()
-      spark.sql("select * from test where partCol1 < 2").count()
+      assert(spark.sql("select * from test where partCol1 < 2").count() == 2)
       assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
       assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
 
       HiveCatalogMetrics.reset()
-      spark.sql("select * from test").count()
+      assert(spark.sql("select * from test").count() == 5)
       assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
       assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
     }