author     Reynold Xin <rxin@databricks.com>  2016-02-01 14:11:52 -0800
committer  Reynold Xin <rxin@databricks.com>  2016-02-01 14:11:52 -0800
commit     be7a2fc0716b7d25327b6f8f683390fc62532e3b (patch)
tree       c9f8161f6ef8f128a98363725465cbfede83b231
parent     a2973fed30fbe9a0b12e1c1225359fdf55d322b4 (diff)
[SPARK-13078][SQL] API and test cases for internal catalog
This pull request creates an internal catalog API. The creation of this API is the first step towards consolidating SQLContext and HiveContext. I envision we will have two different implementations in Spark 2.0: (1) a simple in-memory implementation, and (2) an implementation based on the current HiveClient (ClientWrapper). I took a look at Hive's internal metastore interface/implementation and created this API based on it. I believe this is the minimal set needed to achieve all the required functionality.

Author: Reynold Xin <rxin@databricks.com>

Closes #10982 from rxin/SPARK-13078.
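As a quick orientation, here is a minimal sketch of how a caller might exercise the new Catalog API introduced below. The class and parameter names are taken from the diff; the description, location, and storage values are illustrative placeholders, not part of this patch.

    import org.apache.spark.sql.catalyst.catalog._

    // A minimal sketch, assuming the definitions introduced in this patch.
    val catalog: Catalog = new InMemoryCatalog

    // Create a database; the description/locationUri values here are placeholders.
    catalog.createDatabase(
      Database("db1", "example database", "file:/tmp/db1", Map.empty),
      ifNotExists = true)

    // Register a table with an empty schema and a null storage format (placeholders).
    val table = Table(
      name = "tbl1", description = "", schema = Seq.empty, partitionColumns = Seq.empty,
      sortColumns = Seq.empty, storage = null, numBuckets = 0, properties = Map.empty,
      tableType = "MANAGED_TABLE", createTime = 0L, lastAccessTime = 0L,
      viewOriginalText = None, viewText = None)
    catalog.createTable("db1", table, ignoreIfExists = true)

    assert(catalog.listTables("db1", "tbl*") == Seq("tbl1"))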
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala  | 246
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala  | 178
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala  | 263
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala  | 23
4 files changed, 710 insertions(+), 0 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
new file mode 100644
index 0000000000..9e6dfb7e95
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.AnalysisException
+
+
+/**
+ * An in-memory (ephemeral) implementation of the system catalog.
+ *
+ * All public methods should be synchronized for thread-safety.
+ */
+class InMemoryCatalog extends Catalog {
+
+ private class TableDesc(var table: Table) {
+ val partitions = new mutable.HashMap[String, TablePartition]
+ }
+
+ private class DatabaseDesc(var db: Database) {
+ val tables = new mutable.HashMap[String, TableDesc]
+ val functions = new mutable.HashMap[String, Function]
+ }
+
+ private val catalog = new scala.collection.mutable.HashMap[String, DatabaseDesc]
+
+ private def filterPattern(names: Seq[String], pattern: String): Seq[String] = {
+ val regex = pattern.replaceAll("\\*", ".*").r
+ names.filter { funcName => regex.pattern.matcher(funcName).matches() }
+ }
+
+ private def existsFunction(db: String, funcName: String): Boolean = {
+ catalog(db).functions.contains(funcName)
+ }
+
+ private def existsTable(db: String, table: String): Boolean = {
+ catalog(db).tables.contains(table)
+ }
+
+ private def assertDbExists(db: String): Unit = {
+ if (!catalog.contains(db)) {
+ throw new AnalysisException(s"Database $db does not exist")
+ }
+ }
+
+ private def assertFunctionExists(db: String, funcName: String): Unit = {
+ assertDbExists(db)
+ if (!existsFunction(db, funcName)) {
+ throw new AnalysisException(s"Function $funcName does not exists in $db database")
+ }
+ }
+
+ private def assertTableExists(db: String, table: String): Unit = {
+ assertDbExists(db)
+ if (!existsTable(db, table)) {
+ throw new AnalysisException(s"Table $table does not exists in $db database")
+ }
+ }
+
+ // --------------------------------------------------------------------------
+ // Databases
+ // --------------------------------------------------------------------------
+
+ override def createDatabase(dbDefinition: Database, ifNotExists: Boolean): Unit = synchronized {
+ if (catalog.contains(dbDefinition.name)) {
+ if (!ifNotExists) {
+ throw new AnalysisException(s"Database ${dbDefinition.name} already exists.")
+ }
+ } else {
+ catalog.put(dbDefinition.name, new DatabaseDesc(dbDefinition))
+ }
+ }
+
+ override def dropDatabase(
+ db: String,
+ ignoreIfNotExists: Boolean,
+ cascade: Boolean): Unit = synchronized {
+ if (catalog.contains(db)) {
+ if (!cascade) {
+ // If cascade is false, make sure the database is empty.
+ if (catalog(db).tables.nonEmpty) {
+ throw new AnalysisException(s"Database $db is not empty. One or more tables exist.")
+ }
+ if (catalog(db).functions.nonEmpty) {
+ throw new AnalysisException(s"Database $db is not empty. One or more functions exist.")
+ }
+ }
+ // Remove the database.
+ catalog.remove(db)
+ } else {
+ if (!ignoreIfNotExists) {
+ throw new AnalysisException(s"Database $db does not exist")
+ }
+ }
+ }
+
+ override def alterDatabase(db: String, dbDefinition: Database): Unit = synchronized {
+ assertDbExists(db)
+ assert(db == dbDefinition.name)
+ catalog(db).db = dbDefinition
+ }
+
+ override def getDatabase(db: String): Database = synchronized {
+ assertDbExists(db)
+ catalog(db).db
+ }
+
+ override def listDatabases(): Seq[String] = synchronized {
+ catalog.keySet.toSeq
+ }
+
+ override def listDatabases(pattern: String): Seq[String] = synchronized {
+ filterPattern(listDatabases(), pattern)
+ }
+
+ // --------------------------------------------------------------------------
+ // Tables
+ // --------------------------------------------------------------------------
+
+ override def createTable(db: String, tableDefinition: Table, ifNotExists: Boolean)
+ : Unit = synchronized {
+ assertDbExists(db)
+ if (existsTable(db, tableDefinition.name)) {
+ if (!ifNotExists) {
+ throw new AnalysisException(s"Table ${tableDefinition.name} already exists in $db database")
+ }
+ } else {
+ catalog(db).tables.put(tableDefinition.name, new TableDesc(tableDefinition))
+ }
+ }
+
+ override def dropTable(db: String, table: String, ignoreIfNotExists: Boolean)
+ : Unit = synchronized {
+ assertDbExists(db)
+ if (existsTable(db, table)) {
+ catalog(db).tables.remove(table)
+ } else {
+ if (!ignoreIfNotExists) {
+ throw new AnalysisException(s"Table $table does not exist in $db database")
+ }
+ }
+ }
+
+ override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
+ assertTableExists(db, oldName)
+ val oldDesc = catalog(db).tables(oldName)
+ oldDesc.table = oldDesc.table.copy(name = newName)
+ catalog(db).tables.put(newName, oldDesc)
+ catalog(db).tables.remove(oldName)
+ }
+
+ override def alterTable(db: String, table: String, tableDefinition: Table): Unit = synchronized {
+ assertTableExists(db, table)
+ assert(table == tableDefinition.name)
+ catalog(db).tables(table).table = tableDefinition
+ }
+
+ override def getTable(db: String, table: String): Table = synchronized {
+ assertTableExists(db, table)
+ catalog(db).tables(table).table
+ }
+
+ override def listTables(db: String): Seq[String] = synchronized {
+ assertDbExists(db)
+ catalog(db).tables.keySet.toSeq
+ }
+
+ override def listTables(db: String, pattern: String): Seq[String] = synchronized {
+ assertDbExists(db)
+ filterPattern(listTables(db), pattern)
+ }
+
+ // --------------------------------------------------------------------------
+ // Partitions
+ // --------------------------------------------------------------------------
+
+ override def alterPartition(db: String, table: String, part: TablePartition)
+ : Unit = synchronized {
+ throw new UnsupportedOperationException
+ }
+
+ override def alterPartitions(db: String, table: String, parts: Seq[TablePartition])
+ : Unit = synchronized {
+ throw new UnsupportedOperationException
+ }
+
+ // --------------------------------------------------------------------------
+ // Functions
+ // --------------------------------------------------------------------------
+
+ override def createFunction(
+ db: String, func: Function, ifNotExists: Boolean): Unit = synchronized {
+ assertDbExists(db)
+
+ if (existsFunction(db, func.name)) {
+ if (!ifNotExists) {
+ throw new AnalysisException(s"Function $func already exists in $db database")
+ }
+ } else {
+ catalog(db).functions.put(func.name, func)
+ }
+ }
+
+ override def dropFunction(db: String, funcName: String): Unit = synchronized {
+ assertFunctionExists(db, funcName)
+ catalog(db).functions.remove(funcName)
+ }
+
+ override def alterFunction(db: String, funcName: String, funcDefinition: Function)
+ : Unit = synchronized {
+ assertFunctionExists(db, funcName)
+ if (funcName != funcDefinition.name) {
+ // Also a rename; remove the old one and add the new one back
+ catalog(db).functions.remove(funcName)
+ }
+ catalog(db).functions.put(funcDefinition.name, funcDefinition)
+ }
+
+ override def getFunction(db: String, funcName: String): Function = synchronized {
+ assertFunctionExists(db, funcName)
+ catalog(db).functions(funcName)
+ }
+
+ override def listFunctions(db: String, pattern: String): Seq[String] = synchronized {
+ assertDbExists(db)
+ filterPattern(catalog(db).functions.keysIterator.toSeq, pattern)
+ }
+
+}
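For reference, the pattern matching used by listDatabases/listTables/listFunctions above treats only "*" as a wildcard and requires the whole name to match. A standalone sketch of the same idea (the object and helper names here are illustrative, not part of the patch):

    object PatternFilterSketch {
      // Mirrors the filterPattern helper above: "*" becomes ".*", everything else is literal.
      def filter(names: Seq[String], pattern: String): Seq[String] = {
        val regex = pattern.replaceAll("\\*", ".*").r
        names.filter(name => regex.pattern.matcher(name).matches())
      }

      def main(args: Array[String]): Unit = {
        val dbs = Seq("db1", "db2", "default")
        assert(filter(dbs, "db*") == Seq("db1", "db2"))
        assert(filter(dbs, "*1") == Seq("db1"))
        assert(filter(dbs, "db") == Seq.empty) // no implicit wildcards
      }
    }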
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
new file mode 100644
index 0000000000..a6caf91f33
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import org.apache.spark.sql.AnalysisException
+
+
+/**
+ * Interface for the system catalog (of columns, partitions, tables, and databases).
+ *
+ * This is only used for non-temporary items, and implementations must be thread-safe as they
+ * can be accessed in multiple threads.
+ *
+ * Implementations should throw [[AnalysisException]] when a table or database does not exist.
+ */
+abstract class Catalog {
+
+ // --------------------------------------------------------------------------
+ // Databases
+ // --------------------------------------------------------------------------
+
+ def createDatabase(dbDefinition: Database, ifNotExists: Boolean): Unit
+
+ def dropDatabase(
+ db: String,
+ ignoreIfNotExists: Boolean,
+ cascade: Boolean): Unit
+
+ def alterDatabase(db: String, dbDefinition: Database): Unit
+
+ def getDatabase(db: String): Database
+
+ def listDatabases(): Seq[String]
+
+ def listDatabases(pattern: String): Seq[String]
+
+ // --------------------------------------------------------------------------
+ // Tables
+ // --------------------------------------------------------------------------
+
+ def createTable(db: String, tableDefinition: Table, ignoreIfExists: Boolean): Unit
+
+ def dropTable(db: String, table: String, ignoreIfNotExists: Boolean): Unit
+
+ def renameTable(db: String, oldName: String, newName: String): Unit
+
+ def alterTable(db: String, table: String, tableDefinition: Table): Unit
+
+ def getTable(db: String, table: String): Table
+
+ def listTables(db: String): Seq[String]
+
+ def listTables(db: String, pattern: String): Seq[String]
+
+ // --------------------------------------------------------------------------
+ // Partitions
+ // --------------------------------------------------------------------------
+
+ // TODO: need more functions for partitioning.
+
+ def alterPartition(db: String, table: String, part: TablePartition): Unit
+
+ def alterPartitions(db: String, table: String, parts: Seq[TablePartition]): Unit
+
+ // --------------------------------------------------------------------------
+ // Functions
+ // --------------------------------------------------------------------------
+
+ def createFunction(db: String, funcDefinition: Function, ignoreIfExists: Boolean): Unit
+
+ def dropFunction(db: String, funcName: String): Unit
+
+ def alterFunction(db: String, funcName: String, funcDefinition: Function): Unit
+
+ def getFunction(db: String, funcName: String): Function
+
+ def listFunctions(db: String, pattern: String): Seq[String]
+
+}
+
+
+/**
+ * A function defined in the catalog.
+ *
+ * @param name name of the function
+ * @param className fully qualified class name, e.g. "org.apache.spark.util.MyFunc"
+ */
+case class Function(
+ name: String,
+ className: String
+)
+
+
+/**
+ * Storage format, used to describe how a partition or a table is stored.
+ */
+case class StorageFormat(
+ locationUri: String,
+ inputFormat: String,
+ outputFormat: String,
+ serde: String,
+ serdeProperties: Map[String, String]
+)
+
+
+/**
+ * A column in a table.
+ */
+case class Column(
+ name: String,
+ dataType: String,
+ nullable: Boolean,
+ comment: String
+)
+
+
+/**
+ * A partition (Hive style) defined in the catalog.
+ *
+ * @param values values for the partition columns
+ * @param storage storage format of the partition
+ */
+case class TablePartition(
+ values: Seq[String],
+ storage: StorageFormat
+)
+
+
+/**
+ * A table defined in the catalog.
+ *
+ * Note that Hive's metastore also tracks skewed columns. We should consider adding that in the
+ * future once we have a better understanding of how we want to handle skewed columns.
+ */
+case class Table(
+ name: String,
+ description: String,
+ schema: Seq[Column],
+ partitionColumns: Seq[Column],
+ sortColumns: Seq[Column],
+ storage: StorageFormat,
+ numBuckets: Int,
+ properties: Map[String, String],
+ tableType: String,
+ createTime: Long,
+ lastAccessTime: Long,
+ viewOriginalText: Option[String],
+ viewText: Option[String]) {
+
+ require(tableType == "EXTERNAL_TABLE" || tableType == "INDEX_TABLE" ||
+ tableType == "MANAGED_TABLE" || tableType == "VIRTUAL_VIEW")
+}
+
+
+/**
+ * A database defined in the catalog.
+ */
+case class Database(
+ name: String,
+ description: String,
+ locationUri: String,
+ properties: Map[String, String]
+)
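To make the data model above concrete, here is a minimal sketch of constructing the catalog case classes. All values (paths, Hive format/serde class names, column types) are illustrative placeholders chosen for this example, not something this patch prescribes.

    // A sketch of the data model defined in interface.scala; values are placeholders.
    val storage = StorageFormat(
      locationUri = "file:/tmp/warehouse/events",
      inputFormat = "org.apache.hadoop.mapred.TextInputFormat",
      outputFormat = "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
      serde = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
      serdeProperties = Map.empty)

    val events = Table(
      name = "events",
      description = "example table",
      schema = Seq(
        Column("id", "bigint", nullable = false, comment = ""),
        Column("payload", "string", nullable = true, comment = "")),
      partitionColumns = Seq(Column("ds", "string", nullable = false, comment = "partition date")),
      sortColumns = Seq.empty,
      storage = storage,
      numBuckets = 0,
      properties = Map.empty,
      tableType = "MANAGED_TABLE",
      createTime = 0L,
      lastAccessTime = 0L,
      viewOriginalText = None,
      viewText = None)

    // Hive-style partition values line up positionally with partitionColumns.
    val partition = TablePartition(values = Seq("2016-02-01"), storage = storage)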
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
new file mode 100644
index 0000000000..ab9d5ac8a2
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
+
+
+/**
+ * A reasonably complete test suite (i.e. a set of behaviors) for a [[Catalog]].
+ *
+ * Implementations of the [[Catalog]] interface can create test suites by extending this.
+ */
+abstract class CatalogTestCases extends SparkFunSuite {
+
+ protected def newEmptyCatalog(): Catalog
+
+ /**
+ * Creates a basic catalog, with the following structure:
+ *
+ * db1
+ * db2
+ * - tbl1
+ * - tbl2
+ * - func1
+ */
+ private def newBasicCatalog(): Catalog = {
+ val catalog = newEmptyCatalog()
+ catalog.createDatabase(newDb("db1"), ifNotExists = false)
+ catalog.createDatabase(newDb("db2"), ifNotExists = false)
+
+ catalog.createTable("db2", newTable("tbl1"), ignoreIfExists = false)
+ catalog.createTable("db2", newTable("tbl2"), ignoreIfExists = false)
+ catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = false)
+ catalog
+ }
+
+ private def newFunc(): Function = Function("funcname", "org.apache.spark.MyFunc")
+
+ private def newDb(name: String = "default"): Database =
+ Database(name, name + " description", "uri", Map.empty)
+
+ private def newTable(name: String): Table =
+ Table(name, "", Seq.empty, Seq.empty, Seq.empty, null, 0, Map.empty, "EXTERNAL_TABLE", 0, 0,
+ None, None)
+
+ private def newFunc(name: String): Function = Function(name, "class.name")
+
+ // --------------------------------------------------------------------------
+ // Databases
+ // --------------------------------------------------------------------------
+
+ test("basic create, drop and list databases") {
+ val catalog = newEmptyCatalog()
+ catalog.createDatabase(newDb(), ifNotExists = false)
+ assert(catalog.listDatabases().toSet == Set("default"))
+
+ catalog.createDatabase(newDb("default2"), ifNotExists = false)
+ assert(catalog.listDatabases().toSet == Set("default", "default2"))
+ }
+
+ test("get database when a database exists") {
+ val db1 = newBasicCatalog().getDatabase("db1")
+ assert(db1.name == "db1")
+ assert(db1.description.contains("db1"))
+ }
+
+ test("get database should throw exception when the database does not exist") {
+ intercept[AnalysisException] { newBasicCatalog().getDatabase("db_that_does_not_exist") }
+ }
+
+ test("list databases without pattern") {
+ val catalog = newBasicCatalog()
+ assert(catalog.listDatabases().toSet == Set("db1", "db2"))
+ }
+
+ test("list databases with pattern") {
+ val catalog = newBasicCatalog()
+ assert(catalog.listDatabases("db").toSet == Set.empty)
+ assert(catalog.listDatabases("db*").toSet == Set("db1", "db2"))
+ assert(catalog.listDatabases("*1").toSet == Set("db1"))
+ assert(catalog.listDatabases("db2").toSet == Set("db2"))
+ }
+
+ test("drop database") {
+ val catalog = newBasicCatalog()
+ catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = false)
+ assert(catalog.listDatabases().toSet == Set("db2"))
+ }
+
+ test("drop database when the database is not empty") {
+ // Throw exception if there are functions left
+ val catalog1 = newBasicCatalog()
+ catalog1.dropTable("db2", "tbl1", ignoreIfNotExists = false)
+ catalog1.dropTable("db2", "tbl2", ignoreIfNotExists = false)
+ intercept[AnalysisException] {
+ catalog1.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
+ }
+
+ // Throw exception if there are tables left
+ val catalog2 = newBasicCatalog()
+ catalog2.dropFunction("db2", "func1")
+ intercept[AnalysisException] {
+ catalog2.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
+ }
+
+ // When cascade is true, it should drop them
+ val catalog3 = newBasicCatalog()
+ catalog3.dropDatabase("db2", ignoreIfNotExists = false, cascade = true)
+ assert(catalog3.listDatabases().toSet == Set("db1"))
+ }
+
+ test("drop database when the database does not exist") {
+ val catalog = newBasicCatalog()
+
+ intercept[AnalysisException] {
+ catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = false, cascade = false)
+ }
+
+ catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = true, cascade = false)
+ }
+
+ test("alter database") {
+ val catalog = newBasicCatalog()
+ catalog.alterDatabase("db1", Database("db1", "new description", "lll", Map.empty))
+ assert(catalog.getDatabase("db1").description == "new description")
+ }
+
+ test("alter database should throw exception when the database does not exist") {
+ intercept[AnalysisException] {
+ newBasicCatalog().alterDatabase("no_db", Database("no_db", "ddd", "lll", Map.empty))
+ }
+ }
+
+ // --------------------------------------------------------------------------
+ // Tables
+ // --------------------------------------------------------------------------
+
+ test("drop table") {
+ val catalog = newBasicCatalog()
+ assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+ catalog.dropTable("db2", "tbl1", ignoreIfNotExists = false)
+ assert(catalog.listTables("db2").toSet == Set("tbl2"))
+ }
+
+ test("drop table when database / table does not exist") {
+ val catalog = newBasicCatalog()
+
+ // Should always throw exception when the database does not exist
+ intercept[AnalysisException] {
+ catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = false)
+ }
+
+ intercept[AnalysisException] {
+ catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = true)
+ }
+
+ // Should throw exception when the table does not exist, if ignoreIfNotExists is false
+ intercept[AnalysisException] {
+ catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = false)
+ }
+
+ catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = true)
+ }
+
+ test("rename table") {
+ val catalog = newBasicCatalog()
+
+ assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+ catalog.renameTable("db2", "tbl1", "tblone")
+ assert(catalog.listTables("db2").toSet == Set("tblone", "tbl2"))
+ }
+
+ test("rename table when database / table does not exist") {
+ val catalog = newBasicCatalog()
+
+ intercept[AnalysisException] { // Throw exception when the database does not exist
+ catalog.renameTable("unknown_db", "unknown_table", "unknown_table")
+ }
+
+ intercept[AnalysisException] { // Throw exception when the table does not exist
+ catalog.renameTable("db2", "unknown_table", "unknown_table")
+ }
+ }
+
+ test("alter table") {
+ val catalog = newBasicCatalog()
+ catalog.alterTable("db2", "tbl1", newTable("tbl1").copy(createTime = 10))
+ assert(catalog.getTable("db2", "tbl1").createTime == 10)
+ }
+
+ test("alter table when database / table does not exist") {
+ val catalog = newBasicCatalog()
+
+ intercept[AnalysisException] { // Throw exception when the database does not exist
+ catalog.alterTable("unknown_db", "unknown_table", newTable("unknown_table"))
+ }
+
+ intercept[AnalysisException] { // Throw exception when the table does not exist
+ catalog.alterTable("db2", "unknown_table", newTable("unknown_table"))
+ }
+ }
+
+ test("get table") {
+ assert(newBasicCatalog().getTable("db2", "tbl1").name == "tbl1")
+ }
+
+ test("get table when database / table does not exist") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.getTable("unknown_db", "unknown_table")
+ }
+
+ intercept[AnalysisException] {
+ catalog.getTable("db2", "unknown_table")
+ }
+ }
+
+ test("list tables without pattern") {
+ val catalog = newBasicCatalog()
+ assert(catalog.listTables("db1").toSet == Set.empty)
+ assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+ }
+
+ test("list tables with pattern") {
+ val catalog = newBasicCatalog()
+
+ // Test when database does not exist
+ intercept[AnalysisException] { catalog.listTables("unknown_db") }
+
+ assert(catalog.listTables("db1", "*").toSet == Set.empty)
+ assert(catalog.listTables("db2", "*").toSet == Set("tbl1", "tbl2"))
+ assert(catalog.listTables("db2", "tbl*").toSet == Set("tbl1", "tbl2"))
+ assert(catalog.listTables("db2", "*1").toSet == Set("tbl1"))
+ }
+
+ // --------------------------------------------------------------------------
+ // Partitions
+ // --------------------------------------------------------------------------
+
+ // TODO: Add test cases for partitions
+
+ // --------------------------------------------------------------------------
+ // Functions
+ // --------------------------------------------------------------------------
+
+ // TODO: Add test cases for functions
+}
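The function and partition TODOs above are left open in this patch. As one example of the shape such a test could take, here is a sketch of a function test written as it would appear inside CatalogTestCases (it relies only on the newBasicCatalog fixture and the Catalog methods defined above; it is not part of the commit):

    test("get and drop function") {
      val catalog = newBasicCatalog()
      // newFunc("func1") registers Function("func1", "class.name") in db2.
      assert(catalog.getFunction("db2", "func1").className == "class.name")
      assert(catalog.listFunctions("db2", "func*") == Seq("func1"))
      catalog.dropFunction("db2", "func1")
      intercept[AnalysisException] { catalog.getFunction("db2", "func1") }
    }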
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
new file mode 100644
index 0000000000..871f0a0f46
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+/** Test suite for the [[InMemoryCatalog]]. */
+class InMemoryCatalogSuite extends CatalogTestCases {
+ override protected def newEmptyCatalog(): Catalog = new InMemoryCatalog
+}