author    Andrew Or <andrew@databricks.com>    2016-03-16 18:02:43 -0700
committer Yin Huai <yhuai@databricks.com>    2016-03-16 18:02:43 -0700
commit    ca9ef86c84ee84263f437a979017898f4bed0feb (patch)
tree      8dbcb871a0664a2375bd59ffbecdbf7bdc23c4d8 /sql/catalyst
parent    92b70576eabf8ff94ac476e2b3c66f8b3d28e79e (diff)
download  spark-ca9ef86c84ee84263f437a979017898f4bed0feb.tar.gz
          spark-ca9ef86c84ee84263f437a979017898f4bed0feb.tar.bz2
          spark-ca9ef86c84ee84263f437a979017898f4bed0feb.zip
[SPARK-13923][SQL] Implement SessionCatalog
## What changes were proposed in this pull request?

As part of the effort to merge `SQLContext` and `HiveContext`, this patch implements an internal catalog called `SessionCatalog` that handles temporary functions and tables and delegates metastore operations to `ExternalCatalog`. Currently this is still dead code, but in the future it will be part of `SessionState` and will replace `o.a.s.sql.catalyst.analysis.Catalog`. A recent patch, #11573, made Spark parse Hive commands itself, but the entire query text is still passed to Hive. In a future patch, we will use `SessionCatalog` to implement the parsed commands.

## How was this patch tested?

800+ lines of tests in `SessionCatalogSuite`.

Author: Andrew Or <andrew@databricks.com>

Closes #11750 from andrewor14/temp-catalog.
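For orientation, here is a minimal sketch of how the new `SessionCatalog` can be exercised, based only on the signatures added in this diff (the `Range` plan stands in for an arbitrary temporary-table definition, mirroring its use in `SessionCatalogSuite`):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.plans.logical.Range

// A SessionCatalog wraps an ExternalCatalog (here the in-memory one) and
// layers session-scoped temporary tables and functions on top of it.
val catalog = new SessionCatalog(new InMemoryCatalog)

// Temporary tables are just named logical plans kept in the session.
catalog.createTempTable("tmp", Range(1, 10, 1, 10, Seq()), ignoreIfExists = false)

// With no database qualifier, lookup tries temporary tables first and only
// then falls back to the current database in the metastore.
val plan = catalog.lookupRelation(TableIdentifier("tmp"))

// A qualified name always resolves against the metastore, never temp tables:
// catalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false)
```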
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala              |  35
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala      |  51
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala       | 469
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala            |  29
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala                  |  68
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala     | 171
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala |   9
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala  | 864
8 files changed, 1555 insertions, 141 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala
deleted file mode 100644
index 4d4e4ded99..0000000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst
-
-/**
- * Identifies a `table` in `database`. If `database` is not defined, the current database is used.
- */
-private[sql] case class TableIdentifier(table: String, database: Option[String]) {
- def this(table: String) = this(table, None)
-
- override def toString: String = quotedString
-
- def quotedString: String = database.map(db => s"`$db`.`$table`").getOrElse(s"`$table`")
-
- def unquotedString: String = database.map(db => s"$db.$table").getOrElse(table)
-}
-
-private[sql] object TableIdentifier {
- def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)
-}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index f3fa7958db..7ead1ddebe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.catalog
import scala.collection.mutable
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
/**
@@ -68,19 +69,20 @@ class InMemoryCatalog extends ExternalCatalog {
private def requireFunctionExists(db: String, funcName: String): Unit = {
if (!existsFunction(db, funcName)) {
- throw new AnalysisException(s"Function $funcName does not exist in $db database")
+ throw new AnalysisException(s"Function '$funcName' does not exist in database '$db'")
}
}
private def requireTableExists(db: String, table: String): Unit = {
if (!existsTable(db, table)) {
- throw new AnalysisException(s"Table $table does not exist in $db database")
+ throw new AnalysisException(s"Table '$table' does not exist in database '$db'")
}
}
private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = {
if (!existsPartition(db, table, spec)) {
- throw new AnalysisException(s"Partition does not exist in database $db table $table: $spec")
+ throw new AnalysisException(
+ s"Partition does not exist in database '$db' table '$table': '$spec'")
}
}
@@ -93,7 +95,7 @@ class InMemoryCatalog extends ExternalCatalog {
ignoreIfExists: Boolean): Unit = synchronized {
if (catalog.contains(dbDefinition.name)) {
if (!ignoreIfExists) {
- throw new AnalysisException(s"Database ${dbDefinition.name} already exists.")
+ throw new AnalysisException(s"Database '${dbDefinition.name}' already exists.")
}
} else {
catalog.put(dbDefinition.name, new DatabaseDesc(dbDefinition))
@@ -108,17 +110,17 @@ class InMemoryCatalog extends ExternalCatalog {
if (!cascade) {
// If cascade is false, make sure the database is empty.
if (catalog(db).tables.nonEmpty) {
- throw new AnalysisException(s"Database $db is not empty. One or more tables exist.")
+ throw new AnalysisException(s"Database '$db' is not empty. One or more tables exist.")
}
if (catalog(db).functions.nonEmpty) {
- throw new AnalysisException(s"Database $db is not empty. One or more functions exist.")
+ throw new AnalysisException(s"Database '$db' is not empty. One or more functions exist.")
}
}
// Remove the database.
catalog.remove(db)
} else {
if (!ignoreIfNotExists) {
- throw new AnalysisException(s"Database $db does not exist")
+ throw new AnalysisException(s"Database '$db' does not exist")
}
}
}
@@ -156,12 +158,13 @@ class InMemoryCatalog extends ExternalCatalog {
tableDefinition: CatalogTable,
ignoreIfExists: Boolean): Unit = synchronized {
requireDbExists(db)
- if (existsTable(db, tableDefinition.name)) {
+ val table = tableDefinition.name.table
+ if (existsTable(db, table)) {
if (!ignoreIfExists) {
- throw new AnalysisException(s"Table ${tableDefinition.name} already exists in $db database")
+ throw new AnalysisException(s"Table '$table' already exists in database '$db'")
}
} else {
- catalog(db).tables.put(tableDefinition.name, new TableDesc(tableDefinition))
+ catalog(db).tables.put(table, new TableDesc(tableDefinition))
}
}
@@ -174,7 +177,7 @@ class InMemoryCatalog extends ExternalCatalog {
catalog(db).tables.remove(table)
} else {
if (!ignoreIfNotExists) {
- throw new AnalysisException(s"Table $table does not exist in $db database")
+ throw new AnalysisException(s"Table '$table' does not exist in database '$db'")
}
}
}
@@ -182,14 +185,14 @@ class InMemoryCatalog extends ExternalCatalog {
override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
requireTableExists(db, oldName)
val oldDesc = catalog(db).tables(oldName)
- oldDesc.table = oldDesc.table.copy(name = newName)
+ oldDesc.table = oldDesc.table.copy(name = TableIdentifier(newName, Some(db)))
catalog(db).tables.put(newName, oldDesc)
catalog(db).tables.remove(oldName)
}
override def alterTable(db: String, tableDefinition: CatalogTable): Unit = synchronized {
- requireTableExists(db, tableDefinition.name)
- catalog(db).tables(tableDefinition.name).table = tableDefinition
+ requireTableExists(db, tableDefinition.name.table)
+ catalog(db).tables(tableDefinition.name.table).table = tableDefinition
}
override def getTable(db: String, table: String): CatalogTable = synchronized {
@@ -222,8 +225,8 @@ class InMemoryCatalog extends ExternalCatalog {
val dupSpecs = parts.collect { case p if existingParts.contains(p.spec) => p.spec }
if (dupSpecs.nonEmpty) {
val dupSpecsStr = dupSpecs.mkString("\n===\n")
- throw new AnalysisException(
- s"The following partitions already exist in database $db table $table:\n$dupSpecsStr")
+ throw new AnalysisException("The following partitions already exist in database " +
+ s"'$db' table '$table':\n$dupSpecsStr")
}
}
parts.foreach { p => existingParts.put(p.spec, p) }
@@ -240,8 +243,8 @@ class InMemoryCatalog extends ExternalCatalog {
val missingSpecs = partSpecs.collect { case s if !existingParts.contains(s) => s }
if (missingSpecs.nonEmpty) {
val missingSpecsStr = missingSpecs.mkString("\n===\n")
- throw new AnalysisException(
- s"The following partitions do not exist in database $db table $table:\n$missingSpecsStr")
+ throw new AnalysisException("The following partitions do not exist in database " +
+ s"'$db' table '$table':\n$missingSpecsStr")
}
}
partSpecs.foreach(existingParts.remove)
@@ -292,10 +295,10 @@ class InMemoryCatalog extends ExternalCatalog {
override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
requireDbExists(db)
- if (existsFunction(db, func.name)) {
- throw new AnalysisException(s"Function $func already exists in $db database")
+ if (existsFunction(db, func.name.funcName)) {
+ throw new AnalysisException(s"Function '$func' already exists in '$db' database")
} else {
- catalog(db).functions.put(func.name, func)
+ catalog(db).functions.put(func.name.funcName, func)
}
}
@@ -306,14 +309,14 @@ class InMemoryCatalog extends ExternalCatalog {
override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized {
requireFunctionExists(db, oldName)
- val newFunc = getFunction(db, oldName).copy(name = newName)
+ val newFunc = getFunction(db, oldName).copy(name = FunctionIdentifier(newName, Some(db)))
catalog(db).functions.remove(oldName)
catalog(db).functions.put(newName, newFunc)
}
override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = synchronized {
- requireFunctionExists(db, funcDefinition.name)
- catalog(db).functions.put(funcDefinition.name, funcDefinition)
+ requireFunctionExists(db, funcDefinition.name.funcName)
+ catalog(db).functions.put(funcDefinition.name.funcName, funcDefinition)
}
override def getFunction(db: String, funcName: String): CatalogFunction = synchronized {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
new file mode 100644
index 0000000000..4dec0429bd
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+
+
+/**
+ * An internal catalog that is used by a Spark Session. This internal catalog serves as a
+ * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
+ * tables and functions of the Spark Session that it belongs to.
+ */
+class SessionCatalog(externalCatalog: ExternalCatalog) {
+ import ExternalCatalog._
+
+ private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
+ private[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction]
+
+ // Note: we track current database here because certain operations do not explicitly
+ // specify the database (e.g. DROP TABLE my_table). In these cases we must first
+ // check whether the temporary table or function exists, then, if not, operate on
+ // the corresponding item in the current database.
+ private[this] var currentDb = "default"
+
+ // ----------------------------------------------------------------------------
+ // Databases
+ // ----------------------------------------------------------------------------
+ // All methods in this category interact directly with the underlying catalog.
+ // ----------------------------------------------------------------------------
+
+ def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
+ externalCatalog.createDatabase(dbDefinition, ignoreIfExists)
+ }
+
+ def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
+ externalCatalog.dropDatabase(db, ignoreIfNotExists, cascade)
+ }
+
+ def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
+ externalCatalog.alterDatabase(dbDefinition)
+ }
+
+ def getDatabase(db: String): CatalogDatabase = {
+ externalCatalog.getDatabase(db)
+ }
+
+ def databaseExists(db: String): Boolean = {
+ externalCatalog.databaseExists(db)
+ }
+
+ def listDatabases(): Seq[String] = {
+ externalCatalog.listDatabases()
+ }
+
+ def listDatabases(pattern: String): Seq[String] = {
+ externalCatalog.listDatabases(pattern)
+ }
+
+ def getCurrentDatabase: String = currentDb
+
+ def setCurrentDatabase(db: String): Unit = {
+ if (!databaseExists(db)) {
+ throw new AnalysisException(s"cannot set current database to non-existent '$db'")
+ }
+ currentDb = db
+ }
+
+ // ----------------------------------------------------------------------------
+ // Tables
+ // ----------------------------------------------------------------------------
+ // There are two kinds of tables, temporary tables and metastore tables.
+ // Temporary tables are isolated across sessions and do not belong to any
+ // particular database. Metastore tables can be used across multiple
+ // sessions as their metadata is persisted in the underlying catalog.
+ // ----------------------------------------------------------------------------
+
+ // ----------------------------------------------------
+ // | Methods that interact with metastore tables only |
+ // ----------------------------------------------------
+
+ /**
+ * Create a metastore table in the database specified in `tableDefinition`.
+ * If no such database is specified, create it in the current database.
+ */
+ def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
+ val db = tableDefinition.name.database.getOrElse(currentDb)
+ val newTableDefinition = tableDefinition.copy(
+ name = TableIdentifier(tableDefinition.name.table, Some(db)))
+ externalCatalog.createTable(db, newTableDefinition, ignoreIfExists)
+ }
+
+ /**
+ * Alter the metadata of an existing metastore table identified by `tableDefinition`.
+ *
+ * If no database is specified in `tableDefinition`, assume the table is in the
+ * current database.
+ *
+ * Note: If the underlying implementation does not support altering a certain field,
+ * this becomes a no-op.
+ */
+ def alterTable(tableDefinition: CatalogTable): Unit = {
+ val db = tableDefinition.name.database.getOrElse(currentDb)
+ val newTableDefinition = tableDefinition.copy(
+ name = TableIdentifier(tableDefinition.name.table, Some(db)))
+ externalCatalog.alterTable(db, newTableDefinition)
+ }
+
+ /**
+ * Retrieve the metadata of an existing metastore table.
+ * If no database is specified, assume the table is in the current database.
+ */
+ def getTable(name: TableIdentifier): CatalogTable = {
+ val db = name.database.getOrElse(currentDb)
+ externalCatalog.getTable(db, name.table)
+ }
+
+ // -------------------------------------------------------------
+ // | Methods that interact with temporary and metastore tables |
+ // -------------------------------------------------------------
+
+ /**
+ * Create a temporary table.
+ */
+ def createTempTable(
+ name: String,
+ tableDefinition: LogicalPlan,
+ ignoreIfExists: Boolean): Unit = {
+ if (tempTables.containsKey(name) && !ignoreIfExists) {
+ throw new AnalysisException(s"Temporary table '$name' already exists.")
+ }
+ tempTables.put(name, tableDefinition)
+ }
+
+ /**
+ * Rename a table.
+ *
+ * If a database is specified in `oldName`, this will rename the table in that database.
+ * If no database is specified, this will first attempt to rename a temporary table with
+ * the same name, then, if that does not exist, rename the table in the current database.
+ *
+ * This assumes the database specified in `oldName` matches the one specified in `newName`.
+ */
+ def renameTable(oldName: TableIdentifier, newName: TableIdentifier): Unit = {
+ if (oldName.database != newName.database) {
+ throw new AnalysisException("rename does not support moving tables across databases")
+ }
+ val db = oldName.database.getOrElse(currentDb)
+ if (oldName.database.isDefined || !tempTables.containsKey(oldName.table)) {
+ externalCatalog.renameTable(db, oldName.table, newName.table)
+ } else {
+ val table = tempTables.remove(oldName.table)
+ tempTables.put(newName.table, table)
+ }
+ }
+
+ /**
+ * Drop a table.
+ *
+ * If a database is specified in `name`, this will drop the table from that database.
+ * If no database is specified, this will first attempt to drop a temporary table with
+ * the same name, then, if that does not exist, drop the table from the current database.
+ */
+ def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean): Unit = {
+ val db = name.database.getOrElse(currentDb)
+ if (name.database.isDefined || !tempTables.containsKey(name.table)) {
+ externalCatalog.dropTable(db, name.table, ignoreIfNotExists)
+ } else {
+ tempTables.remove(name.table)
+ }
+ }
+
+ /**
+ * Return a [[LogicalPlan]] that represents the given table.
+ *
+ * If a database is specified in `name`, this will return the table from that database.
+ * If no database is specified, this will first attempt to return a temporary table with
+ * the same name, then, if that does not exist, return the table from the current database.
+ */
+ def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = {
+ val db = name.database.getOrElse(currentDb)
+ val relation =
+ if (name.database.isDefined || !tempTables.containsKey(name.table)) {
+ val metadata = externalCatalog.getTable(db, name.table)
+ CatalogRelation(db, metadata, alias)
+ } else {
+ tempTables.get(name.table)
+ }
+ val tableWithQualifiers = SubqueryAlias(name.table, relation)
+ // If an alias was specified by the lookup, wrap the plan in a subquery so that
+ // attributes are properly qualified with this alias.
+ alias.map(a => SubqueryAlias(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
+ }
+
+ /**
+ * List all tables in the specified database, including temporary tables.
+ */
+ def listTables(db: String): Seq[TableIdentifier] = {
+ val dbTables = externalCatalog.listTables(db).map { t => TableIdentifier(t, Some(db)) }
+ val _tempTables = tempTables.keys().asScala.map { t => TableIdentifier(t) }
+ dbTables ++ _tempTables
+ }
+
+ /**
+ * List all matching tables in the specified database, including temporary tables.
+ */
+ def listTables(db: String, pattern: String): Seq[TableIdentifier] = {
+ val dbTables =
+ externalCatalog.listTables(db, pattern).map { t => TableIdentifier(t, Some(db)) }
+ val regex = pattern.replaceAll("\\*", ".*").r
+ val _tempTables = tempTables.keys().asScala
+ .filter { t => regex.pattern.matcher(t).matches() }
+ .map { t => TableIdentifier(t) }
+ dbTables ++ _tempTables
+ }
+
+ /**
+ * Return a temporary table exactly as it was stored.
+ * For testing only.
+ */
+ private[catalog] def getTempTable(name: String): Option[LogicalPlan] = {
+ Option(tempTables.get(name))
+ }
+
+ // ----------------------------------------------------------------------------
+ // Partitions
+ // ----------------------------------------------------------------------------
+ // All methods in this category interact directly with the underlying catalog.
+ // These methods are concerned with only metastore tables.
+ // ----------------------------------------------------------------------------
+
+ // TODO: We need to figure out how these methods interact with our data source
+ // tables. For such tables, we do not store values of partitioning columns in
+ // the metastore. For now, partition values of a data source table will be
+ // automatically discovered when we load the table.
+
+ /**
+ * Create partitions in an existing table, assuming it exists.
+ * If no database is specified, assume the table is in the current database.
+ */
+ def createPartitions(
+ tableName: TableIdentifier,
+ parts: Seq[CatalogTablePartition],
+ ignoreIfExists: Boolean): Unit = {
+ val db = tableName.database.getOrElse(currentDb)
+ externalCatalog.createPartitions(db, tableName.table, parts, ignoreIfExists)
+ }
+
+ /**
+ * Drop partitions from a table, assuming they exist.
+ * If no database is specified, assume the table is in the current database.
+ */
+ def dropPartitions(
+ tableName: TableIdentifier,
+ parts: Seq[TablePartitionSpec],
+ ignoreIfNotExists: Boolean): Unit = {
+ val db = tableName.database.getOrElse(currentDb)
+ externalCatalog.dropPartitions(db, tableName.table, parts, ignoreIfNotExists)
+ }
+
+ /**
+ * Override the specs of one or many existing table partitions, assuming they exist.
+ *
+ * This assumes index i of `specs` corresponds to index i of `newSpecs`.
+ * If no database is specified, assume the table is in the current database.
+ */
+ def renamePartitions(
+ tableName: TableIdentifier,
+ specs: Seq[TablePartitionSpec],
+ newSpecs: Seq[TablePartitionSpec]): Unit = {
+ val db = tableName.database.getOrElse(currentDb)
+ externalCatalog.renamePartitions(db, tableName.table, specs, newSpecs)
+ }
+
+ /**
+ * Alter one or many table partitions whose specs match those specified in `parts`,
+ * assuming the partitions exist.
+ *
+ * If no database is specified, assume the table is in the current database.
+ *
+ * Note: If the underlying implementation does not support altering a certain field,
+ * this becomes a no-op.
+ */
+ def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = {
+ val db = tableName.database.getOrElse(currentDb)
+ externalCatalog.alterPartitions(db, tableName.table, parts)
+ }
+
+ /**
+ * Retrieve the metadata of a table partition, assuming it exists.
+ * If no database is specified, assume the table is in the current database.
+ */
+ def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = {
+ val db = tableName.database.getOrElse(currentDb)
+ externalCatalog.getPartition(db, tableName.table, spec)
+ }
+
+ /**
+ * List all partitions in a table, assuming it exists.
+ * If no database is specified, assume the table is in the current database.
+ */
+ def listPartitions(tableName: TableIdentifier): Seq[CatalogTablePartition] = {
+ val db = tableName.database.getOrElse(currentDb)
+ externalCatalog.listPartitions(db, tableName.table)
+ }
+
+ // ----------------------------------------------------------------------------
+ // Functions
+ // ----------------------------------------------------------------------------
+ // There are two kinds of functions, temporary functions and metastore
+ // functions (permanent UDFs). Temporary functions are isolated across
+ // sessions. Metastore functions can be used across multiple sessions as
+ // their metadata is persisted in the underlying catalog.
+ // ----------------------------------------------------------------------------
+
+ // -------------------------------------------------------
+ // | Methods that interact with metastore functions only |
+ // -------------------------------------------------------
+
+ /**
+ * Create a metastore function in the database specified in `funcDefinition`.
+ * If no such database is specified, create it in the current database.
+ */
+ def createFunction(funcDefinition: CatalogFunction): Unit = {
+ val db = funcDefinition.name.database.getOrElse(currentDb)
+ val newFuncDefinition = funcDefinition.copy(
+ name = FunctionIdentifier(funcDefinition.name.funcName, Some(db)))
+ externalCatalog.createFunction(db, newFuncDefinition)
+ }
+
+ /**
+ * Drop a metastore function.
+ * If no database is specified, assume the function is in the current database.
+ */
+ def dropFunction(name: FunctionIdentifier): Unit = {
+ val db = name.database.getOrElse(currentDb)
+ externalCatalog.dropFunction(db, name.funcName)
+ }
+
+ /**
+ * Alter a metastore function whose name matches the one specified in `funcDefinition`.
+ *
+ * If no database is specified in `funcDefinition`, assume the function is in the
+ * current database.
+ *
+ * Note: If the underlying implementation does not support altering a certain field,
+ * this becomes a no-op.
+ */
+ def alterFunction(funcDefinition: CatalogFunction): Unit = {
+ val db = funcDefinition.name.database.getOrElse(currentDb)
+ val newFuncDefinition = funcDefinition.copy(
+ name = FunctionIdentifier(funcDefinition.name.funcName, Some(db)))
+ externalCatalog.alterFunction(db, newFuncDefinition)
+ }
+
+ // ----------------------------------------------------------------
+ // | Methods that interact with temporary and metastore functions |
+ // ----------------------------------------------------------------
+
+ /**
+ * Create a temporary function.
+ * This assumes no database is specified in `funcDefinition`.
+ */
+ def createTempFunction(funcDefinition: CatalogFunction, ignoreIfExists: Boolean): Unit = {
+ require(funcDefinition.name.database.isEmpty,
+ "attempted to create a temporary function while specifying a database")
+ val name = funcDefinition.name.funcName
+ if (tempFunctions.containsKey(name) && !ignoreIfExists) {
+ throw new AnalysisException(s"Temporary function '$name' already exists.")
+ }
+ tempFunctions.put(name, funcDefinition)
+ }
+
+ /**
+ * Drop a temporary function.
+ */
+ // TODO: The reason that we distinguish dropFunction and dropTempFunction is that
+ // Hive has DROP FUNCTION and DROP TEMPORARY FUNCTION. We may want to consolidate
+ // dropFunction and dropTempFunction.
+ def dropTempFunction(name: String, ignoreIfNotExists: Boolean): Unit = {
+ if (!tempFunctions.containsKey(name) && !ignoreIfNotExists) {
+ throw new AnalysisException(
+ s"Temporary function '$name' cannot be dropped because it does not exist!")
+ }
+ tempFunctions.remove(name)
+ }
+
+ /**
+ * Rename a function.
+ *
+ * If a database is specified in `oldName`, this will rename the function in that database.
+ * If no database is specified, this will first attempt to rename a temporary function with
+ * the same name, then, if that does not exist, rename the function in the current database.
+ *
+ * This assumes the database specified in `oldName` matches the one specified in `newName`.
+ */
+ def renameFunction(oldName: FunctionIdentifier, newName: FunctionIdentifier): Unit = {
+ if (oldName.database != newName.database) {
+ throw new AnalysisException("rename does not support moving functions across databases")
+ }
+ val db = oldName.database.getOrElse(currentDb)
+ if (oldName.database.isDefined || !tempFunctions.containsKey(oldName.funcName)) {
+ externalCatalog.renameFunction(db, oldName.funcName, newName.funcName)
+ } else {
+ val func = tempFunctions.remove(oldName.funcName)
+ val newFunc = func.copy(name = func.name.copy(funcName = newName.funcName))
+ tempFunctions.put(newName.funcName, newFunc)
+ }
+ }
+
+ /**
+ * Retrieve the metadata of an existing function.
+ *
+ * If a database is specified in `name`, this will return the function in that database.
+ * If no database is specified, this will first attempt to return a temporary function with
+ * the same name, then, if that does not exist, return the function in the current database.
+ */
+ def getFunction(name: FunctionIdentifier): CatalogFunction = {
+ val db = name.database.getOrElse(currentDb)
+ if (name.database.isDefined || !tempFunctions.containsKey(name.funcName)) {
+ externalCatalog.getFunction(db, name.funcName)
+ } else {
+ tempFunctions.get(name.funcName)
+ }
+ }
+
+ // TODO: implement lookupFunction that returns something from the registry itself
+
+ /**
+ * List all matching functions in the specified database, including temporary functions.
+ */
+ def listFunctions(db: String, pattern: String): Seq[FunctionIdentifier] = {
+ val dbFunctions =
+ externalCatalog.listFunctions(db, pattern).map { f => FunctionIdentifier(f, Some(db)) }
+ val regex = pattern.replaceAll("\\*", ".*").r
+ val _tempFunctions = tempFunctions.keys().asScala
+ .filter { f => regex.pattern.matcher(f).matches() }
+ .map { f => FunctionIdentifier(f) }
+ dbFunctions ++ _tempFunctions
+ }
+
+ /**
+ * Return a temporary function. For testing only.
+ */
+ private[catalog] def getTempFunction(name: String): Option[CatalogFunction] = {
+ Option(tempFunctions.get(name))
+ }
+
+}
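One detail worth a closer look in `listTables` and `listFunctions` above: catalog wildcard patterns are rewritten into regexes before the session's temporary names are filtered. A standalone sketch of that rewrite (the values are illustrative):

```scala
// Catalog patterns use '*' as the wildcard; SessionCatalog rewrites it to
// the regex '.*' and filters temporary names against the compiled pattern.
val pattern = "tbl*"
val regex = pattern.replaceAll("\\*", ".*").r
val tempNames = Seq("tbl1", "other", "tbl_two")
val matched = tempNames.filter(t => regex.pattern.matcher(t).matches())
// matched == List("tbl1", "tbl_two")
```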
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index db34af3d26..c4e49614c5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -20,6 +20,9 @@ package org.apache.spark.sql.catalyst.catalog
import javax.annotation.Nullable
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
/**
@@ -167,7 +170,7 @@ abstract class ExternalCatalog {
* @param name name of the function
* @param className fully qualified class name, e.g. "org.apache.spark.util.MyFunc"
*/
-case class CatalogFunction(name: String, className: String)
+case class CatalogFunction(name: FunctionIdentifier, className: String)
/**
@@ -211,8 +214,7 @@ case class CatalogTablePartition(
* future once we have a better understanding of how we want to handle skewed columns.
*/
case class CatalogTable(
- specifiedDatabase: Option[String],
- name: String,
+ name: TableIdentifier,
tableType: CatalogTableType,
storage: CatalogStorageFormat,
schema: Seq[CatalogColumn],
@@ -226,12 +228,12 @@ case class CatalogTable(
viewText: Option[String] = None) {
/** Return the database this table was specified to belong to, assuming it exists. */
- def database: String = specifiedDatabase.getOrElse {
+ def database: String = name.database.getOrElse {
throw new AnalysisException(s"table $name did not specify database")
}
/** Return the fully qualified name of this table, assuming the database was specified. */
- def qualifiedName: String = s"$database.$name"
+ def qualifiedName: String = name.unquotedString
/** Syntactic sugar to update a field in `storage`. */
def withNewStorage(
@@ -272,3 +274,20 @@ object ExternalCatalog {
*/
type TablePartitionSpec = Map[String, String]
}
+
+
+/**
+ * A [[LogicalPlan]] that wraps [[CatalogTable]].
+ */
+case class CatalogRelation(
+ db: String,
+ metadata: CatalogTable,
+ alias: Option[String] = None)
+ extends LeafNode {
+
+ // TODO: implement this
+ override def output: Seq[Attribute] = Seq.empty
+
+ require(metadata.name.database == Some(db),
+ "provided database does not much the one specified in the table definition")
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
new file mode 100644
index 0000000000..87f4d1b007
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+
+/**
+ * An identifier that optionally specifies a database.
+ *
+ * Format (unquoted): "name" or "db.name"
+ * Format (quoted): "`name`" or "`db`.`name`"
+ */
+sealed trait IdentifierWithDatabase {
+ val name: String
+ def database: Option[String]
+ def quotedString: String = database.map(db => s"`$db`.`$name`").getOrElse(s"`$name`")
+ def unquotedString: String = database.map(db => s"$db.$name").getOrElse(name)
+ override def toString: String = quotedString
+}
+
+
+/**
+ * Identifies a table in a database.
+ * If `database` is not defined, the current database is used.
+ */
+case class TableIdentifier(table: String, database: Option[String])
+ extends IdentifierWithDatabase {
+
+ override val name: String = table
+
+ def this(name: String) = this(name, None)
+
+}
+
+object TableIdentifier {
+ def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)
+}
+
+
+/**
+ * Identifies a function in a database.
+ * If `database` is not defined, the current database is used.
+ */
+case class FunctionIdentifier(funcName: String, database: Option[String])
+ extends IdentifierWithDatabase {
+
+ override val name: String = funcName
+
+ def this(name: String) = this(name, None)
+}
+
+object FunctionIdentifier {
+ def apply(funcName: String): FunctionIdentifier = new FunctionIdentifier(funcName)
+}
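The quoting behavior shared through `IdentifierWithDatabase` is easiest to see with concrete values; a small sketch using the classes defined above:

```scala
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}

val qualified = TableIdentifier("tbl1", Some("db2"))
qualified.quotedString   // "`db2`.`tbl1`"
qualified.unquotedString // "db2.tbl1"
qualified.toString       // same as quotedString

// With no database, only the name is rendered; SessionCatalog resolves
// the current database later during lookup.
val bare = FunctionIdentifier("func1")
bare.quotedString        // "`func1`"
```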
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
index b03ba81b50..a1ea61920d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
@@ -21,6 +21,8 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.util.Utils
/**
@@ -29,23 +31,10 @@ import org.apache.spark.sql.AnalysisException
* Implementations of the [[ExternalCatalog]] interface can create test suites by extending this.
*/
abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
- private lazy val storageFormat = CatalogStorageFormat(
- locationUri = None,
- inputFormat = Some(tableInputFormat),
- outputFormat = Some(tableOutputFormat),
- serde = None,
- serdeProperties = Map.empty)
- private lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat)
- private lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat)
- private lazy val part3 = CatalogTablePartition(Map("a" -> "5", "b" -> "6"), storageFormat)
- private val funcClass = "org.apache.spark.myFunc"
-
- // Things subclasses should override
- protected val tableInputFormat: String = "org.apache.park.serde.MyInputFormat"
- protected val tableOutputFormat: String = "org.apache.park.serde.MyOutputFormat"
- protected def newUriForDatabase(): String = "uri"
+ protected val utils: CatalogTestUtils
+ import utils._
+
protected def resetState(): Unit = { }
- protected def newEmptyCatalog(): ExternalCatalog
// Clear all state after each test
override def afterEach(): Unit = {
@@ -56,62 +45,6 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
}
}
- /**
- * Creates a basic catalog, with the following structure:
- *
- * default
- * db1
- * db2
- * - tbl1
- * - tbl2
- * - part1
- * - part2
- * - func1
- */
- private def newBasicCatalog(): ExternalCatalog = {
- val catalog = newEmptyCatalog()
- // When testing against a real catalog, the default database may already exist
- catalog.createDatabase(newDb("default"), ignoreIfExists = true)
- catalog.createDatabase(newDb("db1"), ignoreIfExists = false)
- catalog.createDatabase(newDb("db2"), ignoreIfExists = false)
- catalog.createTable("db2", newTable("tbl1", "db2"), ignoreIfExists = false)
- catalog.createTable("db2", newTable("tbl2", "db2"), ignoreIfExists = false)
- catalog.createPartitions("db2", "tbl2", Seq(part1, part2), ignoreIfExists = false)
- catalog.createFunction("db2", newFunc("func1"))
- catalog
- }
-
- private def newFunc(): CatalogFunction = CatalogFunction("funcname", funcClass)
-
- private def newDb(name: String): CatalogDatabase = {
- CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty)
- }
-
- private def newTable(name: String, db: String): CatalogTable = {
- CatalogTable(
- specifiedDatabase = Some(db),
- name = name,
- tableType = CatalogTableType.EXTERNAL_TABLE,
- storage = storageFormat,
- schema = Seq(CatalogColumn("col1", "int"), CatalogColumn("col2", "string")),
- partitionColumns = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string")))
- }
-
- private def newFunc(name: String): CatalogFunction = CatalogFunction(name, funcClass)
-
- /**
- * Whether the catalog's table partitions equal the ones given.
- * Note: Hive sets some random serde things, so we just compare the specs here.
- */
- private def catalogPartitionsEqual(
- catalog: ExternalCatalog,
- db: String,
- table: String,
- parts: Seq[CatalogTablePartition]): Boolean = {
- catalog.listPartitions(db, table).map(_.spec).toSet == parts.map(_.spec).toSet
- }
-
-
// --------------------------------------------------------------------------
// Databases
// --------------------------------------------------------------------------
@@ -277,7 +210,7 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
}
test("get table") {
- assert(newBasicCatalog().getTable("db2", "tbl1").name == "tbl1")
+ assert(newBasicCatalog().getTable("db2", "tbl1").name.table == "tbl1")
}
test("get table when database/table does not exist") {
@@ -409,7 +342,7 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
test("alter partitions") {
val catalog = newBasicCatalog()
- try{
+ try {
// Note: Before altering table partitions in Hive, you *must* set the current database
// to the one that contains the table of interest. Otherwise you will end up with the
// most helpful error message ever: "Unable to alter partition. alter is not possible."
@@ -498,7 +431,8 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
test("get function") {
val catalog = newBasicCatalog()
- assert(catalog.getFunction("db2", "func1") == newFunc("func1"))
+ assert(catalog.getFunction("db2", "func1") ==
+ CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass))
intercept[AnalysisException] {
catalog.getFunction("db2", "does_not_exist")
}
@@ -517,7 +451,7 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
assert(catalog.getFunction("db2", "func1").className == funcClass)
catalog.renameFunction("db2", "func1", newName)
intercept[AnalysisException] { catalog.getFunction("db2", "func1") }
- assert(catalog.getFunction("db2", newName).name == newName)
+ assert(catalog.getFunction("db2", newName).name.funcName == newName)
assert(catalog.getFunction("db2", newName).className == funcClass)
intercept[AnalysisException] { catalog.renameFunction("db2", "does_not_exist", "me") }
}
@@ -553,3 +487,88 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
}
}
+
+
+/**
+ * A collection of utility fields and methods for tests related to the [[ExternalCatalog]].
+ */
+abstract class CatalogTestUtils {
+
+ // Unimplemented methods
+ val tableInputFormat: String
+ val tableOutputFormat: String
+ def newEmptyCatalog(): ExternalCatalog
+
+ // These fields must be lazy because they rely on fields that are not implemented yet
+ lazy val storageFormat = CatalogStorageFormat(
+ locationUri = None,
+ inputFormat = Some(tableInputFormat),
+ outputFormat = Some(tableOutputFormat),
+ serde = None,
+ serdeProperties = Map.empty)
+ lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat)
+ lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat)
+ lazy val part3 = CatalogTablePartition(Map("a" -> "5", "b" -> "6"), storageFormat)
+ lazy val funcClass = "org.apache.spark.myFunc"
+
+ /**
+ * Creates a basic catalog, with the following structure:
+ *
+ * default
+ * db1
+ * db2
+ * - tbl1
+ * - tbl2
+ * - part1
+ * - part2
+ * - func1
+ */
+ def newBasicCatalog(): ExternalCatalog = {
+ val catalog = newEmptyCatalog()
+ // When testing against a real catalog, the default database may already exist
+ catalog.createDatabase(newDb("default"), ignoreIfExists = true)
+ catalog.createDatabase(newDb("db1"), ignoreIfExists = false)
+ catalog.createDatabase(newDb("db2"), ignoreIfExists = false)
+ catalog.createTable("db2", newTable("tbl1", "db2"), ignoreIfExists = false)
+ catalog.createTable("db2", newTable("tbl2", "db2"), ignoreIfExists = false)
+ catalog.createPartitions("db2", "tbl2", Seq(part1, part2), ignoreIfExists = false)
+ catalog.createFunction("db2", newFunc("func1", Some("db2")))
+ catalog
+ }
+
+ def newFunc(): CatalogFunction = newFunc("funcName")
+
+ def newUriForDatabase(): String = Utils.createTempDir().getAbsolutePath
+
+ def newDb(name: String): CatalogDatabase = {
+ CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty)
+ }
+
+ def newTable(name: String, db: String): CatalogTable = newTable(name, Some(db))
+
+ def newTable(name: String, database: Option[String] = None): CatalogTable = {
+ CatalogTable(
+ name = TableIdentifier(name, database),
+ tableType = CatalogTableType.EXTERNAL_TABLE,
+ storage = storageFormat,
+ schema = Seq(CatalogColumn("col1", "int"), CatalogColumn("col2", "string")),
+ partitionColumns = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string")))
+ }
+
+ def newFunc(name: String, database: Option[String] = None): CatalogFunction = {
+ CatalogFunction(FunctionIdentifier(name, database), funcClass)
+ }
+
+ /**
+ * Whether the catalog's table partitions equal the ones given.
+ * Note: Hive sets some random serde things, so we just compare the specs here.
+ */
+ def catalogPartitionsEqual(
+ catalog: ExternalCatalog,
+ db: String,
+ table: String,
+ parts: Seq[CatalogTablePartition]): Boolean = {
+ catalog.listPartitions(db, table).map(_.spec).toSet == parts.map(_.spec).toSet
+ }
+
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
index 9531758ffd..63a7b2c661 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
@@ -17,7 +17,14 @@
package org.apache.spark.sql.catalyst.catalog
+
/** Test suite for the [[InMemoryCatalog]]. */
class InMemoryCatalogSuite extends CatalogTestCases {
- override protected def newEmptyCatalog(): ExternalCatalog = new InMemoryCatalog
+
+ protected override val utils: CatalogTestUtils = new CatalogTestUtils {
+ override val tableInputFormat: String = "org.apache.park.SequenceFileInputFormat"
+ override val tableOutputFormat: String = "org.apache.park.SequenceFileOutputFormat"
+ override def newEmptyCatalog(): ExternalCatalog = new InMemoryCatalog
+ }
+
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
new file mode 100644
index 0000000000..e1973ee258
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -0,0 +1,864 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias}
+
+
+/**
+ * Tests for [[SessionCatalog]] that assume that [[InMemoryCatalog]] is correctly implemented.
+ *
+ * Note: many of the methods here are very similar to the ones in [[CatalogTestCases]].
+ * This is because [[SessionCatalog]] and [[ExternalCatalog]] share many similar method
+ * signatures but do not extend a common parent. This is largely by design but
+ * unfortunately leads to very similar test code in two places.
+ */
+class SessionCatalogSuite extends SparkFunSuite {
+ private val utils = new CatalogTestUtils {
+ override val tableInputFormat: String = "com.fruit.eyephone.CameraInputFormat"
+ override val tableOutputFormat: String = "com.fruit.eyephone.CameraOutputFormat"
+ override def newEmptyCatalog(): ExternalCatalog = new InMemoryCatalog
+ }
+
+ import utils._
+
+ // --------------------------------------------------------------------------
+ // Databases
+ // --------------------------------------------------------------------------
+
+ test("basic create and list databases") {
+ val catalog = new SessionCatalog(newEmptyCatalog())
+ catalog.createDatabase(newDb("default"), ignoreIfExists = true)
+ assert(catalog.databaseExists("default"))
+ assert(!catalog.databaseExists("testing"))
+ assert(!catalog.databaseExists("testing2"))
+ catalog.createDatabase(newDb("testing"), ignoreIfExists = false)
+ assert(catalog.databaseExists("testing"))
+ assert(catalog.listDatabases().toSet == Set("default", "testing"))
+ catalog.createDatabase(newDb("testing2"), ignoreIfExists = false)
+ assert(catalog.listDatabases().toSet == Set("default", "testing", "testing2"))
+ assert(catalog.databaseExists("testing2"))
+ assert(!catalog.databaseExists("does_not_exist"))
+ }
+
+ test("get database when a database exists") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ val db1 = catalog.getDatabase("db1")
+ assert(db1.name == "db1")
+ assert(db1.description.contains("db1"))
+ }
+
+ test("get database should throw exception when the database does not exist") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ intercept[AnalysisException] {
+ catalog.getDatabase("db_that_does_not_exist")
+ }
+ }
+
+ test("list databases without pattern") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ assert(catalog.listDatabases().toSet == Set("default", "db1", "db2"))
+ }
+
+ test("list databases with pattern") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ assert(catalog.listDatabases("db").toSet == Set.empty)
+ assert(catalog.listDatabases("db*").toSet == Set("db1", "db2"))
+ assert(catalog.listDatabases("*1").toSet == Set("db1"))
+ assert(catalog.listDatabases("db2").toSet == Set("db2"))
+ }
+
+ test("drop database") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = false)
+ assert(catalog.listDatabases().toSet == Set("default", "db2"))
+ }
+
+ test("drop database when the database is not empty") {
+ // Throw exception if there are functions left
+ val externalCatalog1 = newBasicCatalog()
+ val sessionCatalog1 = new SessionCatalog(externalCatalog1)
+ externalCatalog1.dropTable("db2", "tbl1", ignoreIfNotExists = false)
+ externalCatalog1.dropTable("db2", "tbl2", ignoreIfNotExists = false)
+ intercept[AnalysisException] {
+ sessionCatalog1.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
+ }
+
+ // Throw exception if there are tables left
+ val externalCatalog2 = newBasicCatalog()
+ val sessionCatalog2 = new SessionCatalog(externalCatalog2)
+ externalCatalog2.dropFunction("db2", "func1")
+ intercept[AnalysisException] {
+ sessionCatalog2.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
+ }
+
+ // When cascade is true, it should drop them
+ val externalCatalog3 = newBasicCatalog()
+ val sessionCatalog3 = new SessionCatalog(externalCatalog3)
+ externalCatalog3.dropDatabase("db2", ignoreIfNotExists = false, cascade = true)
+ assert(sessionCatalog3.listDatabases().toSet == Set("default", "db1"))
+ }
+
+ test("drop database when the database does not exist") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ intercept[AnalysisException] {
+ catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = false, cascade = false)
+ }
+ catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = true, cascade = false)
+ }
+
+ test("alter database") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ val db1 = catalog.getDatabase("db1")
+ // Note: alter properties here because Hive does not support altering other fields
+ catalog.alterDatabase(db1.copy(properties = Map("k" -> "v3", "good" -> "true")))
+ val newDb1 = catalog.getDatabase("db1")
+ assert(db1.properties.isEmpty)
+ assert(newDb1.properties.size == 2)
+ assert(newDb1.properties.get("k") == Some("v3"))
+ assert(newDb1.properties.get("good") == Some("true"))
+ }
+
+ test("alter database should throw exception when the database does not exist") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ intercept[AnalysisException] {
+ catalog.alterDatabase(newDb("does_not_exist"))
+ }
+ }
+
+ test("get/set current database") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ assert(catalog.getCurrentDatabase == "default")
+ catalog.setCurrentDatabase("db2")
+ assert(catalog.getCurrentDatabase == "db2")
+ intercept[AnalysisException] {
+ catalog.setCurrentDatabase("deebo")
+ }
+ catalog.createDatabase(newDb("deebo"), ignoreIfExists = false)
+ catalog.setCurrentDatabase("deebo")
+ assert(catalog.getCurrentDatabase == "deebo")
+ }
+
+ // --------------------------------------------------------------------------
+ // Tables
+ // --------------------------------------------------------------------------
+
+ test("create table") {
+ val externalCatalog = newBasicCatalog()
+ val sessionCatalog = new SessionCatalog(externalCatalog)
+ assert(externalCatalog.listTables("db1").isEmpty)
+ assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+ sessionCatalog.createTable(newTable("tbl3", "db1"), ignoreIfExists = false)
+ sessionCatalog.createTable(newTable("tbl3", "db2"), ignoreIfExists = false)
+ assert(externalCatalog.listTables("db1").toSet == Set("tbl3"))
+ assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2", "tbl3"))
+ // Create table without explicitly specifying database
+ sessionCatalog.setCurrentDatabase("db1")
+ sessionCatalog.createTable(newTable("tbl4"), ignoreIfExists = false)
+ assert(externalCatalog.listTables("db1").toSet == Set("tbl3", "tbl4"))
+ assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2", "tbl3"))
+ }
+
+ test("create table when database does not exist") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ // Creating table in non-existent database should always fail
+ intercept[AnalysisException] {
+ catalog.createTable(newTable("tbl1", "does_not_exist"), ignoreIfExists = false)
+ }
+ intercept[AnalysisException] {
+ catalog.createTable(newTable("tbl1", "does_not_exist"), ignoreIfExists = true)
+ }
+ // Table already exists
+ intercept[AnalysisException] {
+ catalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false)
+ }
+ catalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = true)
+ }
+
+ test("create temp table") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ val tempTable1 = Range(1, 10, 1, 10, Seq())
+ val tempTable2 = Range(1, 20, 2, 10, Seq())
+ catalog.createTempTable("tbl1", tempTable1, ignoreIfExists = false)
+ catalog.createTempTable("tbl2", tempTable2, ignoreIfExists = false)
+ assert(catalog.getTempTable("tbl1") == Some(tempTable1))
+ assert(catalog.getTempTable("tbl2") == Some(tempTable2))
+ assert(catalog.getTempTable("tbl3") == None)
+ // Temporary table already exists
+ intercept[AnalysisException] {
+ catalog.createTempTable("tbl1", tempTable1, ignoreIfExists = false)
+ }
+ // Temporary table already exists but we override it
+ catalog.createTempTable("tbl1", tempTable2, ignoreIfExists = true)
+ assert(catalog.getTempTable("tbl1") == Some(tempTable2))
+ }
+
+ test("drop table") {
+ val externalCatalog = newBasicCatalog()
+ val sessionCatalog = new SessionCatalog(externalCatalog)
+ assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+ sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false)
+ assert(externalCatalog.listTables("db2").toSet == Set("tbl2"))
+ // Drop table without explicitly specifying database
+ sessionCatalog.setCurrentDatabase("db2")
+ sessionCatalog.dropTable(TableIdentifier("tbl2"), ignoreIfNotExists = false)
+ assert(externalCatalog.listTables("db2").isEmpty)
+ }
+
+ test("drop table when database/table does not exist") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ // Should always throw exception when the database does not exist
+ intercept[AnalysisException] {
+ catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = false)
+ }
+ intercept[AnalysisException] {
+ catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = true)
+ }
+ // Table does not exist
+ intercept[AnalysisException] {
+ catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = false)
+ }
+ catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = true)
+ }
+
+ test("drop temp table") {
+ val externalCatalog = newBasicCatalog()
+ val sessionCatalog = new SessionCatalog(externalCatalog)
+ val tempTable = Range(1, 10, 2, 10, Seq())
+ sessionCatalog.createTempTable("tbl1", tempTable, ignoreIfExists = false)
+ sessionCatalog.setCurrentDatabase("db2")
+ assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable))
+ assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+ // If database is not specified, temp table should be dropped first
+ sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false)
+ assert(sessionCatalog.getTempTable("tbl1") == None)
+ assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+ // If temp table does not exist, the table in the current database should be dropped
+ sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false)
+ assert(externalCatalog.listTables("db2").toSet == Set("tbl2"))
+ // If database is specified, temp tables are never dropped
+ sessionCatalog.createTempTable("tbl1", tempTable, ignoreIfExists = false)
+ sessionCatalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false)
+ sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false)
+ assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable))
+ assert(externalCatalog.listTables("db2").toSet == Set("tbl2"))
+ }
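+
+ // The drop-resolution order pinned down above, sketched with plain maps
+ // (illustrative names, not the real SessionCatalog internals): an explicit
+ // database bypasses temp tables entirely; an unqualified name removes a temp
+ // table first and only falls through to the current database on a miss.
+ private def sketchDropTable(
+     name: String,
+     database: Option[String],
+     tempTables: scala.collection.mutable.Map[String, Any],
+     metastore: scala.collection.mutable.Map[(String, String), Any],
+     currentDb: String): Unit = database match {
+   case Some(db) => metastore.remove((db, name)) // qualified: metastore only
+   case None if tempTables.contains(name) => tempTables.remove(name) // temp wins
+   case None => metastore.remove((currentDb, name)) // fallback: current database
+ }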
+
+ test("rename table") {
+ val externalCatalog = newBasicCatalog()
+ val sessionCatalog = new SessionCatalog(externalCatalog)
+ assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+ sessionCatalog.renameTable(
+ TableIdentifier("tbl1", Some("db2")), TableIdentifier("tblone", Some("db2")))
+ assert(externalCatalog.listTables("db2").toSet == Set("tblone", "tbl2"))
+ sessionCatalog.renameTable(
+ TableIdentifier("tbl2", Some("db2")), TableIdentifier("tbltwo", Some("db2")))
+ assert(externalCatalog.listTables("db2").toSet == Set("tblone", "tbltwo"))
+ // Rename table without explicitly specifying database
+ sessionCatalog.setCurrentDatabase("db2")
+ sessionCatalog.renameTable(TableIdentifier("tbltwo"), TableIdentifier("table_two"))
+ assert(externalCatalog.listTables("db2").toSet == Set("tblone", "table_two"))
+ // Renaming "db2.tblone" to "db1.tblones" should fail because databases don't match
+ intercept[AnalysisException] {
+ sessionCatalog.renameTable(
+ TableIdentifier("tblone", Some("db2")), TableIdentifier("tblones", Some("db1")))
+ }
+ }
+
+ test("rename table when database/table does not exist") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ intercept[AnalysisException] {
+ catalog.renameTable(
+ TableIdentifier("tbl1", Some("unknown_db")), TableIdentifier("tbl2", Some("unknown_db")))
+ }
+ intercept[AnalysisException] {
+ catalog.renameTable(
+ TableIdentifier("unknown_table", Some("db2")), TableIdentifier("tbl2", Some("db2")))
+ }
+ }
+
+ test("rename temp table") {
+ val externalCatalog = newBasicCatalog()
+ val sessionCatalog = new SessionCatalog(externalCatalog)
+ val tempTable = Range(1, 10, 2, 10, Seq())
+ sessionCatalog.createTempTable("tbl1", tempTable, ignoreIfExists = false)
+ sessionCatalog.setCurrentDatabase("db2")
+ assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable))
+ assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+ // If database is not specified, temp table should be renamed first
+ sessionCatalog.renameTable(TableIdentifier("tbl1"), TableIdentifier("tbl3"))
+ assert(sessionCatalog.getTempTable("tbl1") == None)
+ assert(sessionCatalog.getTempTable("tbl3") == Some(tempTable))
+ assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+ // If database is specified, temp tables are never renamed
+ sessionCatalog.renameTable(
+ TableIdentifier("tbl2", Some("db2")), TableIdentifier("tbl4", Some("db2")))
+ assert(sessionCatalog.getTempTable("tbl3") == Some(tempTable))
+ assert(sessionCatalog.getTempTable("tbl4") == None)
+ assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl4"))
+ }
+
+ test("alter table") {
+ val externalCatalog = newBasicCatalog()
+ val sessionCatalog = new SessionCatalog(externalCatalog)
+ val tbl1 = externalCatalog.getTable("db2", "tbl1")
+ sessionCatalog.alterTable(tbl1.copy(properties = Map("toh" -> "frem")))
+ val newTbl1 = externalCatalog.getTable("db2", "tbl1")
+ assert(!tbl1.properties.contains("toh"))
+ assert(newTbl1.properties.size == tbl1.properties.size + 1)
+ assert(newTbl1.properties.get("toh") == Some("frem"))
+ // Alter table without explicitly specifying database
+ sessionCatalog.setCurrentDatabase("db2")
+ sessionCatalog.alterTable(tbl1.copy(name = TableIdentifier("tbl1")))
+ val newestTbl1 = externalCatalog.getTable("db2", "tbl1")
+ assert(newestTbl1 == tbl1)
+ }
+
+ test("alter table when database/table does not exist") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ intercept[AnalysisException] {
+ catalog.alterTable(newTable("tbl1", "unknown_db"))
+ }
+ intercept[AnalysisException] {
+ catalog.alterTable(newTable("unknown_table", "db2"))
+ }
+ }
+
+ test("get table") {
+ val externalCatalog = newBasicCatalog()
+ val sessionCatalog = new SessionCatalog(externalCatalog)
+ assert(sessionCatalog.getTable(TableIdentifier("tbl1", Some("db2")))
+ == externalCatalog.getTable("db2", "tbl1"))
+ // Get table without explicitly specifying database
+ sessionCatalog.setCurrentDatabase("db2")
+ assert(sessionCatalog.getTable(TableIdentifier("tbl1"))
+ == externalCatalog.getTable("db2", "tbl1"))
+ }
+
+ test("get table when database/table does not exist") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ intercept[AnalysisException] {
+ catalog.getTable(TableIdentifier("tbl1", Some("unknown_db")))
+ }
+ intercept[AnalysisException] {
+ catalog.getTable(TableIdentifier("unknown_table", Some("db2")))
+ }
+ }
+
+ test("lookup table relation") {
+ val externalCatalog = newBasicCatalog()
+ val sessionCatalog = new SessionCatalog(externalCatalog)
+ val tempTable1 = Range(1, 10, 1, 10, Seq())
+ val metastoreTable1 = externalCatalog.getTable("db2", "tbl1")
+ sessionCatalog.createTempTable("tbl1", tempTable1, ignoreIfExists = false)
+ sessionCatalog.setCurrentDatabase("db2")
+ // If we explicitly specify the database, we'll look up the relation in that database
+ assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2")))
+ == SubqueryAlias("tbl1", CatalogRelation("db2", metastoreTable1)))
+ // Otherwise, we'll first look up a temporary table with the same name
+ assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
+ == SubqueryAlias("tbl1", tempTable1))
+ // Then, if that does not exist, look up the relation in the current database
+ sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false)
+ assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
+ == SubqueryAlias("tbl1", CatalogRelation("db2", metastoreTable1)))
+ }
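+
+ // The same precedence, sketched as a lookup (a hypothetical helper, not the
+ // real lookupRelation, which additionally wraps its result in SubqueryAlias as
+ // the assertions above show): explicit database -> metastore; otherwise temp
+ // table first, then the current database.
+ private def sketchLookupRelation[P](
+     name: String,
+     database: Option[String],
+     tempTables: Map[String, P],
+     fromMetastore: (String, String) => P,
+     currentDb: String): P = database match {
+   case Some(db) => fromMetastore(db, name)
+   case None => tempTables.getOrElse(name, fromMetastore(currentDb, name))
+ }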
+
+ test("lookup table relation with alias") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ val alias = "monster"
+ val tableMetadata = catalog.getTable(TableIdentifier("tbl1", Some("db2")))
+ val relation = SubqueryAlias("tbl1", CatalogRelation("db2", tableMetadata))
+ val relationWithAlias =
+ SubqueryAlias(alias,
+ SubqueryAlias("tbl1",
+ CatalogRelation("db2", tableMetadata, Some(alias))))
+ assert(catalog.lookupRelation(
+ TableIdentifier("tbl1", Some("db2")), alias = None) == relation)
+ assert(catalog.lookupRelation(
+ TableIdentifier("tbl1", Some("db2")), alias = Some(alias)) == relationWithAlias)
+ }
+
+ test("list tables without pattern") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ val tempTable = Range(1, 10, 2, 10, Seq())
+ catalog.createTempTable("tbl1", tempTable, ignoreIfExists = false)
+ catalog.createTempTable("tbl4", tempTable, ignoreIfExists = false)
+ assert(catalog.listTables("db1").toSet ==
+ Set(TableIdentifier("tbl1"), TableIdentifier("tbl4")))
+ assert(catalog.listTables("db2").toSet ==
+ Set(TableIdentifier("tbl1"),
+ TableIdentifier("tbl4"),
+ TableIdentifier("tbl1", Some("db2")),
+ TableIdentifier("tbl2", Some("db2"))))
+ intercept[AnalysisException] {
+ catalog.listTables("unknown_db")
+ }
+ }
+
+ test("list tables with pattern") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ val tempTable = Range(1, 10, 2, 10, Seq())
+ catalog.createTempTable("tbl1", tempTable, ignoreIfExists = false)
+ catalog.createTempTable("tbl4", tempTable, ignoreIfExists = false)
+ assert(catalog.listTables("db1", "*").toSet == catalog.listTables("db1").toSet)
+ assert(catalog.listTables("db2", "*").toSet == catalog.listTables("db2").toSet)
+ assert(catalog.listTables("db2", "tbl*").toSet ==
+ Set(TableIdentifier("tbl1"),
+ TableIdentifier("tbl4"),
+ TableIdentifier("tbl1", Some("db2")),
+ TableIdentifier("tbl2", Some("db2"))))
+ assert(catalog.listTables("db2", "*1").toSet ==
+ Set(TableIdentifier("tbl1"), TableIdentifier("tbl1", Some("db2"))))
+ intercept[AnalysisException] {
+ catalog.listTables("unknown_db")
+ }
+ }
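+
+ // A sketch of the listing contract the two tests above establish (the glob
+ // translation and helper are illustrative, not the real implementation):
+ // metastore tables come back qualified with their database, temp tables come
+ // back unqualified, and the pattern filters both sets by table name.
+ private def sketchListTables(
+     db: String,
+     dbTables: Seq[String],
+     tempTables: Seq[String],
+     pattern: String): Seq[(Option[String], String)] = {
+   val regex = pattern.replace("*", ".*").r
+   def matches(name: String): Boolean = regex.pattern.matcher(name).matches()
+   dbTables.filter(matches).map(t => (Some(db), t)) ++
+     tempTables.filter(matches).map(t => (None, t))
+ }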
+
+ // --------------------------------------------------------------------------
+ // Partitions
+ // --------------------------------------------------------------------------
+
+ test("basic create and list partitions") {
+ val externalCatalog = newEmptyCatalog()
+ val sessionCatalog = new SessionCatalog(externalCatalog)
+ sessionCatalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
+ sessionCatalog.createTable(newTable("tbl", "mydb"), ignoreIfExists = false)
+ sessionCatalog.createPartitions(
+ TableIdentifier("tbl", Some("mydb")), Seq(part1, part2), ignoreIfExists = false)
+ assert(catalogPartitionsEqual(externalCatalog, "mydb", "tbl", Seq(part1, part2)))
+ // Create partitions without explicitly specifying database
+ sessionCatalog.setCurrentDatabase("mydb")
+ sessionCatalog.createPartitions(TableIdentifier("tbl"), Seq(part3), ignoreIfExists = false)
+ assert(catalogPartitionsEqual(externalCatalog, "mydb", "tbl", Seq(part1, part2, part3)))
+ }
+
+ test("create partitions when database/table does not exist") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ intercept[AnalysisException] {
+ catalog.createPartitions(
+ TableIdentifier("tbl1", Some("does_not_exist")), Seq(), ignoreIfExists = false)
+ }
+ intercept[AnalysisException] {
+ catalog.createPartitions(
+ TableIdentifier("does_not_exist", Some("db2")), Seq(), ignoreIfExists = false)
+ }
+ }
+
+ test("create partitions that already exist") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ intercept[AnalysisException] {
+ catalog.createPartitions(
+ TableIdentifier("tbl2", Some("db2")), Seq(part1), ignoreIfExists = false)
+ }
+ catalog.createPartitions(
+ TableIdentifier("tbl2", Some("db2")), Seq(part1), ignoreIfExists = true)
+ }
+
+ test("drop partitions") {
+ val externalCatalog = newBasicCatalog()
+ val sessionCatalog = new SessionCatalog(externalCatalog)
+ assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part1, part2)))
+ sessionCatalog.dropPartitions(
+ TableIdentifier("tbl2", Some("db2")), Seq(part1.spec), ignoreIfNotExists = false)
+ assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part2)))
+ // Drop partitions without explicitly specifying database
+ sessionCatalog.setCurrentDatabase("db2")
+ sessionCatalog.dropPartitions(
+ TableIdentifier("tbl2"), Seq(part2.spec), ignoreIfNotExists = false)
+ assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
+ // Drop multiple partitions at once
+ sessionCatalog.createPartitions(
+ TableIdentifier("tbl2", Some("db2")), Seq(part1, part2), ignoreIfExists = false)
+ assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part1, part2)))
+ sessionCatalog.dropPartitions(
+ TableIdentifier("tbl2", Some("db2")), Seq(part1.spec, part2.spec), ignoreIfNotExists = false)
+ assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
+ }
+
+ test("drop partitions when database/table does not exist") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ intercept[AnalysisException] {
+ catalog.dropPartitions(
+ TableIdentifier("tbl1", Some("does_not_exist")), Seq(), ignoreIfNotExists = false)
+ }
+ intercept[AnalysisException] {
+ catalog.dropPartitions(
+ TableIdentifier("does_not_exist", Some("db2")), Seq(), ignoreIfNotExists = false)
+ }
+ }
+
+ test("drop partitions that do not exist") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ intercept[AnalysisException] {
+ catalog.dropPartitions(
+ TableIdentifier("tbl2", Some("db2")), Seq(part3.spec), ignoreIfNotExists = false)
+ }
+ catalog.dropPartitions(
+ TableIdentifier("tbl2", Some("db2")), Seq(part3.spec), ignoreIfNotExists = true)
+ }
+
+ test("get partition") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ assert(catalog.getPartition(
+ TableIdentifier("tbl2", Some("db2")), part1.spec).spec == part1.spec)
+ assert(catalog.getPartition(
+ TableIdentifier("tbl2", Some("db2")), part2.spec).spec == part2.spec)
+ // Get partition without explicitly specifying database
+ catalog.setCurrentDatabase("db2")
+ assert(catalog.getPartition(TableIdentifier("tbl2"), part1.spec).spec == part1.spec)
+ assert(catalog.getPartition(TableIdentifier("tbl2"), part2.spec).spec == part2.spec)
+ // Get non-existent partition
+ intercept[AnalysisException] {
+ catalog.getPartition(TableIdentifier("tbl2"), part3.spec)
+ }
+ }
+
+ test("get partition when database/table does not exist") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ intercept[AnalysisException] {
+ catalog.getPartition(TableIdentifier("tbl1", Some("does_not_exist")), part1.spec)
+ }
+ intercept[AnalysisException] {
+ catalog.getPartition(TableIdentifier("does_not_exist", Some("db2")), part1.spec)
+ }
+ }
+
+ test("rename partitions") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ val newPart1 = part1.copy(spec = Map("a" -> "100", "b" -> "101"))
+ val newPart2 = part2.copy(spec = Map("a" -> "200", "b" -> "201"))
+ val newSpecs = Seq(newPart1.spec, newPart2.spec)
+ catalog.renamePartitions(
+ TableIdentifier("tbl2", Some("db2")), Seq(part1.spec, part2.spec), newSpecs)
+ assert(catalog.getPartition(
+ TableIdentifier("tbl2", Some("db2")), newPart1.spec).spec === newPart1.spec)
+ assert(catalog.getPartition(
+ TableIdentifier("tbl2", Some("db2")), newPart2.spec).spec === newPart2.spec)
+ intercept[AnalysisException] {
+ catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part1.spec)
+ }
+ intercept[AnalysisException] {
+ catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part2.spec)
+ }
+ // Rename partitions without explicitly specifying database
+ catalog.setCurrentDatabase("db2")
+ catalog.renamePartitions(TableIdentifier("tbl2"), newSpecs, Seq(part1.spec, part2.spec))
+ assert(catalog.getPartition(TableIdentifier("tbl2"), part1.spec).spec === part1.spec)
+ assert(catalog.getPartition(TableIdentifier("tbl2"), part2.spec).spec === part2.spec)
+ intercept[AnalysisException] {
+ catalog.getPartition(TableIdentifier("tbl2"), newPart1.spec)
+ }
+ intercept[AnalysisException] {
+ catalog.getPartition(TableIdentifier("tbl2"), newPart2.spec)
+ }
+ }
+
+ test("rename partitions when database/table does not exist") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ intercept[AnalysisException] {
+ catalog.renamePartitions(
+ TableIdentifier("tbl1", Some("does_not_exist")), Seq(part1.spec), Seq(part2.spec))
+ }
+ intercept[AnalysisException] {
+ catalog.renamePartitions(
+ TableIdentifier("does_not_exist", Some("db2")), Seq(part1.spec), Seq(part2.spec))
+ }
+ }
+
+ test("alter partitions") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ val newLocation = newUriForDatabase()
+ // Alter but keep spec the same
+ val oldPart1 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part1.spec)
+ val oldPart2 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part2.spec)
+ catalog.alterPartitions(TableIdentifier("tbl2", Some("db2")), Seq(
+ oldPart1.copy(storage = storageFormat.copy(locationUri = Some(newLocation))),
+ oldPart2.copy(storage = storageFormat.copy(locationUri = Some(newLocation)))))
+ val newPart1 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part1.spec)
+ val newPart2 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part2.spec)
+ assert(newPart1.storage.locationUri == Some(newLocation))
+ assert(newPart2.storage.locationUri == Some(newLocation))
+ assert(oldPart1.storage.locationUri != Some(newLocation))
+ assert(oldPart2.storage.locationUri != Some(newLocation))
+ // Alter partitions without explicitly specifying database
+ catalog.setCurrentDatabase("db2")
+ catalog.alterPartitions(TableIdentifier("tbl2"), Seq(oldPart1, oldPart2))
+ val newerPart1 = catalog.getPartition(TableIdentifier("tbl2"), part1.spec)
+ val newerPart2 = catalog.getPartition(TableIdentifier("tbl2"), part2.spec)
+ assert(oldPart1.storage.locationUri == newerPart1.storage.locationUri)
+ assert(oldPart2.storage.locationUri == newerPart2.storage.locationUri)
+ // Alter with a changed spec: should fail because the new partition specs do not exist yet
+ val badPart1 = part1.copy(spec = Map("a" -> "v1", "b" -> "v2"))
+ val badPart2 = part2.copy(spec = Map("a" -> "v3", "b" -> "v4"))
+ intercept[AnalysisException] {
+ catalog.alterPartitions(TableIdentifier("tbl2", Some("db2")), Seq(badPart1, badPart2))
+ }
+ }
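+
+ // Why the last intercept above is expected: partitions are addressed by their
+ // spec, so passing a partition whose spec was changed looks like a reference
+ // to a partition that does not exist. Sketched with a plain spec-keyed map
+ // (illustrative only, not Spark internals):
+ private def sketchAlterPartition[P](
+     partitions: scala.collection.mutable.Map[Map[String, String], P],
+     spec: Map[String, String],
+     newValue: P): Unit = {
+   if (!partitions.contains(spec)) {
+     throw new NoSuchElementException(s"no partition with spec $spec")
+   }
+   partitions.put(spec, newValue)
+ }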
+
+ test("alter partitions when database/table does not exist") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ intercept[AnalysisException] {
+ catalog.alterPartitions(TableIdentifier("tbl1", Some("does_not_exist")), Seq(part1))
+ }
+ intercept[AnalysisException] {
+ catalog.alterPartitions(TableIdentifier("does_not_exist", Some("db2")), Seq(part1))
+ }
+ }
+
+ test("list partitions") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ assert(catalog.listPartitions(TableIdentifier("tbl2", Some("db2"))).toSet == Set(part1, part2))
+ // List partitions without explicitly specifying database
+ catalog.setCurrentDatabase("db2")
+ assert(catalog.listPartitions(TableIdentifier("tbl2")).toSet == Set(part1, part2))
+ }
+
+ // --------------------------------------------------------------------------
+ // Functions
+ // --------------------------------------------------------------------------
+
+ test("basic create and list functions") {
+ val externalCatalog = newEmptyCatalog()
+ val sessionCatalog = new SessionCatalog(externalCatalog)
+ sessionCatalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
+ sessionCatalog.createFunction(newFunc("myfunc", Some("mydb")))
+ assert(externalCatalog.listFunctions("mydb", "*").toSet == Set("myfunc"))
+ // Create function without explicitly specifying database
+ sessionCatalog.setCurrentDatabase("mydb")
+ sessionCatalog.createFunction(newFunc("myfunc2"))
+ assert(externalCatalog.listFunctions("mydb", "*").toSet == Set("myfunc", "myfunc2"))
+ }
+
+ test("create function when database does not exist") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ intercept[AnalysisException] {
+ catalog.createFunction(newFunc("func5", Some("does_not_exist")))
+ }
+ }
+
+ test("create function that already exists") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ intercept[AnalysisException] {
+ catalog.createFunction(newFunc("func1", Some("db2")))
+ }
+ }
+
+ test("create temp function") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ val tempFunc1 = newFunc("temp1")
+ val tempFunc2 = newFunc("temp2")
+ catalog.createTempFunction(tempFunc1, ignoreIfExists = false)
+ catalog.createTempFunction(tempFunc2, ignoreIfExists = false)
+ assert(catalog.getTempFunction("temp1") == Some(tempFunc1))
+ assert(catalog.getTempFunction("temp2") == Some(tempFunc2))
+ assert(catalog.getTempFunction("temp3") == None)
+ // Temporary function already exists
+ intercept[AnalysisException] {
+ catalog.createTempFunction(tempFunc1, ignoreIfExists = false)
+ }
+ // Temporary function is overridden
+ val tempFunc3 = tempFunc1.copy(className = "something else")
+ catalog.createTempFunction(tempFunc3, ignoreIfExists = true)
+ assert(catalog.getTempFunction("temp1") == Some(tempFunc3))
+ }
+
+ test("drop function") {
+ val externalCatalog = newBasicCatalog()
+ val sessionCatalog = new SessionCatalog(externalCatalog)
+ assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1"))
+ sessionCatalog.dropFunction(FunctionIdentifier("func1", Some("db2")))
+ assert(externalCatalog.listFunctions("db2", "*").isEmpty)
+ // Drop function without explicitly specifying database
+ sessionCatalog.setCurrentDatabase("db2")
+ sessionCatalog.createFunction(newFunc("func2", Some("db2")))
+ assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func2"))
+ sessionCatalog.dropFunction(FunctionIdentifier("func2"))
+ assert(externalCatalog.listFunctions("db2", "*").isEmpty)
+ }
+
+ test("drop function when database/function does not exist") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ intercept[AnalysisException] {
+ catalog.dropFunction(FunctionIdentifier("something", Some("does_not_exist")))
+ }
+ intercept[AnalysisException] {
+ catalog.dropFunction(FunctionIdentifier("does_not_exist"))
+ }
+ }
+
+ test("drop temp function") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ val tempFunc = newFunc("func1")
+ catalog.createTempFunction(tempFunc, ignoreIfExists = false)
+ assert(catalog.getTempFunction("func1") == Some(tempFunc))
+ catalog.dropTempFunction("func1", ignoreIfNotExists = false)
+ assert(catalog.getTempFunction("func1") == None)
+ intercept[AnalysisException] {
+ catalog.dropTempFunction("func1", ignoreIfNotExists = false)
+ }
+ catalog.dropTempFunction("func1", ignoreIfNotExists = true)
+ }
+
+ test("get function") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ val expected = CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass)
+ assert(catalog.getFunction(FunctionIdentifier("func1", Some("db2"))) == expected)
+ // Get function without explicitly specifying database
+ catalog.setCurrentDatabase("db2")
+ assert(catalog.getFunction(FunctionIdentifier("func1")) == expected)
+ }
+
+ test("get function when database/function does not exist") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ intercept[AnalysisException] {
+ catalog.getFunction(FunctionIdentifier("func1", Some("does_not_exist")))
+ }
+ intercept[AnalysisException] {
+ catalog.getFunction(FunctionIdentifier("does_not_exist", Some("db2")))
+ }
+ }
+
+ test("get temp function") {
+ val externalCatalog = newBasicCatalog()
+ val sessionCatalog = new SessionCatalog(externalCatalog)
+ val metastoreFunc = externalCatalog.getFunction("db2", "func1")
+ val tempFunc = newFunc("func1").copy(className = "something weird")
+ sessionCatalog.createTempFunction(tempFunc, ignoreIfExists = false)
+ sessionCatalog.setCurrentDatabase("db2")
+ // If a database is specified, we'll always return the function in that database
+ assert(sessionCatalog.getFunction(FunctionIdentifier("func1", Some("db2"))) == metastoreFunc)
+ // If no database is specified, we'll first return temporary functions
+ assert(sessionCatalog.getFunction(FunctionIdentifier("func1")) == tempFunc)
+ // Then, if no such temporary function exists, check the current database
+ sessionCatalog.dropTempFunction("func1", ignoreIfNotExists = false)
+ assert(sessionCatalog.getFunction(FunctionIdentifier("func1")) == metastoreFunc)
+ }
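+
+ // Function resolution mirrors the table sketches above (again illustrative,
+ // not SessionCatalog internals): a qualified name always resolves in the
+ // metastore; an unqualified name checks the session-local temp-function map
+ // before falling back to the current database.
+ private def sketchGetFunction[F](
+     name: String,
+     database: Option[String],
+     tempFunctions: Map[String, F],
+     fromMetastore: (String, String) => F,
+     currentDb: String): F = database match {
+   case Some(db) => fromMetastore(db, name)
+   case None => tempFunctions.getOrElse(name, fromMetastore(currentDb, name))
+ }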
+
+ test("rename function") {
+ val externalCatalog = newBasicCatalog()
+ val sessionCatalog = new SessionCatalog(externalCatalog)
+ val newName = "funcky"
+ assert(sessionCatalog.getFunction(
+ FunctionIdentifier("func1", Some("db2"))) == newFunc("func1", Some("db2")))
+ assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1"))
+ sessionCatalog.renameFunction(
+ FunctionIdentifier("func1", Some("db2")), FunctionIdentifier(newName, Some("db2")))
+ assert(sessionCatalog.getFunction(
+ FunctionIdentifier(newName, Some("db2"))) == newFunc(newName, Some("db2")))
+ assert(externalCatalog.listFunctions("db2", "*").toSet == Set(newName))
+ // Rename function without explicitly specifying database
+ sessionCatalog.setCurrentDatabase("db2")
+ sessionCatalog.renameFunction(FunctionIdentifier(newName), FunctionIdentifier("func1"))
+ assert(sessionCatalog.getFunction(
+ FunctionIdentifier("func1")) == newFunc("func1", Some("db2")))
+ assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1"))
+ // Renaming "db2.func1" to "db1.func2" should fail because databases don't match
+ intercept[AnalysisException] {
+ sessionCatalog.renameFunction(
+ FunctionIdentifier("func1", Some("db2")), FunctionIdentifier("func2", Some("db1")))
+ }
+ }
+
+ test("rename function when database/function does not exist") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ intercept[AnalysisException] {
+ catalog.renameFunction(
+ FunctionIdentifier("func1", Some("does_not_exist")),
+ FunctionIdentifier("func5", Some("does_not_exist")))
+ }
+ intercept[AnalysisException] {
+ catalog.renameFunction(
+ FunctionIdentifier("does_not_exist", Some("db2")),
+ FunctionIdentifier("x", Some("db2")))
+ }
+ }
+
+ test("rename temp function") {
+ val externalCatalog = newBasicCatalog()
+ val sessionCatalog = new SessionCatalog(externalCatalog)
+ val tempFunc = newFunc("func1").copy(className = "something weird")
+ sessionCatalog.createTempFunction(tempFunc, ignoreIfExists = false)
+ sessionCatalog.setCurrentDatabase("db2")
+ // If a database is specified, we'll always rename the function in that database
+ sessionCatalog.renameFunction(
+ FunctionIdentifier("func1", Some("db2")), FunctionIdentifier("func3", Some("db2")))
+ assert(sessionCatalog.getTempFunction("func1") == Some(tempFunc))
+ assert(sessionCatalog.getTempFunction("func3") == None)
+ assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func3"))
+ // If no database is specified, we'll first rename temporary functions
+ sessionCatalog.createFunction(newFunc("func1", Some("db2")))
+ sessionCatalog.renameFunction(FunctionIdentifier("func1"), FunctionIdentifier("func4"))
+ assert(sessionCatalog.getTempFunction("func4") ==
+ Some(tempFunc.copy(name = FunctionIdentifier("func4"))))
+ assert(sessionCatalog.getTempFunction("func1") == None)
+ assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1", "func3"))
+ // Then, if no such temporary function exists, rename the function in the current database
+ sessionCatalog.renameFunction(FunctionIdentifier("func1"), FunctionIdentifier("func5"))
+ assert(sessionCatalog.getTempFunction("func5") == None)
+ assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func3", "func5"))
+ }
+
+ test("alter function") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ assert(catalog.getFunction(FunctionIdentifier("func1", Some("db2"))).className == funcClass)
+ catalog.alterFunction(newFunc("func1", Some("db2")).copy(className = "muhaha"))
+ assert(catalog.getFunction(FunctionIdentifier("func1", Some("db2"))).className == "muhaha")
+ // Alter function without explicitly specifying database
+ catalog.setCurrentDatabase("db2")
+ catalog.alterFunction(newFunc("func1").copy(className = "derpy"))
+ assert(catalog.getFunction(FunctionIdentifier("func1")).className == "derpy")
+ }
+
+ test("alter function when database/function does not exist") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ intercept[AnalysisException] {
+ catalog.alterFunction(newFunc("func5", Some("does_not_exist")))
+ }
+ intercept[AnalysisException] {
+ catalog.alterFunction(newFunc("funcky", Some("db2")))
+ }
+ }
+
+ test("list functions") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ val tempFunc1 = newFunc("func1").copy(className = "march")
+ val tempFunc2 = newFunc("yes_me").copy(className = "april")
+ catalog.createFunction(newFunc("func2", Some("db2")))
+ catalog.createFunction(newFunc("not_me", Some("db2")))
+ catalog.createTempFunction(tempFunc1, ignoreIfExists = false)
+ catalog.createTempFunction(tempFunc2, ignoreIfExists = false)
+ assert(catalog.listFunctions("db1", "*").toSet ==
+ Set(FunctionIdentifier("func1"),
+ FunctionIdentifier("yes_me")))
+ assert(catalog.listFunctions("db2", "*").toSet ==
+ Set(FunctionIdentifier("func1"),
+ FunctionIdentifier("yes_me"),
+ FunctionIdentifier("func1", Some("db2")),
+ FunctionIdentifier("func2", Some("db2")),
+ FunctionIdentifier("not_me", Some("db2"))))
+ assert(catalog.listFunctions("db2", "func*").toSet ==
+ Set(FunctionIdentifier("func1"),
+ FunctionIdentifier("func1", Some("db2")),
+ FunctionIdentifier("func2", Some("db2"))))
+ }
+
+}