aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala37
1 files changed, 20 insertions, 17 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 0e43faa8af..cfd6f27371 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -430,33 +430,36 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
hive.convertMetastoreParquet &&
hive.conf.parquetUseDataSourceApi &&
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
- relation
+ val parquetRelation = convertToParquetRelation(relation)
+ val attributedRewrites = relation.output.zip(parquetRelation.output)
+ (relation, parquetRelation, attributedRewrites)
// Read path
case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
if hive.convertMetastoreParquet &&
hive.conf.parquetUseDataSourceApi &&
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
- relation
+ val parquetRelation = convertToParquetRelation(relation)
+ val attributedRewrites = relation.output.zip(parquetRelation.output)
+ (relation, parquetRelation, attributedRewrites)
}
+ val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap
+ val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ ++: _))
+
// Replaces all `MetastoreRelation`s with corresponding `ParquetRelation2`s, and fixes
// attribute IDs referenced in other nodes.
- toBeReplaced.distinct.foldLeft(plan) { (lastPlan, relation) =>
- val parquetRelation = convertToParquetRelation(relation)
- val attributedRewrites = AttributeMap(relation.output.zip(parquetRelation.output))
-
- lastPlan.transformUp {
- case r: MetastoreRelation if r == relation => {
- val withAlias =
- r.alias.map(a => Subquery(a, parquetRelation)).getOrElse(
- Subquery(r.tableName, parquetRelation))
-
- withAlias
- }
- case other => other.transformExpressions {
- case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
- }
+ plan.transformUp {
+ case r: MetastoreRelation if relationMap.contains(r) => {
+ val parquetRelation = relationMap(r)
+ val withAlias =
+ r.alias.map(a => Subquery(a, parquetRelation)).getOrElse(
+ Subquery(r.tableName, parquetRelation))
+
+ withAlias
+ }
+ case other => other.transformExpressions {
+ case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
}
}
}