/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.catalog
import java.net.URI
import java.util.Locale
import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable
import com.google.common.cache.{Cache, CacheBuilder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View}
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StructField, StructType}
/** Constants shared by the `SessionCatalog` class of the same name. */
object SessionCatalog {
  /** Name of the database that a new catalog uses as its current database. */
  val DEFAULT_DATABASE: String = "default"
}
/**
* An internal catalog that is used by a Spark Session. This internal catalog serves as a
* proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
* tables and functions of the Spark Session that it belongs to.
*
* This class must be thread-safe.
*/
class SessionCatalog(
val externalCatalog: ExternalCatalog,
globalTempViewManager: GlobalTempViewManager,
functionRegistry: FunctionRegistry,
conf: SQLConf,
hadoopConf: Configuration,
parser: ParserInterface,
functionResourceLoader: FunctionResourceLoader) extends Logging {
import SessionCatalog._
import CatalogTypes.TablePartitionSpec
// For testing only: fills in a "global_temp" view manager, a fresh Hadoop configuration,
// the Catalyst SQL parser and a dummy function resource loader.
def this(
    externalCatalog: ExternalCatalog,
    functionRegistry: FunctionRegistry,
    conf: SQLConf) = this(
  externalCatalog = externalCatalog,
  globalTempViewManager = new GlobalTempViewManager("global_temp"),
  functionRegistry = functionRegistry,
  conf = conf,
  hadoopConf = new Configuration(),
  parser = CatalystSqlParser,
  functionResourceLoader = DummyFunctionResourceLoader)
// For testing only: uses a simple function registry and a case-sensitive SQLConf.
def this(externalCatalog: ExternalCatalog) = this(
  externalCatalog = externalCatalog,
  functionRegistry = new SimpleFunctionRegistry,
  conf = new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true))
/**
 * Local temporary tables/views of this session, keyed by the formatted table name and
 * holding the logical plan that defines each one.
 */
@GuardedBy("this")
protected val tempTables: mutable.HashMap[String, LogicalPlan] =
  mutable.HashMap.empty[String, LogicalPlan]
// The current database of this session, initialised to the formatted default database name.
// It is tracked here because some operations do not name a database explicitly
// (e.g. DROP TABLE my_table). For those we first check whether a temporary table or
// function with that name exists and, if not, operate on the item in the current database.
@GuardedBy("this")
protected var currentDb: String = formatDatabaseName(DEFAULT_DATABASE)
/**
 * Checks that the given name conforms to the Hive standard ("[a-zA-Z_0-9]+"),
 * i.e. that it consists only of letters, digits and underscores.
 *
 * This method is intended to behave the same as
 * org.apache.hadoop.hive.metastore.MetaStoreUtils.validateName.
 *
 * @param name table or database name to check
 * @throws AnalysisException if the whole name does not match the pattern
 */
private def validateName(name: String): Unit = {
  // String.matches requires the entire name to match, like Matcher.matches().
  val isValid = name.matches("([\\w_]+)")
  if (!isValid) {
    throw new AnalysisException(s"`$name` is not a valid name for tables/databases. " +
      "Valid names only contain alphabet characters, numbers and _.")
  }
}
/**
 * Format table name, taking into account case sensitivity.
 *
 * When analysis is case sensitive the name is returned unchanged. Otherwise it is
 * lowercased with `Locale.ROOT`, so the result does not depend on the JVM default locale
 * (under a Turkish locale a plain `toLowerCase` maps "I" to a dotless "ı").
 *
 * @param name table name as given by the caller
 * @return the name used for catalog lookups
 */
protected[this] def formatTableName(name: String): String = {
  if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)
}
/**
 * Format database name, taking into account case sensitivity.
 *
 * When analysis is case sensitive the name is returned unchanged. Otherwise it is
 * lowercased with `Locale.ROOT`, so the result does not depend on the JVM default locale
 * (under a Turkish locale a plain `toLowerCase` maps "I" to a dotless "ı").
 *
 * @param name database name as given by the caller
 * @return the name used for catalog lookups
 */
protected[this] def formatDatabaseName(name: String): String = {
  if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)
}
/**
 * Cache from qualified table name to the relation plan of that table. Its maximum number
 * of entries is taken from `conf.tableRelationCacheSize`.
 */
val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] =
  CacheBuilder.newBuilder()
    .maximumSize(conf.tableRelationCacheSize)
    .build[QualifiedTableName, LogicalPlan]()
/**
 * Qualify the given path before it is stored in the underlying external catalog.
 * Qualifying a scheme-less path at this point means the stored location does not change
 * if the default FileSystem is changed later.
 *
 * @param path location to qualify
 * @return the path qualified by the file system resolved from it and `hadoopConf`
 */
private def makeQualifiedPath(path: URI): URI = {
  val unqualified = new Path(path)
  unqualified.getFileSystem(hadoopConf).makeQualified(unqualified).toUri
}
/** Throws [[NoSuchDatabaseException]] unless `databaseExists(db)` holds. */
private def requireDbExists(db: String): Unit =
  if (!databaseExists(db)) throw new NoSuchDatabaseException(db)
/**
 * Throws [[NoSuchTableException]] unless `tableExists(name)` holds. The exception names the
 * database of the identifier, or the current database when the identifier has none.
 */
private def requireTableExists(name: TableIdentifier): Unit = {
  val found = tableExists(name)
  if (!found) {
    throw new NoSuchTableException(
      db = name.database.getOrElse(currentDb),
      table = name.table)
  }
}
/**
 * Throws [[TableAlreadyExistsException]] if `tableExists(name)` holds. The exception names
 * the database of the identifier, or the current database when the identifier has none.
 */
private def requireTableNotExists(name: TableIdentifier): Unit = {
  val alreadyThere = tableExists(name)
  if (alreadyThere) {
    throw new TableAlreadyExistsException(
      db = name.database.getOrElse(currentDb),
      table = name.table)
  }
}
/**
 * Fails if two of the given fields share a name.
 *
 * Names are compared as-is under case-sensitive analysis. Otherwise they are compared after
 * lowercasing with `Locale.ROOT`, so the comparison does not depend on the JVM default locale.
 *
 * @param fields fields whose names must be unique
 * @throws AnalysisException listing each duplicated name once, in order of first repetition
 */
private def checkDuplication(fields: Seq[StructField]): Unit = {
  val columnNames = if (conf.caseSensitiveAnalysis) {
    fields.map(_.name)
  } else {
    fields.map(_.name.toLowerCase(Locale.ROOT))
  }
  // Removing one occurrence of every distinct name leaves exactly the repeated ones,
  // in their original order, which keeps the error message deterministic.
  val duplicateColumns = columnNames.diff(columnNames.distinct).distinct
  if (duplicateColumns.nonEmpty) {
    throw new AnalysisException(s"Found duplicate column(s): ${duplicateColumns.mkString(", ")}")
  }
}
// ----------------------------------------------------------------------------
// Databases
// ----------------------------------------------------------------------------
// All methods in this category interact directly with the underlying catalog.
// ----------------------------------------------------------------------------
/**
 * Create a database in the external catalog.
 *
 * The name is formatted and validated, and the location is qualified, before the
 * definition is handed to the external catalog.
 *
 * @param dbDefinition definition of the database to create
 * @param ignoreIfExists passed through unchanged to the external catalog
 * @throws AnalysisException if the formatted name is the global temp view database, or
 *                           if it fails `validateName`
 */
