aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala/org/apache
diff options
context:
space:
mode:
author    Liang-Chi Hsieh <viirya@gmail.com>    2017-03-01 00:19:57 -0800
committer Wenchen Fan <wenchen@databricks.com>  2017-03-01 00:19:57 -0800
commit 38e7835347a2e1803b1df5e73cf8b749951b11b2 (patch)
tree   47bf9b56af726f39cc414714f59d61b72e784780 /sql/core/src/main/scala/org/apache
parent 4913c92c2fbfcc22b41afb8ce79687165392d7da (diff)
downloadspark-38e7835347a2e1803b1df5e73cf8b749951b11b2.tar.gz
spark-38e7835347a2e1803b1df5e73cf8b749951b11b2.tar.bz2
spark-38e7835347a2e1803b1df5e73cf8b749951b11b2.zip
[SPARK-19736][SQL] refreshByPath should clear all cached plans with the specified path
## What changes were proposed in this pull request?

`Catalog.refreshByPath` can refresh the cache entry and the associated metadata for all dataframes (if any) that contain the given data source path. However, `CacheManager.invalidateCachedPath` doesn't clear all cached plans with the specified path. It causes some strange behaviors reported in SPARK-15678.

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #17064 from viirya/fix-refreshByPath.
Diffstat (limited to 'sql/core/src/main/scala/org/apache')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala  19
1 file changed, 10 insertions(+), 9 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 4ca1347008..80138510dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -168,15 +168,16 @@ class CacheManager extends Logging {
(fs, path.makeQualified(fs.getUri, fs.getWorkingDirectory))
}
- cachedData.foreach {
- case data if data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined =>
- val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan))
- if (dataIndex >= 0) {
- data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = true)
- cachedData.remove(dataIndex)
- }
- sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan))
- case _ => // Do Nothing
+ cachedData.filter {
+ case data if data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined => true
+ case _ => false
+ }.foreach { data =>
+ val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan))
+ if (dataIndex >= 0) {
+ data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = true)
+ cachedData.remove(dataIndex)
+ }
+ sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan))
}
}