/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.internal

import java.io.File

import org.scalatest.BeforeAndAfterEach

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalog.{Column, Database, Function, Table}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, ScalaReflection, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.plans.logical.Range
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.StructType


/**
 * Tests for the user-facing [[org.apache.spark.sql.catalog.Catalog]].
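 * State is seeded through the internal [[SessionCatalog]] and then observed through the
 * public API.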
 */
class CatalogSuite
  extends SparkFunSuite
  with BeforeAndAfterEach
  with SharedSQLContext {
  import testImplicits._

  private def sessionCatalog: SessionCatalog = spark.sessionState.catalog

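  // CatalogTestUtils supplies the newDb/newTable/newFunc fixtures used by the helpers below;
  // the external catalog is the session's shared one, so objects created through the
  // SessionCatalog are visible to spark.catalog.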
  private val utils = new CatalogTestUtils {
    override val tableInputFormat: String = "com.fruit.eyephone.CameraInputFormat"
    override val tableOutputFormat: String = "com.fruit.eyephone.CameraOutputFormat"
    override val defaultProvider: String = "parquet"
    override def newEmptyCatalog(): ExternalCatalog = spark.sharedState.externalCatalog
  }

  private def createDatabase(name: String): Unit = {
    sessionCatalog.createDatabase(utils.newDb(name), ignoreIfExists = false)
  }

  private def dropDatabase(name: String): Unit = {
    sessionCatalog.dropDatabase(name, ignoreIfNotExists = false, cascade = true)
  }

  private def createTable(name: String, db: Option[String] = None): Unit = {
    sessionCatalog.createTable(utils.newTable(name, db), ignoreIfExists = false)
  }

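  // Range(1, 2, 3, 4) is an arbitrary logical plan (start = 1, end = 2, step = 3,
  // numSlices = 4); only the view's existence matters to these tests.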
  private def createTempTable(name: String): Unit = {
    sessionCatalog.createTempView(name, Range(1, 2, 3, 4), overrideIfExists = true)
  }

  private def dropTable(name: String, db: Option[String] = None): Unit = {
    sessionCatalog.dropTable(TableIdentifier(name, db), ignoreIfNotExists = false, purge = false)
  }

  private def createFunction(name: String, db: Option[String] = None): Unit = {
    sessionCatalog.createFunction(utils.newFunc(name, db), ignoreIfExists = false)
  }

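  // Registers a no-op temporary function whose implementation simply returns its first
  // argument expression.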
  private def createTempFunction(name: String): Unit = {
    val info = new ExpressionInfo("className", name)
    val tempFunc = (e: Seq[Expression]) => e.head
    sessionCatalog.createTempFunction(name, info, tempFunc, ignoreIfExists = false)
  }

  private def dropFunction(name: String, db: Option[String] = None): Unit = {
    sessionCatalog.dropFunction(FunctionIdentifier(name, db), ignoreIfNotExists = false)
  }

  private def dropTempFunction(name: String): Unit = {
    sessionCatalog.dropTempFunction(name, ignoreIfNotExists = false)
  }

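  // Verifies that listColumns returns exactly the table's schema and correctly flags
  // partition and bucket columns, for both listColumns(db, table) and listColumns("db.table").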
  private def testListColumns(tableName: String, dbName: Option[String]): Unit = {
    val tableMetadata = sessionCatalog.getTableMetadata(TableIdentifier(tableName, dbName))
    val columns = dbName
      .map { db => spark.catalog.listColumns(db, tableName) }
      .getOrElse { spark.catalog.listColumns(tableName) }
    assume(tableMetadata.schema.nonEmpty, "bad test: schema is empty")
    assume(tableMetadata.partitionColumnNames.nonEmpty, "bad test: no partition columns")
    assume(tableMetadata.bucketSpec.isDefined, "bad test: no bucket spec")
    assert(columns.collect().map(_.name).toSet == tableMetadata.schema.map(_.name).toSet)
    val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet
    columns.collect().foreach { col =>
      assert(col.isPartition == tableMetadata.partitionColumnNames.contains(col.name))
      assert(col.isBucket == bucketColumnNames.contains(col.name))
    }

    dbName.foreach { db =>
      val expected = columns.collect().map(_.name).toSet
      assert(spark.catalog.listColumns(s"$db.$tableName").collect().map(_.name).toSet == expected)
    }
  }

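  // Reset the catalog after each test so databases, tables, and functions do not leak
  // between test cases.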
  override def afterEach(): Unit = {
    try {
      sessionCatalog.reset()
    } finally {
      super.afterEach()
    }
  }

  test("current database") {
    assert(spark.catalog.currentDatabase == "default")
    assert(sessionCatalog.getCurrentDatabase == "default")
    createDatabase("my_db")
    spark.catalog.setCurrentDatabase("my_db")
    assert(spark.catalog.currentDatabase == "my_db")
    assert(sessionCatalog.getCurrentDatabase == "my_db")
    val e = intercept[AnalysisException] {
      spark.catalog.setCurrentDatabase("unknown_db")
    }
    assert(e.getMessage.contains("unknown_db"))
  }

  test("list databases") {
    assert(spark.catalog.listDatabases().collect().map(_.name).toSet == Set("default"))
    createDatabase("my_db1")
    createDatabase("my_db2")
    assert(spark.catalog.listDatabases().collect().map(_.name).toSet ==
      Set("default", "my_db1", "my_db2"))
    dropDatabase("my_db1")
    assert(spark.catalog.listDatabases().collect().map(_.name).toSet ==
      Set("default", "my_db2"))
  }

  test("list tables") {
    assert(spark.catalog.listTables().collect().isEmpty)
    createTable("my_table1")
    createTable("my_table2")
    createTempTable("my_temp_table")
    assert(spark.catalog.listTables().collect().map(_.name).toSet ==
      Set("my_table1", "my_table2", "my_temp_table"))
    dropTable("my_table1")
    assert(spark.catalog.listTables().collect().map(_.name).toSet ==
      Set("my_table2", "my_temp_table"))
    dropTable("my_temp_table")
    assert(spark.catalog.listTables().collect().map(_.name).toSet == Set("my_table2"))
  }

  test("list tables with database") {
    assert(spark.catalog.listTables("default").collect().isEmpty)
    createDatabase("my_db1")
    createDatabase("my_db2")
    createTable("my_table1", Some("my_db1"))
    createTable("my_table2", Some("my_db2"))
    createTempTable("my_temp_table")
    assert(spark.catalog.listTables("default").collect().map(_.name).toSet ==
      Set("my_temp_table"))
    assert(spark.catalog.listTables("my_db1").collect().map(_.name).toSet ==
      Set("my_table1", "my_temp_table"))
    assert(spark.catalog.listTables("my_db2").collect().map(_.name).toSet ==
      Set("my_table2", "my_temp_table"))
    dropTable("my_table1", Some("my_db1"))
    assert(spark.catalog.listTables("my_db1").collect().map(_.name).toSet ==
      Set("my_temp_table"))
    assert(spark.catalog.listTables("my_db2").collect().map(_.name).toSet ==
      Set("my_table2", "my_temp_table"))
    dropTable("my_temp_table")
    assert(spark.catalog.listTables("default").collect().map(_.name).isEmpty)
    assert(spark.catalog.listTables("my_db1").collect().map(_.name).isEmpty)
    assert(spark.catalog.listTables("my_db2").collect().map(_.name).toSet ==
      Set("my_table2"))
    val e = intercept[AnalysisException] {
      spark.catalog.listTables("unknown_db")
    }
    assert(e.getMessage.contains("unknown_db"))
  }

  test("list functions") {
    assert(Set("+", "current_database", "window").subsetOf(
      spark.catalog.listFunctions().collect().map(_.name).toSet))
    createFunction("my_func1")
    createFunction("my_func2")
    createTempFunction("my_temp_func")
    val funcNames1 = spark.catalog.listFunctions().collect().map(_.name).toSet
    assert(funcNames1.contains("my_func1"))
    assert(funcNames1.contains("my_func2"))
    assert(funcNames1.contains("my_temp_func"))
    dropFunction("my_func1")
    dropTempFunction("my_temp_func")
    val funcNames2 = spark.catalog.listFunctions().collect().map(_.name).toSet
    assert(!funcNames2.contains("my_func1"))
    assert(funcNames2.contains("my_func2"))
    assert(!funcNames2.contains("my_temp_func"))
  }

  test("list functions with database") {
    assert(Set("+", "current_database", "window").subsetOf(
      spark.catalog.listFunctions().collect().map(_.name).toSet))
    createDatabase("my_db1")
    createDatabase("my_db2")
    createFunction("my_func1", Some("my_db1"))
    createFunction("my_func2", Some("my_db2"))
    createTempFunction("my_temp_func")
    val funcNames1 = spark.catalog.listFunctions("my_db1").collect().map(_.name).toSet
    val funcNames2 = spark.catalog.listFunctions("my_db2").collect().map(_.name).toSet
    assert(funcNames1.contains("my_func1"))
    assert(!funcNames1.contains("my_func2"))
    assert(funcNames1.contains("my_temp_func"))
    assert(!funcNames2.contains("my_func1"))
    assert(funcNames2.contains("my_func2"))
    assert(funcNames2.contains("my_temp_func"))

    // Make sure the database field is set properly.
    assert(
      spark.catalog.listFunctions("my_db1").collect().map(_.database).toSet == Set("my_db1", null))
    assert(
      spark.catalog.listFunctions("my_db2").collect().map(_.database).toSet == Set("my_db2", null))

    // Remove the functions and make sure they no longer appear.
    dropFunction("my_func1", Some("my_db1"))
    dropTempFunction("my_temp_func")
    val funcNames1b = spark.catalog.listFunctions("my_db1").collect().map(_.name).toSet
    val funcNames2b = spark.catalog.listFunctions("my_db2").collect().map(_.name).toSet
    assert(!funcNames1b.contains("my_func1"))
    assert(!funcNames1b.contains("my_temp_func"))
    assert(funcNames2b.contains("my_func2"))
    assert(!funcNames2b.contains("my_temp_func"))
    val e = intercept[AnalysisException] {
      spark.catalog.listFunctions("unknown_db")
    }
    assert(e.getMessage.contains("unknown_db"))
  }

  test("list columns") {
    createTable("tab1")
    testListColumns("tab1", dbName = None)
  }

  test("list columns in temporary table") {
    createTempTable("temp1")
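    // Listing columns of a temporary view should succeed rather than throw.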
    spark.catalog.listColumns("temp1")
  }

  test("list columns in database") {
    createDatabase("db1")
    createTable("tab1", Some("db1"))
    testListColumns("tab1", dbName = Some("db1"))
  }

  test("Database.toString") {
    assert(new Database("cool_db", "cool_desc", "cool_path").toString ==
      "Database[name='cool_db', description='cool_desc', path='cool_path']")
    assert(new Database("cool_db", null, "cool_path").toString ==
      "Database[name='cool_db', path='cool_path']")
  }

  test("Table.toString") {
    assert(new Table("volley", "databasa", "one", "world", isTemporary = true).toString ==
      "Table[name='volley', database='databasa', description='one', " +
        "tableType='world', isTemporary='true']")
    assert(new Table("volley", null, null, "world", isTemporary = true).toString ==
      "Table[name='volley', tableType='world', isTemporary='true']")
  }

  test("Function.toString") {
    assert(
      new Function("nama", "databasa", "commenta", "classNameAh", isTemporary = true).toString ==
      "Function[name='nama', database='databasa', description='commenta', " +
        "className='classNameAh', isTemporary='true']")
    assert(new Function("nama", null, null, "classNameAh", isTemporary = false).toString ==
      "Function[name='nama', className='classNameAh', isTemporary='false']")
  }

  test("Column.toString") {
    assert(new Column("namama", "descaca", "datatapa",
      nullable = true, isPartition = false, isBucket = true).toString ==
        "Column[name='namama', description='descaca', dataType='datatapa', " +
          "nullable='true', isPartition='false', isBucket='true']")
    assert(new Column("namama", null, "datatapa",
      nullable = false, isPartition = true, isBucket = true).toString ==
      "Column[name='namama', dataType='datatapa', " +
        "nullable='false', isPartition='true', isBucket='true']")
  }

  test("catalog classes format in Dataset.show") {
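    // Database, Table, Function, and Column are not case classes; make sure Dataset.show
    // still renders all of their constructor fields.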
    val db = new Database("nama", "descripta", "locata")
    val table = new Table("nama", "databasa", "descripta", "typa", isTemporary = false)
    val function = new Function("nama", "databasa", "descripta", "classa", isTemporary = false)
    val column = new Column(
      "nama", "descripta", "typa", nullable = false, isPartition = true, isBucket = true)
    val dbFields = ScalaReflection.getConstructorParameterValues(db)
    val tableFields = ScalaReflection.getConstructorParameterValues(table)
    val functionFields = ScalaReflection.getConstructorParameterValues(function)
    val columnFields = ScalaReflection.getConstructorParameterValues(column)
    assert(dbFields == Seq("nama", "descripta", "locata"))
    assert(tableFields == Seq("nama", "databasa", "descripta", "typa", false))
    assert(functionFields == Seq("nama", "databasa", "descripta", "classa", false))
    assert(columnFields == Seq("nama", "descripta", "typa", false, true, true))
    val dbString = CatalogImpl.makeDataset(Seq(db), spark).showString(10)
    val tableString = CatalogImpl.makeDataset(Seq(table), spark).showString(10)
    val functionString = CatalogImpl.makeDataset(Seq(function), spark).showString(10)
    val columnString = CatalogImpl.makeDataset(Seq(column), spark).showString(10)
    dbFields.foreach { f => assert(dbString.contains(f.toString)) }
    tableFields.foreach { f => assert(tableString.contains(f.toString)) }
    functionFields.foreach { f => assert(functionString.contains(f.toString)) }
    columnFields.foreach { f => assert(columnString.contains(f.toString)) }
  }

  test("dropTempView should not un-cache or drop a metastore table with the same name") {
    withTable("same_name") {
      spark.range(10).write.saveAsTable("same_name")
      sql("CACHE TABLE same_name")
      assert(spark.catalog.isCached("default.same_name"))
      spark.catalog.dropTempView("same_name")
      assert(spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default"))))
      assert(spark.catalog.isCached("default.same_name"))
    }
  }

  test("get database") {
    intercept[AnalysisException](spark.catalog.getDatabase("db10"))
    withTempDatabase { db =>
      assert(spark.catalog.getDatabase(db).name === db)
    }
  }

  test("get table") {
    withTempDatabase { db =>
      withTable("tbl_x", s"$db.tbl_y") {
        // Try to find non-existent tables.
        intercept[AnalysisException](spark.catalog.getTable("tbl_x"))
        intercept[AnalysisException](spark.catalog.getTable("tbl_y"))
        intercept[AnalysisException](spark.catalog.getTable(db, "tbl_y"))

        // Create objects.
        createTempTable("tbl_x")
        createTable("tbl_y", Some(db))

        // Find a temporary table
        assert(spark.catalog.getTable("tbl_x").name === "tbl_x")

        // Find a qualified table
        assert(spark.catalog.getTable(db, "tbl_y").name === "tbl_y")
        assert(spark.catalog.getTable(s"$db.tbl_y").name === "tbl_y")

        // Find an unqualified table using the current database
        intercept[AnalysisException](spark.catalog.getTable("tbl_y"))
        spark.catalog.setCurrentDatabase(db)
        assert(spark.catalog.getTable("tbl_y").name === "tbl_y")
      }
    }
  }

  test("get function") {
    withTempDatabase { db =>
      withUserDefinedFunction("fn1" -> true, s"$db.fn2" -> false) {
        // Try to find non-existent functions.
        intercept[AnalysisException](spark.catalog.getFunction("fn1"))
        intercept[AnalysisException](spark.catalog.getFunction("fn2"))
        intercept[AnalysisException](spark.catalog.getFunction(db, "fn2"))

        // Create objects.
        createTempFunction("fn1")
        createFunction("fn2", Some(db))

        // Find a temporary function
        val fn1 = spark.catalog.getFunction("fn1")
        assert(fn1.name === "fn1")
        assert(fn1.database === null)
        assert(fn1.isTemporary)

        // Find a qualified function
        val fn2 = spark.catalog.getFunction(db, "fn2")
        assert(fn2.name === "fn2")
        assert(fn2.database === db)
        assert(!fn2.isTemporary)

        val fn2WithQualifiedName = spark.catalog.getFunction(s"$db.fn2")
        assert(fn2WithQualifiedName.name === "fn2")
        assert(fn2WithQualifiedName.database === db)
        assert(!fn2WithQualifiedName.isTemporary)

        // Find an unqualified function using the current database
        intercept[AnalysisException](spark.catalog.getFunction("fn2"))
        spark.catalog.setCurrentDatabase(db)
        val unqualified = spark.catalog.getFunction("fn2")
        assert(unqualified.name === "fn2")
        assert(unqualified.database === db)
        assert(!unqualified.isTemporary)
      }
    }
  }

  test("database exists") {
    assert(!spark.catalog.databaseExists("db10"))
    createDatabase("db10")
    assert(spark.catalog.databaseExists("db10"))
    dropDatabase("db10")
  }

  test("table exists") {
    withTempDatabase { db =>
      withTable("tbl_x", s"$db.tbl_y") {
        // Try to find non-existent tables.
        assert(!spark.catalog.tableExists("tbl_x"))
        assert(!spark.catalog.tableExists("tbl_y"))
        assert(!spark.catalog.tableExists(db, "tbl_y"))
        assert(!spark.catalog.tableExists(s"$db.tbl_y"))

        // Create objects.
        createTempTable("tbl_x")
        createTable("tbl_y", Some(db))

        // Find a temporary table
        assert(spark.catalog.tableExists("tbl_x"))

        // Find a qualified table
        assert(spark.catalog.tableExists(db, "tbl_y"))
        assert(spark.catalog.tableExists(s"$db.tbl_y"))

        // Find an unqualified table using the current database
        assert(!spark.catalog.tableExists("tbl_y"))
        spark.catalog.setCurrentDatabase(db)
        assert(spark.catalog.tableExists("tbl_y"))

        // A lookup qualified with the database must not match the temp view of the same name
        assert(!spark.catalog.tableExists(db, "tbl_x"))
      }
    }
  }

  test("function exists") {
    withTempDatabase { db =>
      withUserDefinedFunction("fn1" -> true, s"$db.fn2" -> false) {
        // Try to find non-existent functions.
        assert(!spark.catalog.functionExists("fn1"))
        assert(!spark.catalog.functionExists("fn2"))
        assert(!spark.catalog.functionExists(db, "fn2"))
        assert(!spark.catalog.functionExists(s"$db.fn2"))

        // Create objects.
        createTempFunction("fn1")
        createFunction("fn2", Some(db))

        // Find a temporary function
        assert(spark.catalog.functionExists("fn1"))

        // Find a qualified function
        assert(spark.catalog.functionExists(db, "fn2"))
        assert(spark.catalog.functionExists(s"$db.fn2"))

        // Find an unqualified function using the current database
        assert(!spark.catalog.functionExists("fn2"))
        spark.catalog.setCurrentDatabase(db)
        assert(spark.catalog.functionExists("fn2"))

        // A lookup qualified with the database must not match the temp function of the same name
        assert(!spark.catalog.functionExists(db, "fn1"))
      }
    }
  }

  test("createTable with 'path' in options") {
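    // A user-supplied 'path' option makes the table EXTERNAL, backed by the given directory.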
    withTable("t") {
      withTempDir { dir =>
        spark.catalog.createTable(
          tableName = "t",
          source = "json",
          schema = new StructType().add("i", "int"),
          options = Map("path" -> dir.getAbsolutePath))
        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
        assert(table.tableType == CatalogTableType.EXTERNAL)
        assert(table.storage.locationUri.get == makeQualifiedPath(dir.getAbsolutePath))

        Seq(1).toDF("i").write.insertInto("t")
        assert(dir.exists() && dir.listFiles().nonEmpty)

        sql("DROP TABLE t")
        // The table is EXTERNAL because a custom path was specified, so DROP TABLE leaves the
        // path and its data files intact.
        assert(dir.exists() && dir.listFiles().nonEmpty)
      }
    }
  }

  test("createTable without 'path' in options") {
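    // Without a 'path' option the table is MANAGED and lives under the warehouse directory.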
    withTable("t") {
      spark.catalog.createTable(
        tableName = "t",
        source = "json",
        schema = new StructType().add("i", "int"),
        options = Map.empty[String, String])
      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
      assert(table.tableType == CatalogTableType.MANAGED)
      val tablePath = new File(table.storage.locationUri.get)
      assert(tablePath.exists() && tablePath.listFiles().isEmpty)

      Seq(1).toDF("i").write.insertInto("t")
      assert(tablePath.listFiles().nonEmpty)

      sql("DROP TABLE t")
      // The table is MANAGED (no custom path), so DROP TABLE removes the table directory.
      assert(!tablePath.exists())
    }
  }

  test("clone Catalog") {
    // Temporary views must be copied into the cloned session.
    assert(spark.catalog.listTables().collect().isEmpty)

    createTempTable("my_temp_table")
    assert(spark.catalog.listTables().collect().map(_.name).toSet == Set("my_temp_table"))

    // inheritance: the clone starts with the parent session's temp views
    val forkedSession = spark.cloneSession()
    assert(spark ne forkedSession)
    assert(spark.catalog ne forkedSession.catalog)
    assert(forkedSession.catalog.listTables().collect().map(_.name).toSet == Set("my_temp_table"))

    // independence: changes after cloning do not propagate in either direction
    dropTable("my_temp_table") // drop table in original session
    assert(spark.catalog.listTables().collect().map(_.name).toSet == Set())
    assert(forkedSession.catalog.listTables().collect().map(_.name).toSet == Set("my_temp_table"))
    forkedSession.sessionState.catalog
      .createTempView("fork_table", Range(1, 2, 3, 4), overrideIfExists = true)
    assert(spark.catalog.listTables().collect().map(_.name).toSet == Set())
  }
}