def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
  val dbName = formatDatabaseName(dbDefinition.name)
  if (dbName == globalTempViewManager.database) {
    throw new AnalysisException(
      s"${globalTempViewManager.database} is a system preserved database, " +
        "you cannot create a database with this name.")
  }
  validateName(dbName)
  val normalized = dbDefinition.copy(
    name = dbName,
    locationUri = makeQualifiedPath(dbDefinition.locationUri))
  externalCatalog.createDatabase(normalized, ignoreIfExists)
}
/**
 * Drop a database from the external catalog.
 *
 * @param db name of the database; formatted before use
 * @param ignoreIfNotExists passed through unchanged to the external catalog
 * @param cascade passed through unchanged to the external catalog
 * @throws AnalysisException if the formatted name equals the default database name
 */
def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
  val dbName = formatDatabaseName(db)
  val isDefault = dbName == DEFAULT_DATABASE
  if (isDefault) {
    throw new AnalysisException("Can not drop default database")
  }
  externalCatalog.dropDatabase(dbName, ignoreIfNotExists, cascade)
}
/**
 * Alter an existing database in the external catalog, using the formatted database name.
 *
 * @throws NoSuchDatabaseException if the database does not exist
 */
def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
  val dbName = formatDatabaseName(dbDefinition.name)
  requireDbExists(dbName)
  val normalized = dbDefinition.copy(name = dbName)
  externalCatalog.alterDatabase(normalized)
}
/**
 * Fetch the definition of an existing database from the external catalog.
 *
 * @param db name of the database; formatted before use
 * @throws NoSuchDatabaseException if the database does not exist
 */
def getDatabaseMetadata(db: String): CatalogDatabase = {
  val formatted = formatDatabaseName(db)
  requireDbExists(formatted)
  externalCatalog.getDatabase(formatted)
}
/** Ask the external catalog whether a database with the formatted name exists. */
def databaseExists(db: String): Boolean =
  externalCatalog.databaseExists(formatDatabaseName(db))
/** List the databases reported by the external catalog. */
def listDatabases(): Seq[String] = externalCatalog.listDatabases()
/**
 * List the databases reported by the external catalog for the given pattern. The pattern
 * is forwarded as-is.
 */
def listDatabases(pattern: String): Seq[String] = externalCatalog.listDatabases(pattern)
/** Return the current database name, read while holding this catalog's lock. */
def getCurrentDatabase: String = synchronized(currentDb)
/**
 * Make the given database the current database of this session.
 *
 * @param db name of the database; formatted before use
 * @throws AnalysisException if the formatted name is the global temp view database
 * @throws NoSuchDatabaseException if the database does not exist
 */
def setCurrentDatabase(db: String): Unit = {
  val target = formatDatabaseName(db)
  val isReserved = target == globalTempViewManager.database
  if (isReserved) {
    throw new AnalysisException(
      s"${globalTempViewManager.database} is a system preserved database, " +
        "you cannot use it as current database. To access global temporary views, you should " +
        "use qualified name with the GLOBAL_TEMP_DATABASE, e.g. SELECT * FROM " +
        s"${globalTempViewManager.database}.viewName.")
  }
  requireDbExists(target)
  // Only the assignment itself is done under the lock.
  synchronized {
    currentDb = target
  }
}
/**
 * Get the path for creating a non-default database when the user does not provide a
 * database location: `<formatted db name>.db` under `conf.warehousePath`.
 */
def getDefaultDBPath(db: String): URI = {
  val dirName = formatDatabaseName(db) + ".db"
  val warehouseRoot = new Path(conf.warehousePath)
  new Path(warehouseRoot, dirName).toUri
}
// ----------------------------------------------------------------------------
// Tables
// ----------------------------------------------------------------------------
// There are two kinds of tables, temporary tables and metastore tables.
// Temporary tables are isolated across sessions and do not belong to any
// particular database. Metastore tables can be used across multiple
// sessions as their metadata is persisted in the underlying catalog.
// ----------------------------------------------------------------------------
// ----------------------------------------------------
// | Methods that interact with metastore tables only |
// ----------------------------------------------------
/**
 * Create a metastore table in the database specified in `tableDefinition`.
 * If no such database is specified, create it in the current database.
 *
 * The table name is formatted and validated. A storage location that is not an absolute
 * URI is qualified before the definition is passed to the external catalog.
 *
 * @throws AnalysisException if the table name fails `validateName`
 * @throws NoSuchDatabaseException if the target database does not exist
 */
def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
  val requested = tableDefinition.identifier
  val db = formatDatabaseName(requested.database.getOrElse(getCurrentDatabase))
  val table = formatTableName(requested.table)
  validateName(table)
  val qualifiedIdent = TableIdentifier(table, Some(db))
  val newTableDefinition = tableDefinition.storage.locationUri match {
    case Some(location) if !location.isAbsolute =>
      // make the location of the table qualified.
      val qualifiedTableLocation = makeQualifiedPath(location)
      tableDefinition.copy(
        storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation)),
        identifier = qualifiedIdent)
    case _ =>
      tableDefinition.copy(identifier = qualifiedIdent)
  }
  requireDbExists(db)
  externalCatalog.createTable(newTableDefinition, ignoreIfExists)
}
/**
 * Alter the metadata of an existing metastore table identified by `tableDefinition`.
 *
 * If no database is specified in `tableDefinition`, assume the table is in the
 * current database.
 *
 * Note: If the underlying implementation does not support altering a certain field,
 * this becomes a no-op.
 *
 * @throws NoSuchDatabaseException if the database does not exist
 * @throws NoSuchTableException if the table does not exist
 */
def alterTable(tableDefinition: CatalogTable): Unit = {
  val requested = tableDefinition.identifier
  val db = formatDatabaseName(requested.database.getOrElse(getCurrentDatabase))
  val qualified = TableIdentifier(formatTableName(requested.table), Some(db))
  val normalizedDefinition = tableDefinition.copy(identifier = qualified)
  requireDbExists(db)
  requireTableExists(qualified)
  externalCatalog.alterTable(normalizedDefinition)
}
/**
 * Alter the schema of a table identified by the provided table identifier. The new schema
 * should still contain the existing bucket columns and partition columns used by the table.
 * This method will also update any Spark SQL-related parameters stored as Hive table
 * properties (such as the schema itself).
 *
 * @param identifier TableIdentifier
 * @param newSchema Updated schema to be used for the table (must contain existing partition and
 *                  bucket columns, and partition columns need to be at the end)
 * @throws AnalysisException if the new schema has duplicate column names or lacks a column
 *                           of the current schema
 */
