Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala  | 34
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala         | 25
2 files changed, 43 insertions(+), 16 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 4c5eb48661..d1a99555e9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -459,7 +459,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
             relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
           val parquetRelation = convertToParquetRelation(relation)
           val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation, parquetRelation, attributedRewrites)
+          (relation -> relation.output, parquetRelation, attributedRewrites)

         // Write path
         case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _)
@@ -470,7 +470,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
             relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
           val parquetRelation = convertToParquetRelation(relation)
           val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation, parquetRelation, attributedRewrites)
+          (relation -> relation.output, parquetRelation, attributedRewrites)

         // Read path
         case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
@@ -479,33 +479,35 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
             relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
           val parquetRelation = convertToParquetRelation(relation)
           val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation, parquetRelation, attributedRewrites)
+          (relation -> relation.output, parquetRelation, attributedRewrites)
       }

+      // Quick fix for SPARK-6450: Notice that we're using both the MetastoreRelation instances and
+      // their output attributes as the key of the map. This is because MetastoreRelation.equals
+      // doesn't take output attributes into account, thus multiple MetastoreRelation instances
+      // pointing to the same table get collapsed into a single entry in the map. A proper fix for
+      // this should be overriding equals & hashCode in MetastoreRelation.
       val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap
       val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ ++: _))

       // Replaces all `MetastoreRelation`s with corresponding `ParquetRelation2`s, and fixes
       // attribute IDs referenced in other nodes.
       plan.transformUp {
-        case r: MetastoreRelation if relationMap.contains(r) => {
-          val parquetRelation = relationMap(r)
-          val withAlias =
-            r.alias.map(a => Subquery(a, parquetRelation)).getOrElse(
-              Subquery(r.tableName, parquetRelation))
+        case r: MetastoreRelation if relationMap.contains(r -> r.output) =>
+          val parquetRelation = relationMap(r -> r.output)
+          val alias = r.alias.getOrElse(r.tableName)
+          Subquery(alias, parquetRelation)

-          withAlias
-        }
         case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite)
-          if relationMap.contains(r) => {
-          val parquetRelation = relationMap(r)
+            if relationMap.contains(r -> r.output) =>
+          val parquetRelation = relationMap(r -> r.output)
           InsertIntoTable(parquetRelation, partition, child, overwrite)
-        }
+
         case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite)
-          if relationMap.contains(r) => {
-          val parquetRelation = relationMap(r)
+            if relationMap.contains(r -> r.output) =>
+          val parquetRelation = relationMap(r -> r.output)
           InsertIntoTable(parquetRelation, partition, child, overwrite)
-        }
+
         case other => other.transformExpressions {
           case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
         }
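
The comment above notes that MetastoreRelation.equals ignores output attributes, so several occurrences of one table collapse into a single map entry. One way this arises in Scala is a case class with multiple parameter lists: the generated equals/hashCode cover only the first list. Below is a minimal sketch of the collapse in plain Scala; it is not Spark code, and Rel, KeyCollapseSketch, and the attribute IDs are hypothetical stand-ins.

// `Rel` mimics MetastoreRelation: a case class derives equals/hashCode from
// its first parameter list only, so `output` is invisible to comparisons.
case class Rel(tableName: String)(val output: Seq[Long])

object KeyCollapseSketch extends App {
  // A self-union yields two occurrences of the "same" table, each carrying
  // its own fresh expression IDs (hypothetical values here).
  val left  = Rel("ms_convert")(Seq(1L, 2L))
  val right = Rel("ms_convert")(Seq(3L, 4L))

  // Keyed by the relation alone, the second entry overwrites the first,
  // which is exactly how one union branch lost its converted relation.
  val collapsed = Map(left -> "parquet-left", right -> "parquet-right")
  println(collapsed.size)  // 1

  // Keyed by (relation, output), as the patch does, both occurrences survive.
  val keyed = Map(
    (left -> left.output)   -> "parquet-left",
    (right -> right.output) -> "parquet-right")
  println(keyed.size)      // 2
}

Pairing each relation with its output is enough to disambiguate here because the expression IDs differ per occurrence, which is why the comment can defer the proper equals/hashCode override.
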
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 8a31bd0309..432d65a874 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -365,6 +365,31 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {

     sql("DROP TABLE IF EXISTS test_insert_parquet")
   }
+
+  test("SPARK-6450 regression test") {
+    sql(
+      """CREATE TABLE IF NOT EXISTS ms_convert (key INT)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    // This shouldn't throw AnalysisException
+    val analyzed = sql(
+      """SELECT key FROM ms_convert
+        |UNION ALL
+        |SELECT key FROM ms_convert
+      """.stripMargin).queryExecution.analyzed
+
+    assertResult(2) {
+      analyzed.collect {
+        case r @ LogicalRelation(_: ParquetRelation2) => r
+      }.size
+    }
+
+    sql("DROP TABLE ms_convert")
+  }
 }

 class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
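
Once each occurrence keeps its own converted relation, the `case other` branch in the patch still has to remap attribute IDs referenced elsewhere in the plan. The sketch below mirrors, in plain Scala, how the per-relation (old, new) attribute pairs are folded into one lookup table and then applied with getOrElse; String stands in for Catalyst's Attribute, and RewriteSketch plus all the `name#id` values are hypothetical.

object RewriteSketch extends App {
  type Attr = String  // stand-in for Catalyst's Attribute ("name#exprId")

  // One Seq of (old, new) pairs per converted relation, e.g. the two
  // branches of a self-union, each with its own expression IDs.
  val rewritesPerRelation: Seq[Seq[(Attr, Attr)]] = Seq(
    Seq("key#1" -> "key#5"),
    Seq("key#3" -> "key#7"))

  // Concatenate every pair list into a single map, echoing
  // AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ ++: _)).
  val attributedRewrites: Map[Attr, Attr] =
    rewritesPerRelation.fold(Nil)(_ ++: _).toMap

  // Attributes found in the map are remapped; everything else passes
  // through, matching attributedRewrites.getOrElse(a, a).
  def rewrite(a: Attr): Attr = attributedRewrites.getOrElse(a, a)

  assert(rewrite("key#1") == "key#5")      // first branch remapped
  assert(rewrite("key#3") == "key#7")      // second branch remapped
  assert(rewrite("other#9") == "other#9")  // unrelated attribute untouched
}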