author     Alex Liu <alex_liu68@yahoo.com>            2015-01-10 13:23:09 -0800
committer  Michael Armbrust <michael@databricks.com>  2015-01-10 13:23:09 -0800
commit     4b39fd1e63188821fc84a13f7ccb6e94277f4be7 (patch)
tree       3c98f9ff0d0562c6096ff0e82189bf543ab003bd /sql/hive
parent     1e56eba5d906bef793dfd6f199db735a6116a764 (diff)
[SPARK-4943][SQL] Allow table names containing a dot for db/catalog
This pull request only fixes the parsing error and changes the API to use a tableIdentifier. Changes for joining data sources from different catalogs are not included in this pull request.

Author: Alex Liu <alex_liu68@yahoo.com>

Closes #3941 from alexliu68/SPARK-SQL-4943-3 and squashes the following commits:

343ae27 [Alex Liu] [SPARK-4943][SQL] refactoring according to review
29e5e55 [Alex Liu] [SPARK-4943][SQL] fix failed Hive CTAS tests
6ae77ce [Alex Liu] [SPARK-4943][SQL] fix TestHive matching error
3652997 [Alex Liu] [SPARK-4943][SQL] Allow table name having dot to support db/catalog
...
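For context, the API change replaces the old (db: Option[String], tableName: String) pair with a single tableIdentifier: Seq[String], so a dotted name such as "mydb.events" parses into Seq("mydb", "events"). Below is a minimal, self-contained Scala sketch of the resolution convention the patch uses; it is illustrative only, and toTableIdentifier and currentDatabase are hypothetical stand-ins (the real code reads the current database from hive.sessionState.getCurrentDatabase and parses names in HiveQl):

object TableIdentifierSketch {
  // Hypothetical stand-in for hive.sessionState.getCurrentDatabase.
  val currentDatabase = "default"

  // Hypothetical helper: split a possibly dotted name into identifier parts,
  // mirroring how the parser now produces Seq("db", "table").
  def toTableIdentifier(name: String): Seq[String] = name.split("\\.").toSeq

  // Recover (database, table) the way the patch does: the last element is the
  // table name; the second-to-last, if present, is the database name.
  def resolve(tableIdent: Seq[String]): (String, String) = {
    val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(currentDatabase)
    (databaseName, tableIdent.last)
  }

  def main(args: Array[String]): Unit = {
    println(resolve(toTableIdentifier("events")))      // (default,events)
    println(resolve(toTableIdentifier("mydb.events"))) // (mydb,events)
  }
}

For a one-part identifier, lift(size - 2) is lift(-1), which returns None, so the session's current database is used; this matches the tableExists and lookupRelation changes in the diff below.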
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala                    |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala           | 52
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala                         | 29
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala                       |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala  |  4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala             |  2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala                |  4
7 files changed, 66 insertions, 29 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 982e0593fc..1648fa826b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -124,7 +124,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
* in the Hive metastore.
*/
def analyze(tableName: String) {
- val relation = EliminateAnalysisOperators(catalog.lookupRelation(None, tableName))
+ val relation = EliminateAnalysisOperators(catalog.lookupRelation(Seq(tableName)))
relation match {
case relation: MetastoreRelation =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index b31a3ec250..2c859894cf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.metastore.TableType
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition}
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, HiveException}
+import org.apache.hadoop.hive.ql.metadata.InvalidTableException
import org.apache.hadoop.hive.ql.plan.CreateTableDesc
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException}
@@ -57,18 +58,25 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
val caseSensitive: Boolean = false
- def tableExists(db: Option[String], tableName: String): Boolean = {
- val (databaseName, tblName) = processDatabaseAndTableName(
- db.getOrElse(hive.sessionState.getCurrentDatabase), tableName)
- client.getTable(databaseName, tblName, false) != null
+ def tableExists(tableIdentifier: Seq[String]): Boolean = {
+ val tableIdent = processTableIdentifier(tableIdentifier)
+ val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
+ hive.sessionState.getCurrentDatabase)
+ val tblName = tableIdent.last
+ try {
+ client.getTable(databaseName, tblName) != null
+ } catch {
+ case ie: InvalidTableException => false
+ }
}
def lookupRelation(
- db: Option[String],
- tableName: String,
+ tableIdentifier: Seq[String],
alias: Option[String]): LogicalPlan = synchronized {
- val (databaseName, tblName) =
- processDatabaseAndTableName(db.getOrElse(hive.sessionState.getCurrentDatabase), tableName)
+ val tableIdent = processTableIdentifier(tableIdentifier)
+ val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
+ hive.sessionState.getCurrentDatabase)
+ val tblName = tableIdent.last
val table = client.getTable(databaseName, tblName)
if (table.isView) {
// if the unresolved relation is from hive view
@@ -251,6 +259,26 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
}
+ protected def processDatabaseAndTableName(
+ databaseName: Option[String],
+ tableName: String): (Option[String], String) = {
+ if (!caseSensitive) {
+ (databaseName.map(_.toLowerCase), tableName.toLowerCase)
+ } else {
+ (databaseName, tableName)
+ }
+ }
+
+ protected def processDatabaseAndTableName(
+ databaseName: String,
+ tableName: String): (String, String) = {
+ if (!caseSensitive) {
+ (databaseName.toLowerCase, tableName.toLowerCase)
+ } else {
+ (databaseName, tableName)
+ }
+ }
+
/**
* Creates any tables required for query execution.
* For example, because of a CREATE TABLE X AS statement.
@@ -270,7 +298,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
// Get the CreateTableDesc from Hive SemanticAnalyzer
- val desc: Option[CreateTableDesc] = if (tableExists(Some(databaseName), tblName)) {
+ val desc: Option[CreateTableDesc] = if (tableExists(Seq(databaseName, tblName))) {
None
} else {
val sa = new SemanticAnalyzer(hive.hiveconf) {
@@ -352,15 +380,13 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
* UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
* For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
*/
- override def registerTable(
- databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = ???
+ override def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit = ???
/**
* UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
* For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
*/
- override def unregisterTable(
- databaseName: Option[String], tableName: String): Unit = ???
+ override def unregisterTable(tableIdentifier: Seq[String]): Unit = ???
override def unregisterAllTables() = {}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 8a9613cf96..c2ab3579c1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -386,6 +386,15 @@ private[hive] object HiveQl {
(db, tableName)
}
+ protected def extractTableIdent(tableNameParts: Node): Seq[String] = {
+ tableNameParts.getChildren.map { case Token(part, Nil) => cleanIdentifier(part) } match {
+ case Seq(tableOnly) => Seq(tableOnly)
+ case Seq(databaseName, table) => Seq(databaseName, table)
+ case other => sys.error("Hive only supports table names like 'tableName' " +
+ s"or 'databaseName.tableName', found '$other'")
+ }
+ }
+
/**
* SELECT MAX(value) FROM src GROUP BY k1, k2, k3 GROUPING SETS((k1, k2), (k2))
* is equivalent to
@@ -475,16 +484,16 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
case Token(".", dbName :: tableName :: Nil) =>
// It is describing a table with the format like "describe db.table".
// TODO: Actually, a user may mean tableName.columnName. Need to resolve this issue.
- val (db, tableName) = extractDbNameTableName(nameParts.head)
+ val tableIdent = extractTableIdent(nameParts.head)
DescribeCommand(
- UnresolvedRelation(db, tableName, None), extended.isDefined)
+ UnresolvedRelation(tableIdent, None), extended.isDefined)
case Token(".", dbName :: tableName :: colName :: Nil) =>
// It is describing a column with the format like "describe db.table column".
NativePlaceholder
case tableName =>
// It is describing a table with the format like "describe table".
DescribeCommand(
- UnresolvedRelation(None, tableName.getText, None),
+ UnresolvedRelation(Seq(tableName.getText), None),
extended.isDefined)
}
}
@@ -757,13 +766,15 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
nonAliasClauses)
}
- val (db, tableName) =
+ val tableIdent =
tableNameParts.getChildren.map{ case Token(part, Nil) => cleanIdentifier(part)} match {
- case Seq(tableOnly) => (None, tableOnly)
- case Seq(databaseName, table) => (Some(databaseName), table)
+ case Seq(tableOnly) => Seq(tableOnly)
+ case Seq(databaseName, table) => Seq(databaseName, table)
+ case other => sys.error("Hive only supports table names like 'tableName' " +
+ s"or 'databaseName.tableName', found '$other'")
}
val alias = aliasClause.map { case Token(a, Nil) => cleanIdentifier(a) }
- val relation = UnresolvedRelation(db, tableName, alias)
+ val relation = UnresolvedRelation(tableIdent, alias)
// Apply sampling if requested.
(bucketSampleClause orElse splitSampleClause).map {
@@ -882,7 +893,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
val Some(tableNameParts) :: partitionClause :: Nil =
getClauses(Seq("TOK_TABNAME", "TOK_PARTSPEC"), tableArgs)
- val (db, tableName) = extractDbNameTableName(tableNameParts)
+ val tableIdent = extractTableIdent(tableNameParts)
val partitionKeys = partitionClause.map(_.getChildren.map {
// Parse partitions. We also make keys case insensitive.
@@ -892,7 +903,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
cleanIdentifier(key.toLowerCase) -> None
}.toMap).getOrElse(Map.empty)
- InsertIntoTable(UnresolvedRelation(db, tableName, None), partitionKeys, query, overwrite)
+ InsertIntoTable(UnresolvedRelation(tableIdent, None), partitionKeys, query, overwrite)
case a: ASTNode =>
throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index b2149bd95a..8f2311cf83 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -167,7 +167,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
// Make sure any test tables referenced are loaded.
val referencedTables =
describedTables ++
- logical.collect { case UnresolvedRelation(databaseName, name, _) => name }
+ logical.collect { case UnresolvedRelation(tableIdent, _) => tableIdent.last }
val referencedTestTables = referencedTables.filter(testTables.contains)
logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}")
referencedTestTables.foreach(loadTestTable)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index fe21454e7f..a547babceb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -53,14 +53,14 @@ case class CreateTableAsSelect(
hiveContext.catalog.createTable(database, tableName, query.output, allowExisting, desc)
// Get the Metastore Relation
- hiveContext.catalog.lookupRelation(Some(database), tableName, None) match {
+ hiveContext.catalog.lookupRelation(Seq(database, tableName), None) match {
case r: MetastoreRelation => r
}
}
// TODO ideally, we should get the output data ready first and then
// add the relation into catalog, just in case of failure occurs while data
// processing.
- if (hiveContext.catalog.tableExists(Some(database), tableName)) {
+ if (hiveContext.catalog.tableExists(Seq(database, tableName))) {
if (allowExisting) {
// table already exists, will do nothing, to keep consistent with Hive
} else {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 6fc4153f6a..6b733a280e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -53,7 +53,7 @@ case class DropTable(
val hiveContext = sqlContext.asInstanceOf[HiveContext]
val ifExistsClause = if (ifExists) "IF EXISTS " else ""
hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
- hiveContext.catalog.unregisterTable(None, tableName)
+ hiveContext.catalog.unregisterTable(Seq(tableName))
Seq.empty[Row]
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 4b6a9308b9..a758f921e0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -72,7 +72,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
test("analyze MetastoreRelations") {
def queryTotalSize(tableName: String): BigInt =
- catalog.lookupRelation(None, tableName).statistics.sizeInBytes
+ catalog.lookupRelation(Seq(tableName)).statistics.sizeInBytes
// Non-partitioned table
sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
@@ -123,7 +123,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
intercept[NotImplementedError] {
analyze("tempTable")
}
- catalog.unregisterTable(None, "tempTable")
+ catalog.unregisterTable(Seq("tempTable"))
}
test("estimates the size of a test MetastoreRelation") {