diff options
author | Sameer Agarwal <sameer@databricks.com> | 2016-06-10 20:43:18 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2016-06-10 20:43:18 -0700 |
commit | 468da03e23a01e02718608f05d778386cbb8416b (patch) | |
tree | 716d829d8a12df9f401c5b9e61902b52de6c4d49 /sql/hive | |
parent | 8e7b56f3d4917692d3ff44d91aa264738a6fc2ed (diff) | |
download | spark-468da03e23a01e02718608f05d778386cbb8416b.tar.gz spark-468da03e23a01e02718608f05d778386cbb8416b.tar.bz2 spark-468da03e23a01e02718608f05d778386cbb8416b.zip |
[SPARK-15678] Add support to REFRESH data source paths
## What changes were proposed in this pull request?
Spark currently incorrectly continues to use cached data even if the underlying data is overwritten.
Current behavior:
```scala
val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
sqlContext.read.parquet(dir).count() // outputs 1000 <---- We are still using the cached dataset
```
This patch fixes this bug by adding support for `REFRESH path` that invalidates and refreshes all the cached data (and the associated metadata) for any dataframe that contains the given data source path.
Expected behavior:
```scala
val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
spark.catalog.refreshResource(dir)
sqlContext.read.parquet(dir).count() // outputs 10 <---- We are not using the cached dataset
```
## How was this patch tested?
Unit tests for overwrites and appends in `ParquetQuerySuite` and `CachedTableSuite`.
Author: Sameer Agarwal <sameer@databricks.com>
Closes #13566 from sameeragarwal/refresh-path-2.
Diffstat (limited to 'sql/hive')
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala | 45 |
1 file changed, 45 insertions(+), 0 deletions(-)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 52ba90f02c..5121440f06 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -206,6 +206,51 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton { Utils.deleteRecursively(tempPath) } + test("SPARK-15678: REFRESH PATH") { + val tempPath: File = Utils.createTempDir() + tempPath.delete() + table("src").write.mode(SaveMode.Overwrite).parquet(tempPath.toString) + sql("DROP TABLE IF EXISTS refreshTable") + sparkSession.catalog.createExternalTable("refreshTable", tempPath.toString, "parquet") + checkAnswer( + table("refreshTable"), + table("src").collect()) + // Cache the table. + sql("CACHE TABLE refreshTable") + assertCached(table("refreshTable")) + // Append new data. + table("src").write.mode(SaveMode.Append).parquet(tempPath.toString) + // We are still using the old data. + assertCached(table("refreshTable")) + checkAnswer( + table("refreshTable"), + table("src").collect()) + // Refresh the table. + sql(s"REFRESH ${tempPath.toString}") + // We are using the new data. + assertCached(table("refreshTable")) + checkAnswer( + table("refreshTable"), + table("src").union(table("src")).collect()) + + // Drop the table and create it again. + sql("DROP TABLE refreshTable") + sparkSession.catalog.createExternalTable("refreshTable", tempPath.toString, "parquet") + // It is not cached. + assert(!isCached("refreshTable"), "refreshTable should not be cached.") + // Refresh the table. REFRESH command should not make a uncached + // table cached. + sql(s"REFRESH ${tempPath.toString}") + checkAnswer( + table("refreshTable"), + table("src").union(table("src")).collect()) + // It is not cached. 
+ assert(!isCached("refreshTable"), "refreshTable should not be cached.") + + sql("DROP TABLE refreshTable") + Utils.deleteRecursively(tempPath) + } + test("SPARK-11246 cache parquet table") { sql("CREATE TABLE cachedTable STORED AS PARQUET AS SELECT 1") |