aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
author: Michael Armbrust <michael@databricks.com> 2014-07-30 17:30:51 -0700
committer: Michael Armbrust <michael@databricks.com> 2014-07-30 17:30:51 -0700
commit: 88a519db90d66ee5a1455ef4fcc1ad2a687e3d0b (patch)
tree: a9636539a3ec6c6d072b3ed25ccf0e5295725960 /sql
parent: 2ac37db7ac8f7ec5c99f3bfe459f8e2ac240961f (diff)
downloadspark-88a519db90d66ee5a1455ef4fcc1ad2a687e3d0b.tar.gz
spark-88a519db90d66ee5a1455ef4fcc1ad2a687e3d0b.tar.bz2
spark-88a519db90d66ee5a1455ef4fcc1ad2a687e3d0b.zip
[SPARK-2734][SQL] Remove tables from cache when DROP TABLE is run.
Author: Michael Armbrust <michael@databricks.com>

Closes #1650 from marmbrus/dropCached and squashes the following commits:

e6ab80b [Michael Armbrust] Support if exists.
83426c6 [Michael Armbrust] Remove tables from cache when DROP TABLE is run.
Diffstat (limited to 'sql')
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 9
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala | 2
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DropTable.scala | 48
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala | 16
4 files changed, 74 insertions, 1 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index d18ccf8167..3d2eb1eefa 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -44,6 +44,8 @@ private[hive] case class SourceCommand(filePath: String) extends Command
private[hive] case class AddFile(filePath: String) extends Command
+/**
+ * Logical command for a HiveQL `DROP TABLE` statement.
+ * `tableName` is the (possibly dot-qualified) table name and `ifExists` records whether the
+ * statement carried an IF EXISTS clause.
+ */
+private[hive] case class DropTable(tableName: String, ifExists: Boolean) extends Command
+
/** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
private[hive] object HiveQl {
protected val nativeCommands = Seq(
@@ -96,7 +98,6 @@ private[hive] object HiveQl {
"TOK_CREATEINDEX",
"TOK_DROPDATABASE",
"TOK_DROPINDEX",
- "TOK_DROPTABLE",
"TOK_MSCK",
// TODO(marmbrus): Figure out how view are expanded by hive, as we might need to handle this.
@@ -377,6 +378,12 @@ private[hive] object HiveQl {
}
protected def nodeToPlan(node: Node): LogicalPlan = node match {
+ // Special drop table that also uncaches.
+ case Token("TOK_DROPTABLE",
+ Token("TOK_TABNAME", tableNameParts) ::
+ ifExists) =>
+ val tableName = tableNameParts.map { case Token(p, Nil) => p }.mkString(".")
+ DropTable(tableName, ifExists.nonEmpty)
// Just fake explain for any of the native commands.
case Token("TOK_EXPLAIN", explainArgs)
if noExplainCommands.contains(explainArgs.head.getText) =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 4d0fab4140..2175c5f383 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -81,6 +81,8 @@ private[hive] trait HiveStrategies {
case logical.NativeCommand(sql) =>
NativeCommand(sql, plan.output)(context) :: Nil
+ case DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil
+
case describe: logical.DescribeCommand =>
val resolvedTable = context.executePlan(describe.table).analyzed
resolvedTable match {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DropTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DropTable.scala
new file mode 100644
index 0000000000..9cd0c86c6c
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DropTable.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.execution.{Command, LeafNode}
+import org.apache.spark.sql.hive.HiveContext
+
+/**
+ * :: DeveloperApi ::
+ * Physical command that issues a Hive `DROP TABLE` statement for `tableName` and then
+ * unregisters that table from the Hive context's catalog.
+ *
+ * @param tableName name of the table to drop; interpolated as-is into the HiveQL statement
+ * @param ifExists  when true, the statement is issued as `DROP TABLE IF EXISTS ...`
+ */
+@DeveloperApi
+case class DropTable(tableName: String, ifExists: Boolean) extends LeafNode with Command {
+
+  // The command only makes sense against a HiveContext, so the SQL context is narrowed here.
+  def hiveContext = sqlContext.asInstanceOf[HiveContext]
+
+  // A drop produces no columns.
+  def output = Seq.empty
+
+  // Being a lazy val, the drop and the unregistration run at most once per instance.
+  override protected[sql] lazy val sideEffectResult: Seq[Any] = {
+    val existenceGuard = if (ifExists) "IF EXISTS " else ""
+    val dropStatement = s"DROP TABLE $existenceGuard$tableName"
+
+    val ctx = hiveContext
+    // Drop in Hive first; the catalog entry is only removed if that call returns normally.
+    ctx.runSqlHive(dropStatement)
+    ctx.catalog.unregisterTable(None, tableName)
+
+    Seq.empty
+  }
+
+  /** Forces the side effect, then hands back an empty RDD since there are no result rows. */
+  override def execute(): RDD[Row] = {
+    sideEffectResult
+    sparkContext.emptyRDD[Row]
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 3132d0112c..08da6405a1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -23,6 +23,8 @@ import org.apache.spark.sql.hive.execution.HiveComparisonTest
import org.apache.spark.sql.hive.test.TestHive
class CachedTableSuite extends HiveComparisonTest {
+ import TestHive._
+
TestHive.loadTestTable("src")
test("cache table") {
@@ -32,6 +34,20 @@ class CachedTableSuite extends HiveComparisonTest {
createQueryTest("read from cached table",
"SELECT * FROM src LIMIT 1", reset = false)
+ // Verifies that a table which was cached and then dropped can no longer be queried.
+ test("Drop cached table") {
+ hql("CREATE TABLE test(a INT)")
+ cacheTable("test")
+ // Query once while the table is marked as cached (presumably to materialize the cache).
+ hql("SELECT * FROM test").collect()
+ hql("DROP TABLE test")
+ // After the drop, the same query is expected to fail with InvalidTableException.
+ intercept[org.apache.hadoop.hive.ql.metadata.InvalidTableException] {
+ hql("SELECT * FROM test").collect()
+ }
+ }
+
+ // DROP TABLE IF EXISTS on a table that was never created; the test has no assertions and
+ // passes as long as the statement does not throw.
+ test("DROP nonexistant table") {
+ hql("DROP TABLE IF EXISTS nonexistantTable")
+ }
+
test("check that table is cached and uncache") {
TestHive.table("src").queryExecution.analyzed match {
case _ : InMemoryRelation => // Found evidence of caching