aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
authorCheng Lian <lian@databricks.com>2015-03-25 17:40:19 -0700
committerMichael Armbrust <michael@databricks.com>2015-03-25 17:40:19 -0700
commit8c3b0052f4792d97d23244ade335676e37cb1fae (patch)
tree4213930cba7701afb2d0118682372febc2814b14 /sql/hive
parentd44a3362ed8cf3068f8ff233e13851a39da42219 (diff)
downloadspark-8c3b0052f4792d97d23244ade335676e37cb1fae.tar.gz
spark-8c3b0052f4792d97d23244ade335676e37cb1fae.tar.bz2
spark-8c3b0052f4792d97d23244ade335676e37cb1fae.zip
[SPARK-6450] [SQL] Fixes metastore Parquet table conversion
The `ParquetConversions` analysis rule generates a hash map, which maps from the original `MetastoreRelation` instances to the newly created `ParquetRelation2` instances. However, `MetastoreRelation.equals` doesn't compare output attributes. Thus, if a single metastore Parquet table appears multiple times in a query, only a single entry ends up in the hash map, and the conversion is not correctly performed. Proper fix for this issue should be overriding `equals` and `hashCode` for MetastoreRelation. Unfortunately, this breaks more tests than expected. It's possible that these tests are ill-formed from the very beginning. As 1.3.1 release is approaching, we'd like to make the change more surgical to avoid potential regressions. The proposed fix here is to make both the metastore relations and their output attributes as keys in the hash map used in ParquetConversions. <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5183) <!-- Reviewable:end --> Author: Cheng Lian <lian@databricks.com> Closes #5183 from liancheng/spark-6450 and squashes the following commits: 3536780 [Cheng Lian] Fixes metastore Parquet table conversion
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala34
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala25
2 files changed, 43 insertions, 16 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 4c5eb48661..d1a99555e9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -459,7 +459,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
val parquetRelation = convertToParquetRelation(relation)
val attributedRewrites = relation.output.zip(parquetRelation.output)
- (relation, parquetRelation, attributedRewrites)
+ (relation -> relation.output, parquetRelation, attributedRewrites)
// Write path
case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _)
@@ -470,7 +470,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
val parquetRelation = convertToParquetRelation(relation)
val attributedRewrites = relation.output.zip(parquetRelation.output)
- (relation, parquetRelation, attributedRewrites)
+ (relation -> relation.output, parquetRelation, attributedRewrites)
// Read path
case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
@@ -479,33 +479,35 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
val parquetRelation = convertToParquetRelation(relation)
val attributedRewrites = relation.output.zip(parquetRelation.output)
- (relation, parquetRelation, attributedRewrites)
+ (relation -> relation.output, parquetRelation, attributedRewrites)
}
+ // Quick fix for SPARK-6450: Notice that we're using both the MetastoreRelation instances and
+ // their output attributes as the key of the map. This is because MetastoreRelation.equals
+ // doesn't take output attributes into account, thus multiple MetastoreRelation instances
+ // pointing to the same table get collapsed into a single entry in the map. A proper fix for
+ // this should be overriding equals & hashCode in MetastoreRelation.
val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap
val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ ++: _))
// Replaces all `MetastoreRelation`s with corresponding `ParquetRelation2`s, and fixes
// attribute IDs referenced in other nodes.
plan.transformUp {
- case r: MetastoreRelation if relationMap.contains(r) => {
- val parquetRelation = relationMap(r)
- val withAlias =
- r.alias.map(a => Subquery(a, parquetRelation)).getOrElse(
- Subquery(r.tableName, parquetRelation))
+ case r: MetastoreRelation if relationMap.contains(r -> r.output) =>
+ val parquetRelation = relationMap(r -> r.output)
+ val alias = r.alias.getOrElse(r.tableName)
+ Subquery(alias, parquetRelation)
- withAlias
- }
case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite)
- if relationMap.contains(r) => {
- val parquetRelation = relationMap(r)
+ if relationMap.contains(r -> r.output) =>
+ val parquetRelation = relationMap(r -> r.output)
InsertIntoTable(parquetRelation, partition, child, overwrite)
- }
+
case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite)
- if relationMap.contains(r) => {
- val parquetRelation = relationMap(r)
+ if relationMap.contains(r -> r.output) =>
+ val parquetRelation = relationMap(r -> r.output)
InsertIntoTable(parquetRelation, partition, child, overwrite)
- }
+
case other => other.transformExpressions {
case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 8a31bd0309..432d65a874 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -365,6 +365,31 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
sql("DROP TABLE IF EXISTS test_insert_parquet")
}
+
+ test("SPARK-6450 regression test") {
+ sql(
+ """CREATE TABLE IF NOT EXISTS ms_convert (key INT)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ """.stripMargin)
+
+ // This shouldn't throw AnalysisException
+ val analyzed = sql(
+ """SELECT key FROM ms_convert
+ |UNION ALL
+ |SELECT key FROM ms_convert
+ """.stripMargin).queryExecution.analyzed
+
+ assertResult(2) {
+ analyzed.collect {
+ case r @ LogicalRelation(_: ParquetRelation2) => r
+ }.size
+ }
+
+ sql("DROP TABLE ms_convert")
+ }
}
class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {