diff options
Diffstat (limited to 'sql/catalyst')
13 files changed, 174 insertions, 319 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 5951a70c48..178e9402fa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf} +import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.encoders.OuterScopes import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -36,23 +37,22 @@ import org.apache.spark.sql.catalyst.util.usePrettyExpression import org.apache.spark.sql.types._ /** - * A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing - * when all relations are already filled in and the analyzer needs only to resolve attribute - * references. + * A trivial [[Analyzer]] with a dummy [[SessionCatalog]] and [[EmptyFunctionRegistry]]. + * Used for testing when all relations are already filled in and the analyzer needs only + * to resolve attribute references. */ object SimpleAnalyzer - extends Analyzer( - EmptyCatalog, - EmptyFunctionRegistry, - new SimpleCatalystConf(caseSensitiveAnalysis = true)) + extends SimpleAnalyzer(new SimpleCatalystConf(caseSensitiveAnalysis = true)) +class SimpleAnalyzer(conf: CatalystConf) + extends Analyzer(new SessionCatalog(new InMemoryCatalog, conf), EmptyFunctionRegistry, conf) /** * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and - * [[UnresolvedRelation]]s into fully typed objects using information in a schema [[Catalog]] and - * a [[FunctionRegistry]]. 
+ * [[UnresolvedRelation]]s into fully typed objects using information in a + * [[SessionCatalog]] and a [[FunctionRegistry]]. */ class Analyzer( - catalog: Catalog, + catalog: SessionCatalog, registry: FunctionRegistry, conf: CatalystConf, maxIterations: Int = 100) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala deleted file mode 100644 index 2f0a4dbc10..0000000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.analysis - -import java.util.concurrent.ConcurrentHashMap - -import scala.collection.JavaConverters._ - -import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.{CatalystConf, EmptyConf, TableIdentifier} -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} - - -/** - * An interface for looking up relations by name. Used by an [[Analyzer]]. 
- */ -trait Catalog { - - val conf: CatalystConf - - def tableExists(tableIdent: TableIdentifier): Boolean - - def lookupRelation(tableIdent: TableIdentifier, alias: Option[String] = None): LogicalPlan - - def setCurrentDatabase(databaseName: String): Unit = { - throw new UnsupportedOperationException - } - - /** - * Returns tuples of (tableName, isTemporary) for all tables in the given database. - * isTemporary is a Boolean value indicates if a table is a temporary or not. - */ - def getTables(databaseName: Option[String]): Seq[(String, Boolean)] - - def refreshTable(tableIdent: TableIdentifier): Unit - - def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit - - def unregisterTable(tableIdent: TableIdentifier): Unit - - def unregisterAllTables(): Unit - - /** - * Get the table name of TableIdentifier for temporary tables. - */ - protected def getTableName(tableIdent: TableIdentifier): String = { - // It is not allowed to specify database name for temporary tables. - // We check it here and throw exception if database is defined. - if (tableIdent.database.isDefined) { - throw new AnalysisException("Specifying database name or other qualifiers are not allowed " + - "for temporary tables. If the table name has dots (.) 
in it, please quote the " + - "table name with backticks (`).") - } - if (conf.caseSensitiveAnalysis) { - tableIdent.table - } else { - tableIdent.table.toLowerCase - } - } -} - -class SimpleCatalog(val conf: CatalystConf) extends Catalog { - private[this] val tables = new ConcurrentHashMap[String, LogicalPlan] - - override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = { - tables.put(getTableName(tableIdent), plan) - } - - override def unregisterTable(tableIdent: TableIdentifier): Unit = { - tables.remove(getTableName(tableIdent)) - } - - override def unregisterAllTables(): Unit = { - tables.clear() - } - - override def tableExists(tableIdent: TableIdentifier): Boolean = { - tables.containsKey(getTableName(tableIdent)) - } - - override def lookupRelation( - tableIdent: TableIdentifier, - alias: Option[String] = None): LogicalPlan = { - val tableName = getTableName(tableIdent) - val table = tables.get(tableName) - if (table == null) { - throw new AnalysisException("Table not found: " + tableName) - } - val qualifiedTable = SubqueryAlias(tableName, table) - - // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are - // properly qualified with this alias. - alias - .map(a => SubqueryAlias(a, qualifiedTable)) - .getOrElse(qualifiedTable) - } - - override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = { - tables.keySet().asScala.map(_ -> true).toSeq - } - - override def refreshTable(tableIdent: TableIdentifier): Unit = { - throw new UnsupportedOperationException - } -} - -/** - * A trait that can be mixed in with other Catalogs allowing specific tables to be overridden with - * new logical plans. This can be used to bind query result to virtual tables, or replace tables - * with in-memory cached versions. Note that the set of overrides is stored in memory and thus - * lost when the JVM exits. 
- */ -trait OverrideCatalog extends Catalog { - private[this] val overrides = new ConcurrentHashMap[String, LogicalPlan] - - private def getOverriddenTable(tableIdent: TableIdentifier): Option[LogicalPlan] = { - if (tableIdent.database.isDefined) { - None - } else { - Option(overrides.get(getTableName(tableIdent))) - } - } - - abstract override def tableExists(tableIdent: TableIdentifier): Boolean = { - getOverriddenTable(tableIdent) match { - case Some(_) => true - case None => super.tableExists(tableIdent) - } - } - - abstract override def lookupRelation( - tableIdent: TableIdentifier, - alias: Option[String] = None): LogicalPlan = { - getOverriddenTable(tableIdent) match { - case Some(table) => - val tableName = getTableName(tableIdent) - val qualifiedTable = SubqueryAlias(tableName, table) - - // If an alias was specified by the lookup, wrap the plan in a sub-query so that attributes - // are properly qualified with this alias. - alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable) - - case None => super.lookupRelation(tableIdent, alias) - } - } - - abstract override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = { - overrides.keySet().asScala.map(_ -> true).toSeq ++ super.getTables(databaseName) - } - - override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = { - overrides.put(getTableName(tableIdent), plan) - } - - override def unregisterTable(tableIdent: TableIdentifier): Unit = { - if (tableIdent.database.isEmpty) { - overrides.remove(getTableName(tableIdent)) - } - } - - override def unregisterAllTables(): Unit = { - overrides.clear() - } -} - -/** - * A trivial catalog that returns an error when a relation is requested. Used for testing when all - * relations are already filled in and the analyzer needs only to resolve attribute references. 
- */ -object EmptyCatalog extends Catalog { - - override val conf: CatalystConf = EmptyConf - - override def tableExists(tableIdent: TableIdentifier): Boolean = { - throw new UnsupportedOperationException - } - - override def lookupRelation( - tableIdent: TableIdentifier, - alias: Option[String] = None): LogicalPlan = { - throw new UnsupportedOperationException - } - - override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = { - throw new UnsupportedOperationException - } - - override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = { - throw new UnsupportedOperationException - } - - override def unregisterTable(tableIdent: TableIdentifier): Unit = { - throw new UnsupportedOperationException - } - - override def unregisterAllTables(): Unit = { - throw new UnsupportedOperationException - } - - override def refreshTable(tableIdent: TableIdentifier): Unit = { - throw new UnsupportedOperationException - } -} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 9518309fbf..e73d367a73 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -34,7 +34,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str errors.TreeNodeException(tree, s"Invalid call to $function on unresolved object", null) /** - * Holds the name of a relation that has yet to be looked up in a [[Catalog]]. + * Holds the name of a relation that has yet to be looked up in a catalog. 
*/ case class UnresolvedRelation( tableIdentifier: TableIdentifier, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 7ead1ddebe..e216fa5528 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -52,37 +52,34 @@ class InMemoryCatalog extends ExternalCatalog { names.filter { funcName => regex.pattern.matcher(funcName).matches() } } - private def existsFunction(db: String, funcName: String): Boolean = { + private def functionExists(db: String, funcName: String): Boolean = { requireDbExists(db) catalog(db).functions.contains(funcName) } - private def existsTable(db: String, table: String): Boolean = { - requireDbExists(db) - catalog(db).tables.contains(table) - } - - private def existsPartition(db: String, table: String, spec: TablePartitionSpec): Boolean = { + private def partitionExists(db: String, table: String, spec: TablePartitionSpec): Boolean = { requireTableExists(db, table) catalog(db).tables(table).partitions.contains(spec) } private def requireFunctionExists(db: String, funcName: String): Unit = { - if (!existsFunction(db, funcName)) { - throw new AnalysisException(s"Function '$funcName' does not exist in database '$db'") + if (!functionExists(db, funcName)) { + throw new AnalysisException( + s"Function not found: '$funcName' does not exist in database '$db'") } } private def requireTableExists(db: String, table: String): Unit = { - if (!existsTable(db, table)) { - throw new AnalysisException(s"Table '$table' does not exist in database '$db'") + if (!tableExists(db, table)) { + throw new AnalysisException( + s"Table not found: '$table' does not exist in database '$db'") } } private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = { - 
if (!existsPartition(db, table, spec)) { + if (!partitionExists(db, table, spec)) { throw new AnalysisException( - s"Partition does not exist in database '$db' table '$table': '$spec'") + s"Partition not found: database '$db' table '$table' does not contain: '$spec'") } } @@ -159,7 +156,7 @@ class InMemoryCatalog extends ExternalCatalog { ignoreIfExists: Boolean): Unit = synchronized { requireDbExists(db) val table = tableDefinition.name.table - if (existsTable(db, table)) { + if (tableExists(db, table)) { if (!ignoreIfExists) { throw new AnalysisException(s"Table '$table' already exists in database '$db'") } @@ -173,7 +170,7 @@ class InMemoryCatalog extends ExternalCatalog { table: String, ignoreIfNotExists: Boolean): Unit = synchronized { requireDbExists(db) - if (existsTable(db, table)) { + if (tableExists(db, table)) { catalog(db).tables.remove(table) } else { if (!ignoreIfNotExists) { @@ -200,13 +197,17 @@ class InMemoryCatalog extends ExternalCatalog { catalog(db).tables(table).table } + override def tableExists(db: String, table: String): Boolean = synchronized { + requireDbExists(db) + catalog(db).tables.contains(table) + } + override def listTables(db: String): Seq[String] = synchronized { requireDbExists(db) catalog(db).tables.keySet.toSeq } override def listTables(db: String, pattern: String): Seq[String] = synchronized { - requireDbExists(db) filterPattern(listTables(db), pattern) } @@ -295,7 +296,7 @@ class InMemoryCatalog extends ExternalCatalog { override def createFunction(db: String, func: CatalogFunction): Unit = synchronized { requireDbExists(db) - if (existsFunction(db, func.name.funcName)) { + if (functionExists(db, func.name.funcName)) { throw new AnalysisException(s"Function '$func' already exists in '$db' database") } else { catalog(db).functions.put(func.name.funcName, func) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 3ac2bcf7e8..34265faa74 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf} import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} @@ -31,17 +32,34 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary * tables and functions of the Spark Session that it belongs to. */ -class SessionCatalog(externalCatalog: ExternalCatalog) { +class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { import ExternalCatalog._ - private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan] - private[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction] + def this(externalCatalog: ExternalCatalog) { + this(externalCatalog, new SimpleCatalystConf(true)) + } + + protected[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan] + protected[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction] // Note: we track current database here because certain operations do not explicitly // specify the database (e.g. DROP TABLE my_table). In these cases we must first // check whether the temporary table or function exists, then, if not, operate on // the corresponding item in the current database. 
- private[this] var currentDb = "default" + protected[this] var currentDb = { + val defaultName = "default" + val defaultDbDefinition = CatalogDatabase(defaultName, "default database", "", Map()) + // Initialize default database if it doesn't already exist + createDatabase(defaultDbDefinition, ignoreIfExists = true) + defaultName + } + + /** + * Format table name, taking into account case sensitivity. + */ + protected[this] def formatTableName(name: String): String = { + if (conf.caseSensitiveAnalysis) name else name.toLowerCase + } // ---------------------------------------------------------------------------- // Databases @@ -105,8 +123,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { */ def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = { val db = tableDefinition.name.database.getOrElse(currentDb) - val newTableDefinition = tableDefinition.copy( - name = TableIdentifier(tableDefinition.name.table, Some(db))) + val table = formatTableName(tableDefinition.name.table) + val newTableDefinition = tableDefinition.copy(name = TableIdentifier(table, Some(db))) externalCatalog.createTable(db, newTableDefinition, ignoreIfExists) } @@ -121,8 +139,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { */ def alterTable(tableDefinition: CatalogTable): Unit = { val db = tableDefinition.name.database.getOrElse(currentDb) - val newTableDefinition = tableDefinition.copy( - name = TableIdentifier(tableDefinition.name.table, Some(db))) + val table = formatTableName(tableDefinition.name.table) + val newTableDefinition = tableDefinition.copy(name = TableIdentifier(table, Some(db))) externalCatalog.alterTable(db, newTableDefinition) } @@ -132,7 +150,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { */ def getTable(name: TableIdentifier): CatalogTable = { val db = name.database.getOrElse(currentDb) - externalCatalog.getTable(db, name.table) + val table = formatTableName(name.table) + externalCatalog.getTable(db, table) } // 
------------------------------------------------------------- @@ -146,10 +165,11 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { name: String, tableDefinition: LogicalPlan, ignoreIfExists: Boolean): Unit = { - if (tempTables.containsKey(name) && !ignoreIfExists) { + val table = formatTableName(name) + if (tempTables.containsKey(table) && !ignoreIfExists) { throw new AnalysisException(s"Temporary table '$name' already exists.") } - tempTables.put(name, tableDefinition) + tempTables.put(table, tableDefinition) } /** @@ -166,11 +186,13 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { throw new AnalysisException("rename does not support moving tables across databases") } val db = oldName.database.getOrElse(currentDb) - if (oldName.database.isDefined || !tempTables.containsKey(oldName.table)) { - externalCatalog.renameTable(db, oldName.table, newName.table) + val oldTableName = formatTableName(oldName.table) + val newTableName = formatTableName(newName.table) + if (oldName.database.isDefined || !tempTables.containsKey(oldTableName)) { + externalCatalog.renameTable(db, oldTableName, newTableName) } else { - val table = tempTables.remove(oldName.table) - tempTables.put(newName.table, table) + val table = tempTables.remove(oldTableName) + tempTables.put(newTableName, table) } } @@ -183,10 +205,11 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { */ def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean): Unit = { val db = name.database.getOrElse(currentDb) - if (name.database.isDefined || !tempTables.containsKey(name.table)) { - externalCatalog.dropTable(db, name.table, ignoreIfNotExists) + val table = formatTableName(name.table) + if (name.database.isDefined || !tempTables.containsKey(table)) { + externalCatalog.dropTable(db, table, ignoreIfNotExists) } else { - tempTables.remove(name.table) + tempTables.remove(table) } } @@ -199,29 +222,44 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { */ def lookupRelation(name: 
TableIdentifier, alias: Option[String] = None): LogicalPlan = { val db = name.database.getOrElse(currentDb) + val table = formatTableName(name.table) val relation = - if (name.database.isDefined || !tempTables.containsKey(name.table)) { - val metadata = externalCatalog.getTable(db, name.table) + if (name.database.isDefined || !tempTables.containsKey(table)) { + val metadata = externalCatalog.getTable(db, table) CatalogRelation(db, metadata, alias) } else { - tempTables.get(name.table) + tempTables.get(table) } - val qualifiedTable = SubqueryAlias(name.table, relation) + val qualifiedTable = SubqueryAlias(table, relation) // If an alias was specified by the lookup, wrap the plan in a subquery so that // attributes are properly qualified with this alias. alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable) } /** - * List all tables in the specified database, including temporary tables. + * Return whether a table with the specified name exists. + * + * Note: If a database is explicitly specified, then this will return whether the table + * exists in that particular database instead. In that case, even if there is a temporary + * table with the same name, we will return false if the specified database does not + * contain the table. */ - def listTables(db: String): Seq[TableIdentifier] = { - val dbTables = externalCatalog.listTables(db).map { t => TableIdentifier(t, Some(db)) } - val _tempTables = tempTables.keys().asScala.map { t => TableIdentifier(t) } - dbTables ++ _tempTables + def tableExists(name: TableIdentifier): Boolean = { + val db = name.database.getOrElse(currentDb) + val table = formatTableName(name.table) + if (name.database.isDefined || !tempTables.containsKey(table)) { + externalCatalog.tableExists(db, table) + } else { + true // it's a temporary table + } } /** + * List all tables in the specified database, including temporary tables. 
+ */ + def listTables(db: String): Seq[TableIdentifier] = listTables(db, "*") + + /** * List all matching tables in the specified database, including temporary tables. */ def listTables(db: String, pattern: String): Seq[TableIdentifier] = { @@ -235,6 +273,19 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { } /** + * Refresh the cache entry for a metastore table, if any. + */ + def refreshTable(name: TableIdentifier): Unit = { /* no-op */ } + + /** + * Drop all existing temporary tables. + * For testing only. + */ + def clearTempTables(): Unit = { + tempTables.clear() + } + + /** * Return a temporary table exactly as it was stored. * For testing only. */ @@ -263,7 +314,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { parts: Seq[CatalogTablePartition], ignoreIfExists: Boolean): Unit = { val db = tableName.database.getOrElse(currentDb) - externalCatalog.createPartitions(db, tableName.table, parts, ignoreIfExists) + val table = formatTableName(tableName.table) + externalCatalog.createPartitions(db, table, parts, ignoreIfExists) } /** @@ -275,7 +327,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { parts: Seq[TablePartitionSpec], ignoreIfNotExists: Boolean): Unit = { val db = tableName.database.getOrElse(currentDb) - externalCatalog.dropPartitions(db, tableName.table, parts, ignoreIfNotExists) + val table = formatTableName(tableName.table) + externalCatalog.dropPartitions(db, table, parts, ignoreIfNotExists) } /** @@ -289,7 +342,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { specs: Seq[TablePartitionSpec], newSpecs: Seq[TablePartitionSpec]): Unit = { val db = tableName.database.getOrElse(currentDb) - externalCatalog.renamePartitions(db, tableName.table, specs, newSpecs) + val table = formatTableName(tableName.table) + externalCatalog.renamePartitions(db, table, specs, newSpecs) } /** @@ -303,7 +357,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { */ def alterPartitions(tableName: TableIdentifier, parts: 
Seq[CatalogTablePartition]): Unit = { val db = tableName.database.getOrElse(currentDb) - externalCatalog.alterPartitions(db, tableName.table, parts) + val table = formatTableName(tableName.table) + externalCatalog.alterPartitions(db, table, parts) } /** @@ -312,7 +367,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { */ def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = { val db = tableName.database.getOrElse(currentDb) - externalCatalog.getPartition(db, tableName.table, spec) + val table = formatTableName(tableName.table) + externalCatalog.getPartition(db, table, spec) } /** @@ -321,7 +377,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { */ def listPartitions(tableName: TableIdentifier): Seq[CatalogTablePartition] = { val db = tableName.database.getOrElse(currentDb) - externalCatalog.listPartitions(db, tableName.table) + val table = formatTableName(tableName.table) + externalCatalog.listPartitions(db, table) } // ---------------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index c4e49614c5..34803133f6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -91,6 +91,8 @@ abstract class ExternalCatalog { def getTable(db: String, table: String): CatalogTable + def tableExists(db: String, table: String): Boolean + def listTables(db: String): Seq[String] def listTables(db: String, pattern: String): Seq[String] diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 8b568b6dd6..afc2f327df 100644 --- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -161,14 +161,10 @@ class AnalysisSuite extends AnalysisTest { } test("resolve relations") { - assertAnalysisError( - UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq("Table not found: tAbLe")) - + assertAnalysisError(UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq()) checkAnalysis(UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation) - checkAnalysis( UnresolvedRelation(TableIdentifier("tAbLe"), None), testRelation, caseSensitive = false) - checkAnalysis( UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation, caseSensitive = false) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala index 39166c4f8e..6fa4beed99 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala @@ -18,26 +18,21 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.{SimpleCatalystConf, TableIdentifier} +import org.apache.spark.sql.catalyst.SimpleCatalystConf +import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical._ trait AnalysisTest extends PlanTest { - val (caseSensitiveAnalyzer, caseInsensitiveAnalyzer) = { - val caseSensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = true) - val caseInsensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = false) + protected val caseSensitiveAnalyzer = makeAnalyzer(caseSensitive = true) + protected val caseInsensitiveAnalyzer = 
makeAnalyzer(caseSensitive = false) - val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf) - val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf) - - caseSensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation) - caseInsensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation) - - new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitiveConf) { - override val extendedResolutionRules = EliminateSubqueryAliases :: Nil - } -> - new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseInsensitiveConf) { + private def makeAnalyzer(caseSensitive: Boolean): Analyzer = { + val conf = new SimpleCatalystConf(caseSensitive) + val catalog = new SessionCatalog(new InMemoryCatalog, conf) + catalog.createTempTable("TaBlE", TestRelations.testRelation, ignoreIfExists = true) + new Analyzer(catalog, EmptyFunctionRegistry, conf) { override val extendedResolutionRules = EliminateSubqueryAliases :: Nil } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala index 9aa685e1e8..31501864a8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala @@ -19,7 +19,8 @@ package org.apache.spark.sql.catalyst.analysis import org.scalatest.BeforeAndAfter -import org.apache.spark.sql.catalyst.{SimpleCatalystConf, TableIdentifier} +import org.apache.spark.sql.catalyst.SimpleCatalystConf +import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -30,11 +31,11 @@ import org.apache.spark.sql.types._ class 
DecimalPrecisionSuite extends PlanTest with BeforeAndAfter { - val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true) - val catalog = new SimpleCatalog(conf) - val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf) + private val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true) + private val catalog = new SessionCatalog(new InMemoryCatalog, conf) + private val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf) - val relation = LocalRelation( + private val relation = LocalRelation( AttributeReference("i", IntegerType)(), AttributeReference("d1", DecimalType(2, 1))(), AttributeReference("d2", DecimalType(5, 2))(), @@ -43,15 +44,15 @@ class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter { AttributeReference("b", DoubleType)() ) - val i: Expression = UnresolvedAttribute("i") - val d1: Expression = UnresolvedAttribute("d1") - val d2: Expression = UnresolvedAttribute("d2") - val u: Expression = UnresolvedAttribute("u") - val f: Expression = UnresolvedAttribute("f") - val b: Expression = UnresolvedAttribute("b") + private val i: Expression = UnresolvedAttribute("i") + private val d1: Expression = UnresolvedAttribute("d1") + private val d2: Expression = UnresolvedAttribute("d2") + private val u: Expression = UnresolvedAttribute("u") + private val f: Expression = UnresolvedAttribute("f") + private val b: Expression = UnresolvedAttribute("b") before { - catalog.registerTable(TableIdentifier("table"), relation) + catalog.createTempTable("table", relation, ignoreIfExists = true) } private def checkType(expression: Expression, expectedType: DataType): Unit = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala index a1ea61920d..277c2d717e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala +++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala @@ -225,13 +225,14 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach { test("list tables without pattern") { val catalog = newBasicCatalog() + intercept[AnalysisException] { catalog.listTables("unknown_db") } assert(catalog.listTables("db1").toSet == Set.empty) assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2")) } test("list tables with pattern") { val catalog = newBasicCatalog() - intercept[AnalysisException] { catalog.listTables("unknown_db") } + intercept[AnalysisException] { catalog.listTables("unknown_db", "*") } assert(catalog.listTables("db1", "*").toSet == Set.empty) assert(catalog.listTables("db2", "*").toSet == Set("tbl1", "tbl2")) assert(catalog.listTables("db2", "tbl*").toSet == Set("tbl1", "tbl2")) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index e1973ee258..74e995cc5b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -397,6 +397,24 @@ class SessionCatalogSuite extends SparkFunSuite { TableIdentifier("tbl1", Some("db2")), alias = Some(alias)) == relationWithAlias) } + test("table exists") { + val catalog = new SessionCatalog(newBasicCatalog()) + assert(catalog.tableExists(TableIdentifier("tbl1", Some("db2")))) + assert(catalog.tableExists(TableIdentifier("tbl2", Some("db2")))) + assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2")))) + assert(!catalog.tableExists(TableIdentifier("tbl1", Some("db1")))) + assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1")))) + // If database is explicitly specified, do not check temporary tables + val tempTable = Range(1, 10, 1, 10, Seq()) + 
catalog.createTempTable("tbl3", tempTable, ignoreIfExists = false) + assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2")))) + // If database is not explicitly specified, check the current database + catalog.setCurrentDatabase("db2") + assert(catalog.tableExists(TableIdentifier("tbl1"))) + assert(catalog.tableExists(TableIdentifier("tbl2"))) + assert(catalog.tableExists(TableIdentifier("tbl3"))) + } + test("list tables without pattern") { val catalog = new SessionCatalog(newBasicCatalog()) val tempTable = Range(1, 10, 2, 10, Seq()) @@ -429,7 +447,7 @@ class SessionCatalogSuite extends SparkFunSuite { assert(catalog.listTables("db2", "*1").toSet == Set(TableIdentifier("tbl1"), TableIdentifier("tbl1", Some("db2")))) intercept[AnalysisException] { - catalog.listTables("unknown_db") + catalog.listTables("unknown_db", "*") } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala index 2ab31eea8a..e2c76b700f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.SimpleCatalystConf import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ @@ -137,11 +138,11 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper { checkCondition(!(('a || 'b) && ('c || 'd)), (!'a && !'b) || (!'c && !'d)) } - private val caseInsensitiveAnalyzer = - new Analyzer( - EmptyCatalog, - EmptyFunctionRegistry, - new 
SimpleCatalystConf(caseSensitiveAnalysis = false)) + private val caseInsensitiveConf = new SimpleCatalystConf(false) + private val caseInsensitiveAnalyzer = new Analyzer( + new SessionCatalog(new InMemoryCatalog, caseInsensitiveConf), + EmptyFunctionRegistry, + caseInsensitiveConf) test("(a && b) || (a && c) => a && (b || c) when case insensitive") { val plan = caseInsensitiveAnalyzer.execute( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala index a4c8d1c6d2..3824c67563 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.SimpleCatalystConf -import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry, SimpleCatalog} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry} +import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ @@ -28,7 +29,7 @@ import org.apache.spark.sql.catalyst.rules._ class EliminateSortsSuite extends PlanTest { val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true, orderByOrdinal = false) - val catalog = new SimpleCatalog(conf) + val catalog = new SessionCatalog(new InMemoryCatalog, conf) val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf) object Optimize extends RuleExecutor[LogicalPlan] { |