package org.apache.spark.sql.execution.command

import java.io.File
import java.net.URI
import java.util.Locale

import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with BeforeAndAfterEach {
  override def afterEach(): Unit = {
    try {
      // drop all databases, tables and functions after each test
    } finally {
      Utils.deleteRecursively(new File(spark.sessionState.conf.warehousePath))

  protected override def generateTable(
      catalog: SessionCatalog,
      name: TableIdentifier): CatalogTable = {
    val storage =
      CatalogStorageFormat.empty.copy(locationUri = Some(catalog.defaultTablePath(name)))
    val metadata = new MetadataBuilder()
      .putString("key", "value")
      identifier = name,
      tableType = CatalogTableType.EXTERNAL,
      storage = storage,
      schema = new StructType()
        .add("col1", "int", nullable = true, metadata = metadata)
        .add("col2", "string")
        .add("a", "int")
        .add("b", "int"),
      provider = Some("parquet"),
      partitionColumnNames = Seq("a", "b"),
      createTime = 0L,
      tracksPartitionsInCatalog = true)

  test("alter table: set location (datasource table)") {
    testSetLocation(isDatasourceTable = true)

  test("alter table: set properties (datasource table)") {
    testSetProperties(isDatasourceTable = true)

  test("alter table: unset properties (datasource table)") {
    testUnsetProperties(isDatasourceTable = true)

  test("alter table: set serde (datasource table)") {
    testSetSerde(isDatasourceTable = true)

  test("alter table: set serde partition (datasource table)") {
    testSetSerdePartition(isDatasourceTable = true)

  test("alter table: change column (datasource table)") {
    testChangeColumn(isDatasourceTable = true)

  test("alter table: add partition (datasource table)") {
    testAddPartitions(isDatasourceTable = true)

  test("alter table: drop partition (datasource table)") {
    testDropPartitions(isDatasourceTable = true)

  test("alter table: rename partition (datasource table)") {
    testRenamePartitions(isDatasourceTable = true)

  test("drop table - data source table") {
    testDropTable(isDatasourceTable = true)

  test("create a managed Hive source table") {
    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
    val tabName = "tbl"
    withTable(tabName) {
      val e = intercept[AnalysisException] {
        sql(s"CREATE TABLE $tabName (i INT, j STRING)")
      assert(e.contains("Hive support is required to CREATE Hive TABLE"))

  test("create an external Hive source table") {
    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
    withTempDir { tempDir =>
      val tabName = "tbl"
      withTable(tabName) {
        val e = intercept[AnalysisException] {
               |CREATE EXTERNAL TABLE $tabName (i INT, j STRING)
               |LOCATION '${tempDir.toURI}'
        assert(e.contains("Hive support is required to CREATE Hive TABLE"))

  test("Create Hive Table As Select") {
    import testImplicits._
    withTable("t", "t1") {
      var e = intercept[AnalysisException] {
        sql("CREATE TABLE t SELECT 1 as a, 1 as b")
      assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)"))

      spark.range(1).select('id as 'a, 'id as 'b).write.saveAsTable("t1")
      e = intercept[AnalysisException] {
        sql("CREATE TABLE t SELECT a, b from t1")
      assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)"))


abstract class DDLSuite extends QueryTest with SQLTestUtils {

  protected def isUsingHiveMetastore: Boolean = {
    spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive"

  protected def generateTable(catalog: SessionCatalog, name: TableIdentifier): CatalogTable

  private val escapedIdentifier = "`(.+)`".r

  protected def normalizeCatalogTable(table: CatalogTable): CatalogTable = table

  private def normalizeSerdeProp(props: Map[String, String]): Map[String, String] = {
    props.filterNot(p => Seq("serialization.format", "path").contains(p._1))

  private def checkCatalogTables(expected: CatalogTable, actual: CatalogTable): Unit = {
    assert(normalizeCatalogTable(actual) == normalizeCatalogTable(expected))

   * Strip backticks, if any, from the string.
  private def cleanIdentifier(ident: String): String = {
    ident match {
      case escapedIdentifier(i) => i
      case plainIdent => plainIdent

  private def assertUnsupported(query: String): Unit = {
    val e = intercept[AnalysisException] {
    assert(e.getMessage.toLowerCase(Locale.ROOT).contains("operation not allowed"))

  private def maybeWrapException[T](expectException: Boolean)(body: => T): Unit = {
    if (expectException) intercept[AnalysisException] { body } else body

  private def createDatabase(catalog: SessionCatalog, name: String): Unit = {
        name, "", CatalogUtils.stringToURI(spark.sessionState.conf.warehousePath), Map()),
      ignoreIfExists = false)

  private def createTable(catalog: SessionCatalog, name: TableIdentifier): Unit = {
    catalog.createTable(generateTable(catalog, name), ignoreIfExists = false)

  private def createTablePartition(
      catalog: SessionCatalog,
      spec: TablePartitionSpec,
      tableName: TableIdentifier): Unit = {
    val part = CatalogTablePartition(
      spec, CatalogStorageFormat(None, None, None, None, false, Map()))
    catalog.createPartitions(tableName, Seq(part), ignoreIfExists = false)

  private def getDBPath(dbName: String): URI = {
    val warehousePath = makeQualifiedPath(spark.sessionState.conf.warehousePath)
    new Path(CatalogUtils.URIToString(warehousePath), s"$dbName.db").toUri

  test("the qualified path of a database is stored in the catalog") {
    val catalog = spark.sessionState.catalog

    withTempDir { tmpDir =>
      val path = tmpDir.getCanonicalPath
      // The generated temp path is not qualified.
      val uri = tmpDir.toURI
      sql(s"CREATE DATABASE db1 LOCATION '$uri'")
      val pathInCatalog = new Path(catalog.getDatabaseMetadata("db1").locationUri).toUri
      assert("file" === pathInCatalog.getScheme)
      val expectedPath = new Path(path).toUri
      assert(expectedPath.getPath === pathInCatalog.getPath)
      sql("DROP DATABASE db1")

  test("Create Database using Default Warehouse Path") {
    val catalog = spark.sessionState.catalog
    val dbName = "db1"
    try {
      sql(s"CREATE DATABASE $dbName")
      val db1 = catalog.getDatabaseMetadata(dbName)
      assert(db1 == CatalogDatabase(
      sql(s"DROP DATABASE $dbName CASCADE")
    } finally {

  test("Create/Drop Database - location") {
    val catalog = spark.sessionState.catalog
    val databaseNames = Seq("db1", "`database`")
    withTempDir { tmpDir =>
      val path = new Path(tmpDir.getCanonicalPath).toUri
      databaseNames.foreach { dbName =>
        try {
          val dbNameWithoutBackTicks = cleanIdentifier(dbName)
          sql(s"CREATE DATABASE $dbName Location '$path'")
          val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
          val expPath = makeQualifiedPath(tmpDir.toString)
          assert(db1 == CatalogDatabase(
          sql(s"DROP DATABASE $dbName CASCADE")
        } finally {

  test("Create Database - database already exists") {
    val catalog = spark.sessionState.catalog
    val databaseNames = Seq("db1", "`database`")

    databaseNames.foreach { dbName =>
      try {
        val dbNameWithoutBackTicks = cleanIdentifier(dbName)
        sql(s"CREATE DATABASE $dbName")
        val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
        assert(db1 == CatalogDatabase(

        // TODO: HiveExternalCatalog should throw DatabaseAlreadyExistsException
        val e = intercept[AnalysisException] {
          sql(s"CREATE DATABASE $dbName")
        assert(e.contains(s"already exists"))
      } finally {

  private def checkSchemaInCreatedDataSourceTable(
      path: File,
      userSpecifiedSchema: Option[String],
      userSpecifiedPartitionCols: Option[String],
      expectedSchema: StructType,
      expectedPartitionCols: Seq[String]): Unit = {
    val tabName = "tab1"
    withTable(tabName) {
      val partitionClause =
        userSpecifiedPartitionCols.map(p => s"PARTITIONED BY ($p)").getOrElse("")
      val schemaClause = userSpecifiedSchema.map(s => s"($s)").getOrElse("")
      val uri = path.toURI
      val sqlCreateTable =
           |CREATE TABLE $tabName $schemaClause
           |USING parquet
           |OPTIONS (
           |  path '$uri'
      if (userSpecifiedSchema.isEmpty && userSpecifiedPartitionCols.nonEmpty) {
        val e = intercept[AnalysisException](sql(sqlCreateTable)).getMessage
          "not allowed to specify partition columns when the table schema is not defined"))
      } else {
        val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName))

        assert(expectedSchema == tableMetadata.schema)
        assert(expectedPartitionCols == tableMetadata.partitionColumnNames)

  test("Create partitioned data source table without user specified schema") {
    import testImplicits._
    val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")

    // Case 1: with partitioning columns but no schema: Option("inexistentColumns")
    // Case 2: without schema and partitioning columns: None
    Seq(Option("inexistentColumns"), None).foreach { partitionCols =>
      withTempPath { pathToPartitionedTable =>
          userSpecifiedSchema = None,
          userSpecifiedPartitionCols = partitionCols,
          expectedSchema = new StructType().add("str", StringType).add("num", IntegerType),
          expectedPartitionCols = Seq("num"))

  test("Create partitioned data source table with user specified schema") {
    import testImplicits._
    val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")

    // Case 1: with partitioning columns but no schema: Option("num")
    // Case 2: without schema and partitioning columns: None
    Seq(Option("num"), None).foreach { partitionCols =>
      withTempPath { pathToPartitionedTable =>
          userSpecifiedSchema = Option("num int, str string"),
          userSpecifiedPartitionCols = partitionCols,
          expectedSchema = new StructType().add("str", StringType).add("num", IntegerType),
          expectedPartitionCols = partitionCols.map(Seq(_)).getOrElse(Seq.empty[String]))

  test("Create non-partitioned data source table without user specified schema") {
    import testImplicits._
    val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")

    // Case 1: with partitioning columns but no schema: Option("inexistentColumns")
    // Case 2: without schema and partitioning columns: None
    Seq(Option("inexistentColumns"), None).foreach { partitionCols =>
      withTempPath { pathToNonPartitionedTable =>
          userSpecifiedSchema = None,
          userSpecifiedPartitionCols = partitionCols,
          expectedSchema = new StructType().add("num", IntegerType).add("str", StringType),
          expectedPartitionCols = Seq.empty[String])

  test("Create non-partitioned data source table with user specified schema") {
    import testImplicits._
    val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")

    // Case 1: with partitioning columns but no schema: Option("inexistentColumns")
    // Case 2: without schema and partitioning columns: None
    Seq(Option("num"), None).foreach { partitionCols =>
      withTempPath { pathToNonPartitionedTable =>
          userSpecifiedSchema = Option("num int, str string"),
          userSpecifiedPartitionCols = partitionCols,
          expectedSchema = if (partitionCols.isDefined) {
            // we skipped inference, so the partition col is ordered at the end
            new StructType().add("str", StringType).add("num", IntegerType)
          } else {
            // no inferred partitioning, so schema is in original order
            new StructType().add("num", IntegerType).add("str", StringType)
          expectedPartitionCols = partitionCols.map(Seq(_)).getOrElse(Seq.empty[String]))

  test("create table - duplicate column names in the table definition") {
    val e = intercept[AnalysisException] {
      sql("CREATE TABLE tbl(a int, a string) USING json")
    assert(e.message == "Found duplicate column(s) in table definition of `tbl`: a")

    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
      val e2 = intercept[AnalysisException] {
        sql("CREATE TABLE tbl(a int, A string) USING json")
      assert(e2.message == "Found duplicate column(s) in table definition of `tbl`: a")

  test("create table - partition column names not in table definition") {
    val e = intercept[AnalysisException] {
      sql("CREATE TABLE tbl(a int, b string) USING json PARTITIONED BY (c)")
    assert(e.message == "partition column c is not defined in table tbl, " +
      "defined table columns are: a, b")

  test("create table - bucket column names not in table definition") {
    val e = intercept[AnalysisException] {
      sql("CREATE TABLE tbl(a int, b string) USING json CLUSTERED BY (c) INTO 4 BUCKETS")
    assert(e.message == "bucket column c is not defined in table tbl, " +
      "defined table columns are: a, b")

  test("create table - column repeated in partition columns") {
    val e = intercept[AnalysisException] {
      sql("CREATE TABLE tbl(a int) USING json PARTITIONED BY (a, a)")
    assert(e.message == "Found duplicate column(s) in partition: a")

  test("create table - column repeated in bucket columns") {
    val e = intercept[AnalysisException] {
      sql("CREATE TABLE tbl(a int) USING json CLUSTERED BY (a, a) INTO 4 BUCKETS")
    assert(e.message == "Found duplicate column(s) in bucket: a")

  test("Refresh table after changing the data source table partitioning") {
    import testImplicits._

    val tabName = "tab1"
    val catalog = spark.sessionState.catalog
    withTempPath { dir =>
      val path = dir.getCanonicalPath
      val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString, i, i))
        .toDF("col1", "col2", "col3", "col4")
      df.write.format("json").partitionBy("col1", "col3").save(path)
      val schema = new StructType()
        .add("col2", StringType).add("col4", LongType)
        .add("col1", IntegerType).add("col3", IntegerType)
      val partitionCols = Seq("col1", "col3")
      val uri = dir.toURI

      withTable(tabName) {
             |CREATE TABLE $tabName
             |USING json
             |OPTIONS (
             |  path '$uri'
        val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName))
        assert(tableMetadata.schema == schema)
        assert(tableMetadata.partitionColumnNames == partitionCols)

        // Change the schema
        val newDF = sparkContext.parallelize(1 to 10).map(i => (i, i.toString))
          .toDF("newCol1", "newCol2")

        // No change on the schema
        val tableMetadataBeforeRefresh = catalog.getTableMetadata(TableIdentifier(tabName))
        assert(tableMetadataBeforeRefresh.schema == schema)
        assert(tableMetadataBeforeRefresh.partitionColumnNames == partitionCols)

        // Refresh does not affect the schema

        val tableMetadataAfterRefresh = catalog.getTableMetadata(TableIdentifier(tabName))
        assert(tableMetadataAfterRefresh.schema == schema)
        assert(tableMetadataAfterRefresh.partitionColumnNames == partitionCols)

  test("Alter/Describe Database") {
    val catalog = spark.sessionState.catalog
    val databaseNames = Seq("db1", "`database`")

    databaseNames.foreach { dbName =>
      try {
        val dbNameWithoutBackTicks = cleanIdentifier(dbName)
        val location = getDBPath(dbNameWithoutBackTicks)

        sql(s"CREATE DATABASE $dbName")

          sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
          Row("Database Name", dbNameWithoutBackTicks) ::
            Row("Description", "") ::
            Row("Location", CatalogUtils.URIToString(location)) ::
            Row("Properties", "") :: Nil)

        sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')")

          sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
          Row("Database Name", dbNameWithoutBackTicks) ::
            Row("Description", "") ::
            Row("Location", CatalogUtils.URIToString(location)) ::
            Row("Properties", "((a,a), (b,b), (c,c))") :: Nil)

        sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")

          sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
          Row("Database Name", dbNameWithoutBackTicks) ::
            Row("Description", "") ::
            Row("Location", CatalogUtils.URIToString(location)) ::
            Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil)
      } finally {

  test("Drop/Alter/Describe Database - database does not exists") {
    val databaseNames = Seq("db1", "`database`")

    databaseNames.foreach { dbName =>
      val dbNameWithoutBackTicks = cleanIdentifier(dbName)

      var message = intercept[AnalysisException] {
        sql(s"DROP DATABASE $dbName")
      // TODO: Unify the exception.
      if (isUsingHiveMetastore) {
        assert(message.contains(s"NoSuchObjectException: $dbNameWithoutBackTicks"))
      } else {
        assert(message.contains(s"Database '$dbNameWithoutBackTicks' not found"))

      message = intercept[AnalysisException] {
        sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")
      assert(message.contains(s"Database '$dbNameWithoutBackTicks' not found"))

      message = intercept[AnalysisException] {
        sql(s"DESCRIBE DATABASE EXTENDED $dbName")
      assert(message.contains(s"Database '$dbNameWithoutBackTicks' not found"))

      sql(s"DROP DATABASE IF EXISTS $dbName")

  test("drop non-empty database in restrict mode") {
    val catalog = spark.sessionState.catalog
    val dbName = "db1"
    sql(s"CREATE DATABASE $dbName")

    // create a table in database
    val tableIdent1 = TableIdentifier("tab1", Some(dbName))
    createTable(catalog, tableIdent1)

    // drop a non-empty database in Restrict mode
    val message = intercept[AnalysisException] {
      sql(s"DROP DATABASE $dbName RESTRICT")
    assert(message.contains(s"Database $dbName is not empty. One or more tables exist"))

    catalog.dropTable(tableIdent1, ignoreIfNotExists = false, purge = false)

    sql(s"DROP DATABASE $dbName RESTRICT")

  test("drop non-empty database in cascade mode") {
    val catalog = spark.sessionState.catalog
    val dbName = "db1"
    sql(s"CREATE DATABASE $dbName")

    // create a table in database
    val tableIdent1 = TableIdentifier("tab1", Some(dbName))
    createTable(catalog, tableIdent1)

    // drop a non-empty database in CASCADE mode
    sql(s"DROP DATABASE $dbName CASCADE")

  test("create table in default db") {
    val catalog = spark.sessionState.catalog
    val tableIdent1 = TableIdentifier("tab1", None)
    createTable(catalog, tableIdent1)
    val expectedTableIdent = tableIdent1.copy(database = Some("default"))
    val expectedTable = generateTable(catalog, expectedTableIdent)
    checkCatalogTables(expectedTable, catalog.getTableMetadata(tableIdent1))

  test("create table in a specific db") {
    val catalog = spark.sessionState.catalog
    createDatabase(catalog, "dbx")
    val tableIdent1 = TableIdentifier("tab1", Some("dbx"))
    createTable(catalog, tableIdent1)
    val expectedTable = generateTable(catalog, tableIdent1)
    checkCatalogTables(expectedTable, catalog.getTableMetadata(tableIdent1))

  test("create table using") {
    val catalog = spark.sessionState.catalog
    withTable("tbl") {
      sql("CREATE TABLE tbl(a INT, b INT) USING parquet")
      val table = catalog.getTableMetadata(TableIdentifier("tbl"))
      assert(table.tableType == CatalogTableType.MANAGED)
      assert(table.schema == new StructType().add("a", "int").add("b", "int"))
      assert(table.provider == Some("parquet"))

  test("create table using - with partitioned by") {
    val catalog = spark.sessionState.catalog
    withTable("tbl") {
      sql("CREATE TABLE tbl(a INT, b INT) USING parquet PARTITIONED BY (a)")
      val table = catalog.getTableMetadata(TableIdentifier("tbl"))
      assert(table.tableType == CatalogTableType.MANAGED)
      assert(table.provider == Some("parquet"))
      // a is ordered last since it is a user-specified partitioning column
      assert(table.schema == new StructType().add("b", IntegerType).add("a", IntegerType))
      assert(table.partitionColumnNames == Seq("a"))

  test("create table using - with bucket") {
    val catalog = spark.sessionState.catalog
    withTable("tbl") {
      sql("CREATE TABLE tbl(a INT, b INT) USING parquet " +
      val table = catalog.getTableMetadata(TableIdentifier("tbl"))
      assert(table.tableType == CatalogTableType.MANAGED)
      assert(table.provider == Some("parquet"))
      assert(table.schema == new StructType().add("a", IntegerType).add("b", IntegerType))
      assert(table.bucketSpec == Some(BucketSpec(5, Seq("a"), Seq("b"))))

  test("create temporary view using") {
    // when we test the HiveCatalogedDDLSuite, it will failed because the csvFile path above
    // starts with 'jar:', and it is an illegal parameter for Path, so here we copy it
    // to a temp file by withResourceTempPath
    withResourceTempPath("test-data/cars.csv") { tmpFile =>
      withView("testview") {
        sql(s"CREATE OR REPLACE TEMPORARY VIEW testview (c1 String, c2 String)  USING " +
          "org.apache.spark.sql.execution.datasources.csv.CSVFileFormat  " +
          s"OPTIONS (PATH '$tmpFile')")

          sql("select c1, c2 from testview order by c1 limit 1"),
          Row("1997", "Ford") :: Nil)

        // Fails if creating a new view with the same name
        intercept[TempTableAlreadyExistsException] {
               |CREATE TEMPORARY VIEW testview
               |USING org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
               |OPTIONS (PATH '$tmpFile')

  test("alter table: rename") {
    val catalog = spark.sessionState.catalog
    val tableIdent1 = TableIdentifier("tab1", Some("dbx"))
    val tableIdent2 = TableIdentifier("tab2", Some("dbx"))
    createDatabase(catalog, "dbx")
    createDatabase(catalog, "dby")
    createTable(catalog, tableIdent1)

    assert(catalog.listTables("dbx") == Seq(tableIdent1))
    sql("ALTER TABLE dbx.tab1 RENAME TO dbx.tab2")
    assert(catalog.listTables("dbx") == Seq(tableIdent2))

    // The database in destination table name can be omitted, and we will use the database of source
    // table for it.
    sql("ALTER TABLE dbx.tab2 RENAME TO tab1")
    assert(catalog.listTables("dbx") == Seq(tableIdent1))

    // rename without explicitly specifying database
    sql("ALTER TABLE tab1 RENAME TO tab2")
    assert(catalog.listTables("dbx") == Seq(tableIdent2))
    // table to rename does not exist
    intercept[AnalysisException] {
      sql("ALTER TABLE dbx.does_not_exist RENAME TO dbx.tab2")
    // destination database is different
    intercept[AnalysisException] {
      sql("ALTER TABLE dbx.tab1 RENAME TO dby.tab2")

  test("alter table: rename cached table") {
    import testImplicits._
    sql("CREATE TABLE students (age INT, name STRING) USING parquet")
    val df = (1 to 2).map { i => (i, i.toString) }.toDF("age", "name")
    assume(spark.table("students").collect().toSeq == df.collect().toSeq, "bad test: wrong data")
    assume(spark.catalog.isCached("students"), "bad test: table was not cached in the first place")
    sql("ALTER TABLE students RENAME TO teachers")
    sql("CREATE TABLE students (age INT, name STRING) USING parquet")
    // Now we have both students and teachers.
    // The cached data for the old students table should not be read by the new students table.
    assert(spark.table("teachers").collect().toSeq == df.collect().toSeq)

  test("rename temporary table - destination table with database name") {
    withTempView("tab1") {
          |USING org.apache.spark.sql.sources.DDLScanSource
          |OPTIONS (
          |  From '1',
          |  To '10',
          |  Table 'test1'

      val e = intercept[AnalysisException] {
        sql("ALTER TABLE tab1 RENAME TO default.tab2")
        "RENAME TEMPORARY TABLE from '`tab1`' to '`default`.`tab2`': " +
          "cannot specify database name 'default' in the destination table"))

      val catalog = spark.sessionState.catalog
      assert(catalog.listTables("default") == Seq(TableIdentifier("tab1")))

  test("rename temporary table") {
    withTempView("tab1", "tab2") {
      sql("ALTER TABLE tab1 RENAME TO tab2")
      checkAnswer(spark.table("tab2"), spark.range(10).toDF())
      intercept[NoSuchTableException] { spark.table("tab1") }
      sql("ALTER VIEW tab2 RENAME TO tab1")
      checkAnswer(spark.table("tab1"), spark.range(10).toDF())
      intercept[NoSuchTableException] { spark.table("tab2") }

  test("rename temporary table - destination table already exists") {
    withTempView("tab1", "tab2") {
          |USING org.apache.spark.sql.sources.DDLScanSource
          |OPTIONS (
          |  From '1',
          |  To '10',
          |  Table 'test1'

          |USING org.apache.spark.sql.sources.DDLScanSource
          |OPTIONS (
          |  From '1',
          |  To '10',
          |  Table 'test1'

      val e = intercept[AnalysisException] {
        sql("ALTER TABLE tab1 RENAME TO tab2")
        "RENAME TEMPORARY TABLE from '`tab1`' to '`tab2`': destination table already exists"))

      val catalog = spark.sessionState.catalog
      assert(catalog.listTables("default") == Seq(TableIdentifier("tab1"), TableIdentifier("tab2")))

  test("alter table: set location") {
    testSetLocation(isDatasourceTable = false)

  test("alter table: set properties") {
    testSetProperties(isDatasourceTable = false)

  test("alter table: unset properties") {
    testUnsetProperties(isDatasourceTable = false)

  // TODO: move this test to HiveDDLSuite.scala
  ignore("alter table: set serde") {
    testSetSerde(isDatasourceTable = false)

  // TODO: move this test to HiveDDLSuite.scala
  ignore("alter table: set serde partition") {
    testSetSerdePartition(isDatasourceTable = false)

  test("alter table: change column") {
    testChangeColumn(isDatasourceTable = false)

  test("alter table: bucketing is not supported") {
    val catalog = spark.sessionState.catalog
    val tableIdent = TableIdentifier("tab1", Some("dbx"))
    createDatabase(catalog, "dbx")
    createTable(catalog, tableIdent)
    assertUnsupported("ALTER TABLE dbx.tab1 CLUSTERED BY (blood, lemon, grape) INTO 11 BUCKETS")
    assertUnsupported("ALTER TABLE dbx.tab1 CLUSTERED BY (fuji) SORTED BY (grape) INTO 5 BUCKETS")
    assertUnsupported("ALTER TABLE dbx.tab1 NOT CLUSTERED")
    assertUnsupported("ALTER TABLE dbx.tab1 NOT SORTED")

  test("alter table: skew is not supported") {
    val catalog = spark.sessionState.catalog
    val tableIdent = TableIdentifier("tab1", Some("dbx"))
    createDatabase(catalog, "dbx")
    createTable(catalog, tableIdent)
    assertUnsupported("ALTER TABLE dbx.tab1 SKEWED BY (dt, country) ON " +
      "(('2008-08-08', 'us'), ('2009-09-09', 'uk'), ('2010-10-10', 'cn'))")
    assertUnsupported("ALTER TABLE dbx.tab1 SKEWED BY (dt, country) ON " +
      "(('2008-08-08', 'us'), ('2009-09-09', 'uk')) STORED AS DIRECTORIES")
    assertUnsupported("ALTER TABLE dbx.tab1 NOT SKEWED")
    assertUnsupported("ALTER TABLE dbx.tab1 NOT STORED AS DIRECTORIES")

  test("alter table: add partition") {
    testAddPartitions(isDatasourceTable = false)

  test("alter table: recover partitions (sequential)") {
    withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {

  test("alter table: recover partition (parallel)") {
    withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {

  protected def testRecoverPartitions() {
    val catalog = spark.sessionState.catalog
    // table to alter does not exist
    intercept[AnalysisException] {
      sql("ALTER TABLE does_not_exist RECOVER PARTITIONS")

    val tableIdent = TableIdentifier("tab1")
    createTable(catalog, tableIdent)
    val part1 = Map("a" -> "1", "b" -> "5")
    createTablePartition(catalog, part1, tableIdent)
    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))

    val part2 = Map("a" -> "2", "b" -> "6")
    val root = new Path(catalog.getTableMetadata(tableIdent).location)
    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
    // valid
    fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
    fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv"))  // file
    fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "_SUCCESS"))  // file
    fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "b.csv"))  // file
    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "c.csv"))  // file
    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), ".hiddenFile"))  // file
    fs.mkdirs(new Path(new Path(root, "A=2/B=6"), "_temporary"))

    // invalid
    fs.mkdirs(new Path(new Path(root, "a"), "b"))  // bad name
    fs.mkdirs(new Path(new Path(root, "b=1"), "a=1"))  // wrong order
    fs.mkdirs(new Path(root, "a=4")) // not enough columns
    fs.createNewFile(new Path(new Path(root, "a=1"), "b=4"))  // file
    fs.createNewFile(new Path(new Path(root, "a=1"), "_SUCCESS"))  // _SUCCESS
    fs.mkdirs(new Path(new Path(root, "a=1"), "_temporary"))  // _temporary
    fs.mkdirs(new Path(new Path(root, "a=1"), ".b=4"))  // start with .

    try {
      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
        Set(part1, part2))
      if (!isUsingHiveMetastore) {
        assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
        assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
      } else {
        // After ALTER TABLE, the statistics of the first partition is removed by Hive megastore
        assert(catalog.getPartition(tableIdent, part1).parameters.get("numFiles").isEmpty)
        assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
    } finally {
      fs.delete(root, true)

  test("alter table: add partition is not supported for views") {
    assertUnsupported("ALTER VIEW dbx.tab1 ADD IF NOT EXISTS PARTITION (b='2')")

  test("alter table: drop partition") {
    testDropPartitions(isDatasourceTable = false)

  test("alter table: drop partition is not supported for views") {
    assertUnsupported("ALTER VIEW dbx.tab1 DROP IF EXISTS PARTITION (b='2')")

  test("alter table: rename partition") {
    testRenamePartitions(isDatasourceTable = false)

  test("show databases") {
    sql("CREATE DATABASE showdb2B")
    sql("CREATE DATABASE showdb1A")

    // check the result as well as its order
    checkDataset(sql("SHOW DATABASES"), Row("default"), Row("showdb1a"), Row("showdb2b"))

      sql("SHOW DATABASES LIKE '*db1A'"),
      Row("showdb1a") :: Nil)

      sql("SHOW DATABASES LIKE 'showdb1A'"),
      Row("showdb1a") :: Nil)

      sql("SHOW DATABASES LIKE '*db1A|*db2B'"),
      Row("showdb1a") ::
        Row("showdb2b") :: Nil)

      sql("SHOW DATABASES LIKE 'non-existentdb'"),

  test("drop view - temporary view") {
    val catalog = spark.sessionState.catalog
       |USING org.apache.spark.sql.sources.DDLScanSource
       |OPTIONS (
       |  From '1',
       |  To '10',
       |  Table 'test1'
    assert(catalog.listTables("default") == Seq(TableIdentifier("tab1")))
    sql("DROP VIEW tab1")
    assert(catalog.listTables("default") == Nil)

  test("drop table") {
    testDropTable(isDatasourceTable = false)

  protected def testDropTable(isDatasourceTable: Boolean): Unit = {
    val catalog = spark.sessionState.catalog
    val tableIdent = TableIdentifier("tab1", Some("dbx"))
    createDatabase(catalog, "dbx")
    createTable(catalog, tableIdent)
    if (isDatasourceTable) {
      convertToDatasourceTable(catalog, tableIdent)
    assert(catalog.listTables("dbx") == Seq(tableIdent))
    sql("DROP TABLE dbx.tab1")
    assert(catalog.listTables("dbx") == Nil)
    sql("DROP TABLE IF EXISTS dbx.tab1")
    intercept[AnalysisException] {
      sql("DROP TABLE dbx.tab1")

  test("drop view") {
    val catalog = spark.sessionState.catalog
    val tableIdent = TableIdentifier("tab1", Some("dbx"))
    createDatabase(catalog, "dbx")
    createTable(catalog, tableIdent)
    assert(catalog.listTables("dbx") == Seq(tableIdent))

    val e = intercept[AnalysisException] {
      sql("DROP VIEW dbx.tab1")
      e.getMessage.contains("Cannot drop a table with DROP VIEW. Please use DROP TABLE instead"))

  private def convertToDatasourceTable(
      catalog: SessionCatalog,
      tableIdent: TableIdentifier): Unit = {
      provider = Some("csv")))
    assert(catalog.getTableMetadata(tableIdent).provider == Some("csv"))

  protected def testSetProperties(isDatasourceTable: Boolean): Unit = {
    val catalog = spark.sessionState.catalog
    val tableIdent = TableIdentifier("tab1", Some("dbx"))
    createDatabase(catalog, "dbx")
    createTable(catalog, tableIdent)
    if (isDatasourceTable) {
      convertToDatasourceTable(catalog, tableIdent)
    def getProps: Map[String, String] = {
      if (isUsingHiveMetastore) {
      } else {
    // set table properties
    sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('andrew' = 'or14', 'kor' = 'bel')")
    assert(getProps == Map("andrew" -> "or14", "kor" -> "bel"))
    // set table properties without explicitly specifying database
    sql("ALTER TABLE tab1 SET TBLPROPERTIES ('kor' = 'belle', 'kar' = 'bol')")
    assert(getProps == Map("andrew" -> "or14", "kor" -> "belle", "kar" -> "bol"))
    // table to alter does not exist
    intercept[AnalysisException] {
      sql("ALTER TABLE does_not_exist SET TBLPROPERTIES ('winner' = 'loser')")

  protected def testUnsetProperties(isDatasourceTable: Boolean): Unit = {
    val catalog = spark.sessionState.catalog
    val tableIdent = TableIdentifier("tab1", Some("dbx"))
    createDatabase(catalog, "dbx")
    createTable(catalog, tableIdent)
    if (isDatasourceTable) {
      convertToDatasourceTable(catalog, tableIdent)
    def getProps: Map[String, String] = {
      if (isUsingHiveMetastore) {
      } else {
    // unset table properties
    sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('j' = 'am', 'p' = 'an', 'c' = 'lan', 'x' = 'y')")
    sql("ALTER TABLE dbx.tab1 UNSET TBLPROPERTIES ('j')")
    assert(getProps == Map("p" -> "an", "c" -> "lan", "x" -> "y"))
    // unset table properties without explicitly specifying database
    assert(getProps == Map("c" -> "lan", "x" -> "y"))
    // table to alter does not exist
    intercept[AnalysisException] {
      sql("ALTER TABLE does_not_exist UNSET TBLPROPERTIES ('c' = 'lan')")
    // property to unset does not exist
    val e = intercept[AnalysisException] {
      sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('c', 'xyz')")
    // property to unset does not exist, but "IF EXISTS" is specified
    assert(getProps == Map("x" -> "y"))

  protected def testSetLocation(isDatasourceTable: Boolean): Unit = {
    val catalog = spark.sessionState.catalog
    val tableIdent = TableIdentifier("tab1", Some("dbx"))
    val partSpec = Map("a" -> "1", "b" -> "2")
    createDatabase(catalog, "dbx")
    createTable(catalog, tableIdent)
    createTablePartition(catalog, partSpec, tableIdent)
    if (isDatasourceTable) {
      convertToDatasourceTable(catalog, tableIdent)
    assert(catalog.getPartition(tableIdent, partSpec).storage.locationUri.isDefined)
      normalizeSerdeProp(catalog.getPartition(tableIdent, partSpec).storage.properties).isEmpty)

    // Verify that the location is set to the expected string
    def verifyLocation(expected: URI, spec: Option[TablePartitionSpec] = None): Unit = {
      val storageFormat = spec
        .map { s => catalog.getPartition(tableIdent, s).storage }
        .getOrElse { catalog.getTableMetadata(tableIdent).storage }
      // TODO(gatorsmile): fix the bug in alter table set location.
      // if (isUsingHiveMetastore) {
      //  assert(storageFormat.properties.get("path") === expected)
      // }
      assert(storageFormat.locationUri === Some(expected))
    // set table location
    sql("ALTER TABLE dbx.tab1 SET LOCATION '/path/to/your/lovely/heart'")
    verifyLocation(new URI("/path/to/your/lovely/heart"))
    // set table partition location
    sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='2') SET LOCATION '/path/to/part/ways'")
    verifyLocation(new URI("/path/to/part/ways"), Some(partSpec))
    // set table location without explicitly specifying database
    sql("ALTER TABLE tab1 SET LOCATION '/swanky/steak/place'")
    verifyLocation(new URI("/swanky/steak/place"))
    // set table partition location without explicitly specifying database
    sql("ALTER TABLE tab1 PARTITION (a='1', b='2') SET LOCATION 'vienna'")
    verifyLocation(new URI("vienna"), Some(partSpec))
    // table to alter does not exist
    intercept[AnalysisException] {
      sql("ALTER TABLE dbx.does_not_exist SET LOCATION '/mister/spark'")
    // partition to alter does not exist
    intercept[AnalysisException] {
      sql("ALTER TABLE dbx.tab1 PARTITION (b='2') SET LOCATION '/mister/spark'")

  protected def testSetSerde(isDatasourceTable: Boolean): Unit = {
    val catalog = spark.sessionState.catalog
    val tableIdent = TableIdentifier("tab1", Some("dbx"))
    createDatabase(catalog, "dbx")
    createTable(catalog, tableIdent)
    if (isDatasourceTable) {
      convertToDatasourceTable(catalog, tableIdent)
    def checkSerdeProps(expectedSerdeProps: Map[String, String]): Unit = {
      val serdeProp = catalog.getTableMetadata(tableIdent).storage.properties
      if (isUsingHiveMetastore) {
        assert(normalizeSerdeProp(serdeProp) == expectedSerdeProps)
      } else {
        assert(serdeProp == expectedSerdeProps)
    if (isUsingHiveMetastore) {
      assert(catalog.getTableMetadata(tableIdent).storage.serde ==
    } else {
    checkSerdeProps(Map.empty[String, String])
    // set table serde and/or properties (should fail on datasource tables)
    if (isDatasourceTable) {
      val e1 = intercept[AnalysisException] {
        sql("ALTER TABLE dbx.tab1 SET SERDE 'whatever'")
      val e2 = intercept[AnalysisException] {
        sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.madoop' " +
          "WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
    } else {
      val newSerde = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
      sql(s"ALTER TABLE dbx.tab1 SET SERDE '$newSerde'")
      assert(catalog.getTableMetadata(tableIdent).storage.serde == Some(newSerde))
      checkSerdeProps(Map.empty[String, String])
      val serde2 = "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"
      sql(s"ALTER TABLE dbx.tab1 SET SERDE '$serde2' " +
        "WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
      assert(catalog.getTableMetadata(tableIdent).storage.serde == Some(serde2))
      checkSerdeProps(Map("k" -> "v", "kay" -> "vee"))
    // set serde properties only
    sql("ALTER TABLE dbx.tab1 SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')")
    checkSerdeProps(Map("k" -> "vvv", "kay" -> "vee"))
    // set things without explicitly specifying database
    sql("ALTER TABLE tab1 SET SERDEPROPERTIES ('kay' = 'veee')")
    checkSerdeProps(Map("k" -> "vvv", "kay" -> "veee"))
    // table to alter does not exist
    intercept[AnalysisException] {
      sql("ALTER TABLE does_not_exist SET SERDEPROPERTIES ('x' = 'y')")

  protected def testSetSerdePartition(isDatasourceTable: Boolean): Unit = {
    val catalog = spark.sessionState.catalog
    val tableIdent = TableIdentifier("tab1", Some("dbx"))
    val spec = Map("a" -> "1", "b" -> "2")
    createDatabase(catalog, "dbx")
    createTable(catalog, tableIdent)
    createTablePartition(catalog, spec, tableIdent)
    createTablePartition(catalog, Map("a" -> "1", "b" -> "3"), tableIdent)
    createTablePartition(catalog, Map("a" -> "2", "b" -> "2"), tableIdent)
    createTablePartition(catalog, Map("a" -> "2", "b" -> "3"), tableIdent)
    if (isDatasourceTable) {
      convertToDatasourceTable(catalog, tableIdent)
    def checkPartitionSerdeProps(expectedSerdeProps: Map[String, String]): Unit = {
      val serdeProp = catalog.getPartition(tableIdent, spec).storage.properties
      if (isUsingHiveMetastore) {
        assert(normalizeSerdeProp(serdeProp) == expectedSerdeProps)
      } else {
        assert(serdeProp == expectedSerdeProps)
    if (isUsingHiveMetastore) {
      assert(catalog.getPartition(tableIdent, spec).storage.serde ==
    } else {
      assert(catalog.getPartition(tableIdent, spec).storage.serde.isEmpty)
    checkPartitionSerdeProps(Map.empty[String, String])
    // set table serde and/or properties (should fail on datasource tables)
    if (isDatasourceTable) {
      val e1 = intercept[AnalysisException] {
        sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'whatever'")
      val e2 = intercept[AnalysisException] {
        sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'org.apache.madoop' " +
          "WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
    } else {
      sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'org.apache.jadoop'")
      assert(catalog.getPartition(tableIdent, spec).storage.serde == Some("org.apache.jadoop"))
      checkPartitionSerdeProps(Map.empty[String, String])
      sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'org.apache.madoop' " +
        "WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
      assert(catalog.getPartition(tableIdent, spec).storage.serde == Some("org.apache.madoop"))
      checkPartitionSerdeProps(Map("k" -> "v", "kay" -> "vee"))
    // set serde properties only
    maybeWrapException(isDatasourceTable) {
      sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) " +
        "SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')")
      checkPartitionSerdeProps(Map("k" -> "vvv", "kay" -> "vee"))
    // set things without explicitly specifying database
    maybeWrapException(isDatasourceTable) {
      sql("ALTER TABLE tab1 PARTITION (a=1, b=2) SET SERDEPROPERTIES ('kay' = 'veee')")
      checkPartitionSerdeProps(Map("k" -> "vvv", "kay" -> "veee"))
    // table to alter does not exist
    intercept[AnalysisException] {
      sql("ALTER TABLE does_not_exist PARTITION (a=1, b=2) SET SERDEPROPERTIES ('x' = 'y')")

  protected def testAddPartitions(isDatasourceTable: Boolean): Unit = {
    val catalog = spark.sessionState.catalog
    val tableIdent = TableIdentifier("tab1", Some("dbx"))
    val part1 = Map("a" -> "1", "b" -> "5")
    val part2 = Map("a" -> "2", "b" -> "6")
    val part3 = Map("a" -> "3", "b" -> "7")
    val part4 = Map("a" -> "4", "b" -> "8")
    val part5 = Map("a" -> "9", "b" -> "9")
    createDatabase(catalog, "dbx")
    createTable(catalog, tableIdent)
    createTablePartition(catalog, part1, tableIdent)
    if (isDatasourceTable) {
      convertToDatasourceTable(catalog, tableIdent)
    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))

    // basic add partition
    sql("ALTER TABLE dbx.tab1 ADD IF NOT EXISTS " +
      "PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')")
    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
    assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isDefined)
    val partitionLocation = if (isUsingHiveMetastore) {
      val tableLocation = catalog.getTableMetadata(tableIdent).storage.locationUri
      makeQualifiedPath(new Path(tableLocation.get.toString, "paris").toString)
    } else {
      new URI("paris")

    assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option(partitionLocation))
    assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isDefined)

    // add partitions without explicitly specifying database
    sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
      Set(part1, part2, part3, part4))

    // table to alter does not exist
    intercept[AnalysisException] {
      sql("ALTER TABLE does_not_exist ADD IF NOT EXISTS PARTITION (a='4', b='9')")

    // partition to add already exists
    intercept[AnalysisException] {
      sql("ALTER TABLE tab1 ADD PARTITION (a='4', b='8')")

    // partition to add already exists when using IF NOT EXISTS
    sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
      Set(part1, part2, part3, part4))

    // partition spec in ADD PARTITION should be case insensitive by default
    sql("ALTER TABLE tab1 ADD PARTITION (A='9', B='9')")
    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
      Set(part1, part2, part3, part4, part5))

  protected def testDropPartitions(isDatasourceTable: Boolean): Unit = {
    val catalog = spark.sessionState.catalog
    val tableIdent = TableIdentifier("tab1", Some("dbx"))
    val part1 = Map("a" -> "1", "b" -> "5")
    val part2 = Map("a" -> "2", "b" -> "6")
    val part3 = Map("a" -> "3", "b" -> "7")
    val part4 = Map("a" -> "4", "b" -> "8")
    val part5 = Map("a" -> "9", "b" -> "9")
    createDatabase(catalog, "dbx")
    createTable(catalog, tableIdent)
    createTablePartition(catalog, part1, tableIdent)
    createTablePartition(catalog, part2, tableIdent)
    createTablePartition(catalog, part3, tableIdent)
    createTablePartition(catalog, part4, tableIdent)
    createTablePartition(catalog, part5, tableIdent)
    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
      Set(part1, part2, part3, part4, part5))
    if (isDatasourceTable) {
      convertToDatasourceTable(catalog, tableIdent)

    // basic drop partition
    sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (a='4', b='8'), PARTITION (a='3', b='7')")
    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part5))

    // drop partitions without explicitly specifying database
    sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='2', b ='6')")
    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part5))

    // table to alter does not exist
    intercept[AnalysisException] {
      sql("ALTER TABLE does_not_exist DROP IF EXISTS PARTITION (a='2')")

    // partition to drop does not exist
    intercept[AnalysisException] {
      sql("ALTER TABLE tab1 DROP PARTITION (a='300')")

    // partition to drop does not exist when using IF EXISTS
    sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='300')")
    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part5))

    // partition spec in DROP PARTITION should be case insensitive by default
    sql("ALTER TABLE tab1 DROP PARTITION (A='1', B='5')")
    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part5))

    // use int literal as partition value for int type partition column
    sql("ALTER TABLE tab1 DROP PARTITION (a=9, b=9)")

  protected def testRenamePartitions(isDatasourceTable: Boolean): Unit = {
    val catalog = spark.sessionState.catalog
    val tableIdent = TableIdentifier("tab1", Some("dbx"))
    val part1 = Map("a" -> "1", "b" -> "q")
    val part2 = Map("a" -> "2", "b" -> "c")
    val part3 = Map("a" -> "3", "b" -> "p")
    createDatabase(catalog, "dbx")
    createTable(catalog, tableIdent)
    createTablePartition(catalog, part1, tableIdent)
    createTablePartition(catalog, part2, tableIdent)
    createTablePartition(catalog, part3, tableIdent)
    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
    if (isDatasourceTable) {
      convertToDatasourceTable(catalog, tableIdent)

    // basic rename partition
    sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='q') RENAME TO PARTITION (a='100', b='p')")
    sql("ALTER TABLE dbx.tab1 PARTITION (a='2', b='c') RENAME TO PARTITION (a='20', b='c')")
    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
      Set(Map("a" -> "100", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))

    // rename without explicitly specifying database
    sql("ALTER TABLE tab1 PARTITION (a='100', b='p') RENAME TO PARTITION (a='10', b='p')")
    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
      Set(Map("a" -> "10", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))

    // table to alter does not exist
    intercept[NoSuchTableException] {
      sql("ALTER TABLE does_not_exist PARTITION (c='3') RENAME TO PARTITION (c='333')")

    // partition to rename does not exist
    intercept[NoSuchPartitionException] {
      sql("ALTER TABLE tab1 PARTITION (a='not_found', b='1') RENAME TO PARTITION (a='1', b='2')")

    // partition spec in RENAME PARTITION should be case insensitive by default
    sql("ALTER TABLE tab1 PARTITION (A='10', B='p') RENAME TO PARTITION (A='1', B='p')")
    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
      Set(Map("a" -> "1", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))

  protected def testChangeColumn(isDatasourceTable: Boolean): Unit = {
    val catalog = spark.sessionState.catalog
    val resolver = spark.sessionState.conf.resolver
    val tableIdent = TableIdentifier("tab1", Some("dbx"))
    createDatabase(catalog, "dbx")
    createTable(catalog, tableIdent)
    if (isDatasourceTable) {
      convertToDatasourceTable(catalog, tableIdent)
    def getMetadata(colName: String): Metadata = {
      val column = catalog.getTableMetadata(tableIdent).schema.fields.find { field =>
        resolver(field.name, colName)
    // Ensure that change column will preserve other metadata fields.
    sql("ALTER TABLE dbx.tab1 CHANGE COLUMN col1 col1 INT COMMENT 'this is col1'")
    assert(getMetadata("col1").getString("key") == "value")

  test("drop build-in function") {
    Seq("true", "false").foreach { caseSensitive =>
      withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive) {
        // partition to add already exists
        var e = intercept[AnalysisException] {
          sql("DROP TEMPORARY FUNCTION year")
        assert(e.getMessage.contains("Cannot drop native function 'year'"))

        e = intercept[AnalysisException] {
        assert(e.getMessage.contains("Cannot drop native function 'YeAr'"))

        e = intercept[AnalysisException] {
          sql("DROP TEMPORARY FUNCTION `YeAr`")
        assert(e.getMessage.contains("Cannot drop native function 'YeAr'"))

  test("describe function") {
      sql("DESCRIBE FUNCTION log"),
      Row("Class: org.apache.spark.sql.catalyst.expressions.Logarithm") ::
        Row("Function: log") ::
        Row("Usage: log(base, expr) - Returns the logarithm of `expr` with `base`.") :: Nil
    // predicate operator
      sql("DESCRIBE FUNCTION or"),
      Row("Class: org.apache.spark.sql.catalyst.expressions.Or") ::
        Row("Function: or") ::
        Row("Usage: expr1 or expr2 - Logical OR.") :: Nil
      sql("DESCRIBE FUNCTION !"),
      Row("Class: org.apache.spark.sql.catalyst.expressions.Not") ::
        Row("Function: !") ::
        Row("Usage: ! expr - Logical not.") :: Nil
    // arithmetic operators
      sql("DESCRIBE FUNCTION +"),
      Row("Class: org.apache.spark.sql.catalyst.expressions.Add") ::
        Row("Function: +") ::
        Row("Usage: expr1 + expr2 - Returns `expr1`+`expr2`.") :: Nil
    // comparison operators
      sql("DESCRIBE FUNCTION <"),
      Row("Class: org.apache.spark.sql.catalyst.expressions.LessThan") ::
        Row("Function: <") ::
        Row("Usage: expr1 < expr2 - Returns true if `expr1` is less than `expr2`.") :: Nil
    // STRING
      sql("DESCRIBE FUNCTION 'concat'"),
      Row("Class: org.apache.spark.sql.catalyst.expressions.Concat") ::
        Row("Function: concat") ::
        Row("Usage: concat(str1, str2, ..., strN) - " +
            "Returns the concatenation of str1, str2, ..., strN.") :: Nil
    // extended mode
      Row("Class: org.apache.spark.sql.catalyst.expressions.BitwiseXor") ::
          """Extended Usage:
            |    Examples:
            |      > SELECT 3 ^ 5;
            |       2
            |  """.stripMargin) ::
        Row("Function: ^") ::
        Row("Usage: expr1 ^ expr2 - Returns the result of " +
          "bitwise exclusive OR of `expr1` and `expr2`.") :: Nil

  test("create a data source table without schema") {
    import testImplicits._
    withTempPath { tempDir =>
      withTable("tab1", "tab2") {
        (("a", "b") :: Nil).toDF().write.json(tempDir.getCanonicalPath)

        val e = intercept[AnalysisException] { sql("CREATE TABLE tab1 USING json") }.getMessage
        assert(e.contains("Unable to infer schema for JSON. It must be specified manually"))

        sql(s"CREATE TABLE tab2 using json location '${tempDir.toURI}'")
        checkAnswer(spark.table("tab2"), Row("a", "b"))

  test("create table using CLUSTERED BY without schema specification") {
    import testImplicits._
    withTempPath { tempDir =>
      withTable("jsonTable") {
        (("a", "b") :: Nil).toDF().write.json(tempDir.getCanonicalPath)

        val e = intercept[AnalysisException] {
             |CREATE TABLE jsonTable
             |USING org.apache.spark.sql.json
             |OPTIONS (
             |  path '${tempDir.getCanonicalPath}'
             |CLUSTERED BY (inexistentColumnA) SORTED BY (inexistentColumnB) INTO 2 BUCKETS
        assert(e.message == "Cannot specify bucketing information if the table schema is not " +
          "specified when creating and will be inferred at runtime")

  test("Create Data Source Table As Select") {
    import testImplicits._
    withTable("t", "t1", "t2") {
      sql("CREATE TABLE t USING parquet SELECT 1 as a, 1 as b")
      checkAnswer(spark.table("t"), Row(1, 1) :: Nil)

      spark.range(1).select('id as 'a, 'id as 'b).write.saveAsTable("t1")
      sql("CREATE TABLE t2 USING parquet SELECT a, b from t1")
      checkAnswer(spark.table("t2"), spark.table("t1"))

  test("drop current database") {
    sql("CREATE DATABASE temp")
    sql("USE temp")
    sql("DROP DATABASE temp")
    val e = intercept[AnalysisException] {
        sql("CREATE TABLE t (a INT, b INT) USING parquet")
    assert(e.contains("Database 'temp' not found"))

  test("drop default database") {
    val caseSensitiveOptions = if (isUsingHiveMetastore) Seq("false") else Seq("true", "false")
    caseSensitiveOptions.foreach { caseSensitive =>
      withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive) {
        var message = intercept[AnalysisException] {
          sql("DROP DATABASE default")
        assert(message.contains("Can not drop default database"))

        message = intercept[AnalysisException] {
          sql("DROP DATABASE DeFault")
        if (caseSensitive == "true") {
          assert(message.contains("Database 'DeFault' not found"))
        } else {
          assert(message.contains("Can not drop default database"))

  test("truncate table - datasource table") {
    import testImplicits._

    val data = (1 to 10).map { i => (i, i) }.toDF("width", "length")
    // Test both a Hive compatible and incompatible code path.
    Seq("json", "parquet").foreach { format =>
      withTable("rectangles") {
          "bad test; table was empty to begin with")

        sql("TRUNCATE TABLE rectangles")

        // not supported since the table is not partitioned
        assertUnsupported("TRUNCATE TABLE rectangles PARTITION (width=1)")

  test("truncate partitioned table - datasource table") {
    import testImplicits._

    val data = (1 to 10).map { i => (i % 3, i % 5, i) }.toDF("width", "length", "height")

    withTable("partTable") {
      data.write.partitionBy("width", "length").saveAsTable("partTable")
      // supported since partitions are stored in the metastore
      sql("TRUNCATE TABLE partTable PARTITION (width=1, length=1)")
      assert(spark.table("partTable").filter($"width" === 1).collect().nonEmpty)
      assert(spark.table("partTable").filter($"width" === 1 && $"length" === 1).collect().isEmpty)

    withTable("partTable") {
      data.write.partitionBy("width", "length").saveAsTable("partTable")
      // support partial partition spec
      sql("TRUNCATE TABLE partTable PARTITION (width=1)")
      assert(spark.table("partTable").filter($"width" === 1).collect().isEmpty)

    withTable("partTable") {
      data.write.partitionBy("width", "length").saveAsTable("partTable")
      // do nothing if no partition is matched for the given partial partition spec
      sql("TRUNCATE TABLE partTable PARTITION (width=100)")
      assert(spark.table("partTable").count() == data.count())

      // throw exception if no partition is matched for the given non-partial partition spec.
      intercept[NoSuchPartitionException] {
        sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)")

      // throw exception if the column in partition spec is not a partition column.
      val e = intercept[AnalysisException] {
        sql("TRUNCATE TABLE partTable PARTITION (unknown=1)")
      assert(e.message.contains("unknown is not a valid partition column"))

  test("create temporary view with mismatched schema") {
    withTable("tab1") {
      withView("view1") {
        val e = intercept[AnalysisException] {
          sql("CREATE TEMPORARY VIEW view1 (col1, col3) AS SELECT * FROM tab1")
        assert(e.contains("the SELECT clause (num: `1`) does not match")
          && e.contains("CREATE VIEW (num: `2`)"))

  test("create temporary view with specified schema") {
    withView("view1") {
      sql("CREATE TEMPORARY VIEW view1 (col1, col2) AS SELECT 1, 2")
        sql("SELECT * FROM view1"),
        Row(1, 2) :: Nil

  test("block creating duplicate temp table") {
    withView("t_temp") {
      sql("CREATE TEMPORARY VIEW t_temp AS SELECT 1, 2")
      val e = intercept[TempTableAlreadyExistsException] {
        sql("CREATE TEMPORARY TABLE t_temp (c3 int, c4 string) USING JSON")
      assert(e.contains("Temporary table 't_temp' already exists"))

  test("truncate table - external table, temporary table, view (not allowed)") {
    import testImplicits._
    withTempPath { tempDir =>
      withTable("my_ext_tab") {
        (("a", "b") :: Nil).toDF().write.parquet(tempDir.getCanonicalPath)
        (1 to 10).map { i => (i, i) }.toDF("a", "b").createTempView("my_temp_tab")
        sql(s"CREATE TABLE my_ext_tab using parquet LOCATION '${tempDir.toURI}'")
        sql(s"CREATE VIEW my_view AS SELECT 1")
        intercept[NoSuchTableException] {
          sql("TRUNCATE TABLE my_temp_tab")
        assertUnsupported("TRUNCATE TABLE my_ext_tab")
        assertUnsupported("TRUNCATE TABLE my_view")

  test("truncate table - non-partitioned table (not allowed)") {
    withTable("my_tab") {
      sql("CREATE TABLE my_tab (age INT, name STRING) using parquet")
      sql("INSERT INTO my_tab values (10, 'a')")
      assertUnsupported("TRUNCATE TABLE my_tab PARTITION (age=10)")

  test("SPARK-16034 Partition columns should match when appending to existing data source tables") {
    import testImplicits._
    val df = Seq((1, 2, 3)).toDF("a", "b", "c")
    withTable("partitionedTable") {
      df.write.mode("overwrite").partitionBy("a", "b").saveAsTable("partitionedTable")
      // Misses some partition columns
      intercept[AnalysisException] {
      // Wrong order
      intercept[AnalysisException] {
        df.write.mode("append").partitionBy("b", "a").saveAsTable("partitionedTable")
      // Partition columns not specified
      intercept[AnalysisException] {
      assert(sql("select * from partitionedTable").collect().size == 1)
      // Inserts new data successfully when partition columns are correctly specified in
      // partitionBy(...).
      // TODO: Right now, partition columns are always treated in a case-insensitive way.
      // See the write method in DataSource.scala.
      Seq((4, 5, 6)).toDF("a", "B", "c")
        .partitionBy("a", "B")

      Seq((7, 8, 9)).toDF("a", "b", "c")
        .partitionBy("a", "b")

        sql("select a, b, c from partitionedTable"),
        Row(1, 2, 3) :: Row(4, 5, 6) :: Row(7, 8, 9) :: Nil

  test("show functions") {
    withUserDefinedFunction("add_one" -> true) {
      val numFunctions = FunctionRegistry.functionSet.size.toLong
      assert(sql("show functions").count() === numFunctions)
      assert(sql("show system functions").count() === numFunctions)
      assert(sql("show all functions").count() === numFunctions)
      assert(sql("show user functions").count() === 0L)
      spark.udf.register("add_one", (x: Long) => x + 1)
      assert(sql("show functions").count() === numFunctions + 1L)
      assert(sql("show system functions").count() === numFunctions)
      assert(sql("show all functions").count() === numFunctions + 1L)
      assert(sql("show user functions").count() === 1L)

  test("show columns - negative test") {
    // When case sensitivity is true, the user supplied database name in table identifier
    // should match the supplied database name in case sensitive way.
    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
      withTempDatabase { db =>
        val tabName = s"$db.showcolumn"
        withTable(tabName) {
          sql(s"CREATE TABLE $tabName(col1 int, col2 string) USING parquet ")
          val message = intercept[AnalysisException] {
            sql(s"SHOW COLUMNS IN $db.showcolumn FROM ${db.toUpperCase(Locale.ROOT)}")
          assert(message.contains("SHOW COLUMNS with conflicting databases"))

  test("SPARK-18009 calling toLocalIterator on commands") {
    import scala.collection.JavaConverters._
    val df = sql("show databases")
    val rows: Seq[Row] = df.toLocalIterator().asScala.toSeq
    assert(rows.length > 0)

  test("SET LOCATION for managed table") {
    withTable("tbl") {
      withTempDir { dir =>
        sql("CREATE TABLE tbl(i INT) USING parquet")
        sql("INSERT INTO tbl SELECT 1")
        checkAnswer(spark.table("tbl"), Row(1))
        val defaultTablePath = spark.sessionState.catalog

        sql(s"ALTER TABLE tbl SET LOCATION '${dir.toURI}'")
        // SET LOCATION won't move data from previous table path to new table path.
        assert(spark.table("tbl").count() == 0)
        // the previous table path should be still there.
        assert(new File(defaultTablePath).exists())

        sql("INSERT INTO tbl SELECT 2")
        checkAnswer(spark.table("tbl"), Row(2))
        // newly inserted data will go to the new table path.

        sql("DROP TABLE tbl")
        // the new table path will be removed after DROP TABLE.

  test("insert data to a data source table which has a non-existing location should succeed") {
    withTable("t") {
      withTempDir { dir =>
             |CREATE TABLE t(a string, b int)
             |USING parquet
             |OPTIONS(path "$dir")
        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
        assert(table.location == makeQualifiedPath(dir.getAbsolutePath))

        spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
        checkAnswer(spark.table("t"), Row("c", 1) :: Nil)

        spark.sql("INSERT OVERWRITE TABLE t SELECT 'c', 1")
        checkAnswer(spark.table("t"), Row("c", 1) :: Nil)

        val newDirFile = new File(dir, "x")
        val newDir = newDirFile.getAbsolutePath
        spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")

        val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
        assert(table1.location == new URI(newDir))

        spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
        checkAnswer(spark.table("t"), Row("c", 1) :: Nil)

  test("insert into a data source table with a non-existing partition location should succeed") {
    withTable("t") {
      withTempDir { dir =>
             |CREATE TABLE t(a int, b int, c int, d int)
             |USING parquet
             |PARTITIONED BY(a, b)
             |LOCATION "$dir"
        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
        assert(table.location == makeQualifiedPath(dir.getAbsolutePath))

        spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
        checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)

        val partLoc = new File(s"${dir.getAbsolutePath}/a=1")
        // insert overwrite into a partition which location has been deleted.
        spark.sql("INSERT OVERWRITE TABLE t PARTITION(a=1, b=2) SELECT 7, 8")
        checkAnswer(spark.table("t"), Row(7, 8, 1, 2) :: Nil)

  test("read data from a data source table which has a non-existing location should succeed") {
    withTable("t") {
      withTempDir { dir =>
             |CREATE TABLE t(a string, b int)
             |USING parquet
             |OPTIONS(path "$dir")
        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))

        assert(table.location == makeQualifiedPath(dir.getAbsolutePath))

        checkAnswer(spark.table("t"), Nil)

        val newDirFile = new File(dir, "x")
        val newDir = newDirFile.toURI
        spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")

        val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
        assert(table1.location == newDir)
        checkAnswer(spark.table("t"), Nil)

  test("read data from a data source table with non-existing partition location should succeed") {
    withTable("t") {
      withTempDir { dir =>
             |CREATE TABLE t(a int, b int, c int, d int)
             |USING parquet
             |PARTITIONED BY(a, b)
             |LOCATION "$dir"
        spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
        checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)

        // select from a partition which location has been deleted.
        spark.sql("REFRESH TABLE t")
        checkAnswer(spark.sql("select * from t where a=1 and b=2"), Nil)

  test("create datasource table with a non-existing location") {
    withTable("t", "t1") {
      withTempPath { dir =>
        spark.sql(s"CREATE TABLE t(a int, b int) USING parquet LOCATION '$dir'")

        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
        assert(table.location == makeQualifiedPath(dir.getAbsolutePath))

        spark.sql("INSERT INTO TABLE t SELECT 1, 2")

        checkAnswer(spark.table("t"), Row(1, 2))
      // partition table
      withTempPath { dir =>
        spark.sql(s"CREATE TABLE t1(a int, b int) USING parquet PARTITIONED BY(a) LOCATION '$dir'")

        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
        assert(table.location == makeQualifiedPath(dir.getAbsolutePath))

        spark.sql("INSERT INTO TABLE t1 PARTITION(a=1) SELECT 2")

        val partDir = new File(dir, "a=1")

        checkAnswer(spark.table("t1"), Row(2, 1))

  Seq(true, false).foreach { shouldDelete =>
    val tcName = if (shouldDelete) "non-existing" else "existed"
    test(s"CTAS for external data source table with a $tcName location") {
      withTable("t", "t1") {
        withTempDir { dir =>
          if (shouldDelete) dir.delete()
               |CREATE TABLE t
               |USING parquet
               |LOCATION '$dir'
               |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
          assert(table.location == makeQualifiedPath(dir.getAbsolutePath))

          checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
        // partition table
        withTempDir { dir =>
          if (shouldDelete) dir.delete()
               |CREATE TABLE t1
               |USING parquet
               |PARTITIONED BY(a, b)
               |LOCATION '$dir'
               |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
          assert(table.location == makeQualifiedPath(dir.getAbsolutePath))

          val partDir = new File(dir, "a=3")

          checkAnswer(spark.table("t1"), Row(1, 2, 3, 4))

  Seq("a b", "a:b", "a%b", "a,b").foreach { specialChars =>
    test(s"data source table:partition column name containing $specialChars") {
      withTable("t") {
        withTempDir { dir =>
               |CREATE TABLE t(a string, `$specialChars` string)
               |USING parquet
               |PARTITIONED BY(`$specialChars`)
               |LOCATION '$dir'

          spark.sql(s"INSERT INTO TABLE t PARTITION(`$specialChars`=2) SELECT 1")
          val partEscaped = s"${ExternalCatalogUtils.escapePathName(specialChars)}=2"
          val partFile = new File(dir, partEscaped)
          assert(partFile.listFiles().length >= 1)
          checkAnswer(spark.table("t"), Row("1", "2") :: Nil)

  Seq("a b", "a:b", "a%b").foreach { specialChars =>
    test(s"location uri contains $specialChars for datasource table") {
      withTable("t", "t1") {
        withTempDir { dir =>
          val loc = new File(dir, specialChars)
               |CREATE TABLE t(a string)
               |USING parquet
               |LOCATION '$loc'

          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
          assert(table.location == makeQualifiedPath(loc.getAbsolutePath))
          assert(new Path(table.location).toString.contains(specialChars))

          spark.sql("INSERT INTO TABLE t SELECT 1")
          assert(loc.listFiles().length >= 1)
          checkAnswer(spark.table("t"), Row("1") :: Nil)

        withTempDir { dir =>
          val loc = new File(dir, specialChars)
               |CREATE TABLE t1(a string, b string)
               |USING parquet
               |PARTITIONED BY(b)
               |LOCATION '$loc'

          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
          assert(table.location == makeQualifiedPath(loc.getAbsolutePath))
          assert(new Path(table.location).toString.contains(specialChars))

          spark.sql("INSERT INTO TABLE t1 PARTITION(b=2) SELECT 1")
          val partFile = new File(loc, "b=2")
          assert(partFile.listFiles().length >= 1)
          checkAnswer(spark.table("t1"), Row("1", "2") :: Nil)

          spark.sql("INSERT INTO TABLE t1 PARTITION(b='2017-03-03 12:13%3A14') SELECT 1")
          val partFile1 = new File(loc, "b=2017-03-03 12:13%3A14")
          val partFile2 = new File(loc, "b=2017-03-03 12%3A13%253A14")
          assert(partFile2.listFiles().length >= 1)
          checkAnswer(spark.table("t1"), Row("1", "2") :: Row("1", "2017-03-03 12:13%3A14") :: Nil)

  Seq("a b", "a:b", "a%b").foreach { specialChars =>
    test(s"location uri contains $specialChars for database") {
      try {
        withTable("t") {
          withTempDir { dir =>
            val loc = new File(dir, specialChars)
            spark.sql(s"CREATE DATABASE tmpdb LOCATION '$loc'")
            spark.sql("USE tmpdb")

            import testImplicits._
            val tblloc = new File(loc, "t")
            val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
            assert(table.location == makeQualifiedPath(tblloc.getAbsolutePath))
      } finally {
        spark.sql("DROP DATABASE IF EXISTS tmpdb")

  test("the qualified path of a datasource table is stored in the catalog") {
    withTable("t", "t1") {
      withTempDir { dir =>
             |CREATE TABLE t(a string)
             |USING parquet
             |LOCATION '$dir'
        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))

      withTempDir { dir =>
             |CREATE TABLE t1(a string, b string)
             |USING parquet
             |PARTITIONED BY(b)
             |LOCATION '$dir'
        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))

  val supportedNativeFileFormatsForAlterTableAddColumns = Seq("parquet", "json", "csv")

  supportedNativeFileFormatsForAlterTableAddColumns.foreach { provider =>
    test(s"alter datasource table add columns - $provider") {
      withTable("t1") {
        sql(s"CREATE TABLE t1 (c1 int) USING $provider")
        sql("INSERT INTO t1 VALUES (1)")
        sql("ALTER TABLE t1 ADD COLUMNS (c2 int)")
          Seq(Row(1, null))
          sql("SELECT * FROM t1 WHERE c2 is null"),
          Seq(Row(1, null))

        sql("INSERT INTO t1 VALUES (3, 2)")
          sql("SELECT * FROM t1 WHERE c2 = 2"),
          Seq(Row(3, 2))

  supportedNativeFileFormatsForAlterTableAddColumns.foreach { provider =>
    test(s"alter datasource table add columns - partitioned - $provider") {
      withTable("t1") {
        sql(s"CREATE TABLE t1 (c1 int, c2 int) USING $provider PARTITIONED BY (c2)")
        sql("INSERT INTO t1 PARTITION(c2 = 2) VALUES (1)")
        sql("ALTER TABLE t1 ADD COLUMNS (c3 int)")
          Seq(Row(1, null, 2))
          sql("SELECT * FROM t1 WHERE c3 is null"),
          Seq(Row(1, null, 2))
        sql("INSERT INTO t1 PARTITION(c2 =1) VALUES (2, 3)")
          sql("SELECT * FROM t1 WHERE c3 = 3"),
          Seq(Row(2, 3, 1))
          sql("SELECT * FROM t1 WHERE c2 = 1"),
          Seq(Row(2, 3, 1))

  test("alter datasource table add columns - text format not supported") {
    withTable("t1") {
      sql("CREATE TABLE t1 (c1 int) USING text")
      val e = intercept[AnalysisException] {
        sql("ALTER TABLE t1 ADD COLUMNS (c2 int)")
      assert(e.contains("ALTER ADD COLUMNS does not support datasource table with type"))

  test("alter table add columns -- not support temp view") {
    withTempView("tmp_v") {
      sql("CREATE TEMPORARY VIEW tmp_v AS SELECT 1 AS c1, 2 AS c2")
      val e = intercept[AnalysisException] {
        sql("ALTER TABLE tmp_v ADD COLUMNS (c3 INT)")
      assert(e.message.contains("ALTER ADD COLUMNS does not support views"))

  test("alter table add columns -- not support view") {
    withView("v1") {
      sql("CREATE VIEW v1 AS SELECT 1 AS c1, 2 AS c2")
      val e = intercept[AnalysisException] {
        sql("ALTER TABLE v1 ADD COLUMNS (c3 INT)")
      assert(e.message.contains("ALTER ADD COLUMNS does not support views"))

  test("alter table add columns with existing column name") {
    withTable("t1") {
      sql("CREATE TABLE t1 (c1 int) USING PARQUET")
      val e = intercept[AnalysisException] {
        sql("ALTER TABLE t1 ADD COLUMNS (c1 string)")
      assert(e.contains("Found duplicate column(s)"))

  Seq(true, false).foreach { caseSensitive =>
    test(s"alter table add columns with existing column name - caseSensitive $caseSensitive") {
      withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"$caseSensitive") {
        withTable("t1") {
          sql("CREATE TABLE t1 (c1 int) USING PARQUET")
          if (!caseSensitive) {
            val e = intercept[AnalysisException] {
              sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
            assert(e.contains("Found duplicate column(s)"))
          } else {
            if (isUsingHiveMetastore) {
              // hive catalog will still complains that c1 is duplicate column name because hive
              // identifiers are case insensitive.
              val e = intercept[AnalysisException] {
                sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
            } else {
              sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
                .equals(new StructType().add("c1", IntegerType).add("C1", StringType)))