commit     c44d140cae99d0b880e6d25f158125ad3adc6a05 (patch)
author     Andrew Or <andrew@databricks.com>  2016-03-23 22:21:15 -0700
committer  Andrew Or <andrew@databricks.com>  2016-03-23 22:21:15 -0700
tree       7f0e5324e67efeff2cccf661cd27a21c3618098c /sql/catalyst/src
parent     cf823bead18c5be86b36da59b4bbf935c4804d04 (diff)
Revert "[SPARK-14014][SQL] Replace existing catalog with SessionCatalog"
This reverts commit 5dfc01976bb0d72489620b4f32cc12d620bb6260.
Diffstat (limited to 'sql/catalyst/src')
13 files changed, 319 insertions, 174 deletions
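For context: after this revert the Analyzer is constructed against the old Catalog trait (restored below in Catalog.scala) rather than SessionCatalog. A minimal sketch of the restored wiring, using only names that appear in this diff; it mirrors the construction the reverted test suites (DecimalPrecisionSuite, EliminateSortsSuite) return to:

    import org.apache.spark.sql.catalyst.SimpleCatalystConf
    import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry, SimpleCatalog}

    // Post-revert wiring: the Analyzer takes a Catalog again instead of a SessionCatalog.
    val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
    val catalog = new SimpleCatalog(conf)
    val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)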
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 54543eebb7..07b0f5ee70 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -24,7 +24,6 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf}
-import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.encoders.OuterScopes
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -37,22 +36,23 @@ import org.apache.spark.sql.catalyst.util.usePrettyExpression
 import org.apache.spark.sql.types._
 
 /**
- * A trivial [[Analyzer]] with an dummy [[SessionCatalog]] and [[EmptyFunctionRegistry]].
- * Used for testing when all relations are already filled in and the analyzer needs only
- * to resolve attribute references.
+ * A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing
+ * when all relations are already filled in and the analyzer needs only to resolve attribute
+ * references.
  */
 object SimpleAnalyzer
-  extends SimpleAnalyzer(new SimpleCatalystConf(caseSensitiveAnalysis = true))
-
-class SimpleAnalyzer(conf: CatalystConf)
-  extends Analyzer(new SessionCatalog(new InMemoryCatalog, conf), EmptyFunctionRegistry, conf)
+  extends Analyzer(
+    EmptyCatalog,
+    EmptyFunctionRegistry,
+    new SimpleCatalystConf(caseSensitiveAnalysis = true))
 
 /**
  * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
- * [[UnresolvedRelation]]s into fully typed objects using information in a
- * [[SessionCatalog]] and a [[FunctionRegistry]].
+ * [[UnresolvedRelation]]s into fully typed objects using information in a schema [[Catalog]] and
+ * a [[FunctionRegistry]].
  */
 class Analyzer(
-    catalog: SessionCatalog,
+    catalog: Catalog,
     registry: FunctionRegistry,
     conf: CatalystConf,
     maxIterations: Int = 100)
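With this change, SimpleAnalyzer is a plain singleton again (the object/class pair added by SPARK-14014 goes away). A sketch of its intended use, assuming the usual RuleExecutor execute entry point that Analyzer inherits:

    import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // Resolves attribute references only; EmptyCatalog throws if the plan
    // still contains an unresolved relation.
    def resolveAttributes(plan: LogicalPlan): LogicalPlan = SimpleAnalyzer.execute(plan)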
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
new file mode 100644
index 0000000000..2f0a4dbc10
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{CatalystConf, EmptyConf, TableIdentifier}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+
+
+/**
+ * An interface for looking up relations by name.  Used by an [[Analyzer]].
+ */
+trait Catalog {
+
+  val conf: CatalystConf
+
+  def tableExists(tableIdent: TableIdentifier): Boolean
+
+  def lookupRelation(tableIdent: TableIdentifier, alias: Option[String] = None): LogicalPlan
+
+  def setCurrentDatabase(databaseName: String): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  /**
+   * Returns tuples of (tableName, isTemporary) for all tables in the given database.
+   * isTemporary is a Boolean value indicates if a table is a temporary or not.
+   */
+  def getTables(databaseName: Option[String]): Seq[(String, Boolean)]
+
+  def refreshTable(tableIdent: TableIdentifier): Unit
+
+  def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit
+
+  def unregisterTable(tableIdent: TableIdentifier): Unit
+
+  def unregisterAllTables(): Unit
+
+  /**
+   * Get the table name of TableIdentifier for temporary tables.
+   */
+  protected def getTableName(tableIdent: TableIdentifier): String = {
+    // It is not allowed to specify database name for temporary tables.
+    // We check it here and throw exception if database is defined.
+    if (tableIdent.database.isDefined) {
+      throw new AnalysisException("Specifying database name or other qualifiers are not allowed " +
+        "for temporary tables. If the table name has dots (.) in it, please quote the " +
+        "table name with backticks (`).")
+    }
+    if (conf.caseSensitiveAnalysis) {
+      tableIdent.table
+    } else {
+      tableIdent.table.toLowerCase
+    }
+  }
+}
+
+class SimpleCatalog(val conf: CatalystConf) extends Catalog {
+  private[this] val tables = new ConcurrentHashMap[String, LogicalPlan]
+
+  override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
+    tables.put(getTableName(tableIdent), plan)
+  }
+
+  override def unregisterTable(tableIdent: TableIdentifier): Unit = {
+    tables.remove(getTableName(tableIdent))
+  }
+
+  override def unregisterAllTables(): Unit = {
+    tables.clear()
+  }
+
+  override def tableExists(tableIdent: TableIdentifier): Boolean = {
+    tables.containsKey(getTableName(tableIdent))
+  }
+
+  override def lookupRelation(
+      tableIdent: TableIdentifier,
+      alias: Option[String] = None): LogicalPlan = {
+    val tableName = getTableName(tableIdent)
+    val table = tables.get(tableName)
+    if (table == null) {
+      throw new AnalysisException("Table not found: " + tableName)
+    }
+    val qualifiedTable = SubqueryAlias(tableName, table)
+
+    // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
+    // properly qualified with this alias.
+    alias
+      .map(a => SubqueryAlias(a, qualifiedTable))
+      .getOrElse(qualifiedTable)
+  }
+
+  override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
+    tables.keySet().asScala.map(_ -> true).toSeq
+  }
+
+  override def refreshTable(tableIdent: TableIdentifier): Unit = {
+    throw new UnsupportedOperationException
+  }
+}
+
+/**
+ * A trait that can be mixed in with other Catalogs allowing specific tables to be overridden with
+ * new logical plans.  This can be used to bind query result to virtual tables, or replace tables
+ * with in-memory cached versions.  Note that the set of overrides is stored in memory and thus
+ * lost when the JVM exits.
+ */
+trait OverrideCatalog extends Catalog {
+  private[this] val overrides = new ConcurrentHashMap[String, LogicalPlan]
+
+  private def getOverriddenTable(tableIdent: TableIdentifier): Option[LogicalPlan] = {
+    if (tableIdent.database.isDefined) {
+      None
+    } else {
+      Option(overrides.get(getTableName(tableIdent)))
+    }
+  }
+
+  abstract override def tableExists(tableIdent: TableIdentifier): Boolean = {
+    getOverriddenTable(tableIdent) match {
+      case Some(_) => true
+      case None => super.tableExists(tableIdent)
+    }
+  }
+
+  abstract override def lookupRelation(
+      tableIdent: TableIdentifier,
+      alias: Option[String] = None): LogicalPlan = {
+    getOverriddenTable(tableIdent) match {
+      case Some(table) =>
+        val tableName = getTableName(tableIdent)
+        val qualifiedTable = SubqueryAlias(tableName, table)
+
+        // If an alias was specified by the lookup, wrap the plan in a sub-query so that
+        // attributes are properly qualified with this alias.
+        alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable)
+
+      case None => super.lookupRelation(tableIdent, alias)
+    }
+  }
+
+  abstract override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
+    overrides.keySet().asScala.map(_ -> true).toSeq ++ super.getTables(databaseName)
+  }
+
+  override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
+    overrides.put(getTableName(tableIdent), plan)
+  }
+
+  override def unregisterTable(tableIdent: TableIdentifier): Unit = {
+    if (tableIdent.database.isEmpty) {
+      overrides.remove(getTableName(tableIdent))
+    }
+  }
+
+  override def unregisterAllTables(): Unit = {
+    overrides.clear()
+  }
+}
+
+/**
+ * A trivial catalog that returns an error when a relation is requested.  Used for testing when
+ * all relations are already filled in and the analyzer needs only to resolve attribute
+ * references.
+ */
+object EmptyCatalog extends Catalog {
+
+  override val conf: CatalystConf = EmptyConf
+
+  override def tableExists(tableIdent: TableIdentifier): Boolean = {
+    throw new UnsupportedOperationException
+  }
+
+  override def lookupRelation(
+      tableIdent: TableIdentifier,
+      alias: Option[String] = None): LogicalPlan = {
+    throw new UnsupportedOperationException
+  }
+
+  override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
+    throw new UnsupportedOperationException
+  }
+
+  override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  override def unregisterTable(tableIdent: TableIdentifier): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  override def unregisterAllTables(): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  override def refreshTable(tableIdent: TableIdentifier): Unit = {
+    throw new UnsupportedOperationException
+  }
+}
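The restored OverrideCatalog is a stackable modification: its abstract override methods consult the in-memory overrides map first and fall back to super. A hedged sketch of how it composes with SimpleCatalog (the mixin line and the OneRowRelation placeholder plan are illustrative; the file above only defines the traits):

    import org.apache.spark.sql.catalyst.{SimpleCatalystConf, TableIdentifier}
    import org.apache.spark.sql.catalyst.analysis.{OverrideCatalog, SimpleCatalog}
    import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation

    val catalog = new SimpleCatalog(new SimpleCatalystConf(caseSensitiveAnalysis = false)) with OverrideCatalog

    // registerTable now writes to the overrides map, shadowing the base catalog;
    // lookupRelation checks overrides first and falls back to super.
    catalog.registerTable(TableIdentifier("t"), OneRowRelation)
    val plan = catalog.lookupRelation(TableIdentifier("t"), alias = Some("a"))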
AnalysisException(s"Table '$table' does not exist in database '$db'") } } private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = { - if (!partitionExists(db, table, spec)) { + if (!existsPartition(db, table, spec)) { throw new AnalysisException( - s"Partition not found: database '$db' table '$table' does not contain: '$spec'") + s"Partition does not exist in database '$db' table '$table': '$spec'") } } @@ -156,7 +159,7 @@ class InMemoryCatalog extends ExternalCatalog { ignoreIfExists: Boolean): Unit = synchronized { requireDbExists(db) val table = tableDefinition.name.table - if (tableExists(db, table)) { + if (existsTable(db, table)) { if (!ignoreIfExists) { throw new AnalysisException(s"Table '$table' already exists in database '$db'") } @@ -170,7 +173,7 @@ class InMemoryCatalog extends ExternalCatalog { table: String, ignoreIfNotExists: Boolean): Unit = synchronized { requireDbExists(db) - if (tableExists(db, table)) { + if (existsTable(db, table)) { catalog(db).tables.remove(table) } else { if (!ignoreIfNotExists) { @@ -197,17 +200,13 @@ class InMemoryCatalog extends ExternalCatalog { catalog(db).tables(table).table } - override def tableExists(db: String, table: String): Boolean = synchronized { - requireDbExists(db) - catalog(db).tables.contains(table) - } - override def listTables(db: String): Seq[String] = synchronized { requireDbExists(db) catalog(db).tables.keySet.toSeq } override def listTables(db: String, pattern: String): Seq[String] = synchronized { + requireDbExists(db) filterPattern(listTables(db), pattern) } @@ -296,7 +295,7 @@ class InMemoryCatalog extends ExternalCatalog { override def createFunction(db: String, func: CatalogFunction): Unit = synchronized { requireDbExists(db) - if (functionExists(db, func.name.funcName)) { + if (existsFunction(db, func.name.funcName)) { throw new AnalysisException(s"Function '$func' already exists in '$db' database") } else { catalog(db).functions.put(func.name.funcName, func) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 34265faa74..3ac2bcf7e8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -22,7 +22,6 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf} import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} @@ -32,34 +31,17 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary * tables and functions of the Spark Session that it belongs to. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 34265faa74..3ac2bcf7e8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 
@@ -32,34 +31,17 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
  * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
 * tables and functions of the Spark Session that it belongs to.
 */
-class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
+class SessionCatalog(externalCatalog: ExternalCatalog) {
   import ExternalCatalog._
 
-  def this(externalCatalog: ExternalCatalog) {
-    this(externalCatalog, new SimpleCatalystConf(true))
-  }
-
-  protected[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
-  protected[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction]
+  private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
+  private[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction]
 
   // Note: we track current database here because certain operations do not explicitly
   // specify the database (e.g. DROP TABLE my_table). In these cases we must first
   // check whether the temporary table or function exists, then, if not, operate on
   // the corresponding item in the current database.
-  protected[this] var currentDb = {
-    val defaultName = "default"
-    val defaultDbDefinition = CatalogDatabase(defaultName, "default database", "", Map())
-    // Initialize default database if it doesn't already exist
-    createDatabase(defaultDbDefinition, ignoreIfExists = true)
-    defaultName
-  }
-
-  /**
-   * Format table name, taking into account case sensitivity.
-   */
-  protected[this] def formatTableName(name: String): String = {
-    if (conf.caseSensitiveAnalysis) name else name.toLowerCase
-  }
+  private[this] var currentDb = "default"
 
   // ----------------------------------------------------------------------------
   // Databases
@@ -123,8 +105,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
   */
   def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
     val db = tableDefinition.name.database.getOrElse(currentDb)
-    val table = formatTableName(tableDefinition.name.table)
-    val newTableDefinition = tableDefinition.copy(name = TableIdentifier(table, Some(db)))
+    val newTableDefinition = tableDefinition.copy(
+      name = TableIdentifier(tableDefinition.name.table, Some(db)))
     externalCatalog.createTable(db, newTableDefinition, ignoreIfExists)
   }
 
@@ -139,8 +121,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
   */
   def alterTable(tableDefinition: CatalogTable): Unit = {
     val db = tableDefinition.name.database.getOrElse(currentDb)
-    val table = formatTableName(tableDefinition.name.table)
-    val newTableDefinition = tableDefinition.copy(name = TableIdentifier(table, Some(db)))
+    val newTableDefinition = tableDefinition.copy(
+      name = TableIdentifier(tableDefinition.name.table, Some(db)))
     externalCatalog.alterTable(db, newTableDefinition)
   }
 
@@ -150,8 +132,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
   */
   def getTable(name: TableIdentifier): CatalogTable = {
     val db = name.database.getOrElse(currentDb)
-    val table = formatTableName(name.table)
-    externalCatalog.getTable(db, table)
+    externalCatalog.getTable(db, name.table)
   }
 
   // -------------------------------------------------------------
@@ -165,11 +146,10 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
       name: String,
       tableDefinition: LogicalPlan,
       ignoreIfExists: Boolean): Unit = {
-    val table = formatTableName(name)
-    if (tempTables.containsKey(table) && !ignoreIfExists) {
+    if (tempTables.containsKey(name) && !ignoreIfExists) {
       throw new AnalysisException(s"Temporary table '$name' already exists.")
     }
-    tempTables.put(table, tableDefinition)
+    tempTables.put(name, tableDefinition)
   }
 
   /**
@@ -186,13 +166,11 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
       throw new AnalysisException("rename does not support moving tables across databases")
     }
     val db = oldName.database.getOrElse(currentDb)
-    val oldTableName = formatTableName(oldName.table)
-    val newTableName = formatTableName(newName.table)
-    if (oldName.database.isDefined || !tempTables.containsKey(oldTableName)) {
-      externalCatalog.renameTable(db, oldTableName, newTableName)
+    if (oldName.database.isDefined || !tempTables.containsKey(oldName.table)) {
+      externalCatalog.renameTable(db, oldName.table, newName.table)
     } else {
-      val table = tempTables.remove(oldTableName)
-      tempTables.put(newTableName, table)
+      val table = tempTables.remove(oldName.table)
+      tempTables.put(newName.table, table)
     }
   }
 
@@ -205,11 +183,10 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
   */
   def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean): Unit = {
     val db = name.database.getOrElse(currentDb)
-    val table = formatTableName(name.table)
-    if (name.database.isDefined || !tempTables.containsKey(table)) {
-      externalCatalog.dropTable(db, table, ignoreIfNotExists)
+    if (name.database.isDefined || !tempTables.containsKey(name.table)) {
+      externalCatalog.dropTable(db, name.table, ignoreIfNotExists)
     } else {
-      tempTables.remove(table)
+      tempTables.remove(name.table)
     }
   }
 
@@ -222,42 +199,27 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
   */
   def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = {
     val db = name.database.getOrElse(currentDb)
-    val table = formatTableName(name.table)
     val relation =
-      if (name.database.isDefined || !tempTables.containsKey(table)) {
-        val metadata = externalCatalog.getTable(db, table)
+      if (name.database.isDefined || !tempTables.containsKey(name.table)) {
+        val metadata = externalCatalog.getTable(db, name.table)
         CatalogRelation(db, metadata, alias)
       } else {
-        tempTables.get(table)
+        tempTables.get(name.table)
       }
-    val qualifiedTable = SubqueryAlias(table, relation)
+    val qualifiedTable = SubqueryAlias(name.table, relation)
     // If an alias was specified by the lookup, wrap the plan in a subquery so that
     // attributes are properly qualified with this alias.
     alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable)
   }
 
   /**
-   * Return whether a table with the specified name exists.
-   *
-   * Note: If a database is explicitly specified, then this will return whether the table
-   * exists in that particular database instead. In that case, even if there is a temporary
-   * table with the same name, we will return false if the specified database does not
-   * contain the table.
-   */
-  def tableExists(name: TableIdentifier): Boolean = {
-    val db = name.database.getOrElse(currentDb)
-    val table = formatTableName(name.table)
-    if (name.database.isDefined || !tempTables.containsKey(table)) {
-      externalCatalog.tableExists(db, table)
-    } else {
-      true // it's a temporary table
-    }
-  }
-
-  /**
   * List all tables in the specified database, including temporary tables.
   */
-  def listTables(db: String): Seq[TableIdentifier] = listTables(db, "*")
+  def listTables(db: String): Seq[TableIdentifier] = {
+    val dbTables = externalCatalog.listTables(db).map { t => TableIdentifier(t, Some(db)) }
+    val _tempTables = tempTables.keys().asScala.map { t => TableIdentifier(t) }
+    dbTables ++ _tempTables
+  }
 
   /**
   * List all matching tables in the specified database, including temporary tables.
@@ -273,19 +235,6 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
   }
 
   /**
-   * Refresh the cache entry for a metastore table, if any.
-   */
-  def refreshTable(name: TableIdentifier): Unit = { /* no-op */ }
-
-  /**
-   * Drop all existing temporary tables.
-   * For testing only.
-   */
-  def clearTempTables(): Unit = {
-    tempTables.clear()
-  }
-
-  /**
   * Return a temporary table exactly as it was stored.
   * For testing only.
   */
@@ -314,8 +263,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = {
     val db = tableName.database.getOrElse(currentDb)
-    val table = formatTableName(tableName.table)
-    externalCatalog.createPartitions(db, table, parts, ignoreIfExists)
+    externalCatalog.createPartitions(db, tableName.table, parts, ignoreIfExists)
   }
 
   /**
@@ -327,8 +275,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
       parts: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean): Unit = {
     val db = tableName.database.getOrElse(currentDb)
-    val table = formatTableName(tableName.table)
-    externalCatalog.dropPartitions(db, table, parts, ignoreIfNotExists)
+    externalCatalog.dropPartitions(db, tableName.table, parts, ignoreIfNotExists)
   }
 
   /**
@@ -342,8 +289,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
       specs: Seq[TablePartitionSpec],
       newSpecs: Seq[TablePartitionSpec]): Unit = {
     val db = tableName.database.getOrElse(currentDb)
-    val table = formatTableName(tableName.table)
-    externalCatalog.renamePartitions(db, table, specs, newSpecs)
+    externalCatalog.renamePartitions(db, tableName.table, specs, newSpecs)
   }
 
   /**
@@ -357,8 +303,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
   */
   def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = {
     val db = tableName.database.getOrElse(currentDb)
-    val table = formatTableName(tableName.table)
-    externalCatalog.alterPartitions(db, table, parts)
+    externalCatalog.alterPartitions(db, tableName.table, parts)
   }
 
   /**
@@ -367,8 +312,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
   */
   def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = {
     val db = tableName.database.getOrElse(currentDb)
-    val table = formatTableName(tableName.table)
-    externalCatalog.getPartition(db, table, spec)
+    externalCatalog.getPartition(db, tableName.table, spec)
   }
 
   /**
@@ -377,8 +321,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
   */
   def listPartitions(tableName: TableIdentifier): Seq[CatalogTablePartition] = {
     val db = tableName.database.getOrElse(currentDb)
-    val table = formatTableName(tableName.table)
-    externalCatalog.listPartitions(db, table)
+    externalCatalog.listPartitions(db, tableName.table)
   }
 
   // ----------------------------------------------------------------------------
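Note the behavioral edge of this revert: SessionCatalog loses its CatalystConf and the formatTableName helper, so temporary-table keys are stored and looked up verbatim, i.e. case-sensitively, regardless of the analyzer's case-sensitivity setting. A small sketch of the consequence (table names and the OneRowRelation placeholder plan are illustrative):

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
    import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation

    val sessionCatalog = new SessionCatalog(new InMemoryCatalog)
    sessionCatalog.createTempTable("TaBlE", OneRowRelation, ignoreIfExists = false)

    // Matches the key "TaBlE" exactly; a lookup of "table" would bypass the
    // temp table and fall through to the external catalog instead.
    sessionCatalog.lookupRelation(TableIdentifier("TaBlE"))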
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 34803133f6..c4e49614c5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -91,8 +91,6 @@ abstract class ExternalCatalog {
 
   def getTable(db: String, table: String): CatalogTable
 
-  def tableExists(db: String, table: String): Boolean
-
   def listTables(db: String): Seq[String]
 
   def listTables(db: String, pattern: String): Seq[String]
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index afc2f327df..8b568b6dd6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -161,10 +161,14 @@ class AnalysisSuite extends AnalysisTest {
   }
 
   test("resolve relations") {
-    assertAnalysisError(UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq())
+    assertAnalysisError(
+      UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq("Table not found: tAbLe"))
+
     checkAnalysis(UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation)
+
     checkAnalysis(
       UnresolvedRelation(TableIdentifier("tAbLe"), None), testRelation, caseSensitive = false)
+
     checkAnalysis(
       UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation, caseSensitive = false)
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
index 6fa4beed99..39166c4f8e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
@@ -18,21 +18,26 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.SimpleCatalystConf
-import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
+import org.apache.spark.sql.catalyst.{SimpleCatalystConf, TableIdentifier}
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical._
 
 trait AnalysisTest extends PlanTest {
-  protected val caseSensitiveAnalyzer = makeAnalyzer(caseSensitive = true)
-  protected val caseInsensitiveAnalyzer = makeAnalyzer(caseSensitive = false)
+  val (caseSensitiveAnalyzer, caseInsensitiveAnalyzer) = {
+    val caseSensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
+    val caseInsensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = false)
 
-  private def makeAnalyzer(caseSensitive: Boolean): Analyzer = {
-    val conf = new SimpleCatalystConf(caseSensitive)
-    val catalog = new SessionCatalog(new InMemoryCatalog, conf)
-    catalog.createTempTable("TaBlE", TestRelations.testRelation, ignoreIfExists = true)
-    new Analyzer(catalog, EmptyFunctionRegistry, conf) {
+    val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf)
+    val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf)
+
+    caseSensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation)
+    caseInsensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation)
+
+    new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitiveConf) {
+      override val extendedResolutionRules = EliminateSubqueryAliases :: Nil
+    } ->
+    new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseInsensitiveConf) {
       override val extendedResolutionRules = EliminateSubqueryAliases :: Nil
     }
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
index 31501864a8..9aa685e1e8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
@@ -19,8 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.sql.catalyst.SimpleCatalystConf
-import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
+import org.apache.spark.sql.catalyst.{SimpleCatalystConf, TableIdentifier}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -31,11 +30,11 @@ import org.apache.spark.sql.types._
 
 class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter {
-  private val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
-  private val catalog = new SessionCatalog(new InMemoryCatalog, conf)
-  private val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
+  val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
+  val catalog = new SimpleCatalog(conf)
+  val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
 
-  private val relation = LocalRelation(
+  val relation = LocalRelation(
     AttributeReference("i", IntegerType)(),
     AttributeReference("d1", DecimalType(2, 1))(),
     AttributeReference("d2", DecimalType(5, 2))(),
@@ -44,15 +43,15 @@ class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter {
     AttributeReference("b", DoubleType)()
   )
 
-  private val i: Expression = UnresolvedAttribute("i")
-  private val d1: Expression = UnresolvedAttribute("d1")
-  private val d2: Expression = UnresolvedAttribute("d2")
-  private val u: Expression = UnresolvedAttribute("u")
-  private val f: Expression = UnresolvedAttribute("f")
-  private val b: Expression = UnresolvedAttribute("b")
+  val i: Expression = UnresolvedAttribute("i")
+  val d1: Expression = UnresolvedAttribute("d1")
+  val d2: Expression = UnresolvedAttribute("d2")
+  val u: Expression = UnresolvedAttribute("u")
+  val f: Expression = UnresolvedAttribute("f")
+  val b: Expression = UnresolvedAttribute("b")
 
   before {
-    catalog.createTempTable("table", relation, ignoreIfExists = true)
+    catalog.registerTable(TableIdentifier("table"), relation)
   }
 
   private def checkType(expression: Expression, expectedType: DataType): Unit = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
index 277c2d717e..a1ea61920d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
@@ -225,14 +225,13 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
 
   test("list tables without pattern") {
     val catalog = newBasicCatalog()
-    intercept[AnalysisException] { catalog.listTables("unknown_db") }
     assert(catalog.listTables("db1").toSet == Set.empty)
     assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
   }
 
   test("list tables with pattern") {
     val catalog = newBasicCatalog()
-    intercept[AnalysisException] { catalog.listTables("unknown_db", "*") }
+    intercept[AnalysisException] { catalog.listTables("unknown_db") }
     assert(catalog.listTables("db1", "*").toSet == Set.empty)
     assert(catalog.listTables("db2", "*").toSet == Set("tbl1", "tbl2"))
     assert(catalog.listTables("db2", "tbl*").toSet == Set("tbl1", "tbl2"))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 74e995cc5b..e1973ee258 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -397,24 +397,6 @@ class SessionCatalogSuite extends SparkFunSuite {
       TableIdentifier("tbl1", Some("db2")), alias = Some(alias)) == relationWithAlias)
   }
 
-  test("table exists") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    assert(catalog.tableExists(TableIdentifier("tbl1", Some("db2"))))
-    assert(catalog.tableExists(TableIdentifier("tbl2", Some("db2"))))
-    assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2"))))
-    assert(!catalog.tableExists(TableIdentifier("tbl1", Some("db1"))))
-    assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1"))))
-    // If database is explicitly specified, do not check temporary tables
-    val tempTable = Range(1, 10, 1, 10, Seq())
-    catalog.createTempTable("tbl3", tempTable, ignoreIfExists = false)
-    assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2"))))
-    // If database is not explicitly specified, check the current database
-    catalog.setCurrentDatabase("db2")
-    assert(catalog.tableExists(TableIdentifier("tbl1")))
-    assert(catalog.tableExists(TableIdentifier("tbl2")))
-    assert(catalog.tableExists(TableIdentifier("tbl3")))
-  }
-
   test("list tables without pattern") {
     val catalog = new SessionCatalog(newBasicCatalog())
     val tempTable = Range(1, 10, 2, 10, Seq())
@@ -447,7 +429,7 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(catalog.listTables("db2", "*1").toSet ==
       Set(TableIdentifier("tbl1"), TableIdentifier("tbl1", Some("db2"))))
     intercept[AnalysisException] {
-      catalog.listTables("unknown_db", "*")
+      catalog.listTables("unknown_db")
     }
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
index e2c76b700f..2ab31eea8a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.SimpleCatalystConf
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
@@ -138,11 +137,11 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper {
     checkCondition(!(('a || 'b) && ('c || 'd)), (!'a && !'b) || (!'c && !'d))
   }
 
-  private val caseInsensitiveConf = new SimpleCatalystConf(false)
-  private val caseInsensitiveAnalyzer = new Analyzer(
-    new SessionCatalog(new InMemoryCatalog, caseInsensitiveConf),
-    EmptyFunctionRegistry,
-    caseInsensitiveConf)
+  private val caseInsensitiveAnalyzer =
+    new Analyzer(
+      EmptyCatalog,
+      EmptyFunctionRegistry,
+      new SimpleCatalystConf(caseSensitiveAnalysis = false))
 
   test("(a && b) || (a && c) => a && (b || c) when case insensitive") {
     val plan = caseInsensitiveAnalyzer.execute(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
index 3824c67563..a4c8d1c6d2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
@@ -18,8 +18,7 @@
 package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.SimpleCatalystConf
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry, SimpleCatalog}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
@@ -29,7 +28,7 @@ import org.apache.spark.sql.catalyst.rules._
 
 class EliminateSortsSuite extends PlanTest {
   val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true, orderByOrdinal = false)
-  val catalog = new SessionCatalog(new InMemoryCatalog, conf)
+  val catalog = new SimpleCatalog(conf)
   val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
 
   object Optimize extends RuleExecutor[LogicalPlan] {