From c6ea6d4b7dfb7cad440025b13a584d5200a4ef8f Mon Sep 17 00:00:00 2001 From: Alex Liu Date: Sat, 10 Jan 2015 13:23:09 -0800 Subject: [SPARK-4943][SQL] Allow table name having dot for db/catalog The pull only fixes the parsing error and changes API to use tableIdentifier. Joining different catalog datasource related change is not done in this pull. Author: Alex Liu Closes #3941 from alexliu68/SPARK-SQL-4943-3 and squashes the following commits: 343ae27 [Alex Liu] [SPARK-4943][SQL] refactoring according to review 29e5e55 [Alex Liu] [SPARK-4943][SQL] fix failed Hive CTAS tests 6ae77ce [Alex Liu] [SPARK-4943][SQL] fix TestHive matching error 3652997 [Alex Liu] [SPARK-4943][SQL] Allow table name having dot to support db/catalog ... (cherry picked from commit 4b39fd1e63188821fc84a13f7ccb6e94277f4be7) Signed-off-by: Michael Armbrust Conflicts: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala --- .../org/apache/spark/sql/catalyst/SqlParser.scala | 6 +- .../spark/sql/catalyst/analysis/Analyzer.scala | 8 +- .../spark/sql/catalyst/analysis/Catalog.scala | 106 ++++++++++----------- .../spark/sql/catalyst/analysis/unresolved.scala | 3 +- .../apache/spark/sql/catalyst/dsl/package.scala | 2 +- .../sql/catalyst/analysis/AnalysisSuite.scala | 20 ++-- .../catalyst/analysis/DecimalPrecisionSuite.scala | 2 +- 7 files changed, 70 insertions(+), 77 deletions(-) (limited to 'sql/catalyst') diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index a2bcd73b60..3141942aba 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -177,10 +177,10 @@ class SqlParser extends AbstractSparkSQLParser { joinedRelation | relationFactor protected lazy val relationFactor: Parser[LogicalPlan] = - ( ident ~ (opt(AS) ~> opt(ident)) ^^ { - case tableName ~ alias => UnresolvedRelation(None, tableName, alias) + ( rep1sep(ident, ".") ~ (opt(AS) ~> opt(ident)) ^^ { + case tableIdent ~ alias => UnresolvedRelation(tableIdent, alias) } - | ("(" ~> start <~ ")") ~ (AS.? ~> ident) ^^ { case s ~ a => Subquery(a, s) } + | ("(" ~> start <~ ")") ~ (AS.? ~> ident) ^^ { case s ~ a => Subquery(a, s) } ) protected lazy val joinedRelation: Parser[LogicalPlan] = diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index facbd8b975..afa6b17f27 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -136,11 +136,11 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool */ object ResolveRelations extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case i @ InsertIntoTable(UnresolvedRelation(databaseName, name, alias), _, _, _) => + case i @ InsertIntoTable(UnresolvedRelation(tableIdentifier, alias), _, _, _) => i.copy( - table = EliminateAnalysisOperators(catalog.lookupRelation(databaseName, name, alias))) - case UnresolvedRelation(databaseName, name, alias) => - catalog.lookupRelation(databaseName, name, alias) + table = EliminateAnalysisOperators(catalog.lookupRelation(tableIdentifier, alias))) + case UnresolvedRelation(tableIdentifier, alias) => + catalog.lookupRelation(tableIdentifier, alias) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala index 0415d74bd8..df8d03b86c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala @@ -28,77 +28,74 @@ trait Catalog { def caseSensitive: Boolean - def tableExists(db: Option[String], tableName: String): Boolean + def tableExists(tableIdentifier: Seq[String]): Boolean def lookupRelation( - databaseName: Option[String], - tableName: String, - alias: Option[String] = None): LogicalPlan + tableIdentifier: Seq[String], + alias: Option[String] = None): LogicalPlan - def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit + def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit - def unregisterTable(databaseName: Option[String], tableName: String): Unit + def unregisterTable(tableIdentifier: Seq[String]): Unit def unregisterAllTables(): Unit - protected def processDatabaseAndTableName( - databaseName: Option[String], - tableName: String): (Option[String], String) = { + protected def processTableIdentifier(tableIdentifier: Seq[String]): Seq[String] = { if (!caseSensitive) { - (databaseName.map(_.toLowerCase), tableName.toLowerCase) + tableIdentifier.map(_.toLowerCase) } else { - (databaseName, tableName) + tableIdentifier } } - protected def processDatabaseAndTableName( - databaseName: String, - tableName: String): (String, String) = { - if (!caseSensitive) { - (databaseName.toLowerCase, tableName.toLowerCase) + protected def getDbTableName(tableIdent: Seq[String]): String = { + val size = tableIdent.size + if (size <= 2) { + tableIdent.mkString(".") } else { - (databaseName, tableName) + tableIdent.slice(size - 2, size).mkString(".") } } + + protected def getDBTable(tableIdent: Seq[String]) : (Option[String], String) = { + (tableIdent.lift(tableIdent.size - 2), tableIdent.last) + } } class SimpleCatalog(val caseSensitive: Boolean) extends Catalog { val tables = new mutable.HashMap[String, LogicalPlan]() override def registerTable( - databaseName: Option[String], - tableName: String, + tableIdentifier: Seq[String], plan: LogicalPlan): Unit = { - val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName) - tables += ((tblName, plan)) + val tableIdent = processTableIdentifier(tableIdentifier) + tables += ((getDbTableName(tableIdent), plan)) } - override def unregisterTable( - databaseName: Option[String], - tableName: String) = { - val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName) - tables -= tblName + override def unregisterTable(tableIdentifier: Seq[String]) = { + val tableIdent = processTableIdentifier(tableIdentifier) + tables -= getDbTableName(tableIdent) } override def unregisterAllTables() = { tables.clear() } - override def tableExists(db: Option[String], tableName: String): Boolean = { - val (dbName, tblName) = processDatabaseAndTableName(db, tableName) - tables.get(tblName) match { + override def tableExists(tableIdentifier: Seq[String]): Boolean = { + val tableIdent = processTableIdentifier(tableIdentifier) + tables.get(getDbTableName(tableIdent)) match { case Some(_) => true case None => false } } override def lookupRelation( - databaseName: Option[String], - tableName: String, + tableIdentifier: Seq[String], alias: Option[String] = None): LogicalPlan = { - val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName) - val table = tables.getOrElse(tblName, sys.error(s"Table Not Found: $tableName")) - val tableWithQualifiers = Subquery(tblName, table) + val tableIdent = processTableIdentifier(tableIdentifier) + val tableFullName = getDbTableName(tableIdent) + val table = tables.getOrElse(tableFullName, sys.error(s"Table Not Found: $tableFullName")) + val tableWithQualifiers = Subquery(tableIdent.last, table) // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are // properly qualified with this alias. @@ -117,41 +114,39 @@ trait OverrideCatalog extends Catalog { // TODO: This doesn't work when the database changes... val overrides = new mutable.HashMap[(Option[String],String), LogicalPlan]() - abstract override def tableExists(db: Option[String], tableName: String): Boolean = { - val (dbName, tblName) = processDatabaseAndTableName(db, tableName) - overrides.get((dbName, tblName)) match { + abstract override def tableExists(tableIdentifier: Seq[String]): Boolean = { + val tableIdent = processTableIdentifier(tableIdentifier) + overrides.get(getDBTable(tableIdent)) match { case Some(_) => true - case None => super.tableExists(db, tableName) + case None => super.tableExists(tableIdentifier) } } abstract override def lookupRelation( - databaseName: Option[String], - tableName: String, + tableIdentifier: Seq[String], alias: Option[String] = None): LogicalPlan = { - val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName) - val overriddenTable = overrides.get((dbName, tblName)) - val tableWithQualifers = overriddenTable.map(r => Subquery(tblName, r)) + val tableIdent = processTableIdentifier(tableIdentifier) + val overriddenTable = overrides.get(getDBTable(tableIdent)) + val tableWithQualifers = overriddenTable.map(r => Subquery(tableIdent.last, r)) // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are // properly qualified with this alias. val withAlias = tableWithQualifers.map(r => alias.map(a => Subquery(a, r)).getOrElse(r)) - withAlias.getOrElse(super.lookupRelation(dbName, tblName, alias)) + withAlias.getOrElse(super.lookupRelation(tableIdentifier, alias)) } override def registerTable( - databaseName: Option[String], - tableName: String, + tableIdentifier: Seq[String], plan: LogicalPlan): Unit = { - val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName) - overrides.put((dbName, tblName), plan) + val tableIdent = processTableIdentifier(tableIdentifier) + overrides.put(getDBTable(tableIdent), plan) } - override def unregisterTable(databaseName: Option[String], tableName: String): Unit = { - val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName) - overrides.remove((dbName, tblName)) + override def unregisterTable(tableIdentifier: Seq[String]): Unit = { + val tableIdent = processTableIdentifier(tableIdentifier) + overrides.remove(getDBTable(tableIdent)) } override def unregisterAllTables(): Unit = { @@ -167,22 +162,21 @@ object EmptyCatalog extends Catalog { val caseSensitive: Boolean = true - def tableExists(db: Option[String], tableName: String): Boolean = { + def tableExists(tableIdentifier: Seq[String]): Boolean = { throw new UnsupportedOperationException } def lookupRelation( - databaseName: Option[String], - tableName: String, + tableIdentifier: Seq[String], alias: Option[String] = None) = { throw new UnsupportedOperationException } - def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = { + def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit = { throw new UnsupportedOperationException } - def unregisterTable(databaseName: Option[String], tableName: String): Unit = { + def unregisterTable(tableIdentifier: Seq[String]): Unit = { throw new UnsupportedOperationException } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 77d84e1687..71a738a0b2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -34,8 +34,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str * Holds the name of a relation that has yet to be looked up in a [[Catalog]]. */ case class UnresolvedRelation( - databaseName: Option[String], - tableName: String, + tableIdentifier: Seq[String], alias: Option[String] = None) extends LeafNode { override def output = Nil override lazy val resolved = false diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 70dabc4e6c..c07b7ec917 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -279,7 +279,7 @@ package object dsl { def insertInto(tableName: String, overwrite: Boolean = false) = InsertIntoTable( - analysis.UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite) + analysis.UnresolvedRelation(Seq(tableName)), Map.empty, logicalPlan, overwrite) def analyze = analysis.SimpleAnalyzer(logicalPlan) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 33a3cba3d4..a8bc773619 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -42,8 +42,8 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter { AttributeReference("e", ShortType)()) before { - caseSensitiveCatalog.registerTable(None, "TaBlE", testRelation) - caseInsensitiveCatalog.registerTable(None, "TaBlE", testRelation) + caseSensitiveCatalog.registerTable(Seq("TaBlE"), testRelation) + caseInsensitiveCatalog.registerTable(Seq("TaBlE"), testRelation) } test("analyze project") { @@ -54,45 +54,45 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter { assert( caseSensitiveAnalyze( Project(Seq(UnresolvedAttribute("TbL.a")), - UnresolvedRelation(None, "TaBlE", Some("TbL")))) === + UnresolvedRelation(Seq("TaBlE"), Some("TbL")))) === Project(testRelation.output, testRelation)) val e = intercept[TreeNodeException[_]] { caseSensitiveAnalyze( Project(Seq(UnresolvedAttribute("tBl.a")), - UnresolvedRelation(None, "TaBlE", Some("TbL")))) + UnresolvedRelation(Seq("TaBlE"), Some("TbL")))) } assert(e.getMessage().toLowerCase.contains("unresolved")) assert( caseInsensitiveAnalyze( Project(Seq(UnresolvedAttribute("TbL.a")), - UnresolvedRelation(None, "TaBlE", Some("TbL")))) === + UnresolvedRelation(Seq("TaBlE"), Some("TbL")))) === Project(testRelation.output, testRelation)) assert( caseInsensitiveAnalyze( Project(Seq(UnresolvedAttribute("tBl.a")), - UnresolvedRelation(None, "TaBlE", Some("TbL")))) === + UnresolvedRelation(Seq("TaBlE"), Some("TbL")))) === Project(testRelation.output, testRelation)) } test("resolve relations") { val e = intercept[RuntimeException] { - caseSensitiveAnalyze(UnresolvedRelation(None, "tAbLe", None)) + caseSensitiveAnalyze(UnresolvedRelation(Seq("tAbLe"), None)) } assert(e.getMessage == "Table Not Found: tAbLe") assert( - caseSensitiveAnalyze(UnresolvedRelation(None, "TaBlE", None)) === + caseSensitiveAnalyze(UnresolvedRelation(Seq("TaBlE"), None)) === testRelation) assert( - caseInsensitiveAnalyze(UnresolvedRelation(None, "tAbLe", None)) === + caseInsensitiveAnalyze(UnresolvedRelation(Seq("tAbLe"), None)) === testRelation) assert( - caseInsensitiveAnalyze(UnresolvedRelation(None, "TaBlE", None)) === + caseInsensitiveAnalyze(UnresolvedRelation(Seq("TaBlE"), None)) === testRelation) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala index d5b7d2789a..fc7a001872 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala @@ -41,7 +41,7 @@ class DecimalPrecisionSuite extends FunSuite with BeforeAndAfter { val f: Expression = UnresolvedAttribute("f") before { - catalog.registerTable(None, "table", relation) + catalog.registerTable(Seq("table"), relation) } private def checkType(expression: Expression, expectedType: DataType): Unit = { -- cgit v1.2.3