aboutsummaryrefslogtreecommitdiff
path: root/sql/hive/src
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2016-07-05 11:36:05 -0700
committerReynold Xin <rxin@databricks.com>2016-07-05 11:36:05 -0700
commit16a2a7d714f945b06978e3bd20a58ea32f0621ac (patch)
tree76fd896b952ea96a890fcb5500af6344886c46cd /sql/hive/src
parent07d9c5327f050f9da611d5239f61ed73b36ce4e6 (diff)
downloadspark-16a2a7d714f945b06978e3bd20a58ea32f0621ac.tar.gz
spark-16a2a7d714f945b06978e3bd20a58ea32f0621ac.tar.bz2
spark-16a2a7d714f945b06978e3bd20a58ea32f0621ac.zip
[SPARK-16311][SQL] Metadata refresh should work on temporary views
## What changes were proposed in this pull request? This patch fixes the bug that the refresh command does not work on temporary views. This patch is based on https://github.com/apache/spark/pull/13989, but removes the public Dataset.refresh() API as well as improved test coverage. Note that I actually think the public refresh() API is very useful. We can in the future implement it by also invalidating the lazy vals in QueryExecution (or alternatively just create a new QueryExecution). ## How was this patch tested? Re-enabled a previously ignored test, and added a new test suite for Hive testing behavior of temporary views against MetastoreRelation. Author: Reynold Xin <rxin@databricks.com> Author: petermaxlee <petermaxlee@gmail.com> Closes #14009 from rxin/SPARK-16311.
Diffstat (limited to 'sql/hive/src')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala4
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala5
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala62
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala16
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala6
5 files changed, 74 insertions, 19 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 7dae473f47..20e64a4e09 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -147,10 +147,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
// it is better at here to invalidate the cache to avoid confusing waring logs from the
// cache loader (e.g. cannot find data source provider, which is only defined for
// data source table.).
- invalidateTable(tableIdent)
- }
-
- def invalidateTable(tableIdent: TableIdentifier): Unit = {
cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent))
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 18b8dafe64..ebb6711f6a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -90,13 +90,10 @@ private[sql] class HiveSessionCatalog(
val CreateTables: Rule[LogicalPlan] = metastoreCatalog.CreateTables
override def refreshTable(name: TableIdentifier): Unit = {
+ super.refreshTable(name)
metastoreCatalog.refreshTable(name)
}
- override def invalidateTable(name: TableIdentifier): Unit = {
- metastoreCatalog.invalidateTable(name)
- }
-
def invalidateCache(): Unit = {
metastoreCatalog.cachedDataSourceTables.invalidateAll()
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
new file mode 100644
index 0000000000..5714d06f0f
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+/**
+ * Test suite to handle metadata cache related.
+ */
+class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+
+ test("SPARK-16337 temporary view refresh") {
+ withTempTable("view_refresh") {
+ withTable("view_table") {
+ // Create a Parquet directory
+ spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
+ .write.saveAsTable("view_table")
+
+ // Read the table in
+ spark.table("view_table").filter("id > -1").createOrReplaceTempView("view_refresh")
+ assert(sql("select count(*) from view_refresh").first().getLong(0) == 100)
+
+ // Delete a file using the Hadoop file system interface since the path returned by
+ // inputFiles is not recognizable by Java IO.
+ val p = new Path(spark.table("view_table").inputFiles.head)
+ assert(p.getFileSystem(hiveContext.sessionState.newHadoopConf()).delete(p, false))
+
+ // Read it again and now we should see a FileNotFoundException
+ val e = intercept[SparkException] {
+ sql("select count(*) from view_refresh").first()
+ }
+ assert(e.getMessage.contains("FileNotFoundException"))
+ assert(e.getMessage.contains("REFRESH"))
+
+ // Refresh and we should be able to read it again.
+ spark.catalog.refreshTable("view_refresh")
+ val newCount = sql("select count(*) from view_refresh").first().getLong(0)
+ assert(newCount > 0 && newCount < 100)
+ }
+ }
+ }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index b028d49aff..12d250d4fb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -255,13 +255,13 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)
// Discard the cached relation.
- sessionState.invalidateTable("jsonTable")
+ sessionState.refreshTable("jsonTable")
checkAnswer(
sql("SELECT * FROM jsonTable"),
sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)
- sessionState.invalidateTable("jsonTable")
+ sessionState.refreshTable("jsonTable")
val expectedSchema = StructType(StructField("c_!@(3)", IntegerType, true) :: Nil)
assert(expectedSchema === table("jsonTable").schema)
@@ -349,7 +349,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
""".stripMargin)
// Discard the cached relation.
- sessionState.invalidateTable("ctasJsonTable")
+ sessionState.refreshTable("ctasJsonTable")
// Schema should not be changed.
assert(table("ctasJsonTable").schema === table("jsonTable").schema)
@@ -424,7 +424,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"),
(6 to 10).map(i => Row(i, s"str$i")))
- sessionState.invalidateTable("savedJsonTable")
+ sessionState.refreshTable("savedJsonTable")
checkAnswer(
sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"),
@@ -710,7 +710,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
options = Map("path" -> tempDir.getCanonicalPath),
isExternal = false)
- sessionState.invalidateTable("wide_schema")
+ sessionState.refreshTable("wide_schema")
val actualSchema = table("wide_schema").schema
assert(schema === actualSchema)
@@ -743,7 +743,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
sharedState.externalCatalog.createTable("default", hiveTable, ignoreIfExists = false)
- sessionState.invalidateTable(tableName)
+ sessionState.refreshTable(tableName)
val actualSchema = table(tableName).schema
assert(schema === actualSchema)
@@ -758,7 +758,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
withTable(tableName) {
df.write.format("parquet").partitionBy("d", "b").saveAsTable(tableName)
- sessionState.invalidateTable(tableName)
+ sessionState.refreshTable(tableName)
val metastoreTable = sharedState.externalCatalog.getTable("default", tableName)
val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
@@ -793,7 +793,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
.bucketBy(8, "d", "b")
.sortBy("c")
.saveAsTable(tableName)
- sessionState.invalidateTable(tableName)
+ sessionState.refreshTable(tableName)
val metastoreTable = sharedState.externalCatalog.getTable("default", tableName)
val expectedBucketByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
val expectedSortByColumns = StructType(df.schema("c") :: Nil)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 4b32b135e7..96beb2d342 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -487,7 +487,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
checkCached(tableIdentifier)
// For insert into non-partitioned table, we will do the conversion,
// so the converted test_insert_parquet should be cached.
- sessionState.invalidateTable("test_insert_parquet")
+ sessionState.refreshTable("test_insert_parquet")
assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
sql(
"""
@@ -500,7 +500,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
sql("select * from test_insert_parquet"),
sql("select a, b from jt").collect())
// Invalidate the cache.
- sessionState.invalidateTable("test_insert_parquet")
+ sessionState.refreshTable("test_insert_parquet")
assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
// Create a partitioned table.
@@ -550,7 +550,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
|select b, '2015-04-02', a FROM jt
""".stripMargin).collect())
- sessionState.invalidateTable("test_parquet_partitioned_cache_test")
+ sessionState.refreshTable("test_parquet_partitioned_cache_test")
assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test")