diff options
author | Alex Liu <alex_liu68@yahoo.com> | 2015-01-10 13:23:09 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-01-10 13:23:09 -0800 |
commit | 4b39fd1e63188821fc84a13f7ccb6e94277f4be7 (patch) | |
tree | 3c98f9ff0d0562c6096ff0e82189bf543ab003bd /sql/hive | |
parent | 1e56eba5d906bef793dfd6f199db735a6116a764 (diff) | |
download | spark-4b39fd1e63188821fc84a13f7ccb6e94277f4be7.tar.gz spark-4b39fd1e63188821fc84a13f7ccb6e94277f4be7.tar.bz2 spark-4b39fd1e63188821fc84a13f7ccb6e94277f4be7.zip |
[SPARK-4943][SQL] Allow table name having dot for db/catalog
This pull request only fixes the parsing error and changes the API to use tableIdentifier. Changes related to joining data sources across different catalogs are not included in this pull request.
Author: Alex Liu <alex_liu68@yahoo.com>
Closes #3941 from alexliu68/SPARK-SQL-4943-3 and squashes the following commits:
343ae27 [Alex Liu] [SPARK-4943][SQL] refactoring according to review
29e5e55 [Alex Liu] [SPARK-4943][SQL] fix failed Hive CTAS tests
6ae77ce [Alex Liu] [SPARK-4943][SQL] fix TestHive matching error
3652997 [Alex Liu] [SPARK-4943][SQL] Allow table name having dot to support db/catalog ...
Diffstat (limited to 'sql/hive')
7 files changed, 66 insertions, 29 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 982e0593fc..1648fa826b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -124,7 +124,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { * in the Hive metastore. */ def analyze(tableName: String) { - val relation = EliminateAnalysisOperators(catalog.lookupRelation(None, tableName)) + val relation = EliminateAnalysisOperators(catalog.lookupRelation(Seq(tableName))) relation match { case relation: MetastoreRelation => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index b31a3ec250..2c859894cf 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.metastore.TableType import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition} import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, HiveException} +import org.apache.hadoop.hive.ql.metadata.InvalidTableException import org.apache.hadoop.hive.ql.plan.CreateTableDesc import org.apache.hadoop.hive.serde.serdeConstants import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException} @@ -57,18 +58,25 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with val caseSensitive: Boolean = false - def tableExists(db: Option[String], tableName: String): Boolean = { - val (databaseName, tblName) = processDatabaseAndTableName( - db.getOrElse(hive.sessionState.getCurrentDatabase), tableName) - client.getTable(databaseName, tblName, false) != null + def 
tableExists(tableIdentifier: Seq[String]): Boolean = { + val tableIdent = processTableIdentifier(tableIdentifier) + val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse( + hive.sessionState.getCurrentDatabase) + val tblName = tableIdent.last + try { + client.getTable(databaseName, tblName) != null + } catch { + case ie: InvalidTableException => false + } } def lookupRelation( - db: Option[String], - tableName: String, + tableIdentifier: Seq[String], alias: Option[String]): LogicalPlan = synchronized { - val (databaseName, tblName) = - processDatabaseAndTableName(db.getOrElse(hive.sessionState.getCurrentDatabase), tableName) + val tableIdent = processTableIdentifier(tableIdentifier) + val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse( + hive.sessionState.getCurrentDatabase) + val tblName = tableIdent.last val table = client.getTable(databaseName, tblName) if (table.isView) { // if the unresolved relation is from hive view @@ -251,6 +259,26 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with } } + protected def processDatabaseAndTableName( + databaseName: Option[String], + tableName: String): (Option[String], String) = { + if (!caseSensitive) { + (databaseName.map(_.toLowerCase), tableName.toLowerCase) + } else { + (databaseName, tableName) + } + } + + protected def processDatabaseAndTableName( + databaseName: String, + tableName: String): (String, String) = { + if (!caseSensitive) { + (databaseName.toLowerCase, tableName.toLowerCase) + } else { + (databaseName, tableName) + } + } + /** * Creates any tables required for query execution. * For example, because of a CREATE TABLE X AS statement. 
@@ -270,7 +298,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase) // Get the CreateTableDesc from Hive SemanticAnalyzer - val desc: Option[CreateTableDesc] = if (tableExists(Some(databaseName), tblName)) { + val desc: Option[CreateTableDesc] = if (tableExists(Seq(databaseName, tblName))) { None } else { val sa = new SemanticAnalyzer(hive.hiveconf) { @@ -352,15 +380,13 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with * UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore. * For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]]. */ - override def registerTable( - databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = ??? + override def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit = ??? /** * UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore. * For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]]. */ - override def unregisterTable( - databaseName: Option[String], tableName: String): Unit = ??? + override def unregisterTable(tableIdentifier: Seq[String]): Unit = ??? 
override def unregisterAllTables() = {} } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 8a9613cf96..c2ab3579c1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -386,6 +386,15 @@ private[hive] object HiveQl { (db, tableName) } + protected def extractTableIdent(tableNameParts: Node): Seq[String] = { + tableNameParts.getChildren.map { case Token(part, Nil) => cleanIdentifier(part) } match { + case Seq(tableOnly) => Seq(tableOnly) + case Seq(databaseName, table) => Seq(databaseName, table) + case other => sys.error("Hive only supports tables names like 'tableName' " + + s"or 'databaseName.tableName', found '$other'") + } + } + /** * SELECT MAX(value) FROM src GROUP BY k1, k2, k3 GROUPING SETS((k1, k2), (k2)) * is equivalent to @@ -475,16 +484,16 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C case Token(".", dbName :: tableName :: Nil) => // It is describing a table with the format like "describe db.table". // TODO: Actually, a user may mean tableName.columnName. Need to resolve this issue. - val (db, tableName) = extractDbNameTableName(nameParts.head) + val tableIdent = extractTableIdent(nameParts.head) DescribeCommand( - UnresolvedRelation(db, tableName, None), extended.isDefined) + UnresolvedRelation(tableIdent, None), extended.isDefined) case Token(".", dbName :: tableName :: colName :: Nil) => // It is describing a column with the format like "describe db.table column". NativePlaceholder case tableName => // It is describing a table with the format like "describe table". 
DescribeCommand( - UnresolvedRelation(None, tableName.getText, None), + UnresolvedRelation(Seq(tableName.getText), None), extended.isDefined) } } @@ -757,13 +766,15 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C nonAliasClauses) } - val (db, tableName) = + val tableIdent = tableNameParts.getChildren.map{ case Token(part, Nil) => cleanIdentifier(part)} match { - case Seq(tableOnly) => (None, tableOnly) - case Seq(databaseName, table) => (Some(databaseName), table) + case Seq(tableOnly) => Seq(tableOnly) + case Seq(databaseName, table) => Seq(databaseName, table) + case other => sys.error("Hive only supports tables names like 'tableName' " + + s"or 'databaseName.tableName', found '$other'") } val alias = aliasClause.map { case Token(a, Nil) => cleanIdentifier(a) } - val relation = UnresolvedRelation(db, tableName, alias) + val relation = UnresolvedRelation(tableIdent, alias) // Apply sampling if requested. (bucketSampleClause orElse splitSampleClause).map { @@ -882,7 +893,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C val Some(tableNameParts) :: partitionClause :: Nil = getClauses(Seq("TOK_TABNAME", "TOK_PARTSPEC"), tableArgs) - val (db, tableName) = extractDbNameTableName(tableNameParts) + val tableIdent = extractTableIdent(tableNameParts) val partitionKeys = partitionClause.map(_.getChildren.map { // Parse partitions. We also make keys case insensitive. 
@@ -892,7 +903,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C cleanIdentifier(key.toLowerCase) -> None }.toMap).getOrElse(Map.empty) - InsertIntoTable(UnresolvedRelation(db, tableName, None), partitionKeys, query, overwrite) + InsertIntoTable(UnresolvedRelation(tableIdent, None), partitionKeys, query, overwrite) case a: ASTNode => throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index b2149bd95a..8f2311cf83 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -167,7 +167,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { // Make sure any test tables referenced are loaded. val referencedTables = describedTables ++ - logical.collect { case UnresolvedRelation(databaseName, name, _) => name } + logical.collect { case UnresolvedRelation(tableIdent, _) => tableIdent.last } val referencedTestTables = referencedTables.filter(testTables.contains) logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}") referencedTestTables.foreach(loadTestTable) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index fe21454e7f..a547babceb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -53,14 +53,14 @@ case class CreateTableAsSelect( hiveContext.catalog.createTable(database, tableName, query.output, allowExisting, desc) // Get the Metastore Relation - hiveContext.catalog.lookupRelation(Some(database), tableName, None) match { + 
hiveContext.catalog.lookupRelation(Seq(database, tableName), None) match { case r: MetastoreRelation => r } } // TODO ideally, we should get the output data ready first and then // add the relation into catalog, just in case of failure occurs while data // processing. - if (hiveContext.catalog.tableExists(Some(database), tableName)) { + if (hiveContext.catalog.tableExists(Seq(database, tableName))) { if (allowExisting) { // table already exists, will do nothing, to keep consistent with Hive } else { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 6fc4153f6a..6b733a280e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -53,7 +53,7 @@ case class DropTable( val hiveContext = sqlContext.asInstanceOf[HiveContext] val ifExistsClause = if (ifExists) "IF EXISTS " else "" hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName") - hiveContext.catalog.unregisterTable(None, tableName) + hiveContext.catalog.unregisterTable(Seq(tableName)) Seq.empty[Row] } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 4b6a9308b9..a758f921e0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -72,7 +72,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { test("analyze MetastoreRelations") { def queryTotalSize(tableName: String): BigInt = - catalog.lookupRelation(None, tableName).statistics.sizeInBytes + catalog.lookupRelation(Seq(tableName)).statistics.sizeInBytes // Non-partitioned table sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect() @@ -123,7 +123,7 @@ class StatisticsSuite 
extends QueryTest with BeforeAndAfterAll { intercept[NotImplementedError] { analyze("tempTable") } - catalog.unregisterTable(None, "tempTable") + catalog.unregisterTable(Seq("tempTable")) } test("estimates the size of a test MetastoreRelation") { |