diff options
author | gatorsmile <gatorsmile@gmail.com> | 2017-01-15 20:40:44 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2017-01-15 20:40:44 +0800 |
commit | de62ddf7ff42bdc383da127e6b1155897565354c (patch) | |
tree | b76a3aecec906c88603baf55fbfb1c858f719835 /sql/hive/src/test/scala | |
parent | a5e651f4c6f243b59724fd46237407374017a035 (diff) | |
download | spark-de62ddf7ff42bdc383da127e6b1155897565354c.tar.gz spark-de62ddf7ff42bdc383da127e6b1155897565354c.tar.bz2 spark-de62ddf7ff42bdc383da127e6b1155897565354c.zip |
[SPARK-19120] Refresh Metadata Cache After Loading Hive Tables
### What changes were proposed in this pull request?
```Scala
sql("CREATE TABLE tab (a STRING) STORED AS PARQUET")
// This table fetch is to fill the cache with zero leaf files
spark.table("tab").show()
sql(
s"""
|LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
|INTO TABLE tab
""".stripMargin)
spark.table("tab").show()
```
In the above example, the returned result is empty after the table is loaded. The metadata cache could be out of date after loading new data into the table, because loading/inserting does not update the cache. So far, the metadata cache is only used for data source tables. Thus, for Hive serde tables, only the `parquet` and `orc` formats face this issue, because Hive serde tables in the parquet/orc format can be converted to data source tables when `spark.sql.hive.convertMetastoreParquet`/`spark.sql.hive.convertMetastoreOrc` is on.
This PR is to refresh the metadata cache after processing the `LOAD DATA` command.
In addition, Spark SQL does not convert **partitioned** Hive tables (orc/parquet) to data source tables in the write path, but the read path uses the metadata cache for both **partitioned** and non-partitioned Hive tables (orc/parquet). That means writing partitioned parquet/orc tables still uses `InsertIntoHiveTable`, instead of `InsertIntoHadoopFsRelationCommand`. To avoid reading the out-of-date cache, `InsertIntoHiveTable` needs to refresh the metadata cache for partitioned tables. Note, it does not need to refresh the cache for non-partitioned parquet/orc tables, because they do not go through `InsertIntoHiveTable` at all. Based on the review comments, this PR keeps the existing logic unchanged. That means we always refresh the table no matter whether it is partitioned or not.
### How was this patch tested?
Added test cases in parquetSuites.scala
Author: gatorsmile <gatorsmile@gmail.com>
Closes #16500 from gatorsmile/refreshInsertIntoHiveTable.
Diffstat (limited to 'sql/hive/src/test/scala')
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 75 |
1 files changed, 66 insertions, 9 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index d3e04d4bf2..aa4a150a4b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -23,8 +23,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.DataSourceScanExec import org.apache.spark.sql.execution.command.ExecutedCommandExec -import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation} -import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions +import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.hive.execution.HiveTableScanExec import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf @@ -187,7 +186,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { "normal_parquet", "jt", "jt_array", - "test_parquet") + "test_parquet") + super.afterAll() } test(s"conversion is working") { @@ -575,30 +575,30 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { checkAnswer( sql("SELECT * FROM test_added_partitions"), - Seq(("foo", 0), ("bar", 0)).toDF("a", "b")) + Seq(Row("foo", 0), Row("bar", 0))) // Create partition without data files and check whether it can be read sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1') LOCATION '$partitionDir'") checkAnswer( sql("SELECT * FROM test_added_partitions"), - Seq(("foo", 0), ("bar", 0)).toDF("a", "b")) + Seq(Row("foo", 0), Row("bar", 0))) // Add data files to partition directory and check whether they can be read sql("INSERT INTO TABLE test_added_partitions PARTITION (b=1) select 'baz' as a") checkAnswer( sql("SELECT * FROM test_added_partitions"), - Seq(("foo", 0), ("bar", 0), ("baz", 
1)).toDF("a", "b")) + Seq(Row("foo", 0), Row("bar", 0), Row("baz", 1))) // Check it with pruning predicates checkAnswer( sql("SELECT * FROM test_added_partitions where b = 0"), - Seq(("foo", 0), ("bar", 0)).toDF("a", "b")) + Seq(Row("foo", 0), Row("bar", 0))) checkAnswer( sql("SELECT * FROM test_added_partitions where b = 1"), - Seq(("baz", 1)).toDF("a", "b")) + Seq(Row("baz", 1))) checkAnswer( sql("SELECT * FROM test_added_partitions where b = 2"), - Seq[(String, Int)]().toDF("a", "b")) + Seq.empty) // Also verify the inputFiles implementation assert(sql("select * from test_added_partitions").inputFiles.length == 2) @@ -609,6 +609,63 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { } } + test("Explicitly added partitions should be readable after load") { + withTable("test_added_partitions") { + withTempDir { src => + val newPartitionDir = src.getCanonicalPath + spark.range(2).selectExpr("cast(id as string)").toDF("a").write + .mode("overwrite") + .parquet(newPartitionDir) + + sql( + """ + |CREATE TABLE test_added_partitions (a STRING) + |PARTITIONED BY (b INT) + |STORED AS PARQUET + """.stripMargin) + + // Create partition without data files and check whether it can be read + sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1')") + // This table fetch is to fill the cache with zero leaf files + checkAnswer(spark.table("test_added_partitions"), Seq.empty) + + sql( + s""" + |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE + |INTO TABLE test_added_partitions PARTITION(b='1') + """.stripMargin) + + checkAnswer( + spark.table("test_added_partitions"), + Seq(Row("0", 1), Row("1", 1))) + } + } + } + + test("Non-partitioned table readable after load") { + withTable("tab") { + withTempDir { src => + val newPartitionDir = src.getCanonicalPath + spark.range(2).selectExpr("cast(id as string)").toDF("a").write + .mode("overwrite") + .parquet(newPartitionDir) + + sql("CREATE TABLE tab (a STRING) STORED AS PARQUET") + + // This table fetch is to fill 
the cache with zero leaf files + checkAnswer(spark.table("tab"), Seq.empty) + + sql( + s""" + |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE + |INTO TABLE tab + """.stripMargin) + + checkAnswer(spark.table("tab"), Seq(Row("0"), Row("1"))) + } + } + } + test("self-join") { val table = spark.table("normal_parquet") val selfJoin = table.as("t1").crossJoin(table.as("t2")) |