diff options
author | Wenchen Fan <wenchen@databricks.com> | 2017-03-07 09:21:58 -0800 |
---|---|---|
committer | Xiao Li <gatorsmile@gmail.com> | 2017-03-07 09:21:58 -0800 |
commit | c05baabf10dd4c808929b4ae7a6d118aba6dd665 (patch) | |
tree | 82bcafd601ad4d90279bf82ebbe7b6c9bec7b4cc /sql/hive | |
parent | 030acdd1f06f49383079c306b63e874ad738851f (diff) | |
download | spark-c05baabf10dd4c808929b4ae7a6d118aba6dd665.tar.gz spark-c05baabf10dd4c808929b4ae7a6d118aba6dd665.tar.bz2 spark-c05baabf10dd4c808929b4ae7a6d118aba6dd665.zip |
[SPARK-19765][SPARK-18549][SQL] UNCACHE TABLE should un-cache all cached plans that refer to this table
## What changes were proposed in this pull request?
When un-caching a table, we should not only remove the cache entry for this table, but also un-cache any other cached plans that refer to it.
This PR also includes some refactors:
1. use `java.util.LinkedList` to store the cache entries, so that it's safer to remove elements while iterating
2. rename `invalidateCache` to `recacheByPlan`, which makes it more obvious what the method does.
## How was this patch tested?
new regression test
Author: Wenchen Fan <wenchen@databricks.com>
Closes #17097 from cloud-fan/cache.
Diffstat (limited to 'sql/hive')
3 files changed, 5 insertions, 7 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 3c57ee4c8b..b8536d0c1b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -393,8 +393,8 @@ case class InsertIntoHiveTable( logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e) } - // Invalidate the cache. - sparkSession.catalog.uncacheTable(table.qualifiedName) + // un-cache this table. + sparkSession.catalog.uncacheTable(table.identifier.quotedString) sparkSession.sessionState.catalog.refreshTable(table.identifier) // It would be nice to just return the childRdd unchanged so insert operations could be chained, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 8ccc2b7527..2b3f36064c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -195,10 +195,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto tempPath.delete() table("src").write.mode(SaveMode.Overwrite).parquet(tempPath.toString) sql("DROP TABLE IF EXISTS refreshTable") - sparkSession.catalog.createExternalTable("refreshTable", tempPath.toString, "parquet") - checkAnswer( - table("refreshTable"), - table("src").collect()) + sparkSession.catalog.createTable("refreshTable", tempPath.toString, "parquet") + checkAnswer(table("refreshTable"), table("src")) // Cache the table. 
sql("CACHE TABLE refreshTable") assertCached(table("refreshTable")) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 3512c4a890..81af24979d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -453,7 +453,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { // Converted test_parquet should be cached. sessionState.catalog.getCachedDataSourceTable(tableIdentifier) match { case null => fail("Converted test_parquet should be cached in the cache.") - case logical @ LogicalRelation(parquetRelation: HadoopFsRelation, _, _) => // OK + case LogicalRelation(_: HadoopFsRelation, _, _) => // OK case other => fail( "The cached test_parquet should be a Parquet Relation. " + |