From eebc8c1c95fb7752d09a5846b7cac65f7702c8f2 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Mon, 28 Mar 2016 16:25:15 -0700
Subject: [SPARK-13923][SPARK-14014][SQL] Session catalog follow-ups

## What changes were proposed in this pull request?

This patch addresses the remaining comments left in #11750 and #11918 after they are merged. For a full list of changes in this patch, just trace the commits.

## How was this patch tested?

`SessionCatalogSuite` and `CatalogTestCases`

Author: Andrew Or

Closes #12006 from andrewor14/session-catalog-followup.
---
 .../sql/catalyst/catalog/InMemoryCatalog.scala      |  18 +-
 .../sql/catalyst/catalog/SessionCatalog.scala       |  74 ++---
 .../spark/sql/catalyst/catalog/interface.scala      |  14 +-
 .../spark/sql/catalyst/analysis/AnalysisTest.scala  |   2 +-
 .../catalyst/analysis/DecimalPrecisionSuite.scala   |   2 +-
 .../sql/catalyst/catalog/CatalogTestCases.scala     |   6 +-
 .../sql/catalyst/catalog/SessionCatalogSuite.scala  |  30 +--
 .../scala/org/apache/spark/sql/SQLContext.scala     |   2 +-
 .../spark/sql/execution/datasources/ddl.scala       |   4 +-
 .../org/apache/spark/sql/hive/HiveCatalog.scala     | 297 --------------------
 .../org/apache/spark/sql/hive/HiveContext.scala     |   4 +-
 .../spark/sql/hive/HiveExternalCatalog.scala        | 298 +++++++++++++++++++++
 .../spark/sql/hive/HiveMetastoreCatalog.scala       |  22 +-
 .../scala/org/apache/spark/sql/hive/HiveQl.scala    |   6 +-
 .../apache/spark/sql/hive/HiveSessionCatalog.scala  |   6 +-
 .../apache/spark/sql/hive/client/HiveClient.scala   |   2 +-
 .../spark/sql/hive/client/HiveClientImpl.scala      |  11 +-
 .../sql/hive/execution/CreateTableAsSelect.scala    |   6 +-
 .../sql/hive/execution/CreateViewAsSelect.scala     |   4 +-
 .../org/apache/spark/sql/hive/test/TestHive.scala   |   4 +-
 .../apache/spark/sql/hive/HiveCatalogSuite.scala    |  49 ----
 .../spark/sql/hive/HiveExternalCatalogSuite.scala   |  49 ++++
 .../org/apache/spark/sql/hive/HiveQlSuite.scala     |  16 +-
 .../apache/spark/sql/hive/ListTablesSuite.scala     |   2 +-
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala  |   2 +-
 .../spark/sql/hive/client/VersionsSuite.scala       |   2 +-
 26 files changed, 469 insertions(+), 463 deletions(-)
 delete mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
 create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
 delete mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
(limited to 'sql')

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index e216fa5528..2bbb970ec9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -155,7 +155,7 @@ class InMemoryCatalog extends ExternalCatalog {
       tableDefinition: CatalogTable,
       ignoreIfExists: Boolean): Unit = synchronized {
     requireDbExists(db)
-    val table = tableDefinition.name.table
+    val table = tableDefinition.identifier.table
     if (tableExists(db, table)) {
       if (!ignoreIfExists) {
         throw new AnalysisException(s"Table '$table' already exists in database '$db'")
@@ -182,14 +182,14 @@
   override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
     requireTableExists(db, oldName)
     val oldDesc = catalog(db).tables(oldName)
-
oldDesc.table = oldDesc.table.copy(name = TableIdentifier(newName, Some(db))) + oldDesc.table = oldDesc.table.copy(identifier = TableIdentifier(newName, Some(db))) catalog(db).tables.put(newName, oldDesc) catalog(db).tables.remove(oldName) } override def alterTable(db: String, tableDefinition: CatalogTable): Unit = synchronized { - requireTableExists(db, tableDefinition.name.table) - catalog(db).tables(tableDefinition.name.table).table = tableDefinition + requireTableExists(db, tableDefinition.identifier.table) + catalog(db).tables(tableDefinition.identifier.table).table = tableDefinition } override def getTable(db: String, table: String): CatalogTable = synchronized { @@ -296,10 +296,10 @@ class InMemoryCatalog extends ExternalCatalog { override def createFunction(db: String, func: CatalogFunction): Unit = synchronized { requireDbExists(db) - if (functionExists(db, func.name.funcName)) { + if (functionExists(db, func.identifier.funcName)) { throw new AnalysisException(s"Function '$func' already exists in '$db' database") } else { - catalog(db).functions.put(func.name.funcName, func) + catalog(db).functions.put(func.identifier.funcName, func) } } @@ -310,14 +310,14 @@ class InMemoryCatalog extends ExternalCatalog { override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized { requireFunctionExists(db, oldName) - val newFunc = getFunction(db, oldName).copy(name = FunctionIdentifier(newName, Some(db))) + val newFunc = getFunction(db, oldName).copy(identifier = FunctionIdentifier(newName, Some(db))) catalog(db).functions.remove(oldName) catalog(db).functions.put(newName, newFunc) } override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = synchronized { - requireFunctionExists(db, funcDefinition.name.funcName) - catalog(db).functions.put(funcDefinition.name.funcName, funcDefinition) + requireFunctionExists(db, funcDefinition.identifier.funcName) + catalog(db).functions.put(funcDefinition.identifier.funcName, funcDefinition) } override def getFunction(db: String, funcName: String): CatalogFunction = synchronized { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 34265faa74..a9cf80764d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -17,9 +17,7 @@ package org.apache.spark.sql.catalyst.catalog -import java.util.concurrent.ConcurrentHashMap - -import scala.collection.JavaConverters._ +import scala.collection.mutable import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf} @@ -31,6 +29,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} * An internal catalog that is used by a Spark Session. This internal catalog serves as a * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary * tables and functions of the Spark Session that it belongs to. + * + * This class is not thread-safe. 
*/ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { import ExternalCatalog._ @@ -39,8 +39,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { this(externalCatalog, new SimpleCatalystConf(true)) } - protected[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan] - protected[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction] + protected[this] val tempTables = new mutable.HashMap[String, LogicalPlan] + protected[this] val tempFunctions = new mutable.HashMap[String, CatalogFunction] // Note: we track current database here because certain operations do not explicitly // specify the database (e.g. DROP TABLE my_table). In these cases we must first @@ -122,9 +122,9 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { * If no such database is specified, create it in the current database. */ def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = { - val db = tableDefinition.name.database.getOrElse(currentDb) - val table = formatTableName(tableDefinition.name.table) - val newTableDefinition = tableDefinition.copy(name = TableIdentifier(table, Some(db))) + val db = tableDefinition.identifier.database.getOrElse(currentDb) + val table = formatTableName(tableDefinition.identifier.table) + val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db))) externalCatalog.createTable(db, newTableDefinition, ignoreIfExists) } @@ -138,9 +138,9 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { * this becomes a no-op. */ def alterTable(tableDefinition: CatalogTable): Unit = { - val db = tableDefinition.name.database.getOrElse(currentDb) - val table = formatTableName(tableDefinition.name.table) - val newTableDefinition = tableDefinition.copy(name = TableIdentifier(table, Some(db))) + val db = tableDefinition.identifier.database.getOrElse(currentDb) + val table = formatTableName(tableDefinition.identifier.table) + val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db))) externalCatalog.alterTable(db, newTableDefinition) } @@ -164,9 +164,9 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { def createTempTable( name: String, tableDefinition: LogicalPlan, - ignoreIfExists: Boolean): Unit = { + overrideIfExists: Boolean): Unit = { val table = formatTableName(name) - if (tempTables.containsKey(table) && !ignoreIfExists) { + if (tempTables.contains(table) && !overrideIfExists) { throw new AnalysisException(s"Temporary table '$name' already exists.") } tempTables.put(table, tableDefinition) @@ -188,10 +188,11 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { val db = oldName.database.getOrElse(currentDb) val oldTableName = formatTableName(oldName.table) val newTableName = formatTableName(newName.table) - if (oldName.database.isDefined || !tempTables.containsKey(oldTableName)) { + if (oldName.database.isDefined || !tempTables.contains(oldTableName)) { externalCatalog.renameTable(db, oldTableName, newTableName) } else { - val table = tempTables.remove(oldTableName) + val table = tempTables(oldTableName) + tempTables.remove(oldTableName) tempTables.put(newTableName, table) } } @@ -206,7 +207,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean): Unit = { val db = name.database.getOrElse(currentDb) val table = formatTableName(name.table) - 
if (name.database.isDefined || !tempTables.containsKey(table)) { + if (name.database.isDefined || !tempTables.contains(table)) { externalCatalog.dropTable(db, table, ignoreIfNotExists) } else { tempTables.remove(table) @@ -224,11 +225,11 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { val db = name.database.getOrElse(currentDb) val table = formatTableName(name.table) val relation = - if (name.database.isDefined || !tempTables.containsKey(table)) { + if (name.database.isDefined || !tempTables.contains(table)) { val metadata = externalCatalog.getTable(db, table) CatalogRelation(db, metadata, alias) } else { - tempTables.get(table) + tempTables(table) } val qualifiedTable = SubqueryAlias(table, relation) // If an alias was specified by the lookup, wrap the plan in a subquery so that @@ -247,7 +248,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { def tableExists(name: TableIdentifier): Boolean = { val db = name.database.getOrElse(currentDb) val table = formatTableName(name.table) - if (name.database.isDefined || !tempTables.containsKey(table)) { + if (name.database.isDefined || !tempTables.contains(table)) { externalCatalog.tableExists(db, table) } else { true // it's a temporary table @@ -266,7 +267,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { val dbTables = externalCatalog.listTables(db, pattern).map { t => TableIdentifier(t, Some(db)) } val regex = pattern.replaceAll("\\*", ".*").r - val _tempTables = tempTables.keys().asScala + val _tempTables = tempTables.keys.toSeq .filter { t => regex.pattern.matcher(t).matches() } .map { t => TableIdentifier(t) } dbTables ++ _tempTables @@ -290,7 +291,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { * For testing only. */ private[catalog] def getTempTable(name: String): Option[LogicalPlan] = { - Option(tempTables.get(name)) + tempTables.get(name) } // ---------------------------------------------------------------------------- @@ -399,9 +400,9 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { * If no such database is specified, create it in the current database. */ def createFunction(funcDefinition: CatalogFunction): Unit = { - val db = funcDefinition.name.database.getOrElse(currentDb) + val db = funcDefinition.identifier.database.getOrElse(currentDb) val newFuncDefinition = funcDefinition.copy( - name = FunctionIdentifier(funcDefinition.name.funcName, Some(db))) + identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db))) externalCatalog.createFunction(db, newFuncDefinition) } @@ -424,9 +425,9 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { * this becomes a no-op. */ def alterFunction(funcDefinition: CatalogFunction): Unit = { - val db = funcDefinition.name.database.getOrElse(currentDb) + val db = funcDefinition.identifier.database.getOrElse(currentDb) val newFuncDefinition = funcDefinition.copy( - name = FunctionIdentifier(funcDefinition.name.funcName, Some(db))) + identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db))) externalCatalog.alterFunction(db, newFuncDefinition) } @@ -439,10 +440,10 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { * This assumes no database is specified in `funcDefinition`. 
*/ def createTempFunction(funcDefinition: CatalogFunction, ignoreIfExists: Boolean): Unit = { - require(funcDefinition.name.database.isEmpty, + require(funcDefinition.identifier.database.isEmpty, "attempted to create a temporary function while specifying a database") - val name = funcDefinition.name.funcName - if (tempFunctions.containsKey(name) && !ignoreIfExists) { + val name = funcDefinition.identifier.funcName + if (tempFunctions.contains(name) && !ignoreIfExists) { throw new AnalysisException(s"Temporary function '$name' already exists.") } tempFunctions.put(name, funcDefinition) @@ -455,7 +456,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { // Hive has DROP FUNCTION and DROP TEMPORARY FUNCTION. We may want to consolidate // dropFunction and dropTempFunction. def dropTempFunction(name: String, ignoreIfNotExists: Boolean): Unit = { - if (!tempFunctions.containsKey(name) && !ignoreIfNotExists) { + if (!tempFunctions.contains(name) && !ignoreIfNotExists) { throw new AnalysisException( s"Temporary function '$name' cannot be dropped because it does not exist!") } @@ -476,11 +477,12 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { throw new AnalysisException("rename does not support moving functions across databases") } val db = oldName.database.getOrElse(currentDb) - if (oldName.database.isDefined || !tempFunctions.containsKey(oldName.funcName)) { + if (oldName.database.isDefined || !tempFunctions.contains(oldName.funcName)) { externalCatalog.renameFunction(db, oldName.funcName, newName.funcName) } else { - val func = tempFunctions.remove(oldName.funcName) - val newFunc = func.copy(name = func.name.copy(funcName = newName.funcName)) + val func = tempFunctions(oldName.funcName) + val newFunc = func.copy(identifier = func.identifier.copy(funcName = newName.funcName)) + tempFunctions.remove(oldName.funcName) tempFunctions.put(newName.funcName, newFunc) } } @@ -494,10 +496,10 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { */ def getFunction(name: FunctionIdentifier): CatalogFunction = { val db = name.database.getOrElse(currentDb) - if (name.database.isDefined || !tempFunctions.containsKey(name.funcName)) { + if (name.database.isDefined || !tempFunctions.contains(name.funcName)) { externalCatalog.getFunction(db, name.funcName) } else { - tempFunctions.get(name.funcName) + tempFunctions(name.funcName) } } @@ -510,7 +512,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { val dbFunctions = externalCatalog.listFunctions(db, pattern).map { f => FunctionIdentifier(f, Some(db)) } val regex = pattern.replaceAll("\\*", ".*").r - val _tempFunctions = tempFunctions.keys().asScala + val _tempFunctions = tempFunctions.keys.toSeq .filter { f => regex.pattern.matcher(f).matches() } .map { f => FunctionIdentifier(f) } dbFunctions ++ _tempFunctions @@ -520,7 +522,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { * Return a temporary function. For testing only. 
*/ private[catalog] def getTempFunction(name: String): Option[CatalogFunction] = { - Option(tempFunctions.get(name)) + tempFunctions.get(name) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 34803133f6..8bb8e09a28 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -169,10 +169,10 @@ abstract class ExternalCatalog { /** * A function defined in the catalog. * - * @param name name of the function + * @param identifier name of the function * @param className fully qualified class name, e.g. "org.apache.spark.util.MyFunc" */ -case class CatalogFunction(name: FunctionIdentifier, className: String) +case class CatalogFunction(identifier: FunctionIdentifier, className: String) /** @@ -216,7 +216,7 @@ case class CatalogTablePartition( * future once we have a better understanding of how we want to handle skewed columns. */ case class CatalogTable( - name: TableIdentifier, + identifier: TableIdentifier, tableType: CatalogTableType, storage: CatalogStorageFormat, schema: Seq[CatalogColumn], @@ -230,12 +230,12 @@ case class CatalogTable( viewText: Option[String] = None) { /** Return the database this table was specified to belong to, assuming it exists. */ - def database: String = name.database.getOrElse { - throw new AnalysisException(s"table $name did not specify database") + def database: String = identifier.database.getOrElse { + throw new AnalysisException(s"table $identifier did not specify database") } /** Return the fully qualified name of this table, assuming the database was specified. */ - def qualifiedName: String = name.unquotedString + def qualifiedName: String = identifier.unquotedString /** Syntactic sugar to update a field in `storage`. 
*/ def withNewStorage( @@ -290,6 +290,6 @@ case class CatalogRelation( // TODO: implement this override def output: Seq[Attribute] = Seq.empty - require(metadata.name.database == Some(db), + require(metadata.identifier.database == Some(db), "provided database does not much the one specified in the table definition") } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala index 6fa4beed99..34cb97699d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala @@ -31,7 +31,7 @@ trait AnalysisTest extends PlanTest { private def makeAnalyzer(caseSensitive: Boolean): Analyzer = { val conf = new SimpleCatalystConf(caseSensitive) val catalog = new SessionCatalog(new InMemoryCatalog, conf) - catalog.createTempTable("TaBlE", TestRelations.testRelation, ignoreIfExists = true) + catalog.createTempTable("TaBlE", TestRelations.testRelation, overrideIfExists = true) new Analyzer(catalog, EmptyFunctionRegistry, conf) { override val extendedResolutionRules = EliminateSubqueryAliases :: Nil } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala index 31501864a8..6c08ccc34c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala @@ -52,7 +52,7 @@ class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter { private val b: Expression = UnresolvedAttribute("b") before { - catalog.createTempTable("table", relation, ignoreIfExists = true) + catalog.createTempTable("table", relation, overrideIfExists = true) } private def checkType(expression: Expression, expectedType: DataType): Unit = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala index 277c2d717e..959bd564d9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala @@ -210,7 +210,7 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach { } test("get table") { - assert(newBasicCatalog().getTable("db2", "tbl1").name.table == "tbl1") + assert(newBasicCatalog().getTable("db2", "tbl1").identifier.table == "tbl1") } test("get table when database/table does not exist") { @@ -452,7 +452,7 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach { assert(catalog.getFunction("db2", "func1").className == funcClass) catalog.renameFunction("db2", "func1", newName) intercept[AnalysisException] { catalog.getFunction("db2", "func1") } - assert(catalog.getFunction("db2", newName).name.funcName == newName) + assert(catalog.getFunction("db2", newName).identifier.funcName == newName) assert(catalog.getFunction("db2", newName).className == funcClass) intercept[AnalysisException] { catalog.renameFunction("db2", "does_not_exist", "me") } } @@ -549,7 +549,7 @@ abstract class CatalogTestUtils { def newTable(name: String, database: Option[String] = None): CatalogTable = { CatalogTable( - 
name = TableIdentifier(name, database), + identifier = TableIdentifier(name, database), tableType = CatalogTableType.EXTERNAL_TABLE, storage = storageFormat, schema = Seq(CatalogColumn("col1", "int"), CatalogColumn("col2", "string")), diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 74e995cc5b..2948c5f8bd 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -197,17 +197,17 @@ class SessionCatalogSuite extends SparkFunSuite { val catalog = new SessionCatalog(newBasicCatalog()) val tempTable1 = Range(1, 10, 1, 10, Seq()) val tempTable2 = Range(1, 20, 2, 10, Seq()) - catalog.createTempTable("tbl1", tempTable1, ignoreIfExists = false) - catalog.createTempTable("tbl2", tempTable2, ignoreIfExists = false) + catalog.createTempTable("tbl1", tempTable1, overrideIfExists = false) + catalog.createTempTable("tbl2", tempTable2, overrideIfExists = false) assert(catalog.getTempTable("tbl1") == Some(tempTable1)) assert(catalog.getTempTable("tbl2") == Some(tempTable2)) assert(catalog.getTempTable("tbl3") == None) // Temporary table already exists intercept[AnalysisException] { - catalog.createTempTable("tbl1", tempTable1, ignoreIfExists = false) + catalog.createTempTable("tbl1", tempTable1, overrideIfExists = false) } // Temporary table already exists but we override it - catalog.createTempTable("tbl1", tempTable2, ignoreIfExists = true) + catalog.createTempTable("tbl1", tempTable2, overrideIfExists = true) assert(catalog.getTempTable("tbl1") == Some(tempTable2)) } @@ -243,7 +243,7 @@ class SessionCatalogSuite extends SparkFunSuite { val externalCatalog = newBasicCatalog() val sessionCatalog = new SessionCatalog(externalCatalog) val tempTable = Range(1, 10, 2, 10, Seq()) - sessionCatalog.createTempTable("tbl1", tempTable, ignoreIfExists = false) + sessionCatalog.createTempTable("tbl1", tempTable, overrideIfExists = false) sessionCatalog.setCurrentDatabase("db2") assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable)) assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) @@ -255,7 +255,7 @@ class SessionCatalogSuite extends SparkFunSuite { sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false) assert(externalCatalog.listTables("db2").toSet == Set("tbl2")) // If database is specified, temp tables are never dropped - sessionCatalog.createTempTable("tbl1", tempTable, ignoreIfExists = false) + sessionCatalog.createTempTable("tbl1", tempTable, overrideIfExists = false) sessionCatalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false) sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false) assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable)) @@ -299,7 +299,7 @@ class SessionCatalogSuite extends SparkFunSuite { val externalCatalog = newBasicCatalog() val sessionCatalog = new SessionCatalog(externalCatalog) val tempTable = Range(1, 10, 2, 10, Seq()) - sessionCatalog.createTempTable("tbl1", tempTable, ignoreIfExists = false) + sessionCatalog.createTempTable("tbl1", tempTable, overrideIfExists = false) sessionCatalog.setCurrentDatabase("db2") assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable)) assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) @@ -327,7 +327,7 @@ class 
SessionCatalogSuite extends SparkFunSuite { assert(newTbl1.properties.get("toh") == Some("frem")) // Alter table without explicitly specifying database sessionCatalog.setCurrentDatabase("db2") - sessionCatalog.alterTable(tbl1.copy(name = TableIdentifier("tbl1"))) + sessionCatalog.alterTable(tbl1.copy(identifier = TableIdentifier("tbl1"))) val newestTbl1 = externalCatalog.getTable("db2", "tbl1") assert(newestTbl1 == tbl1) } @@ -368,7 +368,7 @@ class SessionCatalogSuite extends SparkFunSuite { val sessionCatalog = new SessionCatalog(externalCatalog) val tempTable1 = Range(1, 10, 1, 10, Seq()) val metastoreTable1 = externalCatalog.getTable("db2", "tbl1") - sessionCatalog.createTempTable("tbl1", tempTable1, ignoreIfExists = false) + sessionCatalog.createTempTable("tbl1", tempTable1, overrideIfExists = false) sessionCatalog.setCurrentDatabase("db2") // If we explicitly specify the database, we'll look up the relation in that database assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))) @@ -406,7 +406,7 @@ class SessionCatalogSuite extends SparkFunSuite { assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1")))) // If database is explicitly specified, do not check temporary tables val tempTable = Range(1, 10, 1, 10, Seq()) - catalog.createTempTable("tbl3", tempTable, ignoreIfExists = false) + catalog.createTempTable("tbl3", tempTable, overrideIfExists = false) assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2")))) // If database is not explicitly specified, check the current database catalog.setCurrentDatabase("db2") @@ -418,8 +418,8 @@ class SessionCatalogSuite extends SparkFunSuite { test("list tables without pattern") { val catalog = new SessionCatalog(newBasicCatalog()) val tempTable = Range(1, 10, 2, 10, Seq()) - catalog.createTempTable("tbl1", tempTable, ignoreIfExists = false) - catalog.createTempTable("tbl4", tempTable, ignoreIfExists = false) + catalog.createTempTable("tbl1", tempTable, overrideIfExists = false) + catalog.createTempTable("tbl4", tempTable, overrideIfExists = false) assert(catalog.listTables("db1").toSet == Set(TableIdentifier("tbl1"), TableIdentifier("tbl4"))) assert(catalog.listTables("db2").toSet == @@ -435,8 +435,8 @@ class SessionCatalogSuite extends SparkFunSuite { test("list tables with pattern") { val catalog = new SessionCatalog(newBasicCatalog()) val tempTable = Range(1, 10, 2, 10, Seq()) - catalog.createTempTable("tbl1", tempTable, ignoreIfExists = false) - catalog.createTempTable("tbl4", tempTable, ignoreIfExists = false) + catalog.createTempTable("tbl1", tempTable, overrideIfExists = false) + catalog.createTempTable("tbl4", tempTable, overrideIfExists = false) assert(catalog.listTables("db1", "*").toSet == catalog.listTables("db1").toSet) assert(catalog.listTables("db2", "*").toSet == catalog.listTables("db2").toSet) assert(catalog.listTables("db2", "tbl*").toSet == @@ -826,7 +826,7 @@ class SessionCatalogSuite extends SparkFunSuite { sessionCatalog.createFunction(newFunc("func1", Some("db2"))) sessionCatalog.renameFunction(FunctionIdentifier("func1"), FunctionIdentifier("func4")) assert(sessionCatalog.getTempFunction("func4") == - Some(tempFunc.copy(name = FunctionIdentifier("func4")))) + Some(tempFunc.copy(identifier = FunctionIdentifier("func4")))) assert(sessionCatalog.getTempFunction("func1") == None) assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1", "func3")) // Then, if no such temporary function exist, rename the function in the current database diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index e413e77bc1..c94600925f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -671,7 +671,7 @@ class SQLContext private[sql]( sessionState.catalog.createTempTable( sessionState.sqlParser.parseTableIdentifier(tableName).table, df.logicalPlan, - ignoreIfExists = true) + overrideIfExists = true) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index 24923bbb10..877e159fbd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -107,7 +107,7 @@ case class CreateTempTableUsing( sqlContext.sessionState.catalog.createTempTable( tableIdent.table, Dataset.ofRows(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan, - ignoreIfExists = true) + overrideIfExists = true) Seq.empty[Row] } @@ -138,7 +138,7 @@ case class CreateTempTableUsingAsSelect( sqlContext.sessionState.catalog.createTempTable( tableIdent.table, Dataset.ofRows(sqlContext, LogicalRelation(result)).logicalPlan, - ignoreIfExists = true) + overrideIfExists = true) Seq.empty[Row] } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala deleted file mode 100644 index 0722fb02a8..0000000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala +++ /dev/null @@ -1,297 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive - -import scala.util.control.NonFatal - -import org.apache.hadoop.hive.ql.metadata.HiveException -import org.apache.thrift.TException - -import org.apache.spark.internal.Logging -import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.NoSuchItemException -import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.hive.client.HiveClient - - -/** - * A persistent implementation of the system catalog using Hive. - * All public methods must be synchronized for thread-safety. 
- */ -private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog with Logging { - import ExternalCatalog._ - - // Exceptions thrown by the hive client that we would like to wrap - private val clientExceptions = Set( - classOf[HiveException].getCanonicalName, - classOf[TException].getCanonicalName) - - /** - * Whether this is an exception thrown by the hive client that should be wrapped. - * - * Due to classloader isolation issues, pattern matching won't work here so we need - * to compare the canonical names of the exceptions, which we assume to be stable. - */ - private def isClientException(e: Throwable): Boolean = { - var temp: Class[_] = e.getClass - var found = false - while (temp != null && !found) { - found = clientExceptions.contains(temp.getCanonicalName) - temp = temp.getSuperclass - } - found - } - - /** - * Run some code involving `client` in a [[synchronized]] block and wrap certain - * exceptions thrown in the process in [[AnalysisException]]. - */ - private def withClient[T](body: => T): T = synchronized { - try { - body - } catch { - case e: NoSuchItemException => - throw new AnalysisException(e.getMessage) - case NonFatal(e) if isClientException(e) => - throw new AnalysisException(e.getClass.getCanonicalName + ": " + e.getMessage) - } - } - - private def requireDbMatches(db: String, table: CatalogTable): Unit = { - if (table.name.database != Some(db)) { - throw new AnalysisException( - s"Provided database $db does not much the one specified in the " + - s"table definition (${table.name.database.getOrElse("n/a")})") - } - } - - private def requireTableExists(db: String, table: String): Unit = { - withClient { getTable(db, table) } - } - - // -------------------------------------------------------------------------- - // Databases - // -------------------------------------------------------------------------- - - override def createDatabase( - dbDefinition: CatalogDatabase, - ignoreIfExists: Boolean): Unit = withClient { - client.createDatabase(dbDefinition, ignoreIfExists) - } - - override def dropDatabase( - db: String, - ignoreIfNotExists: Boolean, - cascade: Boolean): Unit = withClient { - client.dropDatabase(db, ignoreIfNotExists, cascade) - } - - /** - * Alter a database whose name matches the one specified in `dbDefinition`, - * assuming the database exists. - * - * Note: As of now, this only supports altering database properties! - */ - override def alterDatabase(dbDefinition: CatalogDatabase): Unit = withClient { - val existingDb = getDatabase(dbDefinition.name) - if (existingDb.properties == dbDefinition.properties) { - logWarning(s"Request to alter database ${dbDefinition.name} is a no-op because " + - s"the provided database properties are the same as the old ones. 
Hive does not " + - s"currently support altering other database fields.") - } - client.alterDatabase(dbDefinition) - } - - override def getDatabase(db: String): CatalogDatabase = withClient { - client.getDatabase(db) - } - - override def databaseExists(db: String): Boolean = withClient { - client.getDatabaseOption(db).isDefined - } - - override def listDatabases(): Seq[String] = withClient { - client.listDatabases("*") - } - - override def listDatabases(pattern: String): Seq[String] = withClient { - client.listDatabases(pattern) - } - - override def setCurrentDatabase(db: String): Unit = withClient { - client.setCurrentDatabase(db) - } - - // -------------------------------------------------------------------------- - // Tables - // -------------------------------------------------------------------------- - - override def createTable( - db: String, - tableDefinition: CatalogTable, - ignoreIfExists: Boolean): Unit = withClient { - requireDbExists(db) - requireDbMatches(db, tableDefinition) - client.createTable(tableDefinition, ignoreIfExists) - } - - override def dropTable( - db: String, - table: String, - ignoreIfNotExists: Boolean): Unit = withClient { - requireDbExists(db) - client.dropTable(db, table, ignoreIfNotExists) - } - - override def renameTable(db: String, oldName: String, newName: String): Unit = withClient { - val newTable = client.getTable(db, oldName).copy(name = TableIdentifier(newName, Some(db))) - client.alterTable(oldName, newTable) - } - - /** - * Alter a table whose name that matches the one specified in `tableDefinition`, - * assuming the table exists. - * - * Note: As of now, this only supports altering table properties, serde properties, - * and num buckets! - */ - override def alterTable(db: String, tableDefinition: CatalogTable): Unit = withClient { - requireDbMatches(db, tableDefinition) - requireTableExists(db, tableDefinition.name.table) - client.alterTable(tableDefinition) - } - - override def getTable(db: String, table: String): CatalogTable = withClient { - client.getTable(db, table) - } - - override def tableExists(db: String, table: String): Boolean = withClient { - client.getTableOption(db, table).isDefined - } - - override def listTables(db: String): Seq[String] = withClient { - requireDbExists(db) - client.listTables(db) - } - - override def listTables(db: String, pattern: String): Seq[String] = withClient { - requireDbExists(db) - client.listTables(db, pattern) - } - - // -------------------------------------------------------------------------- - // Partitions - // -------------------------------------------------------------------------- - - override def createPartitions( - db: String, - table: String, - parts: Seq[CatalogTablePartition], - ignoreIfExists: Boolean): Unit = withClient { - requireTableExists(db, table) - client.createPartitions(db, table, parts, ignoreIfExists) - } - - override def dropPartitions( - db: String, - table: String, - parts: Seq[TablePartitionSpec], - ignoreIfNotExists: Boolean): Unit = withClient { - requireTableExists(db, table) - // Note: Unfortunately Hive does not currently support `ignoreIfNotExists` so we - // need to implement it here ourselves. This is currently somewhat expensive because - // we make multiple synchronous calls to Hive for each partition we want to drop. 
- val partsToDrop = - if (ignoreIfNotExists) { - parts.filter { spec => - try { - getPartition(db, table, spec) - true - } catch { - // Filter out the partitions that do not actually exist - case _: AnalysisException => false - } - } - } else { - parts - } - if (partsToDrop.nonEmpty) { - client.dropPartitions(db, table, partsToDrop) - } - } - - override def renamePartitions( - db: String, - table: String, - specs: Seq[TablePartitionSpec], - newSpecs: Seq[TablePartitionSpec]): Unit = withClient { - client.renamePartitions(db, table, specs, newSpecs) - } - - override def alterPartitions( - db: String, - table: String, - newParts: Seq[CatalogTablePartition]): Unit = withClient { - client.alterPartitions(db, table, newParts) - } - - override def getPartition( - db: String, - table: String, - spec: TablePartitionSpec): CatalogTablePartition = withClient { - client.getPartition(db, table, spec) - } - - override def listPartitions( - db: String, - table: String): Seq[CatalogTablePartition] = withClient { - client.getAllPartitions(db, table) - } - - // -------------------------------------------------------------------------- - // Functions - // -------------------------------------------------------------------------- - - override def createFunction( - db: String, - funcDefinition: CatalogFunction): Unit = withClient { - client.createFunction(db, funcDefinition) - } - - override def dropFunction(db: String, name: String): Unit = withClient { - client.dropFunction(db, name) - } - - override def renameFunction(db: String, oldName: String, newName: String): Unit = withClient { - client.renameFunction(db, oldName, newName) - } - - override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = withClient { - client.alterFunction(db, funcDefinition) - } - - override def getFunction(db: String, funcName: String): CatalogFunction = withClient { - client.getFunction(db, funcName) - } - - override def listFunctions(db: String, pattern: String): Seq[String] = withClient { - client.listFunctions(db, pattern) - } - -} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index ca3ce43591..c0b6d16d3c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -86,7 +86,7 @@ class HiveContext private[hive]( @transient private[hive] val executionHive: HiveClientImpl, @transient private[hive] val metadataHive: HiveClient, isRootContext: Boolean, - @transient private[sql] val hiveCatalog: HiveCatalog) + @transient private[sql] val hiveCatalog: HiveExternalCatalog) extends SQLContext(sc, cacheManager, listener, isRootContext, hiveCatalog) with Logging { self => @@ -98,7 +98,7 @@ class HiveContext private[hive]( execHive, metaHive, true, - new HiveCatalog(metaHive)) + new HiveExternalCatalog(metaHive)) } def this(sc: SparkContext) = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala new file mode 100644 index 0000000000..f75509fe80 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import scala.util.control.NonFatal + +import org.apache.hadoop.hive.ql.metadata.HiveException +import org.apache.thrift.TException + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.NoSuchItemException +import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.hive.client.HiveClient + + +/** + * A persistent implementation of the system catalog using Hive. + * All public methods must be synchronized for thread-safety. + */ +private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCatalog with Logging { + import ExternalCatalog._ + + // Exceptions thrown by the hive client that we would like to wrap + private val clientExceptions = Set( + classOf[HiveException].getCanonicalName, + classOf[TException].getCanonicalName) + + /** + * Whether this is an exception thrown by the hive client that should be wrapped. + * + * Due to classloader isolation issues, pattern matching won't work here so we need + * to compare the canonical names of the exceptions, which we assume to be stable. + */ + private def isClientException(e: Throwable): Boolean = { + var temp: Class[_] = e.getClass + var found = false + while (temp != null && !found) { + found = clientExceptions.contains(temp.getCanonicalName) + temp = temp.getSuperclass + } + found + } + + /** + * Run some code involving `client` in a [[synchronized]] block and wrap certain + * exceptions thrown in the process in [[AnalysisException]]. 
+ */ + private def withClient[T](body: => T): T = synchronized { + try { + body + } catch { + case e: NoSuchItemException => + throw new AnalysisException(e.getMessage) + case NonFatal(e) if isClientException(e) => + throw new AnalysisException(e.getClass.getCanonicalName + ": " + e.getMessage) + } + } + + private def requireDbMatches(db: String, table: CatalogTable): Unit = { + if (table.identifier.database != Some(db)) { + throw new AnalysisException( + s"Provided database $db does not much the one specified in the " + + s"table definition (${table.identifier.database.getOrElse("n/a")})") + } + } + + private def requireTableExists(db: String, table: String): Unit = { + withClient { getTable(db, table) } + } + + // -------------------------------------------------------------------------- + // Databases + // -------------------------------------------------------------------------- + + override def createDatabase( + dbDefinition: CatalogDatabase, + ignoreIfExists: Boolean): Unit = withClient { + client.createDatabase(dbDefinition, ignoreIfExists) + } + + override def dropDatabase( + db: String, + ignoreIfNotExists: Boolean, + cascade: Boolean): Unit = withClient { + client.dropDatabase(db, ignoreIfNotExists, cascade) + } + + /** + * Alter a database whose name matches the one specified in `dbDefinition`, + * assuming the database exists. + * + * Note: As of now, this only supports altering database properties! + */ + override def alterDatabase(dbDefinition: CatalogDatabase): Unit = withClient { + val existingDb = getDatabase(dbDefinition.name) + if (existingDb.properties == dbDefinition.properties) { + logWarning(s"Request to alter database ${dbDefinition.name} is a no-op because " + + s"the provided database properties are the same as the old ones. Hive does not " + + s"currently support altering other database fields.") + } + client.alterDatabase(dbDefinition) + } + + override def getDatabase(db: String): CatalogDatabase = withClient { + client.getDatabase(db) + } + + override def databaseExists(db: String): Boolean = withClient { + client.getDatabaseOption(db).isDefined + } + + override def listDatabases(): Seq[String] = withClient { + client.listDatabases("*") + } + + override def listDatabases(pattern: String): Seq[String] = withClient { + client.listDatabases(pattern) + } + + override def setCurrentDatabase(db: String): Unit = withClient { + client.setCurrentDatabase(db) + } + + // -------------------------------------------------------------------------- + // Tables + // -------------------------------------------------------------------------- + + override def createTable( + db: String, + tableDefinition: CatalogTable, + ignoreIfExists: Boolean): Unit = withClient { + requireDbExists(db) + requireDbMatches(db, tableDefinition) + client.createTable(tableDefinition, ignoreIfExists) + } + + override def dropTable( + db: String, + table: String, + ignoreIfNotExists: Boolean): Unit = withClient { + requireDbExists(db) + client.dropTable(db, table, ignoreIfNotExists) + } + + override def renameTable(db: String, oldName: String, newName: String): Unit = withClient { + val newTable = client.getTable(db, oldName) + .copy(identifier = TableIdentifier(newName, Some(db))) + client.alterTable(oldName, newTable) + } + + /** + * Alter a table whose name that matches the one specified in `tableDefinition`, + * assuming the table exists. + * + * Note: As of now, this only supports altering table properties, serde properties, + * and num buckets! 
+ */ + override def alterTable(db: String, tableDefinition: CatalogTable): Unit = withClient { + requireDbMatches(db, tableDefinition) + requireTableExists(db, tableDefinition.identifier.table) + client.alterTable(tableDefinition) + } + + override def getTable(db: String, table: String): CatalogTable = withClient { + client.getTable(db, table) + } + + override def tableExists(db: String, table: String): Boolean = withClient { + client.getTableOption(db, table).isDefined + } + + override def listTables(db: String): Seq[String] = withClient { + requireDbExists(db) + client.listTables(db) + } + + override def listTables(db: String, pattern: String): Seq[String] = withClient { + requireDbExists(db) + client.listTables(db, pattern) + } + + // -------------------------------------------------------------------------- + // Partitions + // -------------------------------------------------------------------------- + + override def createPartitions( + db: String, + table: String, + parts: Seq[CatalogTablePartition], + ignoreIfExists: Boolean): Unit = withClient { + requireTableExists(db, table) + client.createPartitions(db, table, parts, ignoreIfExists) + } + + override def dropPartitions( + db: String, + table: String, + parts: Seq[TablePartitionSpec], + ignoreIfNotExists: Boolean): Unit = withClient { + requireTableExists(db, table) + // Note: Unfortunately Hive does not currently support `ignoreIfNotExists` so we + // need to implement it here ourselves. This is currently somewhat expensive because + // we make multiple synchronous calls to Hive for each partition we want to drop. + val partsToDrop = + if (ignoreIfNotExists) { + parts.filter { spec => + try { + getPartition(db, table, spec) + true + } catch { + // Filter out the partitions that do not actually exist + case _: AnalysisException => false + } + } + } else { + parts + } + if (partsToDrop.nonEmpty) { + client.dropPartitions(db, table, partsToDrop) + } + } + + override def renamePartitions( + db: String, + table: String, + specs: Seq[TablePartitionSpec], + newSpecs: Seq[TablePartitionSpec]): Unit = withClient { + client.renamePartitions(db, table, specs, newSpecs) + } + + override def alterPartitions( + db: String, + table: String, + newParts: Seq[CatalogTablePartition]): Unit = withClient { + client.alterPartitions(db, table, newParts) + } + + override def getPartition( + db: String, + table: String, + spec: TablePartitionSpec): CatalogTablePartition = withClient { + client.getPartition(db, table, spec) + } + + override def listPartitions( + db: String, + table: String): Seq[CatalogTablePartition] = withClient { + client.getAllPartitions(db, table) + } + + // -------------------------------------------------------------------------- + // Functions + // -------------------------------------------------------------------------- + + override def createFunction( + db: String, + funcDefinition: CatalogFunction): Unit = withClient { + client.createFunction(db, funcDefinition) + } + + override def dropFunction(db: String, name: String): Unit = withClient { + client.dropFunction(db, name) + } + + override def renameFunction(db: String, oldName: String, newName: String): Unit = withClient { + client.renameFunction(db, oldName, newName) + } + + override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = withClient { + client.alterFunction(db, funcDefinition) + } + + override def getFunction(db: String, funcName: String): CatalogFunction = withClient { + client.getFunction(db, funcName) + } + + override def listFunctions(db: 
String, pattern: String): Seq[String] = withClient { + client.listFunctions(db, pattern) + } + +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index c7066d7363..eedd12d76a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -102,7 +102,7 @@ private[hive] object HiveSerDe { * Legacy catalog for interacting with the Hive metastore. * * This is still used for things like creating data source tables, but in the future will be - * cleaned up to integrate more nicely with [[HiveCatalog]]. + * cleaned up to integrate more nicely with [[HiveExternalCatalog]]. */ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveContext) extends Logging { @@ -124,8 +124,8 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte private def getQualifiedTableName(t: CatalogTable): QualifiedTableName = { QualifiedTableName( - t.name.database.getOrElse(getCurrentDatabase).toLowerCase, - t.name.table.toLowerCase) + t.identifier.database.getOrElse(getCurrentDatabase).toLowerCase, + t.identifier.table.toLowerCase) } /** A cache of Spark SQL data source tables that have been accessed. */ @@ -299,7 +299,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte def newSparkSQLSpecificMetastoreTable(): CatalogTable = { CatalogTable( - name = TableIdentifier(tblName, Option(dbName)), + identifier = TableIdentifier(tblName, Option(dbName)), tableType = tableType, schema = Nil, storage = CatalogStorageFormat( @@ -319,7 +319,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte assert(relation.partitionSchema.isEmpty) CatalogTable( - name = TableIdentifier(tblName, Option(dbName)), + identifier = TableIdentifier(tblName, Option(dbName)), tableType = tableType, storage = CatalogStorageFormat( locationUri = Some(relation.location.paths.map(_.toUri.toString).head), @@ -431,7 +431,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte alias match { // because hive use things like `_c0` to build the expanded text // currently we cannot support view from "create view v1(c1) as ..." - case None => SubqueryAlias(table.name.table, hive.parseSql(viewText)) + case None => SubqueryAlias(table.identifier.table, hive.parseSql(viewText)) case Some(aliasText) => SubqueryAlias(aliasText, hive.parseSql(viewText)) } } else { @@ -611,7 +611,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table) execution.CreateViewAsSelect( - table.copy(name = TableIdentifier(tblName, Some(dbName))), + table.copy(identifier = TableIdentifier(tblName, Some(dbName))), child, allowExisting, replace) @@ -633,7 +633,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte if (hive.convertCTAS && table.storage.serde.isEmpty) { // Do the conversion when spark.sql.hive.convertCTAS is true and the query // does not specify any storage format (file format and storage handler). 
- if (table.name.database.isDefined) { + if (table.identifier.database.isDefined) { throw new AnalysisException( "Cannot specify database name in a CTAS statement " + "when spark.sql.hive.convertCTAS is set to true.") @@ -641,7 +641,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists CreateTableUsingAsSelect( - TableIdentifier(desc.name.table), + TableIdentifier(desc.identifier.table), conf.defaultDataSourceName, temporary = false, Array.empty[String], @@ -662,7 +662,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table) execution.CreateTableAsSelect( - desc.copy(name = TableIdentifier(tblName, Some(dbName))), + desc.copy(identifier = TableIdentifier(tblName, Some(dbName))), child, allowExisting) } @@ -792,7 +792,7 @@ private[hive] case class MetastoreRelation( // We start by constructing an API table as Hive performs several important transformations // internally when converting an API table to a QL table. val tTable = new org.apache.hadoop.hive.metastore.api.Table() - tTable.setTableName(table.name.table) + tTable.setTableName(table.identifier.table) tTable.setDbName(table.database) val tableParameters = new java.util.HashMap[String, String]() diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index e5bcb9b1db..b3ec95fc73 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -60,7 +60,7 @@ private[hive] case class CreateTableAsSelect( override def output: Seq[Attribute] = Seq.empty[Attribute] override lazy val resolved: Boolean = - tableDesc.name.database.isDefined && + tableDesc.identifier.database.isDefined && tableDesc.schema.nonEmpty && tableDesc.storage.serde.isDefined && tableDesc.storage.inputFormat.isDefined && @@ -183,7 +183,7 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging val tableIdentifier = extractTableIdent(viewNameParts) val originalText = query.source val tableDesc = CatalogTable( - name = tableIdentifier, + identifier = tableIdentifier, tableType = CatalogTableType.VIRTUAL_VIEW, schema = schema, storage = CatalogStorageFormat( @@ -352,7 +352,7 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging // TODO add bucket support var tableDesc: CatalogTable = CatalogTable( - name = tableIdentifier, + identifier = tableIdentifier, tableType = if (externalTable.isDefined) { CatalogTableType.EXTERNAL_TABLE diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index aa44cba4b5..ec7bf61be1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.types.StructType class HiveSessionCatalog( - externalCatalog: HiveCatalog, + externalCatalog: HiveExternalCatalog, client: HiveClient, context: HiveContext, conf: SQLConf) @@ -41,11 +41,11 @@ class HiveSessionCatalog( override def lookupRelation(name: TableIdentifier, alias: Option[String]): LogicalPlan = { val table = formatTableName(name.table) - if (name.database.isDefined || !tempTables.containsKey(table)) { + if 
(name.database.isDefined || !tempTables.contains(table)) { val newName = name.copy(table = table) metastoreCatalog.lookupRelation(newName, alias) } else { - val relation = tempTables.get(table) + val relation = tempTables(table) val tableWithQualifiers = SubqueryAlias(table, relation) // If an alias was specified by the lookup, wrap the plan in a subquery so that // attributes are properly qualified with this alias. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index f4d30358ca..ee56f9d75d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -88,7 +88,7 @@ private[hive] trait HiveClient { def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit /** Alter a table whose name matches the one specified in `table`, assuming it exists. */ - final def alterTable(table: CatalogTable): Unit = alterTable(table.name.table, table) + final def alterTable(table: CatalogTable): Unit = alterTable(table.identifier.table, table) /** Updates the given table with new metadata, optionally renaming the table. */ def alterTable(tableName: String, table: CatalogTable): Unit diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index e4e15d13df..a31178e347 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -298,7 +298,7 @@ private[hive] class HiveClientImpl( logDebug(s"Looking up $dbName.$tableName") Option(client.getTable(dbName, tableName, false)).map { h => CatalogTable( - name = TableIdentifier(h.getTableName, Option(h.getDbName)), + identifier = TableIdentifier(h.getTableName, Option(h.getDbName)), tableType = h.getTableType match { case HiveTableType.EXTERNAL_TABLE => CatalogTableType.EXTERNAL_TABLE case HiveTableType.MANAGED_TABLE => CatalogTableType.MANAGED_TABLE @@ -544,13 +544,14 @@ private[hive] class HiveClientImpl( } override def renameFunction(db: String, oldName: String, newName: String): Unit = withHiveState { - val catalogFunc = getFunction(db, oldName).copy(name = FunctionIdentifier(newName, Some(db))) + val catalogFunc = getFunction(db, oldName) + .copy(identifier = FunctionIdentifier(newName, Some(db))) val hiveFunc = toHiveFunction(catalogFunc, db) client.alterFunction(db, oldName, hiveFunc) } override def alterFunction(db: String, func: CatalogFunction): Unit = withHiveState { - client.alterFunction(db, func.name.funcName, toHiveFunction(func, db)) + client.alterFunction(db, func.identifier.funcName, toHiveFunction(func, db)) } override def getFunctionOption( @@ -611,7 +612,7 @@ private[hive] class HiveClientImpl( private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = { new HiveFunction( - f.name.funcName, + f.identifier.funcName, db, f.className, null, @@ -639,7 +640,7 @@ private[hive] class HiveClientImpl( } private def toHiveTable(table: CatalogTable): HiveTable = { - val hiveTable = new HiveTable(table.database, table.name.table) + val hiveTable = new HiveTable(table.database, table.identifier.table) hiveTable.setTableType(table.tableType match { case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE case CatalogTableType.MANAGED_TABLE => 
HiveTableType.MANAGED_TABLE diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index 5a61eef0f2..29f7dc2997 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -38,7 +38,7 @@ case class CreateTableAsSelect( allowExisting: Boolean) extends RunnableCommand { - private val tableIdentifier = tableDesc.name + private val tableIdentifier = tableDesc.identifier override def children: Seq[LogicalPlan] = Seq(query) @@ -93,6 +93,8 @@ case class CreateTableAsSelect( } override def argString: String = { - s"[Database:${tableDesc.database}}, TableName: ${tableDesc.name.table}, InsertIntoHiveTable]" + s"[Database:${tableDesc.database}}, " + + s"TableName: ${tableDesc.identifier.table}, " + + s"InsertIntoHiveTable]" } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala index 9ff520da1d..33cd8b4480 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala @@ -44,7 +44,7 @@ private[hive] case class CreateViewAsSelect( assert(tableDesc.schema == Nil || tableDesc.schema.length == childSchema.length) assert(tableDesc.viewText.isDefined) - private val tableIdentifier = tableDesc.name + private val tableIdentifier = tableDesc.identifier override def run(sqlContext: SQLContext): Seq[Row] = { val hiveContext = sqlContext.asInstanceOf[HiveContext] @@ -116,7 +116,7 @@ private[hive] case class CreateViewAsSelect( } val viewText = tableDesc.viewText.get - val viewName = quote(tableDesc.name.table) + val viewName = quote(tableDesc.identifier.table) s"SELECT $viewOutput FROM ($viewText) $viewName" } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index a1785ca038..4afc8d18a6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -78,7 +78,7 @@ class TestHiveContext private[hive]( executionHive: HiveClientImpl, metadataHive: HiveClient, isRootContext: Boolean, - hiveCatalog: HiveCatalog, + hiveCatalog: HiveExternalCatalog, val warehousePath: File, val scratchDirPath: File, metastoreTemporaryConf: Map[String, String]) @@ -110,7 +110,7 @@ class TestHiveContext private[hive]( executionHive, metadataHive, true, - new HiveCatalog(metadataHive), + new HiveExternalCatalog(metadataHive), warehousePath, scratchDirPath, metastoreTemporaryConf) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala deleted file mode 100644 index 427f5747a0..0000000000 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.util.VersionInfo - -import org.apache.spark.SparkConf -import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.hive.client.{HiveClient, IsolatedClientLoader} -import org.apache.spark.util.Utils - -/** - * Test suite for the [[HiveCatalog]]. - */ -class HiveCatalogSuite extends CatalogTestCases { - - private val client: HiveClient = { - IsolatedClientLoader.forVersion( - hiveMetastoreVersion = HiveContext.hiveExecutionVersion, - hadoopVersion = VersionInfo.getVersion, - sparkConf = new SparkConf(), - hadoopConf = new Configuration()).createClient() - } - - protected override val utils: CatalogTestUtils = new CatalogTestUtils { - override val tableInputFormat: String = "org.apache.hadoop.mapred.SequenceFileInputFormat" - override val tableOutputFormat: String = "org.apache.hadoop.mapred.SequenceFileOutputFormat" - override def newEmptyCatalog(): ExternalCatalog = new HiveCatalog(client) - } - - protected override def resetState(): Unit = client.reset() - -} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala new file mode 100644 index 0000000000..3334c16f0b --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.util.VersionInfo + +import org.apache.spark.SparkConf +import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.hive.client.{HiveClient, IsolatedClientLoader} +import org.apache.spark.util.Utils + +/** + * Test suite for the [[HiveExternalCatalog]]. 
+ */ +class HiveExternalCatalogSuite extends CatalogTestCases { + + private val client: HiveClient = { + IsolatedClientLoader.forVersion( + hiveMetastoreVersion = HiveContext.hiveExecutionVersion, + hadoopVersion = VersionInfo.getVersion, + sparkConf = new SparkConf(), + hadoopConf = new Configuration()).createClient() + } + + protected override val utils: CatalogTestUtils = new CatalogTestUtils { + override val tableInputFormat: String = "org.apache.hadoop.mapred.SequenceFileInputFormat" + override val tableOutputFormat: String = "org.apache.hadoop.mapred.SequenceFileOutputFormat" + override def newEmptyCatalog(): ExternalCatalog = new HiveExternalCatalog(client) + } + + protected override def resetState(): Unit = client.reset() + +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala index 1c775db9b6..0aaf57649c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala @@ -54,8 +54,8 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll { val (desc, exists) = extractTableDesc(s1) assert(exists) - assert(desc.name.database == Some("mydb")) - assert(desc.name.table == "page_view") + assert(desc.identifier.database == Some("mydb")) + assert(desc.identifier.table == "page_view") assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE) assert(desc.storage.locationUri == Some("/user/external/page_view")) assert(desc.schema == @@ -100,8 +100,8 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll { val (desc, exists) = extractTableDesc(s2) assert(exists) - assert(desc.name.database == Some("mydb")) - assert(desc.name.table == "page_view") + assert(desc.identifier.database == Some("mydb")) + assert(desc.identifier.table == "page_view") assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE) assert(desc.storage.locationUri == Some("/user/external/page_view")) assert(desc.schema == @@ -127,8 +127,8 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll { val s3 = """CREATE TABLE page_view AS SELECT * FROM src""" val (desc, exists) = extractTableDesc(s3) assert(exists == false) - assert(desc.name.database == None) - assert(desc.name.table == "page_view") + assert(desc.identifier.database == None) + assert(desc.identifier.table == "page_view") assert(desc.tableType == CatalogTableType.MANAGED_TABLE) assert(desc.storage.locationUri == None) assert(desc.schema == Seq.empty[CatalogColumn]) @@ -162,8 +162,8 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll { | ORDER BY key, value""".stripMargin val (desc, exists) = extractTableDesc(s5) assert(exists == false) - assert(desc.name.database == None) - assert(desc.name.table == "ctas2") + assert(desc.identifier.database == None) + assert(desc.identifier.table == "ctas2") assert(desc.tableType == CatalogTableType.MANAGED_TABLE) assert(desc.storage.locationUri == None) assert(desc.schema == Seq.empty[CatalogColumn]) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala index 5272f4192e..e8188e5f02 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala @@ -34,7 +34,7 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft super.beforeAll() // The catalog in HiveContext is a case 
insensitive one. sessionState.catalog.createTempTable( - "ListTablesSuiteTable", df.logicalPlan, ignoreIfExists = true) + "ListTablesSuiteTable", df.logicalPlan, overrideIfExists = true) sql("CREATE TABLE HiveListTablesSuiteTable (key int, value string)") sql("CREATE DATABASE IF NOT EXISTS ListTablesSuiteDB") sql("CREATE TABLE ListTablesSuiteDB.HiveInDBListTablesSuiteTable (key int, value string)") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 71652897e6..3c299daa77 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -722,7 +722,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv withTable(tableName) { val schema = StructType(StructField("int", IntegerType, true) :: Nil) val hiveTable = CatalogTable( - name = TableIdentifier(tableName, Some("default")), + identifier = TableIdentifier(tableName, Some("default")), tableType = CatalogTableType.MANAGED_TABLE, schema = Seq.empty, storage = CatalogStorageFormat( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index d59bca4c7e..8b0719209d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -148,7 +148,7 @@ class VersionsSuite extends SparkFunSuite with Logging { test(s"$version: createTable") { val table = CatalogTable( - name = TableIdentifier("src", Some("default")), + identifier = TableIdentifier("src", Some("default")), tableType = CatalogTableType.MANAGED_TABLE, schema = Seq(CatalogColumn("key", "int")), storage = CatalogStorageFormat( -- cgit v1.2.3
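
For anyone exercising the renamed API against this revision, a minimal sketch of building a table descriptor with the new `identifier` field, e.g. from spark-shell. The constructor shapes are taken from the hunks above (VersionsSuite, MetastoreDataSourcesSuite); the database, table, and column names are illustrative only, and the CatalogStorageFormat field list is assumed from interface.scala at this revision:

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.catalog._

    // CatalogTable (and CatalogFunction) are now keyed by `identifier`
    // rather than `name`; database and table are read back through the
    // same TableIdentifier.
    val table = CatalogTable(
      identifier = TableIdentifier("page_view", Some("mydb")),   // illustrative names
      tableType = CatalogTableType.MANAGED_TABLE,
      schema = Seq(CatalogColumn("key", "int"), CatalogColumn("value", "string")),
      storage = CatalogStorageFormat(
        locationUri = None,
        inputFormat = Some("org.apache.hadoop.mapred.SequenceFileInputFormat"),
        outputFormat = Some("org.apache.hadoop.mapred.SequenceFileOutputFormat"),
        serde = None,
        serdeProperties = Map.empty))

    assert(table.identifier.database == Some("mydb"))
    assert(table.identifier.table == "page_view")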
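The session-catalog hunks also rename the temp-table flag: `createTempTable` now takes `overrideIfExists`, and the registry behind it is a plain Scala mutable map rather than a ConcurrentHashMap (the class is documented as not thread-safe). A hedged usage sketch in the style of the ListTablesSuite hunk; `sessionState` and `df` are placeholders borrowed from that test context:

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // df.logicalPlan stands in for any analyzed plan, as in ListTablesSuite.
    val plan: LogicalPlan = df.logicalPlan

    // Register the plan under a temporary name; with overrideIfExists = true an
    // existing registration under the same name is simply replaced.
    sessionState.catalog.createTempTable("ListTablesSuiteTable", plan, overrideIfExists = true)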
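The HiveSessionCatalog hunk routes a lookup to the metastore only when a database is specified or no temporary table of that name is registered; otherwise the temporary plan is returned wrapped in a SubqueryAlias. A sketch of the calling side under the same assumptions (identifiers are illustrative, and the explicit `alias = None` mirrors the override's signature):

    import org.apache.spark.sql.catalyst.TableIdentifier

    // A database-qualified lookup always goes to the metastore, even if a temp
    // table with the same name exists; an unqualified lookup prefers the temp table.
    val fromMetastore = sessionState.catalog.lookupRelation(
      TableIdentifier("page_view", Some("mydb")), alias = None)
    val maybeTemp = sessionState.catalog.lookupRelation(
      TableIdentifier("page_view"), alias = None)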