aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorAlex Liu <alex_liu68@yahoo.com>2015-01-10 13:23:09 -0800
committerMichael Armbrust <michael@databricks.com>2015-01-10 13:42:36 -0800
commitc6ea6d4b7dfb7cad440025b13a584d5200a4ef8f (patch)
tree9848611ffda0f76bf0ebb961cffcbb4b1c34feec /sql/catalyst
parent09eef3b5cd27f1f4e5d0a63440efe1d59436dbf7 (diff)
downloadspark-c6ea6d4b7dfb7cad440025b13a584d5200a4ef8f.tar.gz
spark-c6ea6d4b7dfb7cad440025b13a584d5200a4ef8f.tar.bz2
spark-c6ea6d4b7dfb7cad440025b13a584d5200a4ef8f.zip
[SPARK-4943][SQL] Allow table name having dot for db/catalog
The pull only fixes the parsing error and changes API to use tableIdentifier. Joining different catalog datasource related change is not done in this pull. Author: Alex Liu <alex_liu68@yahoo.com> Closes #3941 from alexliu68/SPARK-SQL-4943-3 and squashes the following commits: 343ae27 [Alex Liu] [SPARK-4943][SQL] refactoring according to review 29e5e55 [Alex Liu] [SPARK-4943][SQL] fix failed Hive CTAS tests 6ae77ce [Alex Liu] [SPARK-4943][SQL] fix TestHive matching error 3652997 [Alex Liu] [SPARK-4943][SQL] Allow table name having dot to support db/catalog ... (cherry picked from commit 4b39fd1e63188821fc84a13f7ccb6e94277f4be7) Signed-off-by: Michael Armbrust <michael@databricks.com> Conflicts: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
Diffstat (limited to 'sql/catalyst')
-rwxr-xr-xsql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala6
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala8
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala106
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala3
-rwxr-xr-xsql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala2
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala20
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala2
7 files changed, 70 insertions, 77 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index a2bcd73b60..3141942aba 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -177,10 +177,10 @@ class SqlParser extends AbstractSparkSQLParser {
joinedRelation | relationFactor
protected lazy val relationFactor: Parser[LogicalPlan] =
- ( ident ~ (opt(AS) ~> opt(ident)) ^^ {
- case tableName ~ alias => UnresolvedRelation(None, tableName, alias)
+ ( rep1sep(ident, ".") ~ (opt(AS) ~> opt(ident)) ^^ {
+ case tableIdent ~ alias => UnresolvedRelation(tableIdent, alias)
}
- | ("(" ~> start <~ ")") ~ (AS.? ~> ident) ^^ { case s ~ a => Subquery(a, s) }
+ | ("(" ~> start <~ ")") ~ (AS.? ~> ident) ^^ { case s ~ a => Subquery(a, s) }
)
protected lazy val joinedRelation: Parser[LogicalPlan] =
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index facbd8b975..afa6b17f27 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -136,11 +136,11 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
*/
object ResolveRelations extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- case i @ InsertIntoTable(UnresolvedRelation(databaseName, name, alias), _, _, _) =>
+ case i @ InsertIntoTable(UnresolvedRelation(tableIdentifier, alias), _, _, _) =>
i.copy(
- table = EliminateAnalysisOperators(catalog.lookupRelation(databaseName, name, alias)))
- case UnresolvedRelation(databaseName, name, alias) =>
- catalog.lookupRelation(databaseName, name, alias)
+ table = EliminateAnalysisOperators(catalog.lookupRelation(tableIdentifier, alias)))
+ case UnresolvedRelation(tableIdentifier, alias) =>
+ catalog.lookupRelation(tableIdentifier, alias)
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 0415d74bd8..df8d03b86c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -28,77 +28,74 @@ trait Catalog {
def caseSensitive: Boolean
- def tableExists(db: Option[String], tableName: String): Boolean
+ def tableExists(tableIdentifier: Seq[String]): Boolean
def lookupRelation(
- databaseName: Option[String],
- tableName: String,
- alias: Option[String] = None): LogicalPlan
+ tableIdentifier: Seq[String],
+ alias: Option[String] = None): LogicalPlan
- def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit
+ def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit
- def unregisterTable(databaseName: Option[String], tableName: String): Unit
+ def unregisterTable(tableIdentifier: Seq[String]): Unit
def unregisterAllTables(): Unit
- protected def processDatabaseAndTableName(
- databaseName: Option[String],
- tableName: String): (Option[String], String) = {
+ protected def processTableIdentifier(tableIdentifier: Seq[String]): Seq[String] = {
if (!caseSensitive) {
- (databaseName.map(_.toLowerCase), tableName.toLowerCase)
+ tableIdentifier.map(_.toLowerCase)
} else {
- (databaseName, tableName)
+ tableIdentifier
}
}
- protected def processDatabaseAndTableName(
- databaseName: String,
- tableName: String): (String, String) = {
- if (!caseSensitive) {
- (databaseName.toLowerCase, tableName.toLowerCase)
+ protected def getDbTableName(tableIdent: Seq[String]): String = {
+ val size = tableIdent.size
+ if (size <= 2) {
+ tableIdent.mkString(".")
} else {
- (databaseName, tableName)
+ tableIdent.slice(size - 2, size).mkString(".")
}
}
+
+ protected def getDBTable(tableIdent: Seq[String]) : (Option[String], String) = {
+ (tableIdent.lift(tableIdent.size - 2), tableIdent.last)
+ }
}
class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
val tables = new mutable.HashMap[String, LogicalPlan]()
override def registerTable(
- databaseName: Option[String],
- tableName: String,
+ tableIdentifier: Seq[String],
plan: LogicalPlan): Unit = {
- val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
- tables += ((tblName, plan))
+ val tableIdent = processTableIdentifier(tableIdentifier)
+ tables += ((getDbTableName(tableIdent), plan))
}
- override def unregisterTable(
- databaseName: Option[String],
- tableName: String) = {
- val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
- tables -= tblName
+ override def unregisterTable(tableIdentifier: Seq[String]) = {
+ val tableIdent = processTableIdentifier(tableIdentifier)
+ tables -= getDbTableName(tableIdent)
}
override def unregisterAllTables() = {
tables.clear()
}
- override def tableExists(db: Option[String], tableName: String): Boolean = {
- val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
- tables.get(tblName) match {
+ override def tableExists(tableIdentifier: Seq[String]): Boolean = {
+ val tableIdent = processTableIdentifier(tableIdentifier)
+ tables.get(getDbTableName(tableIdent)) match {
case Some(_) => true
case None => false
}
}
override def lookupRelation(
- databaseName: Option[String],
- tableName: String,
+ tableIdentifier: Seq[String],
alias: Option[String] = None): LogicalPlan = {
- val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
- val table = tables.getOrElse(tblName, sys.error(s"Table Not Found: $tableName"))
- val tableWithQualifiers = Subquery(tblName, table)
+ val tableIdent = processTableIdentifier(tableIdentifier)
+ val tableFullName = getDbTableName(tableIdent)
+ val table = tables.getOrElse(tableFullName, sys.error(s"Table Not Found: $tableFullName"))
+ val tableWithQualifiers = Subquery(tableIdent.last, table)
// If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
// properly qualified with this alias.
@@ -117,41 +114,39 @@ trait OverrideCatalog extends Catalog {
// TODO: This doesn't work when the database changes...
val overrides = new mutable.HashMap[(Option[String],String), LogicalPlan]()
- abstract override def tableExists(db: Option[String], tableName: String): Boolean = {
- val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
- overrides.get((dbName, tblName)) match {
+ abstract override def tableExists(tableIdentifier: Seq[String]): Boolean = {
+ val tableIdent = processTableIdentifier(tableIdentifier)
+ overrides.get(getDBTable(tableIdent)) match {
case Some(_) => true
- case None => super.tableExists(db, tableName)
+ case None => super.tableExists(tableIdentifier)
}
}
abstract override def lookupRelation(
- databaseName: Option[String],
- tableName: String,
+ tableIdentifier: Seq[String],
alias: Option[String] = None): LogicalPlan = {
- val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
- val overriddenTable = overrides.get((dbName, tblName))
- val tableWithQualifers = overriddenTable.map(r => Subquery(tblName, r))
+ val tableIdent = processTableIdentifier(tableIdentifier)
+ val overriddenTable = overrides.get(getDBTable(tableIdent))
+ val tableWithQualifers = overriddenTable.map(r => Subquery(tableIdent.last, r))
// If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
// properly qualified with this alias.
val withAlias =
tableWithQualifers.map(r => alias.map(a => Subquery(a, r)).getOrElse(r))
- withAlias.getOrElse(super.lookupRelation(dbName, tblName, alias))
+ withAlias.getOrElse(super.lookupRelation(tableIdentifier, alias))
}
override def registerTable(
- databaseName: Option[String],
- tableName: String,
+ tableIdentifier: Seq[String],
plan: LogicalPlan): Unit = {
- val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
- overrides.put((dbName, tblName), plan)
+ val tableIdent = processTableIdentifier(tableIdentifier)
+ overrides.put(getDBTable(tableIdent), plan)
}
- override def unregisterTable(databaseName: Option[String], tableName: String): Unit = {
- val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
- overrides.remove((dbName, tblName))
+ override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
+ val tableIdent = processTableIdentifier(tableIdentifier)
+ overrides.remove(getDBTable(tableIdent))
}
override def unregisterAllTables(): Unit = {
@@ -167,22 +162,21 @@ object EmptyCatalog extends Catalog {
val caseSensitive: Boolean = true
- def tableExists(db: Option[String], tableName: String): Boolean = {
+ def tableExists(tableIdentifier: Seq[String]): Boolean = {
throw new UnsupportedOperationException
}
def lookupRelation(
- databaseName: Option[String],
- tableName: String,
+ tableIdentifier: Seq[String],
alias: Option[String] = None) = {
throw new UnsupportedOperationException
}
- def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = {
+ def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit = {
throw new UnsupportedOperationException
}
- def unregisterTable(databaseName: Option[String], tableName: String): Unit = {
+ def unregisterTable(tableIdentifier: Seq[String]): Unit = {
throw new UnsupportedOperationException
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 77d84e1687..71a738a0b2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -34,8 +34,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
* Holds the name of a relation that has yet to be looked up in a [[Catalog]].
*/
case class UnresolvedRelation(
- databaseName: Option[String],
- tableName: String,
+ tableIdentifier: Seq[String],
alias: Option[String] = None) extends LeafNode {
override def output = Nil
override lazy val resolved = false
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 70dabc4e6c..c07b7ec917 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -279,7 +279,7 @@ package object dsl {
def insertInto(tableName: String, overwrite: Boolean = false) =
InsertIntoTable(
- analysis.UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite)
+ analysis.UnresolvedRelation(Seq(tableName)), Map.empty, logicalPlan, overwrite)
def analyze = analysis.SimpleAnalyzer(logicalPlan)
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 33a3cba3d4..a8bc773619 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -42,8 +42,8 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
AttributeReference("e", ShortType)())
before {
- caseSensitiveCatalog.registerTable(None, "TaBlE", testRelation)
- caseInsensitiveCatalog.registerTable(None, "TaBlE", testRelation)
+ caseSensitiveCatalog.registerTable(Seq("TaBlE"), testRelation)
+ caseInsensitiveCatalog.registerTable(Seq("TaBlE"), testRelation)
}
test("analyze project") {
@@ -54,45 +54,45 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
assert(
caseSensitiveAnalyze(
Project(Seq(UnresolvedAttribute("TbL.a")),
- UnresolvedRelation(None, "TaBlE", Some("TbL")))) ===
+ UnresolvedRelation(Seq("TaBlE"), Some("TbL")))) ===
Project(testRelation.output, testRelation))
val e = intercept[TreeNodeException[_]] {
caseSensitiveAnalyze(
Project(Seq(UnresolvedAttribute("tBl.a")),
- UnresolvedRelation(None, "TaBlE", Some("TbL"))))
+ UnresolvedRelation(Seq("TaBlE"), Some("TbL"))))
}
assert(e.getMessage().toLowerCase.contains("unresolved"))
assert(
caseInsensitiveAnalyze(
Project(Seq(UnresolvedAttribute("TbL.a")),
- UnresolvedRelation(None, "TaBlE", Some("TbL")))) ===
+ UnresolvedRelation(Seq("TaBlE"), Some("TbL")))) ===
Project(testRelation.output, testRelation))
assert(
caseInsensitiveAnalyze(
Project(Seq(UnresolvedAttribute("tBl.a")),
- UnresolvedRelation(None, "TaBlE", Some("TbL")))) ===
+ UnresolvedRelation(Seq("TaBlE"), Some("TbL")))) ===
Project(testRelation.output, testRelation))
}
test("resolve relations") {
val e = intercept[RuntimeException] {
- caseSensitiveAnalyze(UnresolvedRelation(None, "tAbLe", None))
+ caseSensitiveAnalyze(UnresolvedRelation(Seq("tAbLe"), None))
}
assert(e.getMessage == "Table Not Found: tAbLe")
assert(
- caseSensitiveAnalyze(UnresolvedRelation(None, "TaBlE", None)) ===
+ caseSensitiveAnalyze(UnresolvedRelation(Seq("TaBlE"), None)) ===
testRelation)
assert(
- caseInsensitiveAnalyze(UnresolvedRelation(None, "tAbLe", None)) ===
+ caseInsensitiveAnalyze(UnresolvedRelation(Seq("tAbLe"), None)) ===
testRelation)
assert(
- caseInsensitiveAnalyze(UnresolvedRelation(None, "TaBlE", None)) ===
+ caseInsensitiveAnalyze(UnresolvedRelation(Seq("TaBlE"), None)) ===
testRelation)
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
index d5b7d2789a..fc7a001872 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
@@ -41,7 +41,7 @@ class DecimalPrecisionSuite extends FunSuite with BeforeAndAfter {
val f: Expression = UnresolvedAttribute("f")
before {
- catalog.registerTable(None, "table", relation)
+ catalog.registerTable(Seq("table"), relation)
}
private def checkType(expression: Expression, expectedType: DataType): Unit = {