def alterTableSchema(
    identifier: TableIdentifier,
    newSchema: StructType): Unit = {
  val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase))
  val table = formatTableName(identifier.table)
  requireDbExists(db)
  requireTableExists(TableIdentifier(table, Some(db)))
  checkDuplication(newSchema)

  // not supporting dropping columns yet
  val currentSchema = externalCatalog.getTable(db, table).schema
  val droppedColumns = currentSchema.map(_.name).filterNot { existing =>
    columnNameResolved(newSchema, existing)
  }
  if (droppedColumns.nonEmpty) {
    throw new AnalysisException(
      s"""
|Some existing schema fields (${droppedColumns.mkString("[", ",", "]")}) are
|not present in the new schema. We don't support dropping columns yet.
""".stripMargin)
  }

  // assuming the newSchema has all partition columns at the end as required
  externalCatalog.alterTableSchema(db, table, newSchema)
}
/** Whether some field of `schema` has a name that `conf.resolver` matches to `colName`. */
private def columnNameResolved(schema: StructType, colName: String): Boolean = {
  val fieldNames = schema.fields.map(_.name)
  fieldNames.exists(fieldName => conf.resolver(fieldName, colName))
}
/**
 * Return whether the external catalog has a table/view with the specified name. If no
 * database is specified, the current database is checked.
 */
def tableExists(name: TableIdentifier): Boolean = synchronized {
  val dbName = formatDatabaseName(name.database.getOrElse(currentDb))
  externalCatalog.tableExists(dbName, formatTableName(name.table))
}
/**
 * Retrieve the metadata of an existing permanent table/view. If no database is specified,
 * assume the table/view is in the current database.
 *
 * @throws NoSuchDatabaseException if the database does not exist
 * @throws NoSuchTableException if the table/view is not found in the database
 */
def getTableMetadata(name: TableIdentifier): CatalogTable = {
  val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
  val table = formatTableName(name.table)
  requireDbExists(db)
  val qualified = TableIdentifier(table, Some(db))
  requireTableExists(qualified)
  externalCatalog.getTable(db, table)
}
/**
 * Retrieve the metadata of a metastore table as an `Option`, as returned by the external
 * catalog's `getTableOption`.
 * If no database is specified, assume the table is in the current database.
 *
 * @throws NoSuchDatabaseException if the database does not exist
 */
def getTableMetadataOption(name: TableIdentifier): Option[CatalogTable] = {
  val dbName = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
  val tableName = formatTableName(name.table)
  requireDbExists(dbName)
  externalCatalog.getTableOption(dbName, tableName)
}
/**
 * Load files stored in given path into an existing metastore table.
 * If no database is specified, assume the table is in the current database.
 *
 * @throws NoSuchDatabaseException if the database does not exist
 * @throws NoSuchTableException if the table is not found in the database
 */
def loadTable(
    name: TableIdentifier,
    loadPath: String,
    isOverwrite: Boolean,
    isSrcLocal: Boolean): Unit = {
  val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
  val table = formatTableName(name.table)
  requireDbExists(db)
  val target = TableIdentifier(table, Some(db))
  requireTableExists(target)
  externalCatalog.loadTable(db, table, loadPath, isOverwrite, isSrcLocal)
}
/**
 * Load files stored in given path into the partition of an existing metastore table.
 * If no database is specified, assume the table is in the current database.
 *
 * @throws NoSuchDatabaseException if the database does not exist
 * @throws NoSuchTableException if the table is not found in the database
 * @throws AnalysisException if the partition spec contains an empty value
 */
def loadPartition(
    name: TableIdentifier,
    loadPath: String,
    spec: TablePartitionSpec,
    isOverwrite: Boolean,
    inheritTableSpecs: Boolean,
    isSrcLocal: Boolean): Unit = {
  val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
  val table = formatTableName(name.table)
  requireDbExists(db)
  val target = TableIdentifier(table, Some(db))
  requireTableExists(target)
  requireNonEmptyValueInPartitionSpec(Seq(spec))
  externalCatalog.loadPartition(
    db, table, loadPath, spec, isOverwrite, inheritTableSpecs, isSrcLocal)
}
/**
 * Build the path of a table under its database location: the formatted table name
 * resolved against the `locationUri` of the database metadata. If no database is
 * specified, the current database is used.
 */
def defaultTablePath(tableIdent: TableIdentifier): URI = {
  val dbName = formatDatabaseName(tableIdent.database.getOrElse(getCurrentDatabase))
  val dbDir = new Path(getDatabaseMetadata(dbName).locationUri)
  val tableDir = new Path(dbDir, formatTableName(tableIdent.table))
  tableDir.toUri
}
// ----------------------------------------------
// | Methods that interact with temp views only |
// ----------------------------------------------
/**
 * Create a local temporary view.
 *
 * @param name view name; formatted before it is used as the key
 * @param tableDefinition logical plan stored for the view
 * @param overrideIfExists whether an existing view with the same name may be replaced
 * @throws TempTableAlreadyExistsException if the view exists and `overrideIfExists` is false
 */
def createTempView(
    name: String,
    tableDefinition: LogicalPlan,
    overrideIfExists: Boolean): Unit = synchronized {
  val key = formatTableName(name)
  val clashes = !overrideIfExists && tempTables.contains(key)
  if (clashes) {
    throw new TempTableAlreadyExistsException(name)
  }
  tempTables.put(key, tableDefinition)
}
/**
 * Create a global temporary view by delegating to the global temp view manager with the
 * formatted view name.
 */
def createGlobalTempView(
    name: String,
    viewDefinition: LogicalPlan,
    overrideIfExists: Boolean): Unit = {
  val viewName = formatTableName(name)
  globalTempViewManager.create(viewName, viewDefinition, overrideIfExists)
}
/**
 * Alter the definition of a local/global temp view matching the given name, returns true if a
 * temp view is matched and altered, false otherwise.
 *
 * Without a database only local temp views are considered. With a database, only the
 * global temp view database is considered.
 */
def alterTempViewDefinition(
    name: TableIdentifier,
    viewDefinition: LogicalPlan): Boolean = synchronized {
  val viewName = formatTableName(name.table)
  name.database match {
    case None =>
      val isLocalTempView = tempTables.contains(viewName)
      if (isLocalTempView) {
        createTempView(viewName, viewDefinition, overrideIfExists = true)
      }
      isLocalTempView
    case Some(db) if formatDatabaseName(db) == globalTempViewManager.database =>
      globalTempViewManager.update(viewName, viewDefinition)
    case Some(_) =>
      false
  }
}
/**
 * Return a local temporary view exactly as it was stored, or None if there is no view
 * under the formatted name.
 */
def getTempView(name: String): Option[LogicalPlan] = synchronized {
  val key = formatTableName(name)
  tempTables.get(key)
}
/**
 * Return a global temporary view exactly as it was stored, looked up in the global temp
 * view manager under the formatted name.
 */
