diff options
Diffstat (limited to 'sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala | 50 |
1 files changed, 22 insertions, 28 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index e216fa5528..f8a6fb74cc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -21,7 +21,7 @@ import scala.collection.mutable import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} - +import org.apache.spark.sql.catalyst.util.StringUtils /** * An in-memory (ephemeral) implementation of the system catalog. @@ -47,16 +47,6 @@ class InMemoryCatalog extends ExternalCatalog { // Database name -> description private val catalog = new scala.collection.mutable.HashMap[String, DatabaseDesc] - private def filterPattern(names: Seq[String], pattern: String): Seq[String] = { - val regex = pattern.replaceAll("\\*", ".*").r - names.filter { funcName => regex.pattern.matcher(funcName).matches() } - } - - private def functionExists(db: String, funcName: String): Boolean = { - requireDbExists(db) - catalog(db).functions.contains(funcName) - } - private def partitionExists(db: String, table: String, spec: TablePartitionSpec): Boolean = { requireTableExists(db, table) catalog(db).tables(table).partitions.contains(spec) @@ -72,7 +62,7 @@ class InMemoryCatalog extends ExternalCatalog { private def requireTableExists(db: String, table: String): Unit = { if (!tableExists(db, table)) { throw new AnalysisException( - s"Table not found: '$table' does not exist in database '$db'") + s"Table or View not found: '$table' does not exist in database '$db'") } } @@ -141,7 +131,7 @@ class InMemoryCatalog extends ExternalCatalog { } override def listDatabases(pattern: String): Seq[String] = synchronized { - filterPattern(listDatabases(), pattern) + StringUtils.filterPattern(listDatabases(), pattern) } override def setCurrentDatabase(db: String): Unit = { /* no-op */ } @@ -155,7 +145,7 @@ class InMemoryCatalog extends ExternalCatalog { tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = synchronized { requireDbExists(db) - val table = tableDefinition.name.table + val table = tableDefinition.identifier.table if (tableExists(db, table)) { if (!ignoreIfExists) { throw new AnalysisException(s"Table '$table' already exists in database '$db'") @@ -174,7 +164,7 @@ class InMemoryCatalog extends ExternalCatalog { catalog(db).tables.remove(table) } else { if (!ignoreIfNotExists) { - throw new AnalysisException(s"Table '$table' does not exist in database '$db'") + throw new AnalysisException(s"Table or View '$table' does not exist in database '$db'") } } } @@ -182,14 +172,14 @@ class InMemoryCatalog extends ExternalCatalog { override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized { requireTableExists(db, oldName) val oldDesc = catalog(db).tables(oldName) - oldDesc.table = oldDesc.table.copy(name = TableIdentifier(newName, Some(db))) + oldDesc.table = oldDesc.table.copy(identifier = TableIdentifier(newName, Some(db))) catalog(db).tables.put(newName, oldDesc) catalog(db).tables.remove(oldName) } override def alterTable(db: String, tableDefinition: CatalogTable): Unit = synchronized { - requireTableExists(db, tableDefinition.name.table) - catalog(db).tables(tableDefinition.name.table).table = tableDefinition + requireTableExists(db, tableDefinition.identifier.table) + catalog(db).tables(tableDefinition.identifier.table).table = tableDefinition } override def getTable(db: String, table: String): CatalogTable = synchronized { @@ -197,6 +187,10 @@ class InMemoryCatalog extends ExternalCatalog { catalog(db).tables(table).table } + override def getTableOption(db: String, table: String): Option[CatalogTable] = synchronized { + if (!tableExists(db, table)) None else Option(catalog(db).tables(table).table) + } + override def tableExists(db: String, table: String): Boolean = synchronized { requireDbExists(db) catalog(db).tables.contains(table) @@ -208,7 +202,7 @@ class InMemoryCatalog extends ExternalCatalog { } override def listTables(db: String, pattern: String): Seq[String] = synchronized { - filterPattern(listTables(db), pattern) + StringUtils.filterPattern(listTables(db), pattern) } // -------------------------------------------------------------------------- @@ -296,10 +290,10 @@ class InMemoryCatalog extends ExternalCatalog { override def createFunction(db: String, func: CatalogFunction): Unit = synchronized { requireDbExists(db) - if (functionExists(db, func.name.funcName)) { + if (functionExists(db, func.identifier.funcName)) { throw new AnalysisException(s"Function '$func' already exists in '$db' database") } else { - catalog(db).functions.put(func.name.funcName, func) + catalog(db).functions.put(func.identifier.funcName, func) } } @@ -310,24 +304,24 @@ class InMemoryCatalog extends ExternalCatalog { override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized { requireFunctionExists(db, oldName) - val newFunc = getFunction(db, oldName).copy(name = FunctionIdentifier(newName, Some(db))) + val newFunc = getFunction(db, oldName).copy(identifier = FunctionIdentifier(newName, Some(db))) catalog(db).functions.remove(oldName) catalog(db).functions.put(newName, newFunc) } - override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = synchronized { - requireFunctionExists(db, funcDefinition.name.funcName) - catalog(db).functions.put(funcDefinition.name.funcName, funcDefinition) - } - override def getFunction(db: String, funcName: String): CatalogFunction = synchronized { requireFunctionExists(db, funcName) catalog(db).functions(funcName) } + override def functionExists(db: String, funcName: String): Boolean = { + requireDbExists(db) + catalog(db).functions.contains(funcName) + } + override def listFunctions(db: String, pattern: String): Seq[String] = synchronized { requireDbExists(db) - filterPattern(catalog(db).functions.keysIterator.toSeq, pattern) + StringUtils.filterPattern(catalog(db).functions.keysIterator.toSeq, pattern) } } |