author     gatorsmile <gatorsmile@gmail.com>      2017-01-15 20:40:44 +0800
committer  Wenchen Fan <wenchen@databricks.com>   2017-01-15 20:40:44 +0800
commit     de62ddf7ff42bdc383da127e6b1155897565354c (patch)
tree       b76a3aecec906c88603baf55fbfb1c858f719835 /sql/hive/src/test/scala/org
parent     a5e651f4c6f243b59724fd46237407374017a035 (diff)
[SPARK-19120] Refresh Metadata Cache After Loading Hive Tables
### What changes were proposed in this pull request?

```Scala
sql("CREATE TABLE tab (a STRING) STORED AS PARQUET")

// This table fetch is to fill the cache with zero leaf files
spark.table("tab").show()

sql(
  s"""
     |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
     |INTO TABLE tab
   """.stripMargin)

spark.table("tab").show()
```

In the example above, the query returns an empty result after the table is loaded. The metadata cache can be out of date after loading new data into the table, because loading/inserting does not update the cache. So far, the metadata cache is only used for data source tables. Thus, for Hive serde tables, only the `parquet` and `orc` formats face this issue, because Hive serde tables in parquet/orc format can be converted to data source tables when `spark.sql.hive.convertMetastoreParquet`/`spark.sql.hive.convertMetastoreOrc` is on. This PR refreshes the metadata cache after processing the `LOAD DATA` command.

In addition, Spark SQL does not convert **partitioned** Hive tables (orc/parquet) to data source tables in the write path, but the read path uses the metadata cache for both **partitioned** and non-partitioned Hive tables (orc/parquet). That means writing partitioned parquet/orc tables still goes through `InsertIntoHiveTable` instead of `InsertIntoHadoopFsRelationCommand`. To avoid reading the out-of-date cache, `InsertIntoHiveTable` needs to refresh the metadata cache for partitioned tables. Note that the cache does not need to be refreshed for non-partitioned parquet/orc tables, because their writes do not go through `InsertIntoHiveTable` at all. Based on the comments, this PR keeps the existing logic unchanged: the table is always refreshed, whether it is partitioned or not.

### How was this patch tested?

Added test cases in parquetSuites.scala

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16500 from gatorsmile/refreshInsertIntoHiveTable.
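Beyond the test cases in the diff below, a minimal user-side sketch of the same idea may help: on builds without this patch, an explicit `spark.catalog.refreshTable` call after `LOAD DATA` drops the stale cached file listing, which is roughly what the patched command now does internally. The table name `tab` and the path `/tmp/new_data` are illustrative placeholders, and the snippet assumes a Hive-enabled `SparkSession` and parquet files with a single string column `a` already written to that path.

```Scala
// Minimal sketch (not part of the patch): work around the stale metadata cache
// on builds without this fix by refreshing the table manually after LOAD DATA.
// "tab" and "/tmp/new_data" are illustrative placeholders.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("refresh-after-load-sketch")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("CREATE TABLE tab (a STRING) STORED AS PARQUET")

// The first scan caches an empty file listing for the converted parquet table.
spark.table("tab").show()

val newPartitionDir = "/tmp/new_data"
spark.sql(
  s"""
     |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
     |INTO TABLE tab
   """.stripMargin)

// Drop the stale cache entry so the next scan re-lists the table's files;
// the patched LOAD DATA command performs an equivalent refresh internally.
spark.catalog.refreshTable("tab")

spark.table("tab").show()  // now returns the newly loaded rows
```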
Diffstat (limited to 'sql/hive/src/test/scala/org')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 75
1 file changed, 66 insertions, 9 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index d3e04d4bf2..aa4a150a4b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -23,8 +23,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.command.ExecutedCommandExec
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
+import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.execution.HiveTableScanExec
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
@@ -187,7 +186,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
"normal_parquet",
"jt",
"jt_array",
- "test_parquet")
+ "test_parquet")
+ super.afterAll()
}
test(s"conversion is working") {
@@ -575,30 +575,30 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
checkAnswer(
sql("SELECT * FROM test_added_partitions"),
- Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+ Seq(Row("foo", 0), Row("bar", 0)))
// Create partition without data files and check whether it can be read
sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1') LOCATION '$partitionDir'")
checkAnswer(
sql("SELECT * FROM test_added_partitions"),
- Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+ Seq(Row("foo", 0), Row("bar", 0)))
// Add data files to partition directory and check whether they can be read
sql("INSERT INTO TABLE test_added_partitions PARTITION (b=1) select 'baz' as a")
checkAnswer(
sql("SELECT * FROM test_added_partitions"),
- Seq(("foo", 0), ("bar", 0), ("baz", 1)).toDF("a", "b"))
+ Seq(Row("foo", 0), Row("bar", 0), Row("baz", 1)))
// Check it with pruning predicates
checkAnswer(
sql("SELECT * FROM test_added_partitions where b = 0"),
- Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+ Seq(Row("foo", 0), Row("bar", 0)))
checkAnswer(
sql("SELECT * FROM test_added_partitions where b = 1"),
- Seq(("baz", 1)).toDF("a", "b"))
+ Seq(Row("baz", 1)))
checkAnswer(
sql("SELECT * FROM test_added_partitions where b = 2"),
- Seq[(String, Int)]().toDF("a", "b"))
+ Seq.empty)
// Also verify the inputFiles implementation
assert(sql("select * from test_added_partitions").inputFiles.length == 2)
@@ -609,6 +609,63 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
}
}
+ test("Explicitly added partitions should be readable after load") {
+ withTable("test_added_partitions") {
+ withTempDir { src =>
+ val newPartitionDir = src.getCanonicalPath
+ spark.range(2).selectExpr("cast(id as string)").toDF("a").write
+ .mode("overwrite")
+ .parquet(newPartitionDir)
+
+ sql(
+ """
+ |CREATE TABLE test_added_partitions (a STRING)
+ |PARTITIONED BY (b INT)
+ |STORED AS PARQUET
+ """.stripMargin)
+
+ // Create partition without data files and check whether it can be read
+ sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1')")
+ // This table fetch is to fill the cache with zero leaf files
+ checkAnswer(spark.table("test_added_partitions"), Seq.empty)
+
+ sql(
+ s"""
+ |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
+ |INTO TABLE test_added_partitions PARTITION(b='1')
+ """.stripMargin)
+
+ checkAnswer(
+ spark.table("test_added_partitions"),
+ Seq(Row("0", 1), Row("1", 1)))
+ }
+ }
+ }
+
+ test("Non-partitioned table readable after load") {
+ withTable("tab") {
+ withTempDir { src =>
+ val newPartitionDir = src.getCanonicalPath
+ spark.range(2).selectExpr("cast(id as string)").toDF("a").write
+ .mode("overwrite")
+ .parquet(newPartitionDir)
+
+ sql("CREATE TABLE tab (a STRING) STORED AS PARQUET")
+
+ // This table fetch is to fill the cache with zero leaf files
+ checkAnswer(spark.table("tab"), Seq.empty)
+
+ sql(
+ s"""
+ |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
+ |INTO TABLE tab
+ """.stripMargin)
+
+ checkAnswer(spark.table("tab"), Seq(Row("0"), Row("1")))
+ }
+ }
+ }
+
test("self-join") {
val table = spark.table("normal_parquet")
val selfJoin = table.as("t1").crossJoin(table.as("t2"))