-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala                        |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala                 |  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala | 18
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala              |  2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala            | 21
5 files changed, 39 insertions(+), 10 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 889b8a0278..aecdda1c36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -343,7 +343,8 @@ abstract class Catalog {
/**
* Invalidate and refresh all the cached data (and the associated metadata) for any dataframe that
- * contains the given data source path.
+ * contains the given data source path. Path matching is by prefix, i.e. "/" would invalidate
+ * everything that is cached.
*
* @since 2.0.0
*/
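The prefix semantics documented above mean that refreshing a parent directory also invalidates caches built from any path beneath it. A minimal sketch of what a caller can now rely on, assuming a hypothetical dataset under /data/events (the path is illustrative, not part of this patch):

// Sketch: refreshByPath matches cached relations by path prefix.
// "/data/events" is a hypothetical location, not from this change.
val df = spark.read.parquet("/data/events/year=2016")
df.cache()
// Invalidates the cached plan above, since its root path starts with
// the refreshed prefix; refreshByPath("/") would drop everything.
spark.catalog.refreshByPath("/data/events")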
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 92fd366e10..fb72c679e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -185,9 +185,10 @@ class CacheManager extends Logging {
plan match {
case lr: LogicalRelation => lr.relation match {
case hr: HadoopFsRelation =>
+ val prefixToInvalidate = qualifiedPath.toString
val invalidate = hr.location.rootPaths
- .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory))
- .contains(qualifiedPath)
+ .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory).toString)
+ .exists(_.startsWith(prefixToInvalidate))
if (invalidate) hr.location.refresh()
invalidate
case _ => false
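The new check compares fully qualified path URIs as strings, so any cached relation whose root lies under the refreshed path is invalidated. A self-contained sketch of the same test, assuming Hadoop's FileSystem API (paths are illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch of the prefix match performed above; paths are illustrative.
val fs = FileSystem.get(new Configuration())
val qualifiedPath = new Path("/data/events").makeQualified(fs.getUri, fs.getWorkingDirectory)
val rootPaths = Seq(new Path("/data/events/year=2016"), new Path("/unrelated"))
val invalidate = rootPaths
  .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory).toString)
  .exists(_.startsWith(qualifiedPath.toString))
// invalidate == true: the first root falls under the refreshed prefix.

Note that plain string prefixing can over-match sibling directories ("/data/events2".startsWith("/data/events") is true), which callers of refreshByPath should keep in mind.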
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
index 5648ab480a..fc08c3798e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
@@ -48,13 +48,18 @@ class TableFileCatalog(
private val baseLocation = catalogTable.storage.locationUri
+ // Populated on-demand by calls to allPartitions
+ private var cachedAllPartitions: ListingFileCatalog = null
+
override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
filterPartitions(filters).listFiles(Nil)
}
- override def refresh(): Unit = {}
+ override def refresh(): Unit = synchronized {
+ cachedAllPartitions = null
+ }
/**
* Returns a [[ListingFileCatalog]] for this table restricted to the subset of partitions
@@ -64,7 +69,7 @@ class TableFileCatalog(
*/
def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = {
if (filters.isEmpty) {
- cachedAllPartitions
+ allPartitions
} else {
filterPartitions0(filters)
}
@@ -89,9 +94,14 @@ class TableFileCatalog(
}
// Not used in the hot path of queries when metastore partition pruning is enabled
- lazy val cachedAllPartitions: ListingFileCatalog = filterPartitions0(Nil)
+ def allPartitions: ListingFileCatalog = synchronized {
+ if (cachedAllPartitions == null) {
+ cachedAllPartitions = filterPartitions0(Nil)
+ }
+ cachedAllPartitions
+ }
- override def inputFiles: Array[String] = cachedAllPartitions.inputFiles
+ override def inputFiles: Array[String] = allPartitions.inputFiles
}
/**
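The TableFileCatalog change above replaces a lazy val, which Scala computes exactly once and can never re-evaluate, with a nullable field behind synchronized accessors, so refresh() can force the next caller to relist partitions. A generic sketch of that pattern (class and names are illustrative, not Spark's):

// Sketch: a resettable lazily-computed cache. Unlike `lazy val`,
// the cached value can be dropped and recomputed after reset().
// Names are illustrative, not part of the patch.
class ResettableCache[T >: Null](compute: () => T) {
  private var cached: T = null

  def get: T = synchronized {
    if (cached == null) {
      cached = compute()  // populated on demand, as in allPartitions
    }
    cached
  }

  def reset(): Unit = synchronized {
    cached = null  // mirrors the new refresh() implementation
  }
}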
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 16e1e37b2f..c909eb5d20 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -235,7 +235,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging {
if (lazyPruningEnabled) {
catalog
} else {
- catalog.cachedAllPartitions
+ catalog.allPartitions
}
}
val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
index 7af81a3a90..2ca1cd4c07 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
@@ -80,9 +80,13 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
val df = spark.sql("select * from test")
assert(sql("select * from test").count() == 5)
+ def deleteRandomFile(): Unit = {
+ val p = new Path(spark.table("test").inputFiles.head)
+ assert(p.getFileSystem(hiveContext.sessionState.newHadoopConf()).delete(p, true))
+ }
+
// Delete a file, then assert that we tried to read it. This means the table was cached.
- val p = new Path(spark.table("test").inputFiles.head)
- assert(p.getFileSystem(hiveContext.sessionState.newHadoopConf()).delete(p, true))
+ deleteRandomFile()
val e = intercept[SparkException] {
sql("select * from test").count()
}
@@ -91,6 +95,19 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
// Test refreshing the cache.
spark.catalog.refreshTable("test")
assert(sql("select * from test").count() == 4)
+ assert(spark.table("test").inputFiles.length == 4)
+
+ // Test refresh by path separately since it goes through different code paths than
+ // refreshTable does.
+ deleteRandomFile()
+ spark.catalog.cacheTable("test")
+ spark.catalog.refreshByPath("/some-invalid-path") // no-op
+ val e2 = intercept[SparkException] {
+ sql("select * from test").count()
+ }
+ assert(e2.getMessage.contains("FileNotFoundException"))
+ spark.catalog.refreshByPath(dir.getAbsolutePath)
+ assert(sql("select * from test").count() == 3)
}
}
}
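End-to-end, the new test asserts the behavior an application sees when files disappear underneath a cached table: a refresh on an unrelated path is a no-op, while a refresh on the table's directory drops the stale listing. A condensed sketch of that usage (the unrelated path is illustrative; "test" and dir come from the test itself):

spark.catalog.cacheTable("test")
// ...files under the table's directory are deleted out-of-band...
spark.catalog.refreshByPath("/not/the/table/dir")  // no-op: prefix does not match
spark.catalog.refreshByPath(dir.getAbsolutePath)   // drops the cached, stale file listing
spark.sql("select * from test").count()            // relists files and succeeds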