From 468da03e23a01e02718608f05d778386cbb8416b Mon Sep 17 00:00:00 2001
From: Sameer Agarwal
Date: Fri, 10 Jun 2016 20:43:18 -0700
Subject: [SPARK-15678] Add support to REFRESH data source paths

## What changes were proposed in this pull request?

Spark currently incorrectly continues to use cached data even if the underlying data is overwritten.

Current behavior:
```scala
val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
sqlContext.read.parquet(dir).count() // outputs 1000 <---- We are still using the cached dataset
```

This patch fixes this bug by adding support for `REFRESH path` that invalidates and refreshes all the cached data (and the associated metadata) for any dataframe that contains the given data source path.

Expected behavior:
```scala
val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
spark.catalog.refreshByPath(dir)
sqlContext.read.parquet(dir).count() // outputs 10 <---- We are not using the cached dataset
```

## How was this patch tested?

Unit tests for overwrites and appends in `ParquetQuerySuite` and `CachedTableSuite`.

Author: Sameer Agarwal

Closes #13566 from sameeragarwal/refresh-path-2.
---
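The fix is also exposed at the SQL layer through the new `REFRESH path` statement (see the grammar and parser changes below); a minimal sketch of the SQL form, reusing `dir` from the example above (illustrative only, not part of the patch):

```scala
// SQL equivalent of spark.catalog.refreshByPath(dir): the new grammar rule routes
// `REFRESH <path>` to the RefreshResource command, which delegates to refreshByPath.
spark.sql(s"REFRESH $dir")
spark.read.parquet(dir).count() // outputs 10 once the stale cache entry is rebuilt
```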
 .../apache/spark/sql/catalyst/parser/SqlBase.g4    |  1 +
 .../org/apache/spark/sql/catalog/Catalog.scala     |  7 +++
 .../apache/spark/sql/execution/CacheManager.scala  | 51 +++++++++++++++++++++-
 .../spark/sql/execution/SparkSqlParser.scala       |  9 +++-
 .../spark/sql/execution/datasources/ddl.scala      |  9 ++++
 .../apache/spark/sql/internal/CatalogImpl.scala    | 10 +++++
 .../datasources/parquet/ParquetQuerySuite.scala    | 28 ++++++++++++
 .../apache/spark/sql/hive/CachedTableSuite.scala   | 45 +++++++++++++++++++
 8 files changed, 158 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index d10255946a..044f910388 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -113,6 +113,7 @@ statement
     | (DESC | DESCRIBE) option=(EXTENDED | FORMATTED)?
         tableIdentifier partitionSpec? describeColName?                #describeTable
     | REFRESH TABLE tableIdentifier                                    #refreshTable
+    | REFRESH .*?                                                      #refreshResource
     | CACHE LAZY? TABLE identifier (AS? query)?                        #cacheTable
     | UNCACHE TABLE identifier                                         #uncacheTable
     | CLEAR CACHE                                                      #clearCache

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 6ddb1a7a1f..083a63c98c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -226,4 +226,11 @@ abstract class Catalog {
    */
   def refreshTable(tableName: String): Unit
 
+  /**
+   * Invalidate and refresh all the cached data (and the associated metadata) for any dataframe that
+   * contains the given data source path.
+   *
+   * @since 2.0.0
+   */
+  def refreshByPath(path: String): Unit
 }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index c8bdb0d22c..b584cf4079 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -19,10 +19,14 @@ package org.apache.spark.sql.execution
 
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
+import org.apache.hadoop.fs.{FileSystem, Path}
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.execution.columnar.InMemoryRelation
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
 
@@ -157,4 +161,49 @@ private[sql] class CacheManager extends Logging {
       case _ =>
     }
   }
+
+  /**
+   * Invalidates the cache of any data that contains `resourcePath` in one or more
+   * `HadoopFsRelation` node(s) as part of its logical plan.
+   */
+  private[sql] def invalidateCachedPath(
+      sparkSession: SparkSession, resourcePath: String): Unit = writeLock {
+    val (fs, qualifiedPath) = {
+      val path = new Path(resourcePath)
+      val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
+      (fs, path.makeQualified(fs.getUri, fs.getWorkingDirectory))
+    }
+
+    cachedData.foreach {
+      case data if data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined =>
+        val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan))
+        if (dataIndex >= 0) {
+          data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = true)
+          cachedData.remove(dataIndex)
+        }
+        sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan))
+      case _ => // Do Nothing
+    }
+  }
+
+  /**
+   * Traverses a given `plan` and searches for the occurrences of `qualifiedPath` in the
+   * [[org.apache.spark.sql.execution.datasources.FileCatalog]] of any [[HadoopFsRelation]] nodes
+   * in the plan. If found, we refresh the metadata and return true. Otherwise, this method returns
+   * false.
+   */
+  private def lookupAndRefresh(plan: LogicalPlan, fs: FileSystem, qualifiedPath: Path): Boolean = {
+    plan match {
+      case lr: LogicalRelation => lr.relation match {
+        case hr: HadoopFsRelation =>
+          val invalidate = hr.location.paths
+            .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory))
+            .contains(qualifiedPath)
+          if (invalidate) hr.location.refresh()
+          invalidate
+        case _ => false
+      }
+      case _ => false
+    }
+  }
 }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index dc74222051..06d8f15dc3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -25,7 +25,6 @@ import org.antlr.v4.runtime.tree.TerminalNode
 
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
@@ -209,6 +208,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     RefreshTable(visitTableIdentifier(ctx.tableIdentifier))
   }
 
+  /**
+   * Create a [[RefreshResource]] logical plan.
+   */
+  override def visitRefreshResource(ctx: RefreshResourceContext): LogicalPlan = withOrigin(ctx) {
+    val resourcePath = remainder(ctx.REFRESH.getSymbol).trim
+    RefreshResource(resourcePath)
+  }
+
   /**
    * Create a [[CacheTableCommand]] logical plan.
    */

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index aa42eae986..31a2075d2f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -102,6 +102,15 @@ case class RefreshTable(tableIdent: TableIdentifier)
   }
 }
 
+case class RefreshResource(path: String)
+  extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    sparkSession.catalog.refreshByPath(path)
+    Seq.empty[Row]
+  }
+}
+
 /**
  * Builds a map in which keys are case insensitive
  */

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 70e17b10ac..f42fd174b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -373,6 +373,16 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     }
   }
 
+  /**
+   * Refresh the cache entry and the associated metadata for all dataframes (if any) that contain
+   * the given data source path.
+   *
+   * @group cachemgmt
+   * @since 2.0.0
+   */
+  override def refreshByPath(resourcePath: String): Unit = {
+    sparkSession.sharedState.cacheManager.invalidateCachedPath(sparkSession, resourcePath)
+  }
 }

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index ea57f71c50..b4fd0ef6ed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -68,6 +68,34 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       TableIdentifier("tmp"), ignoreIfNotExists = true)
   }
 
+  test("SPARK-15678: not use cache on overwrite") {
+    withTempDir { dir =>
+      val path = dir.toString
+      spark.range(1000).write.mode("overwrite").parquet(path)
+      val df = spark.read.parquet(path).cache()
+      assert(df.count() == 1000)
+      spark.range(10).write.mode("overwrite").parquet(path)
+      assert(df.count() == 1000)
+      spark.catalog.refreshByPath(path)
+      assert(df.count() == 10)
+      assert(spark.read.parquet(path).count() == 10)
+    }
+  }
+
+  test("SPARK-15678: not use cache on append") {
+    withTempDir { dir =>
+      val path = dir.toString
+      spark.range(1000).write.mode("append").parquet(path)
+      val df = spark.read.parquet(path).cache()
+      assert(df.count() == 1000)
+      spark.range(10).write.mode("append").parquet(path)
+      assert(df.count() == 1000)
+      spark.catalog.refreshByPath(path)
+      assert(df.count() == 1010)
+      assert(spark.read.parquet(path).count() == 1010)
+    }
+  }
+
   test("self-join") {
     // 4 rows, cells of column 1 of row 2 and row 4 are null
     val data = (1 to 4).map { i =>

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 52ba90f02c..5121440f06 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -206,6 +206,51 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton {
     Utils.deleteRecursively(tempPath)
   }
 
+  test("SPARK-15678: REFRESH PATH") {
+    val tempPath: File = Utils.createTempDir()
+    tempPath.delete()
+    table("src").write.mode(SaveMode.Overwrite).parquet(tempPath.toString)
+    sql("DROP TABLE IF EXISTS refreshTable")
+    sparkSession.catalog.createExternalTable("refreshTable", tempPath.toString, "parquet")
+    checkAnswer(
+      table("refreshTable"),
+      table("src").collect())
+    // Cache the table.
+    sql("CACHE TABLE refreshTable")
+    assertCached(table("refreshTable"))
+    // Append new data.
+    table("src").write.mode(SaveMode.Append).parquet(tempPath.toString)
+    // We are still using the old data.
+    assertCached(table("refreshTable"))
+    checkAnswer(
+      table("refreshTable"),
+      table("src").collect())
+    // Refresh the table.
+    sql(s"REFRESH ${tempPath.toString}")
+    // We are using the new data.
+    assertCached(table("refreshTable"))
+    checkAnswer(
+      table("refreshTable"),
+      table("src").union(table("src")).collect())
+
+    // Drop the table and create it again.
+    sql("DROP TABLE refreshTable")
+    sparkSession.catalog.createExternalTable("refreshTable", tempPath.toString, "parquet")
+    // It is not cached.
+    assert(!isCached("refreshTable"), "refreshTable should not be cached.")
+    // Refresh the table. REFRESH command should not make an uncached
+    // table cached.
+ sql(s"REFRESH ${tempPath.toString}") + checkAnswer( + table("refreshTable"), + table("src").union(table("src")).collect()) + // It is not cached. + assert(!isCached("refreshTable"), "refreshTable should not be cached.") + + sql("DROP TABLE refreshTable") + Utils.deleteRecursively(tempPath) + } + test("SPARK-11246 cache parquet table") { sql("CREATE TABLE cachedTable STORED AS PARQUET AS SELECT 1") -- cgit v1.2.3