aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorCheng Lian <lian.cs.zju@gmail.com>2014-11-02 16:00:24 -0800
committerMichael Armbrust <michael@databricks.com>2014-11-02 16:00:24 -0800
commit9081b9f9f79b78f0b20a5fc3bc4e7c1d3e717130 (patch)
tree1092ba773f5e55f1c5e4a984601d907e0eda97e2 /sql
parent06232d23ff2a6344c49fff81364d9f6b02af326b (diff)
downloadspark-9081b9f9f79b78f0b20a5fc3bc4e7c1d3e717130.tar.gz
spark-9081b9f9f79b78f0b20a5fc3bc4e7c1d3e717130.tar.bz2
spark-9081b9f9f79b78f0b20a5fc3bc4e7c1d3e717130.zip
[SPARK-2189][SQL] Adds dropTempTable API
This PR adds an API for unregistering temporary tables. If a temporary table has been cached before, it's unpersisted as well. Author: Cheng Lian <lian.cs.zju@gmail.com> Closes #3039 from liancheng/unregister-temp-table and squashes the following commits: 54ae99f [Cheng Lian] Fixes Scala styling issue 1948c14 [Cheng Lian] Removes the unpersist argument aca41d3 [Cheng Lian] Ensures thread safety 7d4fb2b [Cheng Lian] Adds unregisterTempTable API
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala13
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala13
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala20
3 files changed, 46 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
index 3ced11a5e6..2e7abac1f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
@@ -103,6 +103,19 @@ private[sql] trait CacheManager {
cachedData.remove(dataIndex)
}
+ /** Tries to remove the data for the given SchemaRDD from the cache if it's cached */
+ private[sql] def tryUncacheQuery(
+ query: SchemaRDD,
+ blocking: Boolean = true): Boolean = writeLock {
+ val planToCache = query.queryExecution.analyzed
+ val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
+ val found = dataIndex >= 0
+ if (found) {
+ cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
+ cachedData.remove(dataIndex)
+ }
+ found
+ }
/** Optionally returns cached data for the given SchemaRDD */
private[sql] def lookupCachedData(query: SchemaRDD): Option[CachedData] = readLock {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 4cded98c80..3cf6af5f7a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -277,6 +277,19 @@ class SQLContext(@transient val sparkContext: SparkContext)
}
/**
+ * Drops the temporary table with the given table name in the catalog. If the table has been
+ * cached/persisted before, it's also unpersisted.
+ *
+ * @param tableName the name of the table to be unregistered.
+ *
+ * @group userf
+ */
+ def dropTempTable(tableName: String): Unit = {
+ tryUncacheQuery(table(tableName))
+ catalog.unregisterTable(None, tableName)
+ }
+
+ /**
* Executes a SQL query using Spark, returning the result as a SchemaRDD. The dialect that is
* used for SQL parsing can be configured with 'spark.sql.dialect'.
*
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 44a2961b27..765fa82776 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -231,4 +231,24 @@ class CachedTableSuite extends QueryTest {
assert(cached.statistics.sizeInBytes === actualSizeInBytes)
}
}
+
+ test("Drops temporary table") {
+ testData.select('key).registerTempTable("t1")
+ table("t1")
+ dropTempTable("t1")
+ assert(intercept[RuntimeException](table("t1")).getMessage.startsWith("Table Not Found"))
+ }
+
+ test("Drops cached temporary table") {
+ testData.select('key).registerTempTable("t1")
+ testData.select('key).registerTempTable("t2")
+ cacheTable("t1")
+
+ assert(isCached("t1"))
+ assert(isCached("t2"))
+
+ dropTempTable("t1")
+ assert(intercept[RuntimeException](table("t1")).getMessage.startsWith("Table Not Found"))
+ assert(!isCached("t2"))
+ }
}