aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala50
1 files changed, 22 insertions, 28 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index e216fa5528..f8a6fb74cc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-
+import org.apache.spark.sql.catalyst.util.StringUtils
/**
* An in-memory (ephemeral) implementation of the system catalog.
@@ -47,16 +47,6 @@ class InMemoryCatalog extends ExternalCatalog {
// Database name -> description
private val catalog = new scala.collection.mutable.HashMap[String, DatabaseDesc]
- private def filterPattern(names: Seq[String], pattern: String): Seq[String] = {
- val regex = pattern.replaceAll("\\*", ".*").r
- names.filter { funcName => regex.pattern.matcher(funcName).matches() }
- }
-
- private def functionExists(db: String, funcName: String): Boolean = {
- requireDbExists(db)
- catalog(db).functions.contains(funcName)
- }
-
private def partitionExists(db: String, table: String, spec: TablePartitionSpec): Boolean = {
requireTableExists(db, table)
catalog(db).tables(table).partitions.contains(spec)
@@ -72,7 +62,7 @@ class InMemoryCatalog extends ExternalCatalog {
private def requireTableExists(db: String, table: String): Unit = {
if (!tableExists(db, table)) {
throw new AnalysisException(
- s"Table not found: '$table' does not exist in database '$db'")
+ s"Table or View not found: '$table' does not exist in database '$db'")
}
}
@@ -141,7 +131,7 @@ class InMemoryCatalog extends ExternalCatalog {
}
override def listDatabases(pattern: String): Seq[String] = synchronized {
- filterPattern(listDatabases(), pattern)
+ StringUtils.filterPattern(listDatabases(), pattern)
}
override def setCurrentDatabase(db: String): Unit = { /* no-op */ }
@@ -155,7 +145,7 @@ class InMemoryCatalog extends ExternalCatalog {
tableDefinition: CatalogTable,
ignoreIfExists: Boolean): Unit = synchronized {
requireDbExists(db)
- val table = tableDefinition.name.table
+ val table = tableDefinition.identifier.table
if (tableExists(db, table)) {
if (!ignoreIfExists) {
throw new AnalysisException(s"Table '$table' already exists in database '$db'")
@@ -174,7 +164,7 @@ class InMemoryCatalog extends ExternalCatalog {
catalog(db).tables.remove(table)
} else {
if (!ignoreIfNotExists) {
- throw new AnalysisException(s"Table '$table' does not exist in database '$db'")
+ throw new AnalysisException(s"Table or View '$table' does not exist in database '$db'")
}
}
}
@@ -182,14 +172,14 @@ class InMemoryCatalog extends ExternalCatalog {
override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
requireTableExists(db, oldName)
val oldDesc = catalog(db).tables(oldName)
- oldDesc.table = oldDesc.table.copy(name = TableIdentifier(newName, Some(db)))
+ oldDesc.table = oldDesc.table.copy(identifier = TableIdentifier(newName, Some(db)))
catalog(db).tables.put(newName, oldDesc)
catalog(db).tables.remove(oldName)
}
override def alterTable(db: String, tableDefinition: CatalogTable): Unit = synchronized {
- requireTableExists(db, tableDefinition.name.table)
- catalog(db).tables(tableDefinition.name.table).table = tableDefinition
+ requireTableExists(db, tableDefinition.identifier.table)
+ catalog(db).tables(tableDefinition.identifier.table).table = tableDefinition
}
override def getTable(db: String, table: String): CatalogTable = synchronized {
@@ -197,6 +187,10 @@ class InMemoryCatalog extends ExternalCatalog {
catalog(db).tables(table).table
}
+ override def getTableOption(db: String, table: String): Option[CatalogTable] = synchronized {
+ if (!tableExists(db, table)) None else Option(catalog(db).tables(table).table)
+ }
+
override def tableExists(db: String, table: String): Boolean = synchronized {
requireDbExists(db)
catalog(db).tables.contains(table)
@@ -208,7 +202,7 @@ class InMemoryCatalog extends ExternalCatalog {
}
override def listTables(db: String, pattern: String): Seq[String] = synchronized {
- filterPattern(listTables(db), pattern)
+ StringUtils.filterPattern(listTables(db), pattern)
}
// --------------------------------------------------------------------------
@@ -296,10 +290,10 @@ class InMemoryCatalog extends ExternalCatalog {
override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
requireDbExists(db)
- if (functionExists(db, func.name.funcName)) {
+ if (functionExists(db, func.identifier.funcName)) {
throw new AnalysisException(s"Function '$func' already exists in '$db' database")
} else {
- catalog(db).functions.put(func.name.funcName, func)
+ catalog(db).functions.put(func.identifier.funcName, func)
}
}
@@ -310,24 +304,24 @@ class InMemoryCatalog extends ExternalCatalog {
override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized {
requireFunctionExists(db, oldName)
- val newFunc = getFunction(db, oldName).copy(name = FunctionIdentifier(newName, Some(db)))
+ val newFunc = getFunction(db, oldName).copy(identifier = FunctionIdentifier(newName, Some(db)))
catalog(db).functions.remove(oldName)
catalog(db).functions.put(newName, newFunc)
}
- override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = synchronized {
- requireFunctionExists(db, funcDefinition.name.funcName)
- catalog(db).functions.put(funcDefinition.name.funcName, funcDefinition)
- }
-
override def getFunction(db: String, funcName: String): CatalogFunction = synchronized {
requireFunctionExists(db, funcName)
catalog(db).functions(funcName)
}
+ override def functionExists(db: String, funcName: String): Boolean = {
+ requireDbExists(db)
+ catalog(db).functions.contains(funcName)
+ }
+
override def listFunctions(db: String, pattern: String): Seq[String] = synchronized {
requireDbExists(db)
- filterPattern(catalog(db).functions.keysIterator.toSeq, pattern)
+ StringUtils.filterPattern(catalog(db).functions.keysIterator.toSeq, pattern)
}
}