author      Reynold Xin <rxin@databricks.com>    2016-07-05 11:36:05 -0700
committer   Reynold Xin <rxin@databricks.com>    2016-07-05 11:36:05 -0700
commit      16a2a7d714f945b06978e3bd20a58ea32f0621ac (patch)
tree        76fd896b952ea96a890fcb5500af6344886c46cd /sql/hive/src/test/scala
parent      07d9c5327f050f9da611d5239f61ed73b36ce4e6 (diff)
[SPARK-16311][SQL] Metadata refresh should work on temporary views
## What changes were proposed in this pull request?
This patch fixes a bug where the refresh command did not work on temporary views. It is based on https://github.com/apache/spark/pull/13989, but removes the public Dataset.refresh() API and improves test coverage.
Note that I actually think the public refresh() API is very useful. We can implement it in the future by also invalidating the lazy vals in QueryExecution (or alternatively by creating a new QueryExecution).
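For context, a minimal sketch of the behavior this enables (the path and view name are illustrative, not from the patch):

```scala
// After this patch, spark.catalog.refreshTable also resolves temporary views,
// so a stale cached file listing behind a view can be refreshed in place.
spark.read.parquet("/tmp/some_dir")        // illustrative source directory
  .createOrReplaceTempView("my_view")
spark.sql("SELECT COUNT(*) FROM my_view").show()

// ... files under /tmp/some_dir are added or deleted externally ...

spark.catalog.refreshTable("my_view")      // previously had no effect on temp views
spark.sql("SELECT COUNT(*) FROM my_view").show()
```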
## How was this patch tested?
Re-enabled a previously ignored test, and added a new Hive test suite that tests the behavior of temporary views against MetastoreRelation.
Author: Reynold Xin <rxin@databricks.com>
Author: petermaxlee <petermaxlee@gmail.com>
Closes #14009 from rxin/SPARK-16311.
Diffstat (limited to 'sql/hive/src/test/scala')
3 files changed, 73 insertions, 11 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
new file mode 100644
index 0000000000..5714d06f0f
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+/**
+ * Test suite to handle metadata cache related.
+ */
+class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+
+  test("SPARK-16337 temporary view refresh") {
+    withTempTable("view_refresh") {
+      withTable("view_table") {
+        // Create a Parquet directory
+        spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
+          .write.saveAsTable("view_table")
+
+        // Read the table in
+        spark.table("view_table").filter("id > -1").createOrReplaceTempView("view_refresh")
+        assert(sql("select count(*) from view_refresh").first().getLong(0) == 100)
+
+        // Delete a file using the Hadoop file system interface since the path returned by
+        // inputFiles is not recognizable by Java IO.
+        val p = new Path(spark.table("view_table").inputFiles.head)
+        assert(p.getFileSystem(hiveContext.sessionState.newHadoopConf()).delete(p, false))
+
+        // Read it again and now we should see a FileNotFoundException
+        val e = intercept[SparkException] {
+          sql("select count(*) from view_refresh").first()
+        }
+        assert(e.getMessage.contains("FileNotFoundException"))
+        assert(e.getMessage.contains("REFRESH"))
+
+        // Refresh and we should be able to read it again.
+        spark.catalog.refreshTable("view_refresh")
+        val newCount = sql("select count(*) from view_refresh").first().getLong(0)
+        assert(newCount > 0 && newCount < 100)
+      }
+    }
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index b028d49aff..12d250d4fb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -255,13 +255,13 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)
 
       // Discard the cached relation.
-      sessionState.invalidateTable("jsonTable")
+      sessionState.refreshTable("jsonTable")
 
       checkAnswer(
         sql("SELECT * FROM jsonTable"),
         sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)
 
-      sessionState.invalidateTable("jsonTable")
+      sessionState.refreshTable("jsonTable")
       val expectedSchema = StructType(StructField("c_!@(3)", IntegerType, true) :: Nil)
       assert(expectedSchema === table("jsonTable").schema)
@@ -349,7 +349,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       """.stripMargin)
 
       // Discard the cached relation.
-      sessionState.invalidateTable("ctasJsonTable")
+      sessionState.refreshTable("ctasJsonTable")
 
       // Schema should not be changed.
       assert(table("ctasJsonTable").schema === table("jsonTable").schema)
@@ -424,7 +424,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"),
           (6 to 10).map(i => Row(i, s"str$i")))
 
-        sessionState.invalidateTable("savedJsonTable")
+        sessionState.refreshTable("savedJsonTable")
 
         checkAnswer(
           sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"),
@@ -710,7 +710,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         options = Map("path" -> tempDir.getCanonicalPath),
         isExternal = false)
 
-      sessionState.invalidateTable("wide_schema")
+      sessionState.refreshTable("wide_schema")
 
       val actualSchema = table("wide_schema").schema
       assert(schema === actualSchema)
@@ -743,7 +743,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
 
       sharedState.externalCatalog.createTable("default", hiveTable, ignoreIfExists = false)
 
-      sessionState.invalidateTable(tableName)
+      sessionState.refreshTable(tableName)
       val actualSchema = table(tableName).schema
       assert(schema === actualSchema)
@@ -758,7 +758,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     withTable(tableName) {
       df.write.format("parquet").partitionBy("d", "b").saveAsTable(tableName)
 
-      sessionState.invalidateTable(tableName)
+      sessionState.refreshTable(tableName)
       val metastoreTable = sharedState.externalCatalog.getTable("default", tableName)
       val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
@@ -793,7 +793,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         .bucketBy(8, "d", "b")
         .sortBy("c")
         .saveAsTable(tableName)
-      sessionState.invalidateTable(tableName)
+      sessionState.refreshTable(tableName)
       val metastoreTable = sharedState.externalCatalog.getTable("default", tableName)
       val expectedBucketByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
       val expectedSortByColumns = StructType(df.schema("c") :: Nil)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 4b32b135e7..96beb2d342 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -487,7 +487,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       checkCached(tableIdentifier)
       // For insert into non-partitioned table, we will do the conversion,
       // so the converted test_insert_parquet should be cached.
-      sessionState.invalidateTable("test_insert_parquet")
+      sessionState.refreshTable("test_insert_parquet")
       assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
       sql(
         """
@@ -500,7 +500,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       checkAnswer(
         sql("select * from test_insert_parquet"),
         sql("select a, b from jt").collect())
       // Invalidate the cache.
-      sessionState.invalidateTable("test_insert_parquet")
+      sessionState.refreshTable("test_insert_parquet")
       assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
 
       // Create a partitioned table.
@@ -550,7 +550,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
           |select b, '2015-04-02', a FROM jt
         """.stripMargin).collect())
 
-      sessionState.invalidateTable("test_parquet_partitioned_cache_test")
+      sessionState.refreshTable("test_parquet_partitioned_cache_test")
       assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
 
       dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test")
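For reference, the refresh exercised above can be triggered two equivalent ways; a minimal sketch reusing the view name from the new test, and assuming both entry points resolve through the same session catalog:

```scala
// SQL form, matching the "REFRESH" hint asserted in HiveMetadataCacheSuite
spark.sql("REFRESH TABLE view_refresh")
// Catalog API form, as used directly in the test
spark.catalog.refreshTable("view_refresh")
```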