From 16a2a7d714f945b06978e3bd20a58ea32f0621ac Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Tue, 5 Jul 2016 11:36:05 -0700
Subject: [SPARK-16311][SQL] Metadata refresh should work on temporary views

## What changes were proposed in this pull request?

This patch fixes the bug that the refresh command does not work on temporary views. It is based on https://github.com/apache/spark/pull/13989, but removes the public Dataset.refresh() API and improves test coverage.

Note that I actually think the public refresh() API is very useful. We could implement it in the future by also invalidating the lazy vals in QueryExecution (or alternatively by creating a new QueryExecution).
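For illustration, a rough usage sketch of the behavior this enables (the directory path and view name below are hypothetical and not part of this patch; assume `spark` is an active SparkSession):

```scala
// Hypothetical example: a temporary view backed by Parquet files on disk.
spark.range(0, 100).write.parquet("/tmp/refresh_demo")
spark.read.parquet("/tmp/refresh_demo").createOrReplaceTempView("view_refresh")
spark.sql("SELECT count(*) FROM view_refresh").show()

// If the files under /tmp/refresh_demo are replaced outside of Spark, the next
// query against the view can fail with a FileNotFoundException. With this patch,
// refreshing the temporary view propagates to the underlying file relation, so
// the view becomes readable again.
spark.catalog.refreshTable("view_refresh")
spark.sql("SELECT count(*) FROM view_refresh").show()
```

This mirrors the re-enabled MetadataCacheSuite test in the diff below.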
## How was this patch tested?

Re-enabled a previously ignored test, and added a new test suite for Hive testing behavior of temporary views against MetastoreRelation.

Author: Reynold Xin
Author: petermaxlee

Closes #14009 from rxin/SPARK-16311.
---
 .../main/scala/org/apache/spark/sql/execution/command/ddl.scala  | 2 +-
 .../scala/org/apache/spark/sql/execution/command/tables.scala    | 4 ++--
 .../spark/sql/execution/datasources/LogicalRelation.scala        | 5 +++++
 .../main/scala/org/apache/spark/sql/internal/SessionState.scala  | 4 ++--
 .../src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala | 8 ++++----
 5 files changed, 14 insertions(+), 9 deletions(-)
 (limited to 'sql/core')

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index fc00912bf9..226f61ef40 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -206,7 +206,7 @@ case class DropTableCommand(
       } catch {
         case NonFatal(e) => log.warn(e.toString, e)
       }
-      catalog.invalidateTable(tableName)
+      catalog.refreshTable(tableName)
       catalog.dropTable(tableName, ifExists)
     }
     Seq.empty[Row]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 687d69aa5f..5c815df0de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -172,7 +172,7 @@ case class AlterTableRenameCommand(
       }
       // Invalidate the table last, otherwise uncaching the table would load the logical plan
       // back into the hive metastore cache
-      catalog.invalidateTable(oldName)
+      catalog.refreshTable(oldName)
       catalog.renameTable(oldName, newName)
       if (wasCached) {
         sparkSession.catalog.cacheTable(newName.unquotedString)
@@ -373,7 +373,7 @@ case class TruncateTableCommand(
     }
     // After deleting the data, invalidate the table to make sure we don't keep around a stale
     // file relation in the metastore cache.
-    spark.sessionState.invalidateTable(tableName.unquotedString)
+    spark.sessionState.refreshTable(tableName.unquotedString)
     // Also try to drop the contents of the table from the columnar cache
     try {
       spark.sharedState.cacheManager.uncacheQuery(spark.table(tableName.quotedString))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 39c8606fd1..90711f2b1d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -85,5 +85,10 @@ case class LogicalRelation(
       expectedOutputAttributes,
       metastoreTableIdentifier).asInstanceOf[this.type]
 
+  override def refresh(): Unit = relation match {
+    case fs: HadoopFsRelation => fs.refresh()
+    case _ => // Do nothing.
+  }
+
   override def simpleString: String = s"Relation[${Utils.truncatedString(output, ",")}] $relation"
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 5f5cf5c6d3..01cc13f9df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -166,8 +166,8 @@ private[sql] class SessionState(sparkSession: SparkSession) {
 
   def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(sparkSession, plan)
 
-  def invalidateTable(tableName: String): Unit = {
-    catalog.invalidateTable(sqlParser.parseTableIdentifier(tableName))
+  def refreshTable(tableName: String): Unit = {
+    catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
   }
 
   def addJar(path: String): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
index d872f4baa6..3f8cc8164d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
@@ -59,8 +59,8 @@ class MetadataCacheSuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  ignore("SPARK-16337 temporary view refresh") {
-    withTempPath { (location: File) =>
+  test("SPARK-16337 temporary view refresh") {
+    withTempTable("view_refresh") { withTempPath { (location: File) =>
       // Create a Parquet directory
       spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
         .write.parquet(location.getAbsolutePath)
@@ -77,12 +77,12 @@ class MetadataCacheSuite extends QueryTest with SharedSQLContext {
         sql("select count(*) from view_refresh").first()
       }
       assert(e.getMessage.contains("FileNotFoundException"))
-      assert(e.getMessage.contains("refresh()"))
+      assert(e.getMessage.contains("REFRESH"))
 
       // Refresh and we should be able to read it again.
       spark.catalog.refreshTable("view_refresh")
       val newCount = sql("select count(*) from view_refresh").first().getLong(0)
       assert(newCount > 0 && newCount < 100)
-    }
+    }}
   }
 }