def getGlobalTempView(name: String): Option[LogicalPlan] = {
  val key = formatTableName(name)
  globalTempViewManager.get(key)
}
/**
 * Drop a local temporary view.
 *
 * Returns true if a view was stored under the formatted name and has been removed,
 * false otherwise.
 */
def dropTempView(name: String): Boolean = synchronized {
  val removed = tempTables.remove(formatTableName(name))
  removed.nonEmpty
}
/**
 * Drop a global temporary view.
 *
 * Returns the result of the global temp view manager's `remove` for the formatted name.
 */
def dropGlobalTempView(name: String): Boolean = {
  val key = formatTableName(name)
  globalTempViewManager.remove(key)
}
// -------------------------------------------------------------
// | Methods that interact with temporary and metastore tables |
// -------------------------------------------------------------
/**
 * Retrieve the metadata of an existing temporary view or permanent table/view.
 *
 * If a database is specified in `name`, this will return the metadata of table/view in that
 * database (for the global temp view database, the metadata of the global temp view).
 * If no database is specified, this will first attempt to get the metadata of a temporary view
 * with the same name, then, if that does not exist, return the metadata of table/view in the
 * current database.
 *
 * @throws NoSuchTableException if a global temp view is requested but does not exist
 */
def getTempViewOrPermanentTableMetadata(name: TableIdentifier): CatalogTable = synchronized {
  val table = formatTableName(name.table)

  // Temp views have no catalog entry, so describe them as a VIEW with empty storage and
  // the schema of the plan's output.
  def describeTempView(ident: TableIdentifier, plan: LogicalPlan): CatalogTable =
    CatalogTable(
      identifier = ident,
      tableType = CatalogTableType.VIEW,
      storage = CatalogStorageFormat.empty,
      schema = plan.output.toStructType)

  name.database match {
    case None =>
      getTempView(table) match {
        case Some(plan) => describeTempView(TableIdentifier(table), plan)
        case None => getTableMetadata(name)
      }
    case Some(db) if formatDatabaseName(db) == globalTempViewManager.database =>
      globalTempViewManager.get(table) match {
        case Some(plan) =>
          describeTempView(TableIdentifier(table, Some(globalTempViewManager.database)), plan)
        case None =>
          throw new NoSuchTableException(globalTempViewManager.database, table)
      }
    case Some(_) =>
      getTableMetadata(name)
  }
}
/**
 * Rename a table.
 *
 * If a database is specified in `oldName`, this will rename the table in that database.
 * If no database is specified, this will first attempt to rename a temporary table with
 * the same name, then, if that does not exist, rename the table in the current database.
 *
 * The database in `newName`, when given, must match the one resolved for `oldName`.
 *
 * @throws AnalysisException if the databases differ, if the new metastore table name is
 *                           invalid, or if a temporary table is renamed to a qualified or
 *                           already used name
 */
def renameTable(oldName: TableIdentifier, newName: TableIdentifier): Unit = synchronized {
  val db = formatDatabaseName(oldName.database.getOrElse(currentDb))
  for (requestedDb <- newName.database) {
    val newDb = formatDatabaseName(requestedDb)
    if (db != newDb) {
      throw new AnalysisException(
        s"RENAME TABLE source and destination databases do not match: '$db' != '$newDb'")
    }
  }

  val oldTableName = formatTableName(oldName.table)
  val newTableName = formatTableName(newName.table)
  if (db == globalTempViewManager.database) {
    globalTempViewManager.rename(oldTableName, newTableName)
  } else {
    requireDbExists(db)
    // Only an unqualified name can refer to a local temporary table.
    val isLocalTemp = oldName.database.isEmpty && tempTables.contains(oldTableName)
    if (isLocalTemp) {
      if (newName.database.isDefined) {
        throw new AnalysisException(
          s"RENAME TEMPORARY TABLE from '$oldName' to '$newName': cannot specify database " +
            s"name '${newName.database.get}' in the destination table")
      }
      if (tempTables.contains(newTableName)) {
        throw new AnalysisException(s"RENAME TEMPORARY TABLE from '$oldName' to '$newName': " +
          "destination table already exists")
      }
      // Move the stored plan from the old key to the new key.
      val plan = tempTables(oldTableName)
      tempTables.remove(oldTableName)
      tempTables.put(newTableName, plan)
    } else {
      requireTableExists(TableIdentifier(oldTableName, Some(db)))
      requireTableNotExists(TableIdentifier(newTableName, Some(db)))
      validateName(newTableName)
      externalCatalog.renameTable(db, oldTableName, newTableName)
    }
  }
}
/**
 * Drop a table.
 *
 * If a database is specified in `name`, this will drop the table from that database.
 * If no database is specified, this will first attempt to drop a temporary table with
 * the same name, then, if that does not exist, drop the table from the current database.
 *
 * @throws NoSuchTableException if the table or global temp view does not exist and
 *                              `ignoreIfNotExists` is false
 * @throws NoSuchDatabaseException if a metastore table is targeted and its database
 *                                 does not exist
 */
def dropTable(
    name: TableIdentifier,
    ignoreIfNotExists: Boolean,
    purge: Boolean): Unit = synchronized {
  val db = formatDatabaseName(name.database.getOrElse(currentDb))
  val table = formatTableName(name.table)
  if (db == globalTempViewManager.database) {
    val removed = globalTempViewManager.remove(table)
    if (!(removed || ignoreIfNotExists)) {
      throw new NoSuchTableException(globalTempViewManager.database, table)
    }
  } else if (name.database.isEmpty && tempTables.contains(table)) {
    // An unqualified name that matches a local temporary table drops that table.
    tempTables.remove(table)
  } else {
    requireDbExists(db)
    // Existence is checked here, so the external catalog is always called with
    // ignoreIfNotExists = true; a missing table raises NoSuchTableException below
    // unless the caller asked to ignore it.
    if (tableExists(TableIdentifier(table, Option(db)))) {
      externalCatalog.dropTable(db, table, ignoreIfNotExists = true, purge = purge)
    } else if (!ignoreIfNotExists) {
      throw new NoSuchTableException(db = db, table = table)
    }
  }
}
/**
 * Return a [[LogicalPlan]] that represents the given table or view.
 *
 * If a database is specified in `name`, this will return the table/view from that database.
 * If no database is specified, this will first attempt to return a temporary table/view with
 * the same name, then, if that does not exist, return the table/view from the current database.
 *
 * Note that, the global temp view database is also valid here, this will return the global temp
 * view matching the given name.
 *
 * If the relation is a view, we generate a [[View]] operator from the view description, and
 * wrap the logical plan in a [[SubqueryAlias]] which will track the name of the view.
 *
 * @param name The name of the table/view that we look up.
 * @throws NoSuchTableException if a global temp view is requested but does not exist
 */
