author     Eric Liang <ekl@databricks.com>          2016-10-22 22:08:28 +0800
committer  Wenchen Fan <wenchen@databricks.com>     2016-10-22 22:08:28 +0800
commit     3eca283aca68ac81c127d60ad5699f854d5f14b7 (patch)
tree       1846e569ede3f7774b9fca2d21c5b85dec2b885d /sql/hive
parent     ab3363e9f6b1f7fc26682509fe7382c570f91778 (diff)
[SPARK-17994][SQL] Add back a file status cache for catalog tables
## What changes were proposed in this pull request?

In SPARK-16980, we removed the full in-memory cache of table partitions in favor of loading only the needed partitions from the metastore. This greatly improves the initial latency of queries that read only a small fraction of table partitions. However, since the metastore does not store file statistics, those must be discovered from remote storage, and with the loss of the in-memory file status cache that discovery has to happen on every query, increasing the latency of repeated queries over the same partitions.

The proposal is to add back a per-table cache of partition contents, i.e. a Map[Path, Array[FileStatus]]. The cache is retained per table, can be invalidated through refreshTable() and refreshByPath(), and, unlike the prior cache, can be incrementally updated as new partitions are read.

## How was this patch tested?

Existing tests and new tests in `HiveTablePerfStatsSuite`.

cc mallman

Author: Eric Liang <ekl@databricks.com>
Author: Michael Allman <michael@videoamp.com>
Author: Eric Liang <ekhliang@gmail.com>

Closes #15539 from ericl/meta-cache.
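To make the proposed structure concrete, here is a minimal sketch of a cache that maps each partition path to the `FileStatus` array discovered under it. The class and method names are illustrative assumptions, not the actual `FileStatusCache` API added by this patch, which additionally enforces the size limit and per-table scoping exercised by the tests below:

```scala
import scala.collection.concurrent.TrieMap

import org.apache.hadoop.fs.{FileStatus, Path}

// Illustrative sketch only: a thread-safe map from partition path to the
// file statuses listed under it, filled in lazily one partition at a time.
class PartitionFileStatusCacheSketch {
  private val cache = TrieMap.empty[Path, Array[FileStatus]]

  // Serve repeated reads from memory; on a miss, run the (expensive)
  // remote listing and add the result incrementally -- no partition is
  // listed before a query actually touches it.
  def listFiles(path: Path, discover: Path => Array[FileStatus]): Array[FileStatus] =
    cache.getOrElseUpdate(path, discover(path))

  // Invalidation hook, corresponding to refreshTable() and refreshByPath().
  def invalidateAll(): Unit = cache.clear()
}
```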
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala    |   2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala     |  16
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala (renamed from sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala) | 127
3 files changed, 127 insertions(+), 18 deletions(-)
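The behavior under test is driven by two configuration keys that appear verbatim in the diff below. A hedged usage sketch (the capacity value here is arbitrary; per the tests, the size is in bytes and `"0"` disables the cache):

```scala
// Assumed session-level setup for the knobs exercised by the tests below.
spark.conf.set("spark.sql.hive.filesourcePartitionPruning", "true")
// Cache capacity in bytes; the tests use "0" to disable caching and
// "9999999" to make it effectively unbounded for small tables.
spark.conf.set("spark.sql.hive.filesourcePartitionFileCacheSize", "262144000")
```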
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c909eb5d20..44089335e1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -235,7 +235,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           if (lazyPruningEnabled) {
             catalog
           } else {
-            catalog.allPartitions
+            catalog.filterPartitions(Nil)  // materialize all the partitions in memory
           }
         }
         val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
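The replacement above relies on the convention that filtering with an empty predicate list keeps every partition, so `filterPartitions(Nil)` eagerly materializes the full partition set when lazy pruning is disabled. A simplified model of that convention (the signature here is illustrative, not the actual catalog API):

```scala
// Illustrative only: folding zero predicates over the partition list
// filters nothing out, so the call returns every partition.
def filterPartitions[P](predicates: Seq[P => Boolean], partitions: Seq[P]): Seq[P] =
  predicates.foldLeft(partitions)((remaining, pred) => remaining.filter(pred))

// filterPartitions(Nil, allPartitions) == allPartitions
```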
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 81337493c7..d13e29b302 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -577,5 +577,19 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
       assert(output == Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
       assert(serde == Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
     }
-  }
+  }
+
+  test("table name with schema") {
+    // regression test for SPARK-11778
+    spark.sql("create schema usrdb")
+    spark.sql("create table usrdb.test(c int)")
+    spark.read.table("usrdb.test")
+    spark.sql("drop table usrdb.test")
+    spark.sql("drop schema usrdb")
+  }
+
+  test("SPARK-15887: hive-site.xml should be loaded") {
+    val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+    assert(hiveClient.getConf("hive.in.test", "") == "true")
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala
index 15523437a3..82ee813c6a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala
@@ -19,25 +19,26 @@ package org.apache.spark.sql.hive
 import java.io.File
 
+import org.scalatest.BeforeAndAfterEach
+
 import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.execution.datasources.FileStatusCache
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
 
-class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
-  test("table name with schema") {
-    // regression test for SPARK-11778
-    spark.sql("create schema usrdb")
-    spark.sql("create table usrdb.test(c int)")
-    spark.read.table("usrdb.test")
-    spark.sql("drop table usrdb.test")
-    spark.sql("drop schema usrdb")
+class HiveTablePerfStatsSuite
+  extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach {
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    FileStatusCache.resetForTesting()
   }
 
-  test("SPARK-15887: hive-site.xml should be loaded") {
-    val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
-    assert(hiveClient.getConf("hive.in.test", "") == "true")
+  override def afterEach(): Unit = {
+    super.afterEach()
+    FileStatusCache.resetForTesting()
   }
 
   private def setupPartitionedTable(tableName: String, dir: File): Unit = {
@@ -79,7 +80,9 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUt
   }
 
   test("lazy partition pruning reads only necessary partition data") {
-    withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "true") {
+    withSQLConf(
+      SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "true",
+      SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE.key -> "0") {
       withTable("test") {
         withTempDir { dir =>
           setupPartitionedTable("test", dir)
@@ -104,11 +107,103 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUt
           assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
           assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
 
-          // read all should be cached
+          // read all should not be cached
           HiveCatalogMetrics.reset()
           spark.sql("select * from test").count()
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+
+          // cache should be disabled
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+        }
+      }
+    }
+  }
+
+  test("lazy partition pruning with file status caching enabled") {
+    withSQLConf(
+      "spark.sql.hive.filesourcePartitionPruning" -> "true",
+      "spark.sql.hive.filesourcePartitionFileCacheSize" -> "9999999") {
+      withTable("test") {
+        withTempDir { dir =>
+          setupPartitionedTable("test", dir)
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test where partCol1 = 999").count() == 0)
           assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
           assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test where partCol1 < 2").count() == 2)
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 2)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test where partCol1 < 3").count() == 3)
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 3)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 1)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 2)
+
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test").count() == 5)
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 3)
+
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test").count() == 5)
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 5)
+        }
+      }
+    }
+  }
+
+  test("file status caching respects refresh table and refreshByPath") {
+    withSQLConf(
+      "spark.sql.hive.filesourcePartitionPruning" -> "true",
+      "spark.sql.hive.filesourcePartitionFileCacheSize" -> "9999999") {
+      withTable("test") {
+        withTempDir { dir =>
+          setupPartitionedTable("test", dir)
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test").count() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+
+          HiveCatalogMetrics.reset()
+          spark.sql("refresh table test")
+          assert(spark.sql("select * from test").count() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+
+          spark.catalog.cacheTable("test")
+          HiveCatalogMetrics.reset()
+          spark.catalog.refreshByPath(dir.getAbsolutePath)
+          assert(spark.sql("select * from test").count() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+        }
+      }
+    }
+  }
+
+  test("file status cache respects size limit") {
+    withSQLConf(
+      "spark.sql.hive.filesourcePartitionPruning" -> "true",
+      "spark.sql.hive.filesourcePartitionFileCacheSize" -> "1" /* 1 byte */) {
+      withTable("test") {
+        withTempDir { dir =>
+          setupPartitionedTable("test", dir)
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test").count() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+          assert(spark.sql("select * from test").count() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 10)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
         }
       }
     }
@@ -124,18 +219,18 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUt
         // mode. This is kind of terrible, but is needed to preserve the legacy behavior
         // of doing plan cache validation based on the entire partition set.
         HiveCatalogMetrics.reset()
-        spark.sql("select * from test where partCol1 = 999").count()
+        assert(spark.sql("select * from test where partCol1 = 999").count() == 0)
         // 5 from table resolution, another 5 from ListingFileCatalog
         assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 10)
         assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
 
         HiveCatalogMetrics.reset()
-        spark.sql("select * from test where partCol1 < 2").count()
+        assert(spark.sql("select * from test where partCol1 < 2").count() == 2)
         assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
         assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
 
         HiveCatalogMetrics.reset()
-        spark.sql("select * from test").count()
+        assert(spark.sql("select * from test").count() == 5)
         assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
         assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
       }