-rw-r--r--  python/pyspark/sql/context.py                                           |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala         |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala           |  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala       | 11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala   | 15
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala     | 16
6 files changed, 53 insertions(+), 4 deletions(-)
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 2e2309f103..3f168f718b 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -687,6 +687,10 @@ class SQLContext(object):
"""Removes the specified table from the in-memory cache."""
self._ssql_ctx.uncacheTable(tableName)
+ def clearCache(self):
+ """Removes all cached tables from the in-memory cache. """
+ self._ssql_ctx.clearCache()
+
class HiveContext(SQLContext):
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
index f1949aa5dd..ca4a127120 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
@@ -71,11 +71,17 @@ private[sql] class CacheManager(sqlContext: SQLContext) extends Logging {
     }
   }
 
+  /** Clears all cached tables. */
   private[sql] def clearCache(): Unit = writeLock {
     cachedData.foreach(_.cachedRepresentation.cachedColumnBuffers.unpersist())
     cachedData.clear()
   }
 
+  /** Checks if the cache is empty. */
+  private[sql] def isEmpty: Boolean = readLock {
+    cachedData.isEmpty
+  }
+
   /**
    * Caches the data produced by the logical representation of the given schema rdd. Unlike
    * `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because recomputing
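
The `writeLock { ... }` and `readLock { ... }` wrappers seen here imply the cache is guarded by a reader-writer lock: `clearCache()` mutates `cachedData` under the exclusive side, while the new `isEmpty` only needs the shared side. Below is a minimal self-contained sketch of that pattern, assuming the helpers wrap a plain `ReentrantReadWriteLock`; `LockedBuffer` and its members are illustrative names, not CacheManager's actual internals:

    import java.util.concurrent.locks.ReentrantReadWriteLock
    import scala.collection.mutable.ArrayBuffer

    class LockedBuffer[T] {
      private val entries = new ArrayBuffer[T]
      private val rwLock = new ReentrantReadWriteLock

      // Runs `f` while holding the shared (read) side of the lock.
      private def readLock[A](f: => A): A = {
        val lock = rwLock.readLock()
        lock.lock()
        try f finally lock.unlock()
      }

      // Runs `f` while holding the exclusive (write) side of the lock.
      private def writeLock[A](f: => A): A = {
        val lock = rwLock.writeLock()
        lock.lock()
        try f finally lock.unlock()
      }

      def add(entry: T): Unit = writeLock { entries += entry }
      def clear(): Unit = writeLock { entries.clear() }
      def isEmpty: Boolean = readLock { entries.isEmpty }
    }

Taking only the read lock in `isEmpty` keeps concurrent readers cheap while still serializing the check against any in-flight `clear()`.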
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index a6cf3cd9dd..4bdaa02391 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -217,6 +217,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   def uncacheTable(tableName: String): Unit = cacheManager.uncacheTable(tableName)
 
+  /**
+   * Removes all cached tables from the in-memory cache.
+   */
+  def clearCache(): Unit = cacheManager.clearCache()
+
   // scalastyle:off
   // Disable style checker so "implicits" object can start with lowercase i
   /**
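
With the public API in place, emptying the cache from user code is a single call. A rough usage sketch, assuming an already-constructed SparkContext `sc` and two temp tables `t1`/`t2` registered beforehand (mirroring the test further down):

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    sqlContext.cacheTable("t1")
    sqlContext.cacheTable("t2")

    // Either form drops every cached table at once; the SQL string is
    // handled by the SparkSQLParser change below.
    sqlContext.clearCache()
    sqlContext.sql("CLEAR CACHE")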
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
index 00e19da437..5921eaf5e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
@@ -23,7 +23,7 @@ import scala.util.parsing.combinator.RegexParsers
 import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.{ShowTablesCommand, UncacheTableCommand, CacheTableCommand, SetCommand}
+import org.apache.spark.sql.execution._
 import org.apache.spark.sql.types.StringType
@@ -57,6 +57,7 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr
 
   protected val AS = Keyword("AS")
   protected val CACHE = Keyword("CACHE")
+  protected val CLEAR = Keyword("CLEAR")
   protected val IN = Keyword("IN")
   protected val LAZY = Keyword("LAZY")
   protected val SET = Keyword("SET")
@@ -74,9 +75,11 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr
   }
 
   private lazy val uncache: Parser[LogicalPlan] =
-    UNCACHE ~ TABLE ~> ident ^^ {
-      case tableName => UncacheTableCommand(tableName)
-    }
+    ( UNCACHE ~ TABLE ~> ident ^^ {
+        case tableName => UncacheTableCommand(tableName)
+      }
+    | CLEAR ~ CACHE ^^^ ClearCacheCommand
+    )
 
   private lazy val set: Parser[LogicalPlan] =
     SET ~> restInput ^^ {
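
The grammar change reads naturally once the combinators are unpacked: `~` sequences two parsers, `~>` keeps only the right-hand result, `^^` maps the parsed value, `^^^` discards it in favor of a constant, and `|` tries alternatives in order. A standalone toy parser of the same shape (its AST types are made up, and unlike Spark's `Keyword` the bare string literals here match case-sensitively):

    import scala.util.parsing.combinator.RegexParsers

    sealed trait ToyCommand
    case class Uncache(table: String) extends ToyCommand
    case object ClearAll extends ToyCommand

    object ToyParser extends RegexParsers {
      private val ident = "[a-zA-Z_][a-zA-Z0-9_]*".r

      // `~>` drops the keywords, `^^` turns the identifier into an AST node.
      private val uncacheTable: Parser[ToyCommand] =
        "UNCACHE" ~ "TABLE" ~> ident ^^ { tableName => Uncache(tableName) }

      // `^^^` ignores the matched text and yields the constant singleton.
      private val clearCache: Parser[ToyCommand] =
        "CLEAR" ~ "CACHE" ^^^ ClearAll

      // `|` mirrors the new two-alternative `uncache` rule.
      val command: Parser[ToyCommand] = uncacheTable | clearCache

      def main(args: Array[String]): Unit = {
        println(parseAll(command, "UNCACHE TABLE t1")) // [1.17] parsed: Uncache(t1)
        println(parseAll(command, "CLEAR CACHE"))      // [1.12] parsed: ClearAll
      }
    }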
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 7c92e9fc88..a11232142d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -176,6 +176,21 @@ case class UncacheTableCommand(tableName: String) extends RunnableCommand {
 
 /**
  * :: DeveloperApi ::
+ * Clears all cached data from the in-memory cache.
+ */
+@DeveloperApi
+case object ClearCacheCommand extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext) = {
+    sqlContext.clearCache()
+    Seq.empty[Row]
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
+
+/**
+ * :: DeveloperApi ::
  */
 @DeveloperApi
 case class DescribeCommand(
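
Two details are worth noting here. The new command is a `case object` rather than a case class, because unlike `UncacheTableCommand(tableName)` it carries no parameters, which is also what lets the parser finish its rule with `^^^ ClearCacheCommand`. It also shows the `RunnableCommand` contract: `run` performs the side effect eagerly and returns any result rows, while `output` declares the (here empty) result schema. A toy sketch of that contract with simplified stand-in types (Spark's real trait is also a logical plan node, so commands can flow through the planner):

    object CommandContractSketch {
      type Row = Seq[Any]

      // Simplified stand-in for RunnableCommand.
      trait Command {
        def output: Seq[String]          // result column names; empty for pure side effects
        def run(ctx: Context): Seq[Row]  // executed eagerly when the plan runs
      }

      // Simplified stand-in for the SQLContext/CacheManager pair.
      final class Context {
        private val cachedTables = scala.collection.mutable.Set.empty[String]
        def cacheTable(name: String): Unit = cachedTables += name
        def clearCache(): Unit = cachedTables.clear()
        def isEmpty: Boolean = cachedTables.isEmpty
      }

      // Parameterless, so a singleton suffices, matching the shape of ClearCacheCommand.
      case object ClearAllCached extends Command {
        override def output: Seq[String] = Seq.empty
        override def run(ctx: Context): Seq[Row] = {
          ctx.clearCache()
          Seq.empty
        }
      }

      def main(args: Array[String]): Unit = {
        val ctx = new Context
        ctx.cacheTable("t1")
        println(ClearAllCached.run(ctx)) // List()
        println(ctx.isEmpty)             // true
      }
    }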
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index e70e866fdb..c240f2be95 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -280,4 +280,20 @@ class CachedTableSuite extends QueryTest {
     assert(intercept[RuntimeException](table("t1")).getMessage.startsWith("Table Not Found"))
     assert(!isCached("t2"))
   }
+
+  test("Clear all cache") {
+    sql("SELECT key FROM testData LIMIT 10").registerTempTable("t1")
+    sql("SELECT key FROM testData LIMIT 5").registerTempTable("t2")
+    cacheTable("t1")
+    cacheTable("t2")
+    clearCache()
+    assert(cacheManager.isEmpty)
+
+    sql("SELECT key FROM testData LIMIT 10").registerTempTable("t1")
+    sql("SELECT key FROM testData LIMIT 5").registerTempTable("t2")
+    cacheTable("t1")
+    cacheTable("t2")
+    sql("Clear CACHE")
+    assert(cacheManager.isEmpty)
+  }
 }
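
Note that the second half of the test issues the statement as `Clear CACHE` in mixed case. That only parses because keywords built with `Keyword` in the Spark SQL parser match case-insensitively, so the test also covers the new `CLEAR` keyword following the same rule as the existing ones.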