aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorEric Liang <ekl@databricks.com>2016-10-19 10:20:12 +0800
committerWenchen Fan <wenchen@databricks.com>2016-10-19 10:20:12 +0800
commit5f20ae0394388574a3767daf7f499c89658f61e1 (patch)
tree7939521366699b07f05d79d47a31fe3b87beb0c8 /sql/core
parent941b3f9aca59e62c078508a934f8c2221ced96ce (diff)
downloadspark-5f20ae0394388574a3767daf7f499c89658f61e1.tar.gz
spark-5f20ae0394388574a3767daf7f499c89658f61e1.tar.bz2
spark-5f20ae0394388574a3767daf7f499c89658f61e1.zip
[SPARK-17980][SQL] Fix refreshByPath for converted Hive tables
## What changes were proposed in this pull request? There was a bug introduced in https://github.com/apache/spark/pull/14690 which broke refreshByPath with converted hive tables (though, it turns out it was very difficult to refresh converted hive tables anyways, since you had to specify the exact path of one of the partitions). This changes refreshByPath to invalidate by prefix instead of exact match, and fixes the issue. cc sameeragarwal for refreshByPath changes mallman ## How was this patch tested? Extended unit test. Author: Eric Liang <ekl@databricks.com> Closes #15521 from ericl/fix-caching.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala3
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala5
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala18
3 files changed, 19 insertions, 7 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 889b8a0278..aecdda1c36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -343,7 +343,8 @@ abstract class Catalog {
/**
* Invalidate and refresh all the cached data (and the associated metadata) for any dataframe that
- * contains the given data source path.
+ * contains the given data source path. Path matching is by prefix, i.e. "/" would invalidate
+ * everything that is cached.
*
* @since 2.0.0
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 92fd366e10..fb72c679e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -185,9 +185,10 @@ class CacheManager extends Logging {
plan match {
case lr: LogicalRelation => lr.relation match {
case hr: HadoopFsRelation =>
+ val prefixToInvalidate = qualifiedPath.toString
val invalidate = hr.location.rootPaths
- .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory))
- .contains(qualifiedPath)
+ .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory).toString)
+ .exists(_.startsWith(prefixToInvalidate))
if (invalidate) hr.location.refresh()
invalidate
case _ => false
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
index 5648ab480a..fc08c3798e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
@@ -48,13 +48,18 @@ class TableFileCatalog(
private val baseLocation = catalogTable.storage.locationUri
+ // Populated on-demand by calls to cachedAllPartitions
+ private var cachedAllPartitions: ListingFileCatalog = null
+
override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
filterPartitions(filters).listFiles(Nil)
}
- override def refresh(): Unit = {}
+ override def refresh(): Unit = synchronized {
+ cachedAllPartitions = null
+ }
/**
* Returns a [[ListingFileCatalog]] for this table restricted to the subset of partitions
@@ -64,7 +69,7 @@ class TableFileCatalog(
*/
def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = {
if (filters.isEmpty) {
- cachedAllPartitions
+ allPartitions
} else {
filterPartitions0(filters)
}
@@ -89,9 +94,14 @@ class TableFileCatalog(
}
// Not used in the hot path of queries when metastore partition pruning is enabled
- lazy val cachedAllPartitions: ListingFileCatalog = filterPartitions0(Nil)
+ def allPartitions: ListingFileCatalog = synchronized {
+ if (cachedAllPartitions == null) {
+ cachedAllPartitions = filterPartitions0(Nil)
+ }
+ cachedAllPartitions
+ }
- override def inputFiles: Array[String] = cachedAllPartitions.inputFiles
+ override def inputFiles: Array[String] = allPartitions.inputFiles
}
/**