author    Alex Liu <alex_liu68@yahoo.com>    2015-01-10 13:23:09 -0800
committer Michael Armbrust <michael@databricks.com>    2015-01-10 13:42:36 -0800
commit    c6ea6d4b7dfb7cad440025b13a584d5200a4ef8f (patch)
tree      9848611ffda0f76bf0ebb961cffcbb4b1c34feec /sql
parent    09eef3b5cd27f1f4e5d0a63440efe1d59436dbf7 (diff)
[SPARK-4943][SQL] Allow table name having dot for db/catalog
This pull only fixes the parsing error and changes the API to use tableIdentifier. The change needed to join data sources from different catalogs is not included in this pull.

Author: Alex Liu <alex_liu68@yahoo.com>

Closes #3941 from alexliu68/SPARK-SQL-4943-3 and squashes the following commits:

343ae27 [Alex Liu] [SPARK-4943][SQL] refactoring according to review
29e5e55 [Alex Liu] [SPARK-4943][SQL] fix failed Hive CTAS tests
6ae77ce [Alex Liu] [SPARK-4943][SQL] fix TestHive matching error
3652997 [Alex Liu] [SPARK-4943][SQL] Allow table name having dot to support db/catalog
...

(cherry picked from commit 4b39fd1e63188821fc84a13f7ccb6e94277f4be7)
Signed-off-by: Michael Armbrust <michael@databricks.com>

Conflicts:
    sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
    sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
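Note: the core API change in this patch replaces the (databaseName: Option[String], tableName: String) pair with a single tableIdentifier: Seq[String]. A minimal sketch of the new calling shape, using the UnresolvedRelation case class changed in unresolved.scala below; the object and val names are illustrative only:

import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

object IdentifierExamples {
  // Before this patch: UnresolvedRelation(None, "t", None)
  // After: the identifier is a Seq of name parts, so dotted names carry through.
  val simple  = UnresolvedRelation(Seq("t"), None)                // t
  val dotted  = UnresolvedRelation(Seq("db", "t"), None)          // db.t
  val aliased = UnresolvedRelation(Seq("db", "t"), Some("alias")) // db.t AS alias
}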
Diffstat (limited to 'sql')
-rwxr-xr-x sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala | 6
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 8
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala | 106
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala | 3
-rwxr-xr-x sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala | 2
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala | 20
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala | 2
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 6
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala | 4
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala | 4
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 2
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 52
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 29
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala | 2
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala | 4
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala | 2
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala | 4
17 files changed, 143 insertions(+), 113 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index a2bcd73b60..3141942aba 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -177,10 +177,10 @@ class SqlParser extends AbstractSparkSQLParser {
joinedRelation | relationFactor
protected lazy val relationFactor: Parser[LogicalPlan] =
- ( ident ~ (opt(AS) ~> opt(ident)) ^^ {
- case tableName ~ alias => UnresolvedRelation(None, tableName, alias)
+ ( rep1sep(ident, ".") ~ (opt(AS) ~> opt(ident)) ^^ {
+ case tableIdent ~ alias => UnresolvedRelation(tableIdent, alias)
}
- | ("(" ~> start <~ ")") ~ (AS.? ~> ident) ^^ { case s ~ a => Subquery(a, s) }
+ | ("(" ~> start <~ ")") ~ (AS.? ~> ident) ^^ { case s ~ a => Subquery(a, s) }
)
protected lazy val joinedRelation: Parser[LogicalPlan] =
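Note: a standalone sketch of the rep1sep(ident, ".") pattern used in relationFactor above, showing how a dotted table reference parses into its name parts. The object name and identifier regex are assumptions for the demo, not code from this patch:

import scala.util.parsing.combinator.RegexParsers

object TableIdentDemo extends RegexParsers {
  // Simplified stand-in for SqlParser's ident rule.
  val ident: Parser[String] = """[a-zA-Z_][a-zA-Z0-9_]*""".r
  // One or more identifiers separated by dots, as in relationFactor.
  val tableIdentifier: Parser[Seq[String]] = rep1sep(ident, ".")

  def main(args: Array[String]): Unit = {
    println(parseAll(tableIdentifier, "mytable").get)           // List(mytable)
    println(parseAll(tableIdentifier, "mydb.mytable").get)      // List(mydb, mytable)
    println(parseAll(tableIdentifier, "catalog.db.table").get)  // List(catalog, db, table)
  }
}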
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index facbd8b975..afa6b17f27 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -136,11 +136,11 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
*/
object ResolveRelations extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- case i @ InsertIntoTable(UnresolvedRelation(databaseName, name, alias), _, _, _) =>
+ case i @ InsertIntoTable(UnresolvedRelation(tableIdentifier, alias), _, _, _) =>
i.copy(
- table = EliminateAnalysisOperators(catalog.lookupRelation(databaseName, name, alias)))
- case UnresolvedRelation(databaseName, name, alias) =>
- catalog.lookupRelation(databaseName, name, alias)
+ table = EliminateAnalysisOperators(catalog.lookupRelation(tableIdentifier, alias)))
+ case UnresolvedRelation(tableIdentifier, alias) =>
+ catalog.lookupRelation(tableIdentifier, alias)
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 0415d74bd8..df8d03b86c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -28,77 +28,74 @@ trait Catalog {
def caseSensitive: Boolean
- def tableExists(db: Option[String], tableName: String): Boolean
+ def tableExists(tableIdentifier: Seq[String]): Boolean
def lookupRelation(
- databaseName: Option[String],
- tableName: String,
- alias: Option[String] = None): LogicalPlan
+ tableIdentifier: Seq[String],
+ alias: Option[String] = None): LogicalPlan
- def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit
+ def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit
- def unregisterTable(databaseName: Option[String], tableName: String): Unit
+ def unregisterTable(tableIdentifier: Seq[String]): Unit
def unregisterAllTables(): Unit
- protected def processDatabaseAndTableName(
- databaseName: Option[String],
- tableName: String): (Option[String], String) = {
+ protected def processTableIdentifier(tableIdentifier: Seq[String]): Seq[String] = {
if (!caseSensitive) {
- (databaseName.map(_.toLowerCase), tableName.toLowerCase)
+ tableIdentifier.map(_.toLowerCase)
} else {
- (databaseName, tableName)
+ tableIdentifier
}
}
- protected def processDatabaseAndTableName(
- databaseName: String,
- tableName: String): (String, String) = {
- if (!caseSensitive) {
- (databaseName.toLowerCase, tableName.toLowerCase)
+ protected def getDbTableName(tableIdent: Seq[String]): String = {
+ val size = tableIdent.size
+ if (size <= 2) {
+ tableIdent.mkString(".")
} else {
- (databaseName, tableName)
+ tableIdent.slice(size - 2, size).mkString(".")
}
}
+
+ protected def getDBTable(tableIdent: Seq[String]) : (Option[String], String) = {
+ (tableIdent.lift(tableIdent.size - 2), tableIdent.last)
+ }
}
class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
val tables = new mutable.HashMap[String, LogicalPlan]()
override def registerTable(
- databaseName: Option[String],
- tableName: String,
+ tableIdentifier: Seq[String],
plan: LogicalPlan): Unit = {
- val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
- tables += ((tblName, plan))
+ val tableIdent = processTableIdentifier(tableIdentifier)
+ tables += ((getDbTableName(tableIdent), plan))
}
- override def unregisterTable(
- databaseName: Option[String],
- tableName: String) = {
- val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
- tables -= tblName
+ override def unregisterTable(tableIdentifier: Seq[String]) = {
+ val tableIdent = processTableIdentifier(tableIdentifier)
+ tables -= getDbTableName(tableIdent)
}
override def unregisterAllTables() = {
tables.clear()
}
- override def tableExists(db: Option[String], tableName: String): Boolean = {
- val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
- tables.get(tblName) match {
+ override def tableExists(tableIdentifier: Seq[String]): Boolean = {
+ val tableIdent = processTableIdentifier(tableIdentifier)
+ tables.get(getDbTableName(tableIdent)) match {
case Some(_) => true
case None => false
}
}
override def lookupRelation(
- databaseName: Option[String],
- tableName: String,
+ tableIdentifier: Seq[String],
alias: Option[String] = None): LogicalPlan = {
- val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
- val table = tables.getOrElse(tblName, sys.error(s"Table Not Found: $tableName"))
- val tableWithQualifiers = Subquery(tblName, table)
+ val tableIdent = processTableIdentifier(tableIdentifier)
+ val tableFullName = getDbTableName(tableIdent)
+ val table = tables.getOrElse(tableFullName, sys.error(s"Table Not Found: $tableFullName"))
+ val tableWithQualifiers = Subquery(tableIdent.last, table)
// If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
// properly qualified with this alias.
@@ -117,41 +114,39 @@ trait OverrideCatalog extends Catalog {
// TODO: This doesn't work when the database changes...
val overrides = new mutable.HashMap[(Option[String],String), LogicalPlan]()
- abstract override def tableExists(db: Option[String], tableName: String): Boolean = {
- val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
- overrides.get((dbName, tblName)) match {
+ abstract override def tableExists(tableIdentifier: Seq[String]): Boolean = {
+ val tableIdent = processTableIdentifier(tableIdentifier)
+ overrides.get(getDBTable(tableIdent)) match {
case Some(_) => true
- case None => super.tableExists(db, tableName)
+ case None => super.tableExists(tableIdentifier)
}
}
abstract override def lookupRelation(
- databaseName: Option[String],
- tableName: String,
+ tableIdentifier: Seq[String],
alias: Option[String] = None): LogicalPlan = {
- val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
- val overriddenTable = overrides.get((dbName, tblName))
- val tableWithQualifers = overriddenTable.map(r => Subquery(tblName, r))
+ val tableIdent = processTableIdentifier(tableIdentifier)
+ val overriddenTable = overrides.get(getDBTable(tableIdent))
+ val tableWithQualifers = overriddenTable.map(r => Subquery(tableIdent.last, r))
// If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
// properly qualified with this alias.
val withAlias =
tableWithQualifers.map(r => alias.map(a => Subquery(a, r)).getOrElse(r))
- withAlias.getOrElse(super.lookupRelation(dbName, tblName, alias))
+ withAlias.getOrElse(super.lookupRelation(tableIdentifier, alias))
}
override def registerTable(
- databaseName: Option[String],
- tableName: String,
+ tableIdentifier: Seq[String],
plan: LogicalPlan): Unit = {
- val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
- overrides.put((dbName, tblName), plan)
+ val tableIdent = processTableIdentifier(tableIdentifier)
+ overrides.put(getDBTable(tableIdent), plan)
}
- override def unregisterTable(databaseName: Option[String], tableName: String): Unit = {
- val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
- overrides.remove((dbName, tblName))
+ override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
+ val tableIdent = processTableIdentifier(tableIdentifier)
+ overrides.remove(getDBTable(tableIdent))
}
override def unregisterAllTables(): Unit = {
@@ -167,22 +162,21 @@ object EmptyCatalog extends Catalog {
val caseSensitive: Boolean = true
- def tableExists(db: Option[String], tableName: String): Boolean = {
+ def tableExists(tableIdentifier: Seq[String]): Boolean = {
throw new UnsupportedOperationException
}
def lookupRelation(
- databaseName: Option[String],
- tableName: String,
+ tableIdentifier: Seq[String],
alias: Option[String] = None) = {
throw new UnsupportedOperationException
}
- def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = {
+ def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit = {
throw new UnsupportedOperationException
}
- def unregisterTable(databaseName: Option[String], tableName: String): Unit = {
+ def unregisterTable(tableIdentifier: Seq[String]): Unit = {
throw new UnsupportedOperationException
}
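Note: a standalone copy of the two helpers added to the Catalog trait above, showing how an identifier Seq reduces to the db.table string and (Option[db], table) pair used as lookup keys. The demo object and main method are assumptions; the helper bodies are taken verbatim from the patch:

object IdentHelperDemo {
  def getDbTableName(tableIdent: Seq[String]): String = {
    val size = tableIdent.size
    if (size <= 2) tableIdent.mkString(".")
    else tableIdent.slice(size - 2, size).mkString(".")  // keep the last two parts
  }

  def getDBTable(tableIdent: Seq[String]): (Option[String], String) =
    (tableIdent.lift(tableIdent.size - 2), tableIdent.last)

  def main(args: Array[String]): Unit = {
    println(getDbTableName(Seq("catalog", "db", "table")))  // db.table
    println(getDBTable(Seq("table")))                       // (None,table)
    println(getDBTable(Seq("db", "table")))                 // (Some(db),table)
  }
}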
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 77d84e1687..71a738a0b2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -34,8 +34,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
* Holds the name of a relation that has yet to be looked up in a [[Catalog]].
*/
case class UnresolvedRelation(
- databaseName: Option[String],
- tableName: String,
+ tableIdentifier: Seq[String],
alias: Option[String] = None) extends LeafNode {
override def output = Nil
override lazy val resolved = false
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 70dabc4e6c..c07b7ec917 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -279,7 +279,7 @@ package object dsl {
def insertInto(tableName: String, overwrite: Boolean = false) =
InsertIntoTable(
- analysis.UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite)
+ analysis.UnresolvedRelation(Seq(tableName)), Map.empty, logicalPlan, overwrite)
def analyze = analysis.SimpleAnalyzer(logicalPlan)
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 33a3cba3d4..a8bc773619 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -42,8 +42,8 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
AttributeReference("e", ShortType)())
before {
- caseSensitiveCatalog.registerTable(None, "TaBlE", testRelation)
- caseInsensitiveCatalog.registerTable(None, "TaBlE", testRelation)
+ caseSensitiveCatalog.registerTable(Seq("TaBlE"), testRelation)
+ caseInsensitiveCatalog.registerTable(Seq("TaBlE"), testRelation)
}
test("analyze project") {
@@ -54,45 +54,45 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
assert(
caseSensitiveAnalyze(
Project(Seq(UnresolvedAttribute("TbL.a")),
- UnresolvedRelation(None, "TaBlE", Some("TbL")))) ===
+ UnresolvedRelation(Seq("TaBlE"), Some("TbL")))) ===
Project(testRelation.output, testRelation))
val e = intercept[TreeNodeException[_]] {
caseSensitiveAnalyze(
Project(Seq(UnresolvedAttribute("tBl.a")),
- UnresolvedRelation(None, "TaBlE", Some("TbL"))))
+ UnresolvedRelation(Seq("TaBlE"), Some("TbL"))))
}
assert(e.getMessage().toLowerCase.contains("unresolved"))
assert(
caseInsensitiveAnalyze(
Project(Seq(UnresolvedAttribute("TbL.a")),
- UnresolvedRelation(None, "TaBlE", Some("TbL")))) ===
+ UnresolvedRelation(Seq("TaBlE"), Some("TbL")))) ===
Project(testRelation.output, testRelation))
assert(
caseInsensitiveAnalyze(
Project(Seq(UnresolvedAttribute("tBl.a")),
- UnresolvedRelation(None, "TaBlE", Some("TbL")))) ===
+ UnresolvedRelation(Seq("TaBlE"), Some("TbL")))) ===
Project(testRelation.output, testRelation))
}
test("resolve relations") {
val e = intercept[RuntimeException] {
- caseSensitiveAnalyze(UnresolvedRelation(None, "tAbLe", None))
+ caseSensitiveAnalyze(UnresolvedRelation(Seq("tAbLe"), None))
}
assert(e.getMessage == "Table Not Found: tAbLe")
assert(
- caseSensitiveAnalyze(UnresolvedRelation(None, "TaBlE", None)) ===
+ caseSensitiveAnalyze(UnresolvedRelation(Seq("TaBlE"), None)) ===
testRelation)
assert(
- caseInsensitiveAnalyze(UnresolvedRelation(None, "tAbLe", None)) ===
+ caseInsensitiveAnalyze(UnresolvedRelation(Seq("tAbLe"), None)) ===
testRelation)
assert(
- caseInsensitiveAnalyze(UnresolvedRelation(None, "TaBlE", None)) ===
+ caseInsensitiveAnalyze(UnresolvedRelation(Seq("TaBlE"), None)) ===
testRelation)
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
index d5b7d2789a..fc7a001872 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
@@ -41,7 +41,7 @@ class DecimalPrecisionSuite extends FunSuite with BeforeAndAfter {
val f: Expression = UnresolvedAttribute("f")
before {
- catalog.registerTable(None, "table", relation)
+ catalog.registerTable(Seq("table"), relation)
}
private def checkType(expression: Expression, expectedType: DataType): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index ebd4cc920b..832d5b9938 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -276,7 +276,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* @group userf
*/
def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = {
- catalog.registerTable(None, tableName, rdd.queryExecution.logical)
+ catalog.registerTable(Seq(tableName), rdd.queryExecution.logical)
}
/**
@@ -289,7 +289,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
*/
def dropTempTable(tableName: String): Unit = {
tryUncacheQuery(table(tableName))
- catalog.unregisterTable(None, tableName)
+ catalog.unregisterTable(Seq(tableName))
}
/**
@@ -308,7 +308,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
/** Returns the specified table as a SchemaRDD */
def table(tableName: String): SchemaRDD =
- new SchemaRDD(this, catalog.lookupRelation(None, tableName))
+ new SchemaRDD(this, catalog.lookupRelation(Seq(tableName)))
/**
* :: DeveloperApi ::
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
index fd5f4abcbc..3cf9209465 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -97,8 +97,8 @@ private[sql] trait SchemaRDDLike {
*/
@Experimental
def insertInto(tableName: String, overwrite: Boolean): Unit =
- sqlContext.executePlan(
- InsertIntoTable(UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite)).toRdd
+ sqlContext.executePlan(InsertIntoTable(UnresolvedRelation(Seq(tableName)),
+ Map.empty, logicalPlan, overwrite)).toRdd
/**
* :: Experimental ::
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 8b4cf5bac0..1dcbaf525e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -296,8 +296,8 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
upperCaseData.where('N <= 4).registerTempTable("left")
upperCaseData.where('N >= 3).registerTempTable("right")
- val left = UnresolvedRelation(None, "left", None)
- val right = UnresolvedRelation(None, "right", None)
+ val left = UnresolvedRelation(Seq("left"), None)
+ val right = UnresolvedRelation(Seq("right"), None)
checkAnswer(
left.join(right, FullOuter, Some("left.N".attr === "right.N".attr)),
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index e006d3b469..4a6d7d01cb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -124,7 +124,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
* in the Hive metastore.
*/
def analyze(tableName: String) {
- val relation = EliminateAnalysisOperators(catalog.lookupRelation(None, tableName))
+ val relation = EliminateAnalysisOperators(catalog.lookupRelation(Seq(tableName)))
relation match {
case relation: MetastoreRelation =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d8b10b78c6..bbf6752a56 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.metastore.TableType
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition}
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, HiveException}
+import org.apache.hadoop.hive.ql.metadata.InvalidTableException
import org.apache.hadoop.hive.ql.plan.CreateTableDesc
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException}
@@ -55,18 +56,25 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
val caseSensitive: Boolean = false
- def tableExists(db: Option[String], tableName: String): Boolean = {
- val (databaseName, tblName) = processDatabaseAndTableName(
- db.getOrElse(hive.sessionState.getCurrentDatabase), tableName)
- client.getTable(databaseName, tblName, false) != null
+ def tableExists(tableIdentifier: Seq[String]): Boolean = {
+ val tableIdent = processTableIdentifier(tableIdentifier)
+ val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
+ hive.sessionState.getCurrentDatabase)
+ val tblName = tableIdent.last
+ try {
+ client.getTable(databaseName, tblName) != null
+ } catch {
+ case ie: InvalidTableException => false
+ }
}
def lookupRelation(
- db: Option[String],
- tableName: String,
+ tableIdentifier: Seq[String],
alias: Option[String]): LogicalPlan = synchronized {
- val (databaseName, tblName) =
- processDatabaseAndTableName(db.getOrElse(hive.sessionState.getCurrentDatabase), tableName)
+ val tableIdent = processTableIdentifier(tableIdentifier)
+ val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
+ hive.sessionState.getCurrentDatabase)
+ val tblName = tableIdent.last
val table = client.getTable(databaseName, tblName)
if (table.isView) {
// if the unresolved relation is from hive view
@@ -249,6 +257,26 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
}
+ protected def processDatabaseAndTableName(
+ databaseName: Option[String],
+ tableName: String): (Option[String], String) = {
+ if (!caseSensitive) {
+ (databaseName.map(_.toLowerCase), tableName.toLowerCase)
+ } else {
+ (databaseName, tableName)
+ }
+ }
+
+ protected def processDatabaseAndTableName(
+ databaseName: String,
+ tableName: String): (String, String) = {
+ if (!caseSensitive) {
+ (databaseName.toLowerCase, tableName.toLowerCase)
+ } else {
+ (databaseName, tableName)
+ }
+ }
+
/**
* Creates any tables required for query execution.
* For example, because of a CREATE TABLE X AS statement.
@@ -268,7 +296,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
// Get the CreateTableDesc from Hive SemanticAnalyzer
- val desc: Option[CreateTableDesc] = if (tableExists(Some(databaseName), tblName)) {
+ val desc: Option[CreateTableDesc] = if (tableExists(Seq(databaseName, tblName))) {
None
} else {
val sa = new SemanticAnalyzer(hive.hiveconf) {
@@ -340,15 +368,13 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
* UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
* For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
*/
- override def registerTable(
- databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = ???
+ override def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit = ???
/**
* UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
* For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
*/
- override def unregisterTable(
- databaseName: Option[String], tableName: String): Unit = ???
+ override def unregisterTable(tableIdentifier: Seq[String]): Unit = ???
override def unregisterAllTables() = {}
}
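Note: a sketch of the lift-based database resolution used in tableExists and lookupRelation above; the second-to-last identifier part is the database name, falling back to the session's current database when absent. The demo object and function name are assumptions:

object DbNameDemo {
  def databaseFor(tableIdent: Seq[String], currentDb: String): String =
    tableIdent.lift(tableIdent.size - 2).getOrElse(currentDb)

  def main(args: Array[String]): Unit = {
    println(databaseFor(Seq("t"), "default"))         // default (fallback)
    println(databaseFor(Seq("mydb", "t"), "default")) // mydb
  }
}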
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index cd4e5a239e..3d39e6c1bd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -393,6 +393,15 @@ private[hive] object HiveQl {
(db, tableName)
}
+ protected def extractTableIdent(tableNameParts: Node): Seq[String] = {
+ tableNameParts.getChildren.map { case Token(part, Nil) => cleanIdentifier(part) } match {
+ case Seq(tableOnly) => Seq(tableOnly)
+ case Seq(databaseName, table) => Seq(databaseName, table)
+ case other => sys.error("Hive only supports tables names like 'tableName' " +
+ s"or 'databaseName.tableName', found '$other'")
+ }
+ }
+
protected def nodeToPlan(node: Node): LogicalPlan = node match {
// Special drop table that also uncaches.
case Token("TOK_DROPTABLE",
@@ -446,16 +455,16 @@ private[hive] object HiveQl {
case Token(".", dbName :: tableName :: Nil) =>
// It is describing a table with the format like "describe db.table".
// TODO: Actually, a user may mean tableName.columnName. Need to resolve this issue.
- val (db, tableName) = extractDbNameTableName(nameParts.head)
+ val tableIdent = extractTableIdent(nameParts.head)
DescribeCommand(
- UnresolvedRelation(db, tableName, None), extended.isDefined)
+ UnresolvedRelation(tableIdent, None), extended.isDefined)
case Token(".", dbName :: tableName :: colName :: Nil) =>
// It is describing a column with the format like "describe db.table column".
NativePlaceholder
case tableName =>
// It is describing a table with the format like "describe table".
DescribeCommand(
- UnresolvedRelation(None, tableName.getText, None),
+ UnresolvedRelation(Seq(tableName.getText), None),
extended.isDefined)
}
}
@@ -705,13 +714,15 @@ private[hive] object HiveQl {
nonAliasClauses)
}
- val (db, tableName) =
+ val tableIdent =
tableNameParts.getChildren.map{ case Token(part, Nil) => cleanIdentifier(part)} match {
- case Seq(tableOnly) => (None, tableOnly)
- case Seq(databaseName, table) => (Some(databaseName), table)
+ case Seq(tableOnly) => Seq(tableOnly)
+ case Seq(databaseName, table) => Seq(databaseName, table)
+ case other => sys.error("Hive only supports tables names like 'tableName' " +
+ s"or 'databaseName.tableName', found '$other'")
}
val alias = aliasClause.map { case Token(a, Nil) => cleanIdentifier(a) }
- val relation = UnresolvedRelation(db, tableName, alias)
+ val relation = UnresolvedRelation(tableIdent, alias)
// Apply sampling if requested.
(bucketSampleClause orElse splitSampleClause).map {
@@ -830,7 +841,7 @@ private[hive] object HiveQl {
val Some(tableNameParts) :: partitionClause :: Nil =
getClauses(Seq("TOK_TABNAME", "TOK_PARTSPEC"), tableArgs)
- val (db, tableName) = extractDbNameTableName(tableNameParts)
+ val tableIdent = extractTableIdent(tableNameParts)
val partitionKeys = partitionClause.map(_.getChildren.map {
// Parse partitions. We also make keys case insensitive.
@@ -840,7 +851,7 @@ private[hive] object HiveQl {
cleanIdentifier(key.toLowerCase) -> None
}.toMap).getOrElse(Map.empty)
- InsertIntoTable(UnresolvedRelation(db, tableName, None), partitionKeys, query, overwrite)
+ InsertIntoTable(UnresolvedRelation(tableIdent, None), partitionKeys, query, overwrite)
case a: ASTNode =>
throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ")
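Note: a hypothetical standalone version of the validation that extractTableIdent and the FROM-clause match above both perform; only one- and two-part names are accepted on the Hive path, anything longer is rejected:

object ExtractIdentDemo {
  def validate(parts: Seq[String]): Seq[String] = parts match {
    case Seq(tableOnly)           => Seq(tableOnly)
    case Seq(databaseName, table) => Seq(databaseName, table)
    case other                    => sys.error(
      s"Hive only supports table names like 'tableName' or 'databaseName.tableName', found '$other'")
  }

  def main(args: Array[String]): Unit = {
    println(validate(Seq("t")))        // List(t)
    println(validate(Seq("db", "t")))  // List(db, t)
  }
}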
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index bb79ad5538..53b6487632 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -169,7 +169,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
// Make sure any test tables referenced are loaded.
val referencedTables =
describedTables ++
- logical.collect { case UnresolvedRelation(databaseName, name, _) => name }
+ logical.collect { case UnresolvedRelation(tableIdent, _) => tableIdent.last }
val referencedTestTables = referencedTables.filter(testTables.contains)
logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}")
referencedTestTables.foreach(loadTestTable)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index b83689ceab..cd3c4400db 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -56,7 +56,7 @@ case class CreateTableAsSelect(
sc.catalog.createTable(database, tableName, query.output, allowExisting, desc)
// Get the Metastore Relation
- sc.catalog.lookupRelation(Some(database), tableName, None) match {
+ sc.catalog.lookupRelation(Seq(database, tableName), None) match {
case r: MetastoreRelation => r
}
}
@@ -65,7 +65,7 @@ case class CreateTableAsSelect(
// TODO ideally, we should get the output data ready first and then
// add the relation into catalog, just in case of failure occurs while data
// processing.
- if (sc.catalog.tableExists(Some(database), tableName)) {
+ if (sc.catalog.tableExists(Seq(database, tableName))) {
if (allowExisting) {
// table already exists, will do nothing, to keep consistent with Hive
} else {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 903075edf7..a42a7c8588 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -56,7 +56,7 @@ case class DropTable(tableName: String, ifExists: Boolean) extends LeafNode with
override protected lazy val sideEffectResult: Seq[Row] = {
val ifExistsClause = if (ifExists) "IF EXISTS " else ""
hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
- hiveContext.catalog.unregisterTable(None, tableName)
+ hiveContext.catalog.unregisterTable(Seq(tableName))
Seq.empty[Row]
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index a90fc023e6..f5a3192055 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -72,7 +72,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
test("analyze MetastoreRelations") {
def queryTotalSize(tableName: String): BigInt =
- catalog.lookupRelation(None, tableName).statistics.sizeInBytes
+ catalog.lookupRelation(Seq(tableName)).statistics.sizeInBytes
// Non-partitioned table
sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
@@ -123,7 +123,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
intercept[NotImplementedError] {
analyze("tempTable")
}
- catalog.unregisterTable(None, "tempTable")
+ catalog.unregisterTable(Seq("tempTable"))
}
test("estimates the size of a test MetastoreRelation") {