author     gatorsmile <gatorsmile@gmail.com>    2016-08-30 17:27:00 +0800
committer  Wenchen Fan <wenchen@databricks.com> 2016-08-30 17:27:00 +0800
commit     bca79c823024c41731ec89f96a3722d7b1c99639 (patch)
tree       eb9cc2753226d8ba2ff25ba8e836676ccb8a7802
parent     94922d79e9f90fac3777db0974ccf7566b8ac3b3 (diff)
[SPARK-17234][SQL] Table Existence Checking when Index Table with the Same Name Exists
### What changes were proposed in this pull request?

Hive index tables are not supported by Spark SQL, so Spark issues an exception whenever users try to access one. As a result, when the internal function `tableExists` touches a Hive index table, it always fails with the same error message: `Hive index table is not supported`. This message can confuse users, since their SQL operations may be completely unrelated to Hive index tables. For example, when a user renames a table to a name that is already taken by an index table, the expected exception is a `TableAlreadyExistsException`.

This PR makes the following changes:

- Introduces a new `AnalysisException` type, `SQLFeatureNotSupportedException`, which is issued when users try to access an index table.
- Makes `tableExists` return `true` when it hits a `SQLFeatureNotSupportedException` whose feature is `Hive index table`.
- Adds a `requireTableNotExists` check to `SessionCatalog`'s `createTable` API; without it, the implementation relies on Hive's internal checking.

### How was this patch tested?

Added a test case.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #14801 from gatorsmile/tableExists.
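To make the guard concrete before diving into the diff, here is a minimal, self-contained sketch of the existence check this patch adds to `createTable`. `GuardedCatalog` and `lookupExisting` are illustrative stand-ins, not Spark names; `CatalogTable` and `TableAlreadyExistsException` are the real classes used in the patch below.

```scala
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Hypothetical wrapper: `lookupExisting` stands in for the metastore-backed
// tableExists call that the real HiveExternalCatalog performs.
class GuardedCatalog(lookupExisting: (String, String) => Boolean) {
  def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
    val db = tableDefinition.identifier.database.getOrElse("default")
    val table = tableDefinition.identifier.table
    // Probe existence up front so that a name clash (including a clash with
    // a Hive index table) surfaces as TableAlreadyExistsException instead of
    // the unrelated "Hive index table is not supported" error.
    if (lookupExisting(db, table) && !ignoreIfExists) {
      throw new TableAlreadyExistsException(db = db, table = table)
    }
    // ... delegate the actual table creation to the underlying catalog ...
  }
}
```

The real check lives in `HiveExternalCatalog.createTable`, shown in the `HiveExternalCatalog.scala` diff below.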
 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala | 10
 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala                  |  7
 sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala                    |  3
 sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala                |  4
 sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala                 |  6
 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala               | 33
 6 files changed, 62 insertions(+), 1 deletion(-)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 54365fd978..19f8665383 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
@@ -162,6 +163,15 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     assert(actual.tableType === CatalogTableType.EXTERNAL)
   }
 
+  test("create table when the table already exists") {
+    val catalog = newBasicCatalog()
+    assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+    val table = newTable("tbl1", "db2")
+    intercept[TableAlreadyExistsException] {
+      catalog.createTable(table, ignoreIfExists = false)
+    }
+  }
+
   test("drop table") {
     val catalog = newBasicCatalog()
     assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 7f50e38d30..ed87ac3c3e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -30,6 +30,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
@@ -171,9 +172,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       ignoreIfExists: Boolean): Unit = withClient {
     assert(tableDefinition.identifier.database.isDefined)
     val db = tableDefinition.identifier.database.get
+    val table = tableDefinition.identifier.table
     requireDbExists(db)
     verifyTableProperties(tableDefinition)
 
+    if (tableExists(db, table) && !ignoreIfExists) {
+      throw new TableAlreadyExistsException(db = db, table = table)
+    }
     // Before saving data source table metadata into Hive metastore, we should:
     // 1. Put table schema, partition column names and bucket specification in table properties.
     // 2. Check if this table is hive compatible
@@ -450,7 +455,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   override def tableExists(db: String, table: String): Boolean = withClient {
-    client.getTableOption(db, table).isDefined
+    client.tableExists(db, table)
   }
 
   override def listTables(db: String): Seq[String] = withClient {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 6f009d714b..dc74fa257a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -68,6 +68,9 @@ private[hive] trait HiveClient {
   /** List the names of all the databases that match the specified pattern. */
   def listDatabases(pattern: String): Seq[String]
 
+  /** Return whether a table/view with the specified name exists. */
+  def tableExists(dbName: String, tableName: String): Boolean
+
   /** Returns the specified table, or throws [[NoSuchTableException]]. */
   final def getTable(dbName: String, tableName: String): CatalogTable = {
     getTableOption(dbName, tableName).getOrElse(throw new NoSuchTableException(dbName, tableName))
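The point of a dedicated `tableExists` on the client is that existence can be probed without the `CatalogTable` conversion that `getTableOption` performs, a conversion that fails for Hive index tables. A caller-side sketch; `ExistenceProbe` and `isNameTaken` are hypothetical, and `client` is assumed to be any `HiveClient` implementation:

```scala
// Note: HiveClient is private[hive], so this sketch assumes it lives inside
// the org.apache.spark.sql.hive.client package.
package org.apache.spark.sql.hive.client

object ExistenceProbe {
  // Ask the metastore directly whether the name is taken. Unlike
  // client.getTableOption(db, name).isDefined, this does not convert the
  // Hive table into a CatalogTable, so it also succeeds when the name
  // belongs to a Hive index table that Spark cannot convert.
  def isNameTaken(client: HiveClient, db: String, name: String): Boolean =
    client.tableExists(db, name)
}
```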
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index b45ad30dca..dd982192a3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -347,6 +347,10 @@ private[hive] class HiveClientImpl(
     client.getDatabasesByPattern(pattern).asScala
   }
 
+  override def tableExists(dbName: String, tableName: String): Boolean = withHiveState {
+    Option(client.getTable(dbName, tableName, false /* do not throw exception */)).nonEmpty
+  }
+
   override def getTableOption(
       dbName: String,
       tableName: String): Option[CatalogTable] = withHiveState {
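The implementation relies on a detail of the underlying Hive API: `Hive.getTable(dbName, tableName, throwException)` returns `null` instead of throwing when the flag is `false`, so wrapping the result in `Option` turns the call into a boolean existence test. A standalone sketch of that idiom; `existsInMetastore` is a hypothetical helper, and `hive` is assumed to be a connected `Hive` handle:

```scala
import org.apache.hadoop.hive.ql.metadata.Hive

// With throwException = false, Hive.getTable yields null for a missing table
// rather than raising an exception, so Option(...).nonEmpty is a clean
// existence probe that never materializes Spark-side metadata.
def existsInMetastore(hive: Hive, db: String, table: String): Boolean =
  Option(hive.getTable(db, table, false)).nonEmpty
```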
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index a2509f2a75..10b6cd1024 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -218,6 +218,12 @@ class VersionsSuite extends SparkFunSuite with Logging {
       holdDDLTime = false)
   }
 
+  test(s"$version: tableExists") {
+    // No exception should be thrown
+    assert(client.tableExists("default", "src"))
+    assert(!client.tableExists("default", "nonexistent"))
+  }
+
   test(s"$version: getTable") {
     // No exception should be thrown
     client.getTable("default", "src")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 9019333d76..58c43ebcae 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -24,8 +24,10 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.internal.config._
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTableType}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
@@ -675,6 +677,37 @@ class HiveDDLSuite
     }
   }
 
+  test("create table with the same name as an index table") {
+    val tabName = "tab1"
+    val indexName = tabName + "_index"
+    withTable(tabName) {
+      // Spark SQL does not support creating indexes, so we have to use the Hive client.
+      val client = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+      sql(s"CREATE TABLE $tabName(a int)")
+
+      try {
+        client.runSqlHive(
+          s"CREATE INDEX $indexName ON TABLE $tabName (a) AS 'COMPACT' WITH DEFERRED REBUILD")
+        val indexTabName =
+          spark.sessionState.catalog.listTables("default", s"*$indexName*").head.table
+        intercept[TableAlreadyExistsException] {
+          sql(s"CREATE TABLE $indexTabName(b int)")
+        }
+        intercept[TableAlreadyExistsException] {
+          sql(s"ALTER TABLE $tabName RENAME TO $indexTabName")
+        }
+
+        // Even when tableExists is not invoked, we can still get an AnalysisException
+        val e = intercept[AnalysisException] {
+          sql(s"DESCRIBE $indexTabName")
+        }.getMessage
+        assert(e.contains("Hive index table is not supported."))
+      } finally {
+        client.runSqlHive(s"DROP INDEX IF EXISTS $indexName ON $tabName")
+      }
+    }
+  }
+
   test("desc table for data source table - no user-defined schema") {
     Seq("parquet", "json", "orc").foreach { fileFormat =>
       withTable("t1") {