author     Wenchen Fan <wenchen@databricks.com>    2015-10-14 16:05:37 -0700
committer  Davies Liu <davies.liu@gmail.com>       2015-10-14 16:05:37 -0700
commit     56d7da14ab8f89bf4f303b27f51fd22d23967ffb
tree       d21fc4a4d5a197788a8b8e165d89d3e95bff13be /sql/hive
parent     9a430a027faafb083ca569698effb697af26a1db
[SPARK-10104] [SQL] Consolidate different forms of table identifiers
Right now, we have QualifiedTableName, TableIdentifier, and Seq[String] to represent table identifiers. We should have only one form, and TableIdentifier is the best choice because it provides methods to get the table name and database name, and to return the unquoted and quoted string forms.
Author: Wenchen Fan <wenchen@databricks.com>
Author: Wenchen Fan <cloud0fan@163.com>
Closes #8453 from cloud-fan/table-name.
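As a rough illustration of the consolidation described above — a simplified sketch, not the actual Catalyst source — the snippet below models a TableIdentifier with the accessors and string forms the commit message mentions (table name, optional database, unquoted and quoted rendering):

// Hypothetical, simplified rendering of Catalyst's TableIdentifier,
// for illustration only; the real class lives in org.apache.spark.sql.catalyst.
case class TableIdentifier(table: String, database: Option[String] = None) {
  // "db.table" without quoting, e.g. for log messages
  def unquotedString: String = (database.toSeq :+ table).mkString(".")

  // "`db`.`table`", safe to splice into SQL text
  def quotedString: String = (database.toSeq :+ table).map(p => s"`$p`").mkString(".")

  override def toString: String = quotedString
}

object TableIdentifierDemo extends App {
  val ident = TableIdentifier("src", Some("default"))
  println(ident.table)          // src
  println(ident.database)       // Some(default)
  println(ident.unquotedString) // default.src
  println(ident.quotedString)   // `default`.`src`
}

Under this model, call sites that previously passed Seq("db", "table") can pass TableIdentifier("table", Some("db")) and let the catalog normalize it — the pattern the diff below applies throughout sql/hive.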
Diffstat (limited to 'sql/hive')
13 files changed, 86 insertions, 161 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index e620d7fb82..4d8a3f728e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -358,7 +358,7 @@ class HiveContext private[hive](
   @Experimental
   def analyze(tableName: String) {
     val tableIdent = SqlParser.parseTableIdentifier(tableName)
-    val relation = EliminateSubQueries(catalog.lookupRelation(tableIdent.toSeq))
+    val relation = EliminateSubQueries(catalog.lookupRelation(tableIdent))
 
     relation match {
       case relation: MetastoreRelation =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 1f8223e1ff..5819cb9d08 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.catalyst.{InternalRow, SqlParser, TableIdentifier}
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
 import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
 import org.apache.spark.sql.execution.{FileRelation, datasources}
@@ -103,10 +103,19 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
   /** Usages should lock on `this`. */
   protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf)
 
-  // TODO: Use this everywhere instead of tuples or databaseName, tableName,.
   /** A fully qualified identifier for a table (i.e., database.tableName) */
-  case class QualifiedTableName(database: String, name: String) {
-    def toLowerCase: QualifiedTableName = QualifiedTableName(database.toLowerCase, name.toLowerCase)
+  case class QualifiedTableName(database: String, name: String)
+
+  private def getQualifiedTableName(tableIdent: TableIdentifier) = {
+    QualifiedTableName(
+      tableIdent.database.getOrElse(client.currentDatabase).toLowerCase,
+      tableIdent.table.toLowerCase)
+  }
+
+  private def getQualifiedTableName(hiveTable: HiveTable) = {
+    QualifiedTableName(
+      hiveTable.specifiedDatabase.getOrElse(client.currentDatabase).toLowerCase,
+      hiveTable.name.toLowerCase)
   }
 
   /** A cache of Spark SQL data source tables that have been accessed. */
@@ -179,33 +188,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
   }
 
   def invalidateTable(tableIdent: TableIdentifier): Unit = {
-    val databaseName = tableIdent.database.getOrElse(client.currentDatabase)
-    val tableName = tableIdent.table
-
-    cachedDataSourceTables.invalidate(QualifiedTableName(databaseName, tableName).toLowerCase)
-  }
-
-  val caseSensitive: Boolean = false
-
-  /**
-   * Creates a data source table (a table created with USING clause) in Hive's metastore.
-   * Returns true when the table has been created. Otherwise, false.
-   */
-  // TODO: Remove this in SPARK-10104.
-  def createDataSourceTable(
-      tableName: String,
-      userSpecifiedSchema: Option[StructType],
-      partitionColumns: Array[String],
-      provider: String,
-      options: Map[String, String],
-      isExternal: Boolean): Unit = {
-    createDataSourceTable(
-      SqlParser.parseTableIdentifier(tableName),
-      userSpecifiedSchema,
-      partitionColumns,
-      provider,
-      options,
-      isExternal)
+    cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent))
   }
 
   def createDataSourceTable(
@@ -215,10 +198,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
       provider: String,
       options: Map[String, String],
       isExternal: Boolean): Unit = {
-    val (dbName, tblName) = {
-      val database = tableIdent.database.getOrElse(client.currentDatabase)
-      processDatabaseAndTableName(database, tableIdent.table)
-    }
+    val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
 
     val tableProperties = new mutable.HashMap[String, String]
     tableProperties.put("spark.sql.sources.provider", provider)
@@ -311,7 +291,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     // TODO: Support persisting partitioned data source relations in Hive compatible format
     val qualifiedTableName = tableIdent.quotedString
 
-    val (hiveCompitiableTable, logMessage) = (maybeSerDe, dataSource.relation) match {
+    val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.relation) match {
       case (Some(serde), relation: HadoopFsRelation)
           if relation.paths.length == 1 && relation.partitionColumns.isEmpty =>
         val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
@@ -349,9 +329,9 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
         (None, message)
     }
 
-    (hiveCompitiableTable, logMessage) match {
+    (hiveCompatibleTable, logMessage) match {
       case (Some(table), message) =>
-        // We first try to save the metadata of the table in a Hive compatiable way.
+        // We first try to save the metadata of the table in a Hive compatible way.
         // If Hive throws an error, we fall back to save its metadata in the Spark SQL
         // specific way.
         try {
@@ -374,48 +354,29 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     }
   }
 
-  def hiveDefaultTableFilePath(tableName: String): String = {
-    hiveDefaultTableFilePath(SqlParser.parseTableIdentifier(tableName))
-  }
-
   def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
     // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
-    val database = tableIdent.database.getOrElse(client.currentDatabase)
-
-    new Path(
-      new Path(client.getDatabase(database).location),
-      tableIdent.table.toLowerCase).toString
+    val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
+    new Path(new Path(client.getDatabase(dbName).location), tblName).toString
   }
 
-  def tableExists(tableIdentifier: Seq[String]): Boolean = {
-    val tableIdent = processTableIdentifier(tableIdentifier)
-    val databaseName =
-      tableIdent
-        .lift(tableIdent.size - 2)
-        .getOrElse(client.currentDatabase)
-    val tblName = tableIdent.last
-    client.getTableOption(databaseName, tblName).isDefined
+  override def tableExists(tableIdent: TableIdentifier): Boolean = {
+    val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
+    client.getTableOption(dbName, tblName).isDefined
   }
 
-  def lookupRelation(
-      tableIdentifier: Seq[String],
+  override def lookupRelation(
+      tableIdent: TableIdentifier,
       alias: Option[String]): LogicalPlan = {
-    val tableIdent = processTableIdentifier(tableIdentifier)
-    val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
-      client.currentDatabase)
-    val tblName = tableIdent.last
-    val table = client.getTable(databaseName, tblName)
+    val qualifiedTableName = getQualifiedTableName(tableIdent)
+    val table = client.getTable(qualifiedTableName.database, qualifiedTableName.name)
 
     if (table.properties.get("spark.sql.sources.provider").isDefined) {
-      val dataSourceTable =
-        cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
+      val dataSourceTable = cachedDataSourceTables(qualifiedTableName)
+      val tableWithQualifiers = Subquery(qualifiedTableName.name, dataSourceTable)
       // Then, if alias is specified, wrap the table with a Subquery using the alias.
       // Otherwise, wrap the table with a Subquery using the table name.
-      val withAlias =
-        alias.map(a => Subquery(a, dataSourceTable)).getOrElse(
-          Subquery(tableIdent.last, dataSourceTable))
-
-      withAlias
+      alias.map(a => Subquery(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
     } else if (table.tableType == VirtualView) {
       val viewText = table.viewText.getOrElse(sys.error("Invalid view without text."))
       alias match {
@@ -425,7 +386,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
         case Some(aliasText) => Subquery(aliasText, HiveQl.createPlan(viewText))
       }
     } else {
-      MetastoreRelation(databaseName, tblName, alias)(table)(hive)
+      MetastoreRelation(qualifiedTableName.database, qualifiedTableName.name, alias)(table)(hive)
     }
   }
 
@@ -524,26 +485,6 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     client.listTables(db).map(tableName => (tableName, false))
   }
 
-  protected def processDatabaseAndTableName(
-      databaseName: Option[String],
-      tableName: String): (Option[String], String) = {
-    if (!caseSensitive) {
-      (databaseName.map(_.toLowerCase), tableName.toLowerCase)
-    } else {
-      (databaseName, tableName)
-    }
-  }
-
-  protected def processDatabaseAndTableName(
-      databaseName: String,
-      tableName: String): (String, String) = {
-    if (!caseSensitive) {
-      (databaseName.toLowerCase, tableName.toLowerCase)
-    } else {
-      (databaseName, tableName)
-    }
-  }
-
   /**
    * When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet
    * data source relations for better performance.
@@ -597,8 +538,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
         "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.")
     }
 
-    val (dbName, tblName) = processDatabaseAndTableName(
-      table.specifiedDatabase.getOrElse(client.currentDatabase), table.name)
+    val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
 
     execution.CreateViewAsSelect(
       table.copy(
@@ -636,7 +576,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
         val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
         CreateTableUsingAsSelect(
           TableIdentifier(desc.name),
-          hive.conf.defaultDataSourceName,
+          conf.defaultDataSourceName,
           temporary = false,
           Array.empty[String],
           mode,
@@ -652,9 +592,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
           table
         }
 
-      val (dbName, tblName) =
-        processDatabaseAndTableName(
-          desc.specifiedDatabase.getOrElse(client.currentDatabase), desc.name)
+      val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
 
       execution.CreateTableAsSelect(
         desc.copy(
@@ -712,7 +650,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
    * UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
    * For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
    */
-  override def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit = {
+  override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
     throw new UnsupportedOperationException
   }
 
@@ -720,7 +658,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
    * UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
    * For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
    */
-  override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
+  override def unregisterTable(tableIdent: TableIdentifier): Unit = {
     throw new UnsupportedOperationException
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 1d50501940..d4ff5cc0f1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{logical, _}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.ExplainCommand
 import org.apache.spark.sql.execution.datasources.DescribeCommand
 import org.apache.spark.sql.hive.HiveShim._
@@ -442,24 +443,12 @@ private[hive] object HiveQl extends Logging {
     throw new NotImplementedError(s"No parse rules for StructField:\n ${dumpTree(a).toString} ")
   }
 
-  protected def extractDbNameTableName(tableNameParts: Node): (Option[String], String) = {
-    val (db, tableName) =
-      tableNameParts.getChildren.asScala.map {
-        case Token(part, Nil) => cleanIdentifier(part)
-      } match {
-        case Seq(tableOnly) => (None, tableOnly)
-        case Seq(databaseName, table) => (Some(databaseName), table)
-      }
-
-    (db, tableName)
-  }
-
-  protected def extractTableIdent(tableNameParts: Node): Seq[String] = {
+  protected def extractTableIdent(tableNameParts: Node): TableIdentifier = {
     tableNameParts.getChildren.asScala.map {
       case Token(part, Nil) => cleanIdentifier(part)
     } match {
-      case Seq(tableOnly) => Seq(tableOnly)
-      case Seq(databaseName, table) => Seq(databaseName, table)
+      case Seq(tableOnly) => TableIdentifier(tableOnly)
+      case Seq(databaseName, table) => TableIdentifier(table, Some(databaseName))
       case other => sys.error("Hive only supports tables names like 'tableName' " +
         s"or 'databaseName.tableName', found '$other'")
     }
@@ -518,13 +507,13 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       properties: Map[String, String],
       allowExist: Boolean,
       replace: Boolean): CreateViewAsSelect = {
-    val (db, viewName) = extractDbNameTableName(viewNameParts)
+    val TableIdentifier(viewName, dbName) = extractTableIdent(viewNameParts)
 
     val originalText = context.getTokenRewriteStream
       .toString(query.getTokenStartIndex, query.getTokenStopIndex)
 
     val tableDesc = HiveTable(
-      specifiedDatabase = db,
+      specifiedDatabase = dbName,
       name = viewName,
       schema = schema,
       partitionColumns = Seq.empty[HiveColumn],
@@ -611,7 +600,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
           case tableName =>
             // It is describing a table with the format like "describe table".
             DescribeCommand(
-              UnresolvedRelation(Seq(tableName.getText), None), isExtended = extended.isDefined)
+              UnresolvedRelation(TableIdentifier(tableName.getText), None),
+              isExtended = extended.isDefined)
         }
       }
       // All other cases.
@@ -716,12 +706,12 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
           "TOK_TABLELOCATION",
           "TOK_TABLEPROPERTIES"),
         children)
-    val (db, tableName) = extractDbNameTableName(tableNameParts)
+    val TableIdentifier(tblName, dbName) = extractTableIdent(tableNameParts)
 
     // TODO add bucket support
     var tableDesc: HiveTable = HiveTable(
-      specifiedDatabase = db,
-      name = tableName,
+      specifiedDatabase = dbName,
+      name = tblName,
       schema = Seq.empty[HiveColumn],
       partitionColumns = Seq.empty[HiveColumn],
       properties = Map[String, String](),
@@ -1264,15 +1254,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
             nonAliasClauses)
         }
 
-        val tableIdent =
-          tableNameParts.getChildren.asScala.map {
-            case Token(part, Nil) => cleanIdentifier(part)
-          } match {
-            case Seq(tableOnly) => Seq(tableOnly)
-            case Seq(databaseName, table) => Seq(databaseName, table)
-            case other => sys.error("Hive only supports tables names like 'tableName' " +
-              s"or 'databaseName.tableName', found '$other'")
-          }
+        val tableIdent = extractTableIdent(tableNameParts)
         val alias = aliasClause.map { case Token(a, Nil) => cleanIdentifier(a) }
         val relation = UnresolvedRelation(tableIdent, alias)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 8422287e17..e72a60b42e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive.execution
 
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
@@ -37,8 +38,7 @@ case class CreateTableAsSelect(
     allowExisting: Boolean)
   extends RunnableCommand {
 
-  def database: String = tableDesc.database
-  def tableName: String = tableDesc.name
+  val tableIdentifier = TableIdentifier(tableDesc.name, Some(tableDesc.database))
 
   override def children: Seq[LogicalPlan] = Seq(query)
 
@@ -72,18 +72,18 @@ case class CreateTableAsSelect(
       hiveContext.catalog.client.createTable(withSchema)
 
       // Get the Metastore Relation
-      hiveContext.catalog.lookupRelation(Seq(database, tableName), None) match {
+      hiveContext.catalog.lookupRelation(tableIdentifier, None) match {
         case r: MetastoreRelation => r
       }
     }
     // TODO ideally, we should get the output data ready first and then
    // add the relation into catalog, just in case of failure occurs while data
    // processing.
-    if (hiveContext.catalog.tableExists(Seq(database, tableName))) {
+    if (hiveContext.catalog.tableExists(tableIdentifier)) {
       if (allowExisting) {
         // table already exists, will do nothing, to keep consistent with Hive
       } else {
-        throw new AnalysisException(s"$database.$tableName already exists.")
+        throw new AnalysisException(s"$tableIdentifier already exists.")
       }
     } else {
       hiveContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true, false)).toRdd
@@ -93,6 +93,6 @@ case class CreateTableAsSelect(
   }
 
   override def argString: String = {
-    s"[Database:$database, TableName: $tableName, InsertIntoHiveTable]"
+    s"[Database:${tableDesc.database}}, TableName: ${tableDesc.name}, InsertIntoHiveTable]"
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
index 2b504ac974..2c81115ee4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive.execution
 
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveContext}
 import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
@@ -38,18 +39,18 @@ private[hive] case class CreateViewAsSelect(
   assert(tableDesc.schema == Nil || tableDesc.schema.length == childSchema.length)
   assert(tableDesc.viewText.isDefined)
 
+  val tableIdentifier = TableIdentifier(tableDesc.name, Some(tableDesc.database))
+
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
-    val database = tableDesc.database
-    val viewName = tableDesc.name
 
-    if (hiveContext.catalog.tableExists(Seq(database, viewName))) {
+    if (hiveContext.catalog.tableExists(tableIdentifier)) {
       if (allowExisting) {
         // view already exists, will do nothing, to keep consistent with Hive
       } else if (orReplace) {
         hiveContext.catalog.client.alertView(prepareTable())
       } else {
-        throw new AnalysisException(s"View $database.$viewName already exists. " +
+        throw new AnalysisException(s"View $tableIdentifier already exists. " +
           "If you want to update the view definition, please use ALTER VIEW AS or " +
           "CREATE OR REPLACE VIEW AS")
       }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 51ec92afd0..94210a5394 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -71,7 +71,7 @@ case class DropTable(
     }
     hiveContext.invalidateTable(tableName)
     hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
-    hiveContext.catalog.unregisterTable(Seq(tableName))
+    hiveContext.catalog.unregisterTable(TableIdentifier(tableName))
     Seq.empty[Row]
   }
 }
@@ -103,7 +103,6 @@ case class AddFile(path: String) extends RunnableCommand {
   }
 }
 
-// TODO: Use TableIdentifier instead of String for tableName (SPARK-10104).
 private[hive]
 case class CreateMetastoreDataSource(
     tableIdent: TableIdentifier,
@@ -131,7 +130,7 @@ case class CreateMetastoreDataSource(
     val tableName = tableIdent.unquotedString
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
 
-    if (hiveContext.catalog.tableExists(tableIdent.toSeq)) {
+    if (hiveContext.catalog.tableExists(tableIdent)) {
       if (allowExisting) {
         return Seq.empty[Row]
       } else {
@@ -160,7 +159,6 @@ case class CreateMetastoreDataSource(
   }
 }
 
-// TODO: Use TableIdentifier instead of String for tableName (SPARK-10104).
 private[hive]
 case class CreateMetastoreDataSourceAsSelect(
     tableIdent: TableIdentifier,
@@ -198,7 +196,7 @@ case class CreateMetastoreDataSourceAsSelect(
     }
 
     var existingSchema = None: Option[StructType]
-    if (sqlContext.catalog.tableExists(tableIdent.toSeq)) {
+    if (sqlContext.catalog.tableExists(tableIdent)) {
       // Check if we need to throw an exception or just return.
       mode match {
         case SaveMode.ErrorIfExists =>
@@ -215,7 +213,7 @@ case class CreateMetastoreDataSourceAsSelect(
         val resolved = ResolvedDataSource(
           sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath)
         val createdRelation = LogicalRelation(resolved.relation)
-        EliminateSubQueries(sqlContext.catalog.lookupRelation(tableIdent.toSeq)) match {
+        EliminateSubQueries(sqlContext.catalog.lookupRelation(tableIdent)) match {
          case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _) =>
            if (l.relation != createdRelation.relation) {
              val errorDescription =
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index ff39ccb7c1..6883d305cb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -191,7 +191,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
 
     // Make sure any test tables referenced are loaded.
     val referencedTables = describedTables ++
-      logical.collect { case UnresolvedRelation(tableIdent, _) => tableIdent.last }
+      logical.collect { case UnresolvedRelation(tableIdent, _) => tableIdent.table }
     val referencedTestTables = referencedTables.filter(testTables.contains)
     logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}")
     referencedTestTables.foreach(loadTestTable)
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
index c8d272794d..8c4af1b8ea 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
@@ -26,7 +26,6 @@ import java.util.Map;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.spark.sql.SaveMode;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -41,6 +40,8 @@ import org.apache.spark.sql.hive.test.TestHive$;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.catalyst.TableIdentifier;
 import org.apache.spark.util.Utils;
 
 public class JavaMetastoreDataSourcesSuite {
@@ -71,7 +72,8 @@ public class JavaMetastoreDataSourcesSuite {
     if (path.exists()) {
       path.delete();
     }
-    hiveManagedPath = new Path(sqlContext.catalog().hiveDefaultTableFilePath("javaSavedTable"));
+    hiveManagedPath = new Path(sqlContext.catalog().hiveDefaultTableFilePath(
+      new TableIdentifier("javaSavedTable")));
     fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
     if (fs.exists(hiveManagedPath)){
       fs.delete(hiveManagedPath, true);
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
index 579631df77..183aca29cf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.Row
 
@@ -31,14 +32,14 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft
 
   override def beforeAll(): Unit = {
     // The catalog in HiveContext is a case insensitive one.
-    catalog.registerTable(Seq("ListTablesSuiteTable"), df.logicalPlan)
+    catalog.registerTable(TableIdentifier("ListTablesSuiteTable"), df.logicalPlan)
     sql("CREATE TABLE HiveListTablesSuiteTable (key int, value string)")
     sql("CREATE DATABASE IF NOT EXISTS ListTablesSuiteDB")
     sql("CREATE TABLE ListTablesSuiteDB.HiveInDBListTablesSuiteTable (key int, value string)")
   }
 
   override def afterAll(): Unit = {
-    catalog.unregisterTable(Seq("ListTablesSuiteTable"))
+    catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
     sql("DROP TABLE IF EXISTS HiveListTablesSuiteTable")
     sql("DROP TABLE IF EXISTS ListTablesSuiteDB.HiveInDBListTablesSuiteTable")
     sql("DROP DATABASE IF EXISTS ListTablesSuiteDB")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index d356538000..d292887688 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.util.Utils
 
 /**
@@ -367,7 +368,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           |)
         """.stripMargin)
 
-      val expectedPath = catalog.hiveDefaultTableFilePath("ctasJsonTable")
+      val expectedPath = catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable"))
       val filesystemPath = new Path(expectedPath)
       val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration)
       if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true)
@@ -472,7 +473,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         // Drop table will also delete the data.
         sql("DROP TABLE savedJsonTable")
         intercept[IOException] {
-          read.json(catalog.hiveDefaultTableFilePath("savedJsonTable"))
+          read.json(catalog.hiveDefaultTableFilePath(TableIdentifier("savedJsonTable")))
         }
       }
 
@@ -703,7 +704,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
 
       // Manually create a metastore data source table.
       catalog.createDataSourceTable(
-        tableName = "wide_schema",
+        tableIdent = TableIdentifier("wide_schema"),
         userSpecifiedSchema = Some(schema),
         partitionColumns = Array.empty[String],
         provider = "json",
@@ -733,7 +734,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           "EXTERNAL" -> "FALSE"),
         tableType = ManagedTable,
         serdeProperties = Map(
-          "path" -> catalog.hiveDefaultTableFilePath(tableName)))
+          "path" -> catalog.hiveDefaultTableFilePath(TableIdentifier(tableName))))
 
       catalog.client.createTable(hiveTable)
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 6a692d6fce..9bb32f11b7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import scala.reflect.ClassTag
 
 import org.apache.spark.sql.{Row, SQLConf, QueryTest}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -68,7 +69,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
 
   test("analyze MetastoreRelations") {
     def queryTotalSize(tableName: String): BigInt =
-      hiveContext.catalog.lookupRelation(Seq(tableName)).statistics.sizeInBytes
+      hiveContext.catalog.lookupRelation(TableIdentifier(tableName)).statistics.sizeInBytes
 
     // Non-partitioned table
     sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
@@ -115,7 +116,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
     intercept[UnsupportedOperationException] {
       hiveContext.analyze("tempTable")
     }
-    hiveContext.catalog.unregisterTable(Seq("tempTable"))
+    hiveContext.catalog.unregisterTable(TableIdentifier("tempTable"))
   }
 
   test("estimates the size of a test MetastoreRelation") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 6aa34605b0..c929ba5068 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.DefaultParserDialect
+import org.apache.spark.sql.catalyst.{TableIdentifier, DefaultParserDialect}
 import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, EliminateSubQueries}
 import org.apache.spark.sql.catalyst.errors.DialectException
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -266,7 +266,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 
   test("CTAS without serde") {
     def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
-      val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
+      val relation = EliminateSubQueries(catalog.lookupRelation(TableIdentifier(tableName)))
       relation match {
         case LogicalRelation(r: ParquetRelation, _) =>
           if (!isDataSourceParquet) {
@@ -723,7 +723,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       (1 to 100).par.map { i =>
         val tableName = s"SPARK_6618_table_$i"
         sql(s"CREATE TABLE $tableName (col1 string)")
-        catalog.lookupRelation(Seq(tableName))
+        catalog.lookupRelation(TableIdentifier(tableName))
         table(tableName)
         tables()
         sql(s"DROP TABLE $tableName")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 5eb39b1129..7efeab528c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.io.orc.CompressionKind
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 
@@ -218,7 +219,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
       sql("INSERT INTO TABLE t SELECT * FROM tmp")
       checkAnswer(table("t"), (data ++ data).map(Row.fromTuple))
     }
-    catalog.unregisterTable(Seq("tmp"))
+    catalog.unregisterTable(TableIdentifier("tmp"))
   }
 
   test("overwriting") {
@@ -228,7 +229,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
       sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
      checkAnswer(table("t"), data.map(Row.fromTuple))
     }
-    catalog.unregisterTable(Seq("tmp"))
+    catalog.unregisterTable(TableIdentifier("tmp"))
   }
 
   test("self-join") {
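To summarize the catalog-side pattern the hunks above converge on, here is a minimal, self-contained sketch. The MiniCatalog class is hypothetical, not Spark's HiveMetastoreCatalog; the ClientInterface, plan caching, and LogicalPlan types are omitted. Every public entry point takes a TableIdentifier, and a single private helper lower-cases it and fills in the current database to form the qualified key:

import scala.collection.mutable

// Hypothetical, stripped-down model of the refactored catalog, for illustration only.
case class TableIdentifier(table: String, database: Option[String] = None)

class MiniCatalog(currentDatabase: String) {
  // Cache/lookup key: always lower-cased and database-qualified,
  // mirroring QualifiedTableName in the diff.
  case class QualifiedTableName(database: String, name: String)

  private val tables = mutable.Map.empty[QualifiedTableName, String]

  // Single normalization point, mirroring getQualifiedTableName in the diff.
  private def getQualifiedTableName(ident: TableIdentifier): QualifiedTableName =
    QualifiedTableName(
      ident.database.getOrElse(currentDatabase).toLowerCase,
      ident.table.toLowerCase)

  def registerTable(ident: TableIdentifier, plan: String): Unit =
    tables(getQualifiedTableName(ident)) = plan

  def tableExists(ident: TableIdentifier): Boolean =
    tables.contains(getQualifiedTableName(ident))
}

object MiniCatalogDemo extends App {
  val catalog = new MiniCatalog("default")
  catalog.registerTable(TableIdentifier("Src"), "some plan")
  // "SRC", "src" and "DEFAULT.src" all resolve to the same entry.
  assert(catalog.tableExists(TableIdentifier("SRC", Some("DEFAULT"))))
}

This centralizes the case-insensitivity and default-database logic that was previously duplicated across invalidateTable, tableExists, lookupRelation, and processDatabaseAndTableName.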