Diffstat (limited to 'sql')
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala | 2
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala | 14
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala | 174
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala | 6
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala | 3
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala | 24
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala | 10
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala | 4
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 3
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 4
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 6
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala | 2
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala | 7
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala | 4
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala | 7
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala | 5
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala | 7
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala | 6
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 2
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 134
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 42
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala | 12
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala | 9
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala | 10
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala | 2
-rw-r--r-- sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java | 6
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala | 5
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 9
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala | 5
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 6
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala | 5
32 files changed, 212 insertions, 327 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index dfab239885..2595e1f90c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -170,7 +170,7 @@ object SqlParser extends AbstractSparkSQLParser with DataTypeParser {
joinedRelation | relationFactor
protected lazy val relationFactor: Parser[LogicalPlan] =
- ( rep1sep(ident, ".") ~ (opt(AS) ~> opt(ident)) ^^ {
+ ( tableIdentifier ~ (opt(AS) ~> opt(ident)) ^^ {
case tableIdent ~ alias => UnresolvedRelation(tableIdent, alias)
}
| ("(" ~> start <~ ")") ~ (AS.? ~> ident) ^^ { case s ~ a => Subquery(a, s) }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala
index d701559bf2..4d4e4ded99 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala
@@ -20,14 +20,16 @@ package org.apache.spark.sql.catalyst
/**
* Identifies a `table` in `database`. If `database` is not defined, the current database is used.
*/
-private[sql] case class TableIdentifier(table: String, database: Option[String] = None) {
- def withDatabase(database: String): TableIdentifier = this.copy(database = Some(database))
-
- def toSeq: Seq[String] = database.toSeq :+ table
+private[sql] case class TableIdentifier(table: String, database: Option[String]) {
+ def this(table: String) = this(table, None)
override def toString: String = quotedString
- def quotedString: String = toSeq.map("`" + _ + "`").mkString(".")
+ def quotedString: String = database.map(db => s"`$db`.`$table`").getOrElse(s"`$table`")
+
+ def unquotedString: String = database.map(db => s"$db.$table").getOrElse(table)
+}
- def unquotedString: String = toSeq.mkString(".")
+private[sql] object TableIdentifier {
+ def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)
}
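
Note: as context for the TableIdentifier rework above, here is a minimal self-contained Scala sketch (not Spark source; the real class is private[sql] in catalyst) illustrating what the new shape provides: an optional database, a single-argument factory in the companion object, and quoted/unquoted renderings that replace the old toSeq-based formatting.

// Simplified mirror of the reworked TableIdentifier shown in the diff above.
case class TableIdentifier(table: String, database: Option[String]) {
  def this(table: String) = this(table, None)

  override def toString: String = quotedString
  def quotedString: String = database.map(db => s"`$db`.`$table`").getOrElse(s"`$table`")
  def unquotedString: String = database.map(db => s"$db.$table").getOrElse(table)
}

object TableIdentifier {
  // Single-argument factory, mirroring the companion object added in the diff.
  def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)
}

object TableIdentifierDemo extends App {
  println(TableIdentifier("events").quotedString)                 // `events`
  println(TableIdentifier("events", Some("logs")).quotedString)   // `logs`.`events`
  println(TableIdentifier("events", Some("logs")).unquotedString) // logs.events
}
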
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 041ab22827..e6046055bf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -105,7 +105,7 @@ class Analyzer(
// here use the CTE definition first, check table name only and ignore database name
// see https://github.com/apache/spark/pull/4929#discussion_r27186638 for more info
case u : UnresolvedRelation =>
- val substituted = cteRelations.get(u.tableIdentifier.last).map { relation =>
+ val substituted = cteRelations.get(u.tableIdentifier.table).map { relation =>
val withAlias = u.alias.map(Subquery(_, relation))
withAlias.getOrElse(relation)
}
@@ -257,7 +257,7 @@ class Analyzer(
catalog.lookupRelation(u.tableIdentifier, u.alias)
} catch {
case _: NoSuchTableException =>
- u.failAnalysis(s"no such table ${u.tableName}")
+ u.failAnalysis(s"Table Not Found: ${u.tableName}")
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 4cc9a5520a..8f4ce74a2e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -42,11 +42,9 @@ trait Catalog {
val conf: CatalystConf
- def tableExists(tableIdentifier: Seq[String]): Boolean
+ def tableExists(tableIdent: TableIdentifier): Boolean
- def lookupRelation(
- tableIdentifier: Seq[String],
- alias: Option[String] = None): LogicalPlan
+ def lookupRelation(tableIdent: TableIdentifier, alias: Option[String] = None): LogicalPlan
/**
* Returns tuples of (tableName, isTemporary) for all tables in the given database.
@@ -56,89 +54,59 @@ trait Catalog {
def refreshTable(tableIdent: TableIdentifier): Unit
- // TODO: Refactor it in the work of SPARK-10104
- def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit
+ def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit
- // TODO: Refactor it in the work of SPARK-10104
- def unregisterTable(tableIdentifier: Seq[String]): Unit
+ def unregisterTable(tableIdent: TableIdentifier): Unit
def unregisterAllTables(): Unit
- // TODO: Refactor it in the work of SPARK-10104
- protected def processTableIdentifier(tableIdentifier: Seq[String]): Seq[String] = {
- if (conf.caseSensitiveAnalysis) {
- tableIdentifier
- } else {
- tableIdentifier.map(_.toLowerCase)
- }
- }
-
- // TODO: Refactor it in the work of SPARK-10104
- protected def getDbTableName(tableIdent: Seq[String]): String = {
- val size = tableIdent.size
- if (size <= 2) {
- tableIdent.mkString(".")
- } else {
- tableIdent.slice(size - 2, size).mkString(".")
- }
- }
-
- // TODO: Refactor it in the work of SPARK-10104
- protected def getDBTable(tableIdent: Seq[String]) : (Option[String], String) = {
- (tableIdent.lift(tableIdent.size - 2), tableIdent.last)
- }
-
/**
- * It is not allowed to specifiy database name for tables stored in [[SimpleCatalog]].
- * We use this method to check it.
+ * Get the table name of TableIdentifier for temporary tables.
*/
- protected def checkTableIdentifier(tableIdentifier: Seq[String]): Unit = {
- if (tableIdentifier.length > 1) {
+ protected def getTableName(tableIdent: TableIdentifier): String = {
+ // It is not allowed to specify database name for temporary tables.
+ // We check it here and throw exception if database is defined.
+ if (tableIdent.database.isDefined) {
throw new AnalysisException("Specifying database name or other qualifiers are not allowed " +
"for temporary tables. If the table name has dots (.) in it, please quote the " +
"table name with backticks (`).")
}
+ if (conf.caseSensitiveAnalysis) {
+ tableIdent.table
+ } else {
+ tableIdent.table.toLowerCase
+ }
}
}
class SimpleCatalog(val conf: CatalystConf) extends Catalog {
- val tables = new ConcurrentHashMap[String, LogicalPlan]
-
- override def registerTable(
- tableIdentifier: Seq[String],
- plan: LogicalPlan): Unit = {
- checkTableIdentifier(tableIdentifier)
- val tableIdent = processTableIdentifier(tableIdentifier)
- tables.put(getDbTableName(tableIdent), plan)
+ private[this] val tables = new ConcurrentHashMap[String, LogicalPlan]
+
+ override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
+ tables.put(getTableName(tableIdent), plan)
}
- override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
- checkTableIdentifier(tableIdentifier)
- val tableIdent = processTableIdentifier(tableIdentifier)
- tables.remove(getDbTableName(tableIdent))
+ override def unregisterTable(tableIdent: TableIdentifier): Unit = {
+ tables.remove(getTableName(tableIdent))
}
override def unregisterAllTables(): Unit = {
tables.clear()
}
- override def tableExists(tableIdentifier: Seq[String]): Boolean = {
- checkTableIdentifier(tableIdentifier)
- val tableIdent = processTableIdentifier(tableIdentifier)
- tables.containsKey(getDbTableName(tableIdent))
+ override def tableExists(tableIdent: TableIdentifier): Boolean = {
+ tables.containsKey(getTableName(tableIdent))
}
override def lookupRelation(
- tableIdentifier: Seq[String],
+ tableIdent: TableIdentifier,
alias: Option[String] = None): LogicalPlan = {
- checkTableIdentifier(tableIdentifier)
- val tableIdent = processTableIdentifier(tableIdentifier)
- val tableFullName = getDbTableName(tableIdent)
- val table = tables.get(tableFullName)
+ val tableName = getTableName(tableIdent)
+ val table = tables.get(tableName)
if (table == null) {
- sys.error(s"Table Not Found: $tableFullName")
+ throw new NoSuchTableException
}
- val tableWithQualifiers = Subquery(tableIdent.last, table)
+ val tableWithQualifiers = Subquery(tableName, table)
// If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
// properly qualified with this alias.
@@ -146,11 +114,7 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
}
override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
- val result = ArrayBuffer.empty[(String, Boolean)]
- for (name <- tables.keySet().asScala) {
- result += ((name, true))
- }
- result
+ tables.keySet().asScala.map(_ -> true).toSeq
}
override def refreshTable(tableIdent: TableIdentifier): Unit = {
@@ -165,68 +129,50 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
* lost when the JVM exits.
*/
trait OverrideCatalog extends Catalog {
+ private[this] val overrides = new ConcurrentHashMap[String, LogicalPlan]
- // TODO: This doesn't work when the database changes...
- val overrides = new mutable.HashMap[(Option[String], String), LogicalPlan]()
-
- abstract override def tableExists(tableIdentifier: Seq[String]): Boolean = {
- val tableIdent = processTableIdentifier(tableIdentifier)
- // A temporary tables only has a single part in the tableIdentifier.
- val overriddenTable = if (tableIdentifier.length > 1) {
- None: Option[LogicalPlan]
+ private def getOverriddenTable(tableIdent: TableIdentifier): Option[LogicalPlan] = {
+ if (tableIdent.database.isDefined) {
+ None
} else {
- overrides.get(getDBTable(tableIdent))
+ Option(overrides.get(getTableName(tableIdent)))
}
- overriddenTable match {
+ }
+
+ abstract override def tableExists(tableIdent: TableIdentifier): Boolean = {
+ getOverriddenTable(tableIdent) match {
case Some(_) => true
- case None => super.tableExists(tableIdentifier)
+ case None => super.tableExists(tableIdent)
}
}
abstract override def lookupRelation(
- tableIdentifier: Seq[String],
+ tableIdent: TableIdentifier,
alias: Option[String] = None): LogicalPlan = {
- val tableIdent = processTableIdentifier(tableIdentifier)
- // A temporary tables only has a single part in the tableIdentifier.
- val overriddenTable = if (tableIdentifier.length > 1) {
- None: Option[LogicalPlan]
- } else {
- overrides.get(getDBTable(tableIdent))
- }
- val tableWithQualifers = overriddenTable.map(r => Subquery(tableIdent.last, r))
+ getOverriddenTable(tableIdent) match {
+ case Some(table) =>
+ val tableName = getTableName(tableIdent)
+ val tableWithQualifiers = Subquery(tableName, table)
- // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
- // properly qualified with this alias.
- val withAlias =
- tableWithQualifers.map(r => alias.map(a => Subquery(a, r)).getOrElse(r))
+ // If an alias was specified by the lookup, wrap the plan in a sub-query so that attributes
+ // are properly qualified with this alias.
+ alias.map(a => Subquery(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
- withAlias.getOrElse(super.lookupRelation(tableIdentifier, alias))
+ case None => super.lookupRelation(tableIdent, alias)
+ }
}
abstract override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
- // We always return all temporary tables.
- val temporaryTables = overrides.map {
- case ((_, tableName), _) => (tableName, true)
- }.toSeq
-
- temporaryTables ++ super.getTables(databaseName)
+ overrides.keySet().asScala.map(_ -> true).toSeq ++ super.getTables(databaseName)
}
- override def registerTable(
- tableIdentifier: Seq[String],
- plan: LogicalPlan): Unit = {
- checkTableIdentifier(tableIdentifier)
- val tableIdent = processTableIdentifier(tableIdentifier)
- overrides.put(getDBTable(tableIdent), plan)
+ override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
+ overrides.put(getTableName(tableIdent), plan)
}
- override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
- // A temporary tables only has a single part in the tableIdentifier.
- // If tableIdentifier has more than one parts, it is not a temporary table
- // and we do not need to do anything at here.
- if (tableIdentifier.length == 1) {
- val tableIdent = processTableIdentifier(tableIdentifier)
- overrides.remove(getDBTable(tableIdent))
+ override def unregisterTable(tableIdent: TableIdentifier): Unit = {
+ if (tableIdent.database.isEmpty) {
+ overrides.remove(getTableName(tableIdent))
}
}
@@ -243,12 +189,12 @@ object EmptyCatalog extends Catalog {
override val conf: CatalystConf = EmptyConf
- override def tableExists(tableIdentifier: Seq[String]): Boolean = {
+ override def tableExists(tableIdent: TableIdentifier): Boolean = {
throw new UnsupportedOperationException
}
override def lookupRelation(
- tableIdentifier: Seq[String],
+ tableIdent: TableIdentifier,
alias: Option[String] = None): LogicalPlan = {
throw new UnsupportedOperationException
}
@@ -257,15 +203,17 @@ object EmptyCatalog extends Catalog {
throw new UnsupportedOperationException
}
- override def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit = {
+ override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
throw new UnsupportedOperationException
}
- override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
+ override def unregisterTable(tableIdent: TableIdentifier): Unit = {
throw new UnsupportedOperationException
}
- override def unregisterAllTables(): Unit = {}
+ override def unregisterAllTables(): Unit = {
+ throw new UnsupportedOperationException
+ }
override def refreshTable(tableIdent: TableIdentifier): Unit = {
throw new UnsupportedOperationException
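
Note: the Catalog rework above replaces Seq[String]-based lookups with TableIdentifier and rejects database-qualified names for temporary tables. A hedged, self-contained sketch of that calling pattern follows; the types are stand-ins, not the Spark implementation.

import java.util.concurrent.ConcurrentHashMap

// Stand-in types; the real TableIdentifier and LogicalPlan live in catalyst.
case class TableIdentifier(table: String, database: Option[String] = None)
trait LogicalPlan
case object DummyPlan extends LogicalPlan

class SimpleCatalogSketch(caseSensitiveAnalysis: Boolean) {
  private val tables = new ConcurrentHashMap[String, LogicalPlan]

  // Mirrors getTableName: database qualifiers are rejected for temporary tables,
  // and the name is lower-cased under case-insensitive analysis.
  private def getTableName(ident: TableIdentifier): String = {
    require(ident.database.isEmpty,
      "Specifying database name or other qualifiers are not allowed for temporary tables.")
    if (caseSensitiveAnalysis) ident.table else ident.table.toLowerCase
  }

  def registerTable(ident: TableIdentifier, plan: LogicalPlan): Unit =
    tables.put(getTableName(ident), plan)

  def unregisterTable(ident: TableIdentifier): Unit =
    tables.remove(getTableName(ident))

  def tableExists(ident: TableIdentifier): Boolean =
    tables.containsKey(getTableName(ident))
}

object CatalogDemo extends App {
  val catalog = new SimpleCatalogSketch(caseSensitiveAnalysis = false)
  catalog.registerTable(TableIdentifier("Events"), DummyPlan)
  println(catalog.tableExists(TableIdentifier("events"))) // true: name was lower-cased
  // Passing TableIdentifier("events", Some("db")) would fail, as in the trait above.
}
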
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 43ee319193..c973650039 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.catalyst.errors
+import org.apache.spark.sql.catalyst.{TableIdentifier, errors}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.catalyst.trees.TreeNode
@@ -36,11 +36,11 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
* Holds the name of a relation that has yet to be looked up in a [[Catalog]].
*/
case class UnresolvedRelation(
- tableIdentifier: Seq[String],
+ tableIdentifier: TableIdentifier,
alias: Option[String] = None) extends LeafNode {
/** Returns a `.` separated name for this relation. */
- def tableName: String = tableIdentifier.mkString(".")
+ def tableName: String = tableIdentifier.unquotedString
override def output: Seq[Attribute] = Nil
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 699c4cc63d..27b3cd84b3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -286,7 +286,8 @@ package object dsl {
def insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan =
InsertIntoTable(
- analysis.UnresolvedRelation(Seq(tableName)), Map.empty, logicalPlan, overwrite, false)
+ analysis.UnresolvedRelation(TableIdentifier(tableName)),
+ Map.empty, logicalPlan, overwrite, false)
def analyze: LogicalPlan = EliminateSubQueries(analysis.SimpleAnalyzer.execute(logicalPlan))
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 820b336aac..ec05cfa63c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.analysis
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
@@ -53,32 +54,39 @@ class AnalysisSuite extends AnalysisTest {
Project(testRelation.output, testRelation))
checkAnalysis(
- Project(Seq(UnresolvedAttribute("TbL.a")), UnresolvedRelation(Seq("TaBlE"), Some("TbL"))),
+ Project(Seq(UnresolvedAttribute("TbL.a")),
+ UnresolvedRelation(TableIdentifier("TaBlE"), Some("TbL"))),
Project(testRelation.output, testRelation))
assertAnalysisError(
- Project(Seq(UnresolvedAttribute("tBl.a")), UnresolvedRelation(Seq("TaBlE"), Some("TbL"))),
+ Project(Seq(UnresolvedAttribute("tBl.a")), UnresolvedRelation(
+ TableIdentifier("TaBlE"), Some("TbL"))),
Seq("cannot resolve"))
checkAnalysis(
- Project(Seq(UnresolvedAttribute("TbL.a")), UnresolvedRelation(Seq("TaBlE"), Some("TbL"))),
+ Project(Seq(UnresolvedAttribute("TbL.a")), UnresolvedRelation(
+ TableIdentifier("TaBlE"), Some("TbL"))),
Project(testRelation.output, testRelation),
caseSensitive = false)
checkAnalysis(
- Project(Seq(UnresolvedAttribute("tBl.a")), UnresolvedRelation(Seq("TaBlE"), Some("TbL"))),
+ Project(Seq(UnresolvedAttribute("tBl.a")), UnresolvedRelation(
+ TableIdentifier("TaBlE"), Some("TbL"))),
Project(testRelation.output, testRelation),
caseSensitive = false)
}
test("resolve relations") {
- assertAnalysisError(UnresolvedRelation(Seq("tAbLe"), None), Seq("Table Not Found: tAbLe"))
+ assertAnalysisError(
+ UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq("Table Not Found: tAbLe"))
- checkAnalysis(UnresolvedRelation(Seq("TaBlE"), None), testRelation)
+ checkAnalysis(UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation)
- checkAnalysis(UnresolvedRelation(Seq("tAbLe"), None), testRelation, caseSensitive = false)
+ checkAnalysis(
+ UnresolvedRelation(TableIdentifier("tAbLe"), None), testRelation, caseSensitive = false)
- checkAnalysis(UnresolvedRelation(Seq("TaBlE"), None), testRelation, caseSensitive = false)
+ checkAnalysis(
+ UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation, caseSensitive = false)
}
test("divide should be casted into fractional types") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
index 53b3695a86..23861ed15d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
@@ -17,9 +17,10 @@
package org.apache.spark.sql.catalyst.analysis
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.SimpleCatalystConf
+import org.apache.spark.sql.catalyst.{TableIdentifier, SimpleCatalystConf}
trait AnalysisTest extends PlanTest {
@@ -30,8 +31,8 @@ trait AnalysisTest extends PlanTest {
val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf)
val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf)
- caseSensitiveCatalog.registerTable(Seq("TaBlE"), TestRelations.testRelation)
- caseInsensitiveCatalog.registerTable(Seq("TaBlE"), TestRelations.testRelation)
+ caseSensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation)
+ caseInsensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation)
new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitiveConf) {
override val extendedResolutionRules = EliminateSubQueries :: Nil
@@ -67,8 +68,7 @@ trait AnalysisTest extends PlanTest {
expectedErrors: Seq[String],
caseSensitive: Boolean = true): Unit = {
val analyzer = getAnalyzer(caseSensitive)
- // todo: make sure we throw AnalysisException during analysis
- val e = intercept[Exception] {
+ val e = intercept[AnalysisException] {
analyzer.checkAnalysis(analyzer.execute(inputPlan))
}
assert(expectedErrors.map(_.toLowerCase).forall(e.getMessage.toLowerCase.contains),
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
index b4ad618c23..40c4ae7920 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{Union, Project, LocalRelation}
import org.apache.spark.sql.types._
-import org.apache.spark.sql.catalyst.SimpleCatalystConf
+import org.apache.spark.sql.catalyst.{TableIdentifier, SimpleCatalystConf}
class DecimalPrecisionSuite extends SparkFunSuite with BeforeAndAfter {
val conf = new SimpleCatalystConf(true)
@@ -47,7 +47,7 @@ class DecimalPrecisionSuite extends SparkFunSuite with BeforeAndAfter {
val b: Expression = UnresolvedAttribute("b")
before {
- catalog.registerTable(Seq("table"), relation)
+ catalog.registerTable(TableIdentifier("table"), relation)
}
private def checkType(expression: Expression, expectedType: DataType): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 97a8b6518a..eacdea2c1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
import org.apache.spark.sql.types.StructType
import org.apache.spark.{Logging, Partition}
+import org.apache.spark.sql.catalyst.{SqlParser, TableIdentifier}
/**
* :: Experimental ::
@@ -287,7 +288,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
* @since 1.4.0
*/
def table(tableName: String): DataFrame = {
- DataFrame(sqlContext, sqlContext.catalog.lookupRelation(Seq(tableName)))
+ DataFrame(sqlContext, sqlContext.catalog.lookupRelation(TableIdentifier(tableName)))
}
///////////////////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 03e973666e..764510ab4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -171,7 +171,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
val overwrite = mode == SaveMode.Overwrite
df.sqlContext.executePlan(
InsertIntoTable(
- UnresolvedRelation(tableIdent.toSeq),
+ UnresolvedRelation(tableIdent),
partitions.getOrElse(Map.empty[String, Option[String]]),
df.logicalPlan,
overwrite,
@@ -201,7 +201,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
}
private def saveAsTable(tableIdent: TableIdentifier): Unit = {
- val tableExists = df.sqlContext.catalog.tableExists(tableIdent.toSeq)
+ val tableExists = df.sqlContext.catalog.tableExists(tableIdent)
(tableExists, mode) match {
case (true, SaveMode.Ignore) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 3d5e35ab31..361eb576c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -714,7 +714,7 @@ class SQLContext private[sql](
* only during the lifetime of this instance of SQLContext.
*/
private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = {
- catalog.registerTable(Seq(tableName), df.logicalPlan)
+ catalog.registerTable(TableIdentifier(tableName), df.logicalPlan)
}
/**
@@ -728,7 +728,7 @@ class SQLContext private[sql](
*/
def dropTempTable(tableName: String): Unit = {
cacheManager.tryUncacheQuery(table(tableName))
- catalog.unregisterTable(Seq(tableName))
+ catalog.unregisterTable(TableIdentifier(tableName))
}
/**
@@ -795,7 +795,7 @@ class SQLContext private[sql](
}
private def table(tableIdent: TableIdentifier): DataFrame = {
- DataFrame(this, catalog.lookupRelation(tableIdent.toSeq))
+ DataFrame(this, catalog.lookupRelation(tableIdent))
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
index f7a88b98c0..446739d5b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
@@ -140,7 +140,7 @@ class DDLParser(parseQuery: String => LogicalPlan)
protected lazy val describeTable: Parser[LogicalPlan] =
(DESCRIBE ~> opt(EXTENDED)) ~ tableIdentifier ^^ {
case e ~ tableIdent =>
- DescribeCommand(UnresolvedRelation(tableIdent.toSeq, None), e.isDefined)
+ DescribeCommand(UnresolvedRelation(tableIdent, None), e.isDefined)
}
protected lazy val refreshTable: Parser[LogicalPlan] =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 31d6b75e13..e7deeff13d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -71,7 +71,6 @@ case class CreateTableUsing(
* can analyze the logical plan that will be used to populate the table.
* So, [[PreWriteCheck]] can detect cases that are not allowed.
*/
-// TODO: Use TableIdentifier instead of String for tableName (SPARK-10104).
case class CreateTableUsingAsSelect(
tableIdent: TableIdentifier,
provider: String,
@@ -93,7 +92,7 @@ case class CreateTempTableUsing(
val resolved = ResolvedDataSource(
sqlContext, userSpecifiedSchema, Array.empty[String], provider, options)
sqlContext.catalog.registerTable(
- tableIdent.toSeq,
+ tableIdent,
DataFrame(sqlContext, LogicalRelation(resolved.relation)).logicalPlan)
Seq.empty[Row]
@@ -112,7 +111,7 @@ case class CreateTempTableUsingAsSelect(
val df = DataFrame(sqlContext, query)
val resolved = ResolvedDataSource(sqlContext, provider, partitionColumns, mode, options, df)
sqlContext.catalog.registerTable(
- tableIdent.toSeq,
+ tableIdent,
DataFrame(sqlContext, LogicalRelation(resolved.relation)).logicalPlan)
Seq.empty[Row]
@@ -128,7 +127,7 @@ case class RefreshTable(tableIdent: TableIdentifier)
// If this table is cached as a InMemoryColumnarRelation, drop the original
// cached version and make the new version cached lazily.
- val logicalPlan = sqlContext.catalog.lookupRelation(tableIdent.toSeq)
+ val logicalPlan = sqlContext.catalog.lookupRelation(tableIdent)
// Use lookupCachedData directly since RefreshTable also takes databaseName.
val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty
if (isCached) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 8efc8016f9..b00e5680fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -143,9 +143,9 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
case CreateTableUsingAsSelect(tableIdent, _, _, partitionColumns, mode, _, query) =>
// When the SaveMode is Overwrite, we need to check if the table is an input table of
// the query. If so, we will throw an AnalysisException to let users know it is not allowed.
- if (mode == SaveMode.Overwrite && catalog.tableExists(tableIdent.toSeq)) {
+ if (mode == SaveMode.Overwrite && catalog.tableExists(tableIdent)) {
// Need to remove SubQuery operator.
- EliminateSubQueries(catalog.lookupRelation(tableIdent.toSeq)) match {
+ EliminateSubQueries(catalog.lookupRelation(tableIdent)) match {
// Only do the check if the table is a data source table
// (the relation is a BaseRelation).
case l @ LogicalRelation(dest: BaseRelation, _) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 356d4ff3fa..fd566c8276 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.execution.PhysicalRDD
import scala.concurrent.duration._
@@ -287,8 +288,7 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
testData.select('key).registerTempTable("t1")
sqlContext.table("t1")
sqlContext.dropTempTable("t1")
- assert(
- intercept[RuntimeException](sqlContext.table("t1")).getMessage.startsWith("Table Not Found"))
+ intercept[NoSuchTableException](sqlContext.table("t1"))
}
test("Drops cached temporary table") {
@@ -300,8 +300,7 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
assert(sqlContext.isCached("t2"))
sqlContext.dropTempTable("t1")
- assert(
- intercept[RuntimeException](sqlContext.table("t1")).getMessage.startsWith("Table Not Found"))
+ intercept[NoSuchTableException](sqlContext.table("t1"))
assert(!sqlContext.isCached("t2"))
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 7a027e1308..b1fb068158 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.test.SharedSQLContext
@@ -359,8 +360,8 @@ class JoinSuite extends QueryTest with SharedSQLContext {
upperCaseData.where('N <= 4).registerTempTable("left")
upperCaseData.where('N >= 3).registerTempTable("right")
- val left = UnresolvedRelation(Seq("left"), None)
- val right = UnresolvedRelation(Seq("right"), None)
+ val left = UnresolvedRelation(TableIdentifier("left"), None)
+ val right = UnresolvedRelation(TableIdentifier("right"), None)
checkAnswer(
left.join(right, $"left.N" === $"right.N", "full"),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
index eab0fbb196..5688f46e5e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
@@ -21,6 +21,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}
+import org.apache.spark.sql.catalyst.TableIdentifier
class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContext {
import testImplicits._
@@ -32,7 +33,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
}
after {
- sqlContext.catalog.unregisterTable(Seq("ListTablesSuiteTable"))
+ sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
}
test("get all tables") {
@@ -44,7 +45,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
sql("SHOW tables").filter("tableName = 'ListTablesSuiteTable'"),
Row("ListTablesSuiteTable", true))
- sqlContext.catalog.unregisterTable(Seq("ListTablesSuiteTable"))
+ sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
}
@@ -57,7 +58,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
sql("show TABLES in DB").filter("tableName = 'ListTablesSuiteTable'"),
Row("ListTablesSuiteTable", true))
- sqlContext.catalog.unregisterTable(Seq("ListTablesSuiteTable"))
+ sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index cc02ef81c9..baff7f5752 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -22,7 +22,7 @@ import java.io.File
import org.apache.hadoop.fs.Path
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{TableIdentifier, InternalRow}
import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT}
import org.apache.spark.sql.test.SharedSQLContext
@@ -49,7 +49,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
sql("INSERT INTO TABLE t SELECT * FROM tmp")
checkAnswer(sqlContext.table("t"), (data ++ data).map(Row.fromTuple))
}
- sqlContext.catalog.unregisterTable(Seq("tmp"))
+ sqlContext.catalog.unregisterTable(TableIdentifier("tmp"))
}
test("overwriting") {
@@ -59,7 +59,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
checkAnswer(sqlContext.table("t"), data.map(Row.fromTuple))
}
- sqlContext.catalog.unregisterTable(Seq("tmp"))
+ sqlContext.catalog.unregisterTable(TableIdentifier("tmp"))
}
test("self-join") {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index e620d7fb82..4d8a3f728e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -358,7 +358,7 @@ class HiveContext private[hive](
@Experimental
def analyze(tableName: String) {
val tableIdent = SqlParser.parseTableIdentifier(tableName)
- val relation = EliminateSubQueries(catalog.lookupRelation(tableIdent.toSeq))
+ val relation = EliminateSubQueries(catalog.lookupRelation(tableIdent))
relation match {
case relation: MetastoreRelation =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 1f8223e1ff..5819cb9d08 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.catalyst.{InternalRow, SqlParser, TableIdentifier}
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
import org.apache.spark.sql.execution.{FileRelation, datasources}
@@ -103,10 +103,19 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
/** Usages should lock on `this`. */
protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf)
- // TODO: Use this everywhere instead of tuples or databaseName, tableName,.
/** A fully qualified identifier for a table (i.e., database.tableName) */
- case class QualifiedTableName(database: String, name: String) {
- def toLowerCase: QualifiedTableName = QualifiedTableName(database.toLowerCase, name.toLowerCase)
+ case class QualifiedTableName(database: String, name: String)
+
+ private def getQualifiedTableName(tableIdent: TableIdentifier) = {
+ QualifiedTableName(
+ tableIdent.database.getOrElse(client.currentDatabase).toLowerCase,
+ tableIdent.table.toLowerCase)
+ }
+
+ private def getQualifiedTableName(hiveTable: HiveTable) = {
+ QualifiedTableName(
+ hiveTable.specifiedDatabase.getOrElse(client.currentDatabase).toLowerCase,
+ hiveTable.name.toLowerCase)
}
/** A cache of Spark SQL data source tables that have been accessed. */
@@ -179,33 +188,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
}
def invalidateTable(tableIdent: TableIdentifier): Unit = {
- val databaseName = tableIdent.database.getOrElse(client.currentDatabase)
- val tableName = tableIdent.table
-
- cachedDataSourceTables.invalidate(QualifiedTableName(databaseName, tableName).toLowerCase)
- }
-
- val caseSensitive: Boolean = false
-
- /**
- * Creates a data source table (a table created with USING clause) in Hive's metastore.
- * Returns true when the table has been created. Otherwise, false.
- */
- // TODO: Remove this in SPARK-10104.
- def createDataSourceTable(
- tableName: String,
- userSpecifiedSchema: Option[StructType],
- partitionColumns: Array[String],
- provider: String,
- options: Map[String, String],
- isExternal: Boolean): Unit = {
- createDataSourceTable(
- SqlParser.parseTableIdentifier(tableName),
- userSpecifiedSchema,
- partitionColumns,
- provider,
- options,
- isExternal)
+ cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent))
}
def createDataSourceTable(
@@ -215,10 +198,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
provider: String,
options: Map[String, String],
isExternal: Boolean): Unit = {
- val (dbName, tblName) = {
- val database = tableIdent.database.getOrElse(client.currentDatabase)
- processDatabaseAndTableName(database, tableIdent.table)
- }
+ val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
val tableProperties = new mutable.HashMap[String, String]
tableProperties.put("spark.sql.sources.provider", provider)
@@ -311,7 +291,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
// TODO: Support persisting partitioned data source relations in Hive compatible format
val qualifiedTableName = tableIdent.quotedString
- val (hiveCompitiableTable, logMessage) = (maybeSerDe, dataSource.relation) match {
+ val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.relation) match {
case (Some(serde), relation: HadoopFsRelation)
if relation.paths.length == 1 && relation.partitionColumns.isEmpty =>
val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
@@ -349,9 +329,9 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
(None, message)
}
- (hiveCompitiableTable, logMessage) match {
+ (hiveCompatibleTable, logMessage) match {
case (Some(table), message) =>
- // We first try to save the metadata of the table in a Hive compatiable way.
+ // We first try to save the metadata of the table in a Hive compatible way.
// If Hive throws an error, we fall back to save its metadata in the Spark SQL
// specific way.
try {
@@ -374,48 +354,29 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
}
}
- def hiveDefaultTableFilePath(tableName: String): String = {
- hiveDefaultTableFilePath(SqlParser.parseTableIdentifier(tableName))
- }
-
def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
// Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
- val database = tableIdent.database.getOrElse(client.currentDatabase)
-
- new Path(
- new Path(client.getDatabase(database).location),
- tableIdent.table.toLowerCase).toString
+ val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
+ new Path(new Path(client.getDatabase(dbName).location), tblName).toString
}
- def tableExists(tableIdentifier: Seq[String]): Boolean = {
- val tableIdent = processTableIdentifier(tableIdentifier)
- val databaseName =
- tableIdent
- .lift(tableIdent.size - 2)
- .getOrElse(client.currentDatabase)
- val tblName = tableIdent.last
- client.getTableOption(databaseName, tblName).isDefined
+ override def tableExists(tableIdent: TableIdentifier): Boolean = {
+ val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
+ client.getTableOption(dbName, tblName).isDefined
}
- def lookupRelation(
- tableIdentifier: Seq[String],
+ override def lookupRelation(
+ tableIdent: TableIdentifier,
alias: Option[String]): LogicalPlan = {
- val tableIdent = processTableIdentifier(tableIdentifier)
- val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
- client.currentDatabase)
- val tblName = tableIdent.last
- val table = client.getTable(databaseName, tblName)
+ val qualifiedTableName = getQualifiedTableName(tableIdent)
+ val table = client.getTable(qualifiedTableName.database, qualifiedTableName.name)
if (table.properties.get("spark.sql.sources.provider").isDefined) {
- val dataSourceTable =
- cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
+ val dataSourceTable = cachedDataSourceTables(qualifiedTableName)
+ val tableWithQualifiers = Subquery(qualifiedTableName.name, dataSourceTable)
// Then, if alias is specified, wrap the table with a Subquery using the alias.
// Otherwise, wrap the table with a Subquery using the table name.
- val withAlias =
- alias.map(a => Subquery(a, dataSourceTable)).getOrElse(
- Subquery(tableIdent.last, dataSourceTable))
-
- withAlias
+ alias.map(a => Subquery(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
} else if (table.tableType == VirtualView) {
val viewText = table.viewText.getOrElse(sys.error("Invalid view without text."))
alias match {
@@ -425,7 +386,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
case Some(aliasText) => Subquery(aliasText, HiveQl.createPlan(viewText))
}
} else {
- MetastoreRelation(databaseName, tblName, alias)(table)(hive)
+ MetastoreRelation(qualifiedTableName.database, qualifiedTableName.name, alias)(table)(hive)
}
}
@@ -524,26 +485,6 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
client.listTables(db).map(tableName => (tableName, false))
}
- protected def processDatabaseAndTableName(
- databaseName: Option[String],
- tableName: String): (Option[String], String) = {
- if (!caseSensitive) {
- (databaseName.map(_.toLowerCase), tableName.toLowerCase)
- } else {
- (databaseName, tableName)
- }
- }
-
- protected def processDatabaseAndTableName(
- databaseName: String,
- tableName: String): (String, String) = {
- if (!caseSensitive) {
- (databaseName.toLowerCase, tableName.toLowerCase)
- } else {
- (databaseName, tableName)
- }
- }
-
/**
* When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet
* data source relations for better performance.
@@ -597,8 +538,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
"It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.")
}
- val (dbName, tblName) = processDatabaseAndTableName(
- table.specifiedDatabase.getOrElse(client.currentDatabase), table.name)
+ val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
execution.CreateViewAsSelect(
table.copy(
@@ -636,7 +576,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTableUsingAsSelect(
TableIdentifier(desc.name),
- hive.conf.defaultDataSourceName,
+ conf.defaultDataSourceName,
temporary = false,
Array.empty[String],
mode,
@@ -652,9 +592,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
table
}
- val (dbName, tblName) =
- processDatabaseAndTableName(
- desc.specifiedDatabase.getOrElse(client.currentDatabase), desc.name)
+ val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
execution.CreateTableAsSelect(
desc.copy(
@@ -712,7 +650,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
* UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
* For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
*/
- override def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit = {
+ override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
throw new UnsupportedOperationException
}
@@ -720,7 +658,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
* UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
* For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
*/
- override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
+ override def unregisterTable(tableIdent: TableIdentifier): Unit = {
throw new UnsupportedOperationException
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 1d50501940..d4ff5cc0f1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.{logical, _}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.ExplainCommand
import org.apache.spark.sql.execution.datasources.DescribeCommand
import org.apache.spark.sql.hive.HiveShim._
@@ -442,24 +443,12 @@ private[hive] object HiveQl extends Logging {
throw new NotImplementedError(s"No parse rules for StructField:\n ${dumpTree(a).toString} ")
}
- protected def extractDbNameTableName(tableNameParts: Node): (Option[String], String) = {
- val (db, tableName) =
- tableNameParts.getChildren.asScala.map {
- case Token(part, Nil) => cleanIdentifier(part)
- } match {
- case Seq(tableOnly) => (None, tableOnly)
- case Seq(databaseName, table) => (Some(databaseName), table)
- }
-
- (db, tableName)
- }
-
- protected def extractTableIdent(tableNameParts: Node): Seq[String] = {
+ protected def extractTableIdent(tableNameParts: Node): TableIdentifier = {
tableNameParts.getChildren.asScala.map {
case Token(part, Nil) => cleanIdentifier(part)
} match {
- case Seq(tableOnly) => Seq(tableOnly)
- case Seq(databaseName, table) => Seq(databaseName, table)
+ case Seq(tableOnly) => TableIdentifier(tableOnly)
+ case Seq(databaseName, table) => TableIdentifier(table, Some(databaseName))
case other => sys.error("Hive only supports tables names like 'tableName' " +
s"or 'databaseName.tableName', found '$other'")
}
@@ -518,13 +507,13 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
properties: Map[String, String],
allowExist: Boolean,
replace: Boolean): CreateViewAsSelect = {
- val (db, viewName) = extractDbNameTableName(viewNameParts)
+ val TableIdentifier(viewName, dbName) = extractTableIdent(viewNameParts)
val originalText = context.getTokenRewriteStream
.toString(query.getTokenStartIndex, query.getTokenStopIndex)
val tableDesc = HiveTable(
- specifiedDatabase = db,
+ specifiedDatabase = dbName,
name = viewName,
schema = schema,
partitionColumns = Seq.empty[HiveColumn],
@@ -611,7 +600,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
case tableName =>
// It is describing a table with the format like "describe table".
DescribeCommand(
- UnresolvedRelation(Seq(tableName.getText), None), isExtended = extended.isDefined)
+ UnresolvedRelation(TableIdentifier(tableName.getText), None),
+ isExtended = extended.isDefined)
}
}
// All other cases.
@@ -716,12 +706,12 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
"TOK_TABLELOCATION",
"TOK_TABLEPROPERTIES"),
children)
- val (db, tableName) = extractDbNameTableName(tableNameParts)
+ val TableIdentifier(tblName, dbName) = extractTableIdent(tableNameParts)
// TODO add bucket support
var tableDesc: HiveTable = HiveTable(
- specifiedDatabase = db,
- name = tableName,
+ specifiedDatabase = dbName,
+ name = tblName,
schema = Seq.empty[HiveColumn],
partitionColumns = Seq.empty[HiveColumn],
properties = Map[String, String](),
@@ -1264,15 +1254,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
nonAliasClauses)
}
- val tableIdent =
- tableNameParts.getChildren.asScala.map {
- case Token(part, Nil) => cleanIdentifier(part)
- } match {
- case Seq(tableOnly) => Seq(tableOnly)
- case Seq(databaseName, table) => Seq(databaseName, table)
- case other => sys.error("Hive only supports tables names like 'tableName' " +
- s"or 'databaseName.tableName', found '$other'")
- }
+ val tableIdent = extractTableIdent(tableNameParts)
val alias = aliasClause.map { case Token(a, Nil) => cleanIdentifier(a) }
val relation = UnresolvedRelation(tableIdent, alias)
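
Note: the extractTableIdent rewrite above reduces to mapping one or two cleaned name parts onto a TableIdentifier. A standalone sketch of that mapping follows, with the ASTNode/Token handling omitted and a stand-in TableIdentifier (not Spark source).

// Sketch of the part-list to TableIdentifier mapping performed by extractTableIdent.
object ExtractTableIdentSketch {
  case class TableIdentifier(table: String, database: Option[String] = None)

  def toTableIdentifier(parts: Seq[String]): TableIdentifier = parts match {
    case Seq(tableOnly)           => TableIdentifier(tableOnly)
    case Seq(databaseName, table) => TableIdentifier(table, Some(databaseName))
    case other => sys.error("Hive only supports tables names like 'tableName' " +
      s"or 'databaseName.tableName', found '$other'")
  }

  def main(args: Array[String]): Unit = {
    println(toTableIdentifier(Seq("sales")))         // TableIdentifier(sales,None)
    println(toTableIdentifier(Seq("mydb", "sales"))) // TableIdentifier(sales,Some(mydb))
  }
}
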
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 8422287e17..e72a60b42e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.hive.execution
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
@@ -37,8 +38,7 @@ case class CreateTableAsSelect(
allowExisting: Boolean)
extends RunnableCommand {
- def database: String = tableDesc.database
- def tableName: String = tableDesc.name
+ val tableIdentifier = TableIdentifier(tableDesc.name, Some(tableDesc.database))
override def children: Seq[LogicalPlan] = Seq(query)
@@ -72,18 +72,18 @@ case class CreateTableAsSelect(
hiveContext.catalog.client.createTable(withSchema)
// Get the Metastore Relation
- hiveContext.catalog.lookupRelation(Seq(database, tableName), None) match {
+ hiveContext.catalog.lookupRelation(tableIdentifier, None) match {
case r: MetastoreRelation => r
}
}
// TODO ideally, we should get the output data ready first and then
// add the relation into catalog, just in case of failure occurs while data
// processing.
- if (hiveContext.catalog.tableExists(Seq(database, tableName))) {
+ if (hiveContext.catalog.tableExists(tableIdentifier)) {
if (allowExisting) {
// table already exists, will do nothing, to keep consistent with Hive
} else {
- throw new AnalysisException(s"$database.$tableName already exists.")
+ throw new AnalysisException(s"$tableIdentifier already exists.")
}
} else {
hiveContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true, false)).toRdd
@@ -93,6 +93,6 @@ case class CreateTableAsSelect(
}
override def argString: String = {
- s"[Database:$database, TableName: $tableName, InsertIntoHiveTable]"
+ s"[Database:${tableDesc.database}}, TableName: ${tableDesc.name}, InsertIntoHiveTable]"
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
index 2b504ac974..2c81115ee4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.hive.execution
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveContext}
import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
@@ -38,18 +39,18 @@ private[hive] case class CreateViewAsSelect(
assert(tableDesc.schema == Nil || tableDesc.schema.length == childSchema.length)
assert(tableDesc.viewText.isDefined)
+ val tableIdentifier = TableIdentifier(tableDesc.name, Some(tableDesc.database))
+
override def run(sqlContext: SQLContext): Seq[Row] = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
- val database = tableDesc.database
- val viewName = tableDesc.name
- if (hiveContext.catalog.tableExists(Seq(database, viewName))) {
+ if (hiveContext.catalog.tableExists(tableIdentifier)) {
if (allowExisting) {
// view already exists, will do nothing, to keep consistent with Hive
} else if (orReplace) {
hiveContext.catalog.client.alertView(prepareTable())
} else {
- throw new AnalysisException(s"View $database.$viewName already exists. " +
+ throw new AnalysisException(s"View $tableIdentifier already exists. " +
"If you want to update the view definition, please use ALTER VIEW AS or " +
"CREATE OR REPLACE VIEW AS")
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 51ec92afd0..94210a5394 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -71,7 +71,7 @@ case class DropTable(
}
hiveContext.invalidateTable(tableName)
hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
- hiveContext.catalog.unregisterTable(Seq(tableName))
+ hiveContext.catalog.unregisterTable(TableIdentifier(tableName))
Seq.empty[Row]
}
}
@@ -103,7 +103,6 @@ case class AddFile(path: String) extends RunnableCommand {
}
}
-// TODO: Use TableIdentifier instead of String for tableName (SPARK-10104).
private[hive]
case class CreateMetastoreDataSource(
tableIdent: TableIdentifier,
@@ -131,7 +130,7 @@ case class CreateMetastoreDataSource(
val tableName = tableIdent.unquotedString
val hiveContext = sqlContext.asInstanceOf[HiveContext]
- if (hiveContext.catalog.tableExists(tableIdent.toSeq)) {
+ if (hiveContext.catalog.tableExists(tableIdent)) {
if (allowExisting) {
return Seq.empty[Row]
} else {
@@ -160,7 +159,6 @@ case class CreateMetastoreDataSource(
}
}
-// TODO: Use TableIdentifier instead of String for tableName (SPARK-10104).
private[hive]
case class CreateMetastoreDataSourceAsSelect(
tableIdent: TableIdentifier,
@@ -198,7 +196,7 @@ case class CreateMetastoreDataSourceAsSelect(
}
var existingSchema = None: Option[StructType]
- if (sqlContext.catalog.tableExists(tableIdent.toSeq)) {
+ if (sqlContext.catalog.tableExists(tableIdent)) {
// Check if we need to throw an exception or just return.
mode match {
case SaveMode.ErrorIfExists =>
@@ -215,7 +213,7 @@ case class CreateMetastoreDataSourceAsSelect(
val resolved = ResolvedDataSource(
sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath)
val createdRelation = LogicalRelation(resolved.relation)
- EliminateSubQueries(sqlContext.catalog.lookupRelation(tableIdent.toSeq)) match {
+ EliminateSubQueries(sqlContext.catalog.lookupRelation(tableIdent)) match {
case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _) =>
if (l.relation != createdRelation.relation) {
val errorDescription =
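Catalog callers in these commands now pass the identifier straight through instead of flattening it with toSeq. A hedged sketch of the existence-check and lookup pattern above, assuming a HiveContext named hiveContext is in scope and using a hypothetical table name:

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries

    val tableIdent = TableIdentifier("savedJsonTable")  // hypothetical name
    if (hiveContext.catalog.tableExists(tableIdent)) {
      // lookupRelation accepts the identifier directly; EliminateSubQueries
      // strips the alias wrapper, as CreateMetastoreDataSourceAsSelect does.
      val relation = EliminateSubQueries(hiveContext.catalog.lookupRelation(tableIdent))
    }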
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index ff39ccb7c1..6883d305cb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -191,7 +191,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
// Make sure any test tables referenced are loaded.
val referencedTables =
describedTables ++
- logical.collect { case UnresolvedRelation(tableIdent, _) => tableIdent.last }
+ logical.collect { case UnresolvedRelation(tableIdent, _) => tableIdent.table }
val referencedTestTables = referencedTables.filter(testTables.contains)
logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}")
referencedTestTables.foreach(loadTestTable)
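The TestHive change swaps positional access into a Seq (where .last happened to be the unqualified table name) for the explicit .table field on TableIdentifier. A short illustration of the equivalence, with a hypothetical qualified identifier:

    import org.apache.spark.sql.catalyst.TableIdentifier

    val ident = TableIdentifier("src", Some("default"))
    // Before: Seq("default", "src").last == "src" (relies on element order)
    // After:  the field names the intent directly.
    assert(ident.table == "src")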
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
index c8d272794d..8c4af1b8ea 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
@@ -26,7 +26,6 @@ import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.spark.sql.SaveMode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -41,6 +40,8 @@ import org.apache.spark.sql.hive.test.TestHive$;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.catalyst.TableIdentifier;
import org.apache.spark.util.Utils;
public class JavaMetastoreDataSourcesSuite {
@@ -71,7 +72,8 @@ public class JavaMetastoreDataSourcesSuite {
if (path.exists()) {
path.delete();
}
- hiveManagedPath = new Path(sqlContext.catalog().hiveDefaultTableFilePath("javaSavedTable"));
+ hiveManagedPath = new Path(sqlContext.catalog().hiveDefaultTableFilePath(
+ new TableIdentifier("javaSavedTable")));
fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
if (fs.exists(hiveManagedPath)){
fs.delete(hiveManagedPath, true);
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
index 579631df77..183aca29cf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.Row
@@ -31,14 +32,14 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft
override def beforeAll(): Unit = {
// The catalog in HiveContext is case insensitive.
- catalog.registerTable(Seq("ListTablesSuiteTable"), df.logicalPlan)
+ catalog.registerTable(TableIdentifier("ListTablesSuiteTable"), df.logicalPlan)
sql("CREATE TABLE HiveListTablesSuiteTable (key int, value string)")
sql("CREATE DATABASE IF NOT EXISTS ListTablesSuiteDB")
sql("CREATE TABLE ListTablesSuiteDB.HiveInDBListTablesSuiteTable (key int, value string)")
}
override def afterAll(): Unit = {
- catalog.unregisterTable(Seq("ListTablesSuiteTable"))
+ catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
sql("DROP TABLE IF EXISTS HiveListTablesSuiteTable")
sql("DROP TABLE IF EXISTS ListTablesSuiteDB.HiveInDBListTablesSuiteTable")
sql("DROP DATABASE IF EXISTS ListTablesSuiteDB")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index d356538000..d292887688 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.util.Utils
/**
@@ -367,7 +368,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
|)
""".stripMargin)
- val expectedPath = catalog.hiveDefaultTableFilePath("ctasJsonTable")
+ val expectedPath = catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable"))
val filesystemPath = new Path(expectedPath)
val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration)
if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true)
@@ -472,7 +473,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
// Drop table will also delete the data.
sql("DROP TABLE savedJsonTable")
intercept[IOException] {
- read.json(catalog.hiveDefaultTableFilePath("savedJsonTable"))
+ read.json(catalog.hiveDefaultTableFilePath(TableIdentifier("savedJsonTable")))
}
}
@@ -703,7 +704,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
// Manually create a metastore data source table.
catalog.createDataSourceTable(
- tableName = "wide_schema",
+ tableIdent = TableIdentifier("wide_schema"),
userSpecifiedSchema = Some(schema),
partitionColumns = Array.empty[String],
provider = "json",
@@ -733,7 +734,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
"EXTERNAL" -> "FALSE"),
tableType = ManagedTable,
serdeProperties = Map(
- "path" -> catalog.hiveDefaultTableFilePath(tableName)))
+ "path" -> catalog.hiveDefaultTableFilePath(TableIdentifier(tableName))))
catalog.client.createTable(hiveTable)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 6a692d6fce..9bb32f11b7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
import scala.reflect.ClassTag
import org.apache.spark.sql.{Row, SQLConf, QueryTest}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -68,7 +69,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
test("analyze MetastoreRelations") {
def queryTotalSize(tableName: String): BigInt =
- hiveContext.catalog.lookupRelation(Seq(tableName)).statistics.sizeInBytes
+ hiveContext.catalog.lookupRelation(TableIdentifier(tableName)).statistics.sizeInBytes
// Non-partitioned table
sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
@@ -115,7 +116,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
intercept[UnsupportedOperationException] {
hiveContext.analyze("tempTable")
}
- hiveContext.catalog.unregisterTable(Seq("tempTable"))
+ hiveContext.catalog.unregisterTable(TableIdentifier("tempTable"))
}
test("estimates the size of a test MetastoreRelation") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 6aa34605b0..c929ba5068 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
import scala.collection.JavaConverters._
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.DefaultParserDialect
+import org.apache.spark.sql.catalyst.{TableIdentifier, DefaultParserDialect}
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, EliminateSubQueries}
import org.apache.spark.sql.catalyst.errors.DialectException
import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -266,7 +266,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
test("CTAS without serde") {
def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
- val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
+ val relation = EliminateSubQueries(catalog.lookupRelation(TableIdentifier(tableName)))
relation match {
case LogicalRelation(r: ParquetRelation, _) =>
if (!isDataSourceParquet) {
@@ -723,7 +723,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
(1 to 100).par.map { i =>
val tableName = s"SPARK_6618_table_$i"
sql(s"CREATE TABLE $tableName (col1 string)")
- catalog.lookupRelation(Seq(tableName))
+ catalog.lookupRelation(TableIdentifier(tableName))
table(tableName)
tables()
sql(s"DROP TABLE $tableName")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 5eb39b1129..7efeab528c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.io.orc.CompressionKind
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
@@ -218,7 +219,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
sql("INSERT INTO TABLE t SELECT * FROM tmp")
checkAnswer(table("t"), (data ++ data).map(Row.fromTuple))
}
- catalog.unregisterTable(Seq("tmp"))
+ catalog.unregisterTable(TableIdentifier("tmp"))
}
test("overwriting") {
@@ -228,7 +229,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
checkAnswer(table("t"), data.map(Row.fromTuple))
}
- catalog.unregisterTable(Seq("tmp"))
+ catalog.unregisterTable(TableIdentifier("tmp"))
}
test("self-join") {