diff options
author | Michael Armbrust <michael@databricks.com> | 2014-05-13 23:27:22 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-05-13 23:27:22 -0700 |
commit | 6ce0884446d3571fd6e9d967a080a59c657543b1 (patch) | |
tree | 0538e6fe734503c88ea01dfd56d172bf30c27c7a /sql/catalyst | |
parent | 7bb9a521f35eb19576c6cc2da3fd385910270e46 (diff) | |
download | spark-6ce0884446d3571fd6e9d967a080a59c657543b1.tar.gz spark-6ce0884446d3571fd6e9d967a080a59c657543b1.tar.bz2 spark-6ce0884446d3571fd6e9d967a080a59c657543b1.zip |
[SQL] Improve column pruning.
Fixed a bug that was preventing us from ever pruning beneath Joins.
## TPC-DS Q3
### Before:
```
Aggregate false, [d_year#12,i_brand#65,i_brand_id#64], [d_year#12,i_brand_id#64 AS brand_id#0,i_brand#65 AS brand#1,SUM(PartialSum#79) AS sum_agg#2]
Exchange (HashPartitioning [d_year#12:0,i_brand#65:1,i_brand_id#64:2], 150)
Aggregate true, [d_year#12,i_brand#65,i_brand_id#64], [d_year#12,i_brand#65,i_brand_id#64,SUM(CAST(ss_ext_sales_price#49, DoubleType)) AS PartialSum#79]
Project [d_year#12:6,i_brand#65:59,i_brand_id#64:58,ss_ext_sales_price#49:43]
HashJoin [ss_item_sk#36], [i_item_sk#57], BuildRight
Exchange (HashPartitioning [ss_item_sk#36:30], 150)
HashJoin [d_date_sk#6], [ss_sold_date_sk#34], BuildRight
Exchange (HashPartitioning [d_date_sk#6:0], 150)
Filter (d_moy#14:8 = 12)
HiveTableScan [d_date_sk#6,d_date_id#7,d_date#8,d_month_seq#9,d_week_seq#10,d_quarter_seq#11,d_year#12,d_dow#13,d_moy#14,d_dom#15,d_qoy#16,d_fy_year#17,d_fy_quarter_seq#18,d_fy_week_seq#19,d_day_name#20,d_quarter_name#21,d_holiday#22,d_weekend#23,d_following_holiday#24,d_first_dom#25,d_last_dom#26,d_same_day_ly#27,d_same_day_lq#28,d_current_day#29,d_current_week#30,d_current_month#31,d_current_quarter#32,d_current_year#33], (MetastoreRelation default, date_dim, Some(dt)), None
Exchange (HashPartitioning [ss_sold_date_sk#34:0], 150)
HiveTableScan [ss_sold_date_sk#34,ss_sold_time_sk#35,ss_item_sk#36,ss_customer_sk#37,ss_cdemo_sk#38,ss_hdemo_sk#39,ss_addr_sk#40,ss_store_sk#41,ss_promo_sk#42,ss_ticket_number#43,ss_quantity#44,ss_wholesale_cost#45,ss_list_price#46,ss_sales_price#47,ss_ext_discount_amt#48,ss_ext_sales_price#49,ss_ext_wholesale_cost#50,ss_ext_list_price#51,ss_ext_tax#52,ss_coupon_amt#53,ss_net_paid#54,ss_net_paid_inc_tax#55,ss_net_profit#56], (MetastoreRelation default, store_sales, None), None
Exchange (HashPartitioning [i_item_sk#57:0], 150)
Filter (i_manufact_id#70:13 = 436)
HiveTableScan [i_item_sk#57,i_item_id#58,i_rec_start_date#59,i_rec_end_date#60,i_item_desc#61,i_current_price#62,i_wholesale_cost#63,i_brand_id#64,i_brand#65,i_class_id#66,i_class#67,i_category_id#68,i_category#69,i_manufact_id#70,i_manufact#71,i_size#72,i_formulation#73,i_color#74,i_units#75,i_container#76,i_manager_id#77,i_product_name#78], (MetastoreRelation default, item, None), None
```
### After
```
Aggregate false, [d_year#172,i_brand#225,i_brand_id#224], [d_year#172,i_brand_id#224 AS brand_id#160,i_brand#225 AS brand#161,SUM(PartialSum#239) AS sum_agg#162]
Exchange (HashPartitioning [d_year#172:0,i_brand#225:1,i_brand_id#224:2], 150)
Aggregate true, [d_year#172,i_brand#225,i_brand_id#224], [d_year#172,i_brand#225,i_brand_id#224,SUM(CAST(ss_ext_sales_price#209, DoubleType)) AS PartialSum#239]
Project [d_year#172:1,i_brand#225:5,i_brand_id#224:3,ss_ext_sales_price#209:0]
HashJoin [ss_item_sk#196], [i_item_sk#217], BuildRight
Exchange (HashPartitioning [ss_item_sk#196:2], 150)
Project [ss_ext_sales_price#209:2,d_year#172:1,ss_item_sk#196:3]
HashJoin [d_date_sk#166], [ss_sold_date_sk#194], BuildRight
Exchange (HashPartitioning [d_date_sk#166:0], 150)
Project [d_date_sk#166:0,d_year#172:1]
Filter (d_moy#174:2 = 12)
HiveTableScan [d_date_sk#166,d_year#172,d_moy#174], (MetastoreRelation default, date_dim, Some(dt)), None
Exchange (HashPartitioning [ss_sold_date_sk#194:2], 150)
HiveTableScan [ss_ext_sales_price#209,ss_item_sk#196,ss_sold_date_sk#194], (MetastoreRelation default, store_sales, None), None
Exchange (HashPartitioning [i_item_sk#217:1], 150)
Project [i_brand_id#224:0,i_item_sk#217:1,i_brand#225:2]
Filter (i_manufact_id#230:3 = 436)
HiveTableScan [i_brand_id#224,i_item_sk#217,i_brand#225,i_manufact_id#230], (MetastoreRelation default, item, None), None
```
Author: Michael Armbrust <michael@databricks.com>
Closes #729 from marmbrus/fixPruning and squashes the following commits:
5feeff0 [Michael Armbrust] Improve column pruning.
Diffstat (limited to 'sql/catalyst')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 16 |
1 files changed, 11 insertions, 5 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 3037d45cc6..406ffd6801 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -25,13 +25,13 @@ import org.apache.spark.sql.catalyst.types._ object Optimizer extends RuleExecutor[LogicalPlan] { val batches = - Batch("ConstantFolding", Once, + Batch("ConstantFolding", FixedPoint(100), NullPropagation, ConstantFolding, BooleanSimplification, SimplifyFilters, SimplifyCasts) :: - Batch("Filter Pushdown", Once, + Batch("Filter Pushdown", FixedPoint(100), CombineFilters, PushPredicateThroughProject, PushPredicateThroughInnerJoin, @@ -49,17 +49,19 @@ object Optimizer extends RuleExecutor[LogicalPlan] { */ object ColumnPruning extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { + // Eliminate attributes that are not needed to calculate the specified aggregates. case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty => - // Project away references that are not needed to calculate the required aggregates. a.copy(child = Project(a.references.toSeq, child)) + // Eliminate unneeded attributes from either side of a Join. case Project(projectList, Join(left, right, joinType, condition)) => // Collect the list of off references required either above or to evaluate the condition. val allReferences: Set[Attribute] = projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty) - /** Applies a projection when the child is producing unnecessary attributes */ + + /** Applies a projection only when the child is producing unnecessary attributes */ def prunedChild(c: LogicalPlan) = - if ((allReferences.filter(c.outputSet.contains) -- c.outputSet).nonEmpty) { + if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) { Project(allReferences.filter(c.outputSet.contains).toSeq, c) } else { c @@ -67,6 +69,7 @@ object ColumnPruning extends Rule[LogicalPlan] { Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition)) + // Combine adjacent Projects. case Project(projectList1, Project(projectList2, child)) => // Create a map of Aliases to their values from the child projection. // e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)). @@ -83,6 +86,9 @@ object ColumnPruning extends Rule[LogicalPlan] { }).asInstanceOf[Seq[NamedExpression]] Project(substitutedProjection, child) + + // Eliminate no-op Projects + case Project(projectList, child) if(child.output == projectList) => child } } |