author     Sameer Agarwal <sameer@databricks.com>    2016-06-10 20:43:18 -0700
committer  Davies Liu <davies.liu@gmail.com>         2016-06-10 20:43:18 -0700
commit     468da03e23a01e02718608f05d778386cbb8416b (patch)
tree       716d829d8a12df9f401c5b9e61902b52de6c4d49
parent     8e7b56f3d4917692d3ff44d91aa264738a6fc2ed (diff)
[SPARK-15678] Add support to REFRESH data source paths
## What changes were proposed in this pull request?

Spark currently, and incorrectly, continues to use cached data even after the underlying data has been overwritten.

Current behavior:
```scala
val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
sqlContext.read.parquet(dir).count() // outputs 1000 <---- We are still using the cached dataset
```

This patch fixes the bug by adding support for `REFRESH path`, which invalidates and refreshes all the cached data (and the associated metadata) for any DataFrame that contains the given data source path.

Expected behavior:
```scala
val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
spark.catalog.refreshByPath(dir)
sqlContext.read.parquet(dir).count() // outputs 10 <---- We are no longer using the cached dataset
```

## How was this patch tested?

Unit tests for overwrites and appends in `ParquetQuerySuite` and `CachedTableSuite`.

Author: Sameer Agarwal <sameer@databricks.com>

Closes #13566 from sameeragarwal/refresh-path-2.
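For orientation, a minimal spark-shell sketch of the behavior this patch enables, assuming the predefined `spark` session and a hypothetical directory `/tmp/test`; the SQL `REFRESH <path>` statement and the `Catalog.refreshByPath` API are the two entry points added by the diff below.

```scala
// Minimal sketch (spark-shell, hypothetical /tmp/test): invalidate cached data
// for a path after it has been overwritten outside of the cached DataFrame.
val dir = "/tmp/test"
spark.range(1000).write.mode("overwrite").parquet(dir)
val df = spark.read.parquet(dir).cache()
df.count()                          // 1000; materializes the cache

spark.range(10).write.mode("overwrite").parquet(dir)

spark.sql(s"REFRESH $dir")          // the new SQL statement ...
spark.catalog.refreshByPath(dir)    // ... or the equivalent Catalog API

spark.read.parquet(dir).count()     // 10; the stale cache entry was dropped and rebuilt
```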
-rw-r--r--  sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4                       |  1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala                                 |  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala                          | 51
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala                        |  9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala                       |  9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala                            | 10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala | 28
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala                           | 45
8 files changed, 158 insertions(+), 2 deletions(-)
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index d10255946a..044f910388 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -113,6 +113,7 @@ statement
| (DESC | DESCRIBE) option=(EXTENDED | FORMATTED)?
tableIdentifier partitionSpec? describeColName? #describeTable
| REFRESH TABLE tableIdentifier #refreshTable
+ | REFRESH .*? #refreshResource
| CACHE LAZY? TABLE identifier (AS? query)? #cacheTable
| UNCACHE TABLE identifier #uncacheTable
| CLEAR CACHE #clearCache
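As a hedged illustration of the new grammar alternative: anything following the `REFRESH` keyword that is not `TABLE tableIdentifier` is captured by `refreshResource`, and the raw text after `REFRESH` is handed to the planner as a resource path (spark-shell assumed, hypothetical paths).

```scala
// Sketch of statements matched by the new rule (paths are hypothetical):
spark.sql("REFRESH /tmp/test")              // routed to #refreshResource
spark.sql("REFRESH /data/warehouse/events") // everything after REFRESH is taken verbatim as the path
// Still matched by the pre-existing alternative, not by #refreshResource:
spark.sql("REFRESH TABLE some_table")
```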
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 6ddb1a7a1f..083a63c98c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -226,4 +226,11 @@ abstract class Catalog {
*/
def refreshTable(tableName: String): Unit
+ /**
+ * Invalidate and refresh all the cached data (and the associated metadata) for any dataframe that
+ * contains the given data source path.
+ *
+ * @since 2.0.0
+ */
+ def refreshByPath(path: String): Unit
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index c8bdb0d22c..b584cf4079 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -19,10 +19,14 @@ package org.apache.spark.sql.execution
import java.util.concurrent.locks.ReentrantReadWriteLock
+import org.apache.hadoop.fs.{FileSystem, Path}
+
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.execution.columnar.InMemoryRelation
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
@@ -157,4 +161,49 @@ private[sql] class CacheManager extends Logging {
case _ =>
}
}
+
+ /**
+ * Invalidates the cache of any data that contains `resourcePath` in one or more
+ * `HadoopFsRelation` node(s) as part of its logical plan.
+ */
+ private[sql] def invalidateCachedPath(
+ sparkSession: SparkSession, resourcePath: String): Unit = writeLock {
+ val (fs, qualifiedPath) = {
+ val path = new Path(resourcePath)
+ val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
+ (fs, path.makeQualified(fs.getUri, fs.getWorkingDirectory))
+ }
+
+ cachedData.foreach {
+ case data if data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined =>
+ val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan))
+ if (dataIndex >= 0) {
+ data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = true)
+ cachedData.remove(dataIndex)
+ }
+ sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan))
+ case _ => // Do Nothing
+ }
+ }
+
+ /**
+ * Traverses a given `plan` and searches for the occurrences of `qualifiedPath` in the
+ * [[org.apache.spark.sql.execution.datasources.FileCatalog]] of any [[HadoopFsRelation]] nodes
+ * in the plan. If found, we refresh the metadata and return true. Otherwise, this method returns
+ * false.
+ */
+ private def lookupAndRefresh(plan: LogicalPlan, fs: FileSystem, qualifiedPath: Path): Boolean = {
+ plan match {
+ case lr: LogicalRelation => lr.relation match {
+ case hr: HadoopFsRelation =>
+ val invalidate = hr.location.paths
+ .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory))
+ .contains(qualifiedPath)
+ if (invalidate) hr.location.refresh()
+ invalidate
+ case _ => false
+ }
+ case _ => false
+ }
+ }
}
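As a hedged aside on the path matching above: both the user-supplied `resourcePath` and each `HadoopFsRelation` location are qualified via Hadoop's `Path.makeQualified` before comparison, so a bare local path and its fully qualified form compare equal. A standalone sketch using only the Hadoop filesystem API (hypothetical path):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Sketch: qualify a path the same way invalidateCachedPath/lookupAndRefresh do,
// so that "/tmp/test" and "file:/tmp/test" resolve to the same qualified Path.
val raw = new Path("/tmp/test")                  // hypothetical path
val fs = raw.getFileSystem(new Configuration())  // default (local) filesystem here
val qualified = raw.makeQualified(fs.getUri, fs.getWorkingDirectory)
println(qualified)                               // e.g. file:/tmp/test
```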
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index dc74222051..06d8f15dc3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -25,7 +25,6 @@ import org.antlr.v4.runtime.tree.TerminalNode
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
@@ -210,6 +209,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
/**
+ * Create a [[RefreshResource]] logical plan.
+ */
+ override def visitRefreshResource(ctx: RefreshResourceContext): LogicalPlan = withOrigin(ctx) {
+ val resourcePath = remainder(ctx.REFRESH.getSymbol).trim
+ RefreshResource(resourcePath)
+ }
+
+ /**
* Create a [[CacheTableCommand]] logical plan.
*/
override def visitCacheTable(ctx: CacheTableContext): LogicalPlan = withOrigin(ctx) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index aa42eae986..31a2075d2f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -102,6 +102,15 @@ case class RefreshTable(tableIdent: TableIdentifier)
}
}
+case class RefreshResource(path: String)
+ extends RunnableCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ sparkSession.catalog.refreshByPath(path)
+ Seq.empty[Row]
+ }
+}
+
/**
* Builds a map in which keys are case insensitive
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 70e17b10ac..f42fd174b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -373,6 +373,16 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
}
}
+ /**
+ * Refresh the cache entry and the associated metadata for all DataFrames (if any) that contain
+ * the given data source path.
+ *
+ * @group cachemgmt
+ * @since 2.0.0
+ */
+ override def refreshByPath(resourcePath: String): Unit = {
+ sparkSession.sharedState.cacheManager.invalidateCachedPath(sparkSession, resourcePath)
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index ea57f71c50..b4fd0ef6ed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -68,6 +68,34 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
TableIdentifier("tmp"), ignoreIfNotExists = true)
}
+ test("SPARK-15678: not use cache on overwrite") {
+ withTempDir { dir =>
+ val path = dir.toString
+ spark.range(1000).write.mode("overwrite").parquet(path)
+ val df = spark.read.parquet(path).cache()
+ assert(df.count() == 1000)
+ spark.range(10).write.mode("overwrite").parquet(path)
+ assert(df.count() == 1000)
+ spark.catalog.refreshByPath(path)
+ assert(df.count() == 10)
+ assert(spark.read.parquet(path).count() == 10)
+ }
+ }
+
+ test("SPARK-15678: not use cache on append") {
+ withTempDir { dir =>
+ val path = dir.toString
+ spark.range(1000).write.mode("append").parquet(path)
+ val df = spark.read.parquet(path).cache()
+ assert(df.count() == 1000)
+ spark.range(10).write.mode("append").parquet(path)
+ assert(df.count() == 1000)
+ spark.catalog.refreshByPath(path)
+ assert(df.count() == 1010)
+ assert(spark.read.parquet(path).count() == 1010)
+ }
+ }
+
test("self-join") {
// 4 rows, cells of column 1 of row 2 and row 4 are null
val data = (1 to 4).map { i =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 52ba90f02c..5121440f06 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -206,6 +206,51 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton {
Utils.deleteRecursively(tempPath)
}
+ test("SPARK-15678: REFRESH PATH") {
+ val tempPath: File = Utils.createTempDir()
+ tempPath.delete()
+ table("src").write.mode(SaveMode.Overwrite).parquet(tempPath.toString)
+ sql("DROP TABLE IF EXISTS refreshTable")
+ sparkSession.catalog.createExternalTable("refreshTable", tempPath.toString, "parquet")
+ checkAnswer(
+ table("refreshTable"),
+ table("src").collect())
+ // Cache the table.
+ sql("CACHE TABLE refreshTable")
+ assertCached(table("refreshTable"))
+ // Append new data.
+ table("src").write.mode(SaveMode.Append).parquet(tempPath.toString)
+ // We are still using the old data.
+ assertCached(table("refreshTable"))
+ checkAnswer(
+ table("refreshTable"),
+ table("src").collect())
+ // Refresh the table.
+ sql(s"REFRESH ${tempPath.toString}")
+ // We are using the new data.
+ assertCached(table("refreshTable"))
+ checkAnswer(
+ table("refreshTable"),
+ table("src").union(table("src")).collect())
+
+ // Drop the table and create it again.
+ sql("DROP TABLE refreshTable")
+ sparkSession.catalog.createExternalTable("refreshTable", tempPath.toString, "parquet")
+ // It is not cached.
+ assert(!isCached("refreshTable"), "refreshTable should not be cached.")
+ // Refresh the table. The REFRESH command should not make an uncached
+ // table cached.
+ sql(s"REFRESH ${tempPath.toString}")
+ checkAnswer(
+ table("refreshTable"),
+ table("src").union(table("src")).collect())
+ // It is not cached.
+ assert(!isCached("refreshTable"), "refreshTable should not be cached.")
+
+ sql("DROP TABLE refreshTable")
+ Utils.deleteRecursively(tempPath)
+ }
+
test("SPARK-11246 cache parquet table") {
sql("CREATE TABLE cachedTable STORED AS PARQUET AS SELECT 1")