diff options
author | Liang-Chi Hsieh <viirya@gmail.com> | 2017-03-01 00:19:57 -0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2017-03-01 00:19:57 -0800 |
commit | 38e7835347a2e1803b1df5e73cf8b749951b11b2 (patch) | |
tree | 47bf9b56af726f39cc414714f59d61b72e784780 /sql | |
parent | 4913c92c2fbfcc22b41afb8ce79687165392d7da (diff) | |
download | spark-38e7835347a2e1803b1df5e73cf8b749951b11b2.tar.gz spark-38e7835347a2e1803b1df5e73cf8b749951b11b2.tar.bz2 spark-38e7835347a2e1803b1df5e73cf8b749951b11b2.zip |
[SPARK-19736][SQL] refreshByPath should clear all cached plans with the specified path
## What changes were proposed in this pull request?
`Catalog.refreshByPath` can refresh the cache entry and the associated metadata for all dataframes (if any), that contain the given data source path.
However, `CacheManager.invalidateCachedPath` doesn't clear all cached plans with the specified path. It causes some strange behaviors reported in SPARK-15678.
## How was this patch tested?
Jenkins tests.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes #17064 from viirya/fix-refreshByPath.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala | 19 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala | 16 |
2 files changed, 26 insertions, 9 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 4ca1347008..80138510dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -168,15 +168,16 @@ class CacheManager extends Logging { (fs, path.makeQualified(fs.getUri, fs.getWorkingDirectory)) } - cachedData.foreach { - case data if data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined => - val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan)) - if (dataIndex >= 0) { - data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = true) - cachedData.remove(dataIndex) - } - sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan)) - case _ => // Do Nothing + cachedData.filter { + case data if data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined => true + case _ => false + }.foreach { data => + val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan)) + if (dataIndex >= 0) { + data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = true) + cachedData.remove(dataIndex) + } + sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 1af1a36529..2a0e088437 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -634,4 +634,20 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext assert(getNumInMemoryRelations(cachedPlan2) == 4) } } + + test("refreshByPath should refresh all cached plans with the specified path") { + withTempDir { dir => + val path = dir.getCanonicalPath() + + spark.range(10).write.mode("overwrite").parquet(path) + spark.read.parquet(path).cache() + spark.read.parquet(path).filter($"id" > 4).cache() + assert(spark.read.parquet(path).filter($"id" > 4).count() == 5) + + spark.range(20).write.mode("overwrite").parquet(path) + spark.catalog.refreshByPath(path) + assert(spark.read.parquet(path).count() == 20) + assert(spark.read.parquet(path).filter($"id" > 4).count() == 15) + } + } } |