diff options
12 files changed, 101 insertions, 36 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 8c620d36e5..e1d49912c3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -462,17 +462,17 @@ class SessionCatalog( } } - // TODO: It's strange that we have both refresh and invalidate here. - /** * Refresh the cache entry for a metastore table, if any. */ - def refreshTable(name: TableIdentifier): Unit = { /* no-op */ } - - /** - * Invalidate the cache entry for a metastore table, if any. - */ - def invalidateTable(name: TableIdentifier): Unit = { /* no-op */ } + def refreshTable(name: TableIdentifier): Unit = { + // Go through temporary tables and invalidate them. + // If the database is defined, this is definitely not a temp table. + // If the database is not defined, there is a good chance this is a temp table. + if (name.database.isEmpty) { + tempTables.get(name.table).foreach(_.refresh()) + } + } /** * Drop all existing temporary tables. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 4984f235b4..d0b2b5d7b2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -265,6 +265,11 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { s"Reference '$name' is ambiguous, could be: $referenceNames.") } } + + /** + * Refreshes (or invalidates) any metadata/data cached in the plan recursively. + */ + def refresh(): Unit = children.foreach(_.refresh()) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index fc00912bf9..226f61ef40 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -206,7 +206,7 @@ case class DropTableCommand( } catch { case NonFatal(e) => log.warn(e.toString, e) } - catalog.invalidateTable(tableName) + catalog.refreshTable(tableName) catalog.dropTable(tableName, ifExists) } Seq.empty[Row] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 687d69aa5f..5c815df0de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -172,7 +172,7 @@ case class AlterTableRenameCommand( } // Invalidate the table last, otherwise uncaching the table would load the logical plan // back into the hive metastore cache - catalog.invalidateTable(oldName) + catalog.refreshTable(oldName) catalog.renameTable(oldName, newName) if (wasCached) { sparkSession.catalog.cacheTable(newName.unquotedString) @@ -373,7 +373,7 @@ case class TruncateTableCommand( } // After deleting the data, invalidate the table to make sure we don't keep around a stale // file relation in the metastore cache. - spark.sessionState.invalidateTable(tableName.unquotedString) + spark.sessionState.refreshTable(tableName.unquotedString) // Also try to drop the contents of the table from the columnar cache try { spark.sharedState.cacheManager.uncacheQuery(spark.table(tableName.quotedString)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala index 39c8606fd1..90711f2b1d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala @@ -85,5 +85,10 @@ case class LogicalRelation( expectedOutputAttributes, metastoreTableIdentifier).asInstanceOf[this.type] + override def refresh(): Unit = relation match { + case fs: HadoopFsRelation => fs.refresh() + case _ => // Do nothing. + } + override def simpleString: String = s"Relation[${Utils.truncatedString(output, ",")}] $relation" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index 5f5cf5c6d3..01cc13f9df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -166,8 +166,8 @@ private[sql] class SessionState(sparkSession: SparkSession) { def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(sparkSession, plan) - def invalidateTable(tableName: String): Unit = { - catalog.invalidateTable(sqlParser.parseTableIdentifier(tableName)) + def refreshTable(tableName: String): Unit = { + catalog.refreshTable(sqlParser.parseTableIdentifier(tableName)) } def addJar(path: String): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala index d872f4baa6..3f8cc8164d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala @@ -59,8 +59,8 @@ class MetadataCacheSuite extends QueryTest with SharedSQLContext { } } - ignore("SPARK-16337 temporary view refresh") { - withTempPath { (location: File) => + test("SPARK-16337 temporary view refresh") { + withTempTable("view_refresh") { withTempPath { (location: File) => // Create a Parquet directory spark.range(start = 0, end = 100, step = 1, numPartitions = 3) .write.parquet(location.getAbsolutePath) @@ -77,12 +77,12 @@ class MetadataCacheSuite extends QueryTest with SharedSQLContext { sql("select count(*) from view_refresh").first() } assert(e.getMessage.contains("FileNotFoundException")) - assert(e.getMessage.contains("refresh()")) + assert(e.getMessage.contains("REFRESH")) // Refresh and we should be able to read it again. spark.catalog.refreshTable("view_refresh") val newCount = sql("select count(*) from view_refresh").first().getLong(0) assert(newCount > 0 && newCount < 100) - } + }} } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 7dae473f47..20e64a4e09 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -147,10 +147,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log // it is better at here to invalidate the cache to avoid confusing waring logs from the // cache loader (e.g. cannot find data source provider, which is only defined for // data source table.). - invalidateTable(tableIdent) - } - - def invalidateTable(tableIdent: TableIdentifier): Unit = { cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent)) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 18b8dafe64..ebb6711f6a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -90,13 +90,10 @@ private[sql] class HiveSessionCatalog( val CreateTables: Rule[LogicalPlan] = metastoreCatalog.CreateTables override def refreshTable(name: TableIdentifier): Unit = { + super.refreshTable(name) metastoreCatalog.refreshTable(name) } - override def invalidateTable(name: TableIdentifier): Unit = { - metastoreCatalog.invalidateTable(name) - } - def invalidateCache(): Unit = { metastoreCatalog.cachedDataSourceTables.invalidateAll() } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala new file mode 100644 index 0000000000..5714d06f0f --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.hadoop.fs.Path + +import org.apache.spark.SparkException +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.test.SQLTestUtils + +/** + * Test suite to handle metadata cache related. + */ +class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { + + test("SPARK-16337 temporary view refresh") { + withTempTable("view_refresh") { + withTable("view_table") { + // Create a Parquet directory + spark.range(start = 0, end = 100, step = 1, numPartitions = 3) + .write.saveAsTable("view_table") + + // Read the table in + spark.table("view_table").filter("id > -1").createOrReplaceTempView("view_refresh") + assert(sql("select count(*) from view_refresh").first().getLong(0) == 100) + + // Delete a file using the Hadoop file system interface since the path returned by + // inputFiles is not recognizable by Java IO. + val p = new Path(spark.table("view_table").inputFiles.head) + assert(p.getFileSystem(hiveContext.sessionState.newHadoopConf()).delete(p, false)) + + // Read it again and now we should see a FileNotFoundException + val e = intercept[SparkException] { + sql("select count(*) from view_refresh").first() + } + assert(e.getMessage.contains("FileNotFoundException")) + assert(e.getMessage.contains("REFRESH")) + + // Refresh and we should be able to read it again. + spark.catalog.refreshTable("view_refresh") + val newCount = sql("select count(*) from view_refresh").first().getLong(0) + assert(newCount > 0 && newCount < 100) + } + } + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index b028d49aff..12d250d4fb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -255,13 +255,13 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq) // Discard the cached relation. - sessionState.invalidateTable("jsonTable") + sessionState.refreshTable("jsonTable") checkAnswer( sql("SELECT * FROM jsonTable"), sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq) - sessionState.invalidateTable("jsonTable") + sessionState.refreshTable("jsonTable") val expectedSchema = StructType(StructField("c_!@(3)", IntegerType, true) :: Nil) assert(expectedSchema === table("jsonTable").schema) @@ -349,7 +349,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv """.stripMargin) // Discard the cached relation. - sessionState.invalidateTable("ctasJsonTable") + sessionState.refreshTable("ctasJsonTable") // Schema should not be changed. assert(table("ctasJsonTable").schema === table("jsonTable").schema) @@ -424,7 +424,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"), (6 to 10).map(i => Row(i, s"str$i"))) - sessionState.invalidateTable("savedJsonTable") + sessionState.refreshTable("savedJsonTable") checkAnswer( sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"), @@ -710,7 +710,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv options = Map("path" -> tempDir.getCanonicalPath), isExternal = false) - sessionState.invalidateTable("wide_schema") + sessionState.refreshTable("wide_schema") val actualSchema = table("wide_schema").schema assert(schema === actualSchema) @@ -743,7 +743,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv sharedState.externalCatalog.createTable("default", hiveTable, ignoreIfExists = false) - sessionState.invalidateTable(tableName) + sessionState.refreshTable(tableName) val actualSchema = table(tableName).schema assert(schema === actualSchema) @@ -758,7 +758,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv withTable(tableName) { df.write.format("parquet").partitionBy("d", "b").saveAsTable(tableName) - sessionState.invalidateTable(tableName) + sessionState.refreshTable(tableName) val metastoreTable = sharedState.externalCatalog.getTable("default", tableName) val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil) @@ -793,7 +793,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv .bucketBy(8, "d", "b") .sortBy("c") .saveAsTable(tableName) - sessionState.invalidateTable(tableName) + sessionState.refreshTable(tableName) val metastoreTable = sharedState.externalCatalog.getTable("default", tableName) val expectedBucketByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil) val expectedSortByColumns = StructType(df.schema("c") :: Nil) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 4b32b135e7..96beb2d342 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -487,7 +487,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { checkCached(tableIdentifier) // For insert into non-partitioned table, we will do the conversion, // so the converted test_insert_parquet should be cached. - sessionState.invalidateTable("test_insert_parquet") + sessionState.refreshTable("test_insert_parquet") assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) sql( """ @@ -500,7 +500,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { sql("select * from test_insert_parquet"), sql("select a, b from jt").collect()) // Invalidate the cache. - sessionState.invalidateTable("test_insert_parquet") + sessionState.refreshTable("test_insert_parquet") assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) // Create a partitioned table. @@ -550,7 +550,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { |select b, '2015-04-02', a FROM jt """.stripMargin).collect()) - sessionState.invalidateTable("test_parquet_partitioned_cache_test") + sessionState.refreshTable("test_parquet_partitioned_cache_test") assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test") |