author     Yin Huai <yhuai@databricks.com>    2015-02-16 15:54:01 -0800
committer  Michael Armbrust <michael@databricks.com>    2015-02-16 15:54:01 -0800
commit     f3ff1eb2985ff3e1567645b898f6b42e4b01f237 (patch)
tree       d5668f0aa079bba14ab800df416a47de0b32f865 /sql/hive
parent     5b6cd65cd611b1a46a7d5eb33139c6224b96264e (diff)
[SPARK-5839][SQL] HiveMetastoreCatalog does not recognize table names and aliases of data source tables.
JIRA: https://issues.apache.org/jira/browse/SPARK-5839

Author: Yin Huai <yhuai@databricks.com>

Closes #4626 from yhuai/SPARK-5839 and squashes the following commits:

f779d85 [Yin Huai] Use subquery to wrap replaced ParquetRelation.
2695f13 [Yin Huai] Merge remote-tracking branch 'upstream/master' into SPARK-5839
f1ba6ca [Yin Huai] Address comment.
2c7fa08 [Yin Huai] Use Subqueries to wrap a data source table.
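For context, the regression shows up when a data source table is referenced through its own name or an alias in a predicate. A minimal repro sketch (the table name and alias follow the test added below; `sqlContext` is an assumed HiveContext, not part of the patch):

    // Before this patch, neither qualifier resolved against the cached
    // data source relation, because it was never wrapped in a Subquery.
    sqlContext.sql("SELECT * FROM savedJsonTable WHERE savedJsonTable.a < 5")
    sqlContext.sql("SELECT * FROM savedJsonTable tmp WHERE tmp.a > 5")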
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala      | 18
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 34
2 files changed, 50 insertions(+), 2 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 72211fe2e4..87bc9fe4fe 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -160,7 +160,15 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
if (table.getProperty("spark.sql.sources.provider") != null) {
- cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
+ val dataSourceTable =
+ cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
+ // Then, if alias is specified, wrap the table with a Subquery using the alias.
+ // Otherwise, wrap the table with a Subquery using the table name.
+ val withAlias =
+ alias.map(a => Subquery(a, dataSourceTable)).getOrElse(
+ Subquery(tableIdent.last, dataSourceTable))
+
+ withAlias
} else if (table.isView) {
// if the unresolved relation is from hive view
// parse the text into logic node.
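The added branch follows a small, reusable pattern: qualify the plan with the user-supplied alias when one exists, otherwise with the table name. A standalone sketch of that pattern, outside the diff (the `qualify` helper is hypothetical; `Subquery` is the Catalyst node used in the hunk above):

    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}

    // Prefer the explicit alias; fall back to the table name as the qualifier,
    // so that both `tableName.col` and `alias.col` references resolve.
    def qualify(plan: LogicalPlan, alias: Option[String], tableName: String): LogicalPlan =
      alias.map(a => Subquery(a, plan)).getOrElse(Subquery(tableName, plan))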
@@ -433,7 +441,13 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
val attributedRewrites = AttributeMap(relation.output.zip(parquetRelation.output))
lastPlan.transformUp {
- case r: MetastoreRelation if r == relation => parquetRelation
+ case r: MetastoreRelation if r == relation => {
+ val withAlias =
+ r.alias.map(a => Subquery(a, parquetRelation)).getOrElse(
+ Subquery(r.tableName, parquetRelation))
+
+ withAlias
+ }
case other => other.transformExpressions {
case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
}
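The second hunk applies the same idea during Parquet conversion: when a MetastoreRelation is swapped for a ParquetRelation, re-wrapping preserves the original qualifier so the rewritten attribute references still resolve. With the hypothetical `qualify` helper sketched above, the rewrite would read:

    // Bottom-up rewrite that keeps the replaced relation's alias/table name.
    lastPlan.transformUp {
      case r: MetastoreRelation if r == relation =>
        qualify(parquetRelation, r.alias, r.tableName)
    }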
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 375aae5d51..0263e3bb56 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -401,6 +401,40 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
sql("DROP TABLE jsonTable").collect().foreach(println)
}
+ test("SPARK-5839 HiveMetastoreCatalog does not recognize table aliases of data source tables.") {
+ val originalDefaultSource = conf.defaultDataSourceName
+
+ val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+ val df = jsonRDD(rdd)
+
+ conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
+ // Save the df as a managed table (by not specifying the path).
+ df.saveAsTable("savedJsonTable")
+
+ checkAnswer(
+ sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"),
+ (1 to 4).map(i => Row(i, s"str${i}")))
+
+ checkAnswer(
+ sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"),
+ (6 to 10).map(i => Row(i, s"str${i}")))
+
+ invalidateTable("savedJsonTable")
+
+ checkAnswer(
+ sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"),
+ (1 to 4).map(i => Row(i, s"str${i}")))
+
+ checkAnswer(
+ sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"),
+ (6 to 10).map(i => Row(i, s"str${i}")))
+
+ // Dropping the table will also delete the data.
+ sql("DROP TABLE savedJsonTable")
+
+ conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource)
+ }
+
test("save table") {
val originalDefaultSource = conf.defaultDataSourceName
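The new test checks both qualifiers before and after invalidateTable, so it appears to exercise both the cache-hit and cache-repopulation paths in HiveMetastoreCatalog. To run just this suite, an invocation along these lines should work (exact sbt syntax for this branch is an assumption):

    build/sbt "hive/test-only org.apache.spark.sql.hive.MetastoreDataSourcesSuite"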