From 70bfb5c7282df84e76eba01f59bf1b8551583c33 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Fri, 20 Feb 2015 16:20:02 +0800 Subject: [SPARK-5909][SQL] Add a clearCache command to Spark SQL's cache manager JIRA: https://issues.apache.org/jira/browse/SPARK-5909 Author: Yin Huai Closes #4694 from yhuai/clearCache and squashes the following commits: 397ecc4 [Yin Huai] Address comments. a2702fc [Yin Huai] Update parser. 3a54506 [Yin Huai] add isEmpty to CacheManager. 6d14460 [Yin Huai] Python clearCache. f7b8dbd [Yin Huai] Add clear cache command. --- python/pyspark/sql/context.py | 4 ++++ .../main/scala/org/apache/spark/sql/CacheManager.scala | 6 ++++++ .../src/main/scala/org/apache/spark/sql/SQLContext.scala | 5 +++++ .../main/scala/org/apache/spark/sql/SparkSQLParser.scala | 11 +++++++---- .../scala/org/apache/spark/sql/execution/commands.scala | 15 +++++++++++++++ .../scala/org/apache/spark/sql/CachedTableSuite.scala | 16 ++++++++++++++++ 6 files changed, 53 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 2e2309f103..3f168f718b 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -687,6 +687,10 @@ class SQLContext(object): """Removes the specified table from the in-memory cache.""" self._ssql_ctx.uncacheTable(tableName) + def clearCache(self): + """Removes all cached tables from the in-memory cache. """ + self._ssql_ctx.clearCache() + class HiveContext(SQLContext): diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala index f1949aa5dd..ca4a127120 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala @@ -71,11 +71,17 @@ private[sql] class CacheManager(sqlContext: SQLContext) extends Logging { } } + /** Clears all cached tables. */ private[sql] def clearCache(): Unit = writeLock { cachedData.foreach(_.cachedRepresentation.cachedColumnBuffers.unpersist()) cachedData.clear() } + /** Checks if the cache is empty. */ + private[sql] def isEmpty: Boolean = readLock { + cachedData.isEmpty + } + /** * Caches the data produced by the logical representation of the given schema rdd. Unlike * `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because recomputing diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index a6cf3cd9dd..4bdaa02391 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -217,6 +217,11 @@ class SQLContext(@transient val sparkContext: SparkContext) */ def uncacheTable(tableName: String): Unit = cacheManager.uncacheTable(tableName) + /** + * Removes all cached tables from the in-memory cache. + */ + def clearCache(): Unit = cacheManager.clearCache() + // scalastyle:off // Disable style checker so "implicits" object can start with lowercase i /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala index 00e19da437..5921eaf5e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala @@ -23,7 +23,7 @@ import scala.util.parsing.combinator.RegexParsers import org.apache.spark.sql.catalyst.AbstractSparkSQLParser import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.{ShowTablesCommand, UncacheTableCommand, CacheTableCommand, SetCommand} +import org.apache.spark.sql.execution._ import org.apache.spark.sql.types.StringType @@ -57,6 +57,7 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr protected val AS = Keyword("AS") protected val CACHE = Keyword("CACHE") + protected val CLEAR = Keyword("CLEAR") protected val IN = Keyword("IN") protected val LAZY = Keyword("LAZY") protected val SET = Keyword("SET") @@ -74,9 +75,11 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr } private lazy val uncache: Parser[LogicalPlan] = - UNCACHE ~ TABLE ~> ident ^^ { - case tableName => UncacheTableCommand(tableName) - } + ( UNCACHE ~ TABLE ~> ident ^^ { + case tableName => UncacheTableCommand(tableName) + } + | CLEAR ~ CACHE ^^^ ClearCacheCommand + ) private lazy val set: Parser[LogicalPlan] = SET ~> restInput ^^ { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index 7c92e9fc88..a11232142d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -174,6 +174,21 @@ case class UncacheTableCommand(tableName: String) extends RunnableCommand { override def output: Seq[Attribute] = Seq.empty } +/** + * :: DeveloperApi :: + * Clear all cached data from the in-memory cache. + */ +@DeveloperApi +case object ClearCacheCommand extends RunnableCommand { + + override def run(sqlContext: SQLContext) = { + sqlContext.clearCache() + Seq.empty[Row] + } + + override def output: Seq[Attribute] = Seq.empty +} + /** * :: DeveloperApi :: */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index e70e866fdb..c240f2be95 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -280,4 +280,20 @@ class CachedTableSuite extends QueryTest { assert(intercept[RuntimeException](table("t1")).getMessage.startsWith("Table Not Found")) assert(!isCached("t2")) } + + test("Clear all cache") { + sql("SELECT key FROM testData LIMIT 10").registerTempTable("t1") + sql("SELECT key FROM testData LIMIT 5").registerTempTable("t2") + cacheTable("t1") + cacheTable("t2") + clearCache() + assert(cacheManager.isEmpty) + + sql("SELECT key FROM testData LIMIT 10").registerTempTable("t1") + sql("SELECT key FROM testData LIMIT 5").registerTempTable("t2") + cacheTable("t1") + cacheTable("t2") + sql("Clear CACHE") + assert(cacheManager.isEmpty) + } } -- cgit v1.2.3