author     Michael Allman <michael@videoamp.com>    2016-07-05 09:49:25 +0800
committer  Wenchen Fan <wenchen@databricks.com>     2016-07-05 09:49:25 +0800
commit     8f6cf00c697689174bcf522091e396b4fa5ef66d (patch)
tree       551c9fc203ba2d742c3cfdf71abf35d87b553b4a
parent     7dbffcdd6dc76b8e8d6a9cd6eeb24323a6b740c3 (diff)
[SPARK-15968][SQL] Nonempty partitioned metastore tables are not cached
(Please note this is a revision of PR #13686, which has been closed in favor of this PR.) This PR addresses [SPARK-15968](https://issues.apache.org/jira/browse/SPARK-15968).

## What changes were proposed in this pull request?

The `getCached` method of [HiveMetastoreCatalog](https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala) computes `pathsInMetastore` from the metastore relation's catalog table. This only returns the table's base path, which is incomplete/inaccurate for a nonempty partitioned table. As a result, cached lookups on nonempty partitioned tables always miss.

Rather than get `pathsInMetastore` from `metastoreRelation.catalogTable.storage.locationUri.toSeq`, I modified the `getCached` method to take a `pathsInMetastore` argument. Calls to this method pass in the paths computed from calls to the Hive metastore. This is how `getCached` was implemented in Spark 1.5: https://github.com/apache/spark/blob/e0c3212a9b42e3e704b070da4ac25b68c584427f/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala#L444.

I also added a call in `InsertIntoHiveTable.scala` to invalidate the table from the SQL session catalog.

## How was this patch tested?

I've added a new unit test to `parquetSuites.scala`:

    SPARK-15968: nonempty partitioned metastore Parquet table lookup should use cached relation

Note that the only difference between this new test and the one above it in the file is that the new test populates its partitioned table with a single value, while the existing test leaves the table empty. This reveals a subtle, unexpected hole in test coverage present before this patch.

Note that I also modified a different but related unit test in `parquetSuites.scala`:

    SPARK-15248: explicitly added partitions should be readable

This unit test asserts that Spark SQL should return data from a table partition which has been placed there outside a metastore query immediately after it is added. I changed the test so that, instead of adding the data as a Parquet file saved in the partition's location, the data is added through a SQL `INSERT` query. I made this change because I could find no way to efficiently support partitioned table caching without failing that test.

In addition to my primary motivation, I can offer a few reasons why I believe this is an acceptable weakening of that test. First, it still validates a fix for [SPARK-15248](https://issues.apache.org/jira/browse/SPARK-15248), the issue for which it was written. Second, the assertion made is stronger than that required for non-partitioned tables. If you write data to the storage location of a non-partitioned metastore table without using a proper SQL DML query, a subsequent call to show that data will not return it. I believe this is an intentional limitation put in place to make table caching feasible, but I'm only speculating.

Building a large `HadoopFsRelation` requires `stat`-ing all of its data files. In our environment, where we have tables with tens of thousands of partitions, the difference between using a cached relation and building a new one is a matter of seconds versus minutes. Caching partitioned table metadata vastly improves the usability of Spark SQL for these cases.

Thanks.

Author: Michael Allman <michael@videoamp.com>

Closes #13818 from mallman/spark-15968.
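To make the cache miss concrete before reading the diff, here is a minimal, self-contained sketch of the path comparison at the heart of the bug. It is plain Scala, not the actual `HiveMetastoreCatalog` code; the object, method, and path names below are hypothetical and stand in for the real comparison `getCached` performs against the cached `HadoopFsRelation`.

```scala
// Hypothetical illustration of why cached lookups on nonempty partitioned
// tables always missed before this patch.
object CacheKeyMismatchSketch {

  // getCached only reuses a cached relation when (among other checks) the set of
  // paths the relation was built from matches the paths expected from the metastore.
  def useCached(cachedRelationPaths: Set[String], pathsInMetastore: Set[String]): Boolean =
    cachedRelationPaths == pathsInMetastore

  def main(args: Array[String]): Unit = {
    // A cached HadoopFsRelation for a nonempty partitioned table is built from the
    // individual partition directories (example paths are made up).
    val cachedRelationPaths = Set(
      "hdfs://nn/warehouse/t/part=0",
      "hdfs://nn/warehouse/t/part=1")

    // Before the patch: the lookup key came from catalogTable.storage.locationUri,
    // i.e. only the table's base path, so the comparison could never succeed.
    val basePathOnly = Set("hdfs://nn/warehouse/t")

    // After the patch: the caller passes in the partition locations it already
    // fetched from the Hive metastore.
    val partitionPathsFromMetastore = Set(
      "hdfs://nn/warehouse/t/part=0",
      "hdfs://nn/warehouse/t/part=1")

    assert(!useCached(cachedRelationPaths, basePathOnly))               // old key: always a miss
    assert(useCached(cachedRelationPaths, partitionPathsFromMetastore)) // new key: cache hit
  }
}
```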
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala            16
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala    1
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala                    61
3 files changed, 59 insertions(+), 19 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 151e4563a7..7dae473f47 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -191,6 +191,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
private def getCached(
tableIdentifier: QualifiedTableName,
+ pathsInMetastore: Seq[String],
metastoreRelation: MetastoreRelation,
schemaInMetastore: StructType,
expectedFileFormat: Class[_ <: FileFormat],
@@ -200,7 +201,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => None // Cache miss
case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
- val pathsInMetastore = metastoreRelation.catalogTable.storage.locationUri.toSeq
val cachedRelationFileFormatClass = relation.fileFormat.getClass
expectedFileFormat match {
@@ -265,9 +265,22 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
PartitionDirectory(values, location)
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
+ val partitionPaths = partitions.map(_.path.toString)
+
+ // By convention (for example, see MetaStorePartitionedTableFileCatalog), the definition of a
+ // partitioned table's paths depends on whether that table has any actual partitions.
+ // Partitioned tables without partitions use the location of the table's base path.
+ // Partitioned tables with partitions use the locations of those partitions' data locations,
+ // _omitting_ the table's base path.
+ val paths = if (partitionPaths.isEmpty) {
+ Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
+ } else {
+ partitionPaths
+ }
val cached = getCached(
tableIdentifier,
+ paths,
metastoreRelation,
metastoreSchema,
fileFormatClass,
@@ -312,6 +325,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
val cached = getCached(tableIdentifier,
+ paths,
metastoreRelation,
metastoreSchema,
fileFormatClass,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 97cd29f541..529d388716 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -298,6 +298,7 @@ case class InsertIntoHiveTable(
// Invalidate the cache.
sqlContext.sharedState.cacheManager.invalidateCache(table)
+ sqlContext.sessionState.catalog.invalidateTable(table.catalogTable.identifier)
// It would be nice to just return the childRdd unchanged so insert operations could be chained,
// however for now we return an empty list to simplify compatibility checks with hive, which
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index fe7253d735..4b32b135e7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -389,17 +389,18 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
test("SPARK-7749: non-partitioned metastore Parquet table lookup should use cached relation") {
withTable("nonPartitioned") {
sql(
- s"""CREATE TABLE nonPartitioned (
- | key INT,
- | value STRING
- |)
- |STORED AS PARQUET
- """.stripMargin)
+ """
+ |CREATE TABLE nonPartitioned (
+ | key INT,
+ | value STRING
+ |)
+ |STORED AS PARQUET
+ """.stripMargin)
// First lookup fills the cache
- val r1 = collectHadoopFsRelation (table("nonPartitioned"))
+ val r1 = collectHadoopFsRelation(table("nonPartitioned"))
// Second lookup should reuse the cache
- val r2 = collectHadoopFsRelation (table("nonPartitioned"))
+ val r2 = collectHadoopFsRelation(table("nonPartitioned"))
// They should be the same instance
assert(r1 eq r2)
}
@@ -408,18 +409,42 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
test("SPARK-7749: partitioned metastore Parquet table lookup should use cached relation") {
withTable("partitioned") {
sql(
- s"""CREATE TABLE partitioned (
- | key INT,
- | value STRING
- |)
- |PARTITIONED BY (part INT)
- |STORED AS PARQUET
- """.stripMargin)
+ """
+ |CREATE TABLE partitioned (
+ | key INT,
+ | value STRING
+ |)
+ |PARTITIONED BY (part INT)
+ |STORED AS PARQUET
+ """.stripMargin)
+
+ // First lookup fills the cache
+ val r1 = collectHadoopFsRelation(table("partitioned"))
+ // Second lookup should reuse the cache
+ val r2 = collectHadoopFsRelation(table("partitioned"))
+ // They should be the same instance
+ assert(r1 eq r2)
+ }
+ }
+
+ test("SPARK-15968: nonempty partitioned metastore Parquet table lookup should use cached " +
+ "relation") {
+ withTable("partitioned") {
+ sql(
+ """
+ |CREATE TABLE partitioned (
+ | key INT,
+ | value STRING
+ |)
+ |PARTITIONED BY (part INT)
+ |STORED AS PARQUET
+ """.stripMargin)
+ sql("INSERT INTO TABLE partitioned PARTITION(part=0) SELECT 1 as key, 'one' as value")
// First lookup fills the cache
- val r1 = collectHadoopFsRelation (table("partitioned"))
+ val r1 = collectHadoopFsRelation(table("partitioned"))
// Second lookup should reuse the cache
- val r2 = collectHadoopFsRelation (table("partitioned"))
+ val r2 = collectHadoopFsRelation(table("partitioned"))
// They should be the same instance
assert(r1 eq r2)
}
@@ -557,7 +582,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
// Add data files to partition directory and check whether they can be read
- Seq("baz").toDF("a").write.mode(SaveMode.Overwrite).parquet(partitionDir)
+ sql("INSERT INTO TABLE test_added_partitions PARTITION (b=1) select 'baz' as a")
checkAnswer(
sql("SELECT * FROM test_added_partitions"),
Seq(("foo", 0), ("bar", 0), ("baz", 1)).toDF("a", "b"))