author     gatorsmile <gatorsmile@gmail.com>         2016-06-14 11:44:37 -0700
committer  Wenchen Fan <wenchen@databricks.com>      2016-06-14 11:44:37 -0700
commit     df4ea6614d709ee66f1ceb966df6216b125b8ea1 (patch)
tree       41dd629c3ff4d166513d0f01887946152a91a17c
parent     c5b735581922c52a1b1cc6cd8c7b5878d3cf8f20 (diff)
[SPARK-15864][SQL] Fix Inconsistent Behaviors when Uncaching Non-cached Tables
#### What changes were proposed in this pull request?

To uncache a table, we have three different ways:
- _SQL interface_: `UNCACHE TABLE`
- _Dataset API_: `sparkSession.catalog.uncacheTable`
- _Dataset API_: `sparkSession.table(tableName).unpersist()`

When the table is not cached,
- _SQL interface_: `UNCACHE TABLE non-cachedTable` -> **no error message**
- _Dataset API_: `sparkSession.catalog.uncacheTable("non-cachedTable")` -> **reports a strange error message:** ```requirement failed: Table [a: int] is not cached```
- _Dataset API_: `sparkSession.table("non-cachedTable").unpersist()` -> **no error message**

This PR makes them consistent: uncaching is a no-op if the table has already been uncached. In addition, this PR removes the old `uncacheQuery`, renames `tryUncacheQuery` to `uncacheQuery`, and documents that it is a no-op if the table has already been uncached.

#### How was this patch tested?

Improved the existing test case to verify the behavior when the table has not been cached. Also added test cases to verify the behavior when the table does not exist.

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

Closes #13593 from gatorsmile/uncacheNonCachedTable.
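To make the new contract concrete, here is a minimal, self-contained sketch of the three uncache paths after this patch; the temp view name `t` and the session setup are illustrative assumptions, not part of the commit:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("uncache-demo").master("local").getOrCreate()
spark.range(10).createOrReplaceTempView("t")
spark.catalog.cacheTable("t")

// After this patch all three paths behave consistently: each is a no-op
// when "t" is no longer cached, instead of one of them failing with
// "requirement failed: Table ... is not cached".
spark.sql("UNCACHE TABLE t")        // SQL interface
spark.catalog.uncacheTable("t")     // Catalog API; no-op, already uncached above
spark.table("t").unpersist()        // Dataset API; also a no-op here
```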
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala                    2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala    17
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala   2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala     2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala      6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala          6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala     24
8 files changed, 30 insertions(+), 31 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 5a67fc79ef..53779df3d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2316,7 +2316,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def unpersist(blocking: Boolean): this.type = {
-    sparkSession.sharedState.cacheManager.tryUncacheQuery(this, blocking)
+    sparkSession.sharedState.cacheManager.uncacheQuery(this, blocking)
     this
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 4e95754e9b..de2503a87a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -104,22 +104,11 @@ private[sql] class CacheManager extends Logging {
     }
   }
 
-  /** Removes the data for the given [[Dataset]] from the cache */
-  private[sql] def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock {
-    val planToCache = query.queryExecution.analyzed
-    val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
-    require(dataIndex >= 0, s"Table $query is not cached.")
-    cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
-    cachedData.remove(dataIndex)
-  }
-
   /**
-   * Tries to remove the data for the given [[Dataset]] from the cache
-   * if it's cached
+   * Tries to remove the data for the given [[Dataset]] from the cache.
+   * No operation, if it's already uncached.
    */
-  private[sql] def tryUncacheQuery(
-      query: Dataset[_],
-      blocking: Boolean = true): Boolean = writeLock {
+  private[sql] def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Boolean = writeLock {
     val planToCache = query.queryExecution.analyzed
     val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
     val found = dataIndex >= 0
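For context, the consolidated method plausibly reads as follows after this change; everything past `val found` is an assumption based on the old `tryUncacheQuery` body, which the hunk above truncates:

```scala
// A sketch (not the verbatim file) of CacheManager.uncacheQuery after this
// patch. The lines after `val found` are assumed to mirror the previous
// tryUncacheQuery body.
private[sql] def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Boolean = writeLock {
  val planToCache = query.queryExecution.analyzed
  val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
  val found = dataIndex >= 0
  if (found) {
    // Free the cached column buffers and drop the cache entry;
    // silently do nothing when the plan was never cached.
    cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
    cachedData.remove(dataIndex)
  }
  found
}
```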
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index 3e5eed2efa..5332366d24 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -53,7 +53,7 @@ case class CacheTableCommand(
 case class UncacheTableCommand(tableName: String) extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    sparkSession.table(tableName).unpersist(blocking = false)
+    sparkSession.catalog.uncacheTable(tableName)
     Seq.empty[Row]
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 5fd0b83cf0..fc00912bf9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -201,7 +201,7 @@ case class DropTableCommand(
         case _ =>
       })
       try {
-        sparkSession.sharedState.cacheManager.tryUncacheQuery(
+        sparkSession.sharedState.cacheManager.uncacheQuery(
           sparkSession.table(tableName.quotedString))
       } catch {
         case NonFatal(e) => log.warn(e.toString, e)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 90db785332..58bb5cdca9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -374,7 +374,7 @@ case class TruncateTableCommand(
     spark.sessionState.invalidateTable(tableName.unquotedString)
     // Also try to drop the contents of the table from the columnar cache
     try {
-      spark.sharedState.cacheManager.tryUncacheQuery(spark.table(tableName.quotedString))
+      spark.sharedState.cacheManager.uncacheQuery(spark.table(tableName.quotedString))
     } catch {
       case NonFatal(e) =>
         log.warn(s"Exception when attempting to uncache table '$tableName'", e)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index f42fd174b2..601334b97a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -292,7 +292,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * @since 2.0.0
    */
   override def dropTempView(viewName: String): Unit = {
-    sparkSession.sharedState.cacheManager.tryUncacheQuery(sparkSession.table(viewName))
+    sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(viewName))
     sessionCatalog.dropTable(TableIdentifier(viewName), ignoreIfNotExists = true)
   }
 
@@ -323,7 +323,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * @since 2.0.0
    */
   override def uncacheTable(tableName: String): Unit = {
-    sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName))
+    sparkSession.sharedState.cacheManager.uncacheQuery(query = sparkSession.table(tableName))
   }
 
   /**
@@ -367,7 +367,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
       // TODO: Use uncacheTable once it supports database name.
       val df = Dataset.ofRows(sparkSession, logicalPlan)
       // Uncache the logicalPlan.
-      sparkSession.sharedState.cacheManager.tryUncacheQuery(df, blocking = true)
+      sparkSession.sharedState.cacheManager.uncacheQuery(df, blocking = true)
       // Cache it again.
       sparkSession.sharedState.cacheManager.cacheQuery(df, Some(tableIdent.table))
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 3306ac42a3..d7df18ae1c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -186,12 +186,6 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     assertCached(spark.table("testData"), 0)
   }
 
-  test("correct error on uncache of non-cached table") {
-    intercept[IllegalArgumentException] {
-      spark.catalog.uncacheTable("testData")
-    }
-  }
-
   test("SELECT star from cached table") {
     sql("SELECT * FROM testData").createOrReplaceTempView("selectStar")
     spark.catalog.cacheTable("selectStar")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 5121440f06..e35a71917f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -20,12 +20,14 @@ package org.apache.spark.sql.hive
 import java.io.File
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.storage.RDDBlockId
 import org.apache.spark.util.Utils
 
-class CachedTableSuite extends QueryTest with TestHiveSingleton {
+class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   import hiveContext._
 
   def rddIdOf(tableName: String): Int = {
@@ -95,9 +97,23 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton {
     sql("DROP TABLE IF EXISTS nonexistantTable")
   }
 
-  test("correct error on uncache of non-cached table") {
-    intercept[IllegalArgumentException] {
-      spark.catalog.uncacheTable("src")
+  test("correct error on uncache of nonexistant tables") {
+    intercept[NoSuchTableException] {
+      spark.catalog.uncacheTable("nonexistantTable")
+    }
+    intercept[NoSuchTableException] {
+      sql("UNCACHE TABLE nonexistantTable")
+    }
+  }
+
+  test("no error on uncache of non-cached table") {
+    val tableName = "newTable"
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName(a INT)")
+      // no error will be reported in the following three ways to uncache a table.
+      spark.catalog.uncacheTable(tableName)
+      sql("UNCACHE TABLE newTable")
+      sparkSession.table(tableName).unpersist()
     }
   }