aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
authorLiang-Chi Hsieh <viirya@gmail.com>2015-02-17 12:23:18 -0800
committerMichael Armbrust <michael@databricks.com>2015-02-17 12:23:18 -0800
commit4611de1cef7363bc71ec608560dfd866ae477747 (patch)
tree6f7e0e1dc318bdc9b82719d41e1514a240898389 /sql/hive
parent31efb39c1deb253032b38e8fbafde4b2b1dde1f6 (diff)
downloadspark-4611de1cef7363bc71ec608560dfd866ae477747.tar.gz
spark-4611de1cef7363bc71ec608560dfd866ae477747.tar.bz2
spark-4611de1cef7363bc71ec608560dfd866ae477747.zip
[SPARK-5862][SQL] Only transformUp the given plan once in HiveMetastoreCatalog
Current `ParquetConversions` in `HiveMetastoreCatalog` will transformUp the given plan multiple times if there are many Metastore Parquet tables. Since the transformUp operation is recursive, it is better to perform it only once. Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #4651 from viirya/parquet_atonce and squashes the following commits: c1ed29d [Liang-Chi Hsieh] Fix bug. e0f919b [Liang-Chi Hsieh] Only transformUp the given plan once.
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala37
1 file changed, 20 insertions, 17 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 0e43faa8af..cfd6f27371 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -430,33 +430,36 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
hive.convertMetastoreParquet &&
hive.conf.parquetUseDataSourceApi &&
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
- relation
+ val parquetRelation = convertToParquetRelation(relation)
+ val attributedRewrites = relation.output.zip(parquetRelation.output)
+ (relation, parquetRelation, attributedRewrites)
// Read path
case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
if hive.convertMetastoreParquet &&
hive.conf.parquetUseDataSourceApi &&
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
- relation
+ val parquetRelation = convertToParquetRelation(relation)
+ val attributedRewrites = relation.output.zip(parquetRelation.output)
+ (relation, parquetRelation, attributedRewrites)
}
+ val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap
+ val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ ++: _))
+
// Replaces all `MetastoreRelation`s with corresponding `ParquetRelation2`s, and fixes
// attribute IDs referenced in other nodes.
- toBeReplaced.distinct.foldLeft(plan) { (lastPlan, relation) =>
- val parquetRelation = convertToParquetRelation(relation)
- val attributedRewrites = AttributeMap(relation.output.zip(parquetRelation.output))
-
- lastPlan.transformUp {
- case r: MetastoreRelation if r == relation => {
- val withAlias =
- r.alias.map(a => Subquery(a, parquetRelation)).getOrElse(
- Subquery(r.tableName, parquetRelation))
-
- withAlias
- }
- case other => other.transformExpressions {
- case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
- }
+ plan.transformUp {
+ case r: MetastoreRelation if relationMap.contains(r) => {
+ val parquetRelation = relationMap(r)
+ val withAlias =
+ r.alias.map(a => Subquery(a, parquetRelation)).getOrElse(
+ Subquery(r.tableName, parquetRelation))
+
+ withAlias
+ }
+ case other => other.transformExpressions {
+ case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
}
}
}