def lookupRelation(name: TableIdentifier): LogicalPlan = synchronized {
  val db = formatDatabaseName(name.database.getOrElse(currentDb))
  val table = formatTableName(name.table)
  if (db == globalTempViewManager.database) {
    globalTempViewManager.get(table) match {
      case Some(viewDef) => SubqueryAlias(table, viewDef)
      case None => throw new NoSuchTableException(db, table)
    }
  } else if (name.database.isEmpty && tempTables.contains(table)) {
    // An unqualified name that matches a local temporary table/view wins.
    SubqueryAlias(table, tempTables(table))
  } else {
    val metadata = externalCatalog.getTable(db, table)
    val relation: LogicalPlan =
      if (metadata.tableType == CatalogTableType.VIEW) {
        val viewText = metadata.viewText.getOrElse(sys.error("Invalid view without text."))
        // The relation is a view: a [[View]] operator over the parsed view text keeps
        // track of the view description.
        View(
          desc = metadata,
          output = metadata.schema.toAttributes,
          child = parser.parsePlan(viewText))
      } else {
        CatalogRelation(
          metadata,
          // we assume all the columns are nullable.
          metadata.dataSchema.asNullable.toAttributes,
          metadata.partitionSchema.asNullable.toAttributes)
      }
    // The [[SubqueryAlias]] tracks the name of the table/view.
    SubqueryAlias(table, relation)
  }
}
/**
 * Return whether a table with the specified name is a temporary table.
 *
 * Note: The local temporary tables are checked only when the database is not explicitly
 * specified. The global temp views are checked only when the global temp view database
 * is specified.
 */
def isTemporaryTable(name: TableIdentifier): Boolean = synchronized {
  val table = formatTableName(name.table)
  name.database match {
    case None =>
      tempTables.contains(table)
    case Some(db) if formatDatabaseName(db) == globalTempViewManager.database =>
      globalTempViewManager.get(table).isDefined
    case Some(_) =>
      false
  }
}
/**
 * List all tables in the specified database, including local temporary tables. This is
 * `listTables(db, pattern)` with the pattern "*".
 *
 * Note that, if the specified database is global temporary view database, we will list global
 * temporary views.
 */
def listTables(db: String): Seq[TableIdentifier] = listTables(db, pattern = "*")
/**
 * List all matching tables in the specified database, including local temporary tables.
 * Local temporary tables are returned without a database and come after the database's
 * own tables.
 *
 * Note that, if the specified database is global temporary view database, we will list global
 * temporary views.
 *
 * @throws NoSuchDatabaseException if a database other than the global temp view database
 *                                 is given and it does not exist
 */
def listTables(db: String, pattern: String): Seq[TableIdentifier] = {
  val dbName = formatDatabaseName(db)
  val dbTables =
    if (dbName == globalTempViewManager.database) {
      for (viewName <- globalTempViewManager.listViewNames(pattern))
        yield TableIdentifier(viewName, Some(globalTempViewManager.database))
    } else {
      requireDbExists(dbName)
      for (tableName <- externalCatalog.listTables(dbName, pattern))
        yield TableIdentifier(tableName, Some(dbName))
    }
  // Only the read of the temporary tables is done under the lock.
  val localTempViews = synchronized {
    val matching = StringUtils.filterPattern(tempTables.keys.toSeq, pattern)
    matching.map(viewName => TableIdentifier(viewName))
  }
  dbTables ++ localTempViews
}
/**
 * Refresh a temporary table/view with the given name, if any, and invalidate the table
 * relation cache entry for the qualified name.
 */
def refreshTable(name: TableIdentifier): Unit = synchronized {
  val dbName = formatDatabaseName(name.database.getOrElse(currentDb))
  val tableName = formatTableName(name.table)

  name.database match {
    case None =>
      // If the database is not defined, there is a good chance this is a temp table.
      tempTables.get(tableName).foreach(_.refresh())
    case Some(_) if dbName == globalTempViewManager.database =>
      // If the database is defined, this may be a global temporary view.
      globalTempViewManager.get(tableName).foreach(_.refresh())
    case Some(_) =>
      // Nothing temporary to refresh.
  }

  // Also invalidate the table relation cache.
  tableRelationCache.invalidate(QualifiedTableName(dbName, tableName))
}
/**
 * Remove every local temporary table from this catalog.
 * For testing only.
 */
def clearTempTables(): Unit = synchronized(tempTables.clear())
// ----------------------------------------------------------------------------
// Partitions
// ----------------------------------------------------------------------------
// All methods in this category interact directly with the underlying catalog.
// These methods are concerned with only metastore tables.
// ----------------------------------------------------------------------------
// TODO: We need to figure out how these methods interact with our data source
// tables. For such tables, we do not store values of partitioning columns in
// the metastore. For now, partition values of a data source table will be
// automatically discovered when we load the table.
/**
 * Create partitions in an existing table.
 * If no database is specified, assume the table is in the current database.
 *
 * @throws NoSuchDatabaseException if the database does not exist
 * @throws NoSuchTableException if the table does not exist
 * @throws AnalysisException if a partition spec fails the exact-match or non-empty-value
 *                           checks
 */
def createPartitions(
    tableName: TableIdentifier,
    parts: Seq[CatalogTablePartition],
    ignoreIfExists: Boolean): Unit = {
  val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
  val table = formatTableName(tableName.table)
  requireDbExists(db)
  requireTableExists(TableIdentifier(table, Option(db)))
  val specs = parts.map(_.spec)
  requireExactMatchedPartitionSpec(specs, getTableMetadata(tableName))
  requireNonEmptyValueInPartitionSpec(specs)
  externalCatalog.createPartitions(db, table, parts, ignoreIfExists)
}
/**
 * Drop partitions from a table.
 * If no database is specified, assume the table is in the current database.
 *
 * @throws NoSuchDatabaseException if the database does not exist
 * @throws NoSuchTableException if the table does not exist
 * @throws AnalysisException if a partition spec fails the partial-match or non-empty-value
 *                           checks
 */
def dropPartitions(
    tableName: TableIdentifier,
    specs: Seq[TablePartitionSpec],
    ignoreIfNotExists: Boolean,
    purge: Boolean,
    retainData: Boolean): Unit = {
  val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
  val table = formatTableName(tableName.table)
  requireDbExists(db)
  val target = TableIdentifier(table, Option(db))
  requireTableExists(target)
  val tableMetadata = getTableMetadata(tableName)
  requirePartialMatchedPartitionSpec(specs, tableMetadata)
  requireNonEmptyValueInPartitionSpec(specs)
  externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge, retainData)
}
/**
 * Override the specs of one or many existing table partitions.
 *
 * This assumes index i of `specs` corresponds to index i of `newSpecs`.
 * If no database is specified, assume the table is in the current database.
 *
 * @throws NoSuchDatabaseException if the database does not exist
 * @throws NoSuchTableException if the table does not exist
 * @throws AnalysisException if an old or new spec fails the exact-match or
 *                           non-empty-value checks
 */
