aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgatorsmile <gatorsmile@gmail.com>2016-05-10 11:25:39 -0700
committerAndrew Or <andrew@databricks.com>2016-05-10 11:25:55 -0700
commit5c6b0855787c080d3e233eb09c05c025395e7cb3 (patch)
treeba75170f0e9629e540d9ef5924fbcea185807637
parented0b4070fb50054b1ecf66ff6c32458a4967dfd3 (diff)
downloadspark-5c6b0855787c080d3e233eb09c05c025395e7cb3.tar.gz
spark-5c6b0855787c080d3e233eb09c05c025395e7cb3.tar.bz2
spark-5c6b0855787c080d3e233eb09c05c025395e7cb3.zip
[SPARK-14603][SQL] Verification of Metadata Operations by Session Catalog
Since we cannot really trust if the underlying external catalog can throw exceptions when there is an invalid metadata operation, let's do it in SessionCatalog. - [X] The first step is to unify the error messages issued in Hive-specific Session Catalog and general Session Catalog. - [X] The second step is to verify the inputs of metadata operations for partitioning-related operations. This is moved to a separate PR: https://github.com/apache/spark/pull/12801 - [X] The third step is to add database existence verification in `SessionCatalog` - [X] The fourth step is to add table existence verification in `SessionCatalog` - [X] The fifth step is to add function existence verification in `SessionCatalog` Add test cases and verify the error messages we issued Author: gatorsmile <gatorsmile@gmail.com> Author: xiaoli <lixiao1983@gmail.com> Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local> Closes #12385 from gatorsmile/verifySessionAPIs.
-rw-r--r--python/pyspark/sql/utils.py2
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala49
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala8
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala6
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala37
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala84
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala148
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala14
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala6
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala30
10 files changed, 261 insertions, 123 deletions
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index cb172d21f3..36c93228b9 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -61,6 +61,8 @@ def capture_sql_exception(f):
e.java_exception.getStackTrace()))
if s.startswith('org.apache.spark.sql.AnalysisException: '):
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
+ if s.startswith('org.apache.spark.sql.catalyst.analysis.NoSuchTableException: '):
+ raise AnalysisException(s.split(': ', 1)[1], stackTrace)
if s.startswith('org.apache.spark.sql.catalyst.parser.ParseException: '):
raise ParseException(s.split(': ', 1)[1], stackTrace)
if s.startswith('org.apache.spark.sql.ContinuousQueryException: '):
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
new file mode 100644
index 0000000000..ec56fe7729
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+
+/**
+ * Thrown by a catalog when an item already exists. The analyzer will rethrow the exception
+ * as an [[org.apache.spark.sql.AnalysisException]] with the correct position information.
+ */
+class DatabaseAlreadyExistsException(db: String)
+ extends AnalysisException(s"Database '$db' already exists")
+
+class TableAlreadyExistsException(db: String, table: String)
+ extends AnalysisException(s"Table or view '$table' already exists in database '$db'")
+
+class TempTableAlreadyExistsException(table: String)
+ extends AnalysisException(s"Temporary table '$table' already exists")
+
+class PartitionAlreadyExistsException(db: String, table: String, spec: TablePartitionSpec)
+ extends AnalysisException(
+ s"Partition already exists in table '$table' database '$db':\n" + spec.mkString("\n"))
+
+class PartitionsAlreadyExistException(db: String, table: String, specs: Seq[TablePartitionSpec])
+ extends AnalysisException(
+ s"The following partitions already exists in table '$table' database '$db':\n"
+ + specs.mkString("\n===\n"))
+
+class FunctionAlreadyExistsException(db: String, func: String)
+ extends AnalysisException(s"Function '$func' already exists in database '$db'")
+
+class TempFunctionAlreadyExistsException(func: String)
+ extends AnalysisException(s"Temporary function '$func' already exists")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
index ff13bcec43..8febdcaee8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
@@ -44,3 +44,11 @@ class NoSuchFunctionException(db: String, func: String)
extends AnalysisException(
s"Undefined function: '$func'. This function is neither a registered temporary function nor " +
s"a permanent function registered in the database '$db'.")
+
+class NoSuchPartitionsException(db: String, table: String, specs: Seq[TablePartitionSpec])
+ extends AnalysisException(
+ s"The following partitions not found in table '$table' database '$db':\n"
+ + specs.mkString("\n===\n"))
+
+class NoSuchTempFunctionException(func: String)
+ extends AnalysisException(s"Temporary function '$func' not found")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 178ae6d7c2..81974b282b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.catalog
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
/**
@@ -27,14 +27,14 @@ import org.apache.spark.sql.AnalysisException
* can be accessed in multiple threads. This is an external catalog because it is expected to
* interact with external systems.
*
- * Implementations should throw [[AnalysisException]] when table or database don't exist.
+ * Implementations should throw [[NoSuchDatabaseException]] when table or database don't exist.
*/
abstract class ExternalCatalog {
import CatalogTypes.TablePartitionSpec
protected def requireDbExists(db: String): Unit = {
if (!databaseExists(db)) {
- throw new AnalysisException(s"Database '$db' does not exist")
+ throw new NoSuchDatabaseException(db)
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 982b035f18..21da55cbc3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.util.StringUtils
/**
@@ -60,29 +61,25 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
private def requireFunctionExists(db: String, funcName: String): Unit = {
if (!functionExists(db, funcName)) {
- throw new AnalysisException(
- s"Function not found: '$funcName' does not exist in database '$db'")
+ throw new NoSuchFunctionException(db = db, func = funcName)
}
}
private def requireFunctionNotExists(db: String, funcName: String): Unit = {
if (functionExists(db, funcName)) {
- throw new AnalysisException(
- s"Function already exists: '$funcName' exists in database '$db'")
+ throw new FunctionAlreadyExistsException(db = db, func = funcName)
}
}
private def requireTableExists(db: String, table: String): Unit = {
if (!tableExists(db, table)) {
- throw new AnalysisException(
- s"Table or view not found: '$table' does not exist in database '$db'")
+ throw new NoSuchTableException(db = db, table = table)
}
}
private def requireTableNotExists(db: String, table: String): Unit = {
if (tableExists(db, table)) {
- throw new AnalysisException(
- s"Table or view exists: '$table' exists in database '$db'")
+ throw new TableAlreadyExistsException(db = db, table = table)
}
}
@@ -92,8 +89,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
specs: Seq[TablePartitionSpec]): Unit = {
specs foreach { s =>
if (!partitionExists(db, table, s)) {
- throw new AnalysisException(
- s"Partition not found: database '$db' table '$table' does not contain: '$s'")
+ throw new NoSuchPartitionException(db = db, table = table, spec = s)
}
}
}
@@ -104,8 +100,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
specs: Seq[TablePartitionSpec]): Unit = {
specs foreach { s =>
if (partitionExists(db, table, s)) {
- throw new AnalysisException(
- s"Partition exists: database '$db' table '$table' already contains: '$s'")
+ throw new PartitionAlreadyExistsException(db = db, table = table, spec = s)
}
}
}
@@ -121,7 +116,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
ignoreIfExists: Boolean): Unit = synchronized {
if (catalog.contains(dbDefinition.name)) {
if (!ignoreIfExists) {
- throw new AnalysisException(s"Database '${dbDefinition.name}' already exists.")
+ throw new DatabaseAlreadyExistsException(dbDefinition.name)
}
} else {
try {
@@ -161,7 +156,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
catalog.remove(db)
} else {
if (!ignoreIfNotExists) {
- throw new AnalysisException(s"Database '$db' does not exist")
+ throw new NoSuchDatabaseException(db)
}
}
}
@@ -202,7 +197,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
val table = tableDefinition.identifier.table
if (tableExists(db, table)) {
if (!ignoreIfExists) {
- throw new AnalysisException(s"Table '$table' already exists in database '$db'")
+ throw new TableAlreadyExistsException(db = db, table = table)
}
} else {
if (tableDefinition.tableType == CatalogTableType.MANAGED) {
@@ -238,7 +233,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
catalog(db).tables.remove(table)
} else {
if (!ignoreIfNotExists) {
- throw new AnalysisException(s"Table or view '$table' does not exist in database '$db'")
+ throw new NoSuchTableException(db = db, table = table)
}
}
}
@@ -328,9 +323,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
if (!ignoreIfExists) {
val dupSpecs = parts.collect { case p if existingParts.contains(p.spec) => p.spec }
if (dupSpecs.nonEmpty) {
- val dupSpecsStr = dupSpecs.mkString("\n===\n")
- throw new AnalysisException("The following partitions already exist in database " +
- s"'$db' table '$table':\n$dupSpecsStr")
+ throw new PartitionsAlreadyExistException(db = db, table = table, specs = dupSpecs)
}
}
@@ -365,9 +358,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
if (!ignoreIfNotExists) {
val missingSpecs = partSpecs.collect { case s if !existingParts.contains(s) => s }
if (missingSpecs.nonEmpty) {
- val missingSpecsStr = missingSpecs.mkString("\n===\n")
- throw new AnalysisException("The following partitions do not exist in database " +
- s"'$db' table '$table':\n$missingSpecsStr")
+ throw new NoSuchPartitionsException(db = db, table = table, specs = missingSpecs)
}
}
@@ -467,7 +458,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
requireDbExists(db)
if (functionExists(db, func.identifier.funcName)) {
- throw new AnalysisException(s"Function '$func' already exists in '$db' database")
+ throw new FunctionAlreadyExistsException(db = db, func = func.identifier.funcName)
} else {
catalog(db).functions.put(func.identifier.funcName, func)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index b267798e7d..7505e2c236 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchFunctionException, NoSuchPermanentFunctionException, SimpleFunctionRegistry}
+import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
@@ -111,6 +111,25 @@ class SessionCatalog(
fs.makeQualified(hadoopPath)
}
+ protected[this] def requireDbExists(db: String): Unit = {
+ if (!databaseExists(db)) {
+ throw new NoSuchDatabaseException(db)
+ }
+ }
+
+ protected[this] def requireTableExists(name: TableIdentifier): Unit = {
+ if (!tableExists(name)) {
+ val db = name.database.getOrElse(currentDb)
+ throw new NoSuchTableException(db = db, table = name.table)
+ }
+ }
+
+ private def requireTableNotExists(name: TableIdentifier): Unit = {
+ if (tableExists(name)) {
+ val db = name.database.getOrElse(currentDb)
+ throw new TableAlreadyExistsException(db = db, table = name.table)
+ }
+ }
// ----------------------------------------------------------------------------
// Databases
// ----------------------------------------------------------------------------
@@ -135,11 +154,13 @@ class SessionCatalog(
def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
val dbName = formatDatabaseName(dbDefinition.name)
+ requireDbExists(dbName)
externalCatalog.alterDatabase(dbDefinition.copy(name = dbName))
}
def getDatabaseMetadata(db: String): CatalogDatabase = {
val dbName = formatDatabaseName(db)
+ requireDbExists(dbName)
externalCatalog.getDatabase(dbName)
}
@@ -160,9 +181,7 @@ class SessionCatalog(
def setCurrentDatabase(db: String): Unit = {
val dbName = formatDatabaseName(db)
- if (!databaseExists(dbName)) {
- throw new AnalysisException(s"Database '$dbName' does not exist.")
- }
+ requireDbExists(dbName)
synchronized { currentDb = dbName }
}
@@ -196,6 +215,7 @@ class SessionCatalog(
val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableDefinition.identifier.table)
val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
+ requireDbExists(db)
externalCatalog.createTable(db, newTableDefinition, ignoreIfExists)
}
@@ -211,18 +231,23 @@ class SessionCatalog(
def alterTable(tableDefinition: CatalogTable): Unit = {
val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableDefinition.identifier.table)
- val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
+ val tableIdentifier = TableIdentifier(table, Some(db))
+ val newTableDefinition = tableDefinition.copy(identifier = tableIdentifier)
+ requireDbExists(db)
+ requireTableExists(tableIdentifier)
externalCatalog.alterTable(db, newTableDefinition)
}
/**
* Retrieve the metadata of an existing metastore table.
* If no database is specified, assume the table is in the current database.
- * If the specified table is not found in the database then an [[AnalysisException]] is thrown.
+ * If the specified table is not found in the database then an [[NoSuchTableException]] is thrown.
*/
def getTableMetadata(name: TableIdentifier): CatalogTable = {
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
val table = formatTableName(name.table)
+ requireDbExists(db)
+ requireTableExists(TableIdentifier(table, Some(db)))
externalCatalog.getTable(db, table)
}
@@ -234,13 +259,14 @@ class SessionCatalog(
def getTableMetadataOption(name: TableIdentifier): Option[CatalogTable] = {
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
val table = formatTableName(name.table)
+ requireDbExists(db)
externalCatalog.getTableOption(db, table)
}
/**
* Load files stored in given path into an existing metastore table.
* If no database is specified, assume the table is in the current database.
- * If the specified table is not found in the database then an [[AnalysisException]] is thrown.
+ * If the specified table is not found in the database then an [[NoSuchTableException]] is thrown.
*/
def loadTable(
name: TableIdentifier,
@@ -249,13 +275,15 @@ class SessionCatalog(
holdDDLTime: Boolean): Unit = {
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
val table = formatTableName(name.table)
+ requireDbExists(db)
+ requireTableExists(TableIdentifier(table, Some(db)))
externalCatalog.loadTable(db, table, loadPath, isOverwrite, holdDDLTime)
}
/**
* Load files stored in given path into the partition of an existing metastore table.
* If no database is specified, assume the table is in the current database.
- * If the specified table is not found in the database then an [[AnalysisException]] is thrown.
+ * If the specified table is not found in the database then an [[NoSuchTableException]] is thrown.
*/
def loadPartition(
name: TableIdentifier,
@@ -267,6 +295,8 @@ class SessionCatalog(
isSkewedStoreAsSubdir: Boolean): Unit = {
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
val table = formatTableName(name.table)
+ requireDbExists(db)
+ requireTableExists(TableIdentifier(table, Some(db)))
externalCatalog.loadPartition(db, table, loadPath, partition, isOverwrite, holdDDLTime,
inheritTableSpecs, isSkewedStoreAsSubdir)
}
@@ -291,7 +321,7 @@ class SessionCatalog(
overrideIfExists: Boolean): Unit = synchronized {
val table = formatTableName(name)
if (tempTables.contains(table) && !overrideIfExists) {
- throw new AnalysisException(s"Temporary table '$name' already exists.")
+ throw new TempTableAlreadyExistsException(name)
}
tempTables.put(table, tableDefinition)
}
@@ -307,6 +337,7 @@ class SessionCatalog(
*/
def renameTable(oldName: TableIdentifier, newName: TableIdentifier): Unit = synchronized {
val db = formatDatabaseName(oldName.database.getOrElse(currentDb))
+ requireDbExists(db)
val newDb = formatDatabaseName(newName.database.getOrElse(currentDb))
if (db != newDb) {
throw new AnalysisException(
@@ -315,6 +346,8 @@ class SessionCatalog(
val oldTableName = formatTableName(oldName.table)
val newTableName = formatTableName(newName.table)
if (oldName.database.isDefined || !tempTables.contains(oldTableName)) {
+ requireTableExists(TableIdentifier(oldTableName, Some(db)))
+ requireTableNotExists(TableIdentifier(newTableName, Some(db)))
externalCatalog.renameTable(db, oldTableName, newTableName)
} else {
if (newName.database.isDefined) {
@@ -343,12 +376,13 @@ class SessionCatalog(
val db = formatDatabaseName(name.database.getOrElse(currentDb))
val table = formatTableName(name.table)
if (name.database.isDefined || !tempTables.contains(table)) {
+ requireDbExists(db)
// When ignoreIfNotExists is false, no exception is issued when the table does not exist.
// Instead, log it as an error message.
- if (externalCatalog.tableExists(db, table)) {
+ if (tableExists(TableIdentifier(table, Option(db)))) {
externalCatalog.dropTable(db, table, ignoreIfNotExists = true)
} else if (!ignoreIfNotExists) {
- throw new AnalysisException(s"Table or view '${name.quotedString}' does not exist")
+ throw new NoSuchTableException(db = db, table = table)
}
} else {
tempTables.remove(table)
@@ -418,6 +452,7 @@ class SessionCatalog(
*/
def listTables(db: String, pattern: String): Seq[TableIdentifier] = {
val dbName = formatDatabaseName(db)
+ requireDbExists(dbName)
val dbTables =
externalCatalog.listTables(dbName, pattern).map { t => TableIdentifier(t, Some(dbName)) }
synchronized {
@@ -477,6 +512,8 @@ class SessionCatalog(
ignoreIfExists: Boolean): Unit = {
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
+ requireDbExists(db)
+ requireTableExists(TableIdentifier(table, Option(db)))
externalCatalog.createPartitions(db, table, parts, ignoreIfExists)
}
@@ -490,6 +527,8 @@ class SessionCatalog(
ignoreIfNotExists: Boolean): Unit = {
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
+ requireDbExists(db)
+ requireTableExists(TableIdentifier(table, Option(db)))
externalCatalog.dropPartitions(db, table, parts, ignoreIfNotExists)
}
@@ -505,6 +544,8 @@ class SessionCatalog(
newSpecs: Seq[TablePartitionSpec]): Unit = {
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
+ requireDbExists(db)
+ requireTableExists(TableIdentifier(table, Option(db)))
externalCatalog.renamePartitions(db, table, specs, newSpecs)
}
@@ -520,6 +561,8 @@ class SessionCatalog(
def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = {
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
+ requireDbExists(db)
+ requireTableExists(TableIdentifier(table, Option(db)))
externalCatalog.alterPartitions(db, table, parts)
}
@@ -530,6 +573,8 @@ class SessionCatalog(
def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = {
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
+ requireDbExists(db)
+ requireTableExists(TableIdentifier(table, Option(db)))
externalCatalog.getPartition(db, table, spec)
}
@@ -545,6 +590,8 @@ class SessionCatalog(
partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = {
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
+ requireDbExists(db)
+ requireTableExists(TableIdentifier(table, Option(db)))
externalCatalog.listPartitions(db, table, partialSpec)
}
@@ -567,12 +614,13 @@ class SessionCatalog(
*/
def createFunction(funcDefinition: CatalogFunction, ignoreIfExists: Boolean): Unit = {
val db = formatDatabaseName(funcDefinition.identifier.database.getOrElse(getCurrentDatabase))
+ requireDbExists(db)
val identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db))
val newFuncDefinition = funcDefinition.copy(identifier = identifier)
if (!functionExists(identifier)) {
externalCatalog.createFunction(db, newFuncDefinition)
} else if (!ignoreIfExists) {
- throw new AnalysisException(s"Function '$identifier' already exists in database '$db'")
+ throw new FunctionAlreadyExistsException(db = db, func = identifier.toString)
}
}
@@ -582,6 +630,7 @@ class SessionCatalog(
*/
def dropFunction(name: FunctionIdentifier, ignoreIfNotExists: Boolean): Unit = {
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
+ requireDbExists(db)
val identifier = name.copy(database = Some(db))
if (functionExists(identifier)) {
// TODO: registry should just take in FunctionIdentifier for type safety
@@ -594,7 +643,7 @@ class SessionCatalog(
}
externalCatalog.dropFunction(db, name.funcName)
} else if (!ignoreIfNotExists) {
- throw new AnalysisException(s"function '$identifier' does not exist in database '$db'")
+ throw new NoSuchFunctionException(db = db, func = identifier.toString)
}
}
@@ -606,6 +655,7 @@ class SessionCatalog(
*/
def getFunctionMetadata(name: FunctionIdentifier): CatalogFunction = {
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
+ requireDbExists(db)
externalCatalog.getFunction(db, name.funcName)
}
@@ -614,6 +664,7 @@ class SessionCatalog(
*/
def functionExists(name: FunctionIdentifier): Boolean = {
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
+ requireDbExists(db)
functionRegistry.functionExists(name.unquotedString) ||
externalCatalog.functionExists(db, name.funcName)
}
@@ -654,7 +705,7 @@ class SessionCatalog(
funcDefinition: FunctionBuilder,
ignoreIfExists: Boolean): Unit = {
if (functionRegistry.lookupFunctionBuilder(name).isDefined && !ignoreIfExists) {
- throw new AnalysisException(s"Temporary function '$name' already exists.")
+ throw new TempFunctionAlreadyExistsException(name)
}
functionRegistry.registerFunction(name, info, funcDefinition)
}
@@ -664,8 +715,7 @@ class SessionCatalog(
*/
def dropTempFunction(name: String, ignoreIfNotExists: Boolean): Unit = {
if (!functionRegistry.dropFunction(name) && !ignoreIfNotExists) {
- throw new AnalysisException(
- s"Temporary function '$name' cannot be dropped because it does not exist!")
+ throw new NoSuchTempFunctionException(name)
}
}
@@ -684,6 +734,7 @@ class SessionCatalog(
.orElse(functionRegistry.lookupFunction(qualifiedName.unquotedString))
.getOrElse {
val db = qualifiedName.database.get
+ requireDbExists(db)
if (externalCatalog.functionExists(db, name.funcName)) {
val metadata = externalCatalog.getFunction(db, name.funcName)
new ExpressionInfo(metadata.className, qualifiedName.unquotedString)
@@ -760,6 +811,7 @@ class SessionCatalog(
*/
def listFunctions(db: String, pattern: String): Seq[FunctionIdentifier] = {
val dbName = formatDatabaseName(db)
+ requireDbExists(dbName)
val dbFunctions = externalCatalog.listFunctions(dbName, pattern)
.map { f => FunctionIdentifier(f, Some(dbName)) }
val loadedFunctions = StringUtils.filterPattern(functionRegistry.listFunction(), pattern)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index a704ca78f9..f2d2e99a3c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.catalog
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias}
@@ -69,7 +70,7 @@ class SessionCatalogSuite extends SparkFunSuite {
test("get database should throw exception when the database does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
- intercept[AnalysisException] {
+ intercept[NoSuchDatabaseException] {
catalog.getDatabaseMetadata("db_that_does_not_exist")
}
}
@@ -120,7 +121,7 @@ class SessionCatalogSuite extends SparkFunSuite {
test("drop database when the database does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
- intercept[AnalysisException] {
+ intercept[NoSuchDatabaseException] {
catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = false, cascade = false)
}
catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = true, cascade = false)
@@ -140,8 +141,8 @@ class SessionCatalogSuite extends SparkFunSuite {
test("alter database should throw exception when the database does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
- intercept[AnalysisException] {
- catalog.alterDatabase(newDb("does_not_exist"))
+ intercept[NoSuchDatabaseException] {
+ catalog.alterDatabase(newDb("unknown_db"))
}
}
@@ -150,7 +151,7 @@ class SessionCatalogSuite extends SparkFunSuite {
assert(catalog.getCurrentDatabase == "default")
catalog.setCurrentDatabase("db2")
assert(catalog.getCurrentDatabase == "db2")
- intercept[AnalysisException] {
+ intercept[NoSuchDatabaseException] {
catalog.setCurrentDatabase("deebo")
}
catalog.createDatabase(newDb("deebo"), ignoreIfExists = false)
@@ -181,14 +182,14 @@ class SessionCatalogSuite extends SparkFunSuite {
test("create table when database does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
// Creating table in non-existent database should always fail
- intercept[AnalysisException] {
+ intercept[NoSuchDatabaseException] {
catalog.createTable(newTable("tbl1", "does_not_exist"), ignoreIfExists = false)
}
- intercept[AnalysisException] {
+ intercept[NoSuchDatabaseException] {
catalog.createTable(newTable("tbl1", "does_not_exist"), ignoreIfExists = true)
}
// Table already exists
- intercept[AnalysisException] {
+ intercept[TableAlreadyExistsException] {
catalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false)
}
catalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = true)
@@ -200,16 +201,16 @@ class SessionCatalogSuite extends SparkFunSuite {
val tempTable2 = Range(1, 20, 2, 10, Seq())
catalog.createTempTable("tbl1", tempTable1, overrideIfExists = false)
catalog.createTempTable("tbl2", tempTable2, overrideIfExists = false)
- assert(catalog.getTempTable("tbl1") == Some(tempTable1))
- assert(catalog.getTempTable("tbl2") == Some(tempTable2))
- assert(catalog.getTempTable("tbl3") == None)
+ assert(catalog.getTempTable("tbl1") == Option(tempTable1))
+ assert(catalog.getTempTable("tbl2") == Option(tempTable2))
+ assert(catalog.getTempTable("tbl3").isEmpty)
// Temporary table already exists
- intercept[AnalysisException] {
+ intercept[TempTableAlreadyExistsException] {
catalog.createTempTable("tbl1", tempTable1, overrideIfExists = false)
}
// Temporary table already exists but we override it
catalog.createTempTable("tbl1", tempTable2, overrideIfExists = true)
- assert(catalog.getTempTable("tbl1") == Some(tempTable2))
+ assert(catalog.getTempTable("tbl1") == Option(tempTable2))
}
test("drop table") {
@@ -227,13 +228,13 @@ class SessionCatalogSuite extends SparkFunSuite {
test("drop table when database/table does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
// Should always throw exception when the database does not exist
- intercept[AnalysisException] {
+ intercept[NoSuchDatabaseException] {
catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = false)
}
- intercept[AnalysisException] {
+ intercept[NoSuchDatabaseException] {
catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = true)
}
- intercept[AnalysisException] {
+ intercept[NoSuchTableException] {
catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = false)
}
catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = true)
@@ -281,15 +282,20 @@ class SessionCatalogSuite extends SparkFunSuite {
sessionCatalog.renameTable(
TableIdentifier("tblone", Some("db2")), TableIdentifier("tblones", Some("db1")))
}
+ // The new table already exists
+ intercept[TableAlreadyExistsException] {
+ sessionCatalog.renameTable(
+ TableIdentifier("tblone", Some("db2")), TableIdentifier("table_two", Some("db2")))
+ }
}
test("rename table when database/table does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
- intercept[AnalysisException] {
+ intercept[NoSuchDatabaseException] {
catalog.renameTable(
TableIdentifier("tbl1", Some("unknown_db")), TableIdentifier("tbl2", Some("unknown_db")))
}
- intercept[AnalysisException] {
+ intercept[NoSuchTableException] {
catalog.renameTable(
TableIdentifier("unknown_table", Some("db2")), TableIdentifier("tbl2", Some("db2")))
}
@@ -301,18 +307,18 @@ class SessionCatalogSuite extends SparkFunSuite {
val tempTable = Range(1, 10, 2, 10, Seq())
sessionCatalog.createTempTable("tbl1", tempTable, overrideIfExists = false)
sessionCatalog.setCurrentDatabase("db2")
- assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable))
+ assert(sessionCatalog.getTempTable("tbl1") == Option(tempTable))
assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
// If database is not specified, temp table should be renamed first
sessionCatalog.renameTable(TableIdentifier("tbl1"), TableIdentifier("tbl3"))
- assert(sessionCatalog.getTempTable("tbl1") == None)
- assert(sessionCatalog.getTempTable("tbl3") == Some(tempTable))
+ assert(sessionCatalog.getTempTable("tbl1").isEmpty)
+ assert(sessionCatalog.getTempTable("tbl3") == Option(tempTable))
assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
// If database is specified, temp tables are never renamed
sessionCatalog.renameTable(
TableIdentifier("tbl2", Some("db2")), TableIdentifier("tbl4", Some("db2")))
- assert(sessionCatalog.getTempTable("tbl3") == Some(tempTable))
- assert(sessionCatalog.getTempTable("tbl4") == None)
+ assert(sessionCatalog.getTempTable("tbl3") == Option(tempTable))
+ assert(sessionCatalog.getTempTable("tbl4").isEmpty)
assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl4"))
}
@@ -334,10 +340,10 @@ class SessionCatalogSuite extends SparkFunSuite {
test("alter table when database/table does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
- intercept[AnalysisException] {
+ intercept[NoSuchDatabaseException] {
catalog.alterTable(newTable("tbl1", "unknown_db"))
}
- intercept[AnalysisException] {
+ intercept[NoSuchTableException] {
catalog.alterTable(newTable("unknown_table", "db2"))
}
}
@@ -355,14 +361,25 @@ class SessionCatalogSuite extends SparkFunSuite {
test("get table when database/table does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
- intercept[AnalysisException] {
+ intercept[NoSuchDatabaseException] {
catalog.getTableMetadata(TableIdentifier("tbl1", Some("unknown_db")))
}
- intercept[AnalysisException] {
+ intercept[NoSuchTableException] {
catalog.getTableMetadata(TableIdentifier("unknown_table", Some("db2")))
}
}
+ test("get option of table metadata") {
+ val externalCatalog = newBasicCatalog()
+ val catalog = new SessionCatalog(externalCatalog)
+ assert(catalog.getTableMetadataOption(TableIdentifier("tbl1", Some("db2")))
+ == Option(externalCatalog.getTable("db2", "tbl1")))
+ assert(catalog.getTableMetadataOption(TableIdentifier("unknown_table", Some("db2"))).isEmpty)
+ intercept[NoSuchDatabaseException] {
+ catalog.getTableMetadataOption(TableIdentifier("tbl1", Some("unknown_db")))
+ }
+ }
+
test("lookup table relation") {
val externalCatalog = newBasicCatalog()
val sessionCatalog = new SessionCatalog(externalCatalog)
@@ -427,7 +444,7 @@ class SessionCatalogSuite extends SparkFunSuite {
TableIdentifier("tbl4"),
TableIdentifier("tbl1", Some("db2")),
TableIdentifier("tbl2", Some("db2"))))
- intercept[AnalysisException] {
+ intercept[NoSuchDatabaseException] {
catalog.listTables("unknown_db")
}
}
@@ -446,7 +463,7 @@ class SessionCatalogSuite extends SparkFunSuite {
TableIdentifier("tbl2", Some("db2"))))
assert(catalog.listTables("db2", "*1").toSet ==
Set(TableIdentifier("tbl1"), TableIdentifier("tbl1", Some("db2"))))
- intercept[AnalysisException] {
+ intercept[NoSuchDatabaseException] {
catalog.listTables("unknown_db", "*")
}
}
@@ -471,11 +488,11 @@ class SessionCatalogSuite extends SparkFunSuite {
test("create partitions when database/table does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
- intercept[AnalysisException] {
+ intercept[NoSuchDatabaseException] {
catalog.createPartitions(
- TableIdentifier("tbl1", Some("does_not_exist")), Seq(), ignoreIfExists = false)
+ TableIdentifier("tbl1", Some("unknown_db")), Seq(), ignoreIfExists = false)
}
- intercept[AnalysisException] {
+ intercept[NoSuchTableException] {
catalog.createPartitions(
TableIdentifier("does_not_exist", Some("db2")), Seq(), ignoreIfExists = false)
}
@@ -520,13 +537,13 @@ class SessionCatalogSuite extends SparkFunSuite {
test("drop partitions when database/table does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
- intercept[AnalysisException] {
+ intercept[NoSuchDatabaseException] {
catalog.dropPartitions(
- TableIdentifier("tbl1", Some("does_not_exist")),
+ TableIdentifier("tbl1", Some("unknown_db")),
Seq(),
ignoreIfNotExists = false)
}
- intercept[AnalysisException] {
+ intercept[NoSuchTableException] {
catalog.dropPartitions(
TableIdentifier("does_not_exist", Some("db2")),
Seq(),
@@ -566,10 +583,10 @@ class SessionCatalogSuite extends SparkFunSuite {
test("get partition when database/table does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
- intercept[AnalysisException] {
- catalog.getPartition(TableIdentifier("tbl1", Some("does_not_exist")), part1.spec)
+ intercept[NoSuchDatabaseException] {
+ catalog.getPartition(TableIdentifier("tbl1", Some("unknown_db")), part1.spec)
}
- intercept[AnalysisException] {
+ intercept[NoSuchTableException] {
catalog.getPartition(TableIdentifier("does_not_exist", Some("db2")), part1.spec)
}
}
@@ -606,11 +623,11 @@ class SessionCatalogSuite extends SparkFunSuite {
test("rename partitions when database/table does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
- intercept[AnalysisException] {
+ intercept[NoSuchDatabaseException] {
catalog.renamePartitions(
- TableIdentifier("tbl1", Some("does_not_exist")), Seq(part1.spec), Seq(part2.spec))
+ TableIdentifier("tbl1", Some("unknown_db")), Seq(part1.spec), Seq(part2.spec))
}
- intercept[AnalysisException] {
+ intercept[NoSuchTableException] {
catalog.renamePartitions(
TableIdentifier("does_not_exist", Some("db2")), Seq(part1.spec), Seq(part2.spec))
}
@@ -648,10 +665,10 @@ class SessionCatalogSuite extends SparkFunSuite {
test("alter partitions when database/table does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
- intercept[AnalysisException] {
- catalog.alterPartitions(TableIdentifier("tbl1", Some("does_not_exist")), Seq(part1))
+ intercept[NoSuchDatabaseException] {
+ catalog.alterPartitions(TableIdentifier("tbl1", Some("unknown_db")), Seq(part1))
}
- intercept[AnalysisException] {
+ intercept[NoSuchTableException] {
catalog.alterPartitions(TableIdentifier("does_not_exist", Some("db2")), Seq(part1))
}
}
@@ -664,6 +681,16 @@ class SessionCatalogSuite extends SparkFunSuite {
assert(catalog.listPartitions(TableIdentifier("tbl2")).toSet == Set(part1, part2))
}
+ test("list partitions when database/table does not exist") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ intercept[NoSuchDatabaseException] {
+ catalog.listPartitions(TableIdentifier("tbl1", Some("unknown_db")))
+ }
+ intercept[NoSuchTableException] {
+ catalog.listPartitions(TableIdentifier("does_not_exist", Some("db2")))
+ }
+ }
+
// --------------------------------------------------------------------------
// Functions
// --------------------------------------------------------------------------
@@ -682,7 +709,7 @@ class SessionCatalogSuite extends SparkFunSuite {
test("create function when database does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
- intercept[AnalysisException] {
+ intercept[NoSuchDatabaseException] {
catalog.createFunction(
newFunc("func5", Some("does_not_exist")), ignoreIfExists = false)
}
@@ -690,7 +717,7 @@ class SessionCatalogSuite extends SparkFunSuite {
test("create function that already exists") {
val catalog = new SessionCatalog(newBasicCatalog())
- intercept[AnalysisException] {
+ intercept[FunctionAlreadyExistsException] {
catalog.createFunction(newFunc("func1", Some("db2")), ignoreIfExists = false)
}
catalog.createFunction(newFunc("func1", Some("db2")), ignoreIfExists = true)
@@ -708,13 +735,13 @@ class SessionCatalogSuite extends SparkFunSuite {
assert(catalog.lookupFunction(FunctionIdentifier("temp1"), arguments) === Literal(1))
assert(catalog.lookupFunction(FunctionIdentifier("temp2"), arguments) === Literal(3))
// Temporary function does not exist.
- intercept[AnalysisException] {
+ intercept[NoSuchFunctionException] {
catalog.lookupFunction(FunctionIdentifier("temp3"), arguments)
}
val tempFunc3 = (e: Seq[Expression]) => Literal(e.size)
val info3 = new ExpressionInfo("tempFunc3", "temp1")
// Temporary function already exists
- intercept[AnalysisException] {
+ intercept[TempFunctionAlreadyExistsException] {
catalog.createTempFunction("temp1", info3, tempFunc3, ignoreIfExists = false)
}
// Temporary function is overridden
@@ -740,11 +767,11 @@ class SessionCatalogSuite extends SparkFunSuite {
test("drop function when database/function does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
- intercept[AnalysisException] {
+ intercept[NoSuchDatabaseException] {
catalog.dropFunction(
- FunctionIdentifier("something", Some("does_not_exist")), ignoreIfNotExists = false)
+ FunctionIdentifier("something", Some("unknown_db")), ignoreIfNotExists = false)
}
- intercept[AnalysisException] {
+ intercept[NoSuchFunctionException] {
catalog.dropFunction(FunctionIdentifier("does_not_exist"), ignoreIfNotExists = false)
}
catalog.dropFunction(FunctionIdentifier("does_not_exist"), ignoreIfNotExists = true)
@@ -758,10 +785,10 @@ class SessionCatalogSuite extends SparkFunSuite {
val arguments = Seq(Literal(1), Literal(2), Literal(3))
assert(catalog.lookupFunction(FunctionIdentifier("func1"), arguments) === Literal(1))
catalog.dropTempFunction("func1", ignoreIfNotExists = false)
- intercept[AnalysisException] {
+ intercept[NoSuchFunctionException] {
catalog.lookupFunction(FunctionIdentifier("func1"), arguments)
}
- intercept[AnalysisException] {
+ intercept[NoSuchTempFunctionException] {
catalog.dropTempFunction("func1", ignoreIfNotExists = false)
}
catalog.dropTempFunction("func1", ignoreIfNotExists = true)
@@ -780,10 +807,10 @@ class SessionCatalogSuite extends SparkFunSuite {
test("get function when database/function does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
- intercept[AnalysisException] {
- catalog.getFunctionMetadata(FunctionIdentifier("func1", Some("does_not_exist")))
+ intercept[NoSuchDatabaseException] {
+ catalog.getFunctionMetadata(FunctionIdentifier("func1", Some("unknown_db")))
}
- intercept[AnalysisException] {
+ intercept[NoSuchFunctionException] {
catalog.getFunctionMetadata(FunctionIdentifier("does_not_exist", Some("db2")))
}
}
@@ -796,7 +823,7 @@ class SessionCatalogSuite extends SparkFunSuite {
assert(catalog.lookupFunction(
FunctionIdentifier("func1"), Seq(Literal(1), Literal(2), Literal(3))) == Literal(1))
catalog.dropTempFunction("func1", ignoreIfNotExists = false)
- intercept[AnalysisException] {
+ intercept[NoSuchFunctionException] {
catalog.lookupFunction(FunctionIdentifier("func1"), Seq(Literal(1), Literal(2), Literal(3)))
}
}
@@ -826,4 +853,11 @@ class SessionCatalogSuite extends SparkFunSuite {
FunctionIdentifier("func2", Some("db2"))))
}
+ test("list functions when database does not exist") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ intercept[NoSuchDatabaseException] {
+ catalog.listFunctions("unknown_db", "func*")
+ }
+ }
+
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 5fbab2382a..64b90b1ed6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -24,6 +24,7 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.DatabaseAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat}
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
@@ -212,10 +213,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
expectedLocation,
Map.empty))
- val message = intercept[AnalysisException] {
+ intercept[DatabaseAlreadyExistsException] {
sql(s"CREATE DATABASE $dbName")
- }.getMessage
- assert(message.contains(s"Database '$dbNameWithoutBackTicks' already exists."))
+ }
} finally {
catalog.reset()
}
@@ -280,17 +280,17 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
var message = intercept[AnalysisException] {
sql(s"DROP DATABASE $dbName")
}.getMessage
- assert(message.contains(s"Database '$dbNameWithoutBackTicks' does not exist"))
+ assert(message.contains(s"Database '$dbNameWithoutBackTicks' not found"))
message = intercept[AnalysisException] {
sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")
}.getMessage
- assert(message.contains(s"Database '$dbNameWithoutBackTicks' does not exist"))
+ assert(message.contains(s"Database '$dbNameWithoutBackTicks' not found"))
message = intercept[AnalysisException] {
sql(s"DESCRIBE DATABASE EXTENDED $dbName")
}.getMessage
- assert(message.contains(s"Database '$dbNameWithoutBackTicks' does not exist"))
+ assert(message.contains(s"Database '$dbNameWithoutBackTicks' not found"))
sql(s"DROP DATABASE IF EXISTS $dbName")
}
@@ -1014,7 +1014,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
sql("DROP DATABASE DeFault")
}.getMessage
if (caseSensitive == "true") {
- assert(message.contains("Database 'DeFault' does not exist"))
+ assert(message.contains("Database 'DeFault' not found"))
} else {
assert(message.contains("Can not drop default database"))
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
index b8fef23f54..1eed5b6a6a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton}
+import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
@@ -122,10 +122,10 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
}
test("show tblproperties for datasource table - errors") {
- val message1 = intercept[AnalysisException] {
+ val message1 = intercept[NoSuchTableException] {
sql("SHOW TBLPROPERTIES badtable")
}.getMessage
- assert(message1.contains("'badtable' not found in database 'default'"))
+ assert(message1.contains("Table or view 'badtable' not found in database 'default'"))
// When key is not found, a row containing the error is returned.
checkAnswer(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 2e4077df54..6ce5051cbd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -191,20 +191,22 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
allBuiltinFunctions.foreach { f =>
assert(allFunctions.contains(f))
}
- checkAnswer(sql("SHOW functions abs"), Row("abs"))
- checkAnswer(sql("SHOW functions 'abs'"), Row("abs"))
- checkAnswer(sql("SHOW functions abc.abs"), Row("abs"))
- checkAnswer(sql("SHOW functions `abc`.`abs`"), Row("abs"))
- checkAnswer(sql("SHOW functions `abc`.`abs`"), Row("abs"))
- checkAnswer(sql("SHOW functions `~`"), Row("~"))
- checkAnswer(sql("SHOW functions `a function doens't exist`"), Nil)
- checkAnswer(sql("SHOW functions `weekofyea*`"), Row("weekofyear"))
- // this probably will failed if we add more function with `sha` prefixing.
- checkAnswer(sql("SHOW functions `sha*`"), Row("sha") :: Row("sha1") :: Row("sha2") :: Nil)
- // Test '|' for alternation.
- checkAnswer(
- sql("SHOW functions 'sha*|weekofyea*'"),
- Row("sha") :: Row("sha1") :: Row("sha2") :: Row("weekofyear") :: Nil)
+ withTempDatabase { db =>
+ checkAnswer(sql("SHOW functions abs"), Row("abs"))
+ checkAnswer(sql("SHOW functions 'abs'"), Row("abs"))
+ checkAnswer(sql(s"SHOW functions $db.abs"), Row("abs"))
+ checkAnswer(sql(s"SHOW functions `$db`.`abs`"), Row("abs"))
+ checkAnswer(sql(s"SHOW functions `$db`.`abs`"), Row("abs"))
+ checkAnswer(sql("SHOW functions `~`"), Row("~"))
+ checkAnswer(sql("SHOW functions `a function doens't exist`"), Nil)
+ checkAnswer(sql("SHOW functions `weekofyea*`"), Row("weekofyear"))
+ // this probably will failed if we add more function with `sha` prefixing.
+ checkAnswer(sql("SHOW functions `sha*`"), Row("sha") :: Row("sha1") :: Row("sha2") :: Nil)
+ // Test '|' for alternation.
+ checkAnswer(
+ sql("SHOW functions 'sha*|weekofyea*'"),
+ Row("sha") :: Row("sha1") :: Row("sha2") :: Row("weekofyear") :: Nil)
+ }
}
test("describe functions - built-in functions") {