-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala  16
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala  1
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala  61
3 files changed, 59 insertions, 19 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 151e4563a7..7dae473f47 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -191,6 +191,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
private def getCached(
tableIdentifier: QualifiedTableName,
+ pathsInMetastore: Seq[String],
metastoreRelation: MetastoreRelation,
schemaInMetastore: StructType,
expectedFileFormat: Class[_ <: FileFormat],
@@ -200,7 +201,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => None // Cache miss
case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
- val pathsInMetastore = metastoreRelation.catalogTable.storage.locationUri.toSeq
val cachedRelationFileFormatClass = relation.fileFormat.getClass
expectedFileFormat match {
@@ -265,9 +265,22 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
PartitionDirectory(values, location)
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
+ val partitionPaths = partitions.map(_.path.toString)
+
+ // By convention (for example, see MetaStorePartitionedTableFileCatalog), the definition of a
+ // partitioned table's paths depends on whether that table has any actual partitions.
+ // Partitioned tables without partitions use the location of the table's base path.
+ // Partitioned tables with partitions use the locations of those partitions' data,
+ // _omitting_ the table's base path.
+ val paths = if (partitionPaths.isEmpty) {
+ Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
+ } else {
+ partitionPaths
+ }
val cached = getCached(
tableIdentifier,
+ paths,
metastoreRelation,
metastoreSchema,
fileFormatClass,
@@ -312,6 +325,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
val cached = getCached(tableIdentifier,
+ paths,
metastoreRelation,
metastoreSchema,
fileFormatClass,
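
For context: the signature change above makes callers pass in the expected paths, because (as the new comment explains) a partitioned table's paths depend on whether it actually has partitions, so they can no longer be read off the metastore location alone. Below is a minimal sketch of that convention, not the actual Spark code; PathConventionSketch and tablePaths are hypothetical names used only for illustration.

    object PathConventionSketch {
      // Mirrors the convention described in the comment above: a partitioned table
      // with no partitions is identified by its base location, while one with
      // partitions is identified by its partition data locations only.
      def tablePaths(baseLocation: String, partitionLocations: Seq[String]): Seq[String] =
        if (partitionLocations.isEmpty) Seq(baseLocation) else partitionLocations

      def main(args: Array[String]): Unit = {
        // No partitions yet: the base path stands in for the table's data.
        println(tablePaths("hdfs:///warehouse/t", Nil))                    // List(hdfs:///warehouse/t)
        // One partition: only the partition directory counts; the base path is omitted.
        println(tablePaths("hdfs:///warehouse/t", Seq("hdfs:///warehouse/t/part=0")))  // List(hdfs:///warehouse/t/part=0)
      }
    }

Presumably getCached compares these expected paths with the paths recorded in the cached HadoopFsRelation and treats any mismatch as a cache miss; that comparison lies outside the hunks shown here, so this reading is an assumption.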
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 97cd29f541..529d388716 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -298,6 +298,7 @@ case class InsertIntoHiveTable(
// Invalidate the cache.
sqlContext.sharedState.cacheManager.invalidateCache(table)
+ sqlContext.sessionState.catalog.invalidateTable(table.catalogTable.identifier)
// It would be nice to just return the childRdd unchanged so insert operations could be chained,
// however for now we return an empty list to simplify compatibility checks with hive, which
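
The added invalidateTable call drops the converted data source relation from the session catalog's metadata cache after the write. Below is a hedged usage sketch of the behavior this is meant to guarantee, using made-up table and column names and assuming a SparkSession named spark with Hive support enabled; it is roughly the scenario the new ParquetMetastoreSuite tests below exercise.

    spark.sql("CREATE TABLE t (key INT, value STRING) PARTITIONED BY (part INT) STORED AS PARQUET")
    spark.sql("INSERT INTO TABLE t PARTITION (part = 0) SELECT 1 AS key, 'one' AS value")
    // A relation cached before the insert would only list the pre-insert paths;
    // invalidating it forces the next lookup to rebuild the relation, so the new
    // partition directory part=0 is picked up here.
    spark.sql("SELECT * FROM t").show()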
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index fe7253d735..4b32b135e7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -389,17 +389,18 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
test("SPARK-7749: non-partitioned metastore Parquet table lookup should use cached relation") {
withTable("nonPartitioned") {
sql(
- s"""CREATE TABLE nonPartitioned (
- | key INT,
- | value STRING
- |)
- |STORED AS PARQUET
- """.stripMargin)
+ """
+ |CREATE TABLE nonPartitioned (
+ | key INT,
+ | value STRING
+ |)
+ |STORED AS PARQUET
+ """.stripMargin)
// First lookup fills the cache
- val r1 = collectHadoopFsRelation (table("nonPartitioned"))
+ val r1 = collectHadoopFsRelation(table("nonPartitioned"))
// Second lookup should reuse the cache
- val r2 = collectHadoopFsRelation (table("nonPartitioned"))
+ val r2 = collectHadoopFsRelation(table("nonPartitioned"))
// They should be the same instance
assert(r1 eq r2)
}
@@ -408,18 +409,42 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
test("SPARK-7749: partitioned metastore Parquet table lookup should use cached relation") {
withTable("partitioned") {
sql(
- s"""CREATE TABLE partitioned (
- | key INT,
- | value STRING
- |)
- |PARTITIONED BY (part INT)
- |STORED AS PARQUET
- """.stripMargin)
+ """
+ |CREATE TABLE partitioned (
+ | key INT,
+ | value STRING
+ |)
+ |PARTITIONED BY (part INT)
+ |STORED AS PARQUET
+ """.stripMargin)
+
+ // First lookup fills the cache
+ val r1 = collectHadoopFsRelation(table("partitioned"))
+ // Second lookup should reuse the cache
+ val r2 = collectHadoopFsRelation(table("partitioned"))
+ // They should be the same instance
+ assert(r1 eq r2)
+ }
+ }
+
+ test("SPARK-15968: nonempty partitioned metastore Parquet table lookup should use cached " +
+ "relation") {
+ withTable("partitioned") {
+ sql(
+ """
+ |CREATE TABLE partitioned (
+ | key INT,
+ | value STRING
+ |)
+ |PARTITIONED BY (part INT)
+ |STORED AS PARQUET
+ """.stripMargin)
+ sql("INSERT INTO TABLE partitioned PARTITION(part=0) SELECT 1 as key, 'one' as value")
// First lookup fills the cache
- val r1 = collectHadoopFsRelation (table("partitioned"))
+ val r1 = collectHadoopFsRelation(table("partitioned"))
// Second lookup should reuse the cache
- val r2 = collectHadoopFsRelation (table("partitioned"))
+ val r2 = collectHadoopFsRelation(table("partitioned"))
// They should be the same instance
assert(r1 eq r2)
}
@@ -557,7 +582,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
// Add data files to partition directory and check whether they can be read
- Seq("baz").toDF("a").write.mode(SaveMode.Overwrite).parquet(partitionDir)
+ sql("INSERT INTO TABLE test_added_partitions PARTITION (b=1) select 'baz' as a")
checkAnswer(
sql("SELECT * FROM test_added_partitions"),
Seq(("foo", 0), ("bar", 0), ("baz", 1)).toDF("a", "b"))