Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala                   |   2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala          | 134
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala                        |  42
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala |  12
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala  |   9
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala            |  10
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala                 |   2
-rw-r--r--  sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java   |   6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala               |   5
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala     |   9
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala               |   5
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala       |   6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala             |   5
13 files changed, 86 insertions, 161 deletions
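
The changes below replace the old Seq[String] table references in the Hive catalog with the catalyst TableIdentifier: a table name plus an optional database. The following is a minimal, hedged sketch of that identifier shape; it mirrors the quotedString/unquotedString usage seen later in the diff, but it is not Spark's actual class.

    // Minimal sketch, not Spark's class: the identifier shape this patch standardizes on.
    // An optional database plus a table name replaces the old Seq[String] form.
    case class TableIdentifier(table: String, database: Option[String] = None) {
      // quotedString/unquotedString are assumed helpers, mirroring calls made later in the diff.
      def quotedString: String = database.map(db => s"`$db`.`$table`").getOrElse(s"`$table`")
      def unquotedString: String = database.map(db => s"$db.$table").getOrElse(table)
    }

    object TableIdentifierSketch {
      def main(args: Array[String]): Unit = {
        println(TableIdentifier("ctasJsonTable").quotedString)                   // `ctasJsonTable`
        println(TableIdentifier("savedJsonTable", Some("mydb")).unquotedString)  // mydb.savedJsonTable
      }
    }
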
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index e620d7fb82..4d8a3f728e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -358,7 +358,7 @@ class HiveContext private[hive](
@Experimental
def analyze(tableName: String) {
val tableIdent = SqlParser.parseTableIdentifier(tableName)
- val relation = EliminateSubQueries(catalog.lookupRelation(tableIdent.toSeq))
+ val relation = EliminateSubQueries(catalog.lookupRelation(tableIdent))
relation match {
case relation: MetastoreRelation =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 1f8223e1ff..5819cb9d08 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.catalyst.{InternalRow, SqlParser, TableIdentifier}
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
import org.apache.spark.sql.execution.{FileRelation, datasources}
@@ -103,10 +103,19 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
/** Usages should lock on `this`. */
protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf)
- // TODO: Use this everywhere instead of tuples or databaseName, tableName,.
/** A fully qualified identifier for a table (i.e., database.tableName) */
- case class QualifiedTableName(database: String, name: String) {
- def toLowerCase: QualifiedTableName = QualifiedTableName(database.toLowerCase, name.toLowerCase)
+ case class QualifiedTableName(database: String, name: String)
+
+ private def getQualifiedTableName(tableIdent: TableIdentifier) = {
+ QualifiedTableName(
+ tableIdent.database.getOrElse(client.currentDatabase).toLowerCase,
+ tableIdent.table.toLowerCase)
+ }
+
+ private def getQualifiedTableName(hiveTable: HiveTable) = {
+ QualifiedTableName(
+ hiveTable.specifiedDatabase.getOrElse(client.currentDatabase).toLowerCase,
+ hiveTable.name.toLowerCase)
}
/** A cache of Spark SQL data source tables that have been accessed. */
@@ -179,33 +188,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
}
def invalidateTable(tableIdent: TableIdentifier): Unit = {
- val databaseName = tableIdent.database.getOrElse(client.currentDatabase)
- val tableName = tableIdent.table
-
- cachedDataSourceTables.invalidate(QualifiedTableName(databaseName, tableName).toLowerCase)
- }
-
- val caseSensitive: Boolean = false
-
- /**
- * Creates a data source table (a table created with USING clause) in Hive's metastore.
- * Returns true when the table has been created. Otherwise, false.
- */
- // TODO: Remove this in SPARK-10104.
- def createDataSourceTable(
- tableName: String,
- userSpecifiedSchema: Option[StructType],
- partitionColumns: Array[String],
- provider: String,
- options: Map[String, String],
- isExternal: Boolean): Unit = {
- createDataSourceTable(
- SqlParser.parseTableIdentifier(tableName),
- userSpecifiedSchema,
- partitionColumns,
- provider,
- options,
- isExternal)
+ cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent))
}
def createDataSourceTable(
@@ -215,10 +198,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
provider: String,
options: Map[String, String],
isExternal: Boolean): Unit = {
- val (dbName, tblName) = {
- val database = tableIdent.database.getOrElse(client.currentDatabase)
- processDatabaseAndTableName(database, tableIdent.table)
- }
+ val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
val tableProperties = new mutable.HashMap[String, String]
tableProperties.put("spark.sql.sources.provider", provider)
@@ -311,7 +291,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
// TODO: Support persisting partitioned data source relations in Hive compatible format
val qualifiedTableName = tableIdent.quotedString
- val (hiveCompitiableTable, logMessage) = (maybeSerDe, dataSource.relation) match {
+ val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.relation) match {
case (Some(serde), relation: HadoopFsRelation)
if relation.paths.length == 1 && relation.partitionColumns.isEmpty =>
val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
@@ -349,9 +329,9 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
(None, message)
}
- (hiveCompitiableTable, logMessage) match {
+ (hiveCompatibleTable, logMessage) match {
case (Some(table), message) =>
- // We first try to save the metadata of the table in a Hive compatiable way.
+ // We first try to save the metadata of the table in a Hive compatible way.
// If Hive throws an error, we fall back to save its metadata in the Spark SQL
// specific way.
try {
@@ -374,48 +354,29 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
}
}
- def hiveDefaultTableFilePath(tableName: String): String = {
- hiveDefaultTableFilePath(SqlParser.parseTableIdentifier(tableName))
- }
-
def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
// Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
- val database = tableIdent.database.getOrElse(client.currentDatabase)
-
- new Path(
- new Path(client.getDatabase(database).location),
- tableIdent.table.toLowerCase).toString
+ val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
+ new Path(new Path(client.getDatabase(dbName).location), tblName).toString
}
- def tableExists(tableIdentifier: Seq[String]): Boolean = {
- val tableIdent = processTableIdentifier(tableIdentifier)
- val databaseName =
- tableIdent
- .lift(tableIdent.size - 2)
- .getOrElse(client.currentDatabase)
- val tblName = tableIdent.last
- client.getTableOption(databaseName, tblName).isDefined
+ override def tableExists(tableIdent: TableIdentifier): Boolean = {
+ val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
+ client.getTableOption(dbName, tblName).isDefined
}
- def lookupRelation(
- tableIdentifier: Seq[String],
+ override def lookupRelation(
+ tableIdent: TableIdentifier,
alias: Option[String]): LogicalPlan = {
- val tableIdent = processTableIdentifier(tableIdentifier)
- val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
- client.currentDatabase)
- val tblName = tableIdent.last
- val table = client.getTable(databaseName, tblName)
+ val qualifiedTableName = getQualifiedTableName(tableIdent)
+ val table = client.getTable(qualifiedTableName.database, qualifiedTableName.name)
if (table.properties.get("spark.sql.sources.provider").isDefined) {
- val dataSourceTable =
- cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
+ val dataSourceTable = cachedDataSourceTables(qualifiedTableName)
+ val tableWithQualifiers = Subquery(qualifiedTableName.name, dataSourceTable)
// Then, if alias is specified, wrap the table with a Subquery using the alias.
// Otherwise, wrap the table with a Subquery using the table name.
- val withAlias =
- alias.map(a => Subquery(a, dataSourceTable)).getOrElse(
- Subquery(tableIdent.last, dataSourceTable))
-
- withAlias
+ alias.map(a => Subquery(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
} else if (table.tableType == VirtualView) {
val viewText = table.viewText.getOrElse(sys.error("Invalid view without text."))
alias match {
@@ -425,7 +386,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
case Some(aliasText) => Subquery(aliasText, HiveQl.createPlan(viewText))
}
} else {
- MetastoreRelation(databaseName, tblName, alias)(table)(hive)
+ MetastoreRelation(qualifiedTableName.database, qualifiedTableName.name, alias)(table)(hive)
}
}
@@ -524,26 +485,6 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
client.listTables(db).map(tableName => (tableName, false))
}
- protected def processDatabaseAndTableName(
- databaseName: Option[String],
- tableName: String): (Option[String], String) = {
- if (!caseSensitive) {
- (databaseName.map(_.toLowerCase), tableName.toLowerCase)
- } else {
- (databaseName, tableName)
- }
- }
-
- protected def processDatabaseAndTableName(
- databaseName: String,
- tableName: String): (String, String) = {
- if (!caseSensitive) {
- (databaseName.toLowerCase, tableName.toLowerCase)
- } else {
- (databaseName, tableName)
- }
- }
-
/**
* When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet
* data source relations for better performance.
@@ -597,8 +538,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
"It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.")
}
- val (dbName, tblName) = processDatabaseAndTableName(
- table.specifiedDatabase.getOrElse(client.currentDatabase), table.name)
+ val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
execution.CreateViewAsSelect(
table.copy(
@@ -636,7 +576,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTableUsingAsSelect(
TableIdentifier(desc.name),
- hive.conf.defaultDataSourceName,
+ conf.defaultDataSourceName,
temporary = false,
Array.empty[String],
mode,
@@ -652,9 +592,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
table
}
- val (dbName, tblName) =
- processDatabaseAndTableName(
- desc.specifiedDatabase.getOrElse(client.currentDatabase), desc.name)
+ val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
execution.CreateTableAsSelect(
desc.copy(
@@ -712,7 +650,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
* UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
* For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
*/
- override def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit = {
+ override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
throw new UnsupportedOperationException
}
@@ -720,7 +658,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
* UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
* For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
*/
- override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
+ override def unregisterTable(tableIdent: TableIdentifier): Unit = {
throw new UnsupportedOperationException
}
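
The two getQualifiedTableName overloads added above replace processDatabaseAndTableName and QualifiedTableName.toLowerCase: they default a missing database to the client's current one and lowercase both parts, so lookups in cachedDataSourceTables stay case-insensitive. Below is a standalone, hedged sketch of that normalization; the names are illustrative, not Spark's.

    // Hedged sketch of the normalization getQualifiedTableName performs in this patch.
    case class TableIdentifier(table: String, database: Option[String] = None)
    case class QualifiedTableName(database: String, name: String)

    // Default the database to the session's current one, then lowercase both parts.
    def qualify(currentDatabase: String, ident: TableIdentifier): QualifiedTableName =
      QualifiedTableName(
        ident.database.getOrElse(currentDatabase).toLowerCase,
        ident.table.toLowerCase)

    object QualifySketch {
      def main(args: Array[String]): Unit = {
        // Both forms resolve to the same cache key:
        println(qualify("default", TableIdentifier("MyTable", Some("Sales"))))  // QualifiedTableName(sales,mytable)
        println(qualify("sales", TableIdentifier("MYTABLE")))                   // QualifiedTableName(sales,mytable)
      }
    }
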
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 1d50501940..d4ff5cc0f1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.{logical, _}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.ExplainCommand
import org.apache.spark.sql.execution.datasources.DescribeCommand
import org.apache.spark.sql.hive.HiveShim._
@@ -442,24 +443,12 @@ private[hive] object HiveQl extends Logging {
throw new NotImplementedError(s"No parse rules for StructField:\n ${dumpTree(a).toString} ")
}
- protected def extractDbNameTableName(tableNameParts: Node): (Option[String], String) = {
- val (db, tableName) =
- tableNameParts.getChildren.asScala.map {
- case Token(part, Nil) => cleanIdentifier(part)
- } match {
- case Seq(tableOnly) => (None, tableOnly)
- case Seq(databaseName, table) => (Some(databaseName), table)
- }
-
- (db, tableName)
- }
-
- protected def extractTableIdent(tableNameParts: Node): Seq[String] = {
+ protected def extractTableIdent(tableNameParts: Node): TableIdentifier = {
tableNameParts.getChildren.asScala.map {
case Token(part, Nil) => cleanIdentifier(part)
} match {
- case Seq(tableOnly) => Seq(tableOnly)
- case Seq(databaseName, table) => Seq(databaseName, table)
+ case Seq(tableOnly) => TableIdentifier(tableOnly)
+ case Seq(databaseName, table) => TableIdentifier(table, Some(databaseName))
case other => sys.error("Hive only supports tables names like 'tableName' " +
s"or 'databaseName.tableName', found '$other'")
}
@@ -518,13 +507,13 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
properties: Map[String, String],
allowExist: Boolean,
replace: Boolean): CreateViewAsSelect = {
- val (db, viewName) = extractDbNameTableName(viewNameParts)
+ val TableIdentifier(viewName, dbName) = extractTableIdent(viewNameParts)
val originalText = context.getTokenRewriteStream
.toString(query.getTokenStartIndex, query.getTokenStopIndex)
val tableDesc = HiveTable(
- specifiedDatabase = db,
+ specifiedDatabase = dbName,
name = viewName,
schema = schema,
partitionColumns = Seq.empty[HiveColumn],
@@ -611,7 +600,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
case tableName =>
// It is describing a table with the format like "describe table".
DescribeCommand(
- UnresolvedRelation(Seq(tableName.getText), None), isExtended = extended.isDefined)
+ UnresolvedRelation(TableIdentifier(tableName.getText), None),
+ isExtended = extended.isDefined)
}
}
// All other cases.
@@ -716,12 +706,12 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
"TOK_TABLELOCATION",
"TOK_TABLEPROPERTIES"),
children)
- val (db, tableName) = extractDbNameTableName(tableNameParts)
+ val TableIdentifier(tblName, dbName) = extractTableIdent(tableNameParts)
// TODO add bucket support
var tableDesc: HiveTable = HiveTable(
- specifiedDatabase = db,
- name = tableName,
+ specifiedDatabase = dbName,
+ name = tblName,
schema = Seq.empty[HiveColumn],
partitionColumns = Seq.empty[HiveColumn],
properties = Map[String, String](),
@@ -1264,15 +1254,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
nonAliasClauses)
}
- val tableIdent =
- tableNameParts.getChildren.asScala.map {
- case Token(part, Nil) => cleanIdentifier(part)
- } match {
- case Seq(tableOnly) => Seq(tableOnly)
- case Seq(databaseName, table) => Seq(databaseName, table)
- case other => sys.error("Hive only supports tables names like 'tableName' " +
- s"or 'databaseName.tableName', found '$other'")
- }
+ val tableIdent = extractTableIdent(tableNameParts)
val alias = aliasClause.map { case Token(a, Nil) => cleanIdentifier(a) }
val relation = UnresolvedRelation(tableIdent, alias)
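
extractTableIdent now returns a TableIdentifier directly, folding in the removed extractDbNameTableName helper. A hedged sketch of the same one- vs two-part name handling follows, written over plain strings; the real method pattern-matches Hive AST Token nodes, and TableIdentifier is redefined here only to keep the snippet self-contained.

    case class TableIdentifier(table: String, database: Option[String] = None)

    // Mirror of extractTableIdent's matching: one part is a bare table name,
    // two parts are database then table; anything else is rejected.
    def toTableIdentifier(parts: Seq[String]): TableIdentifier = parts match {
      case Seq(tableOnly)           => TableIdentifier(tableOnly)
      case Seq(databaseName, table) => TableIdentifier(table, Some(databaseName))
      case other => sys.error(
        s"Hive only supports table names like 'tableName' or 'databaseName.tableName', found '$other'")
    }

    object ExtractSketch {
      def main(args: Array[String]): Unit = {
        println(toTableIdentifier(Seq("src")))                 // TableIdentifier(src,None)
        println(toTableIdentifier("db.src".split('.').toSeq))  // TableIdentifier(src,Some(db))
      }
    }
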
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 8422287e17..e72a60b42e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.hive.execution
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
@@ -37,8 +38,7 @@ case class CreateTableAsSelect(
allowExisting: Boolean)
extends RunnableCommand {
- def database: String = tableDesc.database
- def tableName: String = tableDesc.name
+ val tableIdentifier = TableIdentifier(tableDesc.name, Some(tableDesc.database))
override def children: Seq[LogicalPlan] = Seq(query)
@@ -72,18 +72,18 @@ case class CreateTableAsSelect(
hiveContext.catalog.client.createTable(withSchema)
// Get the Metastore Relation
- hiveContext.catalog.lookupRelation(Seq(database, tableName), None) match {
+ hiveContext.catalog.lookupRelation(tableIdentifier, None) match {
case r: MetastoreRelation => r
}
}
// TODO ideally, we should get the output data ready first and then
// add the relation into catalog, just in case of failure occurs while data
// processing.
- if (hiveContext.catalog.tableExists(Seq(database, tableName))) {
+ if (hiveContext.catalog.tableExists(tableIdentifier)) {
if (allowExisting) {
// table already exists, will do nothing, to keep consistent with Hive
} else {
- throw new AnalysisException(s"$database.$tableName already exists.")
+ throw new AnalysisException(s"$tableIdentifier already exists.")
}
} else {
hiveContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true, false)).toRdd
@@ -93,6 +93,6 @@ case class CreateTableAsSelect(
}
override def argString: String = {
- s"[Database:$database, TableName: $tableName, InsertIntoHiveTable]"
+ s"[Database:${tableDesc.database}}, TableName: ${tableDesc.name}, InsertIntoHiveTable]"
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
index 2b504ac974..2c81115ee4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.hive.execution
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveContext}
import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
@@ -38,18 +39,18 @@ private[hive] case class CreateViewAsSelect(
assert(tableDesc.schema == Nil || tableDesc.schema.length == childSchema.length)
assert(tableDesc.viewText.isDefined)
+ val tableIdentifier = TableIdentifier(tableDesc.name, Some(tableDesc.database))
+
override def run(sqlContext: SQLContext): Seq[Row] = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
- val database = tableDesc.database
- val viewName = tableDesc.name
- if (hiveContext.catalog.tableExists(Seq(database, viewName))) {
+ if (hiveContext.catalog.tableExists(tableIdentifier)) {
if (allowExisting) {
// view already exists, will do nothing, to keep consistent with Hive
} else if (orReplace) {
hiveContext.catalog.client.alertView(prepareTable())
} else {
- throw new AnalysisException(s"View $database.$viewName already exists. " +
+ throw new AnalysisException(s"View $tableIdentifier already exists. " +
"If you want to update the view definition, please use ALTER VIEW AS or " +
"CREATE OR REPLACE VIEW AS")
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 51ec92afd0..94210a5394 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -71,7 +71,7 @@ case class DropTable(
}
hiveContext.invalidateTable(tableName)
hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
- hiveContext.catalog.unregisterTable(Seq(tableName))
+ hiveContext.catalog.unregisterTable(TableIdentifier(tableName))
Seq.empty[Row]
}
}
@@ -103,7 +103,6 @@ case class AddFile(path: String) extends RunnableCommand {
}
}
-// TODO: Use TableIdentifier instead of String for tableName (SPARK-10104).
private[hive]
case class CreateMetastoreDataSource(
tableIdent: TableIdentifier,
@@ -131,7 +130,7 @@ case class CreateMetastoreDataSource(
val tableName = tableIdent.unquotedString
val hiveContext = sqlContext.asInstanceOf[HiveContext]
- if (hiveContext.catalog.tableExists(tableIdent.toSeq)) {
+ if (hiveContext.catalog.tableExists(tableIdent)) {
if (allowExisting) {
return Seq.empty[Row]
} else {
@@ -160,7 +159,6 @@ case class CreateMetastoreDataSource(
}
}
-// TODO: Use TableIdentifier instead of String for tableName (SPARK-10104).
private[hive]
case class CreateMetastoreDataSourceAsSelect(
tableIdent: TableIdentifier,
@@ -198,7 +196,7 @@ case class CreateMetastoreDataSourceAsSelect(
}
var existingSchema = None: Option[StructType]
- if (sqlContext.catalog.tableExists(tableIdent.toSeq)) {
+ if (sqlContext.catalog.tableExists(tableIdent)) {
// Check if we need to throw an exception or just return.
mode match {
case SaveMode.ErrorIfExists =>
@@ -215,7 +213,7 @@ case class CreateMetastoreDataSourceAsSelect(
val resolved = ResolvedDataSource(
sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath)
val createdRelation = LogicalRelation(resolved.relation)
- EliminateSubQueries(sqlContext.catalog.lookupRelation(tableIdent.toSeq)) match {
+ EliminateSubQueries(sqlContext.catalog.lookupRelation(tableIdent)) match {
case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _) =>
if (l.relation != createdRelation.relation) {
val errorDescription =
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index ff39ccb7c1..6883d305cb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -191,7 +191,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
// Make sure any test tables referenced are loaded.
val referencedTables =
describedTables ++
- logical.collect { case UnresolvedRelation(tableIdent, _) => tableIdent.last }
+ logical.collect { case UnresolvedRelation(tableIdent, _) => tableIdent.table }
val referencedTestTables = referencedTables.filter(testTables.contains)
logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}")
referencedTestTables.foreach(loadTestTable)
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
index c8d272794d..8c4af1b8ea 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
@@ -26,7 +26,6 @@ import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.spark.sql.SaveMode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -41,6 +40,8 @@ import org.apache.spark.sql.hive.test.TestHive$;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.catalyst.TableIdentifier;
import org.apache.spark.util.Utils;
public class JavaMetastoreDataSourcesSuite {
@@ -71,7 +72,8 @@ public class JavaMetastoreDataSourcesSuite {
if (path.exists()) {
path.delete();
}
- hiveManagedPath = new Path(sqlContext.catalog().hiveDefaultTableFilePath("javaSavedTable"));
+ hiveManagedPath = new Path(sqlContext.catalog().hiveDefaultTableFilePath(
+ new TableIdentifier("javaSavedTable")));
fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
if (fs.exists(hiveManagedPath)){
fs.delete(hiveManagedPath, true);
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
index 579631df77..183aca29cf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.Row
@@ -31,14 +32,14 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft
override def beforeAll(): Unit = {
// The catalog in HiveContext is a case insensitive one.
- catalog.registerTable(Seq("ListTablesSuiteTable"), df.logicalPlan)
+ catalog.registerTable(TableIdentifier("ListTablesSuiteTable"), df.logicalPlan)
sql("CREATE TABLE HiveListTablesSuiteTable (key int, value string)")
sql("CREATE DATABASE IF NOT EXISTS ListTablesSuiteDB")
sql("CREATE TABLE ListTablesSuiteDB.HiveInDBListTablesSuiteTable (key int, value string)")
}
override def afterAll(): Unit = {
- catalog.unregisterTable(Seq("ListTablesSuiteTable"))
+ catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
sql("DROP TABLE IF EXISTS HiveListTablesSuiteTable")
sql("DROP TABLE IF EXISTS ListTablesSuiteDB.HiveInDBListTablesSuiteTable")
sql("DROP DATABASE IF EXISTS ListTablesSuiteDB")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index d356538000..d292887688 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.util.Utils
/**
@@ -367,7 +368,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
|)
""".stripMargin)
- val expectedPath = catalog.hiveDefaultTableFilePath("ctasJsonTable")
+ val expectedPath = catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable"))
val filesystemPath = new Path(expectedPath)
val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration)
if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true)
@@ -472,7 +473,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
// Drop table will also delete the data.
sql("DROP TABLE savedJsonTable")
intercept[IOException] {
- read.json(catalog.hiveDefaultTableFilePath("savedJsonTable"))
+ read.json(catalog.hiveDefaultTableFilePath(TableIdentifier("savedJsonTable")))
}
}
@@ -703,7 +704,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
// Manually create a metastore data source table.
catalog.createDataSourceTable(
- tableName = "wide_schema",
+ tableIdent = TableIdentifier("wide_schema"),
userSpecifiedSchema = Some(schema),
partitionColumns = Array.empty[String],
provider = "json",
@@ -733,7 +734,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
"EXTERNAL" -> "FALSE"),
tableType = ManagedTable,
serdeProperties = Map(
- "path" -> catalog.hiveDefaultTableFilePath(tableName)))
+ "path" -> catalog.hiveDefaultTableFilePath(TableIdentifier(tableName))))
catalog.client.createTable(hiveTable)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 6a692d6fce..9bb32f11b7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
import scala.reflect.ClassTag
import org.apache.spark.sql.{Row, SQLConf, QueryTest}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -68,7 +69,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
test("analyze MetastoreRelations") {
def queryTotalSize(tableName: String): BigInt =
- hiveContext.catalog.lookupRelation(Seq(tableName)).statistics.sizeInBytes
+ hiveContext.catalog.lookupRelation(TableIdentifier(tableName)).statistics.sizeInBytes
// Non-partitioned table
sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
@@ -115,7 +116,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
intercept[UnsupportedOperationException] {
hiveContext.analyze("tempTable")
}
- hiveContext.catalog.unregisterTable(Seq("tempTable"))
+ hiveContext.catalog.unregisterTable(TableIdentifier("tempTable"))
}
test("estimates the size of a test MetastoreRelation") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 6aa34605b0..c929ba5068 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
import scala.collection.JavaConverters._
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.DefaultParserDialect
+import org.apache.spark.sql.catalyst.{TableIdentifier, DefaultParserDialect}
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, EliminateSubQueries}
import org.apache.spark.sql.catalyst.errors.DialectException
import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -266,7 +266,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
test("CTAS without serde") {
def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
- val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
+ val relation = EliminateSubQueries(catalog.lookupRelation(TableIdentifier(tableName)))
relation match {
case LogicalRelation(r: ParquetRelation, _) =>
if (!isDataSourceParquet) {
@@ -723,7 +723,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
(1 to 100).par.map { i =>
val tableName = s"SPARK_6618_table_$i"
sql(s"CREATE TABLE $tableName (col1 string)")
- catalog.lookupRelation(Seq(tableName))
+ catalog.lookupRelation(TableIdentifier(tableName))
table(tableName)
tables()
sql(s"DROP TABLE $tableName")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 5eb39b1129..7efeab528c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.io.orc.CompressionKind
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
@@ -218,7 +219,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
sql("INSERT INTO TABLE t SELECT * FROM tmp")
checkAnswer(table("t"), (data ++ data).map(Row.fromTuple))
}
- catalog.unregisterTable(Seq("tmp"))
+ catalog.unregisterTable(TableIdentifier("tmp"))
}
test("overwriting") {
@@ -228,7 +229,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
checkAnswer(table("t"), data.map(Row.fromTuple))
}
- catalog.unregisterTable(Seq("tmp"))
+ catalog.unregisterTable(TableIdentifier("tmp"))
}
test("self-join") {