From 39a54b40aff66816f8b8f5c6133eaaad6eaecae1 Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Mon, 2 Mar 2015 22:42:18 +0800
Subject: [SPARK-6073][SQL] Need to refresh metastore cache after append data
 in CreateMetastoreDataSourceAsSelect

JIRA: https://issues.apache.org/jira/browse/SPARK-6073

liancheng

Author: Yin Huai

Closes #4824 from yhuai/refreshCache and squashes the following commits:

b9542ef [Yin Huai] Refresh metadata cache in the Catalog in CreateMetastoreDataSourceAsSelect.
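The scenario this fix targets: a table created through the data source API has
its relation cached in the catalog, and an append performed by
CreateMetastoreDataSourceAsSelect writes new files that the cached relation
does not know about. A minimal sketch of that scenario, assuming a Spark
1.3-era HiveContext bound to `hiveContext`; the table name and data below are
illustrative, not part of the commit:

    import org.apache.spark.sql.SaveMode

    // Create a Parquet-backed data source table; the catalog caches its relation.
    hiveContext.createDataFrame((1 to 10).map(i => (i, s"str$i")))
      .toDF("c1", "c2")
      .saveAsTable("t", "parquet")

    // Append more rows: new files are written, but the cached relation may
    // still list only the original files.
    hiveContext.createDataFrame((11 to 20).map(i => (i, s"str$i")))
      .toDF("c1", "c2")
      .saveAsTable("t", "parquet", SaveMode.Append)

    // Without the refreshTable call added below, this query could read the
    // stale cached relation and return 10 instead of 20.
    hiveContext.table("t").count()

The one-line fix below invalidates the cached entry after every
CreateMetastoreDataSourceAsSelect run, so the next lookup rebuilds the
relation from the current metastore state.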
---
 .../apache/spark/sql/hive/execution/commands.scala |  2 +
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala | 52 ++++++++++++++++++++++
 2 files changed, 54 insertions(+)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 9934a5d3c3..ffaef8eef1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -248,6 +248,8 @@ case class CreateMetastoreDataSourceAsSelect(
         isExternal)
     }
 
+    // Refresh the cache of the table in the catalog.
+    hiveContext.refreshTable(tableName)
     Seq.empty[Row]
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 00306f1cd7..868c35f35f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -612,4 +612,56 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
     val actualSchema = table("wide_schema").schema
     assert(schema === actualSchema)
   }
+
+  test("insert into a table") {
+    def createDF(from: Int, to: Int): DataFrame =
+      createDataFrame((from to to).map(i => Tuple2(i, s"str$i"))).toDF("c1", "c2")
+
+    createDF(0, 9).saveAsTable("insertParquet", "parquet")
+    checkAnswer(
+      sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"),
+      (6 to 9).map(i => Row(i, s"str$i")))
+
+    intercept[AnalysisException] {
+      createDF(10, 19).saveAsTable("insertParquet", "parquet")
+    }
+
+    createDF(10, 19).saveAsTable("insertParquet", "parquet", SaveMode.Append)
+    checkAnswer(
+      sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"),
+      (6 to 19).map(i => Row(i, s"str$i")))
+
+    createDF(20, 29).saveAsTable("insertParquet", "parquet", SaveMode.Append)
+    checkAnswer(
+      sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 25"),
+      (6 to 24).map(i => Row(i, s"str$i")))
+
+    intercept[AnalysisException] {
+      createDF(30, 39).saveAsTable("insertParquet")
+    }
+
+    createDF(30, 39).saveAsTable("insertParquet", SaveMode.Append)
+    checkAnswer(
+      sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 35"),
+      (6 to 34).map(i => Row(i, s"str$i")))
+
+    createDF(40, 49).insertInto("insertParquet")
+    checkAnswer(
+      sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 45"),
+      (6 to 44).map(i => Row(i, s"str$i")))
+
+    createDF(50, 59).saveAsTable("insertParquet", SaveMode.Overwrite)
+    checkAnswer(
+      sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 51 AND p.c1 < 55"),
+      (52 to 54).map(i => Row(i, s"str$i")))
+    createDF(60, 69).saveAsTable("insertParquet", SaveMode.Ignore)
+    checkAnswer(
+      sql("SELECT p.c1, c2 FROM insertParquet p"),
+      (50 to 59).map(i => Row(i, s"str$i")))
+
+    createDF(70, 79).insertInto("insertParquet", overwrite = true)
+    checkAnswer(
+      sql("SELECT p.c1, c2 FROM insertParquet p"),
+      (70 to 79).map(i => Row(i, s"str$i")))
+  }
 }
-- 
cgit v1.2.3