diff options
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 37 |
1 files changed, 20 insertions, 17 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 0e43faa8af..cfd6f27371 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -430,33 +430,36 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with hive.convertMetastoreParquet && hive.conf.parquetUseDataSourceApi && relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") => - relation + val parquetRelation = convertToParquetRelation(relation) + val attributedRewrites = relation.output.zip(parquetRelation.output) + (relation, parquetRelation, attributedRewrites) // Read path case p @ PhysicalOperation(_, _, relation: MetastoreRelation) if hive.convertMetastoreParquet && hive.conf.parquetUseDataSourceApi && relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") => - relation + val parquetRelation = convertToParquetRelation(relation) + val attributedRewrites = relation.output.zip(parquetRelation.output) + (relation, parquetRelation, attributedRewrites) } + val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap + val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ ++: _)) + // Replaces all `MetastoreRelation`s with corresponding `ParquetRelation2`s, and fixes // attribute IDs referenced in other nodes. - toBeReplaced.distinct.foldLeft(plan) { (lastPlan, relation) => - val parquetRelation = convertToParquetRelation(relation) - val attributedRewrites = AttributeMap(relation.output.zip(parquetRelation.output)) - - lastPlan.transformUp { - case r: MetastoreRelation if r == relation => { - val withAlias = - r.alias.map(a => Subquery(a, parquetRelation)).getOrElse( - Subquery(r.tableName, parquetRelation)) - - withAlias - } - case other => other.transformExpressions { - case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a) - } + plan.transformUp { + case r: MetastoreRelation if relationMap.contains(r) => { + val parquetRelation = relationMap(r) + val withAlias = + r.alias.map(a => Subquery(a, parquetRelation)).getOrElse( + Subquery(r.tableName, parquetRelation)) + + withAlias + } + case other => other.transformExpressions { + case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a) } } } |