author    Andrew Or <andrew@databricks.com>  2016-03-23 22:21:15 -0700
committer Andrew Or <andrew@databricks.com>  2016-03-23 22:21:15 -0700
commit    c44d140cae99d0b880e6d25f158125ad3adc6a05 (patch)
tree      7f0e5324e67efeff2cccf661cd27a21c3618098c /sql/catalyst
parent    cf823bead18c5be86b36da59b4bbf935c4804d04 (diff)
Revert "[SPARK-14014][SQL] Replace existing catalog with SessionCatalog"
This reverts commit 5dfc01976bb0d72489620b4f32cc12d620bb6260.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 20
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala | 218
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala | 2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala | 35
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 123
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala | 2
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala | 6
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala | 23
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala | 25
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala | 3
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala | 20
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala | 11
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala | 5
13 files changed, 319 insertions(+), 174 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 54543eebb7..07b0f5ee70 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -24,7 +24,6 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf}
-import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.encoders.OuterScopes
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -37,22 +36,23 @@ import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.types._
/**
- * A trivial [[Analyzer]] with a dummy [[SessionCatalog]] and [[EmptyFunctionRegistry]].
- * Used for testing when all relations are already filled in and the analyzer needs only
- * to resolve attribute references.
+ * A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing
+ * when all relations are already filled in and the analyzer needs only to resolve attribute
+ * references.
*/
object SimpleAnalyzer
- extends SimpleAnalyzer(new SimpleCatalystConf(caseSensitiveAnalysis = true))
-class SimpleAnalyzer(conf: CatalystConf)
- extends Analyzer(new SessionCatalog(new InMemoryCatalog, conf), EmptyFunctionRegistry, conf)
+ extends Analyzer(
+ EmptyCatalog,
+ EmptyFunctionRegistry,
+ new SimpleCatalystConf(caseSensitiveAnalysis = true))
/**
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
- * [[UnresolvedRelation]]s into fully typed objects using information in a
- * [[SessionCatalog]] and a [[FunctionRegistry]].
+ * [[UnresolvedRelation]]s into fully typed objects using information in a schema [[Catalog]] and
+ * a [[FunctionRegistry]].
*/
class Analyzer(
- catalog: SessionCatalog,
+ catalog: Catalog,
registry: FunctionRegistry,
conf: CatalystConf,
maxIterations: Int = 100)
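For orientation, a minimal sketch (using only the post-revert signatures shown in this hunk) of wiring the Analyzer to the restored Catalog trait; the EliminateSortsSuite change at the end of this diff does essentially the same thing:

    import org.apache.spark.sql.catalyst.SimpleCatalystConf
    import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry, SimpleCatalog}

    // Analyzer once again takes a Catalog (here a SimpleCatalog) instead of a
    // SessionCatalog; maxIterations keeps its default of 100.
    val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
    val catalog = new SimpleCatalog(conf)
    val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)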
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
new file mode 100644
index 0000000000..2f0a4dbc10
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{CatalystConf, EmptyConf, TableIdentifier}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+
+
+/**
+ * An interface for looking up relations by name. Used by an [[Analyzer]].
+ */
+trait Catalog {
+
+ val conf: CatalystConf
+
+ def tableExists(tableIdent: TableIdentifier): Boolean
+
+ def lookupRelation(tableIdent: TableIdentifier, alias: Option[String] = None): LogicalPlan
+
+ def setCurrentDatabase(databaseName: String): Unit = {
+ throw new UnsupportedOperationException
+ }
+
+ /**
+ * Returns tuples of (tableName, isTemporary) for all tables in the given database.
+ * isTemporary is a Boolean value indicating whether the table is temporary.
+ */
+ def getTables(databaseName: Option[String]): Seq[(String, Boolean)]
+
+ def refreshTable(tableIdent: TableIdentifier): Unit
+
+ def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit
+
+ def unregisterTable(tableIdent: TableIdentifier): Unit
+
+ def unregisterAllTables(): Unit
+
+ /**
+ * Get the table name of a TableIdentifier for temporary tables.
+ */
+ protected def getTableName(tableIdent: TableIdentifier): String = {
+ // It is not allowed to specify a database name for temporary tables.
+ // We check this here and throw an exception if a database is defined.
+ if (tableIdent.database.isDefined) {
+ throw new AnalysisException("Specifying a database name or other qualifiers is not allowed " +
+ "for temporary tables. If the table name has dots (.) in it, please quote the " +
+ "table name with backticks (`).")
+ }
+ if (conf.caseSensitiveAnalysis) {
+ tableIdent.table
+ } else {
+ tableIdent.table.toLowerCase
+ }
+ }
+}
+
+class SimpleCatalog(val conf: CatalystConf) extends Catalog {
+ private[this] val tables = new ConcurrentHashMap[String, LogicalPlan]
+
+ override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
+ tables.put(getTableName(tableIdent), plan)
+ }
+
+ override def unregisterTable(tableIdent: TableIdentifier): Unit = {
+ tables.remove(getTableName(tableIdent))
+ }
+
+ override def unregisterAllTables(): Unit = {
+ tables.clear()
+ }
+
+ override def tableExists(tableIdent: TableIdentifier): Boolean = {
+ tables.containsKey(getTableName(tableIdent))
+ }
+
+ override def lookupRelation(
+ tableIdent: TableIdentifier,
+ alias: Option[String] = None): LogicalPlan = {
+ val tableName = getTableName(tableIdent)
+ val table = tables.get(tableName)
+ if (table == null) {
+ throw new AnalysisException("Table not found: " + tableName)
+ }
+ val qualifiedTable = SubqueryAlias(tableName, table)
+
+ // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
+ // properly qualified with this alias.
+ alias
+ .map(a => SubqueryAlias(a, qualifiedTable))
+ .getOrElse(qualifiedTable)
+ }
+
+ override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
+ tables.keySet().asScala.map(_ -> true).toSeq
+ }
+
+ override def refreshTable(tableIdent: TableIdentifier): Unit = {
+ throw new UnsupportedOperationException
+ }
+}
+
+/**
+ * A trait that can be mixed into other Catalogs, allowing specific tables to be overridden with
+ * new logical plans. This can be used to bind query results to virtual tables, or to replace
+ * tables with in-memory cached versions. Note that the set of overrides is stored in memory and
+ * thus lost when the JVM exits.
+ */
+trait OverrideCatalog extends Catalog {
+ private[this] val overrides = new ConcurrentHashMap[String, LogicalPlan]
+
+ private def getOverriddenTable(tableIdent: TableIdentifier): Option[LogicalPlan] = {
+ if (tableIdent.database.isDefined) {
+ None
+ } else {
+ Option(overrides.get(getTableName(tableIdent)))
+ }
+ }
+
+ abstract override def tableExists(tableIdent: TableIdentifier): Boolean = {
+ getOverriddenTable(tableIdent) match {
+ case Some(_) => true
+ case None => super.tableExists(tableIdent)
+ }
+ }
+
+ abstract override def lookupRelation(
+ tableIdent: TableIdentifier,
+ alias: Option[String] = None): LogicalPlan = {
+ getOverriddenTable(tableIdent) match {
+ case Some(table) =>
+ val tableName = getTableName(tableIdent)
+ val qualifiedTable = SubqueryAlias(tableName, table)
+
+ // If an alias was specified by the lookup, wrap the plan in a sub-query so that attributes
+ // are properly qualified with this alias.
+ alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable)
+
+ case None => super.lookupRelation(tableIdent, alias)
+ }
+ }
+
+ abstract override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
+ overrides.keySet().asScala.map(_ -> true).toSeq ++ super.getTables(databaseName)
+ }
+
+ override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
+ overrides.put(getTableName(tableIdent), plan)
+ }
+
+ override def unregisterTable(tableIdent: TableIdentifier): Unit = {
+ if (tableIdent.database.isEmpty) {
+ overrides.remove(getTableName(tableIdent))
+ }
+ }
+
+ override def unregisterAllTables(): Unit = {
+ overrides.clear()
+ }
+}
+
+/**
+ * A trivial catalog that returns an error when a relation is requested. Used for testing when all
+ * relations are already filled in and the analyzer needs only to resolve attribute references.
+ */
+object EmptyCatalog extends Catalog {
+
+ override val conf: CatalystConf = EmptyConf
+
+ override def tableExists(tableIdent: TableIdentifier): Boolean = {
+ throw new UnsupportedOperationException
+ }
+
+ override def lookupRelation(
+ tableIdent: TableIdentifier,
+ alias: Option[String] = None): LogicalPlan = {
+ throw new UnsupportedOperationException
+ }
+
+ override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
+ throw new UnsupportedOperationException
+ }
+
+ override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
+ throw new UnsupportedOperationException
+ }
+
+ override def unregisterTable(tableIdent: TableIdentifier): Unit = {
+ throw new UnsupportedOperationException
+ }
+
+ override def unregisterAllTables(): Unit = {
+ throw new UnsupportedOperationException
+ }
+
+ override def refreshTable(tableIdent: TableIdentifier): Unit = {
+ throw new UnsupportedOperationException
+ }
+}
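As a usage sketch of the restored file (assuming OneRowRelation from catalyst's logical plan nodes as a stand-in relation), registering a table with SimpleCatalog and stacking OverrideCatalog on top:

    import org.apache.spark.sql.catalyst.{SimpleCatalystConf, TableIdentifier}
    import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation

    // Case-insensitive conf: getTableName lowercases names on both register
    // and lookup, so "PEOPLE" resolves the table registered as "people".
    val catalog = new SimpleCatalog(new SimpleCatalystConf(caseSensitiveAnalysis = false))
    catalog.registerTable(TableIdentifier("people"), OneRowRelation)
    val plan = catalog.lookupRelation(TableIdentifier("PEOPLE"), alias = Some("p"))

    // OverrideCatalog is a stackable mixin: its abstract overrides consult the
    // override map first and fall back to the base catalog via super calls.
    class CatalogWithOverrides(conf: SimpleCatalystConf)
      extends SimpleCatalog(conf) with OverrideCatalog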
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index e73d367a73..9518309fbf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -34,7 +34,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
errors.TreeNodeException(tree, s"Invalid call to $function on unresolved object", null)
/**
- * Holds the name of a relation that has yet to be looked up in a catalog.
+ * Holds the name of a relation that has yet to be looked up in a [[Catalog]].
*/
case class UnresolvedRelation(
tableIdentifier: TableIdentifier,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index e216fa5528..7ead1ddebe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -52,34 +52,37 @@ class InMemoryCatalog extends ExternalCatalog {
names.filter { funcName => regex.pattern.matcher(funcName).matches() }
}
- private def functionExists(db: String, funcName: String): Boolean = {
+ private def existsFunction(db: String, funcName: String): Boolean = {
requireDbExists(db)
catalog(db).functions.contains(funcName)
}
- private def partitionExists(db: String, table: String, spec: TablePartitionSpec): Boolean = {
+ private def existsTable(db: String, table: String): Boolean = {
+ requireDbExists(db)
+ catalog(db).tables.contains(table)
+ }
+
+ private def existsPartition(db: String, table: String, spec: TablePartitionSpec): Boolean = {
requireTableExists(db, table)
catalog(db).tables(table).partitions.contains(spec)
}
private def requireFunctionExists(db: String, funcName: String): Unit = {
- if (!functionExists(db, funcName)) {
- throw new AnalysisException(
- s"Function not found: '$funcName' does not exist in database '$db'")
+ if (!existsFunction(db, funcName)) {
+ throw new AnalysisException(s"Function '$funcName' does not exist in database '$db'")
}
}
private def requireTableExists(db: String, table: String): Unit = {
- if (!tableExists(db, table)) {
- throw new AnalysisException(
- s"Table not found: '$table' does not exist in database '$db'")
+ if (!existsTable(db, table)) {
+ throw new AnalysisException(s"Table '$table' does not exist in database '$db'")
}
}
private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = {
- if (!partitionExists(db, table, spec)) {
+ if (!existsPartition(db, table, spec)) {
throw new AnalysisException(
- s"Partition not found: database '$db' table '$table' does not contain: '$spec'")
+ s"Partition does not exist in database '$db' table '$table': '$spec'")
}
}
@@ -156,7 +159,7 @@ class InMemoryCatalog extends ExternalCatalog {
ignoreIfExists: Boolean): Unit = synchronized {
requireDbExists(db)
val table = tableDefinition.name.table
- if (tableExists(db, table)) {
+ if (existsTable(db, table)) {
if (!ignoreIfExists) {
throw new AnalysisException(s"Table '$table' already exists in database '$db'")
}
@@ -170,7 +173,7 @@ class InMemoryCatalog extends ExternalCatalog {
table: String,
ignoreIfNotExists: Boolean): Unit = synchronized {
requireDbExists(db)
- if (tableExists(db, table)) {
+ if (existsTable(db, table)) {
catalog(db).tables.remove(table)
} else {
if (!ignoreIfNotExists) {
@@ -197,17 +200,13 @@ class InMemoryCatalog extends ExternalCatalog {
catalog(db).tables(table).table
}
- override def tableExists(db: String, table: String): Boolean = synchronized {
- requireDbExists(db)
- catalog(db).tables.contains(table)
- }
-
override def listTables(db: String): Seq[String] = synchronized {
requireDbExists(db)
catalog(db).tables.keySet.toSeq
}
override def listTables(db: String, pattern: String): Seq[String] = synchronized {
+ requireDbExists(db)
filterPattern(listTables(db), pattern)
}
@@ -296,7 +295,7 @@ class InMemoryCatalog extends ExternalCatalog {
override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
requireDbExists(db)
- if (functionExists(db, func.name.funcName)) {
+ if (existsFunction(db, func.name.funcName)) {
throw new AnalysisException(s"Function '$func' already exists in '$db' database")
} else {
catalog(db).functions.put(func.name.funcName, func)
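One behavioral nuance restored above, shown as a hypothetical snippet: both listTables overloads now validate the database up front, so an unknown database fails fast with an AnalysisException rather than erroring inside filterPattern:

    val external = new InMemoryCatalog
    // Throws AnalysisException: the pattern overload now calls
    // requireDbExists(db) before filtering (see the hunk above).
    external.listTables("unknown_db", "*")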
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 34265faa74..3ac2bcf7e8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
@@ -32,34 +31,17 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
* proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
* tables and functions of the Spark Session that it belongs to.
*/
-class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
+class SessionCatalog(externalCatalog: ExternalCatalog) {
import ExternalCatalog._
- def this(externalCatalog: ExternalCatalog) {
- this(externalCatalog, new SimpleCatalystConf(true))
- }
-
- protected[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
- protected[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction]
+ private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
+ private[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction]
// Note: we track current database here because certain operations do not explicitly
// specify the database (e.g. DROP TABLE my_table). In these cases we must first
// check whether the temporary table or function exists, then, if not, operate on
// the corresponding item in the current database.
- protected[this] var currentDb = {
- val defaultName = "default"
- val defaultDbDefinition = CatalogDatabase(defaultName, "default database", "", Map())
- // Initialize default database if it doesn't already exist
- createDatabase(defaultDbDefinition, ignoreIfExists = true)
- defaultName
- }
-
- /**
- * Format table name, taking into account case sensitivity.
- */
- protected[this] def formatTableName(name: String): String = {
- if (conf.caseSensitiveAnalysis) name else name.toLowerCase
- }
+ private[this] var currentDb = "default"
// ----------------------------------------------------------------------------
// Databases
@@ -123,8 +105,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
*/
def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
val db = tableDefinition.name.database.getOrElse(currentDb)
- val table = formatTableName(tableDefinition.name.table)
- val newTableDefinition = tableDefinition.copy(name = TableIdentifier(table, Some(db)))
+ val newTableDefinition = tableDefinition.copy(
+ name = TableIdentifier(tableDefinition.name.table, Some(db)))
externalCatalog.createTable(db, newTableDefinition, ignoreIfExists)
}
@@ -139,8 +121,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
*/
def alterTable(tableDefinition: CatalogTable): Unit = {
val db = tableDefinition.name.database.getOrElse(currentDb)
- val table = formatTableName(tableDefinition.name.table)
- val newTableDefinition = tableDefinition.copy(name = TableIdentifier(table, Some(db)))
+ val newTableDefinition = tableDefinition.copy(
+ name = TableIdentifier(tableDefinition.name.table, Some(db)))
externalCatalog.alterTable(db, newTableDefinition)
}
@@ -150,8 +132,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
*/
def getTable(name: TableIdentifier): CatalogTable = {
val db = name.database.getOrElse(currentDb)
- val table = formatTableName(name.table)
- externalCatalog.getTable(db, table)
+ externalCatalog.getTable(db, name.table)
}
// -------------------------------------------------------------
@@ -165,11 +146,10 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
name: String,
tableDefinition: LogicalPlan,
ignoreIfExists: Boolean): Unit = {
- val table = formatTableName(name)
- if (tempTables.containsKey(table) && !ignoreIfExists) {
+ if (tempTables.containsKey(name) && !ignoreIfExists) {
throw new AnalysisException(s"Temporary table '$name' already exists.")
}
- tempTables.put(table, tableDefinition)
+ tempTables.put(name, tableDefinition)
}
/**
@@ -186,13 +166,11 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
throw new AnalysisException("rename does not support moving tables across databases")
}
val db = oldName.database.getOrElse(currentDb)
- val oldTableName = formatTableName(oldName.table)
- val newTableName = formatTableName(newName.table)
- if (oldName.database.isDefined || !tempTables.containsKey(oldTableName)) {
- externalCatalog.renameTable(db, oldTableName, newTableName)
+ if (oldName.database.isDefined || !tempTables.containsKey(oldName.table)) {
+ externalCatalog.renameTable(db, oldName.table, newName.table)
} else {
- val table = tempTables.remove(oldTableName)
- tempTables.put(newTableName, table)
+ val table = tempTables.remove(oldName.table)
+ tempTables.put(newName.table, table)
}
}
@@ -205,11 +183,10 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
*/
def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean): Unit = {
val db = name.database.getOrElse(currentDb)
- val table = formatTableName(name.table)
- if (name.database.isDefined || !tempTables.containsKey(table)) {
- externalCatalog.dropTable(db, table, ignoreIfNotExists)
+ if (name.database.isDefined || !tempTables.containsKey(name.table)) {
+ externalCatalog.dropTable(db, name.table, ignoreIfNotExists)
} else {
- tempTables.remove(table)
+ tempTables.remove(name.table)
}
}
@@ -222,42 +199,27 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
*/
def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = {
val db = name.database.getOrElse(currentDb)
- val table = formatTableName(name.table)
val relation =
- if (name.database.isDefined || !tempTables.containsKey(table)) {
- val metadata = externalCatalog.getTable(db, table)
+ if (name.database.isDefined || !tempTables.containsKey(name.table)) {
+ val metadata = externalCatalog.getTable(db, name.table)
CatalogRelation(db, metadata, alias)
} else {
- tempTables.get(table)
+ tempTables.get(name.table)
}
- val qualifiedTable = SubqueryAlias(table, relation)
+ val qualifiedTable = SubqueryAlias(name.table, relation)
// If an alias was specified by the lookup, wrap the plan in a subquery so that
// attributes are properly qualified with this alias.
alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable)
}
/**
- * Return whether a table with the specified name exists.
- *
- * Note: If a database is explicitly specified, then this will return whether the table
- * exists in that particular database instead. In that case, even if there is a temporary
- * table with the same name, we will return false if the specified database does not
- * contain the table.
- */
- def tableExists(name: TableIdentifier): Boolean = {
- val db = name.database.getOrElse(currentDb)
- val table = formatTableName(name.table)
- if (name.database.isDefined || !tempTables.containsKey(table)) {
- externalCatalog.tableExists(db, table)
- } else {
- true // it's a temporary table
- }
- }
-
- /**
* List all tables in the specified database, including temporary tables.
*/
- def listTables(db: String): Seq[TableIdentifier] = listTables(db, "*")
+ def listTables(db: String): Seq[TableIdentifier] = {
+ val dbTables = externalCatalog.listTables(db).map { t => TableIdentifier(t, Some(db)) }
+ val _tempTables = tempTables.keys().asScala.map { t => TableIdentifier(t) }
+ dbTables ++ _tempTables
+ }
/**
* List all matching tables in the specified database, including temporary tables.
@@ -273,19 +235,6 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
}
/**
- * Refresh the cache entry for a metastore table, if any.
- */
- def refreshTable(name: TableIdentifier): Unit = { /* no-op */ }
-
- /**
- * Drop all existing temporary tables.
- * For testing only.
- */
- def clearTempTables(): Unit = {
- tempTables.clear()
- }
-
- /**
* Return a temporary table exactly as it was stored.
* For testing only.
*/
@@ -314,8 +263,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = {
val db = tableName.database.getOrElse(currentDb)
- val table = formatTableName(tableName.table)
- externalCatalog.createPartitions(db, table, parts, ignoreIfExists)
+ externalCatalog.createPartitions(db, tableName.table, parts, ignoreIfExists)
}
/**
@@ -327,8 +275,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
parts: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean): Unit = {
val db = tableName.database.getOrElse(currentDb)
- val table = formatTableName(tableName.table)
- externalCatalog.dropPartitions(db, table, parts, ignoreIfNotExists)
+ externalCatalog.dropPartitions(db, tableName.table, parts, ignoreIfNotExists)
}
/**
@@ -342,8 +289,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit = {
val db = tableName.database.getOrElse(currentDb)
- val table = formatTableName(tableName.table)
- externalCatalog.renamePartitions(db, table, specs, newSpecs)
+ externalCatalog.renamePartitions(db, tableName.table, specs, newSpecs)
}
/**
@@ -357,8 +303,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
*/
def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = {
val db = tableName.database.getOrElse(currentDb)
- val table = formatTableName(tableName.table)
- externalCatalog.alterPartitions(db, table, parts)
+ externalCatalog.alterPartitions(db, tableName.table, parts)
}
/**
@@ -367,8 +312,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
*/
def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = {
val db = tableName.database.getOrElse(currentDb)
- val table = formatTableName(tableName.table)
- externalCatalog.getPartition(db, table, spec)
+ externalCatalog.getPartition(db, tableName.table, spec)
}
/**
@@ -377,8 +321,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
*/
def listPartitions(tableName: TableIdentifier): Seq[CatalogTablePartition] = {
val db = tableName.database.getOrElse(currentDb)
- val table = formatTableName(tableName.table)
- externalCatalog.listPartitions(db, table)
+ externalCatalog.listPartitions(db, tableName.table)
}
// ----------------------------------------------------------------------------
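A minimal post-revert sketch (assuming InMemoryCatalog from this package and OneRowRelation as a stand-in plan): SessionCatalog is now constructed from the ExternalCatalog alone, and temporary-table names are used verbatim since formatTableName is gone:

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation

    val sessionCatalog = new SessionCatalog(new InMemoryCatalog)
    // Temp tables live in the catalog's own map; no metastore call is made.
    sessionCatalog.createTempTable("tmp", OneRowRelation, ignoreIfExists = false)
    val relation = sessionCatalog.lookupRelation(TableIdentifier("tmp"))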
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 34803133f6..c4e49614c5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -91,8 +91,6 @@ abstract class ExternalCatalog {
def getTable(db: String, table: String): CatalogTable
- def tableExists(db: String, table: String): Boolean
-
def listTables(db: String): Seq[String]
def listTables(db: String, pattern: String): Seq[String]
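With tableExists removed from the ExternalCatalog contract, existence checks become the caller's concern; a hypothetical helper (not part of the API) using only the listTables method shown above:

    // Hypothetical caller-side helper; not part of ExternalCatalog itself.
    def tableExistsIn(catalog: ExternalCatalog, db: String, table: String): Boolean =
      catalog.listTables(db).contains(table)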
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index afc2f327df..8b568b6dd6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -161,10 +161,14 @@ class AnalysisSuite extends AnalysisTest {
}
test("resolve relations") {
- assertAnalysisError(UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq())
+ assertAnalysisError(
+ UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq("Table not found: tAbLe"))
+
checkAnalysis(UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation)
+
checkAnalysis(
UnresolvedRelation(TableIdentifier("tAbLe"), None), testRelation, caseSensitive = false)
+
checkAnalysis(
UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation, caseSensitive = false)
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
index 6fa4beed99..39166c4f8e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
@@ -18,21 +18,26 @@
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.SimpleCatalystConf
-import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
+import org.apache.spark.sql.catalyst.{SimpleCatalystConf, TableIdentifier}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
trait AnalysisTest extends PlanTest {
- protected val caseSensitiveAnalyzer = makeAnalyzer(caseSensitive = true)
- protected val caseInsensitiveAnalyzer = makeAnalyzer(caseSensitive = false)
+ val (caseSensitiveAnalyzer, caseInsensitiveAnalyzer) = {
+ val caseSensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
+ val caseInsensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = false)
- private def makeAnalyzer(caseSensitive: Boolean): Analyzer = {
- val conf = new SimpleCatalystConf(caseSensitive)
- val catalog = new SessionCatalog(new InMemoryCatalog, conf)
- catalog.createTempTable("TaBlE", TestRelations.testRelation, ignoreIfExists = true)
- new Analyzer(catalog, EmptyFunctionRegistry, conf) {
+ val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf)
+ val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf)
+
+ caseSensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation)
+ caseInsensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation)
+
+ new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitiveConf) {
+ override val extendedResolutionRules = EliminateSubqueryAliases :: Nil
+ } ->
+ new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseInsensitiveConf) {
override val extendedResolutionRules = EliminateSubqueryAliases :: Nil
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
index 31501864a8..9aa685e1e8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
@@ -19,8 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.scalatest.BeforeAndAfter
-import org.apache.spark.sql.catalyst.SimpleCatalystConf
-import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
+import org.apache.spark.sql.catalyst.{SimpleCatalystConf, TableIdentifier}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -31,11 +30,11 @@ import org.apache.spark.sql.types._
class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter {
- private val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
- private val catalog = new SessionCatalog(new InMemoryCatalog, conf)
- private val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
+ val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
+ val catalog = new SimpleCatalog(conf)
+ val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
- private val relation = LocalRelation(
+ val relation = LocalRelation(
AttributeReference("i", IntegerType)(),
AttributeReference("d1", DecimalType(2, 1))(),
AttributeReference("d2", DecimalType(5, 2))(),
@@ -44,15 +43,15 @@ class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter {
AttributeReference("b", DoubleType)()
)
- private val i: Expression = UnresolvedAttribute("i")
- private val d1: Expression = UnresolvedAttribute("d1")
- private val d2: Expression = UnresolvedAttribute("d2")
- private val u: Expression = UnresolvedAttribute("u")
- private val f: Expression = UnresolvedAttribute("f")
- private val b: Expression = UnresolvedAttribute("b")
+ val i: Expression = UnresolvedAttribute("i")
+ val d1: Expression = UnresolvedAttribute("d1")
+ val d2: Expression = UnresolvedAttribute("d2")
+ val u: Expression = UnresolvedAttribute("u")
+ val f: Expression = UnresolvedAttribute("f")
+ val b: Expression = UnresolvedAttribute("b")
before {
- catalog.createTempTable("table", relation, ignoreIfExists = true)
+ catalog.registerTable(TableIdentifier("table"), relation)
}
private def checkType(expression: Expression, expectedType: DataType): Unit = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
index 277c2d717e..a1ea61920d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
@@ -225,14 +225,13 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
test("list tables without pattern") {
val catalog = newBasicCatalog()
- intercept[AnalysisException] { catalog.listTables("unknown_db") }
assert(catalog.listTables("db1").toSet == Set.empty)
assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
}
test("list tables with pattern") {
val catalog = newBasicCatalog()
- intercept[AnalysisException] { catalog.listTables("unknown_db", "*") }
+ intercept[AnalysisException] { catalog.listTables("unknown_db") }
assert(catalog.listTables("db1", "*").toSet == Set.empty)
assert(catalog.listTables("db2", "*").toSet == Set("tbl1", "tbl2"))
assert(catalog.listTables("db2", "tbl*").toSet == Set("tbl1", "tbl2"))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 74e995cc5b..e1973ee258 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -397,24 +397,6 @@ class SessionCatalogSuite extends SparkFunSuite {
TableIdentifier("tbl1", Some("db2")), alias = Some(alias)) == relationWithAlias)
}
- test("table exists") {
- val catalog = new SessionCatalog(newBasicCatalog())
- assert(catalog.tableExists(TableIdentifier("tbl1", Some("db2"))))
- assert(catalog.tableExists(TableIdentifier("tbl2", Some("db2"))))
- assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2"))))
- assert(!catalog.tableExists(TableIdentifier("tbl1", Some("db1"))))
- assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1"))))
- // If database is explicitly specified, do not check temporary tables
- val tempTable = Range(1, 10, 1, 10, Seq())
- catalog.createTempTable("tbl3", tempTable, ignoreIfExists = false)
- assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2"))))
- // If database is not explicitly specified, check the current database
- catalog.setCurrentDatabase("db2")
- assert(catalog.tableExists(TableIdentifier("tbl1")))
- assert(catalog.tableExists(TableIdentifier("tbl2")))
- assert(catalog.tableExists(TableIdentifier("tbl3")))
- }
-
test("list tables without pattern") {
val catalog = new SessionCatalog(newBasicCatalog())
val tempTable = Range(1, 10, 2, 10, Seq())
@@ -447,7 +429,7 @@ class SessionCatalogSuite extends SparkFunSuite {
assert(catalog.listTables("db2", "*1").toSet ==
Set(TableIdentifier("tbl1"), TableIdentifier("tbl1", Some("db2"))))
intercept[AnalysisException] {
- catalog.listTables("unknown_db", "*")
+ catalog.listTables("unknown_db")
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
index e2c76b700f..2ab31eea8a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
@@ -138,11 +137,11 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper {
checkCondition(!(('a || 'b) && ('c || 'd)), (!'a && !'b) || (!'c && !'d))
}
- private val caseInsensitiveConf = new SimpleCatalystConf(false)
- private val caseInsensitiveAnalyzer = new Analyzer(
- new SessionCatalog(new InMemoryCatalog, caseInsensitiveConf),
- EmptyFunctionRegistry,
- caseInsensitiveConf)
+ private val caseInsensitiveAnalyzer =
+ new Analyzer(
+ EmptyCatalog,
+ EmptyFunctionRegistry,
+ new SimpleCatalystConf(caseSensitiveAnalysis = false))
test("(a && b) || (a && c) => a && (b || c) when case insensitive") {
val plan = caseInsensitiveAnalyzer.execute(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
index 3824c67563..a4c8d1c6d2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
@@ -18,8 +18,7 @@
package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.SimpleCatalystConf
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry, SimpleCatalog}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
@@ -29,7 +28,7 @@ import org.apache.spark.sql.catalyst.rules._
class EliminateSortsSuite extends PlanTest {
val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true, orderByOrdinal = false)
- val catalog = new SessionCatalog(new InMemoryCatalog, conf)
+ val catalog = new SimpleCatalog(conf)
val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
object Optimize extends RuleExecutor[LogicalPlan] {