author     Wenchen Fan <wenchen@databricks.com>    2017-03-07 09:21:58 -0800
committer  Xiao Li <gatorsmile@gmail.com>          2017-03-07 09:21:58 -0800
commit     c05baabf10dd4c808929b4ae7a6d118aba6dd665 (patch)
tree       82bcafd601ad4d90279bf82ebbe7b6c9bec7b4cc /sql/core/src/test
parent     030acdd1f06f49383079c306b63e874ad738851f (diff)
[SPARK-19765][SPARK-18549][SQL] UNCACHE TABLE should un-cache all cached plans that refer to this table
## What changes were proposed in this pull request?

When un-caching a table, we should not only remove the cache entry for this table, but also un-cache any other cached plans that refer to it.

This PR also includes some refactoring:

1. Use `java.util.LinkedList` to store the cache entries, so that it is safer to remove elements while iterating.
2. Rename `invalidateCache` to `recacheByPlan`, which makes it more obvious what the method does.

## How was this patch tested?

New regression test.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17097 from cloud-fan/cache.
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala | 50
1 file changed, 35 insertions(+), 15 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 2a0e088437..7a7d52b214 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -24,15 +24,15 @@ import scala.language.postfixOps
import org.scalatest.concurrent.Eventually._
import org.apache.spark.CleanerListener
-import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.execution.RDDScanExec
import org.apache.spark.sql.execution.columnar._
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.storage.{RDDBlockId, StorageLevel}
-import org.apache.spark.util.AccumulatorContext
+import org.apache.spark.util.{AccumulatorContext, Utils}
private case class BigData(s: String)
@@ -65,7 +65,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
maybeBlock.nonEmpty
}
- private def getNumInMemoryRelations(plan: LogicalPlan): Int = {
+ private def getNumInMemoryRelations(ds: Dataset[_]): Int = {
+ val plan = ds.queryExecution.withCachedData
var sum = plan.collect { case _: InMemoryRelation => 1 }.sum
plan.transformAllExpressions {
case e: SubqueryExpression =>
@@ -187,7 +188,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
assertCached(spark.table("testData"))
assertResult(1, "InMemoryRelation not found, testData should have been cached") {
- getNumInMemoryRelations(spark.table("testData").queryExecution.withCachedData)
+ getNumInMemoryRelations(spark.table("testData"))
}
spark.catalog.cacheTable("testData")
@@ -580,21 +581,21 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
localRelation.createOrReplaceTempView("localRelation")
spark.catalog.cacheTable("localRelation")
- assert(getNumInMemoryRelations(localRelation.queryExecution.withCachedData) == 1)
+ assert(getNumInMemoryRelations(localRelation) == 1)
}
test("SPARK-19093 Caching in side subquery") {
withTempView("t1") {
Seq(1).toDF("c1").createOrReplaceTempView("t1")
spark.catalog.cacheTable("t1")
- val cachedPlan =
+ val ds =
sql(
"""
|SELECT * FROM t1
|WHERE
|NOT EXISTS (SELECT * FROM t1)
- """.stripMargin).queryExecution.optimizedPlan
- assert(getNumInMemoryRelations(cachedPlan) == 2)
+ """.stripMargin)
+ assert(getNumInMemoryRelations(ds) == 2)
}
}
@@ -610,17 +611,17 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
spark.catalog.cacheTable("t4")
// Nested predicate subquery
- val cachedPlan =
+ val ds =
sql(
"""
|SELECT * FROM t1
|WHERE
|c1 IN (SELECT c1 FROM t2 WHERE c1 IN (SELECT c1 FROM t3 WHERE c1 = 1))
- """.stripMargin).queryExecution.optimizedPlan
- assert(getNumInMemoryRelations(cachedPlan) == 3)
+ """.stripMargin)
+ assert(getNumInMemoryRelations(ds) == 3)
// Scalar subquery and predicate subquery
- val cachedPlan2 =
+ val ds2 =
sql(
"""
|SELECT * FROM (SELECT max(c1) FROM t1 GROUP BY c1)
@@ -630,8 +631,27 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
|EXISTS (SELECT c1 FROM t3)
|OR
|c1 IN (SELECT c1 FROM t4)
- """.stripMargin).queryExecution.optimizedPlan
- assert(getNumInMemoryRelations(cachedPlan2) == 4)
+ """.stripMargin)
+ assert(getNumInMemoryRelations(ds2) == 4)
+ }
+ }
+
+ test("SPARK-19765: UNCACHE TABLE should un-cache all cached plans that refer to this table") {
+ withTable("t") {
+ withTempPath { path =>
+ Seq(1 -> "a").toDF("i", "j").write.parquet(path.getCanonicalPath)
+ sql(s"CREATE TABLE t USING parquet LOCATION '$path'")
+ spark.catalog.cacheTable("t")
+ spark.table("t").select($"i").cache()
+ checkAnswer(spark.table("t").select($"i"), Row(1))
+ assertCached(spark.table("t").select($"i"))
+
+ Utils.deleteRecursively(path)
+ spark.sessionState.catalog.refreshTable(TableIdentifier("t"))
+ spark.catalog.uncacheTable("t")
+ assert(spark.table("t").select($"i").count() == 0)
+ assert(getNumInMemoryRelations(spark.table("t").select($"i")) == 0)
+ }
}
}