diff options
author | Yin Huai <yhuai@databricks.com> | 2015-02-16 15:54:01 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-02-16 15:54:01 -0800 |
commit | f3ff1eb2985ff3e1567645b898f6b42e4b01f237 (patch) | |
tree | d5668f0aa079bba14ab800df416a47de0b32f865 /sql/hive | |
parent | 5b6cd65cd611b1a46a7d5eb33139c6224b96264e (diff) | |
download | spark-f3ff1eb2985ff3e1567645b898f6b42e4b01f237.tar.gz spark-f3ff1eb2985ff3e1567645b898f6b42e4b01f237.tar.bz2 spark-f3ff1eb2985ff3e1567645b898f6b42e4b01f237.zip |
[SPARK-5839][SQL]HiveMetastoreCatalog does not recognize table names and aliases of data source tables.
JIRA: https://issues.apache.org/jira/browse/SPARK-5839
Author: Yin Huai <yhuai@databricks.com>
Closes #4626 from yhuai/SPARK-5839 and squashes the following commits:
f779d85 [Yin Huai] Use subqeury to wrap replaced ParquetRelation.
2695f13 [Yin Huai] Merge remote-tracking branch 'upstream/master' into SPARK-5839
f1ba6ca [Yin Huai] Address comment.
2c7fa08 [Yin Huai] Use Subqueries to wrap a data source table.
Diffstat (limited to 'sql/hive')
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 18 | ||||
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 34 |
2 files changed, 50 insertions, 2 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 72211fe2e4..87bc9fe4fe 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -160,7 +160,15 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with } if (table.getProperty("spark.sql.sources.provider") != null) { - cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase) + val dataSourceTable = + cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase) + // Then, if alias is specified, wrap the table with a Subquery using the alias. + // Othersie, wrap the table with a Subquery using the table name. + val withAlias = + alias.map(a => Subquery(a, dataSourceTable)).getOrElse( + Subquery(tableIdent.last, dataSourceTable)) + + withAlias } else if (table.isView) { // if the unresolved relation is from hive view // parse the text into logic node. @@ -433,7 +441,13 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with val attributedRewrites = AttributeMap(relation.output.zip(parquetRelation.output)) lastPlan.transformUp { - case r: MetastoreRelation if r == relation => parquetRelation + case r: MetastoreRelation if r == relation => { + val withAlias = + r.alias.map(a => Subquery(a, parquetRelation)).getOrElse( + Subquery(r.tableName, parquetRelation)) + + withAlias + } case other => other.transformExpressions { case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 375aae5d51..0263e3bb56 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -401,6 +401,40 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { sql("DROP TABLE jsonTable").collect().foreach(println) } + test("SPARK-5839 HiveMetastoreCatalog does not recognize table aliases of data source tables.") { + val originalDefaultSource = conf.defaultDataSourceName + + val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) + val df = jsonRDD(rdd) + + conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json") + // Save the df as a managed table (by not specifiying the path). + df.saveAsTable("savedJsonTable") + + checkAnswer( + sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"), + (1 to 4).map(i => Row(i, s"str${i}"))) + + checkAnswer( + sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"), + (6 to 10).map(i => Row(i, s"str${i}"))) + + invalidateTable("savedJsonTable") + + checkAnswer( + sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"), + (1 to 4).map(i => Row(i, s"str${i}"))) + + checkAnswer( + sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"), + (6 to 10).map(i => Row(i, s"str${i}"))) + + // Drop table will also delete the data. + sql("DROP TABLE savedJsonTable") + + conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource) + } + test("save table") { val originalDefaultSource = conf.defaultDataSourceName |