diff options
Diffstat (limited to 'sql/hive')
13 files changed, 86 insertions, 161 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index e620d7fb82..4d8a3f728e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -358,7 +358,7 @@ class HiveContext private[hive]( @Experimental def analyze(tableName: String) { val tableIdent = SqlParser.parseTableIdentifier(tableName) - val relation = EliminateSubQueries(catalog.lookupRelation(tableIdent.toSeq)) + val relation = EliminateSubQueries(catalog.lookupRelation(tableIdent)) relation match { case relation: MetastoreRelation => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 1f8223e1ff..5819cb9d08 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ -import org.apache.spark.sql.catalyst.{InternalRow, SqlParser, TableIdentifier} +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource} import org.apache.spark.sql.execution.{FileRelation, datasources} @@ -103,10 +103,19 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive /** Usages should lock on `this`. */ protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf) - // TODO: Use this everywhere instead of tuples or databaseName, tableName,. /** A fully qualified identifier for a table (i.e., database.tableName) */ - case class QualifiedTableName(database: String, name: String) { - def toLowerCase: QualifiedTableName = QualifiedTableName(database.toLowerCase, name.toLowerCase) + case class QualifiedTableName(database: String, name: String) + + private def getQualifiedTableName(tableIdent: TableIdentifier) = { + QualifiedTableName( + tableIdent.database.getOrElse(client.currentDatabase).toLowerCase, + tableIdent.table.toLowerCase) + } + + private def getQualifiedTableName(hiveTable: HiveTable) = { + QualifiedTableName( + hiveTable.specifiedDatabase.getOrElse(client.currentDatabase).toLowerCase, + hiveTable.name.toLowerCase) } /** A cache of Spark SQL data source tables that have been accessed. */ @@ -179,33 +188,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive } def invalidateTable(tableIdent: TableIdentifier): Unit = { - val databaseName = tableIdent.database.getOrElse(client.currentDatabase) - val tableName = tableIdent.table - - cachedDataSourceTables.invalidate(QualifiedTableName(databaseName, tableName).toLowerCase) - } - - val caseSensitive: Boolean = false - - /** - * Creates a data source table (a table created with USING clause) in Hive's metastore. - * Returns true when the table has been created. Otherwise, false. - */ - // TODO: Remove this in SPARK-10104. - def createDataSourceTable( - tableName: String, - userSpecifiedSchema: Option[StructType], - partitionColumns: Array[String], - provider: String, - options: Map[String, String], - isExternal: Boolean): Unit = { - createDataSourceTable( - SqlParser.parseTableIdentifier(tableName), - userSpecifiedSchema, - partitionColumns, - provider, - options, - isExternal) + cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent)) } def createDataSourceTable( @@ -215,10 +198,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive provider: String, options: Map[String, String], isExternal: Boolean): Unit = { - val (dbName, tblName) = { - val database = tableIdent.database.getOrElse(client.currentDatabase) - processDatabaseAndTableName(database, tableIdent.table) - } + val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent) val tableProperties = new mutable.HashMap[String, String] tableProperties.put("spark.sql.sources.provider", provider) @@ -311,7 +291,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive // TODO: Support persisting partitioned data source relations in Hive compatible format val qualifiedTableName = tableIdent.quotedString - val (hiveCompitiableTable, logMessage) = (maybeSerDe, dataSource.relation) match { + val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.relation) match { case (Some(serde), relation: HadoopFsRelation) if relation.paths.length == 1 && relation.partitionColumns.isEmpty => val hiveTable = newHiveCompatibleMetastoreTable(relation, serde) @@ -349,9 +329,9 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive (None, message) } - (hiveCompitiableTable, logMessage) match { + (hiveCompatibleTable, logMessage) match { case (Some(table), message) => - // We first try to save the metadata of the table in a Hive compatiable way. + // We first try to save the metadata of the table in a Hive compatible way. // If Hive throws an error, we fall back to save its metadata in the Spark SQL // specific way. try { @@ -374,48 +354,29 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive } } - def hiveDefaultTableFilePath(tableName: String): String = { - hiveDefaultTableFilePath(SqlParser.parseTableIdentifier(tableName)) - } - def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = { // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName) - val database = tableIdent.database.getOrElse(client.currentDatabase) - - new Path( - new Path(client.getDatabase(database).location), - tableIdent.table.toLowerCase).toString + val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent) + new Path(new Path(client.getDatabase(dbName).location), tblName).toString } - def tableExists(tableIdentifier: Seq[String]): Boolean = { - val tableIdent = processTableIdentifier(tableIdentifier) - val databaseName = - tableIdent - .lift(tableIdent.size - 2) - .getOrElse(client.currentDatabase) - val tblName = tableIdent.last - client.getTableOption(databaseName, tblName).isDefined + override def tableExists(tableIdent: TableIdentifier): Boolean = { + val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent) + client.getTableOption(dbName, tblName).isDefined } - def lookupRelation( - tableIdentifier: Seq[String], + override def lookupRelation( + tableIdent: TableIdentifier, alias: Option[String]): LogicalPlan = { - val tableIdent = processTableIdentifier(tableIdentifier) - val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse( - client.currentDatabase) - val tblName = tableIdent.last - val table = client.getTable(databaseName, tblName) + val qualifiedTableName = getQualifiedTableName(tableIdent) + val table = client.getTable(qualifiedTableName.database, qualifiedTableName.name) if (table.properties.get("spark.sql.sources.provider").isDefined) { - val dataSourceTable = - cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase) + val dataSourceTable = cachedDataSourceTables(qualifiedTableName) + val tableWithQualifiers = Subquery(qualifiedTableName.name, dataSourceTable) // Then, if alias is specified, wrap the table with a Subquery using the alias. // Otherwise, wrap the table with a Subquery using the table name. - val withAlias = - alias.map(a => Subquery(a, dataSourceTable)).getOrElse( - Subquery(tableIdent.last, dataSourceTable)) - - withAlias + alias.map(a => Subquery(a, tableWithQualifiers)).getOrElse(tableWithQualifiers) } else if (table.tableType == VirtualView) { val viewText = table.viewText.getOrElse(sys.error("Invalid view without text.")) alias match { @@ -425,7 +386,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive case Some(aliasText) => Subquery(aliasText, HiveQl.createPlan(viewText)) } } else { - MetastoreRelation(databaseName, tblName, alias)(table)(hive) + MetastoreRelation(qualifiedTableName.database, qualifiedTableName.name, alias)(table)(hive) } } @@ -524,26 +485,6 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive client.listTables(db).map(tableName => (tableName, false)) } - protected def processDatabaseAndTableName( - databaseName: Option[String], - tableName: String): (Option[String], String) = { - if (!caseSensitive) { - (databaseName.map(_.toLowerCase), tableName.toLowerCase) - } else { - (databaseName, tableName) - } - } - - protected def processDatabaseAndTableName( - databaseName: String, - tableName: String): (String, String) = { - if (!caseSensitive) { - (databaseName.toLowerCase, tableName.toLowerCase) - } else { - (databaseName, tableName) - } - } - /** * When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet * data source relations for better performance. @@ -597,8 +538,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.") } - val (dbName, tblName) = processDatabaseAndTableName( - table.specifiedDatabase.getOrElse(client.currentDatabase), table.name) + val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table) execution.CreateViewAsSelect( table.copy( @@ -636,7 +576,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists CreateTableUsingAsSelect( TableIdentifier(desc.name), - hive.conf.defaultDataSourceName, + conf.defaultDataSourceName, temporary = false, Array.empty[String], mode, @@ -652,9 +592,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive table } - val (dbName, tblName) = - processDatabaseAndTableName( - desc.specifiedDatabase.getOrElse(client.currentDatabase), desc.name) + val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table) execution.CreateTableAsSelect( desc.copy( @@ -712,7 +650,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive * UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore. * For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]]. */ - override def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit = { + override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = { throw new UnsupportedOperationException } @@ -720,7 +658,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive * UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore. * For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]]. */ - override def unregisterTable(tableIdentifier: Seq[String]): Unit = { + override def unregisterTable(tableIdent: TableIdentifier): Unit = { throw new UnsupportedOperationException } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 1d50501940..d4ff5cc0f1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.{logical, _} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.trees.CurrentOrigin +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.ExplainCommand import org.apache.spark.sql.execution.datasources.DescribeCommand import org.apache.spark.sql.hive.HiveShim._ @@ -442,24 +443,12 @@ private[hive] object HiveQl extends Logging { throw new NotImplementedError(s"No parse rules for StructField:\n ${dumpTree(a).toString} ") } - protected def extractDbNameTableName(tableNameParts: Node): (Option[String], String) = { - val (db, tableName) = - tableNameParts.getChildren.asScala.map { - case Token(part, Nil) => cleanIdentifier(part) - } match { - case Seq(tableOnly) => (None, tableOnly) - case Seq(databaseName, table) => (Some(databaseName), table) - } - - (db, tableName) - } - - protected def extractTableIdent(tableNameParts: Node): Seq[String] = { + protected def extractTableIdent(tableNameParts: Node): TableIdentifier = { tableNameParts.getChildren.asScala.map { case Token(part, Nil) => cleanIdentifier(part) } match { - case Seq(tableOnly) => Seq(tableOnly) - case Seq(databaseName, table) => Seq(databaseName, table) + case Seq(tableOnly) => TableIdentifier(tableOnly) + case Seq(databaseName, table) => TableIdentifier(table, Some(databaseName)) case other => sys.error("Hive only supports tables names like 'tableName' " + s"or 'databaseName.tableName', found '$other'") } @@ -518,13 +507,13 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C properties: Map[String, String], allowExist: Boolean, replace: Boolean): CreateViewAsSelect = { - val (db, viewName) = extractDbNameTableName(viewNameParts) + val TableIdentifier(viewName, dbName) = extractTableIdent(viewNameParts) val originalText = context.getTokenRewriteStream .toString(query.getTokenStartIndex, query.getTokenStopIndex) val tableDesc = HiveTable( - specifiedDatabase = db, + specifiedDatabase = dbName, name = viewName, schema = schema, partitionColumns = Seq.empty[HiveColumn], @@ -611,7 +600,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C case tableName => // It is describing a table with the format like "describe table". DescribeCommand( - UnresolvedRelation(Seq(tableName.getText), None), isExtended = extended.isDefined) + UnresolvedRelation(TableIdentifier(tableName.getText), None), + isExtended = extended.isDefined) } } // All other cases. @@ -716,12 +706,12 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C "TOK_TABLELOCATION", "TOK_TABLEPROPERTIES"), children) - val (db, tableName) = extractDbNameTableName(tableNameParts) + val TableIdentifier(tblName, dbName) = extractTableIdent(tableNameParts) // TODO add bucket support var tableDesc: HiveTable = HiveTable( - specifiedDatabase = db, - name = tableName, + specifiedDatabase = dbName, + name = tblName, schema = Seq.empty[HiveColumn], partitionColumns = Seq.empty[HiveColumn], properties = Map[String, String](), @@ -1264,15 +1254,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C nonAliasClauses) } - val tableIdent = - tableNameParts.getChildren.asScala.map { - case Token(part, Nil) => cleanIdentifier(part) - } match { - case Seq(tableOnly) => Seq(tableOnly) - case Seq(databaseName, table) => Seq(databaseName, table) - case other => sys.error("Hive only supports tables names like 'tableName' " + - s"or 'databaseName.tableName', found '$other'") - } + val tableIdent = extractTableIdent(tableNameParts) val alias = aliasClause.map { case Token(a, Nil) => cleanIdentifier(a) } val relation = UnresolvedRelation(tableIdent, alias) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index 8422287e17..e72a60b42e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.hive.execution +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable} @@ -37,8 +38,7 @@ case class CreateTableAsSelect( allowExisting: Boolean) extends RunnableCommand { - def database: String = tableDesc.database - def tableName: String = tableDesc.name + val tableIdentifier = TableIdentifier(tableDesc.name, Some(tableDesc.database)) override def children: Seq[LogicalPlan] = Seq(query) @@ -72,18 +72,18 @@ case class CreateTableAsSelect( hiveContext.catalog.client.createTable(withSchema) // Get the Metastore Relation - hiveContext.catalog.lookupRelation(Seq(database, tableName), None) match { + hiveContext.catalog.lookupRelation(tableIdentifier, None) match { case r: MetastoreRelation => r } } // TODO ideally, we should get the output data ready first and then // add the relation into catalog, just in case of failure occurs while data // processing. - if (hiveContext.catalog.tableExists(Seq(database, tableName))) { + if (hiveContext.catalog.tableExists(tableIdentifier)) { if (allowExisting) { // table already exists, will do nothing, to keep consistent with Hive } else { - throw new AnalysisException(s"$database.$tableName already exists.") + throw new AnalysisException(s"$tableIdentifier already exists.") } } else { hiveContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true, false)).toRdd @@ -93,6 +93,6 @@ case class CreateTableAsSelect( } override def argString: String = { - s"[Database:$database, TableName: $tableName, InsertIntoHiveTable]" + s"[Database:${tableDesc.database}}, TableName: ${tableDesc.name}, InsertIntoHiveTable]" } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala index 2b504ac974..2c81115ee4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.hive.execution +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveContext} import org.apache.spark.sql.{AnalysisException, Row, SQLContext} @@ -38,18 +39,18 @@ private[hive] case class CreateViewAsSelect( assert(tableDesc.schema == Nil || tableDesc.schema.length == childSchema.length) assert(tableDesc.viewText.isDefined) + val tableIdentifier = TableIdentifier(tableDesc.name, Some(tableDesc.database)) + override def run(sqlContext: SQLContext): Seq[Row] = { val hiveContext = sqlContext.asInstanceOf[HiveContext] - val database = tableDesc.database - val viewName = tableDesc.name - if (hiveContext.catalog.tableExists(Seq(database, viewName))) { + if (hiveContext.catalog.tableExists(tableIdentifier)) { if (allowExisting) { // view already exists, will do nothing, to keep consistent with Hive } else if (orReplace) { hiveContext.catalog.client.alertView(prepareTable()) } else { - throw new AnalysisException(s"View $database.$viewName already exists. " + + throw new AnalysisException(s"View $tableIdentifier already exists. " + "If you want to update the view definition, please use ALTER VIEW AS or " + "CREATE OR REPLACE VIEW AS") } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 51ec92afd0..94210a5394 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -71,7 +71,7 @@ case class DropTable( } hiveContext.invalidateTable(tableName) hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName") - hiveContext.catalog.unregisterTable(Seq(tableName)) + hiveContext.catalog.unregisterTable(TableIdentifier(tableName)) Seq.empty[Row] } } @@ -103,7 +103,6 @@ case class AddFile(path: String) extends RunnableCommand { } } -// TODO: Use TableIdentifier instead of String for tableName (SPARK-10104). private[hive] case class CreateMetastoreDataSource( tableIdent: TableIdentifier, @@ -131,7 +130,7 @@ case class CreateMetastoreDataSource( val tableName = tableIdent.unquotedString val hiveContext = sqlContext.asInstanceOf[HiveContext] - if (hiveContext.catalog.tableExists(tableIdent.toSeq)) { + if (hiveContext.catalog.tableExists(tableIdent)) { if (allowExisting) { return Seq.empty[Row] } else { @@ -160,7 +159,6 @@ case class CreateMetastoreDataSource( } } -// TODO: Use TableIdentifier instead of String for tableName (SPARK-10104). private[hive] case class CreateMetastoreDataSourceAsSelect( tableIdent: TableIdentifier, @@ -198,7 +196,7 @@ case class CreateMetastoreDataSourceAsSelect( } var existingSchema = None: Option[StructType] - if (sqlContext.catalog.tableExists(tableIdent.toSeq)) { + if (sqlContext.catalog.tableExists(tableIdent)) { // Check if we need to throw an exception or just return. mode match { case SaveMode.ErrorIfExists => @@ -215,7 +213,7 @@ case class CreateMetastoreDataSourceAsSelect( val resolved = ResolvedDataSource( sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath) val createdRelation = LogicalRelation(resolved.relation) - EliminateSubQueries(sqlContext.catalog.lookupRelation(tableIdent.toSeq)) match { + EliminateSubQueries(sqlContext.catalog.lookupRelation(tableIdent)) match { case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _) => if (l.relation != createdRelation.relation) { val errorDescription = diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index ff39ccb7c1..6883d305cb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -191,7 +191,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { // Make sure any test tables referenced are loaded. val referencedTables = describedTables ++ - logical.collect { case UnresolvedRelation(tableIdent, _) => tableIdent.last } + logical.collect { case UnresolvedRelation(tableIdent, _) => tableIdent.table } val referencedTestTables = referencedTables.filter(testTables.contains) logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}") referencedTestTables.foreach(loadTestTable) diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java index c8d272794d..8c4af1b8ea 100644 --- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java @@ -26,7 +26,6 @@ import java.util.Map; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.spark.sql.SaveMode; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -41,6 +40,8 @@ import org.apache.spark.sql.hive.test.TestHive$; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.catalyst.TableIdentifier; import org.apache.spark.util.Utils; public class JavaMetastoreDataSourcesSuite { @@ -71,7 +72,8 @@ public class JavaMetastoreDataSourcesSuite { if (path.exists()) { path.delete(); } - hiveManagedPath = new Path(sqlContext.catalog().hiveDefaultTableFilePath("javaSavedTable")); + hiveManagedPath = new Path(sqlContext.catalog().hiveDefaultTableFilePath( + new TableIdentifier("javaSavedTable"))); fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration()); if (fs.exists(hiveManagedPath)){ fs.delete(hiveManagedPath, true); diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala index 579631df77..183aca29cf 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.QueryTest import org.apache.spark.sql.Row @@ -31,14 +32,14 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft override def beforeAll(): Unit = { // The catalog in HiveContext is a case insensitive one. - catalog.registerTable(Seq("ListTablesSuiteTable"), df.logicalPlan) + catalog.registerTable(TableIdentifier("ListTablesSuiteTable"), df.logicalPlan) sql("CREATE TABLE HiveListTablesSuiteTable (key int, value string)") sql("CREATE DATABASE IF NOT EXISTS ListTablesSuiteDB") sql("CREATE TABLE ListTablesSuiteDB.HiveInDBListTablesSuiteTable (key int, value string)") } override def afterAll(): Unit = { - catalog.unregisterTable(Seq("ListTablesSuiteTable")) + catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable")) sql("DROP TABLE IF EXISTS HiveListTablesSuiteTable") sql("DROP TABLE IF EXISTS ListTablesSuiteDB.HiveInDBListTablesSuiteTable") sql("DROP DATABASE IF EXISTS ListTablesSuiteDB") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index d356538000..d292887688 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.util.Utils /** @@ -367,7 +368,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv |) """.stripMargin) - val expectedPath = catalog.hiveDefaultTableFilePath("ctasJsonTable") + val expectedPath = catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable")) val filesystemPath = new Path(expectedPath) val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration) if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true) @@ -472,7 +473,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv // Drop table will also delete the data. sql("DROP TABLE savedJsonTable") intercept[IOException] { - read.json(catalog.hiveDefaultTableFilePath("savedJsonTable")) + read.json(catalog.hiveDefaultTableFilePath(TableIdentifier("savedJsonTable"))) } } @@ -703,7 +704,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv // Manually create a metastore data source table. catalog.createDataSourceTable( - tableName = "wide_schema", + tableIdent = TableIdentifier("wide_schema"), userSpecifiedSchema = Some(schema), partitionColumns = Array.empty[String], provider = "json", @@ -733,7 +734,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv "EXTERNAL" -> "FALSE"), tableType = ManagedTable, serdeProperties = Map( - "path" -> catalog.hiveDefaultTableFilePath(tableName))) + "path" -> catalog.hiveDefaultTableFilePath(TableIdentifier(tableName)))) catalog.client.createTable(hiveTable) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 6a692d6fce..9bb32f11b7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive import scala.reflect.ClassTag import org.apache.spark.sql.{Row, SQLConf, QueryTest} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.hive.test.TestHiveSingleton @@ -68,7 +69,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton { test("analyze MetastoreRelations") { def queryTotalSize(tableName: String): BigInt = - hiveContext.catalog.lookupRelation(Seq(tableName)).statistics.sizeInBytes + hiveContext.catalog.lookupRelation(TableIdentifier(tableName)).statistics.sizeInBytes // Non-partitioned table sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect() @@ -115,7 +116,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton { intercept[UnsupportedOperationException] { hiveContext.analyze("tempTable") } - hiveContext.catalog.unregisterTable(Seq("tempTable")) + hiveContext.catalog.unregisterTable(TableIdentifier("tempTable")) } test("estimates the size of a test MetastoreRelation") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 6aa34605b0..c929ba5068 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp} import scala.collection.JavaConverters._ import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.DefaultParserDialect +import org.apache.spark.sql.catalyst.{TableIdentifier, DefaultParserDialect} import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, EliminateSubQueries} import org.apache.spark.sql.catalyst.errors.DialectException import org.apache.spark.sql.execution.datasources.LogicalRelation @@ -266,7 +266,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { test("CTAS without serde") { def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = { - val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName))) + val relation = EliminateSubQueries(catalog.lookupRelation(TableIdentifier(tableName))) relation match { case LogicalRelation(r: ParquetRelation, _) => if (!isDataSourceParquet) { @@ -723,7 +723,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { (1 to 100).par.map { i => val tableName = s"SPARK_6618_table_$i" sql(s"CREATE TABLE $tableName (col1 string)") - catalog.lookupRelation(Seq(tableName)) + catalog.lookupRelation(TableIdentifier(tableName)) table(tableName) tables() sql(s"DROP TABLE $tableName") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 5eb39b1129..7efeab528c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.io.orc.CompressionKind import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ @@ -218,7 +219,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { sql("INSERT INTO TABLE t SELECT * FROM tmp") checkAnswer(table("t"), (data ++ data).map(Row.fromTuple)) } - catalog.unregisterTable(Seq("tmp")) + catalog.unregisterTable(TableIdentifier("tmp")) } test("overwriting") { @@ -228,7 +229,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp") checkAnswer(table("t"), data.map(Row.fromTuple)) } - catalog.unregisterTable(Seq("tmp")) + catalog.unregisterTable(TableIdentifier("tmp")) } test("self-join") { |