path: root/sql/hive
author     Sameer Agarwal <sameer@databricks.com>  2016-06-10 20:43:18 -0700
committer  Davies Liu <davies.liu@gmail.com>       2016-06-10 20:43:18 -0700
commit     468da03e23a01e02718608f05d778386cbb8416b (patch)
tree       716d829d8a12df9f401c5b9e61902b52de6c4d49 /sql/hive
parent     8e7b56f3d4917692d3ff44d91aa264738a6fc2ed (diff)
[SPARK-15678] Add support to REFRESH data source paths
## What changes were proposed in this pull request?

Spark currently incorrectly continues to use cached data even if the underlying data is overwritten.

Current behavior:

```scala
val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
sqlContext.read.parquet(dir).count() // outputs 1000 <---- We are still using the cached dataset
```

This patch fixes this bug by adding support for `REFRESH path` that invalidates and refreshes all the cached data (and the associated metadata) for any dataframe that contains the given data source path.

Expected behavior:

```scala
val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
spark.catalog.refreshResource(dir)
sqlContext.read.parquet(dir).count() // outputs 10 <---- We are not using the cached dataset
```

## How was this patch tested?

Unit tests for overwrites and appends in `ParquetQuerySuite` and `CachedTableSuite`.

Author: Sameer Agarwal <sameer@databricks.com>

Closes #13566 from sameeragarwal/refresh-path-2.
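For context, here is a minimal end-to-end sketch of the SQL form exercised by the test in this diff. It assumes only a local `SparkSession` named `spark` and a writable scratch directory `/tmp/test`, both chosen here for illustration:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only, not part of the patch: assumes a local SparkSession and a
// writable /tmp/test scratch directory.
val spark = SparkSession.builder().master("local[*]").appName("refresh-path-demo").getOrCreate()
val dir = "/tmp/test"

spark.range(1000).write.mode("overwrite").parquet(dir)
val df = spark.read.parquet(dir).cache()
df.count()                        // 1000; the data under `dir` is now cached

spark.range(10).write.mode("overwrite").parquet(dir)

// The command added by this patch invalidates and refreshes the cached data
// (and metadata) for every plan that reads from the given path. The PR
// description names spark.catalog.refreshResource(dir) as the programmatic
// equivalent.
spark.sql(s"REFRESH $dir")

spark.read.parquet(dir).count()   // 10; the cache now reflects the overwrite
```

The new test below drives the same command through `sql(s"REFRESH ${tempPath.toString}")` against an external Parquet table, checking both that cached results are refreshed after an append and that `REFRESH` does not cache a table that was not cached before.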
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala | 45
1 file changed, 45 insertions(+), 0 deletions(-)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 52ba90f02c..5121440f06 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -206,6 +206,51 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton {
     Utils.deleteRecursively(tempPath)
   }
+  test("SPARK-15678: REFRESH PATH") {
+    val tempPath: File = Utils.createTempDir()
+    tempPath.delete()
+    table("src").write.mode(SaveMode.Overwrite).parquet(tempPath.toString)
+    sql("DROP TABLE IF EXISTS refreshTable")
+    sparkSession.catalog.createExternalTable("refreshTable", tempPath.toString, "parquet")
+    checkAnswer(
+      table("refreshTable"),
+      table("src").collect())
+    // Cache the table.
+    sql("CACHE TABLE refreshTable")
+    assertCached(table("refreshTable"))
+    // Append new data.
+    table("src").write.mode(SaveMode.Append).parquet(tempPath.toString)
+    // We are still using the old data.
+    assertCached(table("refreshTable"))
+    checkAnswer(
+      table("refreshTable"),
+      table("src").collect())
+    // Refresh the table.
+    sql(s"REFRESH ${tempPath.toString}")
+    // We are using the new data.
+    assertCached(table("refreshTable"))
+    checkAnswer(
+      table("refreshTable"),
+      table("src").union(table("src")).collect())
+
+    // Drop the table and create it again.
+    sql("DROP TABLE refreshTable")
+    sparkSession.catalog.createExternalTable("refreshTable", tempPath.toString, "parquet")
+    // It is not cached.
+    assert(!isCached("refreshTable"), "refreshTable should not be cached.")
+    // Refresh the table. REFRESH command should not make an uncached
+    // table cached.
+    sql(s"REFRESH ${tempPath.toString}")
+    checkAnswer(
+      table("refreshTable"),
+      table("src").union(table("src")).collect())
+    // It is not cached.
+    assert(!isCached("refreshTable"), "refreshTable should not be cached.")
+
+    sql("DROP TABLE refreshTable")
+    Utils.deleteRecursively(tempPath)
+  }
+
   test("SPARK-11246 cache parquet table") {
     sql("CREATE TABLE cachedTable STORED AS PARQUET AS SELECT 1")