def renamePartitions(
    tableName: TableIdentifier,
    specs: Seq[TablePartitionSpec],
    newSpecs: Seq[TablePartitionSpec]): Unit = {
  // The metadata lookup comes first and already fails for a missing database or table.
  val tableMetadata = getTableMetadata(tableName)
  val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
  val table = formatTableName(tableName.table)
  requireDbExists(db)
  val target = TableIdentifier(table, Option(db))
  requireTableExists(target)
  // Check order matters for which error is reported: exact match for both lists first,
  // then empty values for both lists.
  requireExactMatchedPartitionSpec(specs, tableMetadata)
  requireExactMatchedPartitionSpec(newSpecs, tableMetadata)
  requireNonEmptyValueInPartitionSpec(specs)
  requireNonEmptyValueInPartitionSpec(newSpecs)
  externalCatalog.renamePartitions(db, table, specs, newSpecs)
}
/**
 * Alter one or many table partitions whose specs match those specified in `parts`.
 *
 * If no database is specified, assume the table is in the current database.
 *
 * Note: If the underlying implementation does not support altering a certain field,
 * this becomes a no-op.
 *
 * @throws NoSuchDatabaseException if the database does not exist
 * @throws NoSuchTableException if the table does not exist
 * @throws AnalysisException if a partition spec fails the exact-match or non-empty-value
 *                           checks
 */
def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = {
  val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
  val table = formatTableName(tableName.table)
  requireDbExists(db)
  requireTableExists(TableIdentifier(table, Option(db)))
  val specs = parts.map(_.spec)
  requireExactMatchedPartitionSpec(specs, getTableMetadata(tableName))
  requireNonEmptyValueInPartitionSpec(specs)
  externalCatalog.alterPartitions(db, table, parts)
}
/**
 * Retrieve the metadata of a table partition.
 * If no database is specified, assume the table is in the current database.
 *
 * @throws NoSuchDatabaseException if the database does not exist
 * @throws NoSuchTableException if the table does not exist
 * @throws AnalysisException if the spec fails the exact-match or non-empty-value checks
 */
def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = {
  val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
  val table = formatTableName(tableName.table)
  requireDbExists(db)
  val target = TableIdentifier(table, Option(db))
  requireTableExists(target)
  val tableMetadata = getTableMetadata(tableName)
  requireExactMatchedPartitionSpec(Seq(spec), tableMetadata)
  requireNonEmptyValueInPartitionSpec(Seq(spec))
  externalCatalog.getPartition(db, table, spec)
}
/**
 * List the names of all partitions that belong to the specified table.
 *
 * A partial partition spec may optionally be provided to filter the partitions returned.
 * For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'),
 * then a partial spec of (a='1') will return the first two only.
 *
 * @throws NoSuchDatabaseException if the database does not exist
 * @throws NoSuchTableException if the table does not exist
 * @throws AnalysisException if the partial spec fails the partial-match or non-empty-value
 *                           checks
 */
