author     Reynold Xin <rxin@databricks.com>    2016-07-05 11:36:05 -0700
committer  Reynold Xin <rxin@databricks.com>    2016-07-05 11:36:05 -0700
commit     16a2a7d714f945b06978e3bd20a58ea32f0621ac (patch)
tree       76fd896b952ea96a890fcb5500af6344886c46cd /sql/core
parent     07d9c5327f050f9da611d5239f61ed73b36ce4e6 (diff)
[SPARK-16311][SQL] Metadata refresh should work on temporary views
## What changes were proposed in this pull request?

This patch fixes a bug where the refresh command does not work on temporary views. It is based on https://github.com/apache/spark/pull/13989, but removes the public Dataset.refresh() API and improves test coverage. Note that I still think the public refresh() API is very useful; we can implement it in the future by also invalidating the lazy vals in QueryExecution (or alternatively just creating a new QueryExecution).

## How was this patch tested?

Re-enabled a previously ignored test, and added a new Hive test suite that checks the behavior of temporary views against MetastoreRelation.

Author: Reynold Xin <rxin@databricks.com>
Author: petermaxlee <petermaxlee@gmail.com>

Closes #14009 from rxin/SPARK-16311.
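As a rough sketch of the scenario this change addresses (mirroring the re-enabled test below; the `spark` session, the scratch path, and the view name are illustrative assumptions, not part of the patch):

```scala
// Minimal sketch of the temporary-view refresh scenario, assuming an existing
// SparkSession `spark`; the path and view name are illustrative only.
val location = "/tmp/view_refresh_demo"

// Write a small Parquet dataset and register a temporary view over it.
spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
  .write.parquet(location)
spark.read.parquet(location).createOrReplaceTempView("view_refresh")

// If files under `location` are deleted out-of-band, a query against the view
// fails with a FileNotFoundException whose message now suggests running REFRESH.

// With this patch, refreshing the temporary view makes it readable again.
spark.catalog.refreshTable("view_refresh")
spark.sql("select count(*) from view_refresh").show()
```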
Diffstat (limited to 'sql/core')
-rw-r--r--   sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala                   2
-rw-r--r--   sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala                4
-rw-r--r--   sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala   5
-rw-r--r--   sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala                   4
-rw-r--r--   sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala                      8
5 files changed, 14 insertions, 9 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index fc00912bf9..226f61ef40 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -206,7 +206,7 @@ case class DropTableCommand(
} catch {
case NonFatal(e) => log.warn(e.toString, e)
}
- catalog.invalidateTable(tableName)
+ catalog.refreshTable(tableName)
catalog.dropTable(tableName, ifExists)
}
Seq.empty[Row]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 687d69aa5f..5c815df0de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -172,7 +172,7 @@ case class AlterTableRenameCommand(
}
// Invalidate the table last, otherwise uncaching the table would load the logical plan
// back into the hive metastore cache
- catalog.invalidateTable(oldName)
+ catalog.refreshTable(oldName)
catalog.renameTable(oldName, newName)
if (wasCached) {
sparkSession.catalog.cacheTable(newName.unquotedString)
@@ -373,7 +373,7 @@ case class TruncateTableCommand(
}
// After deleting the data, invalidate the table to make sure we don't keep around a stale
// file relation in the metastore cache.
- spark.sessionState.invalidateTable(tableName.unquotedString)
+ spark.sessionState.refreshTable(tableName.unquotedString)
// Also try to drop the contents of the table from the columnar cache
try {
spark.sharedState.cacheManager.uncacheQuery(spark.table(tableName.quotedString))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 39c8606fd1..90711f2b1d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -85,5 +85,10 @@ case class LogicalRelation(
expectedOutputAttributes,
metastoreTableIdentifier).asInstanceOf[this.type]
+ override def refresh(): Unit = relation match {
+ case fs: HadoopFsRelation => fs.refresh()
+ case _ => // Do nothing.
+ }
+
override def simpleString: String = s"Relation[${Utils.truncatedString(output, ",")}] $relation"
}
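For context, LogicalRelation is a leaf node, and the override above lets a plan-level refresh reach the underlying file-based relation; presumably the refresh() method it overrides (defined in sql/catalyst, outside this diff) simply recurses into children. A self-contained sketch of that pattern, using hypothetical stand-in types rather than Spark's real classes:

```scala
// Illustrative sketch only: hypothetical types showing the "refresh propagates
// to leaf relations" pattern, not Spark's actual plan classes.
trait PlanNode {
  def children: Seq[PlanNode]
  // Default: forward the refresh to every child node.
  def refresh(): Unit = children.foreach(_.refresh())
}

trait FileBackedRelation {
  // e.g. re-list files and drop any cached file/partition metadata
  def refresh(): Unit
}

case class LeafRelationNode(relation: Any) extends PlanNode {
  override def children: Seq[PlanNode] = Nil
  // Leaves delegate to the underlying relation when it caches file metadata,
  // mirroring the HadoopFsRelation match in the diff above.
  override def refresh(): Unit = relation match {
    case fs: FileBackedRelation => fs.refresh()
    case _ => // Do nothing.
  }
}
```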
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 5f5cf5c6d3..01cc13f9df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -166,8 +166,8 @@ private[sql] class SessionState(sparkSession: SparkSession) {
def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(sparkSession, plan)
- def invalidateTable(tableName: String): Unit = {
- catalog.invalidateTable(sqlParser.parseTableIdentifier(tableName))
+ def refreshTable(tableName: String): Unit = {
+ catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
}
def addJar(path: String): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
index d872f4baa6..3f8cc8164d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
@@ -59,8 +59,8 @@ class MetadataCacheSuite extends QueryTest with SharedSQLContext {
}
}
- ignore("SPARK-16337 temporary view refresh") {
- withTempPath { (location: File) =>
+ test("SPARK-16337 temporary view refresh") {
+ withTempTable("view_refresh") { withTempPath { (location: File) =>
// Create a Parquet directory
spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
.write.parquet(location.getAbsolutePath)
@@ -77,12 +77,12 @@ class MetadataCacheSuite extends QueryTest with SharedSQLContext {
sql("select count(*) from view_refresh").first()
}
assert(e.getMessage.contains("FileNotFoundException"))
- assert(e.getMessage.contains("refresh()"))
+ assert(e.getMessage.contains("REFRESH"))
// Refresh and we should be able to read it again.
spark.catalog.refreshTable("view_refresh")
val newCount = sql("select count(*) from view_refresh").first().getLong(0)
assert(newCount > 0 && newCount < 100)
- }
+ }}
}
}
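With this change, a temporary view can be refreshed through the catalog API or, presumably (given the updated error message above), the REFRESH TABLE SQL statement; a quick sketch assuming a temporary view named `view_refresh` is already registered:

```scala
// Sketch: two ways to refresh a temporary view after its underlying files change
// (assumes a temp view named "view_refresh" already exists).
spark.catalog.refreshTable("view_refresh")
// or, via SQL:
spark.sql("REFRESH TABLE view_refresh")
```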