aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2015-10-14 16:05:37 -0700
committerDavies Liu <davies.liu@gmail.com>2015-10-14 16:05:37 -0700
commit56d7da14ab8f89bf4f303b27f51fd22d23967ffb (patch)
treed21fc4a4d5a197788a8b8e165d89d3e95bff13be /sql
parent9a430a027faafb083ca569698effb697af26a1db (diff)
downloadspark-56d7da14ab8f89bf4f303b27f51fd22d23967ffb.tar.gz
spark-56d7da14ab8f89bf4f303b27f51fd22d23967ffb.tar.bz2
spark-56d7da14ab8f89bf4f303b27f51fd22d23967ffb.zip
[SPARK-10104] [SQL] Consolidate different forms of table identifiers
Right now, we have QualifiedTableName, TableIdentifier, and Seq[String] to represent table identifiers. We should only have one form and TableIdentifier is the best one because it provides methods to get table name, database name, return unquoted string, and return quoted string. Author: Wenchen Fan <wenchen@databricks.com> Author: Wenchen Fan <cloud0fan@163.com> Closes #8453 from cloud-fan/table-name.
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala2
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala14
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala4
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala174
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala6
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala3
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala24
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala10
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala3
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala7
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala4
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala7
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala5
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala7
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala6
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala2
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala134
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala42
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala12
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala9
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala10
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala2
-rw-r--r--sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java6
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala5
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala9
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala5
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala6
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala5
32 files changed, 212 insertions, 327 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index dfab239885..2595e1f90c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -170,7 +170,7 @@ object SqlParser extends AbstractSparkSQLParser with DataTypeParser {
joinedRelation | relationFactor
protected lazy val relationFactor: Parser[LogicalPlan] =
- ( rep1sep(ident, ".") ~ (opt(AS) ~> opt(ident)) ^^ {
+ ( tableIdentifier ~ (opt(AS) ~> opt(ident)) ^^ {
case tableIdent ~ alias => UnresolvedRelation(tableIdent, alias)
}
| ("(" ~> start <~ ")") ~ (AS.? ~> ident) ^^ { case s ~ a => Subquery(a, s) }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala
index d701559bf2..4d4e4ded99 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala
@@ -20,14 +20,16 @@ package org.apache.spark.sql.catalyst
/**
* Identifies a `table` in `database`. If `database` is not defined, the current database is used.
*/
-private[sql] case class TableIdentifier(table: String, database: Option[String] = None) {
- def withDatabase(database: String): TableIdentifier = this.copy(database = Some(database))
-
- def toSeq: Seq[String] = database.toSeq :+ table
+private[sql] case class TableIdentifier(table: String, database: Option[String]) {
+ def this(table: String) = this(table, None)
override def toString: String = quotedString
- def quotedString: String = toSeq.map("`" + _ + "`").mkString(".")
+ def quotedString: String = database.map(db => s"`$db`.`$table`").getOrElse(s"`$table`")
+
+ def unquotedString: String = database.map(db => s"$db.$table").getOrElse(table)
+}
- def unquotedString: String = toSeq.mkString(".")
+private[sql] object TableIdentifier {
+ def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 041ab22827..e6046055bf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -105,7 +105,7 @@ class Analyzer(
// here use the CTE definition first, check table name only and ignore database name
// see https://github.com/apache/spark/pull/4929#discussion_r27186638 for more info
case u : UnresolvedRelation =>
- val substituted = cteRelations.get(u.tableIdentifier.last).map { relation =>
+ val substituted = cteRelations.get(u.tableIdentifier.table).map { relation =>
val withAlias = u.alias.map(Subquery(_, relation))
withAlias.getOrElse(relation)
}
@@ -257,7 +257,7 @@ class Analyzer(
catalog.lookupRelation(u.tableIdentifier, u.alias)
} catch {
case _: NoSuchTableException =>
- u.failAnalysis(s"no such table ${u.tableName}")
+ u.failAnalysis(s"Table Not Found: ${u.tableName}")
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 4cc9a5520a..8f4ce74a2e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -42,11 +42,9 @@ trait Catalog {
val conf: CatalystConf
- def tableExists(tableIdentifier: Seq[String]): Boolean
+ def tableExists(tableIdent: TableIdentifier): Boolean
- def lookupRelation(
- tableIdentifier: Seq[String],
- alias: Option[String] = None): LogicalPlan
+ def lookupRelation(tableIdent: TableIdentifier, alias: Option[String] = None): LogicalPlan
/**
* Returns tuples of (tableName, isTemporary) for all tables in the given database.
@@ -56,89 +54,59 @@ trait Catalog {
def refreshTable(tableIdent: TableIdentifier): Unit
- // TODO: Refactor it in the work of SPARK-10104
- def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit
+ def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit
- // TODO: Refactor it in the work of SPARK-10104
- def unregisterTable(tableIdentifier: Seq[String]): Unit
+ def unregisterTable(tableIdent: TableIdentifier): Unit
def unregisterAllTables(): Unit
- // TODO: Refactor it in the work of SPARK-10104
- protected def processTableIdentifier(tableIdentifier: Seq[String]): Seq[String] = {
- if (conf.caseSensitiveAnalysis) {
- tableIdentifier
- } else {
- tableIdentifier.map(_.toLowerCase)
- }
- }
-
- // TODO: Refactor it in the work of SPARK-10104
- protected def getDbTableName(tableIdent: Seq[String]): String = {
- val size = tableIdent.size
- if (size <= 2) {
- tableIdent.mkString(".")
- } else {
- tableIdent.slice(size - 2, size).mkString(".")
- }
- }
-
- // TODO: Refactor it in the work of SPARK-10104
- protected def getDBTable(tableIdent: Seq[String]) : (Option[String], String) = {
- (tableIdent.lift(tableIdent.size - 2), tableIdent.last)
- }
-
/**
- * It is not allowed to specifiy database name for tables stored in [[SimpleCatalog]].
- * We use this method to check it.
+ * Get the table name of TableIdentifier for temporary tables.
*/
- protected def checkTableIdentifier(tableIdentifier: Seq[String]): Unit = {
- if (tableIdentifier.length > 1) {
+ protected def getTableName(tableIdent: TableIdentifier): String = {
+ // It is not allowed to specify database name for temporary tables.
+ // We check it here and throw exception if database is defined.
+ if (tableIdent.database.isDefined) {
throw new AnalysisException("Specifying database name or other qualifiers are not allowed " +
"for temporary tables. If the table name has dots (.) in it, please quote the " +
"table name with backticks (`).")
}
+ if (conf.caseSensitiveAnalysis) {
+ tableIdent.table
+ } else {
+ tableIdent.table.toLowerCase
+ }
}
}
class SimpleCatalog(val conf: CatalystConf) extends Catalog {
- val tables = new ConcurrentHashMap[String, LogicalPlan]
-
- override def registerTable(
- tableIdentifier: Seq[String],
- plan: LogicalPlan): Unit = {
- checkTableIdentifier(tableIdentifier)
- val tableIdent = processTableIdentifier(tableIdentifier)
- tables.put(getDbTableName(tableIdent), plan)
+ private[this] val tables = new ConcurrentHashMap[String, LogicalPlan]
+
+ override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
+ tables.put(getTableName(tableIdent), plan)
}
- override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
- checkTableIdentifier(tableIdentifier)
- val tableIdent = processTableIdentifier(tableIdentifier)
- tables.remove(getDbTableName(tableIdent))
+ override def unregisterTable(tableIdent: TableIdentifier): Unit = {
+ tables.remove(getTableName(tableIdent))
}
override def unregisterAllTables(): Unit = {
tables.clear()
}
- override def tableExists(tableIdentifier: Seq[String]): Boolean = {
- checkTableIdentifier(tableIdentifier)
- val tableIdent = processTableIdentifier(tableIdentifier)
- tables.containsKey(getDbTableName(tableIdent))
+ override def tableExists(tableIdent: TableIdentifier): Boolean = {
+ tables.containsKey(getTableName(tableIdent))
}
override def lookupRelation(
- tableIdentifier: Seq[String],
+ tableIdent: TableIdentifier,
alias: Option[String] = None): LogicalPlan = {
- checkTableIdentifier(tableIdentifier)
- val tableIdent = processTableIdentifier(tableIdentifier)
- val tableFullName = getDbTableName(tableIdent)
- val table = tables.get(tableFullName)
+ val tableName = getTableName(tableIdent)
+ val table = tables.get(tableName)
if (table == null) {
- sys.error(s"Table Not Found: $tableFullName")
+ throw new NoSuchTableException
}
- val tableWithQualifiers = Subquery(tableIdent.last, table)
+ val tableWithQualifiers = Subquery(tableName, table)
// If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
// properly qualified with this alias.
@@ -146,11 +114,7 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
}
override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
- val result = ArrayBuffer.empty[(String, Boolean)]
- for (name <- tables.keySet().asScala) {
- result += ((name, true))
- }
- result
+ tables.keySet().asScala.map(_ -> true).toSeq
}
override def refreshTable(tableIdent: TableIdentifier): Unit = {
@@ -165,68 +129,50 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
* lost when the JVM exits.
*/
trait OverrideCatalog extends Catalog {
+ private[this] val overrides = new ConcurrentHashMap[String, LogicalPlan]
- // TODO: This doesn't work when the database changes...
- val overrides = new mutable.HashMap[(Option[String], String), LogicalPlan]()
-
- abstract override def tableExists(tableIdentifier: Seq[String]): Boolean = {
- val tableIdent = processTableIdentifier(tableIdentifier)
- // A temporary tables only has a single part in the tableIdentifier.
- val overriddenTable = if (tableIdentifier.length > 1) {
- None: Option[LogicalPlan]
+ private def getOverriddenTable(tableIdent: TableIdentifier): Option[LogicalPlan] = {
+ if (tableIdent.database.isDefined) {
+ None
} else {
- overrides.get(getDBTable(tableIdent))
+ Option(overrides.get(getTableName(tableIdent)))
}
- overriddenTable match {
+ }
+
+ abstract override def tableExists(tableIdent: TableIdentifier): Boolean = {
+ getOverriddenTable(tableIdent) match {
case Some(_) => true
- case None => super.tableExists(tableIdentifier)
+ case None => super.tableExists(tableIdent)
}
}
abstract override def lookupRelation(
- tableIdentifier: Seq[String],
+ tableIdent: TableIdentifier,
alias: Option[String] = None): LogicalPlan = {
- val tableIdent = processTableIdentifier(tableIdentifier)
- // A temporary tables only has a single part in the tableIdentifier.
- val overriddenTable = if (tableIdentifier.length > 1) {
- None: Option[LogicalPlan]
- } else {
- overrides.get(getDBTable(tableIdent))
- }
- val tableWithQualifers = overriddenTable.map(r => Subquery(tableIdent.last, r))
+ getOverriddenTable(tableIdent) match {
+ case Some(table) =>
+ val tableName = getTableName(tableIdent)
+ val tableWithQualifiers = Subquery(tableName, table)
- // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
- // properly qualified with this alias.
- val withAlias =
- tableWithQualifers.map(r => alias.map(a => Subquery(a, r)).getOrElse(r))
+ // If an alias was specified by the lookup, wrap the plan in a sub-query so that attributes
+ // are properly qualified with this alias.
+ alias.map(a => Subquery(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
- withAlias.getOrElse(super.lookupRelation(tableIdentifier, alias))
+ case None => super.lookupRelation(tableIdent, alias)
+ }
}
abstract override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
- // We always return all temporary tables.
- val temporaryTables = overrides.map {
- case ((_, tableName), _) => (tableName, true)
- }.toSeq
-
- temporaryTables ++ super.getTables(databaseName)
+ overrides.keySet().asScala.map(_ -> true).toSeq ++ super.getTables(databaseName)
}
- override def registerTable(
- tableIdentifier: Seq[String],
- plan: LogicalPlan): Unit = {
- checkTableIdentifier(tableIdentifier)
- val tableIdent = processTableIdentifier(tableIdentifier)
- overrides.put(getDBTable(tableIdent), plan)
+ override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
+ overrides.put(getTableName(tableIdent), plan)
}
- override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
- // A temporary tables only has a single part in the tableIdentifier.
- // If tableIdentifier has more than one parts, it is not a temporary table
- // and we do not need to do anything at here.
- if (tableIdentifier.length == 1) {
- val tableIdent = processTableIdentifier(tableIdentifier)
- overrides.remove(getDBTable(tableIdent))
+ override def unregisterTable(tableIdent: TableIdentifier): Unit = {
+ if (tableIdent.database.isEmpty) {
+ overrides.remove(getTableName(tableIdent))
}
}
@@ -243,12 +189,12 @@ object EmptyCatalog extends Catalog {
override val conf: CatalystConf = EmptyConf
- override def tableExists(tableIdentifier: Seq[String]): Boolean = {
+ override def tableExists(tableIdent: TableIdentifier): Boolean = {
throw new UnsupportedOperationException
}
override def lookupRelation(
- tableIdentifier: Seq[String],
+ tableIdent: TableIdentifier,
alias: Option[String] = None): LogicalPlan = {
throw new UnsupportedOperationException
}
@@ -257,15 +203,17 @@ object EmptyCatalog extends Catalog {
throw new UnsupportedOperationException
}
- override def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit = {
+ override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
throw new UnsupportedOperationException
}
- override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
+ override def unregisterTable(tableIdent: TableIdentifier): Unit = {
throw new UnsupportedOperationException
}
- override def unregisterAllTables(): Unit = {}
+ override def unregisterAllTables(): Unit = {
+ throw new UnsupportedOperationException
+ }
override def refreshTable(tableIdent: TableIdentifier): Unit = {
throw new UnsupportedOperationException
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 43ee319193..c973650039 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.catalyst.errors
+import org.apache.spark.sql.catalyst.{TableIdentifier, errors}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.catalyst.trees.TreeNode
@@ -36,11 +36,11 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
* Holds the name of a relation that has yet to be looked up in a [[Catalog]].
*/
case class UnresolvedRelation(
- tableIdentifier: Seq[String],
+ tableIdentifier: TableIdentifier,
alias: Option[String] = None) extends LeafNode {
/** Returns a `.` separated name for this relation. */
- def tableName: String = tableIdentifier.mkString(".")
+ def tableName: String = tableIdentifier.unquotedString
override def output: Seq[Attribute] = Nil
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 699c4cc63d..27b3cd84b3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -286,7 +286,8 @@ package object dsl {
def insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan =
InsertIntoTable(
- analysis.UnresolvedRelation(Seq(tableName)), Map.empty, logicalPlan, overwrite, false)
+ analysis.UnresolvedRelation(TableIdentifier(tableName)),
+ Map.empty, logicalPlan, overwrite, false)
def analyze: LogicalPlan = EliminateSubQueries(analysis.SimpleAnalyzer.execute(logicalPlan))
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 820b336aac..ec05cfa63c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.analysis
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
@@ -53,32 +54,39 @@ class AnalysisSuite extends AnalysisTest {
Project(testRelation.output, testRelation))
checkAnalysis(
- Project(Seq(UnresolvedAttribute("TbL.a")), UnresolvedRelation(Seq("TaBlE"), Some("TbL"))),
+ Project(Seq(UnresolvedAttribute("TbL.a")),
+ UnresolvedRelation(TableIdentifier("TaBlE"), Some("TbL"))),
Project(testRelation.output, testRelation))
assertAnalysisError(
- Project(Seq(UnresolvedAttribute("tBl.a")), UnresolvedRelation(Seq("TaBlE"), Some("TbL"))),
+ Project(Seq(UnresolvedAttribute("tBl.a")), UnresolvedRelation(
+ TableIdentifier("TaBlE"), Some("TbL"))),
Seq("cannot resolve"))
checkAnalysis(
- Project(Seq(UnresolvedAttribute("TbL.a")), UnresolvedRelation(Seq("TaBlE"), Some("TbL"))),
+ Project(Seq(UnresolvedAttribute("TbL.a")), UnresolvedRelation(
+ TableIdentifier("TaBlE"), Some("TbL"))),
Project(testRelation.output, testRelation),
caseSensitive = false)
checkAnalysis(
- Project(Seq(UnresolvedAttribute("tBl.a")), UnresolvedRelation(Seq("TaBlE"), Some("TbL"))),
+ Project(Seq(UnresolvedAttribute("tBl.a")), UnresolvedRelation(
+ TableIdentifier("TaBlE"), Some("TbL"))),
Project(testRelation.output, testRelation),
caseSensitive = false)
}
test("resolve relations") {
- assertAnalysisError(UnresolvedRelation(Seq("tAbLe"), None), Seq("Table Not Found: tAbLe"))
+ assertAnalysisError(
+ UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq("Table Not Found: tAbLe"))
- checkAnalysis(UnresolvedRelation(Seq("TaBlE"), None), testRelation)
+ checkAnalysis(UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation)
- checkAnalysis(UnresolvedRelation(Seq("tAbLe"), None), testRelation, caseSensitive = false)
+ checkAnalysis(
+ UnresolvedRelation(TableIdentifier("tAbLe"), None), testRelation, caseSensitive = false)
- checkAnalysis(UnresolvedRelation(Seq("TaBlE"), None), testRelation, caseSensitive = false)
+ checkAnalysis(
+ UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation, caseSensitive = false)
}
test("divide should be casted into fractional types") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
index 53b3695a86..23861ed15d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
@@ -17,9 +17,10 @@
package org.apache.spark.sql.catalyst.analysis
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.SimpleCatalystConf
+import org.apache.spark.sql.catalyst.{TableIdentifier, SimpleCatalystConf}
trait AnalysisTest extends PlanTest {
@@ -30,8 +31,8 @@ trait AnalysisTest extends PlanTest {
val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf)
val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf)
- caseSensitiveCatalog.registerTable(Seq("TaBlE"), TestRelations.testRelation)
- caseInsensitiveCatalog.registerTable(Seq("TaBlE"), TestRelations.testRelation)
+ caseSensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation)
+ caseInsensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation)
new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitiveConf) {
override val extendedResolutionRules = EliminateSubQueries :: Nil
@@ -67,8 +68,7 @@ trait AnalysisTest extends PlanTest {
expectedErrors: Seq[String],
caseSensitive: Boolean = true): Unit = {
val analyzer = getAnalyzer(caseSensitive)
- // todo: make sure we throw AnalysisException during analysis
- val e = intercept[Exception] {
+ val e = intercept[AnalysisException] {
analyzer.checkAnalysis(analyzer.execute(inputPlan))
}
assert(expectedErrors.map(_.toLowerCase).forall(e.getMessage.toLowerCase.contains),
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
index b4ad618c23..40c4ae7920 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{Union, Project, LocalRelation}
import org.apache.spark.sql.types._
-import org.apache.spark.sql.catalyst.SimpleCatalystConf
+import org.apache.spark.sql.catalyst.{TableIdentifier, SimpleCatalystConf}
class DecimalPrecisionSuite extends SparkFunSuite with BeforeAndAfter {
val conf = new SimpleCatalystConf(true)
@@ -47,7 +47,7 @@ class DecimalPrecisionSuite extends SparkFunSuite with BeforeAndAfter {
val b: Expression = UnresolvedAttribute("b")
before {
- catalog.registerTable(Seq("table"), relation)
+ catalog.registerTable(TableIdentifier("table"), relation)
}
private def checkType(expression: Expression, expectedType: DataType): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 97a8b6518a..eacdea2c1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
import org.apache.spark.sql.types.StructType
import org.apache.spark.{Logging, Partition}
+import org.apache.spark.sql.catalyst.{SqlParser, TableIdentifier}
/**
* :: Experimental ::
@@ -287,7 +288,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
* @since 1.4.0
*/
def table(tableName: String): DataFrame = {
- DataFrame(sqlContext, sqlContext.catalog.lookupRelation(Seq(tableName)))
+ DataFrame(sqlContext, sqlContext.catalog.lookupRelation(TableIdentifier(tableName)))
}
///////////////////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 03e973666e..764510ab4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -171,7 +171,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
val overwrite = mode == SaveMode.Overwrite
df.sqlContext.executePlan(
InsertIntoTable(
- UnresolvedRelation(tableIdent.toSeq),
+ UnresolvedRelation(tableIdent),
partitions.getOrElse(Map.empty[String, Option[String]]),
df.logicalPlan,
overwrite,
@@ -201,7 +201,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
}
private def saveAsTable(tableIdent: TableIdentifier): Unit = {
- val tableExists = df.sqlContext.catalog.tableExists(tableIdent.toSeq)
+ val tableExists = df.sqlContext.catalog.tableExists(tableIdent)
(tableExists, mode) match {
case (true, SaveMode.Ignore) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 3d5e35ab31..361eb576c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -714,7 +714,7 @@ class SQLContext private[sql](
* only during the lifetime of this instance of SQLContext.
*/
private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = {
- catalog.registerTable(Seq(tableName), df.logicalPlan)
+ catalog.registerTable(TableIdentifier(tableName), df.logicalPlan)
}
/**
@@ -728,7 +728,7 @@ class SQLContext private[sql](
*/
def dropTempTable(tableName: String): Unit = {
cacheManager.tryUncacheQuery(table(tableName))
- catalog.unregisterTable(Seq(tableName))
+ catalog.unregisterTable(TableIdentifier(tableName))
}
/**
@@ -795,7 +795,7 @@ class SQLContext private[sql](
}
private def table(tableIdent: TableIdentifier): DataFrame = {
- DataFrame(this, catalog.lookupRelation(tableIdent.toSeq))
+ DataFrame(this, catalog.lookupRelation(tableIdent))
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
index f7a88b98c0..446739d5b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
@@ -140,7 +140,7 @@ class DDLParser(parseQuery: String => LogicalPlan)
protected lazy val describeTable: Parser[LogicalPlan] =
(DESCRIBE ~> opt(EXTENDED)) ~ tableIdentifier ^^ {
case e ~ tableIdent =>
- DescribeCommand(UnresolvedRelation(tableIdent.toSeq, None), e.isDefined)
+ DescribeCommand(UnresolvedRelation(tableIdent, None), e.isDefined)
}
protected lazy val refreshTable: Parser[LogicalPlan] =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 31d6b75e13..e7deeff13d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -71,7 +71,6 @@ case class CreateTableUsing(
* can analyze the logical plan that will be used to populate the table.
* So, [[PreWriteCheck]] can detect cases that are not allowed.
*/
-// TODO: Use TableIdentifier instead of String for tableName (SPARK-10104).
case class CreateTableUsingAsSelect(
tableIdent: TableIdentifier,
provider: String,
@@ -93,7 +92,7 @@ case class CreateTempTableUsing(
val resolved = ResolvedDataSource(
sqlContext, userSpecifiedSchema, Array.empty[String], provider, options)
sqlContext.catalog.registerTable(
- tableIdent.toSeq,
+ tableIdent,
DataFrame(sqlContext, LogicalRelation(resolved.relation)).logicalPlan)
Seq.empty[Row]
@@ -112,7 +111,7 @@ case class CreateTempTableUsingAsSelect(
val df = DataFrame(sqlContext, query)
val resolved = ResolvedDataSource(sqlContext, provider, partitionColumns, mode, options, df)
sqlContext.catalog.registerTable(
- tableIdent.toSeq,
+ tableIdent,
DataFrame(sqlContext, LogicalRelation(resolved.relation)).logicalPlan)
Seq.empty[Row]
@@ -128,7 +127,7 @@ case class RefreshTable(tableIdent: TableIdentifier)
// If this table is cached as a InMemoryColumnarRelation, drop the original
// cached version and make the new version cached lazily.
- val logicalPlan = sqlContext.catalog.lookupRelation(tableIdent.toSeq)
+ val logicalPlan = sqlContext.catalog.lookupRelation(tableIdent)
// Use lookupCachedData directly since RefreshTable also takes databaseName.
val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty
if (isCached) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 8efc8016f9..b00e5680fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -143,9 +143,9 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
case CreateTableUsingAsSelect(tableIdent, _, _, partitionColumns, mode, _, query) =>
// When the SaveMode is Overwrite, we need to check if the table is an input table of
// the query. If so, we will throw an AnalysisException to let users know it is not allowed.
- if (mode == SaveMode.Overwrite && catalog.tableExists(tableIdent.toSeq)) {
+ if (mode == SaveMode.Overwrite && catalog.tableExists(tableIdent)) {
// Need to remove SubQuery operator.
- EliminateSubQueries(catalog.lookupRelation(tableIdent.toSeq)) match {
+ EliminateSubQueries(catalog.lookupRelation(tableIdent)) match {
// Only do the check if the table is a data source table
// (the relation is a BaseRelation).
case l @ LogicalRelation(dest: BaseRelation, _) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 356d4ff3fa..fd566c8276 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.execution.PhysicalRDD
import scala.concurrent.duration._
@@ -287,8 +288,7 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
testData.select('key).registerTempTable("t1")
sqlContext.table("t1")
sqlContext.dropTempTable("t1")
- assert(
- intercept[RuntimeException](sqlContext.table("t1")).getMessage.startsWith("Table Not Found"))
+ intercept[NoSuchTableException](sqlContext.table("t1"))
}
test("Drops cached temporary table") {
@@ -300,8 +300,7 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
assert(sqlContext.isCached("t2"))
sqlContext.dropTempTable("t1")
- assert(
- intercept[RuntimeException](sqlContext.table("t1")).getMessage.startsWith("Table Not Found"))
+ intercept[NoSuchTableException](sqlContext.table("t1"))
assert(!sqlContext.isCached("t2"))
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 7a027e1308..b1fb068158 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.test.SharedSQLContext
@@ -359,8 +360,8 @@ class JoinSuite extends QueryTest with SharedSQLContext {
upperCaseData.where('N <= 4).registerTempTable("left")
upperCaseData.where('N >= 3).registerTempTable("right")
- val left = UnresolvedRelation(Seq("left"), None)
- val right = UnresolvedRelation(Seq("right"), None)
+ val left = UnresolvedRelation(TableIdentifier("left"), None)
+ val right = UnresolvedRelation(TableIdentifier("right"), None)
checkAnswer(
left.join(right, $"left.N" === $"right.N", "full"),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
index eab0fbb196..5688f46e5e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
@@ -21,6 +21,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}
+import org.apache.spark.sql.catalyst.TableIdentifier
class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContext {
import testImplicits._
@@ -32,7 +33,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
}
after {
- sqlContext.catalog.unregisterTable(Seq("ListTablesSuiteTable"))
+ sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
}
test("get all tables") {
@@ -44,7 +45,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
sql("SHOW tables").filter("tableName = 'ListTablesSuiteTable'"),
Row("ListTablesSuiteTable", true))
- sqlContext.catalog.unregisterTable(Seq("ListTablesSuiteTable"))
+ sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
}
@@ -57,7 +58,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
sql("show TABLES in DB").filter("tableName = 'ListTablesSuiteTable'"),
Row("ListTablesSuiteTable", true))
- sqlContext.catalog.unregisterTable(Seq("ListTablesSuiteTable"))
+ sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index cc02ef81c9..baff7f5752 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -22,7 +22,7 @@ import java.io.File
import org.apache.hadoop.fs.Path
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{TableIdentifier, InternalRow}
import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT}
import org.apache.spark.sql.test.SharedSQLContext
@@ -49,7 +49,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
sql("INSERT INTO TABLE t SELECT * FROM tmp")
checkAnswer(sqlContext.table("t"), (data ++ data).map(Row.fromTuple))
}
- sqlContext.catalog.unregisterTable(Seq("tmp"))
+ sqlContext.catalog.unregisterTable(TableIdentifier("tmp"))
}
test("overwriting") {
@@ -59,7 +59,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
checkAnswer(sqlContext.table("t"), data.map(Row.fromTuple))
}
- sqlContext.catalog.unregisterTable(Seq("tmp"))
+ sqlContext.catalog.unregisterTable(TableIdentifier("tmp"))
}
test("self-join") {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index e620d7fb82..4d8a3f728e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -358,7 +358,7 @@ class HiveContext private[hive](
@Experimental
def analyze(tableName: String) {
val tableIdent = SqlParser.parseTableIdentifier(tableName)
- val relation = EliminateSubQueries(catalog.lookupRelation(tableIdent.toSeq))
+ val relation = EliminateSubQueries(catalog.lookupRelation(tableIdent))
relation match {
case relation: MetastoreRelation =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 1f8223e1ff..5819cb9d08 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.catalyst.{InternalRow, SqlParser, TableIdentifier}
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
import org.apache.spark.sql.execution.{FileRelation, datasources}
@@ -103,10 +103,19 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
/** Usages should lock on `this`. */
protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf)
- // TODO: Use this everywhere instead of tuples or databaseName, tableName,.
/** A fully qualified identifier for a table (i.e., database.tableName) */
- case class QualifiedTableName(database: String, name: String) {
- def toLowerCase: QualifiedTableName = QualifiedTableName(database.toLowerCase, name.toLowerCase)
+ case class QualifiedTableName(database: String, name: String)
+
+ private def getQualifiedTableName(tableIdent: TableIdentifier) = {
+ QualifiedTableName(
+ tableIdent.database.getOrElse(client.currentDatabase).toLowerCase,
+ tableIdent.table.toLowerCase)
+ }
+
+ private def getQualifiedTableName(hiveTable: HiveTable) = {
+ QualifiedTableName(
+ hiveTable.specifiedDatabase.getOrElse(client.currentDatabase).toLowerCase,
+ hiveTable.name.toLowerCase)
}
/** A cache of Spark SQL data source tables that have been accessed. */
@@ -179,33 +188,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
}
def invalidateTable(tableIdent: TableIdentifier): Unit = {
- val databaseName = tableIdent.database.getOrElse(client.currentDatabase)
- val tableName = tableIdent.table
-
- cachedDataSourceTables.invalidate(QualifiedTableName(databaseName, tableName).toLowerCase)
- }
-
- val caseSensitive: Boolean = false
-
- /**
- * Creates a data source table (a table created with USING clause) in Hive's metastore.
- * Returns true when the table has been created. Otherwise, false.
- */
- // TODO: Remove this in SPARK-10104.
- def createDataSourceTable(
- tableName: String,
- userSpecifiedSchema: Option[StructType],
- partitionColumns: Array[String],
- provider: String,
- options: Map[String, String],
- isExternal: Boolean): Unit = {
- createDataSourceTable(
- SqlParser.parseTableIdentifier(tableName),
- userSpecifiedSchema,
- partitionColumns,
- provider,
- options,
- isExternal)
+ cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent))
}
def createDataSourceTable(
@@ -215,10 +198,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
provider: String,
options: Map[String, String],
isExternal: Boolean): Unit = {
- val (dbName, tblName) = {
- val database = tableIdent.database.getOrElse(client.currentDatabase)
- processDatabaseAndTableName(database, tableIdent.table)
- }
+ val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
val tableProperties = new mutable.HashMap[String, String]
tableProperties.put("spark.sql.sources.provider", provider)
@@ -311,7 +291,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
// TODO: Support persisting partitioned data source relations in Hive compatible format
val qualifiedTableName = tableIdent.quotedString
- val (hiveCompitiableTable, logMessage) = (maybeSerDe, dataSource.relation) match {
+ val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.relation) match {
case (Some(serde), relation: HadoopFsRelation)
if relation.paths.length == 1 && relation.partitionColumns.isEmpty =>
val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
@@ -349,9 +329,9 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
(None, message)
}
- (hiveCompitiableTable, logMessage) match {
+ (hiveCompatibleTable, logMessage) match {
case (Some(table), message) =>
- // We first try to save the metadata of the table in a Hive compatiable way.
+ // We first try to save the metadata of the table in a Hive compatible way.
// If Hive throws an error, we fall back to save its metadata in the Spark SQL
// specific way.
try {
@@ -374,48 +354,29 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
}
}
- def hiveDefaultTableFilePath(tableName: String): String = {
- hiveDefaultTableFilePath(SqlParser.parseTableIdentifier(tableName))
- }
-
def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
// Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
- val database = tableIdent.database.getOrElse(client.currentDatabase)
-
- new Path(
- new Path(client.getDatabase(database).location),
- tableIdent.table.toLowerCase).toString
+ val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
+ new Path(new Path(client.getDatabase(dbName).location), tblName).toString
}
- def tableExists(tableIdentifier: Seq[String]): Boolean = {
- val tableIdent = processTableIdentifier(tableIdentifier)
- val databaseName =
- tableIdent
- .lift(tableIdent.size - 2)
- .getOrElse(client.currentDatabase)
- val tblName = tableIdent.last
- client.getTableOption(databaseName, tblName).isDefined
+ override def tableExists(tableIdent: TableIdentifier): Boolean = {
+ val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
+ client.getTableOption(dbName, tblName).isDefined
}
- def lookupRelation(
- tableIdentifier: Seq[String],
+ override def lookupRelation(
+ tableIdent: TableIdentifier,
alias: Option[String]): LogicalPlan = {
- val tableIdent = processTableIdentifier(tableIdentifier)
- val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
- client.currentDatabase)
- val tblName = tableIdent.last
- val table = client.getTable(databaseName, tblName)
+ val qualifiedTableName = getQualifiedTableName(tableIdent)
+ val table = client.getTable(qualifiedTableName.database, qualifiedTableName.name)
if (table.properties.get("spark.sql.sources.provider").isDefined) {
- val dataSourceTable =
- cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
+ val dataSourceTable = cachedDataSourceTables(qualifiedTableName)
+ val tableWithQualifiers = Subquery(qualifiedTableName.name, dataSourceTable)
// Then, if alias is specified, wrap the table with a Subquery using the alias.
// Otherwise, wrap the table with a Subquery using the table name.
- val withAlias =
- alias.map(a => Subquery(a, dataSourceTable)).getOrElse(
- Subquery(tableIdent.last, dataSourceTable))
-
- withAlias
+ alias.map(a => Subquery(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
} else if (table.tableType == VirtualView) {
val viewText = table.viewText.getOrElse(sys.error("Invalid view without text."))
alias match {
@@ -425,7 +386,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
case Some(aliasText) => Subquery(aliasText, HiveQl.createPlan(viewText))
}
} else {
- MetastoreRelation(databaseName, tblName, alias)(table)(hive)
+ MetastoreRelation(qualifiedTableName.database, qualifiedTableName.name, alias)(table)(hive)
}
}
@@ -524,26 +485,6 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
client.listTables(db).map(tableName => (tableName, false))
}
- protected def processDatabaseAndTableName(
- databaseName: Option[String],
- tableName: String): (Option[String], String) = {
- if (!caseSensitive) {
- (databaseName.map(_.toLowerCase), tableName.toLowerCase)
- } else {
- (databaseName, tableName)
- }
- }
-
- protected def processDatabaseAndTableName(
- databaseName: String,
- tableName: String): (String, String) = {
- if (!caseSensitive) {
- (databaseName.toLowerCase, tableName.toLowerCase)
- } else {
- (databaseName, tableName)
- }
- }
-
/**
* When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet
* data source relations for better performance.
@@ -597,8 +538,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
"It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.")
}
- val (dbName, tblName) = processDatabaseAndTableName(
- table.specifiedDatabase.getOrElse(client.currentDatabase), table.name)
+ val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
execution.CreateViewAsSelect(
table.copy(
@@ -636,7 +576,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTableUsingAsSelect(
TableIdentifier(desc.name),
- hive.conf.defaultDataSourceName,
+ conf.defaultDataSourceName,
temporary = false,
Array.empty[String],
mode,
@@ -652,9 +592,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
table
}
- val (dbName, tblName) =
- processDatabaseAndTableName(
- desc.specifiedDatabase.getOrElse(client.currentDatabase), desc.name)
+ val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
execution.CreateTableAsSelect(
desc.copy(
@@ -712,7 +650,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
* UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
* For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
*/
- override def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit = {
+ override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
throw new UnsupportedOperationException
}
@@ -720,7 +658,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
* UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
* For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
*/
- override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
+ override def unregisterTable(tableIdent: TableIdentifier): Unit = {
throw new UnsupportedOperationException
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 1d50501940..d4ff5cc0f1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.{logical, _}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.ExplainCommand
import org.apache.spark.sql.execution.datasources.DescribeCommand
import org.apache.spark.sql.hive.HiveShim._
@@ -442,24 +443,12 @@ private[hive] object HiveQl extends Logging {
throw new NotImplementedError(s"No parse rules for StructField:\n ${dumpTree(a).toString} ")
}
- protected def extractDbNameTableName(tableNameParts: Node): (Option[String], String) = {
- val (db, tableName) =
- tableNameParts.getChildren.asScala.map {
- case Token(part, Nil) => cleanIdentifier(part)
- } match {
- case Seq(tableOnly) => (None, tableOnly)
- case Seq(databaseName, table) => (Some(databaseName), table)
- }
-
- (db, tableName)
- }
-
- protected def extractTableIdent(tableNameParts: Node): Seq[String] = {
+ protected def extractTableIdent(tableNameParts: Node): TableIdentifier = {
tableNameParts.getChildren.asScala.map {
case Token(part, Nil) => cleanIdentifier(part)
} match {
- case Seq(tableOnly) => Seq(tableOnly)
- case Seq(databaseName, table) => Seq(databaseName, table)
+ case Seq(tableOnly) => TableIdentifier(tableOnly)
+ case Seq(databaseName, table) => TableIdentifier(table, Some(databaseName))
case other => sys.error("Hive only supports tables names like 'tableName' " +
s"or 'databaseName.tableName', found '$other'")
}
@@ -518,13 +507,13 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
properties: Map[String, String],
allowExist: Boolean,
replace: Boolean): CreateViewAsSelect = {
- val (db, viewName) = extractDbNameTableName(viewNameParts)
+ val TableIdentifier(viewName, dbName) = extractTableIdent(viewNameParts)
val originalText = context.getTokenRewriteStream
.toString(query.getTokenStartIndex, query.getTokenStopIndex)
val tableDesc = HiveTable(
- specifiedDatabase = db,
+ specifiedDatabase = dbName,
name = viewName,
schema = schema,
partitionColumns = Seq.empty[HiveColumn],
@@ -611,7 +600,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
case tableName =>
// It is describing a table with the format like "describe table".
DescribeCommand(
- UnresolvedRelation(Seq(tableName.getText), None), isExtended = extended.isDefined)
+ UnresolvedRelation(TableIdentifier(tableName.getText), None),
+ isExtended = extended.isDefined)
}
}
// All other cases.
@@ -716,12 +706,12 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
"TOK_TABLELOCATION",
"TOK_TABLEPROPERTIES"),
children)
- val (db, tableName) = extractDbNameTableName(tableNameParts)
+ val TableIdentifier(tblName, dbName) = extractTableIdent(tableNameParts)
// TODO add bucket support
var tableDesc: HiveTable = HiveTable(
- specifiedDatabase = db,
- name = tableName,
+ specifiedDatabase = dbName,
+ name = tblName,
schema = Seq.empty[HiveColumn],
partitionColumns = Seq.empty[HiveColumn],
properties = Map[String, String](),
@@ -1264,15 +1254,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
nonAliasClauses)
}
- val tableIdent =
- tableNameParts.getChildren.asScala.map {
- case Token(part, Nil) => cleanIdentifier(part)
- } match {
- case Seq(tableOnly) => Seq(tableOnly)
- case Seq(databaseName, table) => Seq(databaseName, table)
- case other => sys.error("Hive only supports tables names like 'tableName' " +
- s"or 'databaseName.tableName', found '$other'")
- }
+ val tableIdent = extractTableIdent(tableNameParts)
val alias = aliasClause.map { case Token(a, Nil) => cleanIdentifier(a) }
val relation = UnresolvedRelation(tableIdent, alias)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 8422287e17..e72a60b42e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.hive.execution
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
@@ -37,8 +38,7 @@ case class CreateTableAsSelect(
allowExisting: Boolean)
extends RunnableCommand {
- def database: String = tableDesc.database
- def tableName: String = tableDesc.name
+ val tableIdentifier = TableIdentifier(tableDesc.name, Some(tableDesc.database))
override def children: Seq[LogicalPlan] = Seq(query)
@@ -72,18 +72,18 @@ case class CreateTableAsSelect(
hiveContext.catalog.client.createTable(withSchema)
// Get the Metastore Relation
- hiveContext.catalog.lookupRelation(Seq(database, tableName), None) match {
+ hiveContext.catalog.lookupRelation(tableIdentifier, None) match {
case r: MetastoreRelation => r
}
}
// TODO ideally, we should get the output data ready first and then
// add the relation into catalog, just in case of failure occurs while data
// processing.
- if (hiveContext.catalog.tableExists(Seq(database, tableName))) {
+ if (hiveContext.catalog.tableExists(tableIdentifier)) {
if (allowExisting) {
// table already exists, will do nothing, to keep consistent with Hive
} else {
- throw new AnalysisException(s"$database.$tableName already exists.")
+ throw new AnalysisException(s"$tableIdentifier already exists.")
}
} else {
hiveContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true, false)).toRdd
@@ -93,6 +93,6 @@ case class CreateTableAsSelect(
}
override def argString: String = {
- s"[Database:$database, TableName: $tableName, InsertIntoHiveTable]"
+ s"[Database:${tableDesc.database}}, TableName: ${tableDesc.name}, InsertIntoHiveTable]"
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
index 2b504ac974..2c81115ee4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.hive.execution
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveContext}
import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
@@ -38,18 +39,18 @@ private[hive] case class CreateViewAsSelect(
assert(tableDesc.schema == Nil || tableDesc.schema.length == childSchema.length)
assert(tableDesc.viewText.isDefined)
+ val tableIdentifier = TableIdentifier(tableDesc.name, Some(tableDesc.database))
+
override def run(sqlContext: SQLContext): Seq[Row] = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
- val database = tableDesc.database
- val viewName = tableDesc.name
- if (hiveContext.catalog.tableExists(Seq(database, viewName))) {
+ if (hiveContext.catalog.tableExists(tableIdentifier)) {
if (allowExisting) {
// view already exists, will do nothing, to keep consistent with Hive
} else if (orReplace) {
hiveContext.catalog.client.alertView(prepareTable())
} else {
- throw new AnalysisException(s"View $database.$viewName already exists. " +
+ throw new AnalysisException(s"View $tableIdentifier already exists. " +
"If you want to update the view definition, please use ALTER VIEW AS or " +
"CREATE OR REPLACE VIEW AS")
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 51ec92afd0..94210a5394 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -71,7 +71,7 @@ case class DropTable(
}
hiveContext.invalidateTable(tableName)
hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
- hiveContext.catalog.unregisterTable(Seq(tableName))
+ hiveContext.catalog.unregisterTable(TableIdentifier(tableName))
Seq.empty[Row]
}
}
@@ -103,7 +103,6 @@ case class AddFile(path: String) extends RunnableCommand {
}
}
-// TODO: Use TableIdentifier instead of String for tableName (SPARK-10104).
private[hive]
case class CreateMetastoreDataSource(
tableIdent: TableIdentifier,
@@ -131,7 +130,7 @@ case class CreateMetastoreDataSource(
val tableName = tableIdent.unquotedString
val hiveContext = sqlContext.asInstanceOf[HiveContext]
- if (hiveContext.catalog.tableExists(tableIdent.toSeq)) {
+ if (hiveContext.catalog.tableExists(tableIdent)) {
if (allowExisting) {
return Seq.empty[Row]
} else {
@@ -160,7 +159,6 @@ case class CreateMetastoreDataSource(
}
}
-// TODO: Use TableIdentifier instead of String for tableName (SPARK-10104).
private[hive]
case class CreateMetastoreDataSourceAsSelect(
tableIdent: TableIdentifier,
@@ -198,7 +196,7 @@ case class CreateMetastoreDataSourceAsSelect(
}
var existingSchema = None: Option[StructType]
- if (sqlContext.catalog.tableExists(tableIdent.toSeq)) {
+ if (sqlContext.catalog.tableExists(tableIdent)) {
// Check if we need to throw an exception or just return.
mode match {
case SaveMode.ErrorIfExists =>
@@ -215,7 +213,7 @@ case class CreateMetastoreDataSourceAsSelect(
val resolved = ResolvedDataSource(
sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath)
val createdRelation = LogicalRelation(resolved.relation)
- EliminateSubQueries(sqlContext.catalog.lookupRelation(tableIdent.toSeq)) match {
+ EliminateSubQueries(sqlContext.catalog.lookupRelation(tableIdent)) match {
case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _) =>
if (l.relation != createdRelation.relation) {
val errorDescription =
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index ff39ccb7c1..6883d305cb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -191,7 +191,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
// Make sure any test tables referenced are loaded.
val referencedTables =
describedTables ++
- logical.collect { case UnresolvedRelation(tableIdent, _) => tableIdent.last }
+ logical.collect { case UnresolvedRelation(tableIdent, _) => tableIdent.table }
val referencedTestTables = referencedTables.filter(testTables.contains)
logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}")
referencedTestTables.foreach(loadTestTable)
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
index c8d272794d..8c4af1b8ea 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
@@ -26,7 +26,6 @@ import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.spark.sql.SaveMode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -41,6 +40,8 @@ import org.apache.spark.sql.hive.test.TestHive$;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.catalyst.TableIdentifier;
import org.apache.spark.util.Utils;
public class JavaMetastoreDataSourcesSuite {
@@ -71,7 +72,8 @@ public class JavaMetastoreDataSourcesSuite {
if (path.exists()) {
path.delete();
}
- hiveManagedPath = new Path(sqlContext.catalog().hiveDefaultTableFilePath("javaSavedTable"));
+ hiveManagedPath = new Path(sqlContext.catalog().hiveDefaultTableFilePath(
+ new TableIdentifier("javaSavedTable")));
fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
if (fs.exists(hiveManagedPath)){
fs.delete(hiveManagedPath, true);
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
index 579631df77..183aca29cf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.Row
@@ -31,14 +32,14 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft
override def beforeAll(): Unit = {
// The catalog in HiveContext is a case insensitive one.
- catalog.registerTable(Seq("ListTablesSuiteTable"), df.logicalPlan)
+ catalog.registerTable(TableIdentifier("ListTablesSuiteTable"), df.logicalPlan)
sql("CREATE TABLE HiveListTablesSuiteTable (key int, value string)")
sql("CREATE DATABASE IF NOT EXISTS ListTablesSuiteDB")
sql("CREATE TABLE ListTablesSuiteDB.HiveInDBListTablesSuiteTable (key int, value string)")
}
override def afterAll(): Unit = {
- catalog.unregisterTable(Seq("ListTablesSuiteTable"))
+ catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
sql("DROP TABLE IF EXISTS HiveListTablesSuiteTable")
sql("DROP TABLE IF EXISTS ListTablesSuiteDB.HiveInDBListTablesSuiteTable")
sql("DROP DATABASE IF EXISTS ListTablesSuiteDB")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index d356538000..d292887688 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.util.Utils
/**
@@ -367,7 +368,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
|)
""".stripMargin)
- val expectedPath = catalog.hiveDefaultTableFilePath("ctasJsonTable")
+ val expectedPath = catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable"))
val filesystemPath = new Path(expectedPath)
val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration)
if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true)
@@ -472,7 +473,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
// Drop table will also delete the data.
sql("DROP TABLE savedJsonTable")
intercept[IOException] {
- read.json(catalog.hiveDefaultTableFilePath("savedJsonTable"))
+ read.json(catalog.hiveDefaultTableFilePath(TableIdentifier("savedJsonTable")))
}
}
@@ -703,7 +704,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
// Manually create a metastore data source table.
catalog.createDataSourceTable(
- tableName = "wide_schema",
+ tableIdent = TableIdentifier("wide_schema"),
userSpecifiedSchema = Some(schema),
partitionColumns = Array.empty[String],
provider = "json",
@@ -733,7 +734,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
"EXTERNAL" -> "FALSE"),
tableType = ManagedTable,
serdeProperties = Map(
- "path" -> catalog.hiveDefaultTableFilePath(tableName)))
+ "path" -> catalog.hiveDefaultTableFilePath(TableIdentifier(tableName))))
catalog.client.createTable(hiveTable)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 6a692d6fce..9bb32f11b7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
import scala.reflect.ClassTag
import org.apache.spark.sql.{Row, SQLConf, QueryTest}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -68,7 +69,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
test("analyze MetastoreRelations") {
def queryTotalSize(tableName: String): BigInt =
- hiveContext.catalog.lookupRelation(Seq(tableName)).statistics.sizeInBytes
+ hiveContext.catalog.lookupRelation(TableIdentifier(tableName)).statistics.sizeInBytes
// Non-partitioned table
sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
@@ -115,7 +116,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
intercept[UnsupportedOperationException] {
hiveContext.analyze("tempTable")
}
- hiveContext.catalog.unregisterTable(Seq("tempTable"))
+ hiveContext.catalog.unregisterTable(TableIdentifier("tempTable"))
}
test("estimates the size of a test MetastoreRelation") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 6aa34605b0..c929ba5068 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
import scala.collection.JavaConverters._
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.DefaultParserDialect
+import org.apache.spark.sql.catalyst.{TableIdentifier, DefaultParserDialect}
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, EliminateSubQueries}
import org.apache.spark.sql.catalyst.errors.DialectException
import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -266,7 +266,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
test("CTAS without serde") {
def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
- val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
+ val relation = EliminateSubQueries(catalog.lookupRelation(TableIdentifier(tableName)))
relation match {
case LogicalRelation(r: ParquetRelation, _) =>
if (!isDataSourceParquet) {
@@ -723,7 +723,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
(1 to 100).par.map { i =>
val tableName = s"SPARK_6618_table_$i"
sql(s"CREATE TABLE $tableName (col1 string)")
- catalog.lookupRelation(Seq(tableName))
+ catalog.lookupRelation(TableIdentifier(tableName))
table(tableName)
tables()
sql(s"DROP TABLE $tableName")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 5eb39b1129..7efeab528c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.io.orc.CompressionKind
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
@@ -218,7 +219,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
sql("INSERT INTO TABLE t SELECT * FROM tmp")
checkAnswer(table("t"), (data ++ data).map(Row.fromTuple))
}
- catalog.unregisterTable(Seq("tmp"))
+ catalog.unregisterTable(TableIdentifier("tmp"))
}
test("overwriting") {
@@ -228,7 +229,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
checkAnswer(table("t"), data.map(Row.fromTuple))
}
- catalog.unregisterTable(Seq("tmp"))
+ catalog.unregisterTable(TableIdentifier("tmp"))
}
test("self-join") {