author     Reynold Xin <rxin@databricks.com>  2016-07-05 11:36:05 -0700
committer  Reynold Xin <rxin@databricks.com>  2016-07-05 11:36:05 -0700
commit     16a2a7d714f945b06978e3bd20a58ea32f0621ac (patch)
tree       76fd896b952ea96a890fcb5500af6344886c46cd
parent     07d9c5327f050f9da611d5239f61ed73b36ce4e6 (diff)
[SPARK-16311][SQL] Metadata refresh should work on temporary views
## What changes were proposed in this pull request?

This patch fixes a bug where the refresh command did not work on temporary views. It is based on https://github.com/apache/spark/pull/13989, but removes the public Dataset.refresh() API and improves test coverage.

Note that I actually think the public refresh() API is very useful. We can implement it in the future by also invalidating the lazy vals in QueryExecution (or alternatively just creating a new QueryExecution).

## How was this patch tested?

Re-enabled a previously ignored test, and added a new test suite for Hive that tests the behavior of temporary views against MetastoreRelation.

Author: Reynold Xin <rxin@databricks.com>
Author: petermaxlee <petermaxlee@gmail.com>

Closes #14009 from rxin/SPARK-16311.
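Below is a minimal, self-contained sketch (not part of the patch) of the end-to-end behavior this change enables, modeled on the re-enabled `MetadataCacheSuite` test. The local-mode session, temp directory, and the `view_refresh` name are illustrative assumptions, not code from this PR.

```scala
// Sketch only: local-mode session and a throwaway temp directory, mirroring the
// "SPARK-16337 temporary view refresh" test. Names and paths are illustrative.
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object TempViewRefreshSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("refresh-sketch").getOrCreate()
    val location = Files.createTempDirectory("view_refresh").toString

    // Write a small Parquet dataset and expose it through a temporary view.
    spark.range(0, 100, 1, 3).write.mode("overwrite").parquet(location)
    spark.read.parquet(location).createOrReplaceTempView("view_refresh")
    assert(spark.sql("select count(*) from view_refresh").first().getLong(0) == 100)

    // If files under `location` are removed or rewritten out of band, the view's cached
    // file listing goes stale. Before this patch, refreshTable was a no-op for temporary
    // views; with it, the call below reaches LogicalRelation.refresh() and re-lists files.
    spark.catalog.refreshTable("view_refresh")
    spark.sql("select count(*) from view_refresh").show()

    spark.stop()
  }
}
```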
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 16
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala | 5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala | 5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala | 8
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala | 5
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala | 62
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 16
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 6
12 files changed, 101 insertions(+), 36 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 8c620d36e5..e1d49912c3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -462,17 +462,17 @@ class SessionCatalog(
}
}
- // TODO: It's strange that we have both refresh and invalidate here.
-
/**
* Refresh the cache entry for a metastore table, if any.
*/
- def refreshTable(name: TableIdentifier): Unit = { /* no-op */ }
-
- /**
- * Invalidate the cache entry for a metastore table, if any.
- */
- def invalidateTable(name: TableIdentifier): Unit = { /* no-op */ }
+ def refreshTable(name: TableIdentifier): Unit = {
+ // Go through temporary tables and invalidate them.
+ // If the database is defined, this is definitely not a temp table.
+ // If the database is not defined, there is a good chance this is a temp table.
+ if (name.database.isEmpty) {
+ tempTables.get(name.table).foreach(_.refresh())
+ }
+ }
/**
* Drop all existing temporary tables.
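For reference, a hedged sketch (the view and table names here are hypothetical) of how the unqualified-vs-qualified distinction in the new refreshTable plays out when the refresh is issued through SQL:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: "view_refresh" and "default.some_table" are hypothetical names.
def refreshExamples(spark: SparkSession): Unit = {
  // No database part: if a temporary view named "view_refresh" is registered in this
  // session, SessionCatalog.refreshTable now calls refresh() on its logical plan.
  spark.sql("REFRESH TABLE view_refresh")

  // Database-qualified: this can never be a temporary view, so only the
  // metastore-backed cache (e.g. in the Hive catalog) is involved.
  spark.sql("REFRESH TABLE default.some_table")
}
```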
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 4984f235b4..d0b2b5d7b2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -265,6 +265,11 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
s"Reference '$name' is ambiguous, could be: $referenceNames.")
}
}
+
+ /**
+ * Refreshes (or invalidates) any metadata/data cached in the plan recursively.
+ */
+ def refresh(): Unit = children.foreach(_.refresh())
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index fc00912bf9..226f61ef40 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -206,7 +206,7 @@ case class DropTableCommand(
} catch {
case NonFatal(e) => log.warn(e.toString, e)
}
- catalog.invalidateTable(tableName)
+ catalog.refreshTable(tableName)
catalog.dropTable(tableName, ifExists)
}
Seq.empty[Row]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 687d69aa5f..5c815df0de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -172,7 +172,7 @@ case class AlterTableRenameCommand(
}
// Invalidate the table last, otherwise uncaching the table would load the logical plan
// back into the hive metastore cache
- catalog.invalidateTable(oldName)
+ catalog.refreshTable(oldName)
catalog.renameTable(oldName, newName)
if (wasCached) {
sparkSession.catalog.cacheTable(newName.unquotedString)
@@ -373,7 +373,7 @@ case class TruncateTableCommand(
}
// After deleting the data, invalidate the table to make sure we don't keep around a stale
// file relation in the metastore cache.
- spark.sessionState.invalidateTable(tableName.unquotedString)
+ spark.sessionState.refreshTable(tableName.unquotedString)
// Also try to drop the contents of the table from the columnar cache
try {
spark.sharedState.cacheManager.uncacheQuery(spark.table(tableName.quotedString))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 39c8606fd1..90711f2b1d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -85,5 +85,10 @@ case class LogicalRelation(
expectedOutputAttributes,
metastoreTableIdentifier).asInstanceOf[this.type]
+ override def refresh(): Unit = relation match {
+ case fs: HadoopFsRelation => fs.refresh()
+ case _ => // Do nothing.
+ }
+
override def simpleString: String = s"Relation[${Utils.truncatedString(output, ",")}] $relation"
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 5f5cf5c6d3..01cc13f9df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -166,8 +166,8 @@ private[sql] class SessionState(sparkSession: SparkSession) {
def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(sparkSession, plan)
- def invalidateTable(tableName: String): Unit = {
- catalog.invalidateTable(sqlParser.parseTableIdentifier(tableName))
+ def refreshTable(tableName: String): Unit = {
+ catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
}
def addJar(path: String): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
index d872f4baa6..3f8cc8164d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
@@ -59,8 +59,8 @@ class MetadataCacheSuite extends QueryTest with SharedSQLContext {
}
}
- ignore("SPARK-16337 temporary view refresh") {
- withTempPath { (location: File) =>
+ test("SPARK-16337 temporary view refresh") {
+ withTempTable("view_refresh") { withTempPath { (location: File) =>
// Create a Parquet directory
spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
.write.parquet(location.getAbsolutePath)
@@ -77,12 +77,12 @@ class MetadataCacheSuite extends QueryTest with SharedSQLContext {
sql("select count(*) from view_refresh").first()
}
assert(e.getMessage.contains("FileNotFoundException"))
- assert(e.getMessage.contains("refresh()"))
+ assert(e.getMessage.contains("REFRESH"))
// Refresh and we should be able to read it again.
spark.catalog.refreshTable("view_refresh")
val newCount = sql("select count(*) from view_refresh").first().getLong(0)
assert(newCount > 0 && newCount < 100)
- }
+ }}
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 7dae473f47..20e64a4e09 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -147,10 +147,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
// it is better at here to invalidate the cache to avoid confusing waring logs from the
// cache loader (e.g. cannot find data source provider, which is only defined for
// data source table.).
- invalidateTable(tableIdent)
- }
-
- def invalidateTable(tableIdent: TableIdentifier): Unit = {
cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent))
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 18b8dafe64..ebb6711f6a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -90,13 +90,10 @@ private[sql] class HiveSessionCatalog(
val CreateTables: Rule[LogicalPlan] = metastoreCatalog.CreateTables
override def refreshTable(name: TableIdentifier): Unit = {
+ super.refreshTable(name)
metastoreCatalog.refreshTable(name)
}
- override def invalidateTable(name: TableIdentifier): Unit = {
- metastoreCatalog.invalidateTable(name)
- }
-
def invalidateCache(): Unit = {
metastoreCatalog.cachedDataSourceTables.invalidateAll()
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
new file mode 100644
index 0000000000..5714d06f0f
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+/**
+ * Test suite for metadata cache related behavior.
+ */
+class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+
+ test("SPARK-16337 temporary view refresh") {
+ withTempTable("view_refresh") {
+ withTable("view_table") {
+ // Create a Parquet directory
+ spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
+ .write.saveAsTable("view_table")
+
+ // Read the table in
+ spark.table("view_table").filter("id > -1").createOrReplaceTempView("view_refresh")
+ assert(sql("select count(*) from view_refresh").first().getLong(0) == 100)
+
+ // Delete a file using the Hadoop file system interface since the path returned by
+ // inputFiles is not recognizable by Java IO.
+ val p = new Path(spark.table("view_table").inputFiles.head)
+ assert(p.getFileSystem(hiveContext.sessionState.newHadoopConf()).delete(p, false))
+
+ // Read it again and now we should see a FileNotFoundException
+ val e = intercept[SparkException] {
+ sql("select count(*) from view_refresh").first()
+ }
+ assert(e.getMessage.contains("FileNotFoundException"))
+ assert(e.getMessage.contains("REFRESH"))
+
+ // Refresh and we should be able to read it again.
+ spark.catalog.refreshTable("view_refresh")
+ val newCount = sql("select count(*) from view_refresh").first().getLong(0)
+ assert(newCount > 0 && newCount < 100)
+ }
+ }
+ }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index b028d49aff..12d250d4fb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -255,13 +255,13 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)
// Discard the cached relation.
- sessionState.invalidateTable("jsonTable")
+ sessionState.refreshTable("jsonTable")
checkAnswer(
sql("SELECT * FROM jsonTable"),
sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)
- sessionState.invalidateTable("jsonTable")
+ sessionState.refreshTable("jsonTable")
val expectedSchema = StructType(StructField("c_!@(3)", IntegerType, true) :: Nil)
assert(expectedSchema === table("jsonTable").schema)
@@ -349,7 +349,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
""".stripMargin)
// Discard the cached relation.
- sessionState.invalidateTable("ctasJsonTable")
+ sessionState.refreshTable("ctasJsonTable")
// Schema should not be changed.
assert(table("ctasJsonTable").schema === table("jsonTable").schema)
@@ -424,7 +424,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"),
(6 to 10).map(i => Row(i, s"str$i")))
- sessionState.invalidateTable("savedJsonTable")
+ sessionState.refreshTable("savedJsonTable")
checkAnswer(
sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"),
@@ -710,7 +710,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
options = Map("path" -> tempDir.getCanonicalPath),
isExternal = false)
- sessionState.invalidateTable("wide_schema")
+ sessionState.refreshTable("wide_schema")
val actualSchema = table("wide_schema").schema
assert(schema === actualSchema)
@@ -743,7 +743,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
sharedState.externalCatalog.createTable("default", hiveTable, ignoreIfExists = false)
- sessionState.invalidateTable(tableName)
+ sessionState.refreshTable(tableName)
val actualSchema = table(tableName).schema
assert(schema === actualSchema)
@@ -758,7 +758,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
withTable(tableName) {
df.write.format("parquet").partitionBy("d", "b").saveAsTable(tableName)
- sessionState.invalidateTable(tableName)
+ sessionState.refreshTable(tableName)
val metastoreTable = sharedState.externalCatalog.getTable("default", tableName)
val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
@@ -793,7 +793,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
.bucketBy(8, "d", "b")
.sortBy("c")
.saveAsTable(tableName)
- sessionState.invalidateTable(tableName)
+ sessionState.refreshTable(tableName)
val metastoreTable = sharedState.externalCatalog.getTable("default", tableName)
val expectedBucketByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
val expectedSortByColumns = StructType(df.schema("c") :: Nil)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 4b32b135e7..96beb2d342 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -487,7 +487,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
checkCached(tableIdentifier)
// For insert into non-partitioned table, we will do the conversion,
// so the converted test_insert_parquet should be cached.
- sessionState.invalidateTable("test_insert_parquet")
+ sessionState.refreshTable("test_insert_parquet")
assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
sql(
"""
@@ -500,7 +500,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
sql("select * from test_insert_parquet"),
sql("select a, b from jt").collect())
// Invalidate the cache.
- sessionState.invalidateTable("test_insert_parquet")
+ sessionState.refreshTable("test_insert_parquet")
assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
// Create a partitioned table.
@@ -550,7 +550,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
|select b, '2015-04-02', a FROM jt
""".stripMargin).collect())
- sessionState.invalidateTable("test_parquet_partitioned_cache_test")
+ sessionState.refreshTable("test_parquet_partitioned_cache_test")
assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test")