def listPartitionNames(
    tableName: TableIdentifier,
    partialSpec: Option[TablePartitionSpec] = None): Seq[String] = {
  val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
  val table = formatTableName(tableName.table)
  requireDbExists(db)
  requireTableExists(TableIdentifier(table, Option(db)))
  partialSpec match {
    case Some(spec) =>
      requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
      requireNonEmptyValueInPartitionSpec(Seq(spec))
    case None =>
      // No filter given, nothing to validate.
  }
  externalCatalog.listPartitionNames(db, table, partialSpec)
}
/**
* List the metadata of all partitions that belong to the specified table, assuming it exists.
*
* A partial partition spec may optionally be provided to filter the partitions returned.
* For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'),
* then a partial spec of (a='1') will return the first two only.
*/
def listPartitions(
    tableName: TableIdentifier,
    partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = {
  val resolvedDb = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
  val resolvedTable = formatTableName(tableName.table)
  requireDbExists(resolvedDb)
  requireTableExists(TableIdentifier(resolvedTable, Option(resolvedDb)))
  partialSpec match {
    case Some(filterSpec) =>
      // A supplied spec may only reference the table's partition columns, with non-empty values.
      val candidates = Seq(filterSpec)
      requirePartialMatchedPartitionSpec(candidates, getTableMetadata(tableName))
      requireNonEmptyValueInPartitionSpec(candidates)
    case None =>
      // No filter given: nothing to validate.
      ()
  }
  externalCatalog.listPartitions(resolvedDb, resolvedTable, partialSpec)
}
/**
* List the metadata of partitions that belong to the specified table, assuming it exists, that
* satisfy the given partition-pruning predicate expressions.
*/
def listPartitionsByFilter(
    tableName: TableIdentifier,
    predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
  val requestedDb = tableName.database.getOrElse(getCurrentDatabase)
  val dbName = formatDatabaseName(requestedDb)
  val tblName = formatTableName(tableName.table)
  val qualifiedTable = TableIdentifier(tblName, Option(dbName))
  requireDbExists(dbName)
  requireTableExists(qualifiedTable)
  // The session-local time zone is handed to the external catalog together with the predicates.
  val timeZoneId = conf.sessionLocalTimeZone
  externalCatalog.listPartitionsByFilter(dbName, tblName, predicates, timeZoneId)
}
/**
* Verify if the input partition spec has any empty value.
*/
private def requireNonEmptyValueInPartitionSpec(specs: Seq[TablePartitionSpec]): Unit = {
  // Find the first spec (in input order) holding an empty value and report it, if there is one.
  specs.find(_.values.exists(_.isEmpty)).foreach { offending =>
    val rendered = offending.map { case (column, value) => column + "=" + value }
    val spec = rendered.mkString("[", ", ", "]")
    throw new AnalysisException(
      s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
  }
}
/**
* Verify if the input partition spec exactly matches the existing defined partition spec.
* The columns must be the same, but their order may differ.
*/
private def requireExactMatchedPartitionSpec(
    specs: Seq[TablePartitionSpec],
    table: CatalogTable): Unit = {
  // Both sides are sorted before comparison, so column order does not matter.
  val expectedColumns = table.partitionColumnNames.sorted
  for (s <- specs if s.keys.toSeq.sorted != expectedColumns) {
    throw new AnalysisException(
      s"Partition spec is invalid. The spec (${s.keys.mkString(", ")}) must match " +
        s"the partition spec (${table.partitionColumnNames.mkString(", ")}) defined in " +
        s"table '${table.identifier}'")
  }
}
/**
* Verify if the input partition spec partially matches the existing defined partition spec.
* That is, the columns of the partition spec should be part of the defined partition spec.
*/
private def requirePartialMatchedPartitionSpec(
    specs: Seq[TablePartitionSpec],
    table: CatalogTable): Unit = {
  val partitionColumns = table.partitionColumnNames
  specs.foreach { s =>
    // A spec is rejected as soon as it names a column that is not a partition column.
    val hasUnknownColumn = s.keys.exists(column => !partitionColumns.contains(column))
    if (hasUnknownColumn) {
      throw new AnalysisException(
        s"Partition spec is invalid. The spec (${s.keys.mkString(", ")}) must be contained " +
          s"within the partition spec (${table.partitionColumnNames.mkString(", ")}) defined " +
          s"in table '${table.identifier}'")
    }
  }
}
// ----------------------------------------------------------------------------
// Functions
// ----------------------------------------------------------------------------
// There are two kinds of functions, temporary functions and metastore
// functions (permanent UDFs). Temporary functions are isolated across
// sessions. Metastore functions can be used across multiple sessions as
// their metadata is persisted in the underlying catalog.
// ----------------------------------------------------------------------------
// -------------------------------------------------------
// | Methods that interact with metastore functions only |
// -------------------------------------------------------
/**
* Create a metastore function in the database specified in `funcDefinition`.
* If no such database is specified, create it in the current database.
*/
def createFunction(funcDefinition: CatalogFunction, ignoreIfExists: Boolean): Unit = {
  val requestedDb = funcDefinition.identifier.database.getOrElse(getCurrentDatabase)
  val targetDb = formatDatabaseName(requestedDb)
  requireDbExists(targetDb)
  // Re-qualify the identifier with the formatted database name before touching the catalog.
  val qualifiedId = FunctionIdentifier(funcDefinition.identifier.funcName, Some(targetDb))
  if (functionExists(qualifiedId)) {
    if (!ignoreIfExists) {
      throw new FunctionAlreadyExistsException(db = targetDb, func = qualifiedId.toString)
    }
  } else {
    externalCatalog.createFunction(targetDb, funcDefinition.copy(identifier = qualifiedId))
  }
}
/**
* Drop a metastore function.
* If no database is specified, assume the function is in the current database.
*/
def dropFunction(name: FunctionIdentifier, ignoreIfNotExists: Boolean): Unit = {
  val targetDb = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
  requireDbExists(targetDb)
  val qualifiedId = name.copy(database = Some(targetDb))
  if (!functionExists(qualifiedId)) {
    if (!ignoreIfNotExists) {
      throw new NoSuchFunctionException(db = targetDb, func = qualifiedId.toString)
    }
  } else {
    // TODO: registry should just take in FunctionIdentifier for type safety
    val registryKey = qualifiedId.unquotedString
    // A permanent function is loaded into the FunctionRegistry when it is first used,
    // so if it has been loaded it must be dropped from there as well.
    if (functionRegistry.functionExists(registryKey)) {
      functionRegistry.dropFunction(registryKey)
    }
    externalCatalog.dropFunction(targetDb, name.funcName)
  }
}
/**
* Retrieve the metadata of a metastore function.
*
* If a database is specified in `name`, this will return the function in that database.
* If no database is specified, this will return the function in the current database.
*/
def getFunctionMetadata(name: FunctionIdentifier): CatalogFunction = {
  // Use the database from the identifier, or the current database when it has none.
  val requestedDb = name.database.getOrElse(getCurrentDatabase)
  val dbName = formatDatabaseName(requestedDb)
  requireDbExists(dbName)
  externalCatalog.getFunction(dbName, name.funcName)
}
/**
* Check if the specified function exists.
*/
def functionExists(name: FunctionIdentifier): Boolean = {
  val dbName = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
  requireDbExists(dbName)
  // The session's function registry is consulted first; the external catalog only on a miss.
  if (functionRegistry.functionExists(name.unquotedString)) {
    true
  } else {
    externalCatalog.functionExists(dbName, name.funcName)
  }
}
// ----------------------------------------------------------------
// | Methods that interact with temporary and metastore functions |
// ----------------------------------------------------------------
/**
* Construct a [[FunctionBuilder]] based on the provided class that represents a function.
*
* This performs reflection to decide what type of [[Expression]] to return in the builder.
*/
protected def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = {
  // TODO: at least support UDAFs here
  // This implementation never produces a builder: it unconditionally raises.
  val message = "Use sqlContext.udf.register(...) instead."
  throw new UnsupportedOperationException(message)
}
/**
* Loads resources such as JARs and Files for a function. Every resource is represented
* by a tuple (resource type, resource uri).
*/
def loadFunctionResources(resources: Seq[FunctionResource]): Unit = {
  // Hand each resource, in order, to the session's resource loader.
  for (resource <- resources) {
    functionResourceLoader.loadResource(resource)
  }
}
/**
* Registers a temporary or permanent function into a session-specific [[FunctionRegistry]]
*/
def registerFunction(
    funcDefinition: CatalogFunction,
    ignoreIfExists: Boolean,
    functionBuilder: Option[FunctionBuilder] = None): Unit = {
  val func = funcDefinition.identifier
  val registryName = func.unquotedString
  val alreadyRegistered = functionRegistry.functionExists(registryName)
  // An existing entry is only an error when the caller did not ask to ignore it; otherwise
  // registration proceeds below regardless.
  if (alreadyRegistered && !ignoreIfExists) {
    throw new AnalysisException(s"Function $func already exists")
  }
  val info = new ExpressionInfo(funcDefinition.className, func.database.orNull, func.funcName)
  // Use the caller-provided builder when present, otherwise derive one from the class name.
  val builder = functionBuilder match {
    case Some(provided) => provided
    case None => makeFunctionBuilder(registryName, funcDefinition.className)
  }
  functionRegistry.registerFunction(registryName, info, builder)
}
/**
* Drop a temporary function.
*/
def dropTempFunction(name: String, ignoreIfNotExists: Boolean): Unit = {
  // The registry reports whether anything was actually removed.
  val dropped = functionRegistry.dropFunction(name)
  if (!(dropped || ignoreIfNotExists)) {
    throw new NoSuchTempFunctionException(name)
  }
}
/**
* Returns whether the given function is a temporary function. Returns false if it does not exist.
*/
def isTemporaryFunction(name: FunctionIdentifier): Boolean = {
  // copied from HiveSessionCatalog
  val hiveFunctions = Seq("histogram_numeric")
  val funcName = name.funcName
  // Local defs keep each check lazy, so they run only as far as the && chain below needs.
  def registeredInSession: Boolean = functionRegistry.functionExists(funcName)
  def isBuiltin: Boolean = FunctionRegistry.builtin.functionExists(funcName)
  def isHiveFunction: Boolean = hiveFunctions.contains(funcName.toLowerCase(Locale.ROOT))
  // A temporary function is a function that has been registered in functionRegistry
  // without a database name, and is neither a built-in function nor a Hive function
  name.database.isEmpty && registeredInSession && !isBuiltin && !isHiveFunction
}
protected def failFunctionLookup(name: String): Nothing = {
  // The failure is always reported against the session's current database.
  val notFound = new NoSuchFunctionException(db = currentDb, func = name)
  throw notFound
}
/**
* Look up the [[ExpressionInfo]] associated with the specified function, assuming it exists.
*/
def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo = synchronized {
  // TODO: just make function registry take in FunctionIdentifier instead of duplicating this
  // If the name itself is not qualified, add the current database to it.
  val db = formatDatabaseName(name.database.getOrElse(currentDb))
  val qualifiedName = name.copy(database = Some(db))
  // A registry entry stored under the bare function name may only answer an unqualified
  // request. Looking it up for a qualified name would let `db.f` resolve to an unrelated
  // function that merely shares the name `f`; lookupFunction applies the same guard.
  val unqualifiedMatch =
    if (name.database.isEmpty) functionRegistry.lookupFunction(name.funcName) else None
  unqualifiedMatch
    .orElse(functionRegistry.lookupFunction(qualifiedName.unquotedString))
    .getOrElse {
      // Not in the registry: fall back to the external catalog of the resolved database.
      requireDbExists(db)
      if (externalCatalog.functionExists(db, name.funcName)) {
        val metadata = externalCatalog.getFunction(db, name.funcName)
        new ExpressionInfo(metadata.className, db, qualifiedName.identifier)
      } else {
        failFunctionLookup(name.funcName)
      }
    }
}
/**
* Return an [[Expression]] that represents the specified function, assuming it exists.
*
* For a temporary function or a permanent function that has been loaded,
* this method will simply lookup the function through the
* FunctionRegistry and create an expression based on the builder.
*
* For a permanent function that has not been loaded, we will first fetch its metadata
* from the underlying external catalog. Then, we will load all resources associated
* with this function (i.e. jars and files). Finally, we create a function builder
* based on the function class and put the builder into the FunctionRegistry.
* The name of this function in the FunctionRegistry will be `databaseName.functionName`.
*/
def lookupFunction(
    name: FunctionIdentifier,
    children: Seq[Expression]): Expression = synchronized {
  // Note: the implementation of this function is a little bit convoluted.
  // We probably shouldn't use a single FunctionRegistry to register all three kinds of functions
  // (built-in, temp, and external).
  if (name.database.isEmpty && functionRegistry.functionExists(name.funcName)) {
    // This function has been already loaded into the function registry.
    functionRegistry.lookupFunction(name.funcName, children)
  } else {
    // If the name itself is not qualified, add the current database to it.
    val db = formatDatabaseName(name.database.getOrElse(currentDb))
    val qualifiedName = name.copy(database = Some(db))
    val registryName = qualifiedName.unquotedString
    if (!functionRegistry.functionExists(registryName)) {
      // The function has not been loaded to the function registry, which means
      // that the function is a permanent function (if it actually has been registered
      // in the metastore). We need to first put the function in the FunctionRegistry.
      // TODO: why not just check whether the function exists first?
      val catalogFunction = try {
        // Fetch from the database the name resolves to, not unconditionally from the current
        // one; otherwise `otherDb.f` would be loaded from the current database and then
        // registered under the name `otherDb.f`.
        externalCatalog.getFunction(db, name.funcName)
      } catch {
        case _: AnalysisException => failFunctionLookup(name.funcName)
        case _: NoSuchPermanentFunctionException => failFunctionLookup(name.funcName)
      }
      loadFunctionResources(catalogFunction.resources)
      // Please note that qualifiedName is provided by the user. However,
      // catalogFunction.identifier.unquotedString is returned by the underlying
      // catalog. So, it is possible that qualifiedName is not exactly the same as
      // catalogFunction.identifier.unquotedString (difference is on case-sensitivity).
      // At here, we preserve the input from the user.
      registerFunction(catalogFunction.copy(identifier = qualifiedName), ignoreIfExists = false)
    }
    // Either the function was already registered under its qualified name or it has just been
    // loaded above; in both cases build the Expression from the registry entry.
    functionRegistry.lookupFunction(registryName, children)
  }
}
/**
* List all functions in the specified database, including temporary functions. This
* returns the function identifier and the scope in which it was defined (system or user
* defined).
*/
def listFunctions(db: String): Seq[(FunctionIdentifier, String)] = {
  // Delegate to the pattern-based overload with a pattern of "*".
  val matchEverything = "*"
  listFunctions(db, matchEverything)
}
/**
* List all matching functions in the specified database, including temporary functions. This
* returns the function identifier and the scope in which it was defined (system or user
* defined).
*/
def listFunctions(db: String, pattern: String): Seq[(FunctionIdentifier, String)] = {
  val dbName = formatDatabaseName(db)
  requireDbExists(dbName)
  // Functions reported by the external catalog are qualified with the database name ...
  val persistent =
    for (fn <- externalCatalog.listFunctions(dbName, pattern))
      yield FunctionIdentifier(fn, Some(dbName))
  // ... while names coming from the session's function registry are left unqualified.
  val registered =
    for (fn <- StringUtils.filterPattern(functionRegistry.listFunction(), pattern))
      yield FunctionIdentifier(fn)
  (persistent ++ registered).map { id =>
    val scope = if (FunctionRegistry.functionSet.contains(id.funcName)) "SYSTEM" else "USER"
    (id, scope)
  }
}
// -----------------
// | Other methods |
// -----------------
/**
* Drop all existing databases (except "default"), tables, partitions and functions,
* and set the current database to "default".
*
* This is mainly used for tests.
*/
def reset(): Unit = synchronized {
// Point both this session catalog and the external catalog back at the default database
// before anything is dropped.
setCurrentDatabase(DEFAULT_DATABASE)
externalCatalog.setCurrentDatabase(DEFAULT_DATABASE)
// Drop every database other than the default one, cascading.
listDatabases().filter(_ != DEFAULT_DATABASE).foreach { db =>
dropDatabase(db, ignoreIfNotExists = false, cascade = true)
}
// Drop everything listTables reports for the default database (without purge).
listTables(DEFAULT_DATABASE).foreach { table =>
dropTable(table, ignoreIfNotExists = false, purge = false)
}
// Functions listed with a database go through dropFunction; those listed without one go
// through dropTempFunction.
listFunctions(DEFAULT_DATABASE).map(_._1).foreach { func =>
if (func.database.isDefined) {
dropFunction(func, ignoreIfNotExists = false)
} else {
dropTempFunction(func.funcName, ignoreIfNotExists = false)
}
}
// Clear the remaining session-local state: temp tables, global temp views and the registry.
tempTables.clear()
globalTempViewManager.clear()
functionRegistry.clear()
// restore built-in functions
// The registry was just cleared, so every function known to FunctionRegistry.builtin is
// registered again; both its expression info and its builder must be present.
FunctionRegistry.builtin.listFunction().foreach { f =>
val expressionInfo = FunctionRegistry.builtin.lookupFunction(f)
val functionBuilder = FunctionRegistry.builtin.lookupFunctionBuilder(f)
require(expressionInfo.isDefined, s"built-in function '$f' is missing expression info")
require(functionBuilder.isDefined, s"built-in function '$f' is missing function builder")
functionRegistry.registerFunction(f, expressionInfo.get, functionBuilder.get)
}
}
/**
* Copy the current state of the catalog to another catalog.
*
* This function is synchronized on this [[SessionCatalog]] (the source) to make sure the copied
* state is consistent. The target [[SessionCatalog]] is not synchronized, and should not be
* because the target [[SessionCatalog]] should not be published at this point. The caller must
* synchronize on the target if this assumption does not hold.
*/
private[sql] def copyStateTo(target: SessionCatalog): Unit = synchronized {
  // Only `this` is locked here; the target is written to without synchronization.
  target.currentDb = currentDb
  // copy over temporary tables
  for ((key, value) <- tempTables) {
    target.tempTables.put(key, value)
  }
}
}