diff options
3 files changed, 59 insertions, 19 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 151e4563a7..7dae473f47 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -191,6 +191,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log private def getCached( tableIdentifier: QualifiedTableName, + pathsInMetastore: Seq[String], metastoreRelation: MetastoreRelation, schemaInMetastore: StructType, expectedFileFormat: Class[_ <: FileFormat], @@ -200,7 +201,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log cachedDataSourceTables.getIfPresent(tableIdentifier) match { case null => None // Cache miss case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) => - val pathsInMetastore = metastoreRelation.catalogTable.storage.locationUri.toSeq val cachedRelationFileFormatClass = relation.fileFormat.getClass expectedFileFormat match { @@ -265,9 +265,22 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log PartitionDirectory(values, location) } val partitionSpec = PartitionSpec(partitionSchema, partitions) + val partitionPaths = partitions.map(_.path.toString) + + // By convention (for example, see MetaStorePartitionedTableFileCatalog), the definition of a + // partitioned table's paths depends on whether that table has any actual partitions. + // Partitioned tables without partitions use the location of the table's base path. + // Partitioned tables with partitions use the locations of those partitions' data locations, + // _omitting_ the table's base path. + val paths = if (partitionPaths.isEmpty) { + Seq(metastoreRelation.hiveQlTable.getDataLocation.toString) + } else { + partitionPaths + } val cached = getCached( tableIdentifier, + paths, metastoreRelation, metastoreSchema, fileFormatClass, @@ -312,6 +325,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString) val cached = getCached(tableIdentifier, + paths, metastoreRelation, metastoreSchema, fileFormatClass, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 97cd29f541..529d388716 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -298,6 +298,7 @@ case class InsertIntoHiveTable( // Invalidate the cache. sqlContext.sharedState.cacheManager.invalidateCache(table) + sqlContext.sessionState.catalog.invalidateTable(table.catalogTable.identifier) // It would be nice to just return the childRdd unchanged so insert operations could be chained, // however for now we return an empty list to simplify compatibility checks with hive, which diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index fe7253d735..4b32b135e7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -389,17 +389,18 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { test("SPARK-7749: non-partitioned metastore Parquet table lookup should use cached relation") { withTable("nonPartitioned") { sql( - s"""CREATE TABLE nonPartitioned ( - | key INT, - | value STRING - |) - |STORED AS PARQUET - """.stripMargin) + """ + |CREATE TABLE nonPartitioned ( + | key INT, + | value STRING + |) + |STORED AS PARQUET + """.stripMargin) // First lookup fills the cache - val r1 = collectHadoopFsRelation (table("nonPartitioned")) + val r1 = collectHadoopFsRelation(table("nonPartitioned")) // Second lookup should reuse the cache - val r2 = collectHadoopFsRelation (table("nonPartitioned")) + val r2 = collectHadoopFsRelation(table("nonPartitioned")) // They should be the same instance assert(r1 eq r2) } @@ -408,18 +409,42 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { test("SPARK-7749: partitioned metastore Parquet table lookup should use cached relation") { withTable("partitioned") { sql( - s"""CREATE TABLE partitioned ( - | key INT, - | value STRING - |) - |PARTITIONED BY (part INT) - |STORED AS PARQUET - """.stripMargin) + """ + |CREATE TABLE partitioned ( + | key INT, + | value STRING + |) + |PARTITIONED BY (part INT) + |STORED AS PARQUET + """.stripMargin) + + // First lookup fills the cache + val r1 = collectHadoopFsRelation(table("partitioned")) + // Second lookup should reuse the cache + val r2 = collectHadoopFsRelation(table("partitioned")) + // They should be the same instance + assert(r1 eq r2) + } + } + + test("SPARK-15968: nonempty partitioned metastore Parquet table lookup should use cached " + + "relation") { + withTable("partitioned") { + sql( + """ + |CREATE TABLE partitioned ( + | key INT, + | value STRING + |) + |PARTITIONED BY (part INT) + |STORED AS PARQUET + """.stripMargin) + sql("INSERT INTO TABLE partitioned PARTITION(part=0) SELECT 1 as key, 'one' as value") // First lookup fills the cache - val r1 = collectHadoopFsRelation (table("partitioned")) + val r1 = collectHadoopFsRelation(table("partitioned")) // Second lookup should reuse the cache - val r2 = collectHadoopFsRelation (table("partitioned")) + val r2 = collectHadoopFsRelation(table("partitioned")) // They should be the same instance assert(r1 eq r2) } @@ -557,7 +582,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { Seq(("foo", 0), ("bar", 0)).toDF("a", "b")) // Add data files to partition directory and check whether they can be read - Seq("baz").toDF("a").write.mode(SaveMode.Overwrite).parquet(partitionDir) + sql("INSERT INTO TABLE test_added_partitions PARTITION (b=1) select 'baz' as a") checkAnswer( sql("SELECT * FROM test_added_partitions"), Seq(("foo", 0), ("bar", 0), ("baz", 1)).toDF("a", "b")) |