From 88a519db90d66ee5a1455ef4fcc1ad2a687e3d0b Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Wed, 30 Jul 2014 17:30:51 -0700 Subject: [SPARK-2734][SQL] Remove tables from cache when DROP TABLE is run. Author: Michael Armbrust Closes #1650 from marmbrus/dropCached and squashes the following commits: e6ab80b [Michael Armbrust] Support if exists. 83426c6 [Michael Armbrust] Remove tables from cache when DROP TABLE is run. --- .../scala/org/apache/spark/sql/hive/HiveQl.scala | 9 +++- .../org/apache/spark/sql/hive/HiveStrategies.scala | 2 + .../spark/sql/hive/execution/DropTable.scala | 48 ++++++++++++++++++++++ .../apache/spark/sql/hive/CachedTableSuite.scala | 16 ++++++++ 4 files changed, 74 insertions(+), 1 deletion(-) create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DropTable.scala (limited to 'sql') diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index d18ccf8167..3d2eb1eefa 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -44,6 +44,8 @@ private[hive] case class SourceCommand(filePath: String) extends Command private[hive] case class AddFile(filePath: String) extends Command +private[hive] case class DropTable(tableName: String, ifExists: Boolean) extends Command + /** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */ private[hive] object HiveQl { protected val nativeCommands = Seq( @@ -96,7 +98,6 @@ private[hive] object HiveQl { "TOK_CREATEINDEX", "TOK_DROPDATABASE", "TOK_DROPINDEX", - "TOK_DROPTABLE", "TOK_MSCK", // TODO(marmbrus): Figure out how view are expanded by hive, as we might need to handle this. @@ -377,6 +378,12 @@ private[hive] object HiveQl { } protected def nodeToPlan(node: Node): LogicalPlan = node match { + // Special drop table that also uncaches. + case Token("TOK_DROPTABLE", + Token("TOK_TABNAME", tableNameParts) :: + ifExists) => + val tableName = tableNameParts.map { case Token(p, Nil) => p }.mkString(".") + DropTable(tableName, ifExists.nonEmpty) // Just fake explain for any of the native commands. case Token("TOK_EXPLAIN", explainArgs) if noExplainCommands.contains(explainArgs.head.getText) => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 4d0fab4140..2175c5f383 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -81,6 +81,8 @@ private[hive] trait HiveStrategies { case logical.NativeCommand(sql) => NativeCommand(sql, plan.output)(context) :: Nil + case DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil + case describe: logical.DescribeCommand => val resolvedTable = context.executePlan(describe.table).analyzed resolvedTable match { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DropTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DropTable.scala new file mode 100644 index 0000000000..9cd0c86c6c --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DropTable.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.Row +import org.apache.spark.sql.execution.{Command, LeafNode} +import org.apache.spark.sql.hive.HiveContext + +/** + * :: DeveloperApi :: + * Drops a table from the metastore and removes it if it is cached. + */ +@DeveloperApi +case class DropTable(tableName: String, ifExists: Boolean) extends LeafNode with Command { + + def hiveContext = sqlContext.asInstanceOf[HiveContext] + + def output = Seq.empty + + override protected[sql] lazy val sideEffectResult: Seq[Any] = { + val ifExistsClause = if (ifExists) "IF EXISTS " else "" + hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName") + hiveContext.catalog.unregisterTable(None, tableName) + Seq.empty + } + + override def execute(): RDD[Row] = { + sideEffectResult + sparkContext.emptyRDD[Row] + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 3132d0112c..08da6405a1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -23,6 +23,8 @@ import org.apache.spark.sql.hive.execution.HiveComparisonTest import org.apache.spark.sql.hive.test.TestHive class CachedTableSuite extends HiveComparisonTest { + import TestHive._ + TestHive.loadTestTable("src") test("cache table") { @@ -32,6 +34,20 @@ class CachedTableSuite extends HiveComparisonTest { createQueryTest("read from cached table", "SELECT * FROM src LIMIT 1", reset = false) + test("Drop cached table") { + hql("CREATE TABLE test(a INT)") + cacheTable("test") + hql("SELECT * FROM test").collect() + hql("DROP TABLE test") + intercept[org.apache.hadoop.hive.ql.metadata.InvalidTableException] { + hql("SELECT * FROM test").collect() + } + } + + test("DROP nonexistant table") { + hql("DROP TABLE IF EXISTS nonexistantTable") + } + test("check that table is cached and uncache") { TestHive.table("src").queryExecution.analyzed match { case _ : InMemoryRelation => // Found evidence of caching -- cgit v1.2.3