diff options
author | gatorsmile <gatorsmile@gmail.com> | 2016-04-09 17:40:36 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2016-04-09 17:40:36 -0700 |
commit | dfce9665c4b2b29a19e6302216dae2800da68ff9 (patch) | |
tree | a1a1c2e14c14a28249eb4a2b29ca47ea1a07947f | |
parent | 9be5558e009069925d1f2d737d42e1683ed6b47f (diff) | |
download | spark-dfce9665c4b2b29a19e6302216dae2800da68ff9.tar.gz spark-dfce9665c4b2b29a19e6302216dae2800da68ff9.tar.bz2 spark-dfce9665c4b2b29a19e6302216dae2800da68ff9.zip |
[SPARK-14362][SPARK-14406][SQL] DDL Native Support: Drop View and Drop Table
#### What changes were proposed in this pull request?
This PR provides native support for the DDL commands `DROP VIEW` and `DROP TABLE`. It includes both native parsing and native analysis.
Based on the Hive DDL documentation for [DROP VIEW](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-DropView), `DROP VIEW` is defined as follows:
**Syntax:**
```SQL
DROP VIEW [IF EXISTS] [db_name.]view_name;
```
- to remove metadata for the specified view.
- illegal to use DROP TABLE on a view.
- illegal to use DROP VIEW on a table.
- this command only works in `HiveContext`. In `SQLContext`, we will get an exception.
This PR also handles `DROP TABLE`.
**Syntax:**
```SQL
DROP TABLE [IF EXISTS] table_name [PURGE];
```
- Previously, the `DROP TABLE` command could only drop Hive tables in `HiveContext`. After this PR, the command can also drop temporary tables, external tables, and external data source tables in `SQLContext`.
- In `HiveContext`, we will not issue an exception if the to-be-dropped table does not exist and users did not specify `IF EXISTS`. Instead, we just log an error message. If `IF EXISTS` is specified, we will not issue any error message/exception.
- In `SQLContext`, we will issue an exception if the to-be-dropped table does not exist, unless `IF EXISTS` is specified.
- Data is not deleted when the dropped table is `external`; the data is deleted only when the table type is `managed_table`.
#### How was this patch tested?
For verifying command parsing, added test cases in `spark/sql/hive/HiveDDLCommandSuite.scala`
For verifying command analysis, added test cases in `spark/sql/hive/execution/HiveDDLSuite.scala`
Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>
Closes #12146 from gatorsmile/dropView.
16 files changed, 376 insertions, 63 deletions
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 85cb585919..2f2e060b38 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -104,6 +104,7 @@ statement REPLACE COLUMNS '(' colTypeList ')' (CASCADE | RESTRICT)? #replaceColumns | DROP TABLE (IF EXISTS)? tableIdentifier PURGE? (FOR METADATA? REPLICATION '(' STRING ')')? #dropTable + | DROP VIEW (IF EXISTS)? tableIdentifier #dropTable | CREATE (OR REPLACE)? VIEW (IF NOT EXISTS)? tableIdentifier identifierCommentList? (COMMENT STRING)? (PARTITIONED ON identifierList)? @@ -141,7 +142,6 @@ hiveNativeCommands | DELETE FROM tableIdentifier (WHERE booleanExpression)? | TRUNCATE TABLE tableIdentifier partitionSpec? (COLUMNS identifierList)? - | DROP VIEW (IF EXISTS)? qualifiedName | SHOW COLUMNS (FROM | IN) tableIdentifier ((FROM|IN) identifier)? | START TRANSACTION (transactionMode (',' transactionMode)*)? | COMMIT WORK? 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 186bbccef1..1994acd1ad 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -187,6 +187,10 @@ class InMemoryCatalog extends ExternalCatalog { catalog(db).tables(table).table } + override def getTableOption(db: String, table: String): Option[CatalogTable] = synchronized { + if (!tableExists(db, table)) None else Option(catalog(db).tables(table).table) + } + override def tableExists(db: String, table: String): Boolean = synchronized { requireDbExists(db) catalog(db).tables.contains(table) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 7db9fd0527..c1e5a485e7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -21,6 +21,7 @@ import java.io.File import scala.collection.mutable +import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf} import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} @@ -41,7 +42,7 @@ class SessionCatalog( externalCatalog: ExternalCatalog, functionResourceLoader: FunctionResourceLoader, functionRegistry: FunctionRegistry, - conf: CatalystConf) { + conf: CatalystConf) extends Logging { import ExternalCatalog._ def this( @@ -175,6 +176,17 @@ class SessionCatalog( externalCatalog.getTable(db, table) } + /** + * Retrieve the metadata of an existing metastore table. 
+ * If no database is specified, assume the table is in the current database. + * If the specified table is not found in the database then return None if it doesn't exist. + */ + def getTableMetadataOption(name: TableIdentifier): Option[CatalogTable] = { + val db = name.database.getOrElse(currentDb) + val table = formatTableName(name.table) + externalCatalog.getTableOption(db, table) + } + // ------------------------------------------------------------- // | Methods that interact with temporary and metastore tables | // ------------------------------------------------------------- @@ -229,7 +241,13 @@ class SessionCatalog( val db = name.database.getOrElse(currentDb) val table = formatTableName(name.table) if (name.database.isDefined || !tempTables.contains(table)) { - externalCatalog.dropTable(db, table, ignoreIfNotExists) + // When ignoreIfNotExists is false, no exception is issued when the table does not exist. + // Instead, log it as an error message. This is consistent with Hive. + if (externalCatalog.tableExists(db, table)) { + externalCatalog.dropTable(db, table, ignoreIfNotExists = true) + } else if (!ignoreIfNotExists) { + logError(s"Table '${name.quotedString}' does not exist") + } } else { tempTables.remove(table) } @@ -283,10 +301,15 @@ class SessionCatalog( * explicitly specified. */ def isTemporaryTable(name: TableIdentifier): Boolean = { - !name.database.isDefined && tempTables.contains(formatTableName(name.table)) + name.database.isEmpty && tempTables.contains(formatTableName(name.table)) } /** + * Return whether View is supported + */ + def isViewSupported: Boolean = false + + /** * List all tables in the specified database, including temporary tables. 
*/ def listTables(db: String): Seq[TableIdentifier] = listTables(db, "*") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index e29d6bd8b0..4ef59316ce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -91,6 +91,8 @@ abstract class ExternalCatalog { def getTable(db: String, table: String): CatalogTable + def getTableOption(db: String, table: String): Option[CatalogTable] + def tableExists(db: String, table: String): Boolean def listTables(db: String): Seq[String] diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 1850dc8156..862fc275ad 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -233,10 +233,9 @@ class SessionCatalogSuite extends SparkFunSuite { intercept[AnalysisException] { catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = true) } - // Table does not exist - intercept[AnalysisException] { - catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = false) - } + // If the table does not exist, we do not issue an exception. Instead, we output an error log + // message to console when ignoreIfNotExists is set to false. 
+ catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = false) catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = true) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index c8d0f4e3c5..3da715cdb3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -364,6 +364,22 @@ class SparkSqlAstBuilder extends AstBuilder { } /** + * Create a [[DropTable]] command. + */ + override def visitDropTable(ctx: DropTableContext): LogicalPlan = withOrigin(ctx) { + if (ctx.PURGE != null) { + throw new ParseException("Unsupported operation: PURGE option", ctx) + } + if (ctx.REPLICATION != null) { + throw new ParseException("Unsupported operation: REPLICATION clause", ctx) + } + DropTable( + visitTableIdentifier(ctx.tableIdentifier), + ctx.EXISTS != null, + ctx.VIEW != null) + } + + /** * Create a [[AlterTableRename]] command. 
* * For example: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 20779d68e0..e941736f9a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, Row, SQLContext} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable} +import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.types._ @@ -176,12 +176,60 @@ case class DescribeDatabase( } /** + * Drops a table/view from the metastore and removes it if it is cached. + * + * The syntax of this command is: + * {{{ + * DROP TABLE [IF EXISTS] table_name; + * DROP VIEW [IF EXISTS] [db_name.]view_name; + * }}} + */ +case class DropTable( + tableName: TableIdentifier, + ifExists: Boolean, + isView: Boolean) extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + val catalog = sqlContext.sessionState.catalog + if (isView && !catalog.isViewSupported) { + throw new AnalysisException(s"Not supported object: views") + } + // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view + // issue an exception. + catalog.getTableMetadataOption(tableName).map(_.tableType match { + case CatalogTableType.VIRTUAL_VIEW if !isView => + throw new AnalysisException( + "Cannot drop a view with DROP TABLE. 
Please use DROP VIEW instead") + case o if o != CatalogTableType.VIRTUAL_VIEW && isView => + throw new AnalysisException( + s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead") + case _ => + }) + + try { + sqlContext.cacheManager.tryUncacheQuery(sqlContext.table(tableName.quotedString)) + } catch { + // This table's metadata is not in Hive metastore (e.g. the table does not exist). + case e if e.getClass.getName == "org.apache.hadoop.hive.ql.metadata.InvalidTableException" => + case _: org.apache.spark.sql.catalyst.analysis.NoSuchTableException => + // Other Throwables can be caused by users providing wrong parameters in OPTIONS + // (e.g. invalid paths). We catch it and log a warning message. + // Users should be able to drop such kinds of tables regardless if there is an error. + case e: Throwable => log.warn(s"${e.getMessage}", e) + } + catalog.invalidateTable(tableName) + catalog.dropTable(tableName, ifExists) + Seq.empty[Row] + } +} + +/** * A command that renames a table/view. 
* * The syntax of this command is: * {{{ - * ALTER TABLE table1 RENAME TO table2; - * ALTER VIEW view1 RENAME TO view2; + * ALTER TABLE table1 RENAME TO table2; + * ALTER VIEW view1 RENAME TO view2; * }}} */ case class AlterTableRename( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index b1c1fd0951..ac69518ddf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -20,9 +20,8 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.PlanTest -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.execution.SparkSqlParser -import org.apache.spark.sql.execution.datasources.BucketSpec import org.apache.spark.sql.types._ class DDLCommandSuite extends PlanTest { @@ -667,7 +666,10 @@ class DDLCommandSuite extends PlanTest { test("unsupported operations") { intercept[ParseException] { - parser.parsePlan("DROP TABLE D1.T1") + parser.parsePlan("DROP TABLE tab PURGE") + } + intercept[ParseException] { + parser.parsePlan("DROP TABLE tab FOR REPLICATION('eventid')") } intercept[ParseException] { parser.parsePlan("CREATE VIEW testView AS SELECT id FROM tab") @@ -700,4 +702,52 @@ class DDLCommandSuite extends PlanTest { val parsed = parser.parsePlan(sql) assert(parsed.isInstanceOf[Project]) } + + test("drop table") { + val tableName1 = "db.tab" + val tableName2 = "tab" + + val parsed1 = parser.parsePlan(s"DROP TABLE $tableName1") + val parsed2 = parser.parsePlan(s"DROP TABLE IF EXISTS $tableName1") + val parsed3 = parser.parsePlan(s"DROP TABLE $tableName2") + 
val parsed4 = parser.parsePlan(s"DROP TABLE IF EXISTS $tableName2") + + val expected1 = + DropTable(TableIdentifier("tab", Option("db")), ifExists = false, isView = false) + val expected2 = + DropTable(TableIdentifier("tab", Option("db")), ifExists = true, isView = false) + val expected3 = + DropTable(TableIdentifier("tab", None), ifExists = false, isView = false) + val expected4 = + DropTable(TableIdentifier("tab", None), ifExists = true, isView = false) + + comparePlans(parsed1, expected1) + comparePlans(parsed2, expected2) + comparePlans(parsed3, expected3) + comparePlans(parsed4, expected4) + } + + test("drop view") { + val viewName1 = "db.view" + val viewName2 = "view" + + val parsed1 = parser.parsePlan(s"DROP VIEW $viewName1") + val parsed2 = parser.parsePlan(s"DROP VIEW IF EXISTS $viewName1") + val parsed3 = parser.parsePlan(s"DROP VIEW $viewName2") + val parsed4 = parser.parsePlan(s"DROP VIEW IF EXISTS $viewName2") + + val expected1 = + DropTable(TableIdentifier("view", Option("db")), ifExists = false, isView = true) + val expected2 = + DropTable(TableIdentifier("view", Option("db")), ifExists = true, isView = true) + val expected3 = + DropTable(TableIdentifier("view", None), ifExists = false, isView = true) + val expected4 = + DropTable(TableIdentifier("view", None), ifExists = true, isView = true) + + comparePlans(parsed1, expected1) + comparePlans(parsed2, expected2) + comparePlans(parsed3, expected3) + comparePlans(parsed4, expected4) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 7084665b3b..e75e5f5cb2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -391,6 +391,54 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { Nil) } + test("drop table - temporary table") 
{ + val catalog = sqlContext.sessionState.catalog + sql( + """ + |CREATE TEMPORARY TABLE tab1 + |USING org.apache.spark.sql.sources.DDLScanSource + |OPTIONS ( + | From '1', + | To '10', + | Table 'test1' + |) + """.stripMargin) + assert(catalog.listTables("default") == Seq(TableIdentifier("tab1"))) + sql("DROP TABLE tab1") + assert(catalog.listTables("default") == Nil) + } + + test("drop table") { + testDropTable(isDatasourceTable = false) + } + + test("drop table - data source table") { + testDropTable(isDatasourceTable = true) + } + + private def testDropTable(isDatasourceTable: Boolean): Unit = { + val catalog = sqlContext.sessionState.catalog + val tableIdent = TableIdentifier("tab1", Some("dbx")) + createDatabase(catalog, "dbx") + createTable(catalog, tableIdent) + if (isDatasourceTable) { + convertToDatasourceTable(catalog, tableIdent) + } + assert(catalog.listTables("dbx") == Seq(tableIdent)) + sql("DROP TABLE dbx.tab1") + assert(catalog.listTables("dbx") == Nil) + sql("DROP TABLE IF EXISTS dbx.tab1") + // no exception will be thrown + sql("DROP TABLE dbx.tab1") + } + + test("drop view") { + val e = intercept[AnalysisException] { + sql("DROP VIEW dbx.tab1") + } + assert(e.getMessage.contains("Not supported object: views")) + } + private def convertToDatasourceTable( catalog: SessionCatalog, tableIdent: TableIdentifier): Unit = { diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index e93b0c145f..9ec8b9a9a6 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -172,7 +172,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { "SELECT COUNT(*) FROM hive_test;" -> "5", "DROP TABLE hive_test;" - -> "OK" + -> "" ) } @@ -220,9 +220,9 @@ class 
CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { "SELECT count(key) FROM t1;" -> "5", "DROP TABLE t1;" - -> "OK", + -> "", "DROP TABLE sourceTable;" - -> "OK" + -> "" ) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index b1156fb3e2..a49ce33ba1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -182,6 +182,10 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat client.getTable(db, table) } + override def getTableOption(db: String, table: String): Option[CatalogTable] = withClient { + client.getTableOption(db, table) + } + override def tableExists(db: String, table: String): Boolean = withClient { client.getTableOption(db, table).isDefined } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 0cccc22e5a..875652c226 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -70,6 +70,8 @@ private[sql] class HiveSessionCatalog( } } + override def isViewSupported: Boolean = true + // ---------------------------------------------------------------- // | Methods and fields for interacting with HiveMetastoreCatalog | // ---------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala index 657edb493a..7a435117e7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala +++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala @@ -104,19 +104,6 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder { } /** - * Create a [[DropTable]] command. - */ - override def visitDropTable(ctx: DropTableContext): LogicalPlan = withOrigin(ctx) { - if (ctx.PURGE != null) { - logWarning("PURGE option is ignored.") - } - if (ctx.REPLICATION != null) { - logWarning("REPLICATION clause is ignored.") - } - DropTable(visitTableIdentifier(ctx.tableIdentifier).toString, ctx.EXISTS != null) - } - - /** * Create an [[AnalyzeTable]] command. This currently only implements the NOSCAN option (other * options are passed on to Hive) e.g.: * {{{ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 64d1341a47..06badff474 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -46,36 +46,6 @@ case class AnalyzeTable(tableName: String) extends RunnableCommand { } } -/** - * Drops a table from the metastore and removes it if it is cached. - */ -private[hive] -case class DropTable( - tableName: String, - ifExists: Boolean) extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - val hiveContext = sqlContext.asInstanceOf[HiveContext] - val ifExistsClause = if (ifExists) "IF EXISTS " else "" - try { - hiveContext.cacheManager.tryUncacheQuery(hiveContext.table(tableName)) - } catch { - // This table's metadata is not in Hive metastore (e.g. the table does not exist). - case _: org.apache.hadoop.hive.ql.metadata.InvalidTableException => - case _: org.apache.spark.sql.catalyst.analysis.NoSuchTableException => - // Other Throwables can be caused by users providing wrong parameters in OPTIONS - // (e.g. invalid paths). We catch it and log a warning message. 
- // Users should be able to drop such kinds of tables regardless if there is an error. - case e: Throwable => log.warn(s"${e.getMessage}", e) - } - hiveContext.invalidateTable(tableName) - hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName") - hiveContext.sessionState.catalog.dropTable( - TableIdentifier(tableName), ignoreIfNotExists = true) - Seq.empty[Row] - } -} - private[hive] case class AddJar(path: String) extends RunnableCommand { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index 12a582c10a..a144da4997 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -72,6 +72,7 @@ class HiveDDLCommandSuite extends PlanTest { CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil) // TODO will be SQLText assert(desc.viewText == Option("This is the staging page view table")) + assert(desc.viewOriginalText.isEmpty) assert(desc.partitionColumns == CatalogColumn("dt", "string", comment = Some("date type")) :: CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil) @@ -118,6 +119,7 @@ class HiveDDLCommandSuite extends PlanTest { CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil) // TODO will be SQLText assert(desc.viewText == Option("This is the staging page view table")) + assert(desc.viewOriginalText.isEmpty) assert(desc.partitionColumns == CatalogColumn("dt", "string", comment = Some("date type")) :: CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil) @@ -138,6 +140,7 @@ class HiveDDLCommandSuite extends PlanTest { assert(desc.storage.locationUri == None) assert(desc.schema == Seq.empty[CatalogColumn]) assert(desc.viewText == None) // TODO will be SQLText + assert(desc.viewOriginalText.isEmpty) 
assert(desc.storage.serdeProperties == Map()) assert(desc.storage.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat")) assert(desc.storage.outputFormat == @@ -173,6 +176,7 @@ class HiveDDLCommandSuite extends PlanTest { assert(desc.storage.locationUri == None) assert(desc.schema == Seq.empty[CatalogColumn]) assert(desc.viewText == None) // TODO will be SQLText + assert(desc.viewOriginalText.isEmpty) assert(desc.storage.serdeProperties == Map(("serde_p1" -> "p1"), ("serde_p2" -> "p2"))) assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat")) assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")) @@ -286,7 +290,7 @@ class HiveDDLCommandSuite extends PlanTest { } test("use backticks in output of Script Transform") { - val plan = parser.parsePlan( + parser.parsePlan( """SELECT `t`.`thing1` |FROM (SELECT TRANSFORM (`parquet_t1`.`key`, `parquet_t1`.`value`) |USING 'cat' AS (`thing1` int, `thing2` string) FROM `default`.`parquet_t1`) AS t @@ -294,7 +298,7 @@ class HiveDDLCommandSuite extends PlanTest { } test("use backticks in output of Generator") { - val plan = parser.parsePlan( + parser.parsePlan( """ |SELECT `gentab2`.`gencol2` |FROM `default`.`src` @@ -304,7 +308,7 @@ class HiveDDLCommandSuite extends PlanTest { } test("use escaped backticks in output of Generator") { - val plan = parser.parsePlan( + parser.parsePlan( """ |SELECT `gen``tab2`.`gen``col2` |FROM `default`.`src` diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala new file mode 100644 index 0000000000..78ccdc7adb --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode} +import org.apache.spark.sql.catalyst.catalog.CatalogTableType +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SQLTestUtils + +class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { + import hiveContext.implicits._ + + // check if the directory for recording the data of the table exists. 
+ private def tableDirectoryExists(tableIdentifier: TableIdentifier): Boolean = { + val expectedTablePath = + hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdentifier) + val filesystemPath = new Path(expectedTablePath) + val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration) + fs.exists(filesystemPath) + } + + test("drop tables") { + withTable("tab1") { + val tabName = "tab1" + + assert(!tableDirectoryExists(TableIdentifier(tabName))) + sql(s"CREATE TABLE $tabName(c1 int)") + + assert(tableDirectoryExists(TableIdentifier(tabName))) + sql(s"DROP TABLE $tabName") + + assert(!tableDirectoryExists(TableIdentifier(tabName))) + sql(s"DROP TABLE IF EXISTS $tabName") + sql(s"DROP VIEW IF EXISTS $tabName") + } + } + + test("drop managed tables") { + withTempDir { tmpDir => + val tabName = "tab1" + withTable(tabName) { + assert(tmpDir.listFiles.isEmpty) + sql( + s""" + |create table $tabName + |stored as parquet + |location '$tmpDir' + |as select 1, '3' + """.stripMargin) + + val hiveTable = + hiveContext.sessionState.catalog + .getTableMetadata(TableIdentifier(tabName, Some("default"))) + // It is a managed table, although it uses external in SQL + assert(hiveTable.tableType == CatalogTableType.MANAGED_TABLE) + + assert(tmpDir.listFiles.nonEmpty) + sql(s"DROP TABLE $tabName") + // The data are deleted since the table type is not EXTERNAL + assert(tmpDir.listFiles == null) + } + } + } + + test("drop external data source table") { + withTempDir { tmpDir => + val tabName = "tab1" + withTable(tabName) { + assert(tmpDir.listFiles.isEmpty) + + withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "true") { + Seq(1 -> "a").toDF("i", "j") + .write + .mode(SaveMode.Overwrite) + .format("parquet") + .option("path", tmpDir.toString) + .saveAsTable(tabName) + } + + val hiveTable = + hiveContext.sessionState.catalog + .getTableMetadata(TableIdentifier(tabName, Some("default"))) + // This data source table is external table + assert(hiveTable.tableType 
== CatalogTableType.EXTERNAL_TABLE) + + assert(tmpDir.listFiles.nonEmpty) + sql(s"DROP TABLE $tabName") + // The data are not deleted since the table type is EXTERNAL + assert(tmpDir.listFiles.nonEmpty) + } + } + } + + test("drop views") { + withTable("tab1") { + val tabName = "tab1" + sqlContext.range(10).write.saveAsTable("tab1") + withView("view1") { + val viewName = "view1" + + assert(tableDirectoryExists(TableIdentifier(tabName))) + assert(!tableDirectoryExists(TableIdentifier(viewName))) + sql(s"CREATE VIEW $viewName AS SELECT * FROM tab1") + + assert(tableDirectoryExists(TableIdentifier(tabName))) + assert(!tableDirectoryExists(TableIdentifier(viewName))) + sql(s"DROP VIEW $viewName") + + assert(tableDirectoryExists(TableIdentifier(tabName))) + sql(s"DROP VIEW IF EXISTS $viewName") + } + } + } + + test("drop table using drop view") { + withTable("tab1") { + sql("CREATE TABLE tab1(c1 int)") + val message = intercept[AnalysisException] { + sql("DROP VIEW tab1") + }.getMessage + assert(message.contains("Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")) + } + } + + test("drop view using drop table") { + withTable("tab1") { + sqlContext.range(10).write.saveAsTable("tab1") + withView("view1") { + sql("CREATE VIEW view1 AS SELECT * FROM tab1") + val message = intercept[AnalysisException] { + sql("DROP TABLE view1") + }.getMessage + assert(message.contains("Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")) + } + } + } +} |