/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive

import java.io.IOException
import java.lang.reflect.InvocationTargetException
import java.util
import java.util.Locale

import scala.collection.mutable
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.ql.metadata.HiveException
import org.apache.thrift.TException

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.sql.types.{DataType, StructType}


/**
 * A persistent implementation of the system catalog using Hive.
 * All public methods must be synchronized for thread-safety.
 */
private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
  extends ExternalCatalog with Logging {

  import CatalogTypes.TablePartitionSpec
  import HiveExternalCatalog._
  import CatalogTableType._

  /**
   * A Hive client used to interact with the metastore.
   */
  lazy val client: HiveClient = {
    HiveUtils.newClientForMetadata(conf, hadoopConf)
  }

  // Exceptions thrown by the hive client that we would like to wrap
  private val clientExceptions = Set(
    classOf[HiveException].getCanonicalName,
    classOf[TException].getCanonicalName,
    classOf[InvocationTargetException].getCanonicalName)

  /**
   * Whether this is an exception thrown by the hive client that should be wrapped.
   *
   * Due to classloader isolation issues, pattern matching won't work here, so we need
   * to compare the canonical names of the exceptions, which we assume to be stable.
   */
  private def isClientException(e: Throwable): Boolean = {
    var temp: Class[_] = e.getClass
    var found = false
    while (temp != null && !found) {
      found = clientExceptions.contains(temp.getCanonicalName)
      temp = temp.getSuperclass
    }
    found
  }

  /**
   * Run some code involving `client` in a [[synchronized]] block and wrap certain
   * exceptions thrown in the process in [[AnalysisException]].
   */
  private def withClient[T](body: => T): T = synchronized {
    try {
      body
    } catch {
      case NonFatal(exception) if isClientException(exception) =>
        val e = exception match {
          // Since we are using a shim, exceptions thrown by the method underlying
          // Method.invoke() are wrapped in InvocationTargetException.
          case i: InvocationTargetException => i.getCause
          case o => o
        }
        throw new AnalysisException(
          e.getClass.getCanonicalName + ": " + e.getMessage, cause = Some(e))
    }
  }

  /**
   * Get the raw table metadata from the Hive metastore directly. The raw table metadata may contain
   * special data source properties and should not be exposed outside of `HiveExternalCatalog`. We
   * should interpret these special data source properties and restore the original table metadata
   * before returning it.
   */
  private def getRawTable(db: String, table: String): CatalogTable = withClient {
    client.getTable(db, table)
  }

  /**
   * If the given table properties contain data source properties, throw an exception. We do this
   * check when creating or altering a table, i.e. when we try to write table metadata to the Hive
   * metastore.
   */
  private def verifyTableProperties(table: CatalogTable): Unit = {
    val invalidKeys = table.properties.keys.filter(_.startsWith(SPARK_SQL_PREFIX))
    if (invalidKeys.nonEmpty) {
      throw new AnalysisException(s"Cannot persist ${table.qualifiedName} into Hive metastore " +
        s"as table property keys may not start with '$SPARK_SQL_PREFIX': " +
        invalidKeys.mkString("[", ", ", "]"))
    }
    // External users are not allowed to set/switch the table type. In Hive metastore, the table
    // type can be switched by changing the value of a case-sensitive table property `EXTERNAL`.
    if (table.properties.contains("EXTERNAL")) {
      throw new AnalysisException("Cannot set or change the reserved property key: 'EXTERNAL'")
    }
  }

  // --------------------------------------------------------------------------
  // Databases
  // --------------------------------------------------------------------------

  override def createDatabase(
      dbDefinition: CatalogDatabase,
      ignoreIfExists: Boolean): Unit = withClient {
    client.createDatabase(dbDefinition, ignoreIfExists)
  }

  override def dropDatabase(
      db: String,
      ignoreIfNotExists: Boolean,
      cascade: Boolean): Unit = withClient {
    client.dropDatabase(db, ignoreIfNotExists, cascade)
  }

  /**
   * Alter a database whose name matches the one specified in `dbDefinition`,
   * assuming the database exists.
   *
   * Note: As of now, this only supports altering database properties!
   */
  override def alterDatabase(dbDefinition: CatalogDatabase): Unit = withClient {
    val existingDb = getDatabase(dbDefinition.name)
    if (existingDb.properties == dbDefinition.properties) {
      logWarning(s"Request to alter database ${dbDefinition.name} is a no-op because " +
        s"the provided database properties are the same as the old ones. Hive does not " +
        s"currently support altering other database fields.")
    }
    client.alterDatabase(dbDefinition)
  }

  override def getDatabase(db: String): CatalogDatabase = withClient {
    client.getDatabase(db)
  }

  override def databaseExists(db: String): Boolean = withClient {
    client.databaseExists(db)
  }

  override def listDatabases(): Seq[String] = withClient {
    client.listDatabases("*")
  }

  override def listDatabases(pattern: String): Seq[String] = withClient {
    client.listDatabases(pattern)
  }

  override def setCurrentDatabase(db: String): Unit = withClient {
    client.setCurrentDatabase(db)
  }

  // --------------------------------------------------------------------------
  // Tables
  // --------------------------------------------------------------------------

  override def createTable(
      tableDefinition: CatalogTable,
      ignoreIfExists: Boolean): Unit = withClient {
    assert(tableDefinition.identifier.database.isDefined)
    val db = tableDefinition.identifier.database.get
    val table = tableDefinition.identifier.table
    requireDbExists(db)
    verifyTableProperties(tableDefinition)

    if (tableExists(db, table) && !ignoreIfExists) {
      throw new TableAlreadyExistsException(db = db, table = table)
    }

    if (tableDefinition.tableType == VIEW) {
      client.createTable(tableDefinition, ignoreIfExists)
    } else {
      // Ideally we should not create a managed table with a location, but a Hive serde table can
      // specify a location for a managed table. And in [[CreateDataSourceTableAsSelectCommand]] we
      // have to create the table directory and write out the data before we create this table, to
      // avoid exposing a partially written table.
      val needDefaultTableLocation = tableDefinition.tableType == MANAGED &&
        tableDefinition.storage.locationUri.isEmpty

      val tableLocation = if (needDefaultTableLocation) {
        Some(CatalogUtils.stringToURI(defaultTablePath(tableDefinition.identifier)))
      } else {
        tableDefinition.storage.locationUri
      }

      if (DDLUtils.isHiveTable(tableDefinition)) {
        val tableWithDataSourceProps = tableDefinition.copy(
          // We can't leave `locationUri` empty and count on the Hive metastore to set a default
          // table location, because the Hive metastore uses hive.metastore.warehouse.dir to
          // generate the default table location for tables in the default database, while we
          // expect to use the location of the default database.
          storage = tableDefinition.storage.copy(locationUri = tableLocation),
          // Here we follow data source tables and put table metadata like the table schema and
          // partition columns in table properties, so that we can work around the Hive metastore's
          // lack of case preservation and make Hive serde tables support mixed-case column names.
          properties = tableDefinition.properties ++ tableMetaToTableProps(tableDefinition))
        client.createTable(tableWithDataSourceProps, ignoreIfExists)
      } else {
        createDataSourceTable(
          tableDefinition.withNewStorage(locationUri = tableLocation),
          ignoreIfExists)
      }
    }
  }

  private def createDataSourceTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = {
    // A data source table always has a provider, guaranteed by `DDLUtils.isDatasourceTable`.
    val provider = table.provider.get

    // To work around some Hive metastore issues, e.g. lack of case preservation, bad decimal type
    // support, no column nullability, etc., we do some extra work before saving the table
    // metadata into the Hive metastore:
    //  1. Put table metadata like the table schema and partition columns in table properties.
    //  2. Check if this table is Hive compatible.
    //    2.1  If it's not Hive compatible, set the location URI, schema, partition columns and
    //         bucket spec to empty and save the table metadata to Hive.
    //    2.2  If it's Hive compatible, set the serde information in the table metadata and try to
    //         save it to Hive. If that fails, treat it as not Hive compatible and fall back to 2.1.
    val tableProperties = tableMetaToTableProps(table)

    // put table provider and partition provider in table properties.
    tableProperties.put(DATASOURCE_PROVIDER, provider)
    if (table.tracksPartitionsInCatalog) {
      tableProperties.put(TABLE_PARTITION_PROVIDER, TABLE_PARTITION_PROVIDER_CATALOG)
    }

    // Ideally we should also put `locationUri` in table properties like provider, schema, etc.
    // However, older versions of Spark already stored the table location in storage properties
    // with the key "path". Here we keep this behaviour for backward compatibility.
    val storagePropsWithLocation = table.storage.properties ++
      table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))

    // Converts the table metadata to a Spark SQL specific format, i.e. sets the data schema,
    // column names and bucket specification to empty. Note that partition columns are retained,
    // so that we can call partition-related Hive APIs later.
    def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
      table.copy(
        // Hive only allows directory paths as location URIs while Spark SQL data source tables
        // also allow file paths. For a non-Hive-compatible format, we should not set the location
        // URI, to avoid the Hive metastore throwing an exception.
        storage = table.storage.copy(
          locationUri = None,
          properties = storagePropsWithLocation),
        schema = table.partitionSchema,
        bucketSpec = None,
        properties = table.properties ++ tableProperties)
    }

    // Converts the table metadata to a Hive compatible format, i.e. sets the serde information.
    def newHiveCompatibleMetastoreTable(serde: HiveSerDe): CatalogTable = {
      val location = if (table.tableType == EXTERNAL) {
        // When we hit this branch, we are saving an external data source table with hive
        // compatible format, which means the data source is file-based and must have a `path`.
        require(table.storage.locationUri.isDefined,
          "External file-based data source table must have a `path` entry in storage properties.")
        Some(table.location)
      } else {
        None
      }

      table.copy(
        storage = table.storage.copy(
          locationUri = location,
          inputFormat = serde.inputFormat,
          outputFormat = serde.outputFormat,
          serde = serde.serde,
          properties = storagePropsWithLocation
        ),
        properties = table.properties ++ tableProperties)
    }

    val qualifiedTableName = table.identifier.quotedString
    val maybeSerde = HiveSerDe.sourceToSerDe(provider)
    val skipHiveMetadata = table.storage.properties
      .getOrElse("skipHiveMetadata", "false").toBoolean

    val (hiveCompatibleTable, logMessage) = maybeSerde match {
      case _ if skipHiveMetadata =>
        val message =
          s"Persisting data source table $qualifiedTableName into Hive metastore in " +
            "Spark SQL specific format, which is NOT compatible with Hive."
        (None, message)

      // Our bucketing is incompatible with Hive's (it uses a different hash function).
      case _ if table.bucketSpec.nonEmpty =>
        val message =
          s"Persisting bucketed data source table $qualifiedTableName into " +
            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. "
        (None, message)

      case Some(serde) =>
        val message =
          s"Persisting file based data source table $qualifiedTableName into " +
            s"Hive metastore in Hive compatible format."
        (Some(newHiveCompatibleMetastoreTable(serde)), message)

      case _ =>
        val message =
          s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
            s"Persisting data source table $qualifiedTableName into Hive metastore in " +
            s"Spark SQL specific format, which is NOT compatible with Hive."
        (None, message)
    }

    (hiveCompatibleTable, logMessage) match {
      case (Some(table), message) =>
        // We first try to save the metadata of the table in a Hive compatible way.
        // If Hive throws an error, we fall back to save its metadata in the Spark SQL
        // specific way.
        try {
          logInfo(message)
          saveTableIntoHive(table, ignoreIfExists)
        } catch {
          case NonFatal(e) =>
            val warningMessage =
              s"Could not persist ${table.identifier.quotedString} in a Hive " +
                "compatible way. Persisting it into Hive metastore in Spark SQL specific format."
            logWarning(warningMessage, e)
            saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
        }

      case (None, message) =>
        logWarning(message)
        saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
    }
  }
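
  // Illustrative sketch, not part of the original source: how the compatibility decision above
  // typically plays out, assuming `HiveSerDe.sourceToSerDe` maps built-in file formats such as
  // "parquet" and "orc" to a Hive SerDe and returns None for providers it doesn't know about
  // (the custom provider below is hypothetical):
  //
  //   CREATE TABLE t (i INT) USING parquet
  //     -> maybeSerde is defined, no bucketing  -> try Hive compatible format first
  //   CREATE TABLE t (i INT) USING parquet CLUSTERED BY (i) INTO 4 BUCKETS
  //     -> bucketSpec.nonEmpty                  -> Spark SQL specific format
  //   CREATE TABLE t (i INT) USING com.example.CustomSource
  //     -> maybeSerde is None                   -> Spark SQL specific format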

  /**
   * Data source tables may not be Hive compatible and we need to store table metadata in table
   * properties to work around some Hive metastore limitations.
   * This method puts table schema, partition column names, bucket specification into a map, which
   * can be used as table properties later.
   */
  private def tableMetaToTableProps(table: CatalogTable): mutable.Map[String, String] = {
    val partitionColumns = table.partitionColumnNames
    val bucketSpec = table.bucketSpec

    val properties = new mutable.HashMap[String, String]
    // The serialized JSON schema string may be too long to be stored in a single metastore table
    // property. In this case, we split the JSON string and store each part as a separate table
    // property.
    val threshold = conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)
    val schemaJsonString = table.schema.json
    // Split the JSON string.
    val parts = schemaJsonString.grouped(threshold).toSeq
    properties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
    parts.zipWithIndex.foreach { case (part, index) =>
      properties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
    }

    if (partitionColumns.nonEmpty) {
      properties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
      partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
        properties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
      }
    }

    if (bucketSpec.isDefined) {
      val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get

      properties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
      properties.put(DATASOURCE_SCHEMA_NUMBUCKETCOLS, bucketColumnNames.length.toString)
      bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
        properties.put(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index", bucketCol)
      }

      if (sortColumnNames.nonEmpty) {
        properties.put(DATASOURCE_SCHEMA_NUMSORTCOLS, sortColumnNames.length.toString)
        sortColumnNames.zipWithIndex.foreach { case (sortCol, index) =>
          properties.put(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index", sortCol)
        }
      }
    }

    properties
  }
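
  // Illustrative sketch, not part of the original source: for a table with schema
  // (a INT, p STRING), partitioned by `p` and bucketed by `a` into 4 buckets, and assuming the
  // schema JSON fits into a single part, the map built above would contain entries like
  // (key names refer to the constants defined in the companion object, not shown in this excerpt):
  //   DATASOURCE_SCHEMA_NUMPARTS               -> "1"
  //   DATASOURCE_SCHEMA_PART_PREFIX + "0"      -> "<schema JSON>"
  //   DATASOURCE_SCHEMA_NUMPARTCOLS            -> "1"
  //   DATASOURCE_SCHEMA_PARTCOL_PREFIX + "0"   -> "p"
  //   DATASOURCE_SCHEMA_NUMBUCKETS             -> "4"
  //   DATASOURCE_SCHEMA_NUMBUCKETCOLS          -> "1"
  //   DATASOURCE_SCHEMA_BUCKETCOL_PREFIX + "0" -> "a"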

  private def defaultTablePath(tableIdent: TableIdentifier): String = {
    val dbLocation = getDatabase(tableIdent.database.get).locationUri
    new Path(new Path(dbLocation), tableIdent.table).toString
  }

  private def saveTableIntoHive(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
    assert(DDLUtils.isDatasourceTable(tableDefinition),
      "saveTableIntoHive only takes data source table.")
    // If this is an external data source table...
    if (tableDefinition.tableType == EXTERNAL &&
      // ... that is not persisted as Hive compatible format (external tables in Hive compatible
      // format always set `locationUri` to the actual data location and should NOT be hacked as
      // following.)
      tableDefinition.storage.locationUri.isEmpty) {
      // !! HACK ALERT !!
      //
      // Due to a restriction of Hive metastore, here we have to set `locationUri` to a temporary
      // directory that doesn't exist yet but can definitely be successfully created, and then
      // delete it right after creating the external data source table. This location will be
      // persisted to Hive metastore as standard Hive table location URI, but Spark SQL doesn't
      // really use it. Also, since we only do this workaround for external tables, deleting the
      // directory after the fact doesn't do any harm.
      //
      // Please refer to https://issues.apache.org/jira/browse/SPARK-15269 for more details.
      val tempPath = {
        val dbLocation = new Path(getDatabase(tableDefinition.database).locationUri)
        new Path(dbLocation, tableDefinition.identifier.table + "-__PLACEHOLDER__")
      }

      try {
        client.createTable(
          tableDefinition.withNewStorage(locationUri = Some(tempPath.toUri)),
          ignoreIfExists)
      } finally {
        FileSystem.get(tempPath.toUri, hadoopConf).delete(tempPath, true)
      }
    } else {
      client.createTable(tableDefinition, ignoreIfExists)
    }
  }
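
  // Illustrative sketch, not part of the original source: for an external data source table
  // db1.t1 created on a single file, with database db1 located at the hypothetical path
  // '/warehouse/db1.db', the temporary location persisted and then deleted above would be
  // '/warehouse/db1.db/t1-__PLACEHOLDER__'.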

  override def dropTable(
      db: String,
      table: String,
      ignoreIfNotExists: Boolean,
      purge: Boolean): Unit = withClient {
    requireDbExists(db)
    client.dropTable(db, table, ignoreIfNotExists, purge)
  }

  override def renameTable(db: String, oldName: String, newName: String): Unit = withClient {
    val rawTable = getRawTable(db, oldName)

    // Note that Hive serde tables don't use the path option in storage properties to store the
    // table location, but store it directly in the `locationUri` field. The `locationUri` field
    // will be updated automatically in the Hive metastore by the `alterTable` call at the end of
    // this method. Here we only update the path option if it already exists in storage
    // properties, to avoid adding an unnecessary path option for Hive serde tables.
    val hasPathOption = CaseInsensitiveMap(rawTable.storage.properties).contains("path")
    val storageWithNewPath = if (rawTable.tableType == MANAGED && hasPathOption) {
      // If it's a managed table with path option and we are renaming it, then the path option
      // becomes inaccurate and we need to update it according to the new table name.
      val newTablePath = defaultTablePath(TableIdentifier(newName, Some(db)))
      updateLocationInStorageProps(rawTable, Some(newTablePath))
    } else {
      rawTable.storage
    }

    val newTable = rawTable.copy(
      identifier = TableIdentifier(newName, Some(db)),
      storage = storageWithNewPath)

    client.alterTable(oldName, newTable)
  }

  private def getLocationFromStorageProps(table: CatalogTable): Option[String] = {
    CaseInsensitiveMap(table.storage.properties).get("path")
  }

  private def updateLocationInStorageProps(
      table: CatalogTable,
      newPath: Option[String]): CatalogStorageFormat = {
    // We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable,
    // while `CatalogTable` should be serializable.
    val propsWithoutPath = table.storage.properties.filter {
      case (k, v) => k.toLowerCase(Locale.ROOT) != "path"
    }
    table.storage.copy(properties = propsWithoutPath ++ newPath.map("path" -> _))
  }

  /**
   * Alter a table whose name matches the one specified in `tableDefinition`,
   * assuming the table exists.
   *
   * Note: As of now, this doesn't support altering the table schema, partition column names or
   * bucket specification. We ignore them even if users specify different values for these fields.
   */
  override def alterTable(tableDefinition: CatalogTable): Unit = withClient {
    assert(tableDefinition.identifier.database.isDefined)
    val db = tableDefinition.identifier.database.get
    requireTableExists(db, tableDefinition.identifier.table)
    verifyTableProperties(tableDefinition)

    // Convert table statistics to properties so that we can persist them through the Hive API.
    val withStatsProps = if (tableDefinition.stats.isDefined) {
      val stats = tableDefinition.stats.get
      var statsProperties: Map[String, String] =
        Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString())
      if (stats.rowCount.isDefined) {
        statsProperties += STATISTICS_NUM_ROWS -> stats.rowCount.get.toString()
      }
      val colNameTypeMap: Map[String, DataType] =
        tableDefinition.schema.fields.map(f => (f.name, f.dataType)).toMap
      stats.colStats.foreach { case (colName, colStat) =>
        colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) =>
          statsProperties += (columnStatKeyPropName(colName, k) -> v)
        }
      }
      tableDefinition.copy(properties = tableDefinition.properties ++ statsProperties)
    } else {
      tableDefinition
    }

    if (tableDefinition.tableType == VIEW) {
      client.alterTable(withStatsProps)
    } else {
      val oldTableDef = getRawTable(db, withStatsProps.identifier.table)

      val newStorage = if (DDLUtils.isHiveTable(tableDefinition)) {
        tableDefinition.storage
      } else {
        // We can't alter the table storage of a data source table directly, for 2 reasons:
        //   1. Internally we use the path option in storage properties to store the table
        //      location, but the given `tableDefinition` comes from outside and doesn't have the
        //      path option, so we need to add it manually.
        //   2. This data source table may be created on a file, not a directory, in which case we
        //      can't set the `locationUri` field and save it to the Hive metastore, because Hive
        //      only allows a directory as the table location.
        //
        // For example, an external data source table is created with a single file
        // '/path/to/file'. Internally, we add a path option with value '/path/to/file' to the
        // storage properties, and set the `locationUri` to a special value due to SPARK-15269
        // (please see `saveTableIntoHive` for more details). When users try to get the table
        // metadata back, we restore the `locationUri` field from the path option and remove the
        // path option from the storage properties. When users try to alter the table storage, the
        // given `tableDefinition` will have the `locationUri` field set to '/path/to/file' and the
        // path option not set.
        //
        // Here we need 2 extra steps:
        //   1. Add the path option to storage properties, to match the internal format, i.e. using
        //      the path option to store the table location.
        //   2. Set the `locationUri` field back to the old one from the existing table metadata,
        //      if users don't want to alter the table location. This step is necessary as
        //      `locationUri` is not always the same as the path option, e.g. in the above example
        //      `locationUri` is a special value and we should respect it. Note that if users
        //      want to alter the table location to a file path, we will fail. This should be fixed
        //      in the future.

        val newLocation = tableDefinition.storage.locationUri.map(CatalogUtils.URIToString(_))
        val storageWithPathOption = tableDefinition.storage.copy(
          properties = tableDefinition.storage.properties ++ newLocation.map("path" -> _))

        val oldLocation = getLocationFromStorageProps(oldTableDef)
        if (oldLocation == newLocation) {
          storageWithPathOption.copy(locationUri = oldTableDef.storage.locationUri)
        } else {
          storageWithPathOption
        }
      }

      val partitionProviderProp = if (tableDefinition.tracksPartitionsInCatalog) {
        TABLE_PARTITION_PROVIDER -> TABLE_PARTITION_PROVIDER_CATALOG
      } else {
        TABLE_PARTITION_PROVIDER -> TABLE_PARTITION_PROVIDER_FILESYSTEM
      }

      // Set the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
      // to retain the Spark-specific format if there is one. Also add the old data source
      // properties to the table properties, to retain the data source table format.
      val oldDataSourceProps = oldTableDef.properties.filter(_._1.startsWith(DATASOURCE_PREFIX))
      val newTableProps = oldDataSourceProps ++ withStatsProps.properties + partitionProviderProp
      val newDef = withStatsProps.copy(
        storage = newStorage,
        schema = oldTableDef.schema,
        partitionColumnNames = oldTableDef.partitionColumnNames,
        bucketSpec = oldTableDef.bucketSpec,
        properties = newTableProps)

      client.alterTable(newDef)
    }
  }

  override def alterTableSchema(db: String, table: String, schema: StructType): Unit = withClient {
    requireTableExists(db, table)
    val rawTable = getRawTable(db, table)
    val withNewSchema = rawTable.copy(schema = schema)
    // Add table metadata such as table schema, partition columns, etc. to table properties.
    val updatedTable = withNewSchema.copy(
      properties = withNewSchema.properties ++ tableMetaToTableProps(withNewSchema))
    try {
      client.alterTable(updatedTable)
    } catch {
      case NonFatal(e) =>
        val warningMessage =
          s"Could not alter the schema of table ${rawTable.identifier.quotedString} in a Hive " +
            "compatible way. Updating Hive metastore in Spark SQL specific format."
        logWarning(warningMessage, e)
        client.alterTable(updatedTable.copy(schema = updatedTable.partitionSchema))
    }
  }

  override def getTable(db: String, table: String): CatalogTable = withClient {
    restoreTableMetadata(getRawTable(db, table))
  }

  override def getTableOption(db: String, table: String): Option[CatalogTable] = withClient {
    client.getTableOption(db, table).map(restoreTableMetadata)
  }

  /**
   * Restores table metadata from the table properties. This method is roughly the inverse
   * of [[createTable]].
   *
   * It reads the table schema, provider, partition column names and bucket specification from
   * table properties, and filters out these special entries from the table properties.
   */
  private def restoreTableMetadata(inputTable: CatalogTable): CatalogTable = {
    if (conf.get(DEBUG_MODE)) {
      return inputTable
    }

    var table = inputTable

    if (table.tableType != VIEW) {
      table.properties.get(DATASOURCE_PROVIDER) match {
        // No provider in table properties, which means this is a Hive serde table.
        case None =>
          table = restoreHiveSerdeTable(table)

        // This is a regular data source table.
        case Some(provider) =>
          table = restoreDataSourceTable(table, provider)
      }
    }

    // construct Spark's statistics from information in Hive metastore
    val statsProps = table.properties.filterKeys(_.startsWith(STATISTICS_PREFIX))

    if (statsProps.nonEmpty) {
      val colStats = new mutable.HashMap[String, ColumnStat]

      // For each column, recover its column stats. Note that this is currently an O(n^2)
      // operation, but given that the number of columns is usually not enormous, this is probably
      // OK as a start. If we want to make this a linear operation, we'd need a stronger contract
      // on the naming convention used for serialization.
      table.schema.foreach { field =>
        if (statsProps.contains(columnStatKeyPropName(field.name, ColumnStat.KEY_VERSION))) {
          // If "version" field is defined, then the column stat is defined.
          val keyPrefix = columnStatKeyPropName(field.name, "")
          val colStatMap = statsProps.filterKeys(_.startsWith(keyPrefix)).map { case (k, v) =>
            (k.drop(keyPrefix.length), v)
          }

          ColumnStat.fromMap(table.identifier.table, field, colStatMap).foreach {
            colStat => colStats += field.name -> colStat
          }
        }
      }

      table = table.copy(
        stats = Some(CatalogStatistics(
          sizeInBytes = BigInt(table.properties(STATISTICS_TOTAL_SIZE)),
          rowCount = table.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
          colStats = colStats.toMap)))
    }

    // Get the original table properties as defined by the user.
    table.copy(
      properties = table.properties.filterNot { case (key, _) => key.startsWith(SPARK_SQL_PREFIX) })
  }
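
  // Illustrative sketch, not part of the original source: the statistics entries recovered above
  // follow the key layout written by `alterTable` via `columnStatKeyPropName` (defined in the
  // companion object, not shown in this excerpt). Assuming that helper concatenates a column-stats
  // prefix, the column name and the stat key, a table with one analyzed column `a` would carry
  // properties such as (stat key names are illustrative):
  //   STATISTICS_TOTAL_SIZE                 -> "1024"
  //   STATISTICS_NUM_ROWS                   -> "10"
  //   columnStatKeyPropName("a", "version") -> "1"
  //   columnStatKeyPropName("a", "max")     -> "9"
  // Assuming STATISTICS_PREFIX starts with SPARK_SQL_PREFIX, all of these are filtered out of the
  // user-visible properties returned at the end of `restoreTableMetadata`.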

  private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = {
    val hiveTable = table.copy(
      provider = Some(DDLUtils.HIVE_PROVIDER),
      tracksPartitionsInCatalog = true)

    // If this is a Hive serde table created by Spark 2.1 or higher versions, we should restore its
    // schema from table properties.
    if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
      val schemaFromTableProps = getSchemaFromTableProperties(table)
      if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) {
        hiveTable.copy(
          schema = schemaFromTableProps,
          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
          bucketSpec = getBucketSpecFromTableProperties(table))
      } else {
        // The Hive metastore may change the table schema, e.g. via schema inference. If the table
        // schema we read back is different (ignoring case and nullability) from the one in table
        // properties which was written when the table was created, we should respect the table
        // schema from Hive.
        logWarning(s"The table schema given by Hive metastore (${table.schema.simpleString}) is " +
          "different from the schema when this table was created by Spark SQL " +
          s"(${schemaFromTableProps.simpleString}). We have to fall back to the table schema " +
          "from Hive metastore, which is not case preserving.")
        hiveTable.copy(schemaPreservesCase = false)
      }
    } else {
      hiveTable.copy(schemaPreservesCase = false)
    }
  }

  private def restoreDataSourceTable(table: CatalogTable, provider: String): CatalogTable = {
    // Internally we store the table location in storage properties with key "path" for data
    // source tables. Here we set the table location to `locationUri` field and filter out the
    // path option in storage properties, to avoid exposing this concept externally.
    val storageWithLocation = {
      val tableLocation = getLocationFromStorageProps(table)
      // We pass None as `newPath` here, to remove the path option in storage properties.
      updateLocationInStorageProps(table, newPath = None).copy(
        locationUri = tableLocation.map(CatalogUtils.stringToURI(_)))
    }
    val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)

    table.copy(
      provider = Some(provider),
      storage = storageWithLocation,
      schema = getSchemaFromTableProperties(table),
      partitionColumnNames = getPartitionColumnsFromTableProperties(table),
      bucketSpec = getBucketSpecFromTableProperties(table),
      tracksPartitionsInCatalog = partitionProvider == Some(TABLE_PARTITION_PROVIDER_CATALOG))
  }

  override def tableExists(db: String, table: String): Boolean = withClient {
    client.tableExists(db, table)
  }

  override def listTables(db: String): Seq[String] = withClient {
    requireDbExists(db)
    client.listTables(db)
  }

  override def listTables(db: String, pattern: String): Seq[String] = withClient {
    requireDbExists(db)
    client.listTables(db, pattern)
  }

  override def loadTable(
      db: String,
      table: String,
      loadPath: String,
      isOverwrite: Boolean,
      isSrcLocal: Boolean): Unit = withClient {
    requireTableExists(db, table)
    client.loadTable(
      loadPath,
      s"$db.$table",
      isOverwrite,
      isSrcLocal)
  }

  override def loadPartition(
      db: String,
      table: String,
      loadPath: String,
      partition: TablePartitionSpec,
      isOverwrite: Boolean,
      inheritTableSpecs: Boolean,
      isSrcLocal: Boolean): Unit = withClient {
    requireTableExists(db, table)

    val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
    getTable(db, table).partitionColumnNames.foreach { colName =>
      // The Hive metastore is not case preserving and keeps partition columns with lower-cased
      // names, and Hive validates the column names in the partition spec to make sure they are
      // partition columns. Here we lowercase the column names before passing the partition spec
      // to the Hive client, to satisfy Hive.
      orderedPartitionSpec.put(colName.toLowerCase, partition(colName))
    }

    client.loadPartition(
      loadPath,
      db,
      table,
      orderedPartitionSpec,
      isOverwrite,
      inheritTableSpecs,
      isSrcLocal)
  }

  override def loadDynamicPartitions(
      db: String,
      table: String,
      loadPath: String,
      partition: TablePartitionSpec,
      replace: Boolean,
      numDP: Int): Unit = withClient {
    requireTableExists(db, table)

    val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
    getTable(db, table).partitionColumnNames.foreach { colName =>
      // Hive metastore is not case preserving and keeps partition columns with lower cased names,
      // and Hive will validate the column names in the partition spec to make sure they are
      // partition columns. Here we lower case the column names before passing the partition spec
      // to the Hive client, to satisfy Hive.
      orderedPartitionSpec.put(colName.toLowerCase, partition(colName))
    }

    client.loadDynamicPartitions(
      loadPath,
      db,
      table,
      orderedPartitionSpec,
      replace,
      numDP)
  }

  // --------------------------------------------------------------------------
  // Partitions
  // --------------------------------------------------------------------------

  // Hive metastore is not case preserving and the partition columns are always lower cased. We
  // need to lower case the column names in the partition specification before calling
  // partition-related Hive APIs, to match this behaviour.
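  // e.g. (hypothetical spec) Map("Year" -> "2017", "Month" -> "1") becomes
  // Map("year" -> "2017", "month" -> "1") before it is passed to the Hive client.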
  private def lowerCasePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec = {
    spec.map { case (k, v) => k.toLowerCase -> v }
  }

  // Build a map from lower-cased partition column names to exact column names for a given table
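  // e.g. (hypothetical columns) partition columns Seq("Year", "Month") produce
  // Map("year" -> "Year", "month" -> "Month").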
  private def buildLowerCasePartColNameMap(table: CatalogTable): Map[String, String] = {
    val actualPartColNames = table.partitionColumnNames
    actualPartColNames.map(colName => (colName.toLowerCase, colName)).toMap
  }

  // Hive metastore is not case preserving and the column names of the partition specification we
  // get from the metastore are always lower cased. We should restore them w.r.t. the actual table
  // partition columns.
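  // e.g. (hypothetical spec) Map("year" -> "2017") with actual partition column "Year" is
  // restored to Map("Year" -> "2017").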
  private def restorePartitionSpec(
      spec: TablePartitionSpec,
      partColMap: Map[String, String]): TablePartitionSpec = {
    spec.map { case (k, v) => partColMap(k.toLowerCase) -> v }
  }

  private def restorePartitionSpec(
      spec: TablePartitionSpec,
      partCols: Seq[String]): TablePartitionSpec = {
    spec.map { case (k, v) => partCols.find(_.equalsIgnoreCase(k)).get -> v }
  }

  override def createPartitions(
      db: String,
      table: String,
      parts: Seq[CatalogTablePartition],
      ignoreIfExists: Boolean): Unit = withClient {
    requireTableExists(db, table)

    val tableMeta = getTable(db, table)
    val partitionColumnNames = tableMeta.partitionColumnNames
    val tablePath = new Path(tableMeta.location)
    val partsWithLocation = parts.map { p =>
      // Ideally we could leave the partition location empty and let the Hive metastore set it.
      // However, Hive metastore is not case preserving and would generate a wrong partition
      // location with lower cased partition column names. Here we set the default partition
      // location manually to avoid this problem.
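      // e.g. (hypothetical paths) for a spec of Map("Year" -> "2017") under table path
      // /warehouse/tbl, ExternalCatalogUtils.generatePartitionPath yields /warehouse/tbl/Year=2017,
      // whereas the metastore default would be the lower-cased /warehouse/tbl/year=2017.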
      val partitionPath = p.storage.locationUri.map(uri => new Path(uri)).getOrElse {
        ExternalCatalogUtils.generatePartitionPath(p.spec, partitionColumnNames, tablePath)
      }
      p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri)))
    }
    val lowerCasedParts = partsWithLocation.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
    client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
  }

  override def dropPartitions(
      db: String,
      table: String,
      parts: Seq[TablePartitionSpec],
      ignoreIfNotExists: Boolean,
      purge: Boolean,
      retainData: Boolean): Unit = withClient {
    requireTableExists(db, table)
    client.dropPartitions(
      db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge, retainData)
  }

  override def renamePartitions(
      db: String,
      table: String,
      specs: Seq[TablePartitionSpec],
      newSpecs: Seq[TablePartitionSpec]): Unit = withClient {
    client.renamePartitions(
      db, table, specs.map(lowerCasePartitionSpec), newSpecs.map(lowerCasePartitionSpec))

    val tableMeta = getTable(db, table)
    val partitionColumnNames = tableMeta.partitionColumnNames
    // Hive metastore is not case preserving and keeps partition columns with lower cased names.
    // When Hive renames a partition of a managed table, it creates the partition location using a
    // default path generated from the new spec with lower cased partition column names. This is
    // unexpected, so we need to rename the directories manually and alter the partition location.
    val hasUpperCasePartitionColumn = partitionColumnNames.exists(col => col.toLowerCase != col)
    if (tableMeta.tableType == MANAGED && hasUpperCasePartitionColumn) {
      val tablePath = new Path(tableMeta.location)
      val fs = tablePath.getFileSystem(hadoopConf)
      val newParts = newSpecs.map { spec =>
        val rightPath = renamePartitionDirectory(fs, tablePath, partitionColumnNames, spec)
        val partition = client.getPartition(db, table, lowerCasePartitionSpec(spec))
        partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toUri)))
      }
      alterPartitions(db, table, newParts)
    }
  }

  /**
   * Rename the partition directory w.r.t. the actual partition columns.
   *
   * It will recursively rename the partition directory from the first partition column, to be
   * most compatible with different file systems. e.g. in some file systems, renaming `a=1/b=2` to
   * `A=1/B=2` will result in `a=1/B=2`, while in some other file systems the renaming works but
   * will leave an empty directory `a=1`.
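   *
   * For example (hypothetical layout), with partition columns `Seq("A", "B")` and a new spec of
   * `Map("A" -> "1", "B" -> "2")`, this first renames `a=1` to `A=1` under the table path, then
   * renames `A=1/b=2` to `A=1/B=2`, so each level of the tree ends up with the exact-case name.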
   */
  private def renamePartitionDirectory(
      fs: FileSystem,
      tablePath: Path,
      partCols: Seq[String],
      newSpec: TablePartitionSpec): Path = {
    import ExternalCatalogUtils.getPartitionPathString

    var currentFullPath = tablePath
    partCols.foreach { col =>
      val partValue = newSpec(col)
      val expectedPartitionString = getPartitionPathString(col, partValue)
      val expectedPartitionPath = new Path(currentFullPath, expectedPartitionString)

      if (fs.exists(expectedPartitionPath)) {
        // It is possible that some parent partition directories already exist or don't need to
        // be renamed. e.g. the partition columns are `a` and `B`, then we don't need to rename
        // `/table_path/a=1`. Or we already have a partition directory `A=1/B=2`, and we rename
        // another partition to `A=1/B=3`, then we will have `A=1/B=2` and `a=1/b=3`, and we should
        // just move `a=1/b=3` into `A=1` with the new name `B=3`.
      } else {
        val actualPartitionString = getPartitionPathString(col.toLowerCase, partValue)
        val actualPartitionPath = new Path(currentFullPath, actualPartitionString)
        try {
          fs.rename(actualPartitionPath, expectedPartitionPath)
        } catch {
          case e: IOException =>
            throw new SparkException("Unable to rename partition path from " +
              s"$actualPartitionPath to $expectedPartitionPath", e)
        }
      }
      currentFullPath = expectedPartitionPath
    }

    currentFullPath
  }

  override def alterPartitions(
      db: String,
      table: String,
      newParts: Seq[CatalogTablePartition]): Unit = withClient {
    val lowerCasedParts = newParts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
    // Note: Before altering table partitions in Hive, you *must* set the current database
    // to the one that contains the table of interest. Otherwise you will end up with the
    // most helpful error message ever: "Unable to alter partition. alter is not possible."
    // See HIVE-2742 for more detail.
    client.setCurrentDatabase(db)
    client.alterPartitions(db, table, lowerCasedParts)
  }

  override def getPartition(
      db: String,
      table: String,
      spec: TablePartitionSpec): CatalogTablePartition = withClient {
    val part = client.getPartition(db, table, lowerCasePartitionSpec(spec))
    part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames))
  }

  /**
   * Returns the specified partition or None if it does not exist.
   */
  override def getPartitionOption(
      db: String,
      table: String,
      spec: TablePartitionSpec): Option[CatalogTablePartition] = withClient {
    client.getPartitionOption(db, table, lowerCasePartitionSpec(spec)).map { part =>
      part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames))
    }
  }

  /**
   * Returns the partition names from the Hive metastore for a given table in a database.
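   * The returned strings restore the exact case of the partition columns, e.g. (hypothetical)
   * `Year=2017/Month=1` rather than the lower-cased `year=2017/month=1` kept by the metastore.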
   */
  override def listPartitionNames(
      db: String,
      table: String,
      partialSpec: Option[TablePartitionSpec] = None): Seq[String] = withClient {
    val catalogTable = getTable(db, table)
    val partColNameMap = buildLowerCasePartColNameMap(catalogTable).mapValues(escapePathName)
    val clientPartitionNames =
      client.getPartitionNames(catalogTable, partialSpec.map(lowerCasePartitionSpec))
    clientPartitionNames.map { partitionPath =>
      val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partitionPath)
      partSpec.map { case (partName, partValue) =>
        partColNameMap(partName.toLowerCase) + "=" + escapePathName(partValue)
      }.mkString("/")
    }
  }

  /**
   * Returns the partitions from the Hive metastore for a given table in a database.
   */
  override def listPartitions(
      db: String,
      table: String,
      partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = withClient {
    val partColNameMap = buildLowerCasePartColNameMap(getTable(db, table))
    client.getPartitions(db, table, partialSpec.map(lowerCasePartitionSpec)).map { part =>
      part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
    }
  }

  override def listPartitionsByFilter(
      db: String,
      table: String,
      predicates: Seq[Expression],
      defaultTimeZoneId: String): Seq[CatalogTablePartition] = withClient {
    val rawTable = getRawTable(db, table)
    val catalogTable = restoreTableMetadata(rawTable)

    val partColNameMap = buildLowerCasePartColNameMap(catalogTable)

    val clientPrunedPartitions =
      client.getPartitionsByFilter(rawTable, predicates).map { part =>
        part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
      }
    prunePartitionsByFilter(catalogTable, clientPrunedPartitions, predicates, defaultTimeZoneId)
  }

  // --------------------------------------------------------------------------
  // Functions
  // --------------------------------------------------------------------------

  override def createFunction(
      db: String,
      funcDefinition: CatalogFunction): Unit = withClient {
    requireDbExists(db)
    // Hive's metastore is case insensitive. However, Hive's createFunction does
    // not normalize the function name (unlike the getFunction part). So,
    // we are normalizing the function name.
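    // e.g. (hypothetical name) a function registered as "MyUDF" is stored as "myudf".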
    val functionName = funcDefinition.identifier.funcName.toLowerCase(Locale.ROOT)
    requireFunctionNotExists(db, functionName)
    val functionIdentifier = funcDefinition.identifier.copy(funcName = functionName)
    client.createFunction(db, funcDefinition.copy(identifier = functionIdentifier))
  }

  override def dropFunction(db: String, name: String): Unit = withClient {
    requireFunctionExists(db, name)
    client.dropFunction(db, name)
  }

  override def renameFunction(db: String, oldName: String, newName: String): Unit = withClient {
    requireFunctionExists(db, oldName)
    requireFunctionNotExists(db, newName)
    client.renameFunction(db, oldName, newName)
  }

  override def getFunction(db: String, funcName: String): CatalogFunction = withClient {
    requireFunctionExists(db, funcName)
    client.getFunction(db, funcName)
  }

  override def functionExists(db: String, funcName: String): Boolean = withClient {
    requireDbExists(db)
    client.functionExists(db, funcName)
  }

  override def listFunctions(db: String, pattern: String): Seq[String] = withClient {
    requireDbExists(db)
    client.listFunctions(db, pattern)
  }

}

object HiveExternalCatalog {
  val SPARK_SQL_PREFIX = "spark.sql."

  val DATASOURCE_PREFIX = SPARK_SQL_PREFIX + "sources."
  val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
  val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
  val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
  val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts"
  val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols"
  val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols"
  val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets"
  val DATASOURCE_SCHEMA_NUMBUCKETCOLS = DATASOURCE_SCHEMA_PREFIX + "numBucketCols"
  val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_SCHEMA_PREFIX + "part."
  val DATASOURCE_SCHEMA_PARTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "partCol."
  val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol."
  val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol."

  val STATISTICS_PREFIX = SPARK_SQL_PREFIX + "statistics."
  val STATISTICS_TOTAL_SIZE = STATISTICS_PREFIX + "totalSize"
  val STATISTICS_NUM_ROWS = STATISTICS_PREFIX + "numRows"
  val STATISTICS_COL_STATS_PREFIX = STATISTICS_PREFIX + "colStats."

  val TABLE_PARTITION_PROVIDER = SPARK_SQL_PREFIX + "partitionProvider"
  val TABLE_PARTITION_PROVIDER_CATALOG = "catalog"
  val TABLE_PARTITION_PROVIDER_FILESYSTEM = "filesystem"

  /**
   * Returns the fully qualified name used in table properties for a particular column stat.
   * For example, for column "mycol", and "min" stat, this should return
   * "spark.sql.statistics.colStats.mycol.min".
   */
  private def columnStatKeyPropName(columnName: String, statKey: String): String = {
    STATISTICS_COL_STATS_PREFIX + columnName + "." + statKey
  }

  // A persisted data source table always stores its schema in the catalog.
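  // For wide schemas the JSON schema string is split across several table properties, e.g.
  // (hypothetical values) "spark.sql.sources.schema.numParts" -> "2" together with
  // "spark.sql.sources.schema.part.0" and "spark.sql.sources.schema.part.1", which are
  // concatenated back into a single JSON string below.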
  private def getSchemaFromTableProperties(metadata: CatalogTable): StructType = {
    val errorMessage = "Could not read schema from the hive metastore because it is corrupted."
    val props = metadata.properties
    val schema = props.get(DATASOURCE_SCHEMA)
    if (schema.isDefined) {
      // Originally, we used `spark.sql.sources.schema` to store the schema of a data source table.
      // After SPARK-6024, we removed this flag.
      // Although we are not using `spark.sql.sources.schema` any more, we still need to support it
      // here for tables created by older versions of Spark.
      DataType.fromJson(schema.get).asInstanceOf[StructType]
    } else if (props.filterKeys(_.startsWith(DATASOURCE_SCHEMA_PREFIX)).isEmpty) {
      // If there is no schema information in table properties, it means the schema of this table
      // was empty when saving into metastore, which is possible in older versions (prior to 2.1) of
      // Spark. We should respect it.
      new StructType()
    } else {
      val numSchemaParts = props.get(DATASOURCE_SCHEMA_NUMPARTS)
      if (numSchemaParts.isDefined) {
        val parts = (0 until numSchemaParts.get.toInt).map { index =>
          val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
          if (part == null) {
            throw new AnalysisException(errorMessage +
              s" (missing part $index of the schema, ${numSchemaParts.get} parts are expected).")
          }
          part
        }
        // Stick all parts back to a single schema string.
        DataType.fromJson(parts.mkString).asInstanceOf[StructType]
      } else {
        throw new AnalysisException(errorMessage)
      }
    }
  }

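  // Reads back the column names of one kind (partitioning, bucketing or sorting) from table
  // properties. e.g. (hypothetical values) for colType = "part", properties
  // "spark.sql.sources.schema.numPartCols" -> "2", "spark.sql.sources.schema.partCol.0" -> "year"
  // and "spark.sql.sources.schema.partCol.1" -> "month" yield Seq("year", "month").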
  private def getColumnNamesByType(
      props: Map[String, String],
      colType: String,
      typeName: String): Seq[String] = {
    for {
      numCols <- props.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").toSeq
      index <- 0 until numCols.toInt
    } yield props.getOrElse(
      s"$DATASOURCE_SCHEMA_PREFIX${colType}Col.$index",
      throw new AnalysisException(
        s"Corrupted $typeName in catalog: $numCols parts expected, but part $index is missing."
      )
    )
  }

  private def getPartitionColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = {
    getColumnNamesByType(metadata.properties, "part", "partitioning columns")
  }

  private def getBucketSpecFromTableProperties(metadata: CatalogTable): Option[BucketSpec] = {
    metadata.properties.get(DATASOURCE_SCHEMA_NUMBUCKETS).map { numBuckets =>
      BucketSpec(
        numBuckets.toInt,
        getColumnNamesByType(metadata.properties, "bucket", "bucketing columns"),
        getColumnNamesByType(metadata.properties, "sort", "sorting columns"))
    